-
Notifications
You must be signed in to change notification settings - Fork 641
Description
Search before asking
- I had searched in the issues and found no similar issues.
Environment
Linux
EventMesh version
1.10.0
What happened
我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,rocketmq是双机集群,发现有时能收到消息有时收不到,
eventmesh 服务端总是报如下错误,
2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
我把EventEmitter 这类改成如下方式,客户端肯定能收到消息,没有消息会丢或者收不到。
但是clustering模式失效了,变成广播模式接收了,即所有节点能收到所有的消息。
不知道作者们,能不能处理一下这个问题。
package org.apache.eventmesh.runtime.core.protocol.grpc.service;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
@slf4j
public class EventEmitter {
private boolean inError = false;
private final StreamObserver<T> emitter;
public EventEmitter(StreamObserver<T> emitter) {
this.emitter = emitter;
}
public synchronized void onNext(T event) {
if (inError) {
return;
}
try {
emitter.onNext(event);
} catch (final Exception e) {
log.warn("StreamObserver Error onNext. {}", e.getMessage());
onError(e);
}
// try {
// emitter.onNext(event);
// } catch (Exception e) {
// log.warn("StreamObserver Error onNext. {}", e.getMessage());
// }
}
public synchronized void onCompleted() {
if (inError) {
return;
}
try {
emitter.onCompleted();
} catch (final Exception e) {
log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
onError(e);
}
// try {
// emitter.onCompleted();
// } catch (Exception e) {
// log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
// }
}
public synchronized void onError(Throwable t) {
emitter.onError(t);
inError = true;
log.warn("StreamObserver Error onError. {}", t.getMessage());
// try {
// emitter.onError(t);
// } catch (Exception e) {
// log.warn("StreamObserver Error onError. {}", e.getMessage());
// }
}
public StreamObserver<T> getEmitter() {
return emitter;
}
}
How to reproduce
我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,发现有时能收到消息有时收不到,
eventmesh 服务端总是报错。
Debug logs
2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct *