Skip to content

Commit df2dbb4

Browse files
replaced wait/notify with countdownlatch and initiated websocket closure from the client side.
1 parent 970ed2b commit df2dbb4

2 files changed

Lines changed: 13 additions & 12 deletions

File tree

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashMap;
3838
import java.util.List;
3939
import java.util.Map;
40+
import java.util.concurrent.CountDownLatch;
4041
import java.util.concurrent.TimeUnit;
4142
import org.apache.commons.lang3.StringUtils;
4243
import org.slf4j.Logger;
@@ -354,6 +355,7 @@ public static class ExecProcess extends Process {
354355
private int statusCode = -1;
355356
private boolean isAlive = true;
356357
private final Map<Integer, InputStream> input = new HashMap<>();
358+
private final CountDownLatch latch = new CountDownLatch(1);
357359

358360
public ExecProcess(final ApiClient apiClient) throws IOException {
359361
this.streamHandler =
@@ -367,10 +369,13 @@ protected void handleMessage(int stream, InputStream inStream) throws IOExceptio
367369
synchronized (ExecProcess.this) {
368370
statusCode = exitCode;
369371
isAlive = false;
370-
ExecProcess.this.notifyAll();
371372
}
372373
}
373374
inStream.close();
375+
// Stream ID of `3` delivers the status of exec connection from kubelet,
376+
// closing the connection upon 0 exit-code.
377+
this.close();
378+
ExecProcess.this.latch.countDown();
374379
} else super.handleMessage(stream, inStream);
375380
}
376381

@@ -386,7 +391,7 @@ public void failure(Throwable ex) {
386391
// code.
387392
statusCode = -1975219;
388393
isAlive = false;
389-
ExecProcess.this.notifyAll();
394+
ExecProcess.this.latch.countDown();
390395
}
391396
}
392397

@@ -396,7 +401,7 @@ public void close() {
396401
synchronized (ExecProcess.this) {
397402
if (isAlive) {
398403
isAlive = false;
399-
ExecProcess.this.notifyAll();
404+
ExecProcess.this.latch.countDown();
400405
}
401406
}
402407

@@ -442,18 +447,14 @@ private synchronized InputStream getInputStream(int stream) {
442447

443448
@Override
444449
public int waitFor() throws InterruptedException {
445-
synchronized (this) {
446-
this.wait();
447-
return statusCode;
448-
}
450+
this.latch.await();
451+
return statusCode;
449452
}
450453

451454
@Override
452455
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
453-
synchronized (this) {
454-
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
455-
return !isAlive();
456-
}
456+
this.latch.await(timeout, unit);
457+
return !isAlive();
457458
}
458459

459460
@Override

util/src/test/java/io/kubernetes/client/ExecTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,14 @@ public void testExecProcess() throws IOException, InterruptedException {
109109

110110
process.getHandler().bytesMessage(makeStream(1, msgData.getBytes(StandardCharsets.UTF_8)));
111111
process.getHandler().bytesMessage(makeStream(2, errData.getBytes(StandardCharsets.UTF_8)));
112-
process.getHandler().bytesMessage(makeStream(3, OUTPUT_EXIT0.getBytes(StandardCharsets.UTF_8)));
113112

114113
final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
115114
final ByteArrayOutputStream stderr = new ByteArrayOutputStream();
116115

117116
Thread t1 = asyncCopy(process.getInputStream(), stdout);
118117
Thread t2 = asyncCopy(process.getErrorStream(), stderr);
119118

119+
process.getHandler().bytesMessage(makeStream(3, OUTPUT_EXIT0.getBytes(StandardCharsets.UTF_8)));
120120
// TODO: Fix this asap!
121121
Thread.sleep(1000);
122122

0 commit comments

Comments
 (0)