From ec0011cf640a55942809b1f5f34e253a4d53cc28 Mon Sep 17 00:00:00 2001 From: Reinaldy Rafli Date: Fri, 7 Apr 2023 18:59:20 +0700 Subject: [PATCH] fix: resolve kafka connection problems --- README.md | 1 + balance-processor/go.mod | 6 +++++- balance-processor/go.sum | 4 ++++ balance-processor/main.go | 17 +++++++++++++++-- docker-compose.yml | 4 ++-- 5 files changed, 27 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 4cd0a55..b131385 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ This project contains a few tasks for you to work with. Create a subdirectory called `watcher` that will do a SQL query periodically to the `transactions` table, and for every new row on that table, the `watcher` will produce a message to `transactions` topic on Redpanda. +You will also need to create the `transactions` topic yourself. The schema of the message is defined on the `kafka-schemas` directory. Search for `transaction.json` file. diff --git a/balance-processor/go.mod b/balance-processor/go.mod index 9c18a2b..9de820f 100644 --- a/balance-processor/go.mod +++ b/balance-processor/go.mod @@ -2,10 +2,14 @@ module balance-processor go 1.20 -require github.com/twmb/franz-go v1.13.2 +require ( + github.com/twmb/franz-go v1.13.2 + github.com/twmb/franz-go/pkg/kadm v1.8.0 +) require ( github.com/klauspost/compress v1.16.3 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect + golang.org/x/crypto v0.7.0 // indirect ) diff --git a/balance-processor/go.sum b/balance-processor/go.sum index 3c8ce9e..fbaac08 100644 --- a/balance-processor/go.sum +++ b/balance-processor/go.sum @@ -4,5 +4,9 @@ github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/twmb/franz-go v1.13.2 h1:jIdDoFiq8uP3Zrx6TZZTXpaXrv3bh1w3tV5mn/B+Gw8= github.com/twmb/franz-go v1.13.2/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= +github.com/twmb/franz-go/pkg/kadm v1.8.0 h1:vvKwZpxYn+VmM32v4mKkecHLKavZW+HcYLRKKxly5ZY= +github.com/twmb/franz-go/pkg/kadm v1.8.0/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8= github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= diff --git a/balance-processor/main.go b/balance-processor/main.go index 8f76bb6..1be0da9 100644 --- a/balance-processor/main.go +++ b/balance-processor/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kversion" "log" @@ -12,15 +13,18 @@ import ( ) func main() { - kafkaHost, ok := os.LookupEnv("KAFKA_HOST") + kafkaHost, ok := os.LookupEnv("KAFKA_ADDRESSES") if !ok { kafkaHost = "localhost:9092" } kafkaOptions := []kgo.Opt{ kgo.MinVersions(kversion.V0_11_0()), - kgo.SeedBrokers(strings.Split(kafkaHost, ",")...), + kgo.SeedBrokers(kafkaHost), kgo.ConsumeTopics("transactions"), + kgo.AllowAutoTopicCreation(), + kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelWarn, nil)), + kgo.ClientID("balance-processor"), } kafkaClient, err := kgo.NewClient(kafkaOptions...) @@ -29,6 +33,15 @@ func main() { } defer kafkaClient.Close() + // Create balance topic + kafkaAdmin := kadm.NewClient(kafkaClient) + _, err = kafkaAdmin.CreateTopic(context.Background(), 1, 1, nil, "balance") + if err != nil && !strings.Contains(err.Error(), "already exists") { + kafkaClient.Close() + + log.Fatalf("Creating 'balance' topic: %s", err.Error()) + } + exitSignal := make(chan os.Signal, 1) signal.Notify(exitSignal, os.Interrupt) diff --git a/docker-compose.yml b/docker-compose.yml index 4fb274f..4891b25 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,9 +31,9 @@ services: - --node-id 0 - --check=false - --pandaproxy-addr PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082 - - --advertise-pandaproxy-addr PLAINTEXT://kafka:8082,OUTSIDE://localhost:8082 + - --advertise-pandaproxy-addr PLAINTEXT://kafka:8082,OUTSIDE://kafka:8082 - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 - - --advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://localhost:9092 + - --advertise-kafka-addr PLAINTEXT://kafka:9092,OUTSIDE://kafka:9092 - --rpc-addr 0.0.0.0:33145 - --advertise-rpc-addr kafka:33145 healthcheck: