Skip to content

Commit 48b500c

Browse files
authored
Merge pull request kubernetes-client#2608 from niteshs7/master
Use synchronization classes instead of Thread.sleep and fix flaky test
2 parents b1ac31c + 80c0e79 commit 48b500c

File tree

1 file changed

+104
-23
lines changed

1 file changed

+104
-23
lines changed

util/src/test/java/io/kubernetes/client/informer/impl/DefaultSharedIndexInformerWireMockTest.java

Lines changed: 104 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
import static org.junit.Assert.assertFalse;
2626
import static org.junit.Assert.assertTrue;
2727

28+
import com.github.tomakehurst.wiremock.core.Admin;
29+
import com.github.tomakehurst.wiremock.extension.Parameters;
30+
import com.github.tomakehurst.wiremock.extension.PostServeAction;
2831
import com.github.tomakehurst.wiremock.junit.WireMockRule;
32+
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
2933
import io.kubernetes.client.informer.EventType;
3034
import io.kubernetes.client.informer.ResourceEventHandler;
3135
import io.kubernetes.client.informer.SharedIndexInformer;
@@ -46,6 +50,8 @@
4650
import java.io.IOException;
4751
import java.util.Arrays;
4852
import java.util.Collections;
53+
import java.util.concurrent.CountDownLatch;
54+
import java.util.concurrent.Semaphore;
4955
import java.util.concurrent.atomic.AtomicBoolean;
5056
import org.junit.Before;
5157
import org.junit.Rule;
@@ -59,7 +65,20 @@ public class DefaultSharedIndexInformerWireMockTest {
5965

6066
private ApiClient client;
6167

62-
@Rule public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
68+
public static class CountRequestAction extends PostServeAction {
69+
@Override
70+
public String getName() {
71+
return "semaphore";
72+
}
73+
74+
@Override
75+
public void doAction(ServeEvent serveEvent, Admin admin, Parameters parameters) {
76+
Semaphore count = (Semaphore) parameters.get("semaphore");
77+
count.release();
78+
}
79+
}
80+
81+
@Rule public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().extensions(new CountRequestAction()));
6382

