transaction-watcher/balance-processor/main.go

83 lines
1.8 KiB
Go
Raw Normal View History

2023-04-07 08:30:01 +00:00
package main
import (
"context"
"errors"
2023-04-07 11:59:20 +00:00
"github.com/twmb/franz-go/pkg/kadm"
2023-04-07 08:30:01 +00:00
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kversion"
"log"
"os"
"os/signal"
"strings"
)
func main() {
2023-04-07 11:59:20 +00:00
kafkaHost, ok := os.LookupEnv("KAFKA_ADDRESSES")
2023-04-07 08:30:01 +00:00
if !ok {
kafkaHost = "localhost:9092"
}
kafkaOptions := []kgo.Opt{
kgo.MinVersions(kversion.V0_11_0()),
2023-04-07 11:59:20 +00:00
kgo.SeedBrokers(kafkaHost),
2023-04-07 08:30:01 +00:00
kgo.ConsumeTopics("transactions"),
2023-04-07 11:59:20 +00:00
kgo.AllowAutoTopicCreation(),
kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelWarn, nil)),
kgo.ClientID("balance-processor"),
2023-04-07 08:30:01 +00:00
}
kafkaClient, err := kgo.NewClient(kafkaOptions...)
if err != nil {
log.Fatalf("initiating kafka client: %s", err.Error())
}
defer kafkaClient.Close()
2023-04-07 11:59:20 +00:00
// Create balance topic
kafkaAdmin := kadm.NewClient(kafkaClient)
_, err = kafkaAdmin.CreateTopic(context.Background(), 1, 1, nil, "balance")
if err != nil && !strings.Contains(err.Error(), "already exists") {
kafkaClient.Close()
log.Fatalf("Creating 'balance' topic: %s", err.Error())
}
2023-04-07 08:30:01 +00:00
exitSignal := make(chan os.Signal, 1)
signal.Notify(exitSignal, os.Interrupt)
go func() {
<-exitSignal
kafkaClient.Close()
}()
log.Println("Ready, waiting for messages...")
for {
ctx := context.Background()
fetches := kafkaClient.PollRecords(ctx, 128)
switch {
case fetches.IsClientClosed():
break
case fetches.Err() != nil:
fetches.EachError(func(topic string, partition int32, err error) {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
log.Printf("Error: topic: %s, partition: %d, message: %v", topic, partition, err)
}
})
continue
}
for _, msg := range fetches.Records() {
err := consume(kafkaClient, msg.Value)
if err != nil {
log.Println(err)
continue
}
log.Printf("Processed: %s", string(msg.Value))
}
}
}