Skip to content

[Bug] Stream is already completed, no further calls are allowed  #4829

@9997766

Description

@9997766

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

1.10.0

What happened

我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,rocketmq是双机集群,发现有时能收到消息有时收不到,
eventmesh 服务端总是报如下错误,

2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

我把EventEmitter 这类改成如下方式,客户端肯定能收到消息,没有消息会丢或者收不到。
但是clustering模式失效了,变成广播模式接收了,即所有节点能收到所有的消息。
不知道作者们,能不能处理一下这个问题。

package org.apache.eventmesh.runtime.core.protocol.grpc.service;

import io.grpc.stub.StreamObserver;

import lombok.extern.slf4j.Slf4j;

@slf4j
public class EventEmitter {
private boolean inError = false;

private final StreamObserver<T> emitter;

public EventEmitter(StreamObserver<T> emitter) {
    this.emitter = emitter;
}

public synchronized void onNext(T event) {
    if (inError) {
        return;
    }
    try {
        emitter.onNext(event);
    } catch (final Exception e) {
        log.warn("StreamObserver Error onNext. {}", e.getMessage());
        onError(e);
    }

// try {
// emitter.onNext(event);
// } catch (Exception e) {
// log.warn("StreamObserver Error onNext. {}", e.getMessage());
// }
}

public synchronized void onCompleted() {
    if (inError) {
        return;
    }
    try {
        emitter.onCompleted();
    } catch (final Exception e) {
        log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
        onError(e);
    }

// try {
// emitter.onCompleted();
// } catch (Exception e) {
// log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
// }
}

public synchronized void onError(Throwable t) {

    emitter.onError(t);
    inError = true;
    log.warn("StreamObserver Error onError. {}", t.getMessage());

// try {
// emitter.onError(t);
// } catch (Exception e) {
// log.warn("StreamObserver Error onError. {}", e.getMessage());
// }
}

public StreamObserver<T> getEmitter() {
    return emitter;
}

}

How to reproduce

我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,发现有时能收到消息有时收不到,
eventmesh 服务端总是报错。

Debug logs

2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
        at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    StalebugSomething isn't workingneed researchThis bug haven't been reproduced yet or this feature is under POC

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions