1616package rx .schedulers ;
1717
1818import rx .Scheduler ;
19- import rx .internal .schedulers .*;
19+ import rx .annotations .Experimental ;
20+ import rx .internal .schedulers .ExecutorScheduler ;
21+ import rx .internal .schedulers .GenericScheduledExecutorService ;
22+ import rx .internal .schedulers .SchedulerLifecycle ;
2023import rx .internal .util .RxRingBuffer ;
2124import rx .plugins .RxJavaPlugins ;
2225import rx .plugins .RxJavaSchedulersHook ;
2326
2427import java .util .concurrent .Executor ;
28+ import java .util .concurrent .atomic .AtomicReference ;
2529
2630/**
2731 * Static factory methods for creating Schedulers.
@@ -32,7 +36,22 @@ public final class Schedulers {
3236 private final Scheduler ioScheduler ;
3337 private final Scheduler newThreadScheduler ;
3438
35- private static final Schedulers INSTANCE = new Schedulers ();
39+ private static final AtomicReference <Schedulers > INSTANCE = new AtomicReference <Schedulers >();
40+
41+ private static Schedulers getInstance () {
42+ for (;;) {
43+ Schedulers current = INSTANCE .get ();
44+ if (current != null ) {
45+ return current ;
46+ }
47+ current = new Schedulers ();
48+ if (INSTANCE .compareAndSet (null , current )) {
49+ return current ;
50+ } else {
51+ shutdown ();
52+ }
53+ }
54+ }
3655
3756 private Schedulers () {
3857 RxJavaSchedulersHook hook = RxJavaPlugins .getInstance ().getSchedulersHook ();
@@ -86,7 +105,7 @@ public static Scheduler trampoline() {
86105 * @return a {@link Scheduler} that creates new threads
87106 */
88107 public static Scheduler newThread () {
89- return INSTANCE .newThreadScheduler ;
108+ return getInstance () .newThreadScheduler ;
90109 }
91110
92111 /**
@@ -101,7 +120,7 @@ public static Scheduler newThread() {
101120 * @return a {@link Scheduler} meant for computation-bound work
102121 */
103122 public static Scheduler computation () {
104- return INSTANCE .computationScheduler ;
123+ return getInstance () .computationScheduler ;
105124 }
106125
107126 /**
@@ -118,7 +137,7 @@ public static Scheduler computation() {
118137 * @return a {@link Scheduler} meant for IO-bound work
119138 */
120139 public static Scheduler io () {
121- return INSTANCE .ioScheduler ;
140+ return getInstance () .ioScheduler ;
122141 }
123142
124143 /**
@@ -141,13 +160,27 @@ public static TestScheduler test() {
141160 public static Scheduler from (Executor executor ) {
142161 return new ExecutorScheduler (executor );
143162 }
163+
164+ /**
165+ * Resets the current {@link Schedulers} instance.
166+ * <p>
167+ * This API is experimental. Resetting the schedulers is dangerous
168+ * during application runtime and also bad code could invoke it in
169+ * the middle of an application life-cycle and really break applications
170+ * if not used cautiously.
171+ */
172+ @ Experimental
173+ public static void reset () {
174+ shutdown ();
175+ INSTANCE .set (null );
176+ }
144177
145178 /**
146179 * Starts those standard Schedulers which support the SchedulerLifecycle interface.
147180 * <p>The operation is idempotent and threadsafe.
148181 */
149182 /* public test only */ static void start () {
150- Schedulers s = INSTANCE ;
183+ Schedulers s = getInstance () ;
151184 synchronized (s ) {
152185 if (s .computationScheduler instanceof SchedulerLifecycle ) {
153186 ((SchedulerLifecycle ) s .computationScheduler ).start ();
@@ -170,7 +203,7 @@ public static Scheduler from(Executor executor) {
170203 * <p>The operation is idempotent and threadsafe.
171204 */
172205 public static void shutdown () {
173- Schedulers s = INSTANCE ;
206+ Schedulers s = getInstance () ;
174207 synchronized (s ) {
175208 if (s .computationScheduler instanceof SchedulerLifecycle ) {
176209 ((SchedulerLifecycle ) s .computationScheduler ).shutdown ();
@@ -181,12 +214,12 @@ public static void shutdown() {
181214 if (s .newThreadScheduler instanceof SchedulerLifecycle ) {
182215 ((SchedulerLifecycle ) s .newThreadScheduler ).shutdown ();
183216 }
184-
217+
185218 GenericScheduledExecutorService .INSTANCE .shutdown ();
186-
219+
187220 RxRingBuffer .SPSC_POOL .shutdown ();
188-
221+
189222 RxRingBuffer .SPMC_POOL .shutdown ();
190223 }
191224 }
192- }
225+ }
0 commit comments