Skip to content

Commit 0372767

Browse files
Synchornize websockets writes
Change-Id: If5b81abe1a161a34479b2115d0918c96414ead54
1 parent 07599a8 commit 0372767

File tree

3 files changed

+35
-4
lines changed

3 files changed

+35
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ To run example first run [websocket server](websockets-server/README.md), than u
100100
to your gradle file:
101101

102102
```groovy
103-
compile "com.appunite:websockets-rxjava:4.0.0"
103+
compile "com.appunite:websockets-rxjava:4.0.1"
104104
```
105105
106106
## License

websockets-rxjava/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ signing {
4646
}
4747
group = "com.appunite"
4848
archivesBaseName = "websockets-rxjava"
49-
version = "4.0.0"
49+
version = "4.0.1"
5050

5151
if (!project.hasProperty("ossrhUsername")) {
5252
project.ext.setProperty("ossrhUsername", null)

websockets-rxjava/src/main/java/com/appunite/websocket/rx/RxWebSockets.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.appunite.websocket.rx.messages.RxEventStringMessage;
2626
import okhttp3.OkHttpClient;
2727
import okhttp3.Request;
28+
import okhttp3.RequestBody;
2829
import okhttp3.Response;
2930
import okhttp3.ResponseBody;
3031
import okhttp3.ws.WebSocket;
@@ -62,7 +63,6 @@ public RxWebSockets(@Nonnull OkHttpClient client, @Nonnull Request request) {
6263
this.request = request;
6364
}
6465

65-
6666
/**
6767
* Returns observable that connected to a websocket and returns {@link RxObjectEvent}'s
6868
*
@@ -91,7 +91,7 @@ public void onOpen(WebSocket webSocket, Response response) {
9191
subscriber.onNext(new RxEventDisconnected(e));
9292
}
9393
} else {
94-
notifyConnected = webSocket;
94+
notifyConnected = new LockingWebSocket(webSocket);
9595
}
9696
webSocketItem = notifyConnected;
9797
}
@@ -178,4 +178,35 @@ public void call() {
178178
}
179179
});
180180
}
181+
182+
/**
183+
* Class that synchronizes writes to websocket
184+
*/
185+
private static class LockingWebSocket implements WebSocket {
186+
@Nonnull
187+
private final WebSocket webSocket;
188+
189+
public LockingWebSocket(@Nonnull WebSocket webSocket) {
190+
this.webSocket = webSocket;
191+
}
192+
193+
@Override
194+
public void sendMessage(RequestBody message) throws IOException {
195+
synchronized (this) {
196+
webSocket.sendMessage(message);
197+
}
198+
}
199+
200+
@Override
201+
public void sendPing(Buffer payload) throws IOException {
202+
synchronized (this) {
203+
webSocket.sendPing(payload);
204+
}
205+
}
206+
207+
@Override
208+
public void close(int code, String reason) throws IOException {
209+
webSocket.close(code, reason);
210+
}
211+
}
181212
}

0 commit comments

Comments
 (0)