transaction-watcher/balance-processor/consume.go

48 lines
1.2 KiB
Go
Raw Permalink Normal View History

2023-04-07 08:30:01 +00:00
package main
import (
"encoding/json"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"strings"
"time"
)
// TransactionLog is the JSON shape of an incoming transaction record as
// decoded by consume. Each field maps to the snake_case key named in its tag.
type TransactionLog struct {
	// TransactionID is read from the "transaction_id" key.
	TransactionID int64 `json:"transaction_id"`
	// TransactionType is read from "transaction_type"; consume upper-cases it
	// before matching, so its casing on the wire does not matter there.
	TransactionType string `json:"transaction_type"`
	// CustomerNumber is read from the "customer_number" key.
	CustomerNumber int64 `json:"customer_number"`
	// TransactionAmount is read from "transaction_amount".
	// NOTE(review): unit and expected sign are not visible here — confirm with the producer.
	TransactionAmount int64 `json:"transaction_amount"`
	// Timestamp is read from "timestamp" and kept as the raw string; it is not
	// parsed in this file.
	Timestamp string `json:"timestamp"`
}
// consume decodes a JSON-encoded TransactionLog from payload, turns it into a
// signed balance change, and hands the resulting BalanceLog to produce along
// with client.
//
// The transaction type is matched case-insensitively:
//   - TOP_UP passes TransactionAmount through unchanged;
//   - TRANSFER, WITHDRAW and FEE negate TransactionAmount.
//
// It returns an error wrapping the JSON decoding failure when payload cannot
// be unmarshalled, an "unknown transaction type" error for any other type, and
// otherwise whatever produce returns.
func consume(client *kgo.Client, payload []byte) error {
	var transactionLog TransactionLog
	if err := json.Unmarshal(payload, &transactionLog); err != nil {
		// Wrap with %w (rather than formatting err.Error()) so callers can
		// still inspect the underlying decode error via errors.Is/errors.As.
		return fmt.Errorf("invalid payload: %w", err)
	}

	var amount int64
	switch strings.ToUpper(transactionLog.TransactionType) {
	case "TOP_UP":
		amount = transactionLog.TransactionAmount
	case "TRANSFER", "WITHDRAW", "FEE":
		amount = -transactionLog.TransactionAmount
	default:
		return fmt.Errorf("unknown transaction type: %s", transactionLog.TransactionType)
	}

	return produce(client, BalanceLog{
		CustomerNumber: transactionLog.CustomerNumber,
		Amount:         amount,
		// The emitted timestamp is the time of processing, not the Timestamp
		// field carried by the incoming transaction log.
		Timestamp:     time.Now().Format(time.RFC3339),
		TransactionID: transactionLog.TransactionID,
	})
}