-
Notifications
You must be signed in to change notification settings - Fork 10
/
server.go
157 lines (137 loc) · 3.27 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package mc
// Handles all server connections to a particular memcached servers.
import (
"net"
"net/url"
"strings"
"sync"
"time"
)
// server represents a server and contains all connections to that server
type server struct {
address string
scheme string
config *Config
// NOTE: organizing the pool as a chan makes the usage of the containing
// connections treadsafe
pool chan mcConn
isAlive bool
lock sync.Mutex
}
const defaultPort = "11211"
func newServer(address, username, password string, config *Config, newMcConn connGen) *server {
addr := address
scheme := "tcp"
if u, err := url.Parse(address); err == nil {
switch strings.ToLower(u.Scheme) {
case "tcp":
if len(u.Port()) == 0 {
addr = net.JoinHostPort(u.Host, defaultPort)
} else {
addr = u.Host
}
case "unix":
addr = u.Path
scheme = "unix"
case "":
if host, port, err := net.SplitHostPort(address); err == nil {
addr = net.JoinHostPort(host, port)
} else {
addr = net.JoinHostPort(address, defaultPort)
}
}
}
server := &server{
address: addr,
scheme: scheme,
config: config,
pool: make(chan mcConn, config.PoolSize),
isAlive: true,
}
for i := 0; i < config.PoolSize; i++ {
server.pool <- newMcConn(addr, scheme, username, password, config)
}
return server
}
func (s *server) perform(m *msg) error {
var err error
for i := 0; ; {
timeout := time.After(s.config.ConnectionTimeout)
select {
case c := <-s.pool:
// NOTE: this serverConn is no longer available in the pool (equivalent to locking)
if c == nil {
return &Error{StatusUnknownError, "Client is closed (did you call Quit?)", nil}
}
// backup request if a retry might be possible
if i+1 < s.config.Retries {
c.backup(m)
}
err = c.perform(m)
s.pool <- c
if err == nil {
return nil
}
// Return Memcached errors except network errors.
mErr := err.(*Error)
if mErr.Status != StatusNetworkError {
return err
}
// check if retry needed
i++
if i < s.config.Retries {
// restore request since m now contains the failed response
c.restore(m)
time.Sleep(s.config.RetryDelay)
} else {
return err
}
case <-timeout:
// do not retry
return &Error{StatusUnknownError,
"Timed out while waiting for connection from pool. " +
"Maybe increase your pool size?",
nil}
}
}
// return err
}
func (s *server) performStats(m *msg) (McStats, error) {
timeout := time.After(s.config.ConnectionTimeout)
select {
case c := <-s.pool:
// NOTE: this serverConn is no longer available in the pool (equivalent to locking)
if c == nil {
return nil, &Error{StatusUnknownError, "Client is closed (did you call Quit?)", nil}
}
stats, err := c.performStats(m)
s.pool <- c
return stats, err
case <-timeout:
// do not retry
return nil, &Error{StatusUnknownError,
"Timed out while waiting for connection from pool. " +
"Maybe increase your pool size?",
nil}
}
}
func (s *server) quit(m *msg) {
for i := 0; i < s.config.PoolSize; i++ {
c := <-s.pool
if c == nil {
// Do not double quit
return
}
c.quit(m)
}
close(s.pool)
}
func (s *server) changeAlive(alive bool) bool {
s.lock.Lock()
defer s.lock.Unlock()
if s.isAlive != alive {
s.isAlive = alive
return true
}
return false
}