123 lines
3.1 KiB
Go
123 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
|
|
"go-watermill-template/internal/config"
|
|
"go-watermill-template/internal/db"
|
|
"go-watermill-template/internal/pubsub"
|
|
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
)
|
|
|
|
// -------------------------------------------------------------
|
|
// Main Entry
|
|
// -------------------------------------------------------------
|
|
func main() {
|
|
cfg := config.Load()
|
|
logger := watermill.NewStdLogger(cfg.LogDebug, cfg.LogDebug)
|
|
|
|
// Connect ke Postgres
|
|
conn, err := db.Connect(cfg.DBHost, cfg.DBPort, cfg.DBUser, cfg.DBPassword, cfg.DBName, cfg.DBSSLMode)
|
|
config.Must(err)
|
|
defer conn.Close()
|
|
|
|
args := os.Args
|
|
if len(args) > 1 {
|
|
switch args[1] {
|
|
case "create-tables":
|
|
// buat semua topik dari .env
|
|
for _, t := range cfg.Topics {
|
|
pubsub.CreateTopicTables(conn, t)
|
|
}
|
|
fmt.Println("✅ All tables created successfully from .env")
|
|
return
|
|
|
|
case "create-table":
|
|
if len(args) < 3 {
|
|
fmt.Println("❌ Usage: go run cmd/main.go create-table <topic_name>")
|
|
return
|
|
}
|
|
topic := args[2]
|
|
pubsub.CreateTopicTables(conn, topic)
|
|
fmt.Printf("✅ Table created for topic: %s\n", topic)
|
|
return
|
|
}
|
|
}
|
|
|
|
// kalau tidak ada argumen, jalankan service utama
|
|
runService(cfg, conn, logger)
|
|
}
|
|
|
|
// -------------------------------------------------------------
|
|
// Watermill Runtime Service
|
|
// -------------------------------------------------------------
|
|
func runService(cfg *config.Config, conn *sql.DB, logger watermill.LoggerAdapter) {
|
|
log.Println("🚀 Starting Watermill PostgreSQL Service...")
|
|
|
|
// Inisialisasi Publisher dan Subscriber
|
|
publisher, err := pubsub.NewPublisher(conn, logger)
|
|
config.Must(err)
|
|
|
|
subscriber, err := pubsub.NewSubscriber(conn, logger)
|
|
config.Must(err)
|
|
|
|
log.Println("✅ Watermill PostgreSQL Pub/Sub initialized")
|
|
|
|
// Pastikan semua tabel ada
|
|
for _, t := range cfg.Topics {
|
|
pubsub.CreateTopicTables(conn, t)
|
|
}
|
|
|
|
// Jalankan subscriber untuk semua topik
|
|
for _, topic := range cfg.Topics {
|
|
go startDynamicSubscriber(subscriber, topic)
|
|
}
|
|
|
|
// Kirim contoh pesan ke topik pertama
|
|
if len(cfg.Topics) > 0 {
|
|
sendExampleMessage(publisher, cfg.Topics[0])
|
|
}
|
|
|
|
select {} // block forever
|
|
}
|
|
|
|
// -------------------------------------------------------------
|
|
// Contoh Publisher
|
|
// -------------------------------------------------------------
|
|
func sendExampleMessage(publisher message.Publisher, topic string) {
|
|
msg := message.NewMessage(
|
|
watermill.NewUUID(),
|
|
[]byte(fmt.Sprintf(`{"hello": "from topic %s"}`, topic)),
|
|
)
|
|
|
|
if err := publisher.Publish(topic, msg); err != nil {
|
|
log.Printf("❌ Failed to publish to %s: %v", topic, err)
|
|
return
|
|
}
|
|
|
|
log.Printf("📤 Published message to %s", topic)
|
|
}
|
|
|
|
// -------------------------------------------------------------
|
|
// Contoh Subscriber
|
|
// -------------------------------------------------------------
|
|
func startDynamicSubscriber(sub message.Subscriber, topic string) {
|
|
messages, err := sub.Subscribe(context.Background(), topic)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go func() {
|
|
for msg := range messages {
|
|
log.Printf("📥 [%s] Received: %s", topic, string(msg.Payload))
|
|
msg.Ack()
|
|
}
|
|
}()
|
|
}
|