
Close consumer when it cannot connect to the broker #175

Open
egast opened this issue May 18, 2020 · 9 comments

Comments

@egast
Contributor

egast commented May 18, 2020

Add an option to close the consumer when it is unable to connect to the broker. This should be configurable, e.g. after some timeout and/or after a given number of retries.

@egast egast changed the title Close consumer when it cannot connect the the broker Close consumer when it cannot connect to the broker May 21, 2020
@saraiva132

I would very much love to see this! It is extremely annoying to have a consumer hang when it cannot connect to a broker.

WDYT @iravid ?

@svroonland
Collaborator

As a workaround, have you considered a .timeout on the stream?

@svroonland
Collaborator

@egast @saraiva132 Are you by any chance familiar with how to get the connection status from the org.apache.kafka.clients.consumer.Consumer, or with settings that control its behavior on connection timeout?

@egast
Contributor Author

egast commented Sep 28, 2020

@svroonland I don't know if you can get the Kafka connection status from a KafkaConsumer. I think the connection retry behavior is done by the consumer client. I know that Akka Kafka Streams does offer the "close consumer if it cannot connect to Kafka", so I took a quick look at how they do it and from what I understand they just periodically try to get the topic metadata to see if Kafka is still available. Maybe something similar can be implemented for zio-kafka.
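
A minimal sketch of that idea, combined with the configurable retry count and timeout requested at the top of this issue. Everything here is hypothetical: `BrokerWatchdog`, `checkShouldClose`, and the probe are illustration names, not part of zio-kafka or the Kafka client. The probe is kept abstract so the sketch compiles with the standard library alone; in a real setup it could plausibly wrap a metadata call such as `Consumer.listTopics(Duration)` and treat a timeout exception as a failed probe.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: decides when a consumer should give up on the broker. */
final class BrokerWatchdog {
    private final BooleanSupplier probe;   // assumed to return true when the broker answered
    private final int maxFailures;         // consecutive failed probes tolerated
    private final Duration maxDowntime;    // longest tolerated time since the last success
    private int failures = 0;
    private long lastSuccessNanos;

    BrokerWatchdog(BooleanSupplier probe, int maxFailures, Duration maxDowntime, long nowNanos) {
        this.probe = probe;
        this.maxFailures = maxFailures;
        this.maxDowntime = maxDowntime;
        this.lastSuccessNanos = nowNanos;  // start the downtime clock at construction
    }

    /** Runs one probe; returns true if the caller should now close the consumer. */
    boolean checkShouldClose(long nowNanos) {
        boolean ok;
        try {
            ok = probe.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false;                    // an exception from the probe counts as a failure
        }
        if (ok) {
            failures = 0;                  // any success resets both limits
            lastSuccessNanos = nowNanos;
            return false;
        }
        failures++;
        boolean tooManyFailures = failures >= maxFailures;
        boolean downTooLong = nowNanos - lastSuccessNanos >= maxDowntime.toNanos();
        return tooManyFailures || downTooLong;
    }
}
```

The caller passes in the current time (for example from `System.nanoTime()`) and invokes `checkShouldClose` on a schedule. Because `KafkaConsumer` is not safe for multi-threaded access, a real probe would have to run on the thread that owns the consumer or use a separate client.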

@svroonland
Collaborator

We could perhaps do something with the lack of a partitions assigned event from the RebalanceListener.

@erikvanoosten
Collaborator

> We could perhaps do something with the lack of a partitions assigned event from the RebalanceListener.

This would exclude use cases that have extra instances standing by. This includes redeployments where you first start new instances and then stop the old instances.

@svroonland
Collaborator

Not sure, but do those instances get an empty list of assigned partitions perhaps?

@erikvanoosten
Collaborator

erikvanoosten commented Apr 5, 2024

> Not sure, but do those instances get an empty list of assigned partitions perhaps?

Not sure either. It might be that the rebalance listener would not be invoked at all.

@svroonland
Collaborator

I like the idea of building a heartbeat mechanism using the listTopics method, as @egast suggested.

I don't think it's always desirable to fail the stream when the consumer cannot connect for some period. In long-running applications you may just want to wait until the broker is back online. But it could also signal that you have a wrong configuration and the connection cannot be established because of that, in which case it would be good to fail fast. There are also errors that can occur for a specific subscription, e.g. when not authorized for some topic. I don't know if we can detect those and fail the stream, while potentially keeping streams for other subscriptions running.

In any case we could have a diagnostic event Connected / Disconnected, which would allow the user to decide how to handle connection issues. A new metric 'connection state', like we have for subscription state, would also allow for monitoring and alerting.
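
As a rough illustration of the Connected / Disconnected event, the sketch below emits an event only when the observed state changes, so repeated heartbeats in the same state stay silent. The names `ConnectionEvent`, `ConnectionStateTracker`, and `observe` are hypothetical and do not correspond to any existing zio-kafka diagnostics API; the `reachable` flag is assumed to come from whatever heartbeat is eventually chosen.

```java
import java.util.function.Consumer;

/** Hypothetical diagnostic events; not an existing zio-kafka type. */
enum ConnectionEvent { CONNECTED, DISCONNECTED }

/** Hypothetical tracker: turns heartbeat results into state-change events. */
final class ConnectionStateTracker {
    private final Consumer<ConnectionEvent> listener; // user-supplied handler
    private Boolean connected = null;                  // null until the first observation

    ConnectionStateTracker(Consumer<ConnectionEvent> listener) {
        this.listener = listener;
    }

    /** Records one heartbeat result and notifies the listener only on a change. */
    void observe(boolean reachable) {
        if (connected == null || connected != reachable) {
            connected = reachable;
            listener.accept(reachable ? ConnectionEvent.CONNECTED : ConnectionEvent.DISCONNECTED);
        }
    }
}
```

The listener decides the policy: a long-running service might only log or update a gauge, while a short-lived job might fail the stream on the first DISCONNECTED event.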
