Files
2025-10-06 15:29:51 +07:00

41 lines
1.0 KiB
Go

package pubsub
import (
"database/sql"
"fmt"
"log"
)
func CreateTopicTables(db *sql.DB, topic string) {
topicTable := fmt.Sprintf("watermill_%s", topic)
offsetTable := fmt.Sprintf("watermill_offsets_%s", topic)
_, err := db.Exec(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
"offset" BIGSERIAL PRIMARY KEY,
uuid UUID NOT NULL,
payload BYTEA NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
`, topicTable))
if err != nil {
log.Fatalf("❌ Failed to ensure topic table %s: %v", topicTable, err)
}
_, err = db.Exec(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
consumer_group VARCHAR(255) NOT NULL,
"offset" BIGINT NOT NULL DEFAULT 0,
offset_acked BIGINT NOT NULL DEFAULT 0,
offset_consumed BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (consumer_group)
);
`, offsetTable))
if err != nil {
log.Fatalf("❌ Failed to ensure offsets table %s: %v", offsetTable, err)
}
log.Printf("✅ Watermill tables ensured for topic: %s", topic)
}