|
1 | 1 | package io.rsocket.internal; |
2 | 2 |
|
3 | 3 | import java.util.Objects; |
| 4 | +import java.util.concurrent.CancellationException; |
4 | 5 | import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
5 | | -import java.util.stream.Stream; |
| 6 | +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
6 | 7 | import org.reactivestreams.Processor; |
| 8 | +import org.reactivestreams.Subscriber; |
7 | 9 | import org.reactivestreams.Subscription; |
8 | 10 | import reactor.core.CoreSubscriber; |
9 | 11 | import reactor.core.Disposable; |
| 12 | +import reactor.core.Exceptions; |
10 | 13 | import reactor.core.Scannable; |
11 | 14 | import reactor.core.publisher.Mono; |
12 | | -import reactor.core.publisher.MonoProcessor; |
13 | 15 | import reactor.core.publisher.Operators; |
14 | 16 | import reactor.util.annotation.Nullable; |
15 | 17 | import reactor.util.context.Context; |
16 | | -import reactor.util.function.Tuple2; |
17 | 18 |
|
18 | 19 | public class UnicastMonoProcessor<O> extends Mono<O> |
19 | 20 | implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable { |
20 | 21 |
|
| 22 | + /** |
| 23 | + * Create a {@link UnicastMonoProcessor} that will eagerly request 1 on {@link |
| 24 | + * #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers. |
| 25 | + * |
| 26 | + * @param <T> type of the expected value |
| 27 | + * @return A {@link UnicastMonoProcessor}. |
| 28 | + */ |
| 29 | + public static <T> UnicastMonoProcessor<T> create() { |
| 30 | + return new UnicastMonoProcessor<>(); |
| 31 | + } |
| 32 | + |
| 33 | + volatile CoreSubscriber<? super O> actual; |
| 34 | + |
| 35 | + @SuppressWarnings("rawtypes") |
| 36 | + static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL = |
| 37 | + AtomicReferenceFieldUpdater.newUpdater( |
| 38 | + UnicastMonoProcessor.class, CoreSubscriber.class, "actual"); |
| 39 | + |
| 40 | + volatile int once; |
| 41 | + |
21 | 42 | @SuppressWarnings("rawtypes") |
22 | 43 | static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE = |
23 | 44 | AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once"); |
24 | 45 |
|
25 | | - private final MonoProcessor<O> processor; |
| 46 | + Throwable error; |
| 47 | + volatile boolean terminated; |
| 48 | + O value; |
26 | 49 |
|
27 | | - @SuppressWarnings("unused") |
28 | | - private volatile int once; |
| 50 | + volatile Subscription subscription; |
| 51 | + static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM = |
| 52 | + AtomicReferenceFieldUpdater.newUpdater( |
| 53 | + UnicastMonoProcessor.class, Subscription.class, "subscription"); |
29 | 54 |
|
30 | | - private UnicastMonoProcessor() { |
31 | | - this.processor = MonoProcessor.create(); |
32 | | - } |
| 55 | + @Override |
| 56 | + public final void cancel() { |
| 57 | + if (isTerminated()) { |
| 58 | + return; |
| 59 | + } |
33 | 60 |
|
34 | | - public static <O> UnicastMonoProcessor<O> create() { |
35 | | - return new UnicastMonoProcessor<>(); |
36 | | - } |
| 61 | + final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
| 62 | + if (s == Operators.cancelledSubscription()) { |
| 63 | + return; |
| 64 | + } |
37 | 65 |
|
38 | | - @Override |
39 | | - public Stream<? extends Scannable> actuals() { |
40 | | - return processor.actuals(); |
| 66 | + if (s != null) { |
| 67 | + s.cancel(); |
| 68 | + } |
41 | 69 | } |
42 | 70 |
|
43 | 71 | @Override |
44 | | - public boolean isScanAvailable() { |
45 | | - return processor.isScanAvailable(); |
46 | | - } |
| 72 | + @SuppressWarnings("unchecked") |
| 73 | + public void dispose() { |
| 74 | + final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
| 75 | + if (s == Operators.cancelledSubscription()) { |
| 76 | + return; |
| 77 | + } |
47 | 78 |
|
48 | | - @Override |
49 | | - public String name() { |
50 | | - return processor.name(); |
51 | | - } |
| 79 | + final CancellationException e = new CancellationException("Disposed"); |
| 80 | + error = e; |
| 81 | + value = null; |
| 82 | + terminated = true; |
| 83 | + if (s != null) { |
| 84 | + s.cancel(); |
| 85 | + } |
52 | 86 |
|
53 | | - @Override |
54 | | - public String stepName() { |
55 | | - return processor.stepName(); |
| 87 | + final CoreSubscriber<? super O> a = this.actual; |
| 88 | + ACTUAL.lazySet(this, null); |
| 89 | + if (a != null) { |
| 90 | + a.onError(e); |
| 91 | + } |
56 | 92 | } |
57 | 93 |
|
58 | | - @Override |
59 | | - public Stream<String> steps() { |
60 | | - return processor.steps(); |
| 94 | + /** |
| 95 | + * Return the produced {@link Throwable} error if any or null |
| 96 | + * |
| 97 | + * @return the produced {@link Throwable} error if any or null |
| 98 | + */ |
| 99 | + @Nullable |
| 100 | + public final Throwable getError() { |
| 101 | + return isTerminated() ? error : null; |
61 | 102 | } |
62 | 103 |
|
63 | | - @Override |
64 | | - public Stream<? extends Scannable> parents() { |
65 | | - return processor.parents(); |
| 104 | + /** |
| 105 | + * Indicates whether this {@code UnicastMonoProcessor} has been interrupted via cancellation. |
| 106 | + * |
| 107 | + * @return {@code true} if this {@code UnicastMonoProcessor} is cancelled, {@code false} |
| 108 | + * otherwise. |
| 109 | + */ |
| 110 | + public boolean isCancelled() { |
| 111 | + return isDisposed() && !isTerminated(); |
66 | 112 | } |
67 | 113 |
|
68 | | - @Override |
69 | | - @Nullable |
70 | | - public <T> T scan(Attr<T> key) { |
71 | | - return processor.scan(key); |
| 114 | + /** |
| 115 | + * Indicates whether this {@code UnicastMonoProcessor} has been completed with an error. |
| 116 | + * |
| 117 | + * @return {@code true} if this {@code UnicastMonoProcessor} was completed with an error, {@code |
| 118 | + * false} otherwise. |
| 119 | + */ |
| 120 | + public final boolean isError() { |
| 121 | + return getError() != null; |
72 | 122 | } |
73 | 123 |
|
74 | | - @Override |
75 | | - public <T> T scanOrDefault(Attr<T> key, T defaultValue) { |
76 | | - return processor.scanOrDefault(key, defaultValue); |
| 124 | + /** |
| 125 | + * Indicates whether this {@code UnicastMonoProcessor} has been terminated by the source producer |
| 126 | + * with a success or an error. |
| 127 | + * |
| 128 | + * @return {@code true} if this {@code UnicastMonoProcessor} is successful, {@code false} |
| 129 | + * otherwise. |
| 130 | + */ |
| 131 | + public final boolean isTerminated() { |
| 132 | + return terminated; |
77 | 133 | } |
78 | 134 |
|
79 | 135 | @Override |
80 | | - public Stream<Tuple2<String, String>> tags() { |
81 | | - return processor.tags(); |
| 136 | + public boolean isDisposed() { |
| 137 | + return subscription == Operators.cancelledSubscription(); |
82 | 138 | } |
83 | 139 |
|
84 | 140 | @Override |
85 | | - public void onSubscribe(Subscription s) { |
86 | | - processor.onSubscribe(s); |
| 141 | + public final void onComplete() { |
| 142 | + onNext(null); |
87 | 143 | } |
88 | 144 |
|
89 | 145 | @Override |
90 | | - public void onNext(O o) { |
91 | | - processor.onNext(o); |
92 | | - } |
| 146 | + @SuppressWarnings("unchecked") |
| 147 | + public final void onError(Throwable cause) { |
| 148 | + Objects.requireNonNull(cause, "onError cannot be null"); |
| 149 | + |
| 150 | + if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) |
| 151 | + == Operators.cancelledSubscription()) { |
| 152 | + Operators.onErrorDropped(cause, currentContext()); |
| 153 | + return; |
| 154 | + } |
93 | 155 |
|
94 | | - @Override |
95 | | - public void onError(Throwable t) { |
96 | | - processor.onError(t); |
97 | | - } |
| 156 | + error = cause; |
| 157 | + value = null; |
| 158 | + terminated = true; |
98 | 159 |
|
99 | | - @Nullable |
100 | | - public Throwable getError() { |
101 | | - return processor.getError(); |
| 160 | + final CoreSubscriber<? super O> a = actual; |
| 161 | + ACTUAL.lazySet(this, null); |
| 162 | + if (a != null) { |
| 163 | + a.onError(cause); |
| 164 | + } |
102 | 165 | } |
103 | 166 |
|
104 | | - public boolean isCancelled() { |
105 | | - return processor.isCancelled(); |
106 | | - } |
| 167 | + @Override |
| 168 | + @SuppressWarnings("unchecked") |
| 169 | + public final void onNext(@Nullable O value) { |
| 170 | + final Subscription s; |
| 171 | + if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) |
| 172 | + == Operators.cancelledSubscription()) { |
| 173 | + if (value != null) { |
| 174 | + Operators.onNextDropped(value, currentContext()); |
| 175 | + } |
| 176 | + return; |
| 177 | + } |
107 | 178 |
|
108 | | - public boolean isError() { |
109 | | - return processor.isError(); |
110 | | - } |
| 179 | + this.value = value; |
| 180 | + terminated = true; |
111 | 181 |
|
112 | | - public boolean isSuccess() { |
113 | | - return processor.isSuccess(); |
| 182 | + final CoreSubscriber<? super O> a = actual; |
| 183 | + ACTUAL.lazySet(this, null); |
| 184 | + if (value == null) { |
| 185 | + if (a != null) { |
| 186 | + a.onComplete(); |
| 187 | + } |
| 188 | + } else { |
| 189 | + if (s != null) { |
| 190 | + s.cancel(); |
| 191 | + } |
| 192 | + |
| 193 | + if (a != null) { |
| 194 | + a.onNext(value); |
| 195 | + a.onComplete(); |
| 196 | + } |
| 197 | + } |
114 | 198 | } |
115 | 199 |
|
116 | | - public boolean isTerminated() { |
117 | | - return processor.isTerminated(); |
| 200 | + @Override |
| 201 | + public final void onSubscribe(Subscription subscription) { |
| 202 | + if (Operators.setOnce(UPSTREAM, this, subscription)) { |
| 203 | + subscription.request(Long.MAX_VALUE); |
| 204 | + } |
118 | 205 | } |
119 | 206 |
|
| 207 | + /** |
| 208 | + * Returns the value that completed this {@link UnicastMonoProcessor}. Returns {@code null} if the |
| 209 | + * {@link UnicastMonoProcessor} has not been completed. If the {@link UnicastMonoProcessor} is |
| 210 | + * completed with an error a RuntimeException that wraps the error is thrown. |
| 211 | + * |
| 212 | + * @return the value that completed the {@link UnicastMonoProcessor}, or {@code null} if it has |
| 213 | + * not been completed |
| 214 | + * @throws RuntimeException if the {@link UnicastMonoProcessor} was completed with an error |
| 215 | + */ |
120 | 216 | @Nullable |
121 | 217 | public O peek() { |
122 | | - return processor.peek(); |
123 | | - } |
124 | | - |
125 | | - public long downstreamCount() { |
126 | | - return processor.downstreamCount(); |
127 | | - } |
128 | | - |
129 | | - public boolean hasDownstreams() { |
130 | | - return processor.hasDownstreams(); |
131 | | - } |
| 218 | + if (!isTerminated()) { |
| 219 | + return null; |
| 220 | + } |
132 | 221 |
|
133 | | - @Override |
134 | | - public void onComplete() { |
135 | | - processor.onComplete(); |
136 | | - } |
| 222 | + if (value != null) { |
| 223 | + return value; |
| 224 | + } |
137 | 225 |
|
138 | | - @Override |
139 | | - public void request(long n) { |
140 | | - processor.request(n); |
141 | | - } |
| 226 | + if (error != null) { |
| 227 | + RuntimeException re = Exceptions.propagate(error); |
| 228 | + re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error")); |
| 229 | + throw re; |
| 230 | + } |
142 | 231 |
|
143 | | - @Override |
144 | | - public void cancel() { |
145 | | - processor.cancel(); |
| 232 | + return null; |
146 | 233 | } |
147 | 234 |
|
148 | 235 | @Override |
149 | | - public void dispose() { |
150 | | - processor.dispose(); |
| 236 | + public final void request(long n) { |
| 237 | + Operators.validate(n); |
151 | 238 | } |
152 | 239 |
|
153 | 240 | @Override |
154 | 241 | public Context currentContext() { |
155 | | - return processor.currentContext(); |
| 242 | + final CoreSubscriber<? super O> a = this.actual; |
| 243 | + return a != null ? a.currentContext() : Context.empty(); |
156 | 244 | } |
157 | 245 |
|
158 | 246 | @Override |
159 | | - public boolean isDisposed() { |
160 | | - return processor.isDisposed(); |
| 247 | + @Nullable |
| 248 | + public Object scanUnsafe(Attr key) { |
| 249 | + // touch guard |
| 250 | + boolean c = isCancelled(); |
| 251 | + |
| 252 | + if (key == Attr.TERMINATED) { |
| 253 | + return isTerminated(); |
| 254 | + } |
| 255 | + if (key == Attr.PARENT) { |
| 256 | + return subscription; |
| 257 | + } |
| 258 | + if (key == Attr.ERROR) { |
| 259 | + return error; |
| 260 | + } |
| 261 | + if (key == Attr.PREFETCH) { |
| 262 | + return Integer.MAX_VALUE; |
| 263 | + } |
| 264 | + if (key == Attr.CANCELLED) { |
| 265 | + return c; |
| 266 | + } |
| 267 | + return null; |
161 | 268 | } |
162 | 269 |
|
163 | | - @Override |
164 | | - public Object scanUnsafe(Attr key) { |
165 | | - return processor.scanUnsafe(key); |
| 270 | + /** |
| 271 | + * Return true if any {@link Subscriber} is actively subscribed |
| 272 | + * |
| 273 | + * @return true if any {@link Subscriber} is actively subscribed |
| 274 | + */ |
| 275 | + public final boolean hasDownstream() { |
| 276 | + return actual != null; |
166 | 277 | } |
167 | 278 |
|
168 | 279 | @Override |
169 | 280 | public void subscribe(CoreSubscriber<? super O> actual) { |
170 | 281 | Objects.requireNonNull(actual, "subscribe"); |
171 | 282 | if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { |
172 | | - processor.subscribe(actual); |
| 283 | + actual.onSubscribe(this); |
| 284 | + ACTUAL.lazySet(this, actual); |
| 285 | + if (isTerminated()) { |
| 286 | + Throwable ex = error; |
| 287 | + if (ex != null) { |
| 288 | + actual.onError(ex); |
| 289 | + } else { |
| 290 | + O v = value; |
| 291 | + if (v != null) { |
| 292 | + actual.onNext(v); |
| 293 | + } |
| 294 | + actual.onComplete(); |
| 295 | + } |
| 296 | + ACTUAL.lazySet(this, null); |
| 297 | + } |
173 | 298 | } else { |
174 | 299 | Operators.error( |
175 | 300 | actual, |
|
0 commit comments