2323import java .util .function .Function ;
2424import java .util .function .Supplier ;
2525
26+ import org .slf4j .Logger ;
27+ import org .slf4j .LoggerFactory ;
28+
2629import com .linkedin .parseq .internal .IdGenerator ;
2730import com .linkedin .parseq .internal .TaskLogger ;
2831import com .linkedin .parseq .promise .DelegatingPromise ;
3740import com .linkedin .parseq .trace .TraceBuilder ;
3841
3942/**
40- * TODO break tasks referencing each other through context, replace with ids
41- *
42- *
4343 * An abstract base class that can be used to build implementations of
4444 * {@link Task}.
4545 *
4848 */
4949public abstract class BaseTask <T > extends DelegatingPromise <T > implements Task <T >
5050{
51+ static final Logger LOGGER = LoggerFactory .getLogger (BaseTask .class );
52+
5153 private static enum StateType
5254 {
5355 // The initial state of the task.
@@ -103,7 +105,6 @@ public BaseTask()
103105 public BaseTask (final String name )
104106 {
105107 super (Promises .<T >settable ());
106-
107108 _name = name ;
108109 final State state = State .INIT ;
109110 _shallowTraceBuilder = new ShallowTraceBuilder (_id );
@@ -188,17 +189,17 @@ public final void contextRun(final Context context,
188189 {
189190 if (resolvedPromise .isFailed ())
190191 {
191- traceAndFail (resolvedPromise .getError (), taskLogger );
192+ fail (resolvedPromise .getError (), taskLogger );
192193 }
193194 else
194195 {
195- traceAndDone (resolvedPromise .get (), taskLogger );
196+ done (resolvedPromise .get (), taskLogger );
196197 }
197198 });
198199 }
199200 catch (Throwable t )
200201 {
201- traceAndFail (t , taskLogger );
202+ fail (t , taskLogger );
202203 }
203204 }
204205 else
@@ -236,21 +237,27 @@ public String getName()
236237 }
237238
238239 @ Override
239- public boolean cancel (final Exception reason )
240+ public boolean cancel (final Exception rootReason )
240241 {
241- if (transitionCancel ())
242+ if (transitionCancel (rootReason ))
242243 {
243- if (reason instanceof EarlyFinishException ) {
244- _shallowTraceBuilder .setResultType (ResultType .EARLY_FINISH );
245- } else {
246- _shallowTraceBuilder .setResultType (ResultType .ERROR );
247- }
244+ final Exception reason = new CancellationException (rootReason );
245+ traceFailure (reason );
248246 getSettableDelegate ().fail (reason );
249247 return true ;
250248 }
251249 return false ;
252250 }
253251
252+ protected void traceFailure (final Throwable reason ) {
253+ if (Exceptions .isEarlyFinish (reason )) {
254+ _shallowTraceBuilder .setResultType (ResultType .EARLY_FINISH );
255+ } else {
256+ _shallowTraceBuilder .setResultType (ResultType .ERROR );
257+ _shallowTraceBuilder .setValue (Exceptions .failureToString (reason ));
258+ }
259+ }
260+
254261 @ Override
255262 public ShallowTraceBuilder getShallowTraceBuilder () {
256263 return _shallowTraceBuilder ;
@@ -263,7 +270,7 @@ public ShallowTrace getShallowTrace()
263270 }
264271
265272 @ Override
266- public void traceValue (final Function <T , String > traceValueProvider ) {
273+ public void setTraceValueSerializer (final Function <T , String > traceValueProvider ) {
267274 _traceValueProvider = traceValueProvider ;
268275 }
269276
@@ -286,7 +293,7 @@ public Trace getTrace() {
286293 */
287294 protected abstract Promise <? extends T > run (final Context context ) throws Throwable ;
288295
289- private void traceAndDone (final T value , final TaskLogger taskLogger ) {
296+ private void traceDone (final T value ) {
290297 _shallowTraceBuilder .setResultType (ResultType .SUCCESS );
291298 final Function <T , String > traceValueProvider = _traceValueProvider ;
292299 if (traceValueProvider != null ) {
@@ -296,32 +303,23 @@ private void traceAndDone(final T value, final TaskLogger taskLogger) {
296303 _shallowTraceBuilder .setValue (e .toString ());
297304 }
298305 }
299- done (value , taskLogger );
300306 }
301307
302308 private void done (final T value , final TaskLogger taskLogger )
303309 {
304310 if (transitionDone ())
305311 {
312+ traceDone (value );
306313 getSettableDelegate ().done (value );
307314 taskLogger .logTaskEnd (BaseTask .this , _traceValueProvider );
308315 }
309316 }
310317
311- private void traceAndFail (final Throwable error , final TaskLogger taskLogger ) {
312- if (error instanceof EarlyFinishException ) {
313- _shallowTraceBuilder .setResultType (ResultType .EARLY_FINISH );
314- } else {
315- _shallowTraceBuilder .setResultType (ResultType .ERROR );
316- _shallowTraceBuilder .setValue (error .toString ());
317- }
318- fail (error , taskLogger );
319- }
320-
321318 private void fail (final Throwable error , final TaskLogger taskLogger )
322319 {
323320 if (transitionDone ())
324321 {
322+ traceFailure (error );
325323 getSettableDelegate ().fail (error );
326324 taskLogger .logTaskEnd (BaseTask .this , _traceValueProvider );
327325 }
@@ -368,14 +366,11 @@ protected void transitionPending()
368366 _shallowTraceBuilder .setPendingNanos (pendingNanos );
369367 }
370368
371- protected boolean transitionCancel ()
369+ protected boolean transitionCancel (final Exception reason )
372370 {
373371 State state ;
374372 State newState ;
375373
376- //TODO if previous state was PENDING then notify
377- //asynchronous execution about the cancellation
378-
379374 do
380375 {
381376 state = _stateRef .get ();
0 commit comments