Skip to content

Commit 7fc766c

Browse files
author
Kaushik Gopal
committed
feat: wip: cleanup buffer example based on current misunderstanding
1 parent 5f39213 commit 7fc766c

2 files changed

Lines changed: 46 additions & 46 deletions

File tree

app/src/main/java/com/morihacky/android/rxjava/BufferDemoFragment.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observable;
2525
import rx.Observer;
2626
import rx.Subscriber;
27+
import rx.android.schedulers.AndroidSchedulers;
2728
import rx.schedulers.Schedulers;
2829
import timber.log.Timber;
2930

@@ -34,89 +35,65 @@ public class BufferDemoFragment
3435

3536
private LogAdapter _adapter;
3637
private List<String> _logs;
37-
private final Handler _mainThreadHandler = new Handler(Looper.getMainLooper());
38+
private int _tapCount = 0;
3839

39-
private int _counter = 0;
40+
private Observable<List<Integer>> _bufferedObservable;
41+
private Observer<List<Integer>> _observer;
4042

4143
@Override
4244
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
4345
super.onActivityCreated(savedInstanceState);
4446
_setupLogAdapter();
4547

46-
// Create the subscription
47-
// AndroidObservable.bindFragment(this, _getObservable()) // Observable
48-
// .observeOn(Schedulers.io())
49-
// .subscribe(_getObserver()); // Observer
48+
_bufferedObservable = _getBufferedObservable();
49+
_observer = _getObserver();
5050
}
5151

5252

5353
@OnClick(R.id.btn_start_operation)
5454
public void onButtonTapped() {
55-
// BehaviorSubject takes in Observable inputs.
56-
// So send 1 tap as an observable
57-
_counter += 1;
58-
_getObservable().observeOn(Schedulers.io()).subscribe(_getObserver());
55+
_bufferedObservable.subscribeOn(Schedulers.io())
56+
.observeOn(AndroidSchedulers.mainThread())
57+
.subscribe(_observer);
5958
}
6059

6160
// -----------------------------------------------------------------------------------
6261
// Main Rx entities
6362

64-
/**
65-
* Gets the Observable as emitted from BehaviorSubject
66-
*
67-
* It begins by emitting the item most recently emitted by source Observable
68-
* (or seed/default if none has yet been emitted - which is the case here)
69-
*
70-
* https://github.com/Netflix/RxJava/wiki/Subject#behaviorsubject
71-
*/
72-
private Observable<List<Integer>> _getObservable() {
63+
private Observable<List<Integer>> _getBufferedObservable() {
7364
return Observable.create(new Observable.OnSubscribe<Integer>() {
7465

7566

7667
@Override
77-
public void call(Subscriber<? super Integer> observer) {
78-
observer.onNext(1);
68+
public void call(Subscriber<? super Integer> subscriber) {
69+
subscriber.onNext(1);
70+
// send one tap
7971
}
72+
8073
}).buffer(2, TimeUnit.SECONDS);
8174
}
8275

83-
/**
84-
* Observer that handles the result List<Integer> from Observable
85-
* through the 3 important actions:
86-
*
87-
* 1. onCompleted
88-
* 2. onError
89-
* 3. onNext
90-
*/
9176
private Observer<List<Integer>> _getObserver() {
9277
return new Observer<List<Integer>>() {
9378

9479

9580
@Override
9681
public void onCompleted() {
97-
_mainThreadHandler.post(new Runnable() {
98-
99-
100-
@Override
101-
public void run() {
102-
_addLogToAdapter(String.format("%d taps", _counter));
103-
_counter = 0;
104-
}
105-
});
82+
_log(String.format("%d taps", _tapCount));
83+
_tapCount = 0;
10684
}
10785

10886
@Override
10987
public void onError(Throwable e) {
11088
Timber.e(e, "--------- Woops on error!");
89+
_log(String.format("Dang error. check your logs"));
11190
}
11291

11392
@Override
11493
public void onNext(List<Integer> integers) {
11594
for (int i : integers) {
116-
_counter += i;
95+
_tapCount += i;
11796
}
118-
119-
Timber.d("--------- on next with a count of %d", _counter);
12097
onCompleted();
12198
}
12299
};
@@ -140,12 +117,34 @@ private void _setupLogAdapter() {
140117
_logsList.setAdapter(_adapter);
141118
}
142119

143-
private void _addLogToAdapter(String logMsg) {
144-
_logs.add(0, logMsg);
145-
_adapter.clear();
146-
_adapter.addAll(_logs);
120+
private void _log(String logMsg) {
121+
122+
if (_isCurrentlyOnMainThread()) {
123+
_logs.add(0, logMsg + " (main thread) ");
124+
_adapter.clear();
125+
_adapter.addAll(_logs);
126+
127+
} else {
128+
_logs.add(0, logMsg + " (NOT main thread) ");
129+
130+
// You can only do below stuff on main thread.
131+
new Handler(Looper.getMainLooper()).post(new Runnable() {
132+
133+
134+
@Override
135+
public void run() {
136+
_adapter.clear();
137+
_adapter.addAll(_logs);
138+
}
139+
});
140+
}
141+
}
142+
143+
private boolean _isCurrentlyOnMainThread() {
144+
return Looper.myLooper() == Looper.getMainLooper();
147145
}
148146

147+
149148
private class LogAdapter
150149
extends ArrayAdapter<String> {
151150

app/src/main/java/com/morihacky/android/rxjava/MainActivity.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ protected void onCreate(Bundle savedInstanceState) {
1919

2020
getSupportFragmentManager().beginTransaction()
2121
.addToBackStack(this.toString())
22-
.replace(R.id.activity_main, new MainFragment(), this.toString())
22+
// .replace(R.id.activity_main, new MainFragment(), this.toString())
23+
.replace(R.id.activity_main, new BufferDemoFragment(), this.toString())
2324
.commit();
2425
}
2526
}

0 commit comments

Comments
 (0)