2525import static org .junit .Assert .assertFalse ;
2626import static org .junit .Assert .assertTrue ;
2727
28+ import com .github .tomakehurst .wiremock .core .Admin ;
29+ import com .github .tomakehurst .wiremock .extension .Parameters ;
30+ import com .github .tomakehurst .wiremock .extension .PostServeAction ;
2831import com .github .tomakehurst .wiremock .junit .WireMockRule ;
32+ import com .github .tomakehurst .wiremock .stubbing .ServeEvent ;
2933import io .kubernetes .client .informer .EventType ;
3034import io .kubernetes .client .informer .ResourceEventHandler ;
3135import io .kubernetes .client .informer .SharedIndexInformer ;
4650import java .io .IOException ;
4751import java .util .Arrays ;
4852import java .util .Collections ;
53+ import java .util .concurrent .CountDownLatch ;
54+ import java .util .concurrent .Semaphore ;
4955import java .util .concurrent .atomic .AtomicBoolean ;
5056import org .junit .Before ;
5157import org .junit .Rule ;
@@ -59,7 +65,20 @@ public class DefaultSharedIndexInformerWireMockTest {
5965
6066 private ApiClient client ;
6167
62- @ Rule public WireMockRule wireMockRule = new WireMockRule (options ().dynamicPort ());
68+ public static class CountRequestAction extends PostServeAction {
69+ @ Override
70+ public String getName () {
71+ return "semaphore" ;
72+ }
73+
74+ @ Override
75+ public void doAction (ServeEvent serveEvent , Admin admin , Parameters parameters ) {
76+ Semaphore count = (Semaphore ) parameters .get ("semaphore" );
77+ count .release ();
78+ }
79+ }
80+
81+ @ Rule public WireMockRule wireMockRule = new WireMockRule (options ().dynamicPort ().extensions (new CountRequestAction ()));
6382
6483 @ Before
6584 public void setup () throws IOException {
@@ -74,15 +93,21 @@ public void setup() throws IOException {
7493 public void testNamespacedPodInformerNormalBehavior () throws InterruptedException {
7594
7695 CoreV1Api coreV1Api = new CoreV1Api (client );
77-
96+ Semaphore getCount = new Semaphore (1 );
97+ Semaphore watchCount = new Semaphore (2 );
98+ Parameters getParams = new Parameters ();
99+ Parameters watchParams = new Parameters ();
100+ getParams .put ("semaphore" , getCount );
101+ watchParams .put ("semaphore" , watchCount );
78102 String startRV = "1000" ;
79103 String endRV = "1001" ;
80104
81105 V1PodList podList =
82106 new V1PodList ().metadata (new V1ListMeta ().resourceVersion (startRV )).items (Arrays .asList ());
83107
84- stubFor (
108+ wireMockRule . stubFor (
85109 get (urlPathEqualTo ("/api/v1/namespaces/" + namespace + "/pods" ))
110+ .withPostServeAction ("semaphore" , getParams )
86111 .withQueryParam ("watch" , equalTo ("false" ))
87112 .willReturn (
88113 aResponse ()
@@ -95,8 +120,9 @@ public void testNamespacedPodInformerNormalBehavior() throws InterruptedExceptio
95120 new V1Pod ()
96121 .metadata (
97122 new V1ObjectMeta ().namespace (namespace ).name (podName ).resourceVersion (endRV )));
98- stubFor (
123+ wireMockRule . stubFor (
99124 get (urlPathEqualTo ("/api/v1/namespaces/" + namespace + "/pods" ))
125+ .withPostServeAction ("semaphore" , watchParams )
100126 .withQueryParam ("watch" , equalTo ("true" ))
101127 .willReturn (
102128 aResponse ()
@@ -130,13 +156,15 @@ public void testNamespacedPodInformerNormalBehavior() throws InterruptedExceptio
130156 V1PodList .class );
131157
132158 AtomicBoolean foundExistingPod = new AtomicBoolean (false );
159+ CountDownLatch latch = new CountDownLatch (1 );
133160 podInformer .addEventHandler (
134161 new ResourceEventHandler <V1Pod >() {
135162 @ Override
136163 public void onAdd (V1Pod obj ) {
137164 if (podName .equals (obj .getMetadata ().getName ())
138165 && namespace .equals (obj .getMetadata ().getNamespace ())) {
139166 foundExistingPod .set (true );
167+ latch .countDown ();
140168 }
141169 }
142170
@@ -146,9 +174,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
146174 @ Override
147175 public void onDelete (V1Pod obj , boolean deletedFinalStateUnknown ) {}
148176 });
177+
178+ getCount .acquire (1 );
179+ watchCount .acquire (2 );
180+
149181 factory .startAllRegisteredInformers ();
150182
151- Thread .sleep (1000 );
183+ latch .await ();
184+ getCount .acquire (1 );
185+ watchCount .acquire (2 );
152186
153187 assertEquals (true , foundExistingPod .get ());
154188 assertEquals (endRV , podInformer .lastSyncResourceVersion ());
@@ -169,15 +203,21 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
169203 public void testAllNamespacedPodInformerNormalBehavior () throws InterruptedException {
170204
171205 CoreV1Api coreV1Api = new CoreV1Api (client );
172-
206+ Semaphore getCount = new Semaphore (1 );
207+ Semaphore watchCount = new Semaphore (2 );
208+ Parameters getParams = new Parameters ();
209+ Parameters watchParams = new Parameters ();
210+ getParams .put ("semaphore" , getCount );
211+ watchParams .put ("semaphore" , watchCount );
173212 String startRV = "1000" ;
174213 String endRV = "1001" ;
175214
176215 V1PodList podList =
177216 new V1PodList ().metadata (new V1ListMeta ().resourceVersion (startRV )).items (Arrays .asList ());
178217
179- stubFor (
218+ wireMockRule . stubFor (
180219 get (urlPathEqualTo ("/api/v1/pods" ))
220+ .withPostServeAction ("semaphore" , getParams )
181221 .withQueryParam ("watch" , equalTo ("false" ))
182222 .willReturn (
183223 aResponse ()
@@ -197,8 +237,9 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
197237 .labels (Collections .singletonMap ("foo" , "bar" ))
198238 .annotations (Collections .singletonMap ("foo" , "bar" ))));
199239
200- stubFor (
240+ wireMockRule . stubFor (
201241 get (urlPathEqualTo ("/api/v1/pods" ))
242+ .withPostServeAction ("semaphore" , watchParams )
202243 .withQueryParam ("watch" , equalTo ("true" ))
203244 .willReturn (
204245 aResponse ()
@@ -243,19 +284,22 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
243284 AtomicBoolean foundExistingPod = new AtomicBoolean (false );
244285 AtomicBoolean transformed = new AtomicBoolean (false );
245286 AtomicBoolean setTransformAfterStarted = new AtomicBoolean (false );
287+ CountDownLatch latch = new CountDownLatch (2 );
246288 podInformer .addEventHandler (
247289 new ResourceEventHandler <V1Pod >() {
248290 @ Override
249291 public void onAdd (V1Pod obj ) {
250292 if (podName .equals (obj .getMetadata ().getName ())
251293 && namespace .equals (obj .getMetadata ().getNamespace ())) {
252294 foundExistingPod .set (true );
295+ latch .countDown ();
253296 }
254297 V1ObjectMeta metadata = obj .getMetadata ();
255298 // check if the object was transformed
256299 if (metadata .getLabels ().get ("foo" ).equals ("bar" )
257300 && metadata .getAnnotations () == null ) {
258301 transformed .set (true );
302+ latch .countDown ();
259303 }
260304 }
261305
@@ -265,9 +309,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
265309 @ Override
266310 public void onDelete (V1Pod obj , boolean deletedFinalStateUnknown ) {}
267311 });
312+
313+ getCount .acquire (1 );
314+ watchCount .acquire (2 );
315+
268316 factory .startAllRegisteredInformers ();
269- Thread .sleep (1000 );
270317
318+ latch .await ();
319+ getCount .acquire (1 );
320+ watchCount .acquire (2 );
271321 // can not set transform func if the informer has started
272322 try {
273323 podInformer .setTransform ((obj ) -> new V1Pod ());
@@ -293,15 +343,21 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
293343 public void testAllNamespacedPodInformerTransformFailure () throws InterruptedException {
294344
295345 CoreV1Api coreV1Api = new CoreV1Api (client );
296-
346+ Semaphore getCount = new Semaphore (1 );
347+ Semaphore watchCount = new Semaphore (2 );
348+ Parameters getParams = new Parameters ();
349+ Parameters watchParams = new Parameters ();
350+ getParams .put ("semaphore" , getCount );
351+ watchParams .put ("semaphore" , watchCount );
297352 String startRV = "1000" ;
298353 String endRV = "1001" ;
299354
300355 V1PodList podList =
301356 new V1PodList ().metadata (new V1ListMeta ().resourceVersion (startRV )).items (Arrays .asList ());
302357
303- stubFor (
358+ wireMockRule . stubFor (
304359 get (urlPathEqualTo ("/api/v1/pods" ))
360+ .withPostServeAction ("semaphore" , getParams )
305361 .withQueryParam ("watch" , equalTo ("false" ))
306362 .willReturn (
307363 aResponse ()
@@ -316,8 +372,9 @@ public void testAllNamespacedPodInformerTransformFailure() throws InterruptedExc
316372 .metadata (
317373 new V1ObjectMeta ().namespace (namespace ).name (podName ).resourceVersion (endRV )));
318374
319- stubFor (
375+ wireMockRule . stubFor (
320376 get (urlPathEqualTo ("/api/v1/pods" ))
377+ .withPostServeAction ("semaphore" , watchParams )
321378 .withQueryParam ("watch" , equalTo ("true" ))
322379 .willReturn (
323380 aResponse ()
@@ -348,20 +405,22 @@ public void testAllNamespacedPodInformerTransformFailure() throws InterruptedExc
348405 },
349406 V1Pod .class ,
350407 V1PodList .class );
351-
408+ CountDownLatch latch = new CountDownLatch ( 2 );
352409 podInformer .setTransform (
353410 (obj ) -> {
411+ latch .countDown ();
354412 throw new ObjectTransformException ("test transform failure" );
355413 });
356-
357414 AtomicBoolean foundExistingPod = new AtomicBoolean (false );
415+
358416 podInformer .addEventHandler (
359417 new ResourceEventHandler <V1Pod >() {
360418 @ Override
361419 public void onAdd (V1Pod obj ) {
362420 if (podName .equals (obj .getMetadata ().getName ())
363421 && namespace .equals (obj .getMetadata ().getNamespace ())) {
364422 foundExistingPod .set (true );
423+ latch .countDown ();
365424 }
366425 }
367426
@@ -371,8 +430,15 @@ public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
371430 @ Override
372431 public void onDelete (V1Pod obj , boolean deletedFinalStateUnknown ) {}
373432 });
433+
434+ getCount .acquire (1 );
435+ watchCount .acquire (2 );
436+
374437 factory .startAllRegisteredInformers ();
375- Thread .sleep (1000 );
438+
439+ latch .await ();
440+ getCount .acquire (1 );
441+ watchCount .acquire (2 );
376442
377443 // cannot find the pod due to transform failure
378444 assertFalse (foundExistingPod .get ());
@@ -395,9 +461,17 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
395461 String startRV = "1000" ;
396462 V1PodList podList =
397463 new V1PodList ().metadata (new V1ListMeta ().resourceVersion (startRV )).items (Arrays .asList ());
464+ Semaphore getCount = new Semaphore (2 );
465+ Semaphore watchCount = new Semaphore (2 );
466+ Parameters getParams = new Parameters ();
467+ Parameters watchParams = new Parameters ();
468+ getParams .put ("semaphore" , getCount );
469+ watchParams .put ("semaphore" , watchCount );
470+
398471
399- stubFor (
472+ wireMockRule . stubFor (
400473 get (urlPathEqualTo ("/api/v1/namespaces/" + namespace + "/pods" ))
474+ .withPostServeAction ("semaphore" , getParams )
401475 .withQueryParam ("watch" , equalTo ("false" ))
402476 .willReturn (
403477 aResponse ()
@@ -408,8 +482,9 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
408482 Watch .Response <V1Pod > watchResponse =
409483 new Watch .Response <>(
410484 EventType .ERROR .name (), new V1Status ().apiVersion ("v1" ).kind ("Status" ).code (409 ));
411- stubFor (
485+ wireMockRule . stubFor (
412486 get (urlPathEqualTo ("/api/v1/namespaces/" + namespace + "/pods" ))
487+ .withPostServeAction ("semaphore" , watchParams )
413488 .withQueryParam ("watch" , equalTo ("true" ))
414489 .withQueryParam ("resourceVersion" , equalTo (startRV ))
415490 .willReturn (
@@ -443,10 +518,13 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
443518 V1Pod .class ,
444519 V1PodList .class );
445520
521+ getCount .acquire (2 );
522+ watchCount .acquire (2 );
523+
446524 factory .startAllRegisteredInformers ();
447525
448- // Sleep mroe than 1s so that informer can perform multiple rounds of list-watch
449- Thread . sleep ( 3000 );
526+ getCount . acquire ( 2 );
527+ watchCount . acquire ( 2 );
450528
451529 verify (
452530 moreThan (1 ),
@@ -463,9 +541,13 @@ public void testInformerReListWatchOnWatchConflict() throws InterruptedException
463541 public void testInformerReListingOnListForbidden () throws InterruptedException {
464542
465543 CoreV1Api coreV1Api = new CoreV1Api (client );
544+ Semaphore getCount = new Semaphore (2 );
545+ Parameters getParams = new Parameters ();
546+ getParams .put ("semaphore" , getCount );
466547
467- stubFor (
548+ wireMockRule . stubFor (
468549 get (urlPathEqualTo ("/api/v1/namespaces/" + namespace + "/pods" ))
550+ .withPostServeAction ("semaphore" , getParams )
469551 .withQueryParam ("watch" , equalTo ("false" ))
470552 .willReturn (
471553 aResponse ()
@@ -505,10 +587,9 @@ public void testInformerReListingOnListForbidden() throws InterruptedException {
505587 V1Pod .class ,
506588 V1PodList .class );
507589
590+ getCount .acquire (2 );
508591 factory .startAllRegisteredInformers ();
509-
510- // Sleep mroe than 1s so that informer can perform multiple rounds of list-watch
511- Thread .sleep (3000 );
592+ getCount .acquire (2 );
512593
513594 verify (
514595 moreThan (1 ),
0 commit comments