6483
@Before
6584
public void setup() throws IOException {
@@ -74,15 +93,21 @@ public void setup() throws IOException {
7493
public void testNamespacedPodInformerNormalBehavior() throws InterruptedException {
7594

7695
CoreV1Api coreV1Api = new CoreV1Api(client);
77-
96+
Semaphore getCount = new Semaphore(1);
97+
Semaphore watchCount = new Semaphore(2);
98+
Parameters getParams = new Parameters();
99+
Parameters watchParams = new Parameters();
100+
getParams.put("semaphore", getCount);
101+
watchParams.put("semaphore", watchCount);
78102
String startRV = "1000";
79103
String endRV = "1001";
80104

81105
V1PodList podList =
82106
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
83107

84-
stubFor(
108+
wireMockRule.stubFor(
85109
get(urlPathEqualTo("/api/v1/namespaces/" + namespace + "/pods"))
110+
.withPostServeAction("semaphore", getParams)
86111
.withQueryParam("watch", equalTo("false"))
87112
.willReturn(
88113
aResponse()
@@ -95,8 +120,9 @@ public void testNamespacedPodInformerNormalBehavior() throws InterruptedExceptio
95120
new V1Pod()
96121
.metadata(
97122
new V1ObjectMeta().namespace(namespace).name(podName).resourceVersion(endRV)));
98-
stubFor(
123+
wireMockRule.stubFor(
99124
get(urlPathEqualTo("/api/v1/namespaces/" + namespace + "/pods"))
125+
.withPostServeAction("semaphore", watchParams)
100126
.withQueryParam("watch", equalTo("true"))
101127
.willReturn(
102128
aResponse()
@@ -130,13 +156,15 @@ public void testNamespacedPodInformerNormalBehavior() throws InterruptedExceptio
130156
V1PodList.class);
131157

132158
AtomicBoolean foundExistingPod = new AtomicBoolean(false);
159+
CountDownLatch latch = new CountDownLatch(1);
133160
podInformer.addEventHandler(
134161
new ResourceEventHandler<V1Pod>() {
135162
@Override
136163
public void onAdd(V1Pod obj) {
137164
if (podName.equals(obj.getMetadata().getName())
138165
&& namespace.equals(obj.getMetadata().getNamespace())) {
139166
foundExistingPod.set(true);
167+
latch.countDown();
140168
}
141169
}
142170

@@ -146,9 +174,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
146174
@Override
147175
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
148176
});
177+
178+
getCount.acquire(1);
179+
watchCount.acquire(2);
180+
149181
factory.startAllRegisteredInformers();
150182

151-
Thread.sleep(1000);
183+
latch.await();
184+
getCount.acquire(1);
185+
watchCount.acquire(2);
152186

153187
assertEquals(true, foundExistingPod.get());
154188
assertEquals(endRV, podInformer.lastSyncResourceVersion());
@@ -169,15 +203,21 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
169203
public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedException {
170204

171205
CoreV1Api coreV1Api = new CoreV1Api(client);
172-
206+
Semaphore getCount = new Semaphore(1);
207+
Semaphore watchCount = new Semaphore(2);
208+
Parameters getParams = new Parameters();
209+
Parameters watchParams = new Parameters();
210+
getParams.put("semaphore", getCount);
211+
watchParams.put("semaphore", watchCount);
173212
String startRV = "1000";
174213
String endRV = "1001";
175214

176215
V1PodList podList =
177216
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
178217

179-
stubFor(
218+
wireMockRule.stubFor(
180219
get(urlPathEqualTo("/api/v1/pods"))
220+
.withPostServeAction("semaphore", getParams)
181221
.withQueryParam("watch", equalTo("false"))
182222
.willReturn(
183223
aResponse()
@@ -197,8 +237,9 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
197237
.labels(Collections.singletonMap("foo", "bar"))
198238
.annotations(Collections.singletonMap("foo", "bar"))));
199239

200-
stubFor(
240+
wireMockRule.stubFor(
201241
get(urlPathEqualTo("/api/v1/pods"))
242+
.withPostServeAction("semaphore", watchParams)
202243
.withQueryParam("watch", equalTo("true"))
203244
.willReturn(
204245
aResponse()
@@ -243,19 +284,22 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
243284
AtomicBoolean foundExistingPod = new AtomicBoolean(false);
244285
AtomicBoolean transformed = new AtomicBoolean(false);
245286
AtomicBoolean setTransformAfterStarted = new AtomicBoolean(false);
287+
CountDownLatch latch = new CountDownLatch(2);
246288
podInformer.addEventHandler(
247289
new ResourceEventHandler<V1Pod>() {
248290
@Override
249291
public void onAdd(V1Pod obj) {
250292
if (podName.equals(obj.getMetadata().getName())
251293
&& namespace.equals(obj.getMetadata().getNamespace())) {
252294
foundExistingPod.set(true);
295+
latch.countDown();
253296
}
254297
V1ObjectMeta metadata = obj.getMetadata();
255298
// check if the object was transformed
256299
if (metadata.getLabels().get("foo").equals("bar")
257300
&& metadata.getAnnotations() == null) {
258301
transformed.set(true);
302+
latch.countDown();
259303
}
260304
}
261305

@@ -265,9 +309,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
265309
@Override
266310
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
267311
});
312+
313+
getCount.acquire(1);
314+
watchCount.acquire(2);
315+
268316
factory.startAllRegisteredInformers();
269-
Thread.sleep(1000);
270317

318+
latch.await();
319+
getCount.acquire(1);
320+
watchCount.acquire(2);
271321
// can not set transform func if the informer has started
272322
try {
273323
podInformer.setTransform((obj) -> new V1Pod());
@@ -293,15 +343,21 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
293343
public void testAllNamespacedPodInformerTransformFailure() throws InterruptedException {
294344

295345
CoreV1Api coreV1Api = new CoreV1Api(client);
296-
346+
Semaphore getCount = new Semaphore(1);
347+
Semaphore watchCount = new Semaphore(2);
348+
Parameters getParams = new Parameters();
349+
Parameters watchParams = new Parameters();
350+
getParams.put("semaphore", getCount);
351+
watchParams.put("semaphore", watchCount);
297352
String startRV = "1000";
298353
String endRV = "1001";
299354

300355
V1PodList podList =
301356
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
302357

303-
stubFor(
358+
wireMockRule.stubFor(
304359
get(urlPathEqualTo("/api/v1/pods"))
360+
.withPostServeAction("semaphore", getParams)
305361
.withQueryParam("watch", equalTo("false"))
306362
.willReturn(
307363
aResponse()
@@ -316,8 +372,9 @@ public void testAllNamespacedPodInformerTransformFailure() throws InterruptedExc
316372
.metadata(
317373
new V1ObjectMeta().namespace(namespace).name(podName).resourceVersion(endRV)));
318374

319-
stubFor(
375+
wireMockRule.stubFor(
320376
get(urlPathEqualTo("/api/v1/pods"))
377+
.withPostServeAction("semaphore", watchParams)
321378
.withQueryParam("watch", equalTo("true"))
322379
.willReturn(
323380
aResponse()
@@ -348,20 +405,22 @@ public void testAllNamespacedPodInformerTransformFailure() throws InterruptedExc
348405
},
349406
V1Pod.class,
350407
V1PodList.class);
351-
408+
CountDownLatch latch = new CountDownLatch(2);
352409
podInformer.setTransform(
353410
(obj) -> {
411+
latch.countDown();
354412
throw new ObjectTransformException("test transform failure");
355413
});
356-
357414
AtomicBoolean foundExistingPod = new AtomicBoolean(false);
415+
358416
podInformer.addEventHandler(
359417
new ResourceEventHandler<V1Pod>() {
360418
@Override
361419
public void onAdd(V1Pod obj) {
362420
if (podName.equals(obj.getMetadata().getName())
363421
&& namespace.equals(obj.getMetadata().getNamespace())) {
364422
foundExistingPod.set(true);
423+
latch.countDown();
365424
}
366425
}
367426

@@ -371,8 +430,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
371430
@Override
372431
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
373432
});
433+
434+
getCount.acquire(1);
435+
watchCount.acquire(2);
436+
374437
factory.startAllRegisteredInformers();
375-
Thread.sleep(1000);
438+
439+
latch.await();
440+
getCount.acquire(1);
441+
watchCount.acquire(2);
376442

377443
// cannot find the pod due to transform failure
378444
assertFalse(foundExistingPod.get());
@@ -395,9 +461,17 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
395461
String startRV = "1000";
396462
V1PodList podList =
397463
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
464+
Semaphore getCount = new Semaphore(2);
465+
Semaphore watchCount = new Semaphore(2);
466+
Parameters getParams = new Parameters();
467+
Parameters watchParams = new Parameters();
468+
getParams.put("semaphore", getCount);
469+
watchParams.put("semaphore", watchCount);
470+
398471

399-
stubFor(
472+
wireMockRule.stubFor(
400473
get(urlPathEqualTo("/api/v1/namespaces/" + namespace + "/pods"))
474+
.withPostServeAction("semaphore", getParams)
401475
.withQueryParam("watch", equalTo("false"))
402476
.willReturn(
403477
aResponse()
@@ -408,8 +482,9 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
408482
Watch.Response<V1Pod> watchResponse =
409483
new Watch.Response<>(
410484
EventType.ERROR.name(), new V1Status().apiVersion("v1").kind("Status").code(409));
411-
stubFor(
485+
wireMockRule.stubFor(
412486
get(urlPathEqualTo("/api/v1/namespaces/" + namespace + "/pods"))
487+
.withPostServeAction("semaphore", watchParams)
413488
.withQueryParam("watch", equalTo("true"))
414489
.withQueryParam("resourceVersion", equalTo(startRV))
415490
.willReturn(
@@ -443,10 +518,13 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
443518
V1Pod.class,
444519
V1PodList.class);
445520

521+
getCount.acquire(2);
522+
watchCount.acquire(2);
523+
446524
factory.startAllRegisteredInformers();
447525

448-
// Sleep mroe than 1s so that informer can perform multiple rounds of list-watch
449-
Thread.sleep(3000);
526+
getCount.acquire(2);
527+
watchCount.acquire(2);
450528

451529
verify(
452530
moreThan(1),
@@ -463,9 +541,13 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
463541
public void testInformerReListingOnListForbidden() throws InterruptedException {
464542

465543
CoreV1Api coreV1Api = new CoreV1Api(client);
544+
Semaphore getCount = new Semaphore(2);
545+
Parameters getParams = new Parameters();
546+
getParams.put("semaphore", getCount);
466547

467-
stubFor(
548+
wireMockRule.stubFor(
468549
get(urlPathEqualTo("/api/v1/namespaces/" + namespace + "/pods"))
550+
.withPostServeAction("semaphore", getParams)
469551
.withQueryParam("watch", equalTo("false"))
470552
.willReturn(
471553
aResponse()
@@ -505,10 +587,9 @@ public void testInformerReListingOnListForbidden() throws InterruptedException {
505587
V1Pod.class,
506588
V1PodList.class);
507589

590+
getCount.acquire(2);
508591
factory.startAllRegisteredInformers();
509-
510-
// Sleep mroe than 1s so that informer can perform multiple rounds of list-watch
511-
Thread.sleep(3000);
592+
getCount.acquire(2);
512593

513594
verify(
514595
moreThan(1),

0 commit comments

Comments
 (0)