Files
go-watermill-template/cmd/main.go
2025-10-06 15:29:51 +07:00

123 lines
3.1 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"go-watermill-template/internal/config"
"go-watermill-template/internal/db"
"go-watermill-template/internal/pubsub"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
// -------------------------------------------------------------
// Main Entry
// -------------------------------------------------------------
func main() {
cfg := config.Load()
logger := watermill.NewStdLogger(cfg.LogDebug, cfg.LogDebug)
// Connect ke Postgres
conn, err := db.Connect(cfg.DBHost, cfg.DBPort, cfg.DBUser, cfg.DBPassword, cfg.DBName, cfg.DBSSLMode)
config.Must(err)
defer conn.Close()
args := os.Args
if len(args) > 1 {
switch args[1] {
case "create-tables":
// buat semua topik dari .env
for _, t := range cfg.Topics {
pubsub.CreateTopicTables(conn, t)
}
fmt.Println("✅ All tables created successfully from .env")
return
case "create-table":
if len(args) < 3 {
fmt.Println("❌ Usage: go run cmd/main.go create-table <topic_name>")
return
}
topic := args[2]
pubsub.CreateTopicTables(conn, topic)
fmt.Printf("✅ Table created for topic: %s\n", topic)
return
}
}
// kalau tidak ada argumen, jalankan service utama
runService(cfg, conn, logger)
}
// -------------------------------------------------------------
// Watermill Runtime Service
// -------------------------------------------------------------
func runService(cfg *config.Config, conn *sql.DB, logger watermill.LoggerAdapter) {
log.Println("🚀 Starting Watermill PostgreSQL Service...")
// Inisialisasi Publisher dan Subscriber
publisher, err := pubsub.NewPublisher(conn, logger)
config.Must(err)
subscriber, err := pubsub.NewSubscriber(conn, logger)
config.Must(err)
log.Println("✅ Watermill PostgreSQL Pub/Sub initialized")
// Pastikan semua tabel ada
for _, t := range cfg.Topics {
pubsub.CreateTopicTables(conn, t)
}
// Jalankan subscriber untuk semua topik
for _, topic := range cfg.Topics {
go startDynamicSubscriber(subscriber, topic)
}
// Kirim contoh pesan ke topik pertama
if len(cfg.Topics) > 0 {
sendExampleMessage(publisher, cfg.Topics[0])
}
select {} // block forever
}
// -------------------------------------------------------------
// Contoh Publisher
// -------------------------------------------------------------
func sendExampleMessage(publisher message.Publisher, topic string) {
msg := message.NewMessage(
watermill.NewUUID(),
[]byte(fmt.Sprintf(`{"hello": "from topic %s"}`, topic)),
)
if err := publisher.Publish(topic, msg); err != nil {
log.Printf("❌ Failed to publish to %s: %v", topic, err)
return
}
log.Printf("📤 Published message to %s", topic)
}
// -------------------------------------------------------------
// Contoh Subscriber
// -------------------------------------------------------------
func startDynamicSubscriber(sub message.Subscriber, topic string) {
messages, err := sub.Subscribe(context.Background(), topic)
if err != nil {
panic(err)
}
go func() {
for msg := range messages {
log.Printf("📥 [%s] Received: %s", topic, string(msg.Payload))
msg.Ack()
}
}()
}