Skip to content

Commit f883d30

Browse files
committed
update
1 parent 43030dd commit f883d30

File tree

3 files changed

+165
-34
lines changed

3 files changed

+165
-34
lines changed

comet/server.go

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"github.com/chenyf/gibbon/utils/safemap"
1111
)
1212

13+
type MsgHandler func(*Client, *Header, []byte)(int)
14+
1315
type Server struct {
1416
exitCh chan bool
1517
waitGroup *sync.WaitGroup
16-
funcMap map[uint8]func(*Client, Header, []byte)(int)
18+
funcMap map[uint8]MsgHandler
1719
acceptTimeout time.Duration
1820
readTimeout time.Duration
1921
writeTimeout time.Duration
@@ -24,7 +26,7 @@ func NewServer() *Server {
2426
return &Server {
2527
exitCh: make(chan bool),
2628
waitGroup: &sync.WaitGroup{},
27-
funcMap: make(map[uint8]func(*Client, Header, []byte)(int)),
29+
funcMap: make(map[uint8]MsgHandler),
2830
acceptTimeout: 60,
2931
readTimeout: 60,
3032
writeTimeout: 60,
@@ -35,32 +37,36 @@ func NewServer() *Server {
3537
type Client struct {
3638
devId string
3739
ctrl chan bool
38-
MsgOut chan *Message
39-
MsgFoo map[uint32]chan *Message
40+
MsgOut chan *Pack
41+
WaitingChannels map[uint32]chan *Message
4042
NextSeqId uint32
4143
LastAlive time.Time
4244
}
4345

44-
func (client *Client)SendMessage(msgType uint8, body []byte, reply chan *Message) (uint32) {
45-
seqid := client.NextSeqId
46+
type Pack struct {
47+
msg *Message
48+
client *Client
49+
reply chan *Message
50+
}
51+
52+
func (client *Client)SendMessage(msgType uint8, body []byte, reply chan *Message) {
4653
header := Header{
4754
Type: msgType,
4855
Ver: 0,
49-
Seq: seqid,
56+
Seq: 0,
5057
Len: uint32(len(body)),
5158
}
5259
msg := &Message{
5360
Header: header,
5461
Data: body,
5562
}
56-
client.NextSeqId += 1
57-
if reply == nil {
58-
client.MsgOut <- msg
59-
return 0
63+
64+
pack := &Pack{
65+
msg: msg,
66+
client: client,
67+
reply: reply,
6068
}
61-
client.MsgFoo[seqid] = reply
62-
client.MsgOut <- msg
63-
return seqid
69+
client.MsgOut <- pack
6470
}
6571

6672
var (
@@ -71,8 +77,8 @@ func InitClient(conn *net.TCPConn, devid string) (*Client) {
7177
client := &Client {
7278
devId: devid,
7379
ctrl: make(chan bool),
74-
MsgOut: make(chan *Message, 100),
75-
MsgFoo: make(map[uint32]chan *Message),
80+
MsgOut: make(chan *Pack, 100),
81+
WaitingChannels: make(map[uint32]chan *Message),
7682
NextSeqId: 1,
7783
LastAlive: time.Now(),
7884
}
@@ -83,16 +89,21 @@ func InitClient(conn *net.TCPConn, devid string) (*Client) {
8389
for {
8490
log.Printf("run loop")
8591
select {
86-
case msg := <-client.MsgOut:
87-
b, _ := msg.Header.Serialize()
92+
case pack := <-client.MsgOut:
93+
seqid := pack.client.NextSeqId
94+
pack.msg.Header.Seq = seqid
95+
b, _ := pack.msg.Header.Serialize()
8896
conn.Write(b)
89-
conn.Write(msg.Data)
90-
log.Printf("send msg ok, (%s)", string(msg.Data))
97+
conn.Write(pack.msg.Data)
98+
log.Printf("send msg ok, (%s)", string(pack.msg.Data))
99+
pack.client.NextSeqId += 1
100+
// add reply channel
101+
if pack.reply != nil {
102+
pack.client.WaitingChannels[seqid] = pack.reply
103+
}
91104
case <-client.ctrl:
92105
log.Printf("leave send routine")
93106
return
94-
//case <-time.After(1*time.Second)
95-
// continue
96107
}
97108
}
98109
}()
@@ -104,14 +115,16 @@ func CloseClient(client *Client) {
104115
DevMap.Delete(client.devId)
105116
}
106117

107-
func handleReply(client *Client, header Header, body []byte) int {
108-
ch, ok := client.MsgFoo[header.Seq]; if ok {
109-
ch <- &Message{Header: header, Data: body}
118+
func handleReply(client *Client, header *Header, body []byte) int {
119+
ch, ok := client.WaitingChannels[header.Seq]; if ok {
120+
//remove waiting channel from map
121+
delete(client.WaitingChannels, header.Seq)
122+
ch <- &Message{Header: *header, Data: body}
110123
}
111124
return 0
112125
}
113126

114-
func handleHeartbeat(client *Client, header Header, body []byte) int {
127+
func handleHeartbeat(client *Client, header *Header, body []byte) int {
115128
client.LastAlive = time.Now()
116129
return 0
117130
}
@@ -289,7 +302,7 @@ func (this *Server)handleConnection(conn *net.TCPConn) {
289302
}
290303

291304
handler, ok := this.funcMap[header.Type]; if ok {
292-
ret := handler(client, header, data)
305+
ret := handler(client, &header, data)
293306
if ret < 0 {
294307
break
295308
}

main.go

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,113 @@ import (
44
"flag"
55
"log"
66
"sync"
7+
"io/ioutil"
78
"os"
89
"os/signal"
910
"syscall"
1011
"net/http"
1112
"fmt"
1213
"time"
13-
//"github.com/chenyf/echoserver/engine"
14+
"encoding/json"
1415
"github.com/chenyf/gibbon/comet"
1516
)
1617

18+
type CommandRequest struct {
19+
Uid string `json:"uid"`
20+
Cmd string `json:"cmd"`
21+
}
22+
23+
type CommandResponse struct {
24+
Status int `json:"status"`
25+
Error string `json:"error"`
26+
}
27+
1728
func getStatus(w http.ResponseWriter, r *http.Request) {
1829
size := comet.DevMap.Size()
1930
fmt.Fprintf(w, "total register device: %d\n", size)
2031
}
2132

33+
func postRouterCommand(w http.ResponseWriter, r *http.Request) {
34+
var response CommandResponse
35+
response.Status = 1
36+
if r.Method != "POST" {
37+
response.Error = "must using 'POST' method\n"
38+
b, _ := json.Marshal(response)
39+
fmt.Fprintf(w, string(b))
40+
return
41+
}
42+
r.ParseForm()
43+
rid := r.FormValue("rid")
44+
if rid == "" {
45+
response.Error = "missing 'rid'"
46+
b, _ := json.Marshal(response)
47+
fmt.Fprintf(w, string(b))
48+
return
49+
}
50+
51+
uid := r.FormValue("uid")
52+
if uid == "" {
53+
response.Error = "missing 'uid'"
54+
b, _ := json.Marshal(response)
55+
fmt.Fprintf(w, string(b))
56+
return
57+
}
58+
59+
/*
60+
uid := r.FormValue("uid")
61+
if uid == "" { fmt.Fprintf(w, "missing 'uid'\n"); return; }
62+
tid := r.FormValue("tid")
63+
if tid == "" { fmt.Fprintf(w, "missing 'tid'\n"); return; }
64+
sign := r.FormValue("sign")
65+
if sign == "" { fmt.Fprintf(w, "missing 'sign'\n"); return; }
66+
tm := r.FormValue("tm")
67+
if tm == "" { fmt.Fprintf(w, "missing 'tm'\n"); return; }
68+
pmtt := r.FormValue("pmtt")
69+
if pmtt == "" { fmt.Fprintf(w, "missing 'pmtt'\n"); return; }
70+
*/
71+
72+
if r.Body == nil {
73+
response.Error = "missing POST data"
74+
b, _ := json.Marshal(response)
75+
fmt.Fprintf(w, string(b))
76+
return
77+
}
78+
79+
if !comet.DevMap.Check(rid) {
80+
response.Error = fmt.Sprintf("device (%s) offline", rid)
81+
b, _ := json.Marshal(response)
82+
fmt.Fprintf(w, string(b))
83+
return
84+
}
85+
client := comet.DevMap.Get(rid).(*comet.Client)
86+
87+
body, err := ioutil.ReadAll(r.Body)
88+
r.Body.Close()
89+
if err != nil {
90+
response.Error = "invalid POST body"
91+
b, _ := json.Marshal(response)
92+
fmt.Fprintf(w, string(b))
93+
return
94+
}
95+
96+
cmdRequest := CommandRequest{
97+
Uid: uid,
98+
Cmd: string(body),
99+
}
100+
101+
bCmd, _ := json.Marshal(cmdRequest)
102+
reply := make(chan *comet.Message)
103+
client.SendMessage(comet.MSG_REQUEST, bCmd, reply)
104+
select {
105+
case msg := <-reply:
106+
fmt.Fprintf(w, string(msg.Data))
107+
case <- time.After(10 * time.Second):
108+
response.Error = "recv response timeout"
109+
b, _ := json.Marshal(response)
110+
fmt.Fprintf(w, string(b))
111+
}
112+
}
113+
22114
func getCommand(w http.ResponseWriter, r *http.Request) {
23115

24116
r.ParseForm()
@@ -34,13 +126,11 @@ func getCommand(w http.ResponseWriter, r *http.Request) {
34126
cmd := r.FormValue("cmd")
35127
client := comet.DevMap.Get(devid).(*comet.Client)
36128
reply := make(chan *comet.Message)
37-
seqid := client.SendMessage(comet.MSG_REQUEST, []byte(cmd), reply)
129+
client.SendMessage(comet.MSG_REQUEST, []byte(cmd), reply)
38130
select {
39131
case msg := <-reply:
40-
delete(client.MsgFoo, seqid)
41132
fmt.Fprintf(w, "recv reply (%s)\n", string(msg.Data))
42133
case <- time.After(10 * time.Second):
43-
delete(client.MsgFoo, seqid)
44134
fmt.Fprintf(w, "recv timeout\n")
45135
}
46136
}
@@ -84,6 +174,7 @@ func main() {
84174
}()
85175
waitGroup.Add(1)
86176
go func() {
177+
http.HandleFunc("/router/command", postRouterCommand)
87178
http.HandleFunc("/command", getCommand)
88179
http.HandleFunc("/status", getStatus)
89180
err := http.ListenAndServe("0.0.0.0:9999", nil)

test/agent.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,22 @@ import (
77
"os"
88
"fmt"
99
"time"
10+
"encoding/json"
1011
//"strings"
1112
"github.com/chenyf/gibbon/comet"
1213
)
1314

15+
type CommandRequest struct {
16+
Uid string `json:"uid"`
17+
Cmd string `json:"cmd"`
18+
}
19+
20+
type CommandResponse struct {
21+
Status int `json:"status"`
22+
Error string `json:"error"`
23+
Response string `json:"response"`
24+
}
25+
1426
func main() {
1527
if len(os.Args) <= 2 {
1628
log.Printf("Usage: server_addr devid")
@@ -85,16 +97,31 @@ func main() {
8597

8698
log.Printf("recv from server (%s)", string(data))
8799
if header.Type == comet.MSG_REQUEST {
88-
s := fmt.Sprintf("Sir, %s got it!", devid)
100+
101+
var request CommandRequest
102+
if err := json.Unmarshal(data, &request); err != nil {
103+
log.Printf("invalid request, not JSON\n")
104+
return
105+
}
106+
107+
fmt.Printf("UID: (%s)\n", request.Uid)
108+
109+
response := CommandResponse{
110+
Status: 0,
111+
Error : "OK",
112+
Response : fmt.Sprintf("Sir, %s got it!", devid),
113+
}
114+
115+
b, _ := json.Marshal(response)
89116
reply_header := comet.Header{
90117
Type: comet.MSG_REQUEST_REPLY,
91118
Ver: 0,
92119
Seq: header.Seq,
93-
Len: uint32(len(s)),
120+
Len: uint32(len(b)),
94121
}
95122
reply_msg := &comet.Message{
96123
Header: reply_header,
97-
Data: []byte(s),
124+
Data: b,
98125
}
99126
outMsg <- reply_msg
100127
}

0 commit comments

Comments
 (0)