Skip to content

Commit

Permalink
finished grpc server/client
Browse files Browse the repository at this point in the history
  • Loading branch information
abdo643-HULK committed Jan 23, 2022
1 parent c9d5fb4 commit e771b69
Show file tree
Hide file tree
Showing 26 changed files with 1,448 additions and 0 deletions.
3 changes: 3 additions & 0 deletions grpc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/
.gradle
.idea
29 changes: 29 additions & 0 deletions grpc/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import com.google.protobuf.gradle.*
import org.gradle.kotlin.dsl.provider.gradleKotlinDslOf

plugins {
idea
id("com.google.protobuf") version "0.8.18"
kotlin("jvm") version Constants.kotlinVersion
}

group = "org.shehata"
version = "1.0-SNAPSHOT"

repositories {
google()
mavenCentral()
}

dependencies {
implementation(kotlin("stdlib"))

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Constants.coroutinesVersion}")

implementation("io.grpc:grpc-stub:${Constants.grpcVersion}")
implementation("io.grpc:grpc-netty:${Constants.grpcVersion}")
implementation("io.grpc:grpc-protobuf:${Constants.grpcVersion}")
implementation("com.google.protobuf:protobuf-kotlin:${Constants.protobufVersion}")

implementation("javax.annotation:javax.annotation-api:1.3.2")
}
7 changes: 7 additions & 0 deletions grpc/buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
plugins {
`kotlin-dsl`
}

repositories {
mavenCentral()
}
7 changes: 7 additions & 0 deletions grpc/buildSrc/src/main/kotlin/Constants.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
object Constants {
const val grpcVersion = "1.43.2"
const val kotlinVersion = "1.6.10"
const val protobufVersion = "3.19.3"
const val coroutinesVersion = "1.6.0"
const val grpcKotlinVersion = "1.2.0"
}
22 changes: 22 additions & 0 deletions grpc/client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins {
idea
application
kotlin("jvm")
}

group = "org.shehata"
version = "1.0-SNAPSHOT"

repositories {
mavenCentral()
}

dependencies {
implementation(project(":stub"))

runtimeOnly("io.grpc:grpc-netty:${Constants.grpcVersion}")
}

