2424import rx .Observable ;
2525import rx .Observer ;
2626import rx .Subscriber ;
27+ import rx .android .schedulers .AndroidSchedulers ;
2728import rx .schedulers .Schedulers ;
2829import timber .log .Timber ;
2930
@@ -34,89 +35,65 @@ public class BufferDemoFragment
3435
3536 private LogAdapter _adapter ;
3637 private List <String > _logs ;
37- private final Handler _mainThreadHandler = new Handler ( Looper . getMainLooper ()) ;
38+ private int _tapCount = 0 ;
3839
39- private int _counter = 0 ;
40+ private Observable <List <Integer >> _bufferedObservable ;
41+ private Observer <List <Integer >> _observer ;
4042
4143 @ Override
4244 public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
4345 super .onActivityCreated (savedInstanceState );
4446 _setupLogAdapter ();
4547
46- // Create the subscription
47- // AndroidObservable.bindFragment(this, _getObservable()) // Observable
48- // .observeOn(Schedulers.io())
49- // .subscribe(_getObserver()); // Observer
48+ _bufferedObservable = _getBufferedObservable ();
49+ _observer = _getObserver ();
5050 }
5151
5252
5353 @ OnClick (R .id .btn_start_operation )
5454 public void onButtonTapped () {
55- // BehaviorSubject takes in Observable inputs.
56- // So send 1 tap as an observable
57- _counter += 1 ;
58- _getObservable ().observeOn (Schedulers .io ()).subscribe (_getObserver ());
55+ _bufferedObservable .subscribeOn (Schedulers .io ())
56+ .observeOn (AndroidSchedulers .mainThread ())
57+ .subscribe (_observer );
5958 }
6059
6160 // -----------------------------------------------------------------------------------
6261 // Main Rx entities
6362
64- /**
65- * Gets the Observable as emitted from BehaviorSubject
66- *
67- * It begins by emitting the item most recently emitted by source Observable
68- * (or seed/default if none has yet been emitted - which is the case here)
69- *
70- * https://github.com/Netflix/RxJava/wiki/Subject#behaviorsubject
71- */
72- private Observable <List <Integer >> _getObservable () {
63+ private Observable <List <Integer >> _getBufferedObservable () {
7364 return Observable .create (new Observable .OnSubscribe <Integer >() {
7465
7566
7667 @ Override
77- public void call (Subscriber <? super Integer > observer ) {
78- observer .onNext (1 );
68+ public void call (Subscriber <? super Integer > subscriber ) {
69+ subscriber .onNext (1 );
70+ // send one tap
7971 }
72+
8073 }).buffer (2 , TimeUnit .SECONDS );
8174 }
8275
83- /**
84- * Observer that handles the result List<Integer> from Observable
85- * through the 3 important actions:
86- *
87- * 1. onCompleted
88- * 2. onError
89- * 3. onNext
90- */
9176 private Observer <List <Integer >> _getObserver () {
9277 return new Observer <List <Integer >>() {
9378
9479
9580 @ Override
9681 public void onCompleted () {
97- _mainThreadHandler .post (new Runnable () {
98-
99-
100- @ Override
101- public void run () {
102- _addLogToAdapter (String .format ("%d taps" , _counter ));
103- _counter = 0 ;
104- }
105- });
82+ _log (String .format ("%d taps" , _tapCount ));
83+ _tapCount = 0 ;
10684 }
10785
10886 @ Override
10987 public void onError (Throwable e ) {
11088 Timber .e (e , "--------- Woops on error!" );
89+ _log (String .format ("Dang error. check your logs" ));
11190 }
11291
11392 @ Override
11493 public void onNext (List <Integer > integers ) {
11594 for (int i : integers ) {
116- _counter += i ;
95+ _tapCount += i ;
11796 }
118-
119- Timber .d ("--------- on next with a count of %d" , _counter );
12097 onCompleted ();
12198 }
12299 };
@@ -140,12 +117,34 @@ private void _setupLogAdapter() {
140117 _logsList .setAdapter (_adapter );
141118 }
142119
143- private void _addLogToAdapter (String logMsg ) {
144- _logs .add (0 , logMsg );
145- _adapter .clear ();
146- _adapter .addAll (_logs );
120+ private void _log (String logMsg ) {
121+
122+ if (_isCurrentlyOnMainThread ()) {
123+ _logs .add (0 , logMsg + " (main thread) " );
124+ _adapter .clear ();
125+ _adapter .addAll (_logs );
126+
127+ } else {
128+ _logs .add (0 , logMsg + " (NOT main thread) " );
129+
130+ // You can only do below stuff on main thread.
131+ new Handler (Looper .getMainLooper ()).post (new Runnable () {
132+
133+
134+ @ Override
135+ public void run () {
136+ _adapter .clear ();
137+ _adapter .addAll (_logs );
138+ }
139+ });
140+ }
141+ }
142+
143+ private boolean _isCurrentlyOnMainThread () {
144+ return Looper .myLooper () == Looper .getMainLooper ();
147145 }
148146
147+
149148 private class LogAdapter
150149 extends ArrayAdapter <String > {
151150
0 commit comments