Skip to content

Commit 97882b5

Browse files
author
jodzga
committed
Simplified Task API by removing systemHidden flag.
Fixed up system hidden flag for methods in Task.
1 parent 8ac06c9 commit 97882b5

11 files changed

Lines changed: 45 additions & 68 deletions

File tree

contrib/parseq-exec/src/main/java/com/linkedin/parseq/exec/Exec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public Task<Result> command(final String desc, final long timeout, final TimeUni
161161
_processQueueSize.incrementAndGet();
162162
return result;
163163
}
164-
}, false);
164+
});
165165
task.addListener(p -> {
166166
if (p.isFailed() && Exceptions.isCancellation(p.getError())) {
167167
//best effort to try to kill process in case task was cancelled

contrib/parseq-http-client/src/main/java/com/linkedin/parseq/httpclient/WrappedRequestBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void onThrowable(Throwable t) {
269269

270270
});
271271
return result;
272-
}, false);
272+
});
273273
}
274274

275275
public Task<Response> task() {

src-test/com/linkedin/parseq/BaseEngineTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ protected <T> Task<T> delayedValue(T value, long time, TimeUnit timeUnit) {
155155
final SettablePromise<T> promise = Promises.settable();
156156
_scheduler.schedule(() -> promise.done(value), time, timeUnit);
157157
return promise;
158-
} , false);
158+
});
159159
}
160160

161161
/**
@@ -167,7 +167,7 @@ protected <T> Task<T> delayedFailure(Throwable error, long time, TimeUnit timeUn
167167
final SettablePromise<T> promise = Promises.settable();
168168
_scheduler.schedule(() -> promise.fail(error), time, timeUnit);
169169
return promise;
170-
} , false);
170+
});
171171
}
172172

173173
protected int countTasks(Trace trace) {

src/com/linkedin/parseq/FusionTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public Task<T> recoverWith(final String desc, final Function1<Throwable, Task<T>
234234
} );
235235
context.run(that);
236236
return result;
237-
} , true);
237+
});
238238
}
239239

240240
protected void propagate(final FusionTraceContext traceContext, final SettablePromise<T> result) {
@@ -258,7 +258,8 @@ protected Promise<? extends T> run(final Context context) throws Throwable {
258258
FusionTask.this.getShallowTraceBuilder());
259259
propagate(traceContext, result);
260260
return result;
261-
} , true);
261+
});
262+
propagationTask.getShallowTraceBuilder().setSystemHidden(true);
262263
context.after(_task).run(propagationTask);
263264
context.run(_task);
264265
}

src/com/linkedin/parseq/Task.java

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747

4848

4949
/**
50-
* TODO add extensive documentations
51-
*
5250
* A task represents a deferred execution that also contains its resulting
5351
* value. In addition, tasks include tracing information that can be
5452
* used with various trace printers.
@@ -218,7 +216,9 @@ default <R> Task<R> map(final Function1<? super T, ? extends R> func) {
218216
*/
219217
default <R> Task<R> flatMap(final String desc, final Function1<? super T, Task<R>> func) {
220218
ArgumentUtil.requireNotNull(func, "function");
221-
return flatten(desc, map("flatMap", func));
219+
final Task<Task<R>> nested = map(func);
220+
nested.getShallowTraceBuilder().setSystemHidden(true);
221+
return flatten(desc, nested);
222222
}
223223

