|
1 | 1 | package io.rsocket.internal; |
2 | 2 |
|
3 | 3 | import java.util.Objects; |
4 | | -import java.util.concurrent.CancellationException; |
5 | 4 | import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
6 | | -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| 5 | +import java.util.stream.Stream; |
7 | 6 | import org.reactivestreams.Processor; |
8 | | -import org.reactivestreams.Subscriber; |
9 | 7 | import org.reactivestreams.Subscription; |
10 | 8 | import reactor.core.CoreSubscriber; |
11 | 9 | import reactor.core.Disposable; |
12 | | -import reactor.core.Exceptions; |
13 | 10 | import reactor.core.Scannable; |
14 | 11 | import reactor.core.publisher.Mono; |
| 12 | +import reactor.core.publisher.MonoProcessor; |
15 | 13 | import reactor.core.publisher.Operators; |
16 | 14 | import reactor.util.annotation.Nullable; |
17 | 15 | import reactor.util.context.Context; |
| 16 | +import reactor.util.function.Tuple2; |
18 | 17 |
|
19 | 18 | public class UnicastMonoProcessor<O> extends Mono<O> |
20 | 19 | implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable { |
21 | 20 |
|
22 | | - /** |
23 | | - * Create a {@link UnicastMonoProcessor} that will eagerly request 1 on {@link |
24 | | - * #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers. |
25 | | - * |
26 | | - * @param <T> type of the expected value |
27 | | - * @return A {@link UnicastMonoProcessor}. |
28 | | - */ |
29 | | - public static <T> UnicastMonoProcessor<T> create() { |
30 | | - return new UnicastMonoProcessor<>(); |
31 | | - } |
32 | | - |
33 | | - volatile CoreSubscriber<? super O> actual; |
34 | | - |
35 | | - @SuppressWarnings("rawtypes") |
36 | | - static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL = |
37 | | - AtomicReferenceFieldUpdater.newUpdater( |
38 | | - UnicastMonoProcessor.class, CoreSubscriber.class, "actual"); |
39 | | - |
40 | | - volatile int once; |
41 | | - |
42 | 21 | @SuppressWarnings("rawtypes") |
43 | 22 | static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE = |
44 | 23 | AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once"); |
45 | 24 |
|
46 | | - Throwable error; |
47 | | - volatile boolean terminated; |
48 | | - O value; |
| 25 | + private final MonoProcessor<O> processor; |
49 | 26 |
|
50 | | - volatile Subscription subscription; |
51 | | - static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM = |
52 | | - AtomicReferenceFieldUpdater.newUpdater( |
53 | | - UnicastMonoProcessor.class, Subscription.class, "subscription"); |
| 27 | + @SuppressWarnings("unused") |
| 28 | + private volatile int once; |
54 | 29 |
|
55 | | - @Override |
56 | | - public final void cancel() { |
57 | | - if (isTerminated()) { |
58 | | - return; |
59 | | - } |
| 30 | + private UnicastMonoProcessor() { |
| 31 | + this.processor = MonoProcessor.create(); |
| 32 | + } |
60 | 33 |
|
61 | | - final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
62 | | - if (s == Operators.cancelledSubscription()) { |
63 | | - return; |
64 | | - } |
| 34 | + public static <O> UnicastMonoProcessor<O> create() { |
| 35 | + return new UnicastMonoProcessor<>(); |
| 36 | + } |
65 | 37 |
|
66 | | - if (s != null) { |
67 | | - s.cancel(); |
68 | | - } |
| 38 | + @Override |
| 39 | + public Stream<? extends Scannable> actuals() { |
| 40 | + return processor.actuals(); |
69 | 41 | } |
70 | 42 |
|
71 | 43 | @Override |
72 | | - @SuppressWarnings("unchecked") |
73 | | - public void dispose() { |
74 | | - final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
75 | | - if (s == Operators.cancelledSubscription()) { |
76 | | - return; |
77 | | - } |
| 44 | + public boolean isScanAvailable() { |
| 45 | + return processor.isScanAvailable(); |
| 46 | + } |
78 | 47 |
|
79 | | - final CancellationException e = new CancellationException("Disposed"); |
80 | | - error = e; |
81 | | - value = null; |
82 | | - terminated = true; |
83 | | - if (s != null) { |
84 | | - s.cancel(); |
85 | | - } |
| 48 | + @Override |
| 49 | + public String name() { |
| 50 | + return processor.name(); |
| 51 | + } |
86 | 52 |
|
87 | | - final CoreSubscriber<? super O> a = this.actual; |
88 | | - ACTUAL.lazySet(this, null); |
89 | | - if (a != null) { |
90 | | - a.onError(e); |
91 | | - } |
| 53 | + @Override |
| 54 | + public String stepName() { |
| 55 | + return processor.stepName(); |
92 | 56 | } |
93 | 57 |
|
94 | | - /** |
95 | | - * Return the produced {@link Throwable} error if any or null |
96 | | - * |
97 | | - * @return the produced {@link Throwable} error if any or null |
98 | | - */ |
99 | | - @Nullable |
100 | | - public final Throwable getError() { |
101 | | - return isTerminated() ? error : null; |
| 58 | + @Override |
| 59 | + public Stream<String> steps() { |
| 60 | + return processor.steps(); |
102 | 61 | } |
103 | 62 |
|
104 | | - /** |
105 | | - * Indicates whether this {@code UnicastMonoProcessor} has been interrupted via cancellation. |
106 | | - * |
107 | | - * @return {@code true} if this {@code UnicastMonoProcessor} is cancelled, {@code false} |
108 | | - * otherwise. |
109 | | - */ |
110 | | - public boolean isCancelled() { |
111 | | - return isDisposed() && !isTerminated(); |
| 63 | + @Override |
| 64 | + public Stream<? extends Scannable> parents() { |
| 65 | + return processor.parents(); |
112 | 66 | } |
113 | 67 |
|
114 | | - /** |
115 | | - * Indicates whether this {@code UnicastMonoProcessor} has been completed with an error. |
116 | | - * |
117 | | - * @return {@code true} if this {@code UnicastMonoProcessor} was completed with an error, {@code |
118 | | - * false} otherwise. |
119 | | - */ |
120 | | - public final boolean isError() { |
121 | | - return getError() != null; |
| 68 | + @Override |
| 69 | + @Nullable |
| 70 | + public <T> T scan(Attr<T> key) { |
| 71 | + return processor.scan(key); |
122 | 72 | } |
123 | 73 |
|
124 | | - /** |
125 | | - * Indicates whether this {@code UnicastMonoProcessor} has been terminated by the source producer |
126 | | - * with a success or an error. |
127 | | - * |
128 | | - * @return {@code true} if this {@code UnicastMonoProcessor} is successful, {@code false} |
129 | | - * otherwise. |
130 | | - */ |
131 | | - public final boolean isTerminated() { |
132 | | - return terminated; |
| 74 | + @Override |
| 75 | + public <T> T scanOrDefault(Attr<T> key, T defaultValue) { |
| 76 | + return processor.scanOrDefault(key, defaultValue); |
133 | 77 | } |
134 | 78 |
|
135 | 79 | @Override |
136 | | - public boolean isDisposed() { |
137 | | - return subscription == Operators.cancelledSubscription(); |
| 80 | + public Stream<Tuple2<String, String>> tags() { |
| 81 | + return processor.tags(); |
138 | 82 | } |
139 | 83 |
|
140 | 84 | @Override |
141 | | - public final void onComplete() { |
142 | | - onNext(null); |
| 85 | + public void onSubscribe(Subscription s) { |
| 86 | + processor.onSubscribe(s); |
143 | 87 | } |
144 | 88 |
|
145 | 89 | @Override |
146 | | - @SuppressWarnings("unchecked") |
147 | | - public final void onError(Throwable cause) { |
148 | | - Objects.requireNonNull(cause, "onError cannot be null"); |
149 | | - |
150 | | - if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) |
151 | | - == Operators.cancelledSubscription()) { |
152 | | - Operators.onErrorDropped(cause, currentContext()); |
153 | | - return; |
154 | | - } |
| 90 | + public void onNext(O o) { |
| 91 | + processor.onNext(o); |
| 92 | + } |
155 | 93 |
|
156 | | - error = cause; |
157 | | - value = null; |
158 | | - terminated = true; |
| 94 | + @Override |
| 95 | + public void onError(Throwable t) { |
| 96 | + processor.onError(t); |
| 97 | + } |
159 | 98 |
|
160 | | - final CoreSubscriber<? super O> a = actual; |
161 | | - ACTUAL.lazySet(this, null); |
162 | | - if (a != null) { |
163 | | - a.onError(cause); |
164 | | - } |
| 99 | + @Nullable |
| 100 | + public Throwable getError() { |
| 101 | + return processor.getError(); |
165 | 102 | } |
166 | 103 |
|
167 | | - @Override |
168 | | - @SuppressWarnings("unchecked") |
169 | | - public final void onNext(@Nullable O value) { |
170 | | - final Subscription s; |
171 | | - if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) |
172 | | - == Operators.cancelledSubscription()) { |
173 | | - if (value != null) { |
174 | | - Operators.onNextDropped(value, currentContext()); |
175 | | - } |
176 | | - return; |
177 | | - } |
| 104 | + public boolean isCancelled() { |
| 105 | + return processor.isCancelled(); |
| 106 | + } |
178 | 107 |
|
179 | | - this.value = value; |
180 | | - terminated = true; |
| 108 | + public boolean isError() { |
| 109 | + return processor.isError(); |
| 110 | + } |
181 | 111 |
|
182 | | - final CoreSubscriber<? super O> a = actual; |
183 | | - ACTUAL.lazySet(this, null); |
184 | | - if (value == null) { |
185 | | - if (a != null) { |
186 | | - a.onComplete(); |
187 | | - } |
188 | | - } else { |
189 | | - if (s != null) { |
190 | | - s.cancel(); |
191 | | - } |
192 | | - |
193 | | - if (a != null) { |
194 | | - a.onNext(value); |
195 | | - a.onComplete(); |
196 | | - } |
197 | | - } |
| 112 | + public boolean isSuccess() { |
| 113 | + return processor.isSuccess(); |
198 | 114 | } |
199 | 115 |
|
200 | | - @Override |
201 | | - public final void onSubscribe(Subscription subscription) { |
202 | | - if (Operators.setOnce(UPSTREAM, this, subscription)) { |
203 | | - subscription.request(Long.MAX_VALUE); |
204 | | - } |
| 116 | + public boolean isTerminated() { |
| 117 | + return processor.isTerminated(); |
205 | 118 | } |
206 | 119 |
|
207 | | - /** |
208 | | - * Returns the value that completed this {@link UnicastMonoProcessor}. Returns {@code null} if the |
209 | | - * {@link UnicastMonoProcessor} has not been completed. If the {@link UnicastMonoProcessor} is |
210 | | - * completed with an error a RuntimeException that wraps the error is thrown. |
211 | | - * |
212 | | - * @return the value that completed the {@link UnicastMonoProcessor}, or {@code null} if it has |
213 | | - * not been completed |
214 | | - * @throws RuntimeException if the {@link UnicastMonoProcessor} was completed with an error |
215 | | - */ |
216 | 120 | @Nullable |
217 | 121 | public O peek() { |
218 | | - if (!isTerminated()) { |
219 | | - return null; |
220 | | - } |
| 122 | + return processor.peek(); |
| 123 | + } |
221 | 124 |
|
222 | | - if (value != null) { |
223 | | - return value; |
224 | | - } |
| 125 | + public long downstreamCount() { |
| 126 | + return processor.downstreamCount(); |
| 127 | + } |
225 | 128 |
|
226 | | - if (error != null) { |
227 | | - RuntimeException re = Exceptions.propagate(error); |
228 | | - re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error")); |
229 | | - throw re; |
230 | | - } |
| 129 | + public boolean hasDownstreams() { |
| 130 | + return processor.hasDownstreams(); |
| 131 | + } |
231 | 132 |
|
232 | | - return null; |
| 133 | + @Override |
| 134 | + public void onComplete() { |
| 135 | + processor.onComplete(); |
233 | 136 | } |
234 | 137 |
|
235 | 138 | @Override |
236 | | - public final void request(long n) { |
237 | | - Operators.validate(n); |
| 139 | + public void request(long n) { |
| 140 | + processor.request(n); |
238 | 141 | } |
239 | 142 |
|
240 | 143 | @Override |
241 | | - public Context currentContext() { |
242 | | - final CoreSubscriber<? super O> a = this.actual; |
243 | | - return a != null ? a.currentContext() : Context.empty(); |
| 144 | + public void cancel() { |
| 145 | + processor.cancel(); |
244 | 146 | } |
245 | 147 |
|
246 | 148 | @Override |
247 | | - @Nullable |
248 | | - public Object scanUnsafe(Attr key) { |
249 | | - // touch guard |
250 | | - boolean c = isCancelled(); |
| 149 | + public void dispose() { |
| 150 | + processor.dispose(); |
| 151 | + } |
251 | 152 |
|
252 | | - if (key == Attr.TERMINATED) { |
253 | | - return isTerminated(); |
254 | | - } |
255 | | - if (key == Attr.PARENT) { |
256 | | - return subscription; |
257 | | - } |
258 | | - if (key == Attr.ERROR) { |
259 | | - return error; |
260 | | - } |
261 | | - if (key == Attr.PREFETCH) { |
262 | | - return Integer.MAX_VALUE; |
263 | | - } |
264 | | - if (key == Attr.CANCELLED) { |
265 | | - return c; |
266 | | - } |
267 | | - return null; |
| 153 | + @Override |
| 154 | + public Context currentContext() { |
| 155 | + return processor.currentContext(); |
| 156 | + } |
| 157 | + |
| 158 | + @Override |
| 159 | + public boolean isDisposed() { |
| 160 | + return processor.isDisposed(); |
268 | 161 | } |
269 | 162 |
|
270 | | - /** |
271 | | - * Return true if any {@link Subscriber} is actively subscribed |
272 | | - * |
273 | | - * @return true if any {@link Subscriber} is actively subscribed |
274 | | - */ |
275 | | - public final boolean hasDownstream() { |
276 | | - return actual != null; |
| 163 | + @Override |
| 164 | + public Object scanUnsafe(Attr key) { |
| 165 | + return processor.scanUnsafe(key); |
277 | 166 | } |
278 | 167 |
|
279 | 168 | @Override |
280 | 169 | public void subscribe(CoreSubscriber<? super O> actual) { |
281 | 170 | Objects.requireNonNull(actual, "subscribe"); |
282 | 171 | if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { |
283 | | - actual.onSubscribe(this); |
284 | | - ACTUAL.lazySet(this, actual); |
285 | | - if (isTerminated()) { |
286 | | - Throwable ex = error; |
287 | | - if (ex != null) { |
288 | | - actual.onError(ex); |
289 | | - } else { |
290 | | - O v = value; |
291 | | - if (v != null) { |
292 | | - actual.onNext(v); |
293 | | - } |
294 | | - actual.onComplete(); |
295 | | - } |
296 | | - ACTUAL.lazySet(this, null); |
297 | | - } |
| 172 | + processor.subscribe(actual); |
298 | 173 | } else { |
299 | 174 | Operators.error( |
300 | 175 | actual, |
|
0 commit comments