41 lines
1.0 KiB
Go
41 lines
1.0 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
)
|
|
|
|
func CreateTopicTables(db *sql.DB, topic string) {
|
|
topicTable := fmt.Sprintf("watermill_%s", topic)
|
|
offsetTable := fmt.Sprintf("watermill_offsets_%s", topic)
|
|
|
|
_, err := db.Exec(fmt.Sprintf(`
|
|
CREATE TABLE IF NOT EXISTS %s (
|
|
"offset" BIGSERIAL PRIMARY KEY,
|
|
uuid UUID NOT NULL,
|
|
payload BYTEA NOT NULL,
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
|
|
created_at TIMESTAMP NOT NULL DEFAULT NOW()
|
|
);
|
|
`, topicTable))
|
|
if err != nil {
|
|
log.Fatalf("❌ Failed to ensure topic table %s: %v", topicTable, err)
|
|
}
|
|
|
|
_, err = db.Exec(fmt.Sprintf(`
|
|
CREATE TABLE IF NOT EXISTS %s (
|
|
consumer_group VARCHAR(255) NOT NULL,
|
|
"offset" BIGINT NOT NULL DEFAULT 0,
|
|
offset_acked BIGINT NOT NULL DEFAULT 0,
|
|
offset_consumed BIGINT NOT NULL DEFAULT 0,
|
|
PRIMARY KEY (consumer_group)
|
|
);
|
|
`, offsetTable))
|
|
if err != nil {
|
|
log.Fatalf("❌ Failed to ensure offsets table %s: %v", offsetTable, err)
|
|
}
|
|
|
|
log.Printf("✅ Watermill tables ensured for topic: %s", topic)
|
|
}
|