tasks.named("startScripts") {
enabled = false
}
281 changes: 281 additions & 0 deletions grpc/client/src/main/java/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
import com.google.common.collect.Lists;
import com.google.protobuf.Empty;
import com.google.protobuf.StringValue;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import environment.EnvData;
import environment.EnvironmentGrpc;
import environment.Sensor;
import environment.SensorType;
import environment.SensorTypes;
import environment.SensorValues;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class Main {
static final int PORT = 5002;
static final String NAME = "localhost";
static final Logger mLogger = Logger.getLogger(Main.class.getName());

/**
* builds the communication to the server
* and calls the methods that request the server
*
* @param _args
*/
public static void main(String[] _args) {
var channel = ManagedChannelBuilder
.forAddress(NAME, PORT)
.usePlaintext()
.intercept(new MetaDataInterceptorJava())
.build();

blockingEnvironmentTest(channel);

try {
asyncEnvironmentTest(channel);
} catch (InterruptedException _e) {
mLogger.info("InterruptedException: " + _e.getMessage());
}

channel.shutdown();
}

/**
* this uses the blocking stub which means the Executor will not do anything if
* one request takes a large amount of time to finish
*
* @param _channel
*/
static void blockingEnvironmentTest(Channel _channel) {
final var stub = EnvironmentGrpc.newBlockingStub(_channel);

// =========== UNARY ===========
final var types = stub.requestEnvironmentDataTypes(Empty.newBuilder().build());
mLogger.info("requestEnvironmentDataTypes Response ->: \n" + types);

final var sensor = Sensor
.newBuilder()
.setValue(SensorType.HUMIDITY)
.build();
final var data = stub.requestEnvironmentData(sensor);
mLogger.info("requestEnvironmentData Humidity: \n " + data);

final var s = stub.requestAll(Empty.newBuilder().build());
mLogger.info("requestAll: \n " + Arrays.toString(Lists.newArrayList(s).toArray()));
}

/**
* calls all methods async
* to make the async Requests sequential and still non-blocking we are using `CountDownLatch`
*
* @param _channel
* @throws InterruptedException
*/
static void asyncEnvironmentTest(Channel _channel) throws InterruptedException {
var stub = EnvironmentGrpc.newStub(_channel);

mLogger.info("========== Requests started ==========\n");

// =========== UNARY ===========
requestEnvironmentDataTypes(stub);
requestEnvironmentData(stub, "");
// =========== CLIENT STREAMING ===========
setValues(stub);
// =========== UNARY ===========
requestEnvironmentData(stub, "after setValues");
// =========== SERVER STREAMING ===========
requestAll(stub);

mLogger.info("========== Requests finished ==========");
}

static void requestEnvironmentDataTypes(EnvironmentGrpc.EnvironmentStub _stub) {
final var finishLatch = new CountDownLatch(1);

mLogger.info("-- REQUEST_ENVIRONMENT_DATA_TYPES started!\n");
_stub.requestEnvironmentDataTypes(Empty.newBuilder().build(), new StreamObserver<>() {
@Override
public void onNext(SensorTypes _value) {
mLogger.info("requestEnvironmentDataTypes Response ->: \n" + _value);
}

@Override
public void onError(Throwable _t) {
mLogger.info("Error: " + _t.getMessage());
}

@Override
public void onCompleted() {
mLogger.info("-- REQUEST_ENVIRONMENT_DATA_TYPES Finished!\n");
finishLatch.countDown();
}
});

if (finishLatch.getCount() == 0) {
mLogger.warning("requestEnvironmentData completed or errored before we finished sending.");
}

try {
if (!finishLatch.await(1, TimeUnit.SECONDS)) {
mLogger.warning("requestEnvironmentDataTypes FAILED : cannot finish within 10 seconds");
}
} catch (InterruptedException _e) {
_e.printStackTrace();
}
}

static void requestEnvironmentData(EnvironmentGrpc.EnvironmentStub _stub, String _msg) {
final var finishLatch = new CountDownLatch(1);

mLogger.info("-- REQUEST_ENVIRONMENT_DATA started!\n");
final var sensor = Sensor
.newBuilder()
.setValue(SensorType.HUMIDITY)
.build();

_stub.requestEnvironmentData(sensor, new StreamObserver<>() {
@Override
public void onNext(EnvData _value) {
mLogger.info("requestEnvironmentData " + _msg + "\n"
+ _value.getSensor().name()
+ " - Timestamp: " + _value.getTimestamp().getSeconds()
+ " - Values: " + _value.getValuesList()
+ "\n"
);
}

@Override
public void onError(Throwable _t) {
mLogger.info("Error: " + _t.getMessage());
}

@Override
public void onCompleted() {
mLogger.info("-- REQUEST_ENVIRONMENT_DATA Finished!\n");
finishLatch.countDown();
}
});

if (finishLatch.getCount() == 0) {
mLogger.warning("requestEnvironmentData completed or errored before we finished sending.");
}

try {
if (!finishLatch.await(2, TimeUnit.SECONDS)) {
mLogger.warning("requestEnvironmentData FAILED : cannot finish within 10 seconds");
}
} catch (InterruptedException _e) {
_e.printStackTrace();
}
}

static void setValues(EnvironmentGrpc.EnvironmentStub _stub) throws InterruptedException {
final var finishLatch = new CountDownLatch(1);

mLogger.info("-- SET_VALUES started!\n");
final var requestObserver = _stub.setValues(new StreamObserver<>() {
@Override
public void onNext(StringValue _value) {
mLogger.info("setValues Response : " + _value.getValue() + "\n");
}

@Override
public void onError(Throwable _t) {
mLogger.info("Error: " + _t.getMessage());
}

@Override
public void onCompleted() {
mLogger.info("-- SET_VALUES finished!\n");
finishLatch.countDown();
}
});

requestObserver.onNext(
SensorValues
.newBuilder()
.setSensor(SensorType.HUMIDITY)
.addValues(5)
.build()
);

TimeUnit.MILLISECONDS.sleep(1000);
requestObserver.onNext(
SensorValues
.newBuilder()
.setSensor(SensorType.HUMIDITY)
.addValues(5)
.addValues(15)
.build()
);

TimeUnit.MILLISECONDS.sleep(1000);
requestObserver.onNext(
SensorValues
.newBuilder()
.setSensor(SensorType.PRESSURE)
.addValues(349)
.build()
);

if (finishLatch.getCount() == 0) {
mLogger.warning("setValues completed or errored before we finished sending.");
return;
}

requestObserver.onCompleted();

try {
if (!finishLatch.await(10, TimeUnit.SECONDS)) {
mLogger.warning("setValues FAILED : cannot finish within 10 seconds");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static void requestAll(EnvironmentGrpc.EnvironmentStub _stub) {
final var finishLatch = new CountDownLatch(1);

mLogger.info("-- REQUEST_ALL started!\n");
_stub.requestAll(Empty.newBuilder().build(), new StreamObserver<>() {
@Override
public void onNext(EnvData _value) {
mLogger.info(_value.getSensor().name()
+ " - Timestamp: " + _value.getTimestamp()
.getSeconds()
+ " - Values: "
+ _value.getValuesList() + "\n");
}

@Override
public void onError(Throwable _t) {
mLogger.info("Error: " + _t.getMessage());
}

@Override
public void onCompleted() {
mLogger.info("-- REQUEST_ALL finished!\n");
finishLatch.countDown();
}
});

if (finishLatch.getCount() == 0) {
mLogger.warning("requestEnvironmentData completed or errored before we finished sending.");
}

try {
if (!finishLatch.await(10, TimeUnit.SECONDS)) {
mLogger.warning("requestEnvironmentData FAILED : cannot finish within 10 seconds");
}
} catch (InterruptedException _e) {
_e.printStackTrace();
}
}
}
25 changes: 25 additions & 0 deletions grpc/client/src/main/java/MetaDataInterceptorJava.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import io.grpc.*;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

public class MetaDataInterceptorJava implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> _method,
CallOptions _callOptions,
Channel _next
) {
return new ForwardingClientCall.SimpleForwardingClientCall<>(_next.newCall(_method, _callOptions)) {
/**
* Metadata are like Headers in HTTP
* because grpc uses a binary format the string needs to be encoded with the
* help of the `ASCII_STRING_MARSHALLER`
*/
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Metadata.Key.of("MY_MD_1", ASCII_STRING_MARSHALLER), "This is metadata of MY_MD_1");
super.start(responseListener, headers);
}
};
}
}
Loading

0 comments on commit e771b69

Please sign in to comment.