package pubsub

import (
	"database/sql"
	"fmt"
	"log"
	"strings"
)

// CreateTopicTables ensures the two tables backing a topic exist, creating
// them with CREATE TABLE IF NOT EXISTS when they are missing:
//
//   - watermill_<topic>: one row per message ("offset", uuid, payload,
//     metadata, created_at).
//   - watermill_offsets_<topic>: one row per consumer group ("offset",
//     offset_acked, offset_consumed), keyed by consumer_group.
//
// The DDL uses PostgreSQL types (BIGSERIAL, BYTEA, JSONB).
//
// Table names are emitted as double-quoted identifiers with embedded quotes
// doubled, so a topic containing dots, dashes, spaces or quote characters
// produces valid DDL and cannot break out of the identifier position.
//
// NOTE(review): quoted identifiers are case-sensitive in PostgreSQL, so a
// mixed-case topic now yields a mixed-case table name rather than the folded
// lowercase one. Confirm that readers of these tables quote the name too.
//
// The function returns nothing: if either statement fails it calls
// log.Fatalf, which logs the error and terminates the process.
func CreateTopicTables(db *sql.DB, topic string) {
	// quoteIdent renders name as a double-quoted SQL identifier, doubling any
	// embedded double quote so the name cannot terminate the identifier early.
	quoteIdent := func(name string) string {
		return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
	}

	topicTable := fmt.Sprintf("watermill_%s", topic)
	offsetTable := fmt.Sprintf("watermill_offsets_%s", topic)

	// Message log for the topic; "offset" is quoted because OFFSET is a
	// reserved word.
	_, err := db.Exec(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( "offset" BIGSERIAL PRIMARY KEY, uuid UUID NOT NULL, payload BYTEA NOT NULL, metadata JSONB NOT NULL DEFAULT '{}'::jsonb, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); `, quoteIdent(topicTable)))
	if err != nil {
		log.Fatalf("❌ Failed to ensure topic table %s: %v", topicTable, err)
	}

	// Per-consumer-group offset bookkeeping for the topic.
	_, err = db.Exec(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( consumer_group VARCHAR(255) NOT NULL, "offset" BIGINT NOT NULL DEFAULT 0, offset_acked BIGINT NOT NULL DEFAULT 0, offset_consumed BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (consumer_group) ); `, quoteIdent(offsetTable)))
	if err != nil {
		log.Fatalf("❌ Failed to ensure offsets table %s: %v", offsetTable, err)
	}

	log.Printf("✅ Watermill tables ensured for topic: %s", topic)
}