48 lines
1.2 KiB
Go
48 lines
1.2 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"github.com/twmb/franz-go/pkg/kgo"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type TransactionLog struct {
|
||
|
TransactionID int64 `json:"transaction_id"`
|
||
|
TransactionType string `json:"transaction_type"`
|
||
|
CustomerNumber int64 `json:"customer_number"`
|
||
|
TransactionAmount int64 `json:"transaction_amount"`
|
||
|
Timestamp string `json:"timestamp"`
|
||
|
}
|
||
|
|
||
|
func consume(client *kgo.Client, payload []byte) error {
|
||
|
var transactionLog TransactionLog
|
||
|
if err := json.Unmarshal(payload, &transactionLog); err != nil {
|
||
|
return fmt.Errorf("invalid payload: %s", err.Error())
|
||
|
}
|
||
|
|
||
|
var amount int64
|
||
|
switch strings.ToUpper(transactionLog.TransactionType) {
|
||
|
case "TOP_UP":
|
||
|
amount = transactionLog.TransactionAmount
|
||
|
break
|
||
|
case "TRANSFER":
|
||
|
fallthrough
|
||
|
case "WITHDRAW":
|
||
|
fallthrough
|
||
|
case "FEE":
|
||
|
amount = -1 * transactionLog.TransactionAmount
|
||
|
break
|
||
|
default:
|
||
|
return fmt.Errorf("unknown transaction type: %s", transactionLog.TransactionType)
|
||
|
}
|
||
|
|
||
|
return produce(client, BalanceLog{
|
||
|
CustomerNumber: transactionLog.CustomerNumber,
|
||
|
Amount: amount,
|
||
|
Timestamp: time.Now().Format(time.RFC3339),
|
||
|
TransactionID: transactionLog.TransactionID,
|
||
|
})
|
||
|
}
|