3333import org .joda .time .Seconds ;
3434import org .osgl .$ ;
3535import org .osgl .exception .NotAppliedException ;
36+ import org .osgl .logging .LogManager ;
37+ import org .osgl .logging .Logger ;
3638import org .osgl .util .C ;
3739import org .osgl .util .E ;
3840import org .osgl .util .S ;
4446
4547public class AppJobManager extends AppServiceBase <AppJobManager > {
4648
49+ private static final Logger LOGGER = LogManager .get (AppJobManager .class );
50+
4751 private ScheduledThreadPoolExecutor executor ;
4852 private ConcurrentMap <String , _Job > jobs = new ConcurrentHashMap <String , _Job >();
4953 private ConcurrentMap <String , ScheduledFuture > scheduled = new ConcurrentHashMap <>();
@@ -62,12 +66,13 @@ public AppJobManager(App app) {
6266
6367 @ Override
6468 protected void releaseResources () {
69+ LOGGER .trace ("release job manager resources" );
6570 for (_Job job : jobs .values ()) {
6671 job .destroy ();
6772 }
6873 jobs .clear ();
69- executor .shutdown ();
7074 executor .getQueue ().clear ();
75+ executor .shutdownNow ();
7176 }
7277
7378 public <T > Future <T > now (Callable <T > callable ) {
@@ -130,19 +135,25 @@ public void fixedDelay(String id, Runnable runnable, long interval, TimeUnit tim
130135
131136 private int parseTime (String timeDuration ) {
132137 if (timeDuration .startsWith ("${" ) && timeDuration .endsWith ("}" )) {
133- timeDuration = ( String ) app ().config ().get (timeDuration .substring (2 , timeDuration .length () - 1 ));
138+ timeDuration = app ().config ().get (timeDuration .substring (2 , timeDuration .length () - 1 ));
134139 }
135140 return Time .parseDuration (timeDuration );
136141 }
137142
138143 public void on (DateTime instant , Runnable runnable ) {
144+ if (LOGGER .isTraceEnabled ()) {
145+ LOGGER .trace ("schedule runnable[%s] on %s" , runnable , instant );
146+ }
139147 DateTime now = DateTime .now ();
140148 E .illegalArgumentIf (instant .isBefore (now ));
141149 Seconds seconds = Seconds .secondsBetween (now , instant );
142150 executor ().schedule (wrap (runnable ), seconds .getSeconds (), TimeUnit .SECONDS );
143151 }
144152
145153 public <T > Future <T > on (DateTime instant , Callable <T > callable ) {
154+ if (LOGGER .isTraceEnabled ()) {
155+ LOGGER .trace ("schedule callable[%s] on %s" , callable , instant );
156+ }
146157 DateTime now = DateTime .now ();
147158 E .illegalArgumentIf (instant .isBefore (now ));
148159 Seconds seconds = Seconds .secondsBetween (now , instant );
@@ -154,12 +165,7 @@ public void on(AppEventId appEvent, final Runnable runnable) {
154165 }
155166
156167 public void on (AppEventId appEvent , final Runnable runnable , boolean runImmediatelyIfEventDispatched ) {
157- _Job job = jobById (appEventJobId (appEvent ));
158- if (null == job ) {
159- processDelayedJob (wrap (runnable ), runImmediatelyIfEventDispatched );
160- } else {
161- job .addPrecedenceJob (_Job .once (runnable , this ));
162- }
168+ on (appEvent , runnable .toString (), runnable , runImmediatelyIfEventDispatched );
163169 }
164170
165171 public void post (AppEventId appEvent , final Runnable runnable ) {
@@ -180,10 +186,20 @@ public void on(AppEventId appEvent, String jobId, final Runnable runnable) {
180186 }
181187
182188 public void on (AppEventId appEvent , String jobId , final Runnable runnable , boolean runImmediatelyIfEventDispatched ) {
189+ boolean traceEnabled = LOGGER .isTraceEnabled ();
190+ if (traceEnabled ) {
191+ LOGGER .trace ("binding job[%s] to app event: %s, run immediately if event dispatched: %s" , jobId , appEvent , runImmediatelyIfEventDispatched );
192+ }
183193 _Job job = jobById (appEventJobId (appEvent ));
184194 if (null == job ) {
195+ if (traceEnabled ) {
196+ LOGGER .trace ("process delayed job: %s" , jobId );
197+ }
185198 processDelayedJob (wrap (runnable ), runImmediatelyIfEventDispatched );
186199 } else {
200+ if (traceEnabled ) {
201+ LOGGER .trace ("schedule job: %s" , jobId );
202+ }
187203 job .addPrecedenceJob (_Job .once (jobId , runnable , this ));
188204 }
189205 }
@@ -291,7 +307,10 @@ ScheduledThreadPoolExecutor executor() {
291307 private void initExecutor (App app ) {
292308 int poolSize = app .config ().jobPoolSize ();
293309 executor = new ScheduledThreadPoolExecutor (poolSize , new AppThreadFactory ("jobs" ), new ThreadPoolExecutor .AbortPolicy ());
294- //JDK1.7 API: executor.setRemoveOnCancelPolicy(true);
310+ executor .setRemoveOnCancelPolicy (true );
311+ if (LOGGER .isTraceEnabled ()) {
312+ LOGGER .trace ("init executor with thread pool: %s" , poolSize );
313+ }
295314 }
296315
297316 private void createAppEventListener (AppEventId appEventId ) {
0 commit comments