Skip to content

Commit 07d3f6a

Browse files
committed
Move gRPC core to third_party
Half our tests still need to be moved, but that will be for a later time. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=67023748
1 parent de5413f commit 07d3f6a

48 files changed

Lines changed: 4484 additions & 0 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.google.net.stubby;
2+
3+
import com.google.common.base.Preconditions;
4+
import com.google.common.collect.MapMaker;
5+
import com.google.common.logging.FormattingLogger;
6+
import com.google.net.stubby.transport.Transport;
7+
8+
import java.io.InputStream;
9+
import java.util.concurrent.ConcurrentMap;
10+
11+
/**
12+
* Common implementation for {@link Request} and {@link Response} operations
13+
*/
14+
public abstract class AbstractOperation implements Operation {
15+
16+
private static final FormattingLogger logger =
17+
FormattingLogger.getLogger(AbstractOperation.class);
18+
19+
/**
20+
* Allow implementations to associate state with an operation
21+
*/
22+
private ConcurrentMap stash;
23+
private final int id;
24+
private Phase phase;
25+
private Status status;
26+
27+
public AbstractOperation(int id) {
28+
this.id = id;
29+
this.phase = Phase.HEADERS;
30+
stash = new MapMaker().concurrencyLevel(2).makeMap();
31+
}
32+
33+
@Override
34+
public int getId() {
35+
return id;
36+
}
37+
38+
@Override
39+
public Phase getPhase() {
40+
return phase;
41+
}
42+
43+
/**
44+
* Move into the desired phase.
45+
*/
46+
protected Operation progressTo(Phase desiredPhase) {
47+
if (desiredPhase.ordinal() < phase.ordinal()) {
48+
close(new Status(Transport.Code.INTERNAL,
49+
"Canot move to " + desiredPhase.name() + " from " + phase.name()));
50+
} else {
51+
phase = desiredPhase;
52+
}
53+
return this;
54+
}
55+
56+
@Override
57+
public Operation addContext(String type, InputStream message, Phase nextPhase) {
58+
if (getPhase() == Phase.CLOSED) {
59+
throw new RuntimeException("addContext called after operation closed");
60+
}
61+
if (phase == Phase.PAYLOAD) {
62+
progressTo(Phase.FOOTERS);
63+
}
64+
if (phase == Phase.HEADERS || phase == Phase.FOOTERS) {
65+
return progressTo(nextPhase);
66+
}
67+
throw new IllegalStateException("Cannot add context in phase " + phase.name());
68+
}
69+
70+
@Override
71+
public Operation addPayload(InputStream payload, Phase nextPhase) {
72+
if (getPhase() == Phase.CLOSED) {
73+
throw new RuntimeException("addPayload called after operation closed");
74+
}
75+
if (phase == Phase.HEADERS) {
76+
progressTo(Phase.PAYLOAD);
77+
}
78+
if (phase == Phase.PAYLOAD) {
79+
return progressTo(nextPhase);
80+
}
81+
throw new IllegalStateException("Cannot add payload in phase " + phase.name());
82+
}
83+
84+
@Override
85+
public Operation close(Status status) {
86+
// TODO(user): Handle synchronization properly.
87+
Preconditions.checkNotNull(status, "status");
88+
this.phase = Phase.CLOSED;
89+
if (this.status != null && this.status.getCode() != status.getCode()) {
90+
logger.severefmt(status.getCause(),
91+
"Attempting to override status of already closed operation from %s to %s",
92+
this.status.getCode(), status.getCode());
93+
}
94+
this.status = status;
95+
return this;
96+
}
97+
98+
@Override
99+
public Status getStatus() {
100+
return status;
101+
}
102+
103+
@Override
104+
public <E> E put(Object key, E value) {
105+
return (E) stash.put(key, value);
106+
}
107+
108+
@Override
109+
public <E> E get(Object key) {
110+
return (E) stash.get(key);
111+
}
112+
113+
@Override
114+
public <E> E remove(Object key) {
115+
return (E) stash.remove(key);
116+
}
117+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.google.net.stubby;
2+
3+
/**
4+
* Common implementation for {@link Request} objects.
5+
*/
6+
public abstract class AbstractRequest extends AbstractOperation implements Request {
7+
8+
private final Response response;
9+
10+
/**
11+
* Constructor that takes a pre-built {@link Response} and uses it's id
12+
*/
13+
public AbstractRequest(Response response) {
14+
super(response.getId());
15+
this.response = response;
16+
}
17+
18+
/**
19+
* Constructor that takes a {@link Response.ResponseBuilder} to
20+
* be built with the same id as this request
21+
*/
22+
public AbstractRequest(int id, Response.ResponseBuilder responseBuilder) {
23+
super(id);
24+
this.response = responseBuilder.build(id);
25+
}
26+
27+
@Override
28+
public Response getResponse() {
29+
return response;
30+
}
31+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.google.net.stubby;
2+
3+
/**
4+
* Common implementation for {@link Response} objects.
5+
*/
6+
public class AbstractResponse extends AbstractOperation implements Response {
7+
8+
public AbstractResponse(int id) {
9+
super(id);
10+
}
11+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.google.net.stubby;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
7+
/**
8+
* Extension to {@link InputStream} to allow for deferred production of data. Allows for
9+
* zero-copy conversions when the goal is to copy the contents of a resource to a
10+
* stream or buffer.
11+
*/
12+
public abstract class DeferredInputStream extends InputStream {
13+
14+
/**
15+
* Produce the entire contents of this stream to the specified target
16+
*
17+
* @return number of bytes written
18+
*/
19+
public abstract int flushTo(OutputStream target) throws IOException;
20+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package com.google.net.stubby;
2+
3+
import com.google.common.io.ByteStreams;
4+
import com.google.protobuf.CodedOutputStream;
5+
import com.google.protobuf.MessageLite;
6+
7+
import java.io.ByteArrayInputStream;
8+
import java.io.IOException;
9+
import java.io.OutputStream;
10+
11+
import javax.annotation.Nullable;
12+
13+
/**
14+
* Implementation of {@link DeferredInputStream} backed by a protobuf.
15+
*/
16+
public class DeferredProtoInputStream extends DeferredInputStream {
17+
18+
// DeferredProtoInputStream is first initialized with a *message*. *partial* is initially null.
19+
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
20+
// set to null.
21+
@Nullable private MessageLite message;
22+
@Nullable private ByteArrayInputStream partial;
23+
24+
public DeferredProtoInputStream(MessageLite message) {
25+
this.message = message;
26+
}
27+
28+
/**
29+
* Returns the original protobuf message. Returns null after this stream has been read.
30+
*/
31+
@Nullable
32+
public MessageLite getMessage() {
33+
return message;
34+
}
35+
36+
@Override
37+
public int flushTo(OutputStream target) throws IOException {
38+
int written;
39+
if (message != null) {
40+
written = message.getSerializedSize();
41+
message.writeTo(target);
42+
message = null;
43+
} else {
44+
written = (int) ByteStreams.copy(partial, target);
45+
partial = null;
46+
}
47+
return written;
48+
}
49+
50+
@Override
51+
public int read() throws IOException {
52+
if (message != null) {
53+
partial = new ByteArrayInputStream(message.toByteArray());
54+
message = null;
55+
}
56+
if (partial != null) {
57+
return partial.read();
58+
}
59+
return -1;
60+
}
61+
62+
@Override
63+
public int read(byte[] b, int off, int len) throws IOException {
64+
if (message != null) {
65+
int size = message.getSerializedSize();
66+
if (len >= size) {
67+
// This is the only case that is zero-copy.
68+
CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size);
69+
message.writeTo(stream);
70+
stream.flush();
71+
stream.checkNoSpaceLeft();
72+
73+
message = null;
74+
partial = null;
75+
return size;
76+
}
77+
78+
partial = new ByteArrayInputStream(message.toByteArray());
79+
message = null;
80+
}
81+
if (partial != null) {
82+
return partial.read(b, off, len);
83+
}
84+
return -1;
85+
}
86+
87+
@Override
88+
public int available() throws IOException {
89+
if (message != null) {
90+
return message.getSerializedSize();
91+
} else if (partial != null) {
92+
return partial.available();
93+
}
94+
return 0;
95+
}
96+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.google.net.stubby;
2+
3+
/**
4+
* Common constants and utilities for GRPC protocol framing.
5+
* The format within the data stream provided by the transport layer is simply
6+
*
7+
* stream = frame+
8+
* frame = frame-type framed-message
9+
* frame-type = payload-type | context-type | status-type
10+
* framed-message = payload | context | status
11+
* payload = length <bytes>
12+
* length = <uint32>
13+
* context = context-key context-value
14+
* context-key = length str
15+
* context-value = length <bytes>
16+
* status = TBD
17+
*
18+
* frame-type is implemented as a bitmask within a single byte
19+
*
20+
*/
21+
public class GrpcFramingUtil {
22+
/**
23+
* Length of flags block in bytes
24+
*/
25+
public static final int FRAME_TYPE_LENGTH = 1;
26+
27+
// Flags
28+
public static final byte PAYLOAD_FRAME = 0x0;
29+
public static final byte CONTEXT_VALUE_FRAME = 0x1;
30+
public static final byte STATUS_FRAME = 0x2;
31+
public static final byte RESERVED_FRAME = 0x3;
32+
public static final byte FRAME_TYPE_MASK = 0x3;
33+
34+
/**
35+
* No. of bytes for length field within a frame
36+
*/
37+
public static final int FRAME_LENGTH = 4;
38+
39+
public static boolean isContextValueFrame(int flags) {
40+
return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
41+
}
42+
43+
public static boolean isPayloadFrame(byte flags) {
44+
return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
45+
}
46+
47+
public static boolean isStatusFrame(byte flags) {
48+
return (flags & FRAME_TYPE_MASK) == STATUS_FRAME;
49+
}
50+
}

0 commit comments

Comments
 (0)