-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c9d5fb4
commit e771b69
Showing
26 changed files
with
1,448 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
build/ | ||
.gradle | ||
.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import com.google.protobuf.gradle.* | ||
import org.gradle.kotlin.dsl.provider.gradleKotlinDslOf | ||
|
||
plugins { | ||
idea | ||
id("com.google.protobuf") version "0.8.18" | ||
kotlin("jvm") version Constants.kotlinVersion | ||
} | ||
|
||
group = "org.shehata" | ||
version = "1.0-SNAPSHOT" | ||
|
||
repositories { | ||
google() | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
implementation(kotlin("stdlib")) | ||
|
||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Constants.coroutinesVersion}") | ||
|
||
implementation("io.grpc:grpc-stub:${Constants.grpcVersion}") | ||
implementation("io.grpc:grpc-netty:${Constants.grpcVersion}") | ||
implementation("io.grpc:grpc-protobuf:${Constants.grpcVersion}") | ||
implementation("com.google.protobuf:protobuf-kotlin:${Constants.protobufVersion}") | ||
|
||
implementation("javax.annotation:javax.annotation-api:1.3.2") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
plugins { | ||
`kotlin-dsl` | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
object Constants { | ||
const val grpcVersion = "1.43.2" | ||
const val kotlinVersion = "1.6.10" | ||
const val protobufVersion = "3.19.3" | ||
const val coroutinesVersion = "1.6.0" | ||
const val grpcKotlinVersion = "1.2.0" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
plugins { | ||
idea | ||
application | ||
kotlin("jvm") | ||
} | ||
|
||
group = "org.shehata" | ||
version = "1.0-SNAPSHOT" | ||
|
||
repositories { | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
implementation(project(":stub")) | ||
|
||
runtimeOnly("io.grpc:grpc-netty:${Constants.grpcVersion}") | ||
} | ||
|
||
tasks.named("startScripts") { | ||
enabled = false | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
import com.google.common.collect.Lists; | ||
import com.google.protobuf.Empty; | ||
import com.google.protobuf.StringValue; | ||
|
||
import java.util.Arrays; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Logger; | ||
|
||
import environment.EnvData; | ||
import environment.EnvironmentGrpc; | ||
import environment.Sensor; | ||
import environment.SensorType; | ||
import environment.SensorTypes; | ||
import environment.SensorValues; | ||
import io.grpc.Channel; | ||
import io.grpc.ManagedChannelBuilder; | ||
import io.grpc.stub.StreamObserver; | ||
|
||
public class Main { | ||
static final int PORT = 5002; | ||
static final String NAME = "localhost"; | ||
static final Logger mLogger = Logger.getLogger(Main.class.getName()); | ||
|
||
/** | ||
* builds the communication to the server | ||
* and calls the methods that request the server | ||
* | ||
* @param _args | ||
*/ | ||
public static void main(String[] _args) { | ||
var channel = ManagedChannelBuilder | ||
.forAddress(NAME, PORT) | ||
.usePlaintext() | ||
.intercept(new MetaDataInterceptorJava()) | ||
.build(); | ||
|
||
blockingEnvironmentTest(channel); | ||
|
||
try { | ||
asyncEnvironmentTest(channel); | ||
} catch (InterruptedException _e) { | ||
mLogger.info("InterruptedException: " + _e.getMessage()); | ||
} | ||
|
||
channel.shutdown(); | ||
} | ||
|
||
/** | ||
* this uses the blocking stub which means the Executor will not do anything if | ||
* one request takes a large amount of time to finish | ||
* | ||
* @param _channel | ||
*/ | ||
static void blockingEnvironmentTest(Channel _channel) { | ||
final var stub = EnvironmentGrpc.newBlockingStub(_channel); | ||
|
||
// =========== UNARY =========== | ||
final var types = stub.requestEnvironmentDataTypes(Empty.newBuilder().build()); | ||
mLogger.info("requestEnvironmentDataTypes Response ->: \n" + types); | ||
|
||
final var sensor = Sensor | ||
.newBuilder() | ||
.setValue(SensorType.HUMIDITY) | ||
.build(); | ||
final var data = stub.requestEnvironmentData(sensor); | ||
mLogger.info("requestEnvironmentData Humidity: \n " + data); | ||
|
||
final var s = stub.requestAll(Empty.newBuilder().build()); | ||
mLogger.info("requestAll: \n " + Arrays.toString(Lists.newArrayList(s).toArray())); | ||
} | ||
|
||
/** | ||
* calls all methods async | ||
* to make the async Requests sequential and still non-blocking we are using `CountDownLatch` | ||
* | ||
* @param _channel | ||
* @throws InterruptedException | ||
*/ | ||
static void asyncEnvironmentTest(Channel _channel) throws InterruptedException { | ||
var stub = EnvironmentGrpc.newStub(_channel); | ||
|
||
mLogger.info("========== Requests started ==========\n"); | ||
|
||
// =========== UNARY =========== | ||
requestEnvironmentDataTypes(stub); | ||
requestEnvironmentData(stub, ""); | ||
// =========== CLIENT STREAMING =========== | ||
setValues(stub); | ||
// =========== UNARY =========== | ||
requestEnvironmentData(stub, "after setValues"); | ||
// =========== SERVER STREAMING =========== | ||
requestAll(stub); | ||
|
||
mLogger.info("========== Requests finished =========="); | ||
} | ||
|
||
static void requestEnvironmentDataTypes(EnvironmentGrpc.EnvironmentStub _stub) { | ||
final var finishLatch = new CountDownLatch(1); | ||
|
||
mLogger.info("-- REQUEST_ENVIRONMENT_DATA_TYPES started!\n"); | ||
_stub.requestEnvironmentDataTypes(Empty.newBuilder().build(), new StreamObserver<>() { | ||
@Override | ||
public void onNext(SensorTypes _value) { | ||
mLogger.info("requestEnvironmentDataTypes Response ->: \n" + _value); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable _t) { | ||
mLogger.info("Error: " + _t.getMessage()); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
mLogger.info("-- REQUEST_ENVIRONMENT_DATA_TYPES Finished!\n"); | ||
finishLatch.countDown(); | ||
} | ||
}); | ||
|
||
if (finishLatch.getCount() == 0) { | ||
mLogger.warning("requestEnvironmentData completed or errored before we finished sending."); | ||
} | ||
|
||
try { | ||
if (!finishLatch.await(1, TimeUnit.SECONDS)) { | ||
mLogger.warning("requestEnvironmentDataTypes FAILED : cannot finish within 10 seconds"); | ||
} | ||
} catch (InterruptedException _e) { | ||
_e.printStackTrace(); | ||
} | ||
} | ||
|
||
static void requestEnvironmentData(EnvironmentGrpc.EnvironmentStub _stub, String _msg) { | ||
final var finishLatch = new CountDownLatch(1); | ||
|
||
mLogger.info("-- REQUEST_ENVIRONMENT_DATA started!\n"); | ||
final var sensor = Sensor | ||
.newBuilder() | ||
.setValue(SensorType.HUMIDITY) | ||
.build(); | ||
|
||
_stub.requestEnvironmentData(sensor, new StreamObserver<>() { | ||
@Override | ||
public void onNext(EnvData _value) { | ||
mLogger.info("requestEnvironmentData " + _msg + "\n" | ||
+ _value.getSensor().name() | ||
+ " - Timestamp: " + _value.getTimestamp().getSeconds() | ||
+ " - Values: " + _value.getValuesList() | ||
+ "\n" | ||
); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable _t) { | ||
mLogger.info("Error: " + _t.getMessage()); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
mLogger.info("-- REQUEST_ENVIRONMENT_DATA Finished!\n"); | ||
finishLatch.countDown(); | ||
} | ||
}); | ||
|
||
if (finishLatch.getCount() == 0) { | ||
mLogger.warning("requestEnvironmentData completed or errored before we finished sending."); | ||
} | ||
|
||
try { | ||
if (!finishLatch.await(2, TimeUnit.SECONDS)) { | ||
mLogger.warning("requestEnvironmentData FAILED : cannot finish within 10 seconds"); | ||
} | ||
} catch (InterruptedException _e) { | ||
_e.printStackTrace(); | ||
} | ||
} | ||
|
||
static void setValues(EnvironmentGrpc.EnvironmentStub _stub) throws InterruptedException { | ||
final var finishLatch = new CountDownLatch(1); | ||
|
||
mLogger.info("-- SET_VALUES started!\n"); | ||
final var requestObserver = _stub.setValues(new StreamObserver<>() { | ||
@Override | ||
public void onNext(StringValue _value) { | ||
mLogger.info("setValues Response : " + _value.getValue() + "\n"); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable _t) { | ||
mLogger.info("Error: " + _t.getMessage()); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
mLogger.info("-- SET_VALUES finished!\n"); | ||
finishLatch.countDown(); | ||
} | ||
}); | ||
|
||
requestObserver.onNext( | ||
SensorValues | ||
.newBuilder() | ||
.setSensor(SensorType.HUMIDITY) | ||
.addValues(5) | ||
.build() | ||
); | ||
|
||
TimeUnit.MILLISECONDS.sleep(1000); | ||
requestObserver.onNext( | ||
SensorValues | ||
.newBuilder() | ||
.setSensor(SensorType.HUMIDITY) | ||
.addValues(5) | ||
.addValues(15) | ||
.build() | ||
); | ||
|
||
TimeUnit.MILLISECONDS.sleep(1000); | ||
requestObserver.onNext( | ||
SensorValues | ||
.newBuilder() | ||
.setSensor(SensorType.PRESSURE) | ||
.addValues(349) | ||
.build() | ||
); | ||
|
||
if (finishLatch.getCount() == 0) { | ||
mLogger.warning("setValues completed or errored before we finished sending."); | ||
return; | ||
} | ||
|
||
requestObserver.onCompleted(); | ||
|
||
try { | ||
if (!finishLatch.await(10, TimeUnit.SECONDS)) { | ||
mLogger.warning("setValues FAILED : cannot finish within 10 seconds"); | ||
} | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
|
||
static void requestAll(EnvironmentGrpc.EnvironmentStub _stub) { | ||
final var finishLatch = new CountDownLatch(1); | ||
|
||
mLogger.info("-- REQUEST_ALL started!\n"); | ||
_stub.requestAll(Empty.newBuilder().build(), new StreamObserver<>() { | ||
@Override | ||
public void onNext(EnvData _value) { | ||
mLogger.info(_value.getSensor().name() | ||
+ " - Timestamp: " + _value.getTimestamp() | ||
.getSeconds() | ||
+ " - Values: " | ||
+ _value.getValuesList() + "\n"); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable _t) { | ||
mLogger.info("Error: " + _t.getMessage()); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
mLogger.info("-- REQUEST_ALL finished!\n"); | ||
finishLatch.countDown(); | ||
} | ||
}); | ||
|
||
if (finishLatch.getCount() == 0) { | ||
mLogger.warning("requestEnvironmentData completed or errored before we finished sending."); | ||
} | ||
|
||
try { | ||
if (!finishLatch.await(10, TimeUnit.SECONDS)) { | ||
mLogger.warning("requestEnvironmentData FAILED : cannot finish within 10 seconds"); | ||
} | ||
} catch (InterruptedException _e) { | ||
_e.printStackTrace(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import io.grpc.*; | ||
|
||
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; | ||
|
||
public class MetaDataInterceptorJava implements ClientInterceptor { | ||
@Override | ||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
MethodDescriptor<ReqT, RespT> _method, | ||
CallOptions _callOptions, | ||
Channel _next | ||
) { | ||
return new ForwardingClientCall.SimpleForwardingClientCall<>(_next.newCall(_method, _callOptions)) { | ||
/** | ||
* Metadata are like Headers in HTTP | ||
* because grpc uses a binary format the string needs to be encoded with the | ||
* help of the `ASCII_STRING_MARSHALLER` | ||
*/ | ||
@Override | ||
public void start(Listener<RespT> responseListener, Metadata headers) { | ||
headers.put(Metadata.Key.of("MY_MD_1", ASCII_STRING_MARSHALLER), "This is metadata of MY_MD_1"); | ||
super.start(responseListener, headers); | ||
} | ||
}; | ||
} | ||
} |
Oops, something went wrong.