Skip to content

Commit 4b331e7

Browse files
author
jodzga
committed
Added CancellationException.
Removed lastly() and onCancelled(). Fixed javadoc: <p/> -> <p>
1 parent 6ec6e93 commit 4b331e7

39 files changed

+396
-199
lines changed

src/com/linkedin/parseq/AsyncCallableTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* tasks wrapped in AsyncCallableTask do not get any special memory consistency
3333
* guarantees and should not attempt to use shared state. In others, they should
3434
* act as a stateless function.
35-
* <p/>
35+
* <p>
3636
* To use this class with an engine, register an executor with engine using
3737
* {@link #register(EngineBuilder, java.util.concurrent.Executor)}
3838
*

src/com/linkedin/parseq/BaseTask.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import java.util.function.Function;
2424
import java.util.function.Supplier;
2525

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
2629
import com.linkedin.parseq.internal.IdGenerator;
2730
import com.linkedin.parseq.internal.TaskLogger;
2831
import com.linkedin.parseq.promise.DelegatingPromise;
@@ -37,9 +40,6 @@
3740
import com.linkedin.parseq.trace.TraceBuilder;
3841

3942
/**
40-
* TODO break tasks referencing each other through context, replace with ids
41-
*
42-
*
4343
* An abstract base class that can be used to build implementations of
4444
* {@link Task}.
4545
*
@@ -48,6 +48,8 @@
4848
*/
4949
public abstract class BaseTask<T> extends DelegatingPromise<T> implements Task<T>
5050
{
51+
static final Logger LOGGER = LoggerFactory.getLogger(BaseTask.class);
52+
5153
private static enum StateType
5254
{
5355
// The initial state of the task.
@@ -103,7 +105,6 @@ public BaseTask()
103105
public BaseTask(final String name)
104106
{
105107
super(Promises.<T>settable());
106-
107108
_name = name;
108109
final State state = State.INIT;
109110
_shallowTraceBuilder = new ShallowTraceBuilder(_id);
@@ -188,17 +189,17 @@ public final void contextRun(final Context context,
188189
{
189190
if (resolvedPromise.isFailed())
190191
{
191-
traceAndFail(resolvedPromise.getError(), taskLogger);
192+
fail(resolvedPromise.getError(), taskLogger);
192193
}
193194
else
194195
{
195-
traceAndDone(resolvedPromise.get(), taskLogger);
196+
done(resolvedPromise.get(), taskLogger);
196197
}
197198
});
198199
}
199200
catch (Throwable t)
200201
{
201-
traceAndFail(t, taskLogger);
202+
fail(t, taskLogger);
202203
}
203204
}
204205
else
@@ -236,21 +237,27 @@ public String getName()
236237
}
237238

238239
@Override
239-
public boolean cancel(final Exception reason)
240+
public boolean cancel(final Exception rootReason)
240241
{
241-
if (transitionCancel())
242+
if (transitionCancel(rootReason))
242243
{
243-
if (reason instanceof EarlyFinishException) {
244-
_shallowTraceBuilder.setResultType(ResultType.EARLY_FINISH);
245-
} else {
246-
_shallowTraceBuilder.setResultType(ResultType.ERROR);
247-
}
244+
final Exception reason = new CancellationException(rootReason);
245+
traceFailure(reason);
248246
getSettableDelegate().fail(reason);
249247
return true;
250248
}
251249
return false;
252250
}
253251

252+
protected void traceFailure(final Throwable reason) {
253+
if (Exceptions.isEarlyFinish(reason)) {
254+
_shallowTraceBuilder.setResultType(ResultType.EARLY_FINISH);
255+
} else {
256+
_shallowTraceBuilder.setResultType(ResultType.ERROR);
257+
_shallowTraceBuilder.setValue(Exceptions.failureToString(reason));
258+
}
259+
}
260+
254261
@Override
255262
public ShallowTraceBuilder getShallowTraceBuilder() {
256263
return _shallowTraceBuilder;
@@ -263,7 +270,7 @@ public ShallowTrace getShallowTrace()
263270
}
264271

265272
@Override
266-
public void traceValue(final Function<T, String> traceValueProvider) {
273+
public void setTraceValueSerializer(final Function<T, String> traceValueProvider) {
267274
_traceValueProvider = traceValueProvider;
268275
}
269276

@@ -286,7 +293,7 @@ public Trace getTrace() {
286293
*/
287294
protected abstract Promise<? extends T> run(final Context context) throws Throwable;
288295

289-
private void traceAndDone(final T value, final TaskLogger taskLogger) {
296+
private void traceDone(final T value) {
290297
_shallowTraceBuilder.setResultType(ResultType.SUCCESS);
291298
final Function<T, String> traceValueProvider = _traceValueProvider;
292299
if (traceValueProvider != null) {
@@ -296,32 +303,23 @@ private void traceAndDone(final T value, final TaskLogger taskLogger) {
296303
_shallowTraceBuilder.setValue(e.toString());
297304
}
298305
}
299-
done(value, taskLogger);
300306
}
301307

302308
private void done(final T value, final TaskLogger taskLogger)
303309
{
304310
if (transitionDone())
305311
{
312+
traceDone(value);
306313
getSettableDelegate().done(value);
307314
taskLogger.logTaskEnd(BaseTask.this, _traceValueProvider);
308315
}
309316
}
310317

311-
private void traceAndFail(final Throwable error, final TaskLogger taskLogger) {
312-
if (error instanceof EarlyFinishException) {
313-
_shallowTraceBuilder.setResultType(ResultType.EARLY_FINISH);
314-
} else {
315-
_shallowTraceBuilder.setResultType(ResultType.ERROR);
316-
_shallowTraceBuilder.setValue(error.toString());
317-
}
318-
fail(error , taskLogger);
319-
}
320-
321318
private void fail(final Throwable error, final TaskLogger taskLogger)
322319
{
323320
if (transitionDone())
324321
{
322+
traceFailure(error);
325323
getSettableDelegate().fail(error);
326324
taskLogger.logTaskEnd(BaseTask.this, _traceValueProvider);
327325
}
@@ -368,14 +366,11 @@ protected void transitionPending()
368366
_shallowTraceBuilder.setPendingNanos(pendingNanos);
369367
}
370368

371-
protected boolean transitionCancel()
369+
protected boolean transitionCancel(final Exception reason)
372370
{
373371
State state;
374372
State newState;
375373

376-
//TODO if previous state was PENDING then notify
377-
//asynchronous execution about the cancellation
378-
379374
do
380375
{
381376
state = _stateRef.get();

src/com/linkedin/parseq/CallableTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* A {@link Task} that will run a {@link Callable} and will set the task's value
2727
* to the value returned from the callable.
28-
* <p/>
28+
* <p>
2929
* Use {@link Tasks#callable(String, java.util.concurrent.Callable)} to create
3030
* instances of this class.
3131
*
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.linkedin.parseq;
2+
3+
class CancellationException extends Exception {
4+
5+
private static final long serialVersionUID = 1L;
6+
7+
public CancellationException(String message, Throwable cause) {
8+
super(message, cause);
9+
}
10+
11+
public CancellationException(String message) {
12+
super(message);
13+
}
14+
15+
public CancellationException(Throwable cause) {
16+
super(cause);
17+
}
18+
19+
public CancellationException() {
20+
super();
21+
}
22+
}

src/com/linkedin/parseq/Context.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* other tasks. Each task gets its own context, but contexts are hierarchical
2929
* such that any state changes made from this context are visible to other
3030
* contexts in the hierarchy.
31-
* <p/>
31+
* <p>
3232
* If a task finished while it still has pending timers or tasks, those
3333
* timers and tasks will be cancelled - they are guaranteed not to execute.
3434
*

src/com/linkedin/parseq/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void run(final Task<?> task, final String planClass)
165165
* shutdown. No new tasks will be accepted, but already running tasks will be
166166
* allowed to finish. Use {@link #awaitTermination(int, java.util.concurrent.TimeUnit)}
167167
* to wait for the engine to shutdown.
168-
* <p/>
168+
* <p>
169169
* If the engine is already shutting down or stopped this method will have
170170
* no effect.
171171
*/

src/com/linkedin/parseq/EngineBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
/**
3030
* A configurable builder that makes {@link Engine}s.
31-
* <p/>
31+
* <p>
3232
* Minimum required configuration:
3333
* <ul>
3434
* <li>taskExecutor</li>
@@ -49,7 +49,7 @@ public EngineBuilder() {}
4949

5050
/**
5151
* Sets the task executor for the engine.
52-
* <p/>
52+
* <p>
5353
* The lifecycle of the executor is not managed by the engine.
5454
*
5555
* @param taskExecutor the executor to use for the engine
@@ -64,7 +64,7 @@ public EngineBuilder setTaskExecutor(final Executor taskExecutor)
6464

6565
/**
6666
* Sets the timer scheduler for the engine.
67-
* <p/>
67+
* <p>
6868
* The lifecycle of the scheduler is not managed by the engine.
6969
*
7070
* @param timerScheduler the scheduler to use for the engine
@@ -79,7 +79,7 @@ public EngineBuilder setTimerScheduler(final DelayedExecutor timerScheduler)
7979

8080
/**
8181
* Sets the timer scheduler for the engine.
82-
* <p/>
82+
* <p>
8383
* The lifecycle of the scheduler is not managed by the engine.
8484
*
8585
* @param timerScheduler the scheduler to use for the engine

src/com/linkedin/parseq/Exceptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,25 @@ private static Exception addCause(Exception e, Throwable cause) {
3232
public static Exception noSuchElement(final Throwable cause) {
3333
return addCause(new NoSuchElementException(), cause);
3434
}
35+
36+
public static boolean isCancellation(final Throwable e) {
37+
return e instanceof CancellationException;
38+
}
39+
40+
public static boolean isEarlyFinish(final Throwable e) {
41+
return isCancellation(e) && e.getCause() instanceof EarlyFinishException;
42+
}
43+
44+
public static String failureToString(final Throwable e) {
45+
if (isCancellation(e)) {
46+
if (isEarlyFinish(e)) {
47+
return "";
48+
} else {
49+
return "cancelled because: " + e.getCause().toString();
50+
}
51+
} else {
52+
return e.toString();
53+
}
54+
}
55+
3556
}

src/com/linkedin/parseq/FusionTask.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,7 @@ public void done(final T value) throws PromiseResolvedException {
149149
public void fail(final Throwable error) throws PromiseResolvedException {
150150
try {
151151
trnasitionToDone(traceContext);
152-
if (error instanceof EarlyFinishException) {
153-
_shallowTraceBuilder.setResultType(ResultType.EARLY_FINISH);
154-
} else {
155-
_shallowTraceBuilder.setResultType(ResultType.ERROR);
156-
_shallowTraceBuilder.setValue(error.toString());
157-
}
152+
traceFailure(error);
158153
settable.fail(error);
159154
traceContext.getTaskLogger().logTaskEnd(FusionTask.this, _traceValueProvider);
160155
CONTINUATIONS.submit(() -> dest.fail(error), null);
@@ -234,7 +229,7 @@ public Task<T> recoverWith(final String desc, final Function1<Throwable, Task<T>
234229
return Task.async(desc, context -> {
235230
final SettablePromise<T> result = Promises.settable();
236231
context.after(that).run(() -> {
237-
if (that.isFailed() && !(that.getError() instanceof EarlyFinishException)) {
232+
if (that.isFailed() && !(Exceptions.isCancellation(that.getError()))) {
238233
try {
239234
Task<T> r = func.apply(that.getError());
240235
Promises.propagateResult(r, result);

src/com/linkedin/parseq/ParTaskImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
/**
3333
* A {@link Task} that will run all of the constructor-supplied tasks in parallel.
34-
* <p/>
34+
* <p>
3535
* Use {@link Tasks#par(Task[])} or {@link Tasks#par(Iterable)} to create an
3636
* instance of this class.
3737
*

0 commit comments

Comments
 (0)