Skip to content

Commit 022bd55

Browse files
committed
Merge branch 'develop'
2 parents a32a60c + 57c1503 commit 022bd55

30 files changed

+1276
-1033
lines changed

README.md

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,15 @@ A schema must have a default rule with only one node assigned.
8383

8484
For hash and range routing you can see the example below.
8585

86-
## admin
86+
## admin commands
8787

8888
Mixer suplies `admin` statement to administrate. The `admin` format is `admin func(arg, ...)` like `select func(arg,...)`. Later we may add admin password for safe use.
8989

9090
Support admin functions now:
9191

92-
+ ```admin upnode(node, serverype, addr)```
93-
+ ```admin downnode(node, servertype)```
92+
- admin upnode(node, serverype, addr);
93+
- admin downnode(node, servertype);
94+
- show proxy config;
9495

9596
## Base Example
9697

@@ -129,12 +130,18 @@ mysql> select id, str from mixer_test_conn;
129130
## Hash Sharding Example
130131

131132
```
132-
hash rule:
133-
- db: mixer
134-
table: mixer_test_shard_hash
135-
key: id
136-
nodes: node2,node3
137-
type: hash
133+
schemas :
134+
-
135+
db : mixer
136+
nodes: [node1, node2, node3]
137+
rules:
138+
default: node1
139+
shard:
140+
-
141+
table: mixer_test_shard_hash
142+
key: id
143+
nodes: [node2, node3]
144+
type: hash
138145
139146
hash algorithm: value % len(nodes)
140147
@@ -190,14 +197,19 @@ Empty set
190197
## Range Sharding Example
191198

192199
```
193-
range rule:
194-
- db: mixer
195-
table: mixer_test_shard_range
196-
key: id
197-
range:
198-
nodes: node2, node3
199-
range: -10000-
200-
type: range
200+
schemas :
201+
-
202+
db : mixer
203+
nodes: [node1, node2, node3]
204+
rules:
205+
default: node1
206+
shard:
207+
-
208+
table: mixer_test_shard_range
209+
key: id
210+
nodes: [node2, node3]
211+
range: -10000-
212+
type: range
201213
202214
range algorithm: node key start <= value < node key stop
203215
@@ -316,7 +328,7 @@ proxy> select str from mixer_test_shard_range where id >=0 and id < 100000;
316328
+ Mixer uses 2PC to handle write operations for multi nodes. You take the risk that data becomes corrupted if some nodes commit ok but others error. In that case, you must try to recover your data by yourself.
317329
+ You must design your routing rule and write SQL carefully. (e.g. if your where condition contains no routing key, mixer will route the SQL to all nodes, maybe).
318330

319-
## Why not vitess?
331+
## Why not [vitess](https://github.com/youtube/vitess)?
320332

321333
Vitess is very awesome, and I use some of its code like sqlparser. Why not use vitess directly? Maybe below:
322334

@@ -330,4 +342,4 @@ Mixer now is still in development and should not be used in production.
330342

331343
## Feedback
332344

333-
345+

client/conn.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type Conn struct {
3535
salt []byte
3636

3737
lastPing int64
38+
39+
pkgErr error
3840
}
3941

4042
func (c *Conn) Connect(addr string, user string, password string, db string) error {
@@ -109,11 +111,15 @@ func (c *Conn) Close() error {
109111
}
110112

111113
func (c *Conn) readPacket() ([]byte, error) {
112-
return c.pkg.ReadPacket()
114+
d, err := c.pkg.ReadPacket()
115+
c.pkgErr = err
116+
return d, err
113117
}
114118

115119
func (c *Conn) writePacket(data []byte) error {
116-
return c.pkg.WritePacket(data)
120+
err := c.pkg.WritePacket(data)
121+
c.pkgErr = err
122+
return err
117123
}
118124

119125
func (c *Conn) readInitialHandshake() error {
@@ -400,6 +406,40 @@ func (c *Conn) SetCharset(charset string) error {
400406
}
401407
}
402408

409+
func (c *Conn) FieldList(table string, wildcard string) ([]*Field, error) {
410+
if err := c.writeCommandStrStr(COM_FIELD_LIST, table, wildcard); err != nil {
411+
return nil, err
412+
}
413+
414+
data, err := c.readPacket()
415+
if err != nil {
416+
return nil, err
417+
}
418+
419+
fs := make([]*Field, 0, 4)
420+
var f *Field
421+
if data[0] == ERR_HEADER {
422+
return nil, c.handleErrorPacket(data)
423+
} else {
424+
for {
425+
if data, err = c.readPacket(); err != nil {
426+
return nil, err
427+
}
428+
429+
// EOF Packet
430+
if c.isEOFPacket(data) {
431+
return fs, nil
432+
}
433+
434+
if f, err = FieldData(data).Parse(); err != nil {
435+
return nil, err
436+
}
437+
fs = append(fs, f)
438+
}
439+
}
440+
return nil, fmt.Errorf("field list error")
441+
}
442+
403443
func (c *Conn) exec(query string) (*Result, error) {
404444
if err := c.writeCommandStr(COM_QUERY, query); err != nil {
405445
return nil, err

client/db.go

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,24 @@ package client
22

33
import (
44
"container/list"
5+
"fmt"
56
. "github.com/siddontang/mixer/mysql"
67
"sync"
8+
"sync/atomic"
79
)
810

911
type DB struct {
1012
sync.Mutex
1113

12-
addr string
13-
user string
14-
password string
15-
db string
16-
idleConns int
14+
addr string
15+
user string
16+
password string
17+
db string
18+
maxIdleConns int
1719

18-
conns *list.List
20+
idleConns *list.List
21+
22+
connNum int32
1923
}
2024

2125
func Open(addr string, user string, password string, dbName string) (*DB, error) {
@@ -26,7 +30,8 @@ func Open(addr string, user string, password string, dbName string) (*DB, error)
2630
db.password = password
2731
db.db = dbName
2832

29-
db.conns = list.New()
33+
db.idleConns = list.New()
34+
db.connNum = 0
3035

3136
return db, nil
3237
}
@@ -35,14 +40,19 @@ func (db *DB) Addr() string {
3540
return db.addr
3641
}
3742

43+
func (db *DB) String() string {
44+
return fmt.Sprintf("%s:%s@%s/%s?maxIdleConns=%v",
45+
db.user, db.password, db.addr, db.db, db.maxIdleConns)
46+
}
47+
3848
func (db *DB) Close() error {
3949
db.Lock()
4050

4151
for {
42-
if db.conns.Len() > 0 {
43-
v := db.conns.Back()
52+
if db.idleConns.Len() > 0 {
53+
v := db.idleConns.Back()
4454
co := v.Value.(*Conn)
45-
db.conns.Remove(v)
55+
db.idleConns.Remove(v)
4656

4757
co.Close()
4858

@@ -67,8 +77,16 @@ func (db *DB) Ping() error {
6777
return err
6878
}
6979

70-
func (db *DB) SetIdleConns(num int) {
71-
db.idleConns = num
80+
func (db *DB) SetMaxIdleConnNum(num int) {
81+
db.maxIdleConns = num
82+
}
83+
84+
func (db *DB) GetIdleConnNum() int {
85+
return db.idleConns.Len()
86+
}
87+
88+
func (db *DB) GetConnNum() int {
89+
return int(db.connNum)
7290
}
7391

7492
func (db *DB) newConn() (*Conn, error) {
@@ -109,10 +127,10 @@ func (db *DB) tryReuse(co *Conn) error {
109127

110128
func (db *DB) PopConn() (co *Conn, err error) {
111129
db.Lock()
112-
if db.conns.Len() > 0 {
113-
v := db.conns.Front()
130+
if db.idleConns.Len() > 0 {
131+
v := db.idleConns.Front()
114132
co = v.Value.(*Conn)
115-
db.conns.Remove(v)
133+
db.idleConns.Remove(v)
116134
}
117135
db.Unlock()
118136

@@ -126,25 +144,29 @@ func (db *DB) PopConn() (co *Conn, err error) {
126144
co.Close()
127145
}
128146

129-
return db.newConn()
147+
co, err = db.newConn()
148+
if err == nil {
149+
atomic.AddInt32(&db.connNum, 1)
150+
}
151+
return
130152
}
131153

132154
func (db *DB) PushConn(co *Conn, err error) {
133155
var closeConn *Conn = nil
134156

135-
if err == ErrBadConn {
157+
if err != nil {
136158
closeConn = co
137159
} else {
138-
if db.idleConns > 0 {
160+
if db.maxIdleConns > 0 {
139161
db.Lock()
140162

141-
if db.conns.Len() >= db.idleConns {
142-
v := db.conns.Front()
163+
if db.idleConns.Len() >= db.maxIdleConns {
164+
v := db.idleConns.Front()
143165
closeConn = v.Value.(*Conn)
144-
db.conns.Remove(v)
166+
db.idleConns.Remove(v)
145167
}
146168

147-
db.conns.PushBack(co)
169+
db.idleConns.PushBack(co)
148170

149171
db.Unlock()
150172

@@ -155,6 +177,8 @@ func (db *DB) PushConn(co *Conn, err error) {
155177
}
156178

157179
if closeConn != nil {
180+
atomic.AddInt32(&db.connNum, -1)
181+
158182
closeConn.Close()
159183
}
160184
}
@@ -167,7 +191,7 @@ type SqlConn struct {
167191

168192
func (p *SqlConn) Close() {
169193
if p.Conn != nil {
170-
p.db.PushConn(p.Conn, nil)
194+
p.db.PushConn(p.Conn, p.Conn.pkgErr)
171195
p.Conn = nil
172196
}
173197
}

cmd/mixer-proxy/main.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"syscall"
1313
)
1414

15-
var configFile = flag.String("config", "/etc/mixer.conf", "mixer proxy config file")
15+
var configFile *string = flag.String("config", "/etc/mixer.conf", "mixer proxy config file")
1616
var logLevel *string = flag.String("log-level", "", "log level [debug|info|warn|error], default error")
1717

1818
func main() {
@@ -21,13 +21,13 @@ func main() {
2121
flag.Parse()
2222

2323
if len(*configFile) == 0 {
24-
println("must use a config file")
24+
log.Error("must use a config file")
2525
return
2626
}
2727

2828
cfg, err := config.ParseConfigFile(*configFile)
2929
if err != nil {
30-
println(err.Error())
30+
log.Error(err.Error())
3131
return
3232
}
3333

@@ -40,7 +40,7 @@ func main() {
4040
var svr *proxy.Server
4141
svr, err = proxy.NewServer(cfg)
4242
if err != nil {
43-
println(err.Error())
43+
log.Error(err.Error())
4444
return
4545
}
4646

@@ -52,8 +52,8 @@ func main() {
5252
syscall.SIGQUIT)
5353

5454
go func() {
55-
<-sc
56-
55+
sig := <-sc
56+
log.Info("Got signal [%d] to exit.", sig)
5757
svr.Close()
5858
}()
5959

config/config.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,22 @@ type NodeConfig struct {
2020
}
2121

2222
type SchemaConfig struct {
23-
DB string `yaml:"db"`
24-
Nodes []string `yaml:"nodes"`
23+
DB string `yaml:"db"`
24+
Nodes []string `yaml:"nodes"`
25+
RulesConifg RulesConfig `yaml:"rules"`
2526
}
2627

27-
type RuleConfig struct {
28-
DB string `yaml:"db"`
29-
Table string `yaml:"table"`
30-
Key string `yaml:"key"`
31-
Nodes string `yaml:"nodes"`
32-
Type string `yaml:"type"`
33-
Range string `yaml:"range"`
28+
type RulesConfig struct {
29+
Default string `yaml:"default"`
30+
ShardRule []ShardConfig `yaml:"shard"`
31+
}
32+
33+
type ShardConfig struct {
34+
Table string `yaml:"table"`
35+
Key string `yaml:"key"`
36+
Nodes []string `yaml:"nodes"`
37+
Type string `yaml:"type"`
38+
Range string `yaml:"range"`
3439
}
3540

3641
type Config struct {
@@ -42,8 +47,6 @@ type Config struct {
4247
Nodes []NodeConfig `yaml:"nodes"`
4348

4449
Schemas []SchemaConfig `yaml:"schemas"`
45-
46-
Rules []RuleConfig `yaml:"rules"`
4750
}
4851

4952
func ParseConfigData(data []byte) (*Config, error) {

0 commit comments

Comments
 (0)