1212import android .widget .ListView ;
1313
1414import com .morihacky .android .rxjava .R ;
15+ import com .morihacky .android .rxjava .RxUtils ;
1516
1617import java .util .ArrayList ;
1718import java .util .List ;
2122import butterknife .ButterKnife ;
2223import butterknife .OnClick ;
2324import rx .Observable ;
25+ import rx .Scheduler ;
2426import rx .Subscriber ;
27+ import rx .Subscription ;
2528import rx .functions .Action0 ;
2629import rx .functions .Action1 ;
2730import rx .schedulers .Schedulers ;
@@ -39,17 +42,14 @@ public class PollingFragment
3942 private List <String > _logs ;
4043 private CompositeSubscription _subscriptions ;
4144 private int _counter = 0 ;
42-
43- @ Override
44- public void onDestroy () {
45- super .onDestroy ();
46- _subscriptions .unsubscribe ();
47- }
45+ private Scheduler .Worker _worker ;
4846
4947 @ Override
5048 public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
5149 super .onActivityCreated (savedInstanceState );
5250 _subscriptions = new CompositeSubscription ();
51+ _worker = Schedulers .newThread ()
52+ .createWorker ();
5353 _setupLogger ();
5454 }
5555
@@ -62,31 +62,82 @@ public View onCreateView(LayoutInflater inflater,
6262 return layout ;
6363 }
6464
65+ @ Override
66+ public void onDestroy () {
67+ super .onDestroy ();
68+ RxUtils .unsubscribeIfNotNull (_subscriptions );
69+ ButterKnife .unbind (this );
70+ }
71+
6572 @ OnClick (R .id .btn_start_simple_polling )
6673 public void onStartSimplePollingClicked () {
67- _subscriptions .add (Observable .create (new Observable .OnSubscribe <String >() {
68- @ Override
69- public void call (final Subscriber <? super String > observer ) {
70-
71- Schedulers .newThread ().createWorker () //
72- .schedulePeriodically (new Action0 () {
73- @ Override
74- public void call () {
75- observer .onNext (_doNetworkCallAndGetStringResult ());
76- }
77- }, INITIAL_DELAY , POLLING_INTERVAL , TimeUnit .MILLISECONDS );
78- }
79- }).take (10 ).subscribe (new Action1 <String >() {
80- @ Override
81- public void call (String s ) {
82- _log (String .format ("String polling - %s" , s ));
83- }
84- }));
74+ _setupLogger ();
75+ _log (String .format ("Simple String polling - %s" , _counter ));
76+ _subscriptions .add (Observable .create (
77+ new Observable .OnSubscribe <String >() {
78+ @ Override
79+ public void call (final Subscriber <? super String > subscriber ) {
80+ Subscription subscription = _worker
81+ .schedulePeriodically (new Action0 () {
82+ @ Override
83+ public void call () {
84+ subscriber .onNext (_doNetworkCallAndGetStringResult ());
85+ }
86+ }, INITIAL_DELAY , POLLING_INTERVAL , TimeUnit .MILLISECONDS );
87+ subscriber .add (subscription );
88+ }
89+ })
90+ .take (10 )
91+ .subscribe (new Action1 <String >() {
92+ @ Override
93+ public void call (String s ) {
94+ _log (String .format ("String polling - %s" , s ));
95+ }
96+ })
97+ );
98+ }
99+
100+ @ OnClick (R .id .btn_start_increasingly_delayed_polling )
101+ public void onStartIncreasinglyDelayedPolling () {
102+ _setupLogger ();
103+ _log (String .format ("Increasingly delayed String polling - %s" , _counter ));
104+ continueIncreasinglyDelayedPolling (1000 , 10 );
85105 }
86106
107+ private void continueIncreasinglyDelayedPolling (final int delay , final int limit ) {
108+ _subscriptions = _unsubscribeAndGetNewCompositeSub (_subscriptions );
109+ Observable .create (
110+ new Observable .OnSubscribe <String >() {
111+ @ Override
112+ public void call (final Subscriber <? super String > subscriber ) {
113+
114+ Subscription subscription = _worker .schedule (new Action0 () {
115+ @ Override
116+ public void call () {
117+ subscriber .onNext (_doNetworkCallAndGetStringResult ());
118+ }
119+ }, delay , TimeUnit .MILLISECONDS );
120+ subscriber .add (subscription );
121+ }
122+ })
123+ .take (limit )
124+ .subscribe (new Action1 <String >() {
125+ @ Override
126+ public void call (String s ) {
127+ continueIncreasinglyDelayedPolling (delay + 1000 , limit - 1 );
128+ Timber .d ("delay of %d" , delay );
129+ _log (String .format ("String polling - %s" , s ));
130+ }
131+ });
132+ }
87133 // -----------------------------------------------------------------------------------
88134 // Method that help wiring up the example (irrelevant to RxJava)
89135
136+ private static CompositeSubscription _unsubscribeAndGetNewCompositeSub (CompositeSubscription subscription ){
137+ RxUtils .unsubscribeIfNotNull (subscription );
138+ return RxUtils .getNewCompositeSubIfUnsubscribed (subscription );
139+ }
140+
90141 private String _doNetworkCallAndGetStringResult () {
91142
92143 try {
@@ -120,17 +171,19 @@ public void run() {
120171 }
121172
122173 private void _setupLogger () {
123- _logs = new ArrayList <String >();
174+ _logs = new ArrayList <>();
124175 _adapter = new LogAdapter (getActivity (), new ArrayList <String >());
125176 _logsList .setAdapter (_adapter );
177+ _subscriptions = _unsubscribeAndGetNewCompositeSub (_subscriptions );
178+ _counter = 0 ;
126179 }
127180
128181 private boolean _isCurrentlyOnMainThread () {
129182 return Looper .myLooper () == Looper .getMainLooper ();
130183 }
131184
132185 private class LogAdapter
133- extends ArrayAdapter <String > {
186+ extends ArrayAdapter <String > {
134187
135188 public LogAdapter (Context context , List <String > logs ) {
136189 super (context , R .layout .item_log , R .id .item_log , logs );
0 commit comments