Skip to content

Commit 11c363a

Browse files
nathanmittlerejona86
authored andcommitted
Disallowing message delivery after status has been delivered to the listener.
Summary of changes: 1) Merged the interfaces MessageDeframer2.Sink and DeframerListener into MessageDeframer2.Listener. This simplifies the interface of MessageDeframer2 quite a bit. 2) Added a deliveryPaused() handler to MessageDeframer2.Listener, which is called by the deframer when there is not enough data to read/deliver the next message. 3) Modified AbstractStream and AbstractClientStream to manage the timing of when the closed() event is delivered to the listener. The transportReportStatus ultimately controls this by creating a task to close the listener. It either runs this task immediately or when the next deliveryPaused() event occurs. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83620903
1 parent f8524a1 commit 11c363a

16 files changed

Lines changed: 301 additions & 208 deletions

File tree

core/src/main/java/com/google/net/stubby/ChannelImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
import java.io.InputStream;
5151
import java.util.ArrayList;
5252
import java.util.Collection;
53-
import java.util.Collections;
54-
import java.util.HashSet;
5553
import java.util.concurrent.Callable;
5654
import java.util.concurrent.ExecutorService;
5755
import java.util.logging.Level;

core/src/main/java/com/google/net/stubby/Metadata.java

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import com.google.common.collect.ListMultimap;
4141
import com.google.common.collect.Lists;
4242

43-
import java.util.Iterator;
4443
import java.util.List;
4544
import java.util.Map;
4645
import java.util.Set;
@@ -62,37 +61,6 @@ public abstract class Metadata {
6261
*/
6362
public static final String BINARY_HEADER_SUFFIX = "-bin";
6463

65-
/**
66-
* Interleave keys and values into a single iterator.
67-
*/
68-
private static Iterator<String> fromMapEntries(Iterable<Map.Entry<String, String>> entries) {
69-
final Iterator<Map.Entry<String, String>> iterator = entries.iterator();
70-
return new Iterator<String>() {
71-
Map.Entry<String, String> last;
72-
@Override
73-
public boolean hasNext() {
74-
return last != null || iterator.hasNext();
75-
}
76-
77-
@Override
78-
public String next() {
79-
if (last == null) {
80-
last = iterator.next();
81-
return last.getKey();
82-
} else {
83-
String val = last.getValue();
84-
last = null;
85-
return val;
86-
}
87-
}
88-
89-
@Override
90-
public void remove() {
91-
throw new UnsupportedOperationException();
92-
}
93-
};
94-
}
95-
9664
/**
9765
* Simple metadata marshaller that encodes strings as is.
9866
*
@@ -409,7 +377,7 @@ public abstract static class Key<T> {
409377
/**
410378
* Creates a key for a binary header.
411379
*
412-
* @param name must end with {@link BINARY_HEADER_SUFFIX}
380+
* @param name must end with {@link #BINARY_HEADER_SUFFIX}
413381
*/
414382
public static <T> Key<T> of(String name, BinaryMarshaller<T> marshaller) {
415383
return new BinaryKey<T>(name, marshaller);
@@ -418,7 +386,7 @@ public static <T> Key<T> of(String name, BinaryMarshaller<T> marshaller) {
418386
/**
419387
* Creates a key for a ASCII header.
420388
*
421-
* @param name must not end with {@link BINARY_HEADER_SUFFIX}
389+
* @param name must not end with {@link #BINARY_HEADER_SUFFIX}
422390
*/
423391
public static <T> Key<T> of(String name, AsciiMarshaller<T> marshaller) {
424392
return new AsciiKey<T>(name, marshaller);

core/src/main/java/com/google/net/stubby/SharedResourceHolder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,9 @@ public static <T> T release(final Resource<T> resource, final T instance) {
118118
/**
119119
* Visible to unit tests.
120120
*
121-
* @see {@link #get(Resource)}.
121+
* @see #get(Resource)
122122
*/
123+
@SuppressWarnings("unchecked")
123124
synchronized <T> T getInternal(Resource<T> resource) {
124125
Instance instance = instances.get(resource);
125126
if (instance == null) {
@@ -136,8 +137,6 @@ synchronized <T> T getInternal(Resource<T> resource) {
136137

137138
/**
138139
* Visible to unit tests.
139-
*
140-
* @see {@link #releaseInternal(Resource, T)}.
141140
*/
142141
synchronized <T> T releaseInternal(final Resource<T> resource, final T instance) {
143142
final Instance cached = instances.get(resource);

core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import com.google.common.base.MoreObjects;
3535
import com.google.common.base.Preconditions;
36+
import com.google.common.util.concurrent.Futures;
3637
import com.google.common.util.concurrent.ListenableFuture;
3738
import com.google.net.stubby.Metadata;
3839
import com.google.net.stubby.Status;
@@ -52,6 +53,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
5253
implements ClientStream {
5354

5455
private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
56+
private static final ListenableFuture<Void> COMPLETED_FUTURE = Futures.immediateFuture(null);
5557

5658
private final ClientStreamListener listener;
5759
private boolean listenerClosed;
@@ -60,6 +62,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
6062
// transportReportStatus is directly called.
6163
private Status status;
6264
private Metadata.Trailers trailers;
65+
private Runnable closeListenerTask;
6366

6467

6568
protected AbstractClientStream(ClientStreamListener listener, Executor deframerExecutor) {
@@ -69,6 +72,9 @@ protected AbstractClientStream(ClientStreamListener listener, Executor deframerE
6972

7073
@Override
7174
protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
75+
if (listenerClosed) {
76+
return COMPLETED_FUTURE;
77+
}
7278
return listener.messageRead(is, length);
7379
}
7480

@@ -91,14 +97,14 @@ protected void inboundTransportError(Status errorStatus) {
9197
}
9298
// For transport errors we immediately report status to the application layer
9399
// and do not wait for additional payloads.
94-
transportReportStatus(errorStatus, new Metadata.Trailers());
100+
transportReportStatus(errorStatus, false, new Metadata.Trailers());
95101
}
96102

97103
/**
98104
* Called by transport implementations when they receive headers. When receiving headers
99105
* a transport may determine that there is an error in the protocol at this phase which is
100106
* why this method takes an error {@link Status}. If a transport reports an
101-
* {@link Status.Code#INTERNAL} error
107+
* {@link com.google.net.stubby.Status.Code#INTERNAL} error
102108
*
103109
* @param headers the parsed headers
104110
*/
@@ -131,6 +137,11 @@ protected void inboundDataReceived(Buffer frame) {
131137
deframe(frame, false);
132138
}
133139

140+
@Override
141+
protected void inboundDeliveryPaused() {
142+
runCloseListenerTask();
143+
}
144+
134145
@Override
135146
protected final void deframeFailed(Throwable cause) {
136147
log.log(Level.WARNING, "Exception processing message", cause);
@@ -155,7 +166,7 @@ protected void inboundTrailersReceived(Metadata.Trailers trailers, Status status
155166

156167
@Override
157168
protected void remoteEndClosed() {
158-
transportReportStatus(status, trailers);
169+
transportReportStatus(status, true, trailers);
159170
}
160171

161172
@Override
@@ -173,23 +184,66 @@ protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
173184
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
174185

175186
/**
176-
* Report stream closure with status to the application layer if not already reported.
177-
* This method must be called from the transport thread.
187+
* Report stream closure with status to the application layer if not already reported. This method
188+
* must be called from the transport thread.
178189
*
179190
* @param newStatus the new status to set
180-
* @return {@code} true if the status was not already set.
191+
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
192+
* may already be queued up in the deframer. If {@code false}, the listener will be
193+
* notified immediately after all currently completed messages in the deframer have been
194+
* delivered to the application.
181195
*/
182-
public boolean transportReportStatus(final Status newStatus, Metadata.Trailers trailers) {
196+
public void transportReportStatus(final Status newStatus, boolean stopDelivery,
197+
final Metadata.Trailers trailers) {
183198
Preconditions.checkNotNull(newStatus, "newStatus");
199+
200+
boolean closingLater = closeListenerTask != null && !stopDelivery;
201+
if (listenerClosed || closingLater) {
202+
// We already closed (or are about to close) the listener.
203+
return;
204+
}
205+
184206
inboundPhase(Phase.STATUS);
185207
status = newStatus;
186-
// Invoke the observer callback which will schedule work onto an application thread
187-
if (!listenerClosed) {
188-
// Status has not been reported to the application layer
208+
closeListenerTask = null;
209+
210+
// Determine if the deframer is stalled (i.e. currently has no complete messages to deliver).
211+
boolean deliveryStalled = !deframer2.isDeliveryOutstanding();
212+
213+
if (stopDelivery || deliveryStalled) {
214+
// Close the listener immediately.
189215
listenerClosed = true;
190216
listener.closed(newStatus, trailers);
217+
} else {
218+
// Delay close until inboundDeliveryStalled()
219+
closeListenerTask = newCloseListenerTask(newStatus, trailers);
220+
}
221+
}
222+
223+
/**
224+
* Creates a new {@link Runnable} to close the listener with the given status/trailers.
225+
*/
226+
private Runnable newCloseListenerTask(final Status status, final Metadata.Trailers trailers) {
227+
return new Runnable() {
228+
@Override
229+
public void run() {
230+
if (!listenerClosed) {
231+
// Status has not been reported to the application layer
232+
listenerClosed = true;
233+
listener.closed(status, trailers);
234+
}
235+
}
236+
};
237+
}
238+
239+
/**
240+
* Executes the pending listener close task, if one exists.
241+
*/
242+
private void runCloseListenerTask() {
243+
if (closeListenerTask != null) {
244+
closeListenerTask.run();
245+
closeListenerTask = null;
191246
}
192-
return true;
193247
}
194248

195249
@Override

core/src/main/java/com/google/net/stubby/transport/AbstractStream.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ public void onFailure(Throwable t) {
8282
private Phase outboundPhase = Phase.HEADERS;
8383

8484
AbstractStream(Executor deframerExecutor) {
85-
MessageDeframer2.Sink inboundMessageHandler = new MessageDeframer2.Sink() {
85+
MessageDeframer2.Listener inboundMessageHandler = new MessageDeframer2.Listener() {
86+
@Override
87+
public void bytesRead(int numBytes) {
88+
returnProcessedBytes(numBytes);
89+
}
90+
8691
@Override
8792
public ListenableFuture<Void> messageRead(InputStream input, final int length) {
8893
ListenableFuture<Void> future = null;
@@ -94,6 +99,11 @@ public ListenableFuture<Void> messageRead(InputStream input, final int length) {
9499
}
95100
}
96101

102+
@Override
103+
public void deliveryStalled() {
104+
inboundDeliveryPaused();
105+
}
106+
97107
@Override
98108
public void endOfStream() {
99109
remoteEndClosed();
@@ -106,16 +116,8 @@ public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
106116
}
107117
};
108118

109-
// When the deframer reads the required number of bytes for the next message,
110-
// immediately return those bytes to inbound flow control.
111-
DeframerListener listener = new DeframerListener() {
112-
@Override
113-
public void bytesRead(int numBytes) {
114-
returnProcessedBytes(numBytes);
115-
}
116-
};
117119
framer = new MessageFramer2(outboundFrameHandler, 4096);
118-
this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor, listener);
120+
this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor);
119121
}
120122

121123
/**
@@ -194,6 +196,9 @@ public void dispose() {
194196
/** A message was deframed. */
195197
protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
196198

199+
/** Deframer has no pending deliveries. */
200+
protected abstract void inboundDeliveryPaused();
201+
197202
/** Deframer reached end of stream. */
198203
protected abstract void remoteEndClosed();
199204

@@ -310,6 +315,7 @@ public boolean isClosed() {
310315
return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS;
311316
}
312317

318+
@Override
313319
public String toString() {
314320
return toStringHelper().toString();
315321
}

core/src/main/java/com/google/net/stubby/transport/DeframerListener.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)