-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdriver-postgres.go
135 lines (115 loc) · 3.49 KB
/
driver-postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package db
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"path/filepath"
"time"
"github.com/chadweimer/gomp/models"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/jmoiron/sqlx"
"github.com/samber/lo"
// postgres database driver
_ "github.com/lib/pq"
// File source for db migration
_ "github.com/golang-migrate/migrate/v4/source/file"
)
// PostgresDriverName is the name to use for this driver
const PostgresDriverName string = "postgres"
type postgresRecipeDriverAdapter struct{}
func (postgresRecipeDriverAdapter) GetSearchFields(filterFields []models.SearchField, query string) (string, []any) {
fieldStr := ""
fieldArgs := make([]any, 0)
for _, field := range supportedSearchFields {
if lo.Contains(filterFields, field) {
if fieldStr != "" {
fieldStr += " OR "
}
fieldStr += fmt.Sprintf("to_tsvector('english', r.%s) @@ plainto_tsquery('english', ?)", field)
fieldArgs = append(fieldArgs, query)
}
}
return fieldStr, fieldArgs
}
func openPostgres(connectionString string, migrationsTableName string, migrationsForceVersion int) (Driver, error) {
// In docker, on first bring up, the DB takes a little while.
// Let's try a few times to establish connection before giving up.
const maxAttempts = 20
var db *sqlx.DB
var err error
for i := 1; i <= maxAttempts; i++ {
db, err = sqlx.Connect(PostgresDriverName, connectionString)
if err == nil {
break
}
if i == maxAttempts {
return nil, fmt.Errorf("giving up after failing to open database on attempt %d: '%w'", i, err)
}
slog.Error("Failed to open database. Will try again...",
"error", err,
"attempt", i)
time.Sleep(500 * time.Millisecond)
}
// This is meant to mitigate connection drops
db.SetConnMaxLifetime(time.Minute * 15)
if err := migratePostgresDatabase(db, migrationsTableName, migrationsForceVersion); err != nil {
return nil, fmt.Errorf("failed to migrate database: '%w'", err)
}
drv := newSQLDriver(db, postgresRecipeDriverAdapter{})
return drv, nil
}
func migratePostgresDatabase(db *sqlx.DB, migrationsTableName string, migrationsForceVersion int) error {
// Lock the database while we're migrating so that multiple instances
// don't attempt to migrate simultaneously. This requires the same connection
// to be used for both locking and unlocking.
conn, err := db.Conn(context.Background())
if err != nil {
return err
}
defer conn.Close()
// This should block until the lock has been acquired
if err := lockPostgres(conn); err != nil {
return err
}
defer func() {
if unlockErr := unlockPostgres(conn); unlockErr != nil {
panic("Failed to unlock database")
}
}()
driver, err := postgres.WithInstance(db.DB, &postgres.Config{
MigrationsTable: migrationsTableName,
})
if err != nil {
return err
}
migrationPath := "file://" + filepath.Join("db", "migrations", PostgresDriverName)
m, err := migrate.NewWithDatabaseInstance(
migrationPath,
PostgresDriverName,
driver)
if err != nil {
return err
}
if migrationsForceVersion > 0 {
err = m.Force(migrationsForceVersion)
} else {
err = m.Up()
}
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
return err
}
return nil
}
func lockPostgres(conn *sql.Conn) error {
stmt := "SELECT pg_advisory_lock(1)"
_, err := conn.ExecContext(context.Background(), stmt)
return err
}
func unlockPostgres(conn *sql.Conn) error {
stmt := "SELECT pg_advisory_unlock(1)"
_, err := conn.ExecContext(context.Background(), stmt)
return err
}