224224
/**
@@ -266,11 +266,11 @@ default Task<T> withSideEffect(final String desc, final Function1<? super T, Tas
266266
Task<?> sideEffect = func.apply(that.get());
267267
ctx.run(sideEffect);
268268
return sideEffect;
269-
} , true);
269+
});
270270
context.after(that).runSideEffect(sideEffectWrapper);
271271
context.run(that);
272272
return that;
273-
} , true);
273+
});
274274
}
275275

276276
/**
@@ -288,7 +288,7 @@ default Task<T> shareable() {
288288
context.runSideEffect(that);
289289
Promises.propagateResult(that, result);
290290
return result;
291-
} , true);
291+
});
292292
}
293293

294294
/**
@@ -369,7 +369,7 @@ default <R> Task<R> andThen(final String desc, final Task<R> task) {
369369
Promises.propagateResult(task, result);
370370
context.run(that);
371371
return result;
372-
} , true);
372+
});
373373
}
374374

375375
/**
@@ -632,7 +632,7 @@ default Task<T> recoverWith(final String desc, final Function1<Throwable, Task<T
632632
final Task<T> that = this;
633633
return async(desc, context -> {
634634
final SettablePromise<T> result = Promises.settable();
635-
final Task<T> recovery = async(desc, ctx -> {
635+
final Task<T> recovery = async("revovery", ctx -> {
636636
if (that.isFailed() && !(Exceptions.isCancellation(that.getError()))) {
637637
try {
638638
Task<T> r = func.apply(that.getError());
@@ -645,11 +645,12 @@ default Task<T> recoverWith(final String desc, final Function1<Throwable, Task<T
645645
result.done(that.get());
646646
}
647647
return result;
648-
} , true);
648+
});
649+
recovery.getShallowTraceBuilder().setSystemHidden(true);
649650
context.after(that).run(recovery);
650651
context.run(that);
651652
return result;
652-
} , true);
653+
});
653654
}
654655

655656
/**
@@ -694,7 +695,7 @@ default Task<T> withTimeout(final long time, final TimeUnit unit) {
694695
that.setPriority(Priority.MAX_PRIORITY);
695696
ctx.run(that);
696697
return result;
697-
} , false);
698+
});
698699
withTimeout.setPriority(getPriority());
699700
return withTimeout;
700701
}
@@ -721,7 +722,7 @@ public static <R> Task<R> flatten(final String desc, final Task<Task<R>> task) {
721722
} );
722723
context.run(task);
723724
return result;
724-
} , true);
725+
});
725726
}
726727

727728
/**
@@ -762,7 +763,7 @@ public static Task<Void> action(final String desc, final Action action) {
762763
return async(desc, () -> {
763764
action.run();
764765
return Promises.VOID;
765-
} , false);
766+
});
766767
}
767768

768769
/**
@@ -920,29 +921,26 @@ public static <T> Task<T> callable(final Callable<? extends T> callable) {
920921
* @param name a name that describes the task, it will show up in a trace
921922
* @param callable a callable to execute when this task is run, it must return
922923
* a {@code Promise<T>}
923-
* @param systemHidden flag that specifies whether trace of this task will have
924-
* a system-hidden flag set
925924
* @return a new task that will invoke the callable and complete with result
926925
* returned by a {@code Promise} returned by it
927926
* @see Promise
928927
*/
929-
public static <T> Task<T> async(final String name, final Callable<Promise<? extends T>> callable,
930-
final boolean systemHidden) {
928+
public static <T> Task<T> async(final String name, final Callable<Promise<? extends T>> callable) {
931929
return async(name, context -> {
932930
try {
933931
return callable.call();
934932
} catch (Throwable e) {
935933
return Promises.error(e);
936934
}
937-
} , systemHidden);
935+
});
938936
}
939937

940938
/**
941-
* Equivalent to {@code async("async", callable, systemHidden)}.
942-
* @see #async(String, Callable, boolean)
939+
* Equivalent to {@code async("async", callable)}.
940+
* @see #async(String, Callable)
943941
*/
944-
public static <T> Task<T> async(final Callable<Promise<? extends T>> callable, final boolean systemHidden) {
945-
return async("async", callable, systemHidden);
942+
public static <T> Task<T> async(final Callable<Promise<? extends T>> callable) {
943+
return async("async", callable);
946944
}
947945

948946
/**
@@ -957,15 +955,12 @@ public static <T> Task<T> async(final Callable<Promise<? extends T>> callable, f
957955
* @param name a name that describes the task, it will show up in a trace
958956
* @param func a function to execute when this task is run, it must return
959957
* a {@code Promise<T>}
960-
* @param systemHidden flag that specifies whether trace of this task will have
961-
* a system-hidden flag set
962958
* @return a new task that will invoke the function and complete with result
963959
* returned by a {@code Promise} returned by it
964960
* @see Context
965961
* @see Promise
966962
*/
967-
public static <T> Task<T> async(final String name, final Function1<Context, Promise<? extends T>> func,
968-
final boolean systemHidden) {
963+
public static <T> Task<T> async(final String name, final Function1<Context, Promise<? extends T>> func) {
969964
ArgumentUtil.requireNotNull(func, "function");
970965

971966
Task<T> task = new BaseTask<T>(name) {
@@ -975,17 +970,15 @@ protected Promise<? extends T> run(Context context) throws Throwable {
975970
}
976971
};
977972

978-
task.getShallowTraceBuilder().setSystemHidden(systemHidden);
979-
980973
return task;
981974
}
982975

983976
/**
984-
* Equivalent to {@code async("async", func, systemHidden)}.
977+
* Equivalent to {@code async("async", func)}.
985978
* @see #async(String, Function1, boolean)
986979
*/
987-
public static <T> Task<T> async(final Function1<Context, Promise<? extends T>> func, final boolean systemHidden) {
988-
return async("async", func, systemHidden);
980+
public static <T> Task<T> async(final Function1<Context, Promise<? extends T>> func) {
981+
return async("async", func);
989982
}
990983

991984
/**
@@ -1016,7 +1009,7 @@ public static <T> Task<T> blocking(final String name, final Callable<? extends T
10161009
}
10171010
} );
10181011
return promise;
1019-
} , false);
1012+
});
10201013
}
10211014

10221015
/**

test/com/linkedin/parseq/AbstractTaskTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ public void testNoRecover(int expectedNumberOfTasks) {
8080
assertEquals(countTasks(task.getTrace()), expectedNumberOfTasks);
8181
}
8282

83-
public void testTry(int expectedNumberOfTasks) {
83+
public void testToTry(int expectedNumberOfTasks) {
8484
Task<Try<Integer>> success = getSuccessTask().map("strlen", String::length).toTry();
85-
runAndWait("AbstractTaskTest.testTrySuccess", success);
85+
runAndWait("AbstractTaskTest.testToTrySuccess", success);
8686
assertFalse(success.get().isFailed());
8787
assertEquals((int) success.get().get(), TASK_VALUE.length());
8888
assertEquals(countTasks(success.getTrace()), expectedNumberOfTasks);
8989

9090
Task<Try<Integer>> failure = getFailureTask().map("strlen", String::length).toTry();
91-
runAndWait("AbstractTaskTest.testTryFailure", failure);
91+
runAndWait("AbstractTaskTest.testToTryFailure", failure);
9292
assertTrue(failure.get().isFailed());
9393
assertEquals(failure.get().getError().getMessage(), TASK_ERROR_MESSAGE);
9494
assertEquals(countTasks(failure.getTrace()), expectedNumberOfTasks);

test/com/linkedin/parseq/TestFusionTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void testNoRecover() {
4646

4747
@Test
4848
public void testTry() {
49-
testTry(3);
49+
testToTry(3);
5050
}
5151

5252
@Test

test/com/linkedin/parseq/TestTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testNoRecover() {
4343

4444
@Test
4545
public void testTry() {
46-
testTry(4);
46+
testToTry(4);
4747
}
4848

4949
@Test
@@ -78,13 +78,13 @@ public void testOnFailure() {
7878

7979
@Override
8080
Task<String> getSuccessTask() {
81-
return Task.async("success", () -> Promises.value(TASK_VALUE), false);
81+
return Task.async("success", () -> Promises.value(TASK_VALUE));
8282
}
8383

8484
@Override
8585
Task<String> getFailureTask() {
8686
return Task.async("failure", () -> {
8787
throw new RuntimeException(TASK_ERROR_MESSAGE);
88-
} , false);
88+
});
8989
}
9090
}

test/com/linkedin/parseq/TestTaskCancellation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void testTaskCancellationAfterRun() throws InterruptedException {
4040
Task<?> uncompleted = Task.async(() -> {
4141
runLatch.countDown();
4242
return Promises.settable();
43-
} , false);
43+
});
4444
uncompleted.addListener(p -> {
4545
if (p.isFailed() && Exceptions.isCancellation(p.getError())) {
4646
cancelActionValue.set(p.getError().getCause());
@@ -59,7 +59,7 @@ public void testTaskCancellationAfterRun() throws InterruptedException {
5959
@Test
6060
public void testTaskCancellationBeforeRun() throws InterruptedException {
6161
final AtomicReference<Throwable> cancelActionValue = new AtomicReference<>();
62-
Task<?> uncompleted = Task.async(() -> Promises.settable(), false);
62+
Task<?> uncompleted = Task.async(() -> Promises.settable());
6363
uncompleted.addListener(p -> {
6464
if (p.isFailed() && Exceptions.isCancellation(p.getError())) {
6565
cancelActionValue.set(p.getError().getCause());
@@ -97,7 +97,7 @@ public void testTaskCancellationPar() throws InterruptedException {
9797
Task<Integer> uncompleted = Task.async(() -> {
9898
runLatch.countDown();
9999
return Promises.settable();
100-
} , false);
100+
});
101101
uncompleted.addListener(p -> {
102102
if (p.isFailed() && Exceptions.isCancellation(p.getError())) {
103103
cancelActionValue.set(p.getError().getCause());
@@ -120,7 +120,7 @@ public void testTaskCancellationTimeout() throws InterruptedException {
120120
Task<Integer> uncompleted = Task.async(() -> {
121121
runLatch.countDown();
122122
return Promises.settable();
123-
} , false);
123+
});
124124
uncompleted.addListener(p -> {
125125
if (p.isFailed() && Exceptions.isCancellation(p.getError())) {
126126
cancelActionValue.set(p.getError().getCause());

test/com/linkedin/parseq/TestTaskFactoryMethods.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testCallable() {
6666
@Test
6767
public void testAsync() {
6868
final SettablePromise<String> promise = Promises.settable();
69-
Task<String> task = Task.async(() -> promise, false);
69+
Task<String> task = Task.async(() -> promise);
7070
getScheduler().schedule(() -> promise.done("done"), 10, TimeUnit.MILLISECONDS);
7171
String value = runAndWait("TestTaskFactoryMethods.testAsync", task);
7272
assertEquals(value, "done");
@@ -80,7 +80,7 @@ public void testAsyncWithContext() {
8080
Task<String> task = Task.async(ctx -> {
8181
ctx.run(t);
8282
return t;
83-
} , false);
83+
});
8484
String value = runAndWait("TestTaskFactoryMethods.testAsyncWithContext", task);
8585
assertEquals(value, "done");
8686

0 commit comments

Comments
 (0)