Skip to content

Commit 8c1e19d

Browse files
authored
Critical client fix #114. Cleanup code
Provides critical client fix where request IDs were incorrectly generated, and could lead to failed response when multiple clients called same procedure. Code cleanup to obsolete the use of `wamp.OptionXXX` functions.
1 parent 0f544f2 commit 8c1e19d

22 files changed

+335
-300
lines changed

aat/auth_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestJoinRealmWithCRAuth(t *testing.T) {
4545
}
4646

4747
realmDetails := cli.RealmDetails()
48-
if wamp.OptionString(realmDetails, "authrole") != "user" {
48+
if s, _ := wamp.AsString(realmDetails["authrole"]); s != "user" {
4949
t.Fatal("missing or incorrect authrole")
5050
}
5151

@@ -80,7 +80,7 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) {
8080
details := cli.RealmDetails()
8181

8282
// Client should not have be authenticated by cookie first time.
83-
if wamp.OptionFlag(details, "authbycookie") {
83+
if ok, _ := wamp.AsBool(details["authbycookie"]); ok {
8484
t.Fatal("authbycookie set incorrectly to true")
8585
}
8686

@@ -93,11 +93,11 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) {
9393

9494
// If websocket, then should be authenticated by cookie this time.
9595
if cfg.WsCfg.Jar != nil {
96-
if !wamp.OptionFlag(details, "authbycookie") {
96+
if ok, _ := wamp.AsBool(details["authbycookie"]); !ok {
9797
t.Fatal("should have been authenticated by cookie")
9898
}
9999
} else {
100-
if wamp.OptionFlag(details, "authbycookie") {
100+
if ok, _ := wamp.AsBool(details["authbycookie"]); ok {
101101
t.Fatal("authbycookie set incorrectly to true")
102102
}
103103
}
@@ -159,7 +159,7 @@ func TestAuthz(t *testing.T) {
159159
t.Fatal("Call error:", err)
160160
}
161161
dict, _ := wamp.AsDict(result.Arguments[0])
162-
if wamp.OptionString(dict, "foobar") != "" {
162+
if _, ok := dict["foobar"]; ok {
163163
t.Fatal("Should not have special info in session")
164164
}
165165

@@ -183,7 +183,7 @@ func TestAuthz(t *testing.T) {
183183
t.Fatal("Call error:", err)
184184
}
185185
dict, _ = wamp.AsDict(result.Arguments[0])
186-
if wamp.OptionString(dict, "foobar") != "baz" {
186+
if s, _ := wamp.AsString(dict["foobar"]); s != "baz" {
187187
t.Fatal("Missing special info in session")
188188
}
189189

aat/pubsub_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestPubSubWildcard(t *testing.T) {
100100
errChan <- errors.New("event missing or bad args")
101101
return
102102
}
103-
origTopic := wamp.OptionURI(details, "topic")
103+
origTopic, _ := wamp.AsURI(details["topic"])
104104
if origTopic != testTopic {
105105
errChan <- errors.New("wrong original topic")
106106
return

aat/registrationmeta_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,13 @@ func TestMetaEventRegOnCreateRegOnRegister(t *testing.T) {
4848
errChanC <- errors.New("argument 0 (session) was not wamp.ID")
4949
return
5050
}
51-
onCreateID = wamp.OptionID(dict, "id")
52-
if wamp.OptionURI(dict, "uri") != wamp.URI("some.proc") {
51+
onCreateID, _ = wamp.AsID(dict["id"])
52+
if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.proc") {
5353
errChanC <- fmt.Errorf(
54-
"on_create had wrong procedure, got '%v' want 'some.proc'",
55-
wamp.OptionURI(dict, "uri"))
54+
"on_create had wrong procedure, got '%v' want 'some.proc'", u)
5655
return
5756
}
58-
if wamp.OptionString(dict, "created") == "" {
57+
if s, _ := wamp.AsString(dict["created"]); s == "" {
5958
errChanC <- errors.New("on_create missing created time")
6059
return
6160
}

aat/rpc_test.go

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package aat
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -42,34 +43,95 @@ func TestRPCRegisterAndCall(t *testing.T) {
4243
t.Fatal("Failed to connect client:", err)
4344
}
4445

45-
// Test calling the procedure.
46-
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
47-
ctx := context.Background()
48-
result, err := caller.Call(ctx, procName, nil, callArgs, nil, "")
46+
// Connect second caller session.
47+
caller2, err := connectClient()
4948
if err != nil {
50-
t.Fatal("Failed to call procedure:", err)
49+
t.Fatal("Failed to connect client:", err)
5150
}
52-
sum, ok := wamp.AsInt64(result.Arguments[0])
53-
if !ok {
54-
t.Fatal("Could not convert result to int64")
51+
52+
// Connect third caller session.
53+
caller3, err := connectClient()
54+
if err != nil {
55+
t.Fatal("Failed to connect client:", err)
5556
}
56-
if sum != 55 {
57-
t.Fatal("Wrong result:", sum)
57+
58+
// Test calling the procedure.
59+
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
60+
var result1, result2, result3 *wamp.Result
61+
var err1, err2, err3 error
62+
var ready, allDone sync.WaitGroup
63+
release := make(chan struct{})
64+
ready.Add(3)
65+
allDone.Add(3)
66+
go func() {
67+
defer allDone.Done()
68+
ready.Done()
69+
<-release
70+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
71+
defer cancel()
72+
result1, err1 = caller.Call(ctx, procName, nil, callArgs, nil, "")
73+
}()
74+
go func() {
75+
defer allDone.Done()
76+
// Call it with caller2.
77+
ready.Done()
78+
<-release
79+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
80+
defer cancel()
81+
result2, err2 = caller2.Call(ctx, procName, nil, callArgs, nil, "")
82+
}()
83+
go func() {
84+
// Call it with caller3.
85+
defer allDone.Done()
86+
ready.Done()
87+
<-release
88+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
89+
defer cancel()
90+
result3, err3 = caller3.Call(ctx, procName, nil, callArgs, nil, "")
91+
}()
92+
93+
ready.Wait()
94+
close(release)
95+
allDone.Wait()
96+
97+
errs := []error{err1, err2, err3}
98+
results := []*wamp.Result{result1, result2, result3}
99+
for i := 0; i < 3; i++ {
100+
if errs[i] != nil {
101+
t.Error("Caller", i, "failed to call procedure:", errs[i])
102+
} else {
103+
sum, ok := wamp.AsInt64(results[i].Arguments[0])
104+
if !ok {
105+
t.Error("Could not convert result", i, "to int64")
106+
} else if sum != 55 {
107+
t.Errorf("Wrong result %d: %d", i, sum)
108+
}
109+
}
58110
}
59111

60112
// Test unregister.
61113
if err = callee.Unregister(procName); err != nil {
62-
t.Fatal("Failed to unregister procedure:", err)
114+
t.Error("Failed to unregister procedure:", err)
63115
}
64116

65117
err = caller.Close()
66118
if err != nil {
67-
t.Fatal("Failed to disconnect client:", err)
119+
t.Error("Failed to disconnect client:", err)
120+
}
121+
122+
err = caller2.Close()
123+
if err != nil {
124+
t.Error("Failed to disconnect client:", err)
125+
}
126+
127+
err = caller3.Close()
128+
if err != nil {
129+
t.Error("Failed to disconnect client:", err)
68130
}
69131

70132
err = callee.Close()
71133
if err != nil {
72-
t.Fatal("Failed to disconnect client:", err)
134+
t.Error("Failed to disconnect client:", err)
73135
}
74136
}
75137

aat/sessionmeta_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestMetaEventOnJoin(t *testing.T) {
4949
errChan <- errors.New("argument was not wamp.Dict")
5050
return
5151
}
52-
onJoinID = wamp.OptionID(details, "session")
52+
onJoinID, _ = wamp.AsID(details["session"])
5353
errChan <- nil
5454
}
5555

@@ -436,7 +436,7 @@ func TestMetaProcSessionGet(t *testing.T) {
436436
if !ok {
437437
t.Fatal("Could not convert result to wamp.Dict")
438438
}
439-
resultID := wamp.OptionID(dict, "session")
439+
resultID, _ := wamp.AsID(dict["session"])
440440
if resultID != sess.ID() {
441441
t.Fatal("Wrong session ID in result")
442442
}

aat/subscriptionmeta_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,13 @@ func TestMetaEventOnCreateOnSubscribe(t *testing.T) {
4646
errChanC <- errors.New("argument 0 (session) was not wamp.ID")
4747
return
4848
}
49-
onCreateID = wamp.OptionID(dict, "id")
50-
if wamp.OptionURI(dict, "uri") != wamp.URI("some.topic") {
49+
onCreateID, _ = wamp.AsID(dict["id"])
50+
if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.topic") {
5151
errChanC <- fmt.Errorf(
52-
"on_create had wrong topic, got '%v' want 'some.topic'",
53-
wamp.OptionURI(dict, "uri"))
52+
"on_create had wrong topic, got '%v' want 'some.topic'", u)
5453
return
5554
}
56-
if wamp.OptionString(dict, "created") == "" {
55+
if s, _ := wamp.AsString(dict["created"]); s == "" {
5756
errChanC <- errors.New("on_create missing created time")
5857
return
5958
}

client/client.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ type Client struct {
143143
progGate map[context.Context]wamp.ID
144144

145145
actionChan chan func()
146-
idGen *wamp.SyncIDGen
147146

148147
stopping chan struct{}
149148
activeInvHandlers sync.WaitGroup
@@ -191,7 +190,6 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) {
191190
progGate: map[context.Context]wamp.ID{},
192191

193192
actionChan: make(chan func()),
194-
idGen: new(wamp.SyncIDGen),
195193
stopping: make(chan struct{}),
196194
done: make(chan struct{}),
197195

@@ -252,7 +250,7 @@ func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) err
252250
if options == nil {
253251
options = wamp.Dict{}
254252
}
255-
id := c.idGen.Next()
253+
id := wamp.GlobalID()
256254
c.expectReply(id)
257255
c.sess.Send(&wamp.Subscribe{
258256
Request: id,
@@ -322,7 +320,7 @@ func (c *Client) Unsubscribe(topic string) error {
322320
return err
323321
}
324322

325-
id := c.idGen.Next()
323+
id := wamp.GlobalID()
326324
c.expectReply(id)
327325
c.sess.Send(&wamp.Unsubscribe{
328326
Request: id,
@@ -382,7 +380,7 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs
382380
// Check if the client is asking for a PUBLISHED response.
383381
pubAck, _ := options[wamp.OptAcknowledge].(bool)
384382

385-
id := c.idGen.Next()
383+
id := wamp.GlobalID()
386384
if pubAck {
387385
c.expectReply(id)
388386
}
@@ -443,7 +441,7 @@ type InvocationHandler func(context.Context, wamp.List, wamp.Dict, wamp.Dict) (r
443441
//
444442
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
445443
func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error {
446-
id := c.idGen.Next()
444+
id := wamp.GlobalID()
447445
c.expectReply(id)
448446
c.sess.Send(&wamp.Register{
449447
Request: id,
@@ -517,7 +515,7 @@ func (c *Client) Unregister(procedure string) error {
517515
return err
518516
}
519517

520-
id := c.idGen.Next()
518+
id := wamp.GlobalID()
521519
c.expectReply(id)
522520
c.sess.Send(&wamp.Unregister{
523521
Request: id,
@@ -657,7 +655,7 @@ func (c *Client) CallProgress(ctx context.Context, procedure string, options wam
657655
}()
658656
}
659657

660-
id := c.idGen.Next()
658+
id := wamp.GlobalID()
661659
c.expectReply(id)
662660
c.sess.Send(&wamp.Call{
663661
Request: id,
@@ -848,7 +846,7 @@ func handleCRAuth(peer wamp.Peer, challenge *wamp.Challenge, authHandlers map[st
848846
// If router sent back ABORT in response to client's authentication attempt
849847
// return error.
850848
if abort, ok := msg.(*wamp.Abort); ok {
851-
authErr := wamp.OptionString(abort.Details, wamp.OptError)
849+
authErr, _ := wamp.AsString(abort.Details[wamp.OptError])
852850
if authErr == "" {
853851
authErr = "authentication failed"
854852
}
@@ -1002,10 +1000,11 @@ CollectResults:
10021000
}
10031001
// If this is a progressive result.
10041002
if progChan != nil {
1005-
result, ok := msg.(*wamp.Result)
1006-
if ok && wamp.OptionFlag(result.Details, wamp.OptProgress) {
1007-
progChan <- result
1008-
goto CollectResults
1003+
if result, ok := msg.(*wamp.Result); ok {
1004+
if ok, _ = wamp.AsBool(result.Details[wamp.OptProgress]); ok {
1005+
progChan <- result
1006+
goto CollectResults
1007+
}
10091008
}
10101009
}
10111010
c.actionChan <- func() {
@@ -1126,7 +1125,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
11261125
// Create a kill switch so that invocation can be canceled.
11271126
var cancel context.CancelFunc
11281127
var ctx context.Context
1129-
timeout := wamp.OptionInt64(msg.Details, wamp.OptTimeout)
1128+
timeout, _ := wamp.AsInt64(msg.Details[wamp.OptTimeout])
11301129
if timeout > 0 {
11311130
// The caller specified a timeout, in milliseconds.
11321131
ctx, cancel = context.WithTimeout(context.Background(),
@@ -1139,7 +1138,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
11391138

11401139
// If caller is accepting progressive results, create map entry to
11411140
// allow progress to be sent.
1142-
if wamp.OptionFlag(msg.Details, wamp.OptReceiveProgress) {
1141+
if ok, _ = wamp.AsBool(msg.Details[wamp.OptReceiveProgress]); ok {
11431142
c.progGate[ctx] = msg.Request
11441143
}
11451144

@@ -1230,7 +1229,8 @@ func (c *Client) runHandleInterrupt(msg *wamp.Interrupt) {
12301229
// If the interrupt mode is "killnowait", then the router is not
12311230
// waiting for a response, so do not send one. This is indicated by
12321231
// deleting the cancel for the invocation early.
1233-
if wamp.OptionString(msg.Options, wamp.OptMode) == wamp.CancelModeKillNoWait {
1232+
mode, _ := wamp.AsString(msg.Options[wamp.OptMode])
1233+
if mode == wamp.CancelModeKillNoWait {
12341234
delete(c.invHandlerKill, msg.Request)
12351235
}
12361236
cancel()

client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func TestSubscribe(t *testing.T) {
202202
errChan <- errors.New("event missing or bad args")
203203
return
204204
}
205-
origTopic := wamp.OptionURI(details, "topic")
205+
origTopic, _ := wamp.AsURI(details["topic"])
206206
if origTopic != wamp.URI(testTopic) {
207207
errChan <- errors.New("wrong original topic")
208208
return

router/auth/anonymous_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ func TestAnonAuth(t *testing.T) {
2424
t.Fatal("expected WELCOME message, got: ", welcome.MessageType())
2525
}
2626

27-
if wamp.OptionString(welcome.Details, "authmethod") != "anonymous" {
27+
if s, _ := wamp.AsString(welcome.Details["authmethod"]); s != "anonymous" {
2828
t.Fatal("invalid authmethod in welcome details")
2929
}
30-
if wamp.OptionString(welcome.Details, "authrole") != "anonymous" {
30+
if s, _ := wamp.AsString(welcome.Details["authrole"]); s != "anonymous" {
3131
t.Fatal("incorrect authrole in welcome details")
3232
}
3333
}

router/auth/crauth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewCRAuthenticator(keyStore KeyStore, timeout time.Duration) *CRAuthenticat
2929
func (cr *CRAuthenticator) AuthMethod() string { return "wampcra" }
3030

3131
func (cr *CRAuthenticator) Authenticate(sid wamp.ID, details wamp.Dict, client wamp.Peer) (*wamp.Welcome, error) {
32-
authid := wamp.OptionString(details, "authid")
32+
authid, _ := wamp.AsString(details["authid"])
3333
if authid == "" {
3434
return nil, errors.New("missing authid")
3535
}

0 commit comments

Comments
 (0)