transaction-watcher/balance-processor/main.go

70 lines
1.4 KiB
Go

package main
import (
"context"
"errors"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kversion"
"log"
"os"
"os/signal"
"strings"
)
func main() {
kafkaHost, ok := os.LookupEnv("KAFKA_HOST")
if !ok {
kafkaHost = "localhost:9092"
}
kafkaOptions := []kgo.Opt{
kgo.MinVersions(kversion.V0_11_0()),
kgo.SeedBrokers(strings.Split(kafkaHost, ",")...),
kgo.ConsumeTopics("transactions"),
}
kafkaClient, err := kgo.NewClient(kafkaOptions...)
if err != nil {
log.Fatalf("initiating kafka client: %s", err.Error())
}
defer kafkaClient.Close()
exitSignal := make(chan os.Signal, 1)
signal.Notify(exitSignal, os.Interrupt)
go func() {
<-exitSignal
kafkaClient.Close()
}()
log.Println("Ready, waiting for messages...")
for {
ctx := context.Background()
fetches := kafkaClient.PollRecords(ctx, 128)
switch {
case fetches.IsClientClosed():
break
case fetches.Err() != nil:
fetches.EachError(func(topic string, partition int32, err error) {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
log.Printf("Error: topic: %s, partition: %d, message: %v", topic, partition, err)
}
})
continue
}
for _, msg := range fetches.Records() {
err := consume(kafkaClient, msg.Value)
if err != nil {
log.Println(err)
continue
}
log.Printf("Processed: %s", string(msg.Value))
}
}
}