-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmigrate.go
145 lines (134 loc) · 3.13 KB
/
migrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
Package pgxmgr loads and executes the migration scripts on a connection acquired from a pgx pool.
All migrations are run incrementally in separate transaction blocks.
Execution is terminated when any migration fails and a rollback is performed to the beginning
of this failing migration. Any previous migrations will already be committed.
*/
package pgxmgr
import (
"errors"
"path/filepath"
"strconv"
"strings"
"github.com/inconshreveable/log15"
"github.com/jackc/pgx"
"github.com/usrpro/dotpgx"
)
// createRevTable is the DDL statement that creates the schema_version
// bookkeeping table when it does not exist yet. Each row is one
// (major, minor, fix) version triple, and the triple is the primary key.
const createRevTable string = `
CREATE TABLE IF NOT EXISTS schema_version (
major int NOT NULL,
minor int NOT NULL,
fix int NOT NULL,
CONSTRAINT schema_version_pkey PRIMARY KEY ( major , minor, fix)
);
`
// insertRev inserts one version row into schema_version.
// Parameters: $1 = major, $2 = minor, $3 = fix.
const insertRev string = `
INSERT INTO schema_version (major, minor, fix)
VALUES ( $1, $2, $3);
`
// checkRev selects a single true value when schema_version holds a row
// for the given version, and no rows otherwise.
// Parameters: $1 = major, $2 = minor, $3 = fix.
const checkRev string = `
SELECT true::bool FROM schema_version
WHERE
major = $1
AND
minor = $2
AND
fix = $3
;
`
// Run loads the migration files found in path and executes them one by one.
//
// db must be a configured dotpgx database handle; its pool is used to create
// the schema_version table when that table does not exist yet. path is the
// directory the migration files are read from. Files named
// ##-##-####-<name>.sql are loaded and executed in order, where the three
// number groups are the major, minor and fix version.
//
// Run stops at the first failing step and returns its error.
func Run(db *dotpgx.DB, path string) (err error) {
	if _, err = db.Pool.Exec(createRevTable); err != nil {
		return err
	}

	var migrations []file
	if migrations, err = listFiles(path); err != nil {
		return err
	}

	// TODO: error in map was not empty.
	if err = db.ClearMap(); err != nil {
		return err
	}

	for i := range migrations {
		if err = exec(db, migrations[i]); err != nil {
			return err
		}
	}
	return nil
}
// file describes one migration script together with the version parsed
// from its file name.
type file struct {
	// name is the path of the script as returned by the directory glob.
	name string
	// major, minor and fix are the three numeric groups of the version.
	major, minor, fix int
}
// skip reports whether the version of f is already recorded in
// schema_version and the migration can therefore be skipped.
//
// It runs checkRev inside tx with the file's major, minor and fix numbers.
// A result without rows (pgx.ErrNoRows) is not treated as an error: the
// scanned flag is returned as-is together with a nil error. Any other scan
// error is passed on to the caller.
func (f *file) skip(tx *dotpgx.Tx) (b bool, err error) {
	row := tx.Ptx.QueryRow(checkRev, f.major, f.minor, f.fix)
	switch err = row.Scan(&b); err {
	case pgx.ErrNoRows:
		// No matching version row: nothing to skip, and not a failure.
		return b, nil
	default:
		// Covers both success (err == nil) and real scan errors.
		return b, err
	}
}
// listFiles collects the migration scripts located directly inside path.
//
// Every file matching "*.sql" in path is expected to be named
// <major>-<minor>-<fix>-<name>.sql. The three leading, dash-separated groups
// of the base name are parsed as decimal integers and stored in the returned
// entries, which keep the order produced by filepath.Glob.
//
// An error is returned when the glob pattern is malformed, when no file
// matches, when a file name has fewer than three dash-separated groups, or
// when one of the groups is not a number. On error the returned slice is nil.
func listFiles(path string) (files []file, err error) {
	// TODO: improve glob pattern
	names, err := filepath.Glob(filepath.Join(path, "*.sql"))
	if err != nil {
		return nil, err
	}
	if len(names) == 0 {
		return nil, errors.New("No migration files loaded")
	}

	files = make([]file, 0, len(names))
	for _, name := range names {
		// Only the base name carries the version; the directory part may
		// contain dashes of its own and uses the OS-specific separator.
		v := strings.SplitN(filepath.Base(name), "-", 4)
		if len(v) < 3 {
			// Without this guard v[1] / v[2] would panic on names such as
			// "readme.sql".
			return nil, errors.New("invalid migration file name: " + name)
		}

		f := file{name: name}
		if f.major, err = strconv.Atoi(v[0]); err != nil {
			return nil, err
		}
		if f.minor, err = strconv.Atoi(v[1]); err != nil {
			return nil, err
		}
		if f.fix, err = strconv.Atoi(v[2]); err != nil {
			return nil, err
		}
		files = append(files, f)
	}
	return files, nil
}
// exec applies the single migration file f inside its own transaction.
//
// Steps, in order:
//  1. parse f.name into the query map of db;
//  2. begin a transaction;
//  3. ask f.skip whether the version is already recorded, and return
//     without changes if so;
//  4. insert the version row (insertRev) and execute every entry returned
//     by db.List within the transaction;
//  5. commit.
//
// Any error aborts the migration and is returned to the caller; the deferred
// Rollback then discards whatever the transaction did so far. The query map
// is cleared again when exec returns, so the next file starts empty.
func exec(db *dotpgx.DB, f file) (err error) {
	log15.Info("Migration exec", "parse", f)
	if err = db.ParseFiles(f.name); err != nil {
		return err
	}
	// Best-effort cleanup: the result of ClearMap is intentionally not
	// propagated, matching the original behaviour.
	defer db.ClearMap()

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Safety net for every early return below.
	// NOTE(review): presumably harmless after a successful Commit; confirm
	// against dotpgx.
	defer tx.Rollback()

	applied, err := f.skip(tx)
	if err != nil {
		// A failed version check is an error, not a skip; do not log it as
		// one. The caller decides how to report it.
		return err
	}
	if applied {
		log15.Info("Migration exec", "skip", f)
		return nil
	}

	log15.Info("Migration exec", "start", f)
	// Record the version first; it is rolled back together with the
	// migration statements if any of them fails.
	if _, err = tx.Ptx.Exec(insertRev, f.major, f.minor, f.fix); err != nil {
		return err
	}
	for _, q := range db.List() {
		if _, err = tx.Exec(q); err != nil {
			return err
		}
	}
	return tx.Commit()
}