|
4 | 4 |
|
5 | 5 | import java.util.concurrent.CountDownLatch; |
6 | 6 | import java.util.concurrent.TimeUnit; |
| 7 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 8 | +import java.util.concurrent.atomic.AtomicInteger; |
7 | 9 | import java.util.concurrent.atomic.AtomicReference; |
8 | 10 |
|
9 | 11 | import org.json.JSONObject; |
@@ -340,5 +342,38 @@ public void topicWithSpecialCharacters() throws Exception { |
340 | 342 | assertEquals(latch.getCount(), 0); |
341 | 343 | } |
342 | 344 |
|
343 | | - |
| 345 | + @Test |
| 346 | + public void addRemoveSubListener() throws Exception { |
| 347 | + String topic = "addRemoveSubListener"; |
| 348 | + int cnt = 10; |
| 349 | + |
| 350 | + CountDownLatch latch = new CountDownLatch(cnt + cnt/2); |
| 351 | + final AtomicInteger invokeCnt = new AtomicInteger(0); |
| 352 | + |
| 353 | + BusListener[] listeners = new BusListener[cnt]; |
| 354 | + for (int i=0; i<cnt; i++) { |
| 355 | + listeners[i] = new BusListener() { |
| 356 | + @Override |
| 357 | + public void onMessageReceived(String sourceUuid, String topic, Object payload) { |
| 358 | + invokeCnt.incrementAndGet(); |
| 359 | + latch.countDown(); |
| 360 | + } |
| 361 | + }; |
| 362 | + subscribeToTopic("*", topic, listeners[i]); |
| 363 | + } |
| 364 | + |
| 365 | + desktopConnection.getInterApplicationBus().send("*", topic, "whatever"); |
| 366 | + |
| 367 | + latch.await(5, TimeUnit.SECONDS); |
| 368 | + |
| 369 | + for (int i=0; i<cnt/2; i++) { |
| 370 | + unsubscribeToTopic("*", topic, listeners[i*2]); |
| 371 | + } |
| 372 | + |
| 373 | + desktopConnection.getInterApplicationBus().send("*", topic, "whatever again"); |
| 374 | + |
| 375 | + latch.await(5, TimeUnit.SECONDS); |
| 376 | + assertEquals(0, latch.getCount()); |
| 377 | + assertEquals(cnt + cnt/2, invokeCnt.get()); |
| 378 | + } |
344 | 379 | } |
0 commit comments