Kafka support? #105
Not right now. But feel free to submit a PR :-)
It seems the idea of using Kafka as an event store is really good. @adrai, what is your personal opinion about this feature? Maybe you see obvious pitfalls?
Never used it 🤷‍♂️
Using Kafka as an eventstore is a rather awful idea. Each aggregate (not aggregate type, but aggregate instance, i.e. id) is a separate ordered stream that needs to be streamed in order to rebuild the aggregate state (i.e. before each command is executed). Solving this with Kafka is not impossible but highly complex and unperformant (you have two options: one partition for all events, streaming them all and filtering on each command, or one partition per aggregate instance, which will get unmanageable and resource-intensive over time). Most importantly, Kafka is not designed for this. That does not mean that Kafka is a bad choice for the messaging layer between the parts of your system (which is out of scope of this library).
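A minimal sketch of the first option (one partition for all events, filtered on every command), using kafka-node as elsewhere in this thread; the topic name, the JSON message shape with an aggregateId field, and the applyEvent reducer are assumptions for illustration:

const kafka = require('kafka-node');

// Rebuild one aggregate's state by streaming the whole 'events' topic from
// offset 0 and filtering by aggregateId.
function rebuildAggregate(aggregateId, onDone) {
  const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' });
  const consumer = new kafka.Consumer(
    client,
    [{ topic: 'events', partition: 0, offset: 0 }], // always start from 0
    { fromOffset: true, encoding: 'utf8' }
  );

  let state = {};
  consumer.on('message', function (message) {
    const event = JSON.parse(message.value);
    // every command pays for scanning the full topic, even though only a
    // fraction of the events belong to this aggregate
    if (event.aggregateId === aggregateId) {
      state = applyEvent(state, event); // applyEvent: assumed domain reducer
    }
    // kafka-node exposes the partition's high-water mark on each message,
    // which is the only practical way to detect that we have caught up
    if (message.offset + 1 >= message.highWaterOffset) {
      consumer.close(function () { onDone(null, state); });
    }
  });
  consumer.on('error', onDone);
}

Note that if the topic is empty the 'message' callback never fires, so a real implementation would also need a timeout; this is exactly the kind of complexity being warned about.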
@nanov hi, Kafka as an event store gives us the possibility to use it as a bus and as a single event store at the same time. We do not need to do replays of events at cold start of new services anymore (for example, by requesting them from the bus from domain services); we just read events from Kafka starting at offset 0. Also see https://blog.softwaremill.com/event-sourcing-using-kafka-53dfd72ad45d Furthermore, I already implemented the very first version of a Kafka event store for the cqrs-domain lib and it seems to work.
I do get the advantages regarding the other services (saga/denormalizer).
@nanov
Does it mean that at-least-once totally fits our requirements?
proof-of-concept: https://github.com/imissyouso/domain-cqrs-kafka-eventstore
Again, all that is known; still, you are losing the main idea of event streams here and compensating for it with something like a snapshot storage. What happens if you want to review some events? All this is pretty trivial right now and comes out of the box; in your case you'll have to write some aggregate rebuild logic. I definitely think that Kafka is suitable as a broker here (especially for delivering commands to aggregates, as it can allow you a huge amount of safe ordered parallelism), but as an event store, in most cases, I would stick to something built for that.
@nanov
Could you provide an example?
It's pretty simple: let's say you have an event whose handler you need to change. The simple approach would be just to change the way the handler behaves, and by the next rebuild you would have everything in the state; with your approach, you would have to fire some custom rebuild script on the aggregate in order to update your DB.
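A minimal sketch of the "trivial" path described here (generic names, not the actual cqrs-eventdenormalizer API): change the handler, then replay the stream to rebuild the read model.

// change the way the handler behaves...
const handlers = {
  userRegistered(state, event) { // event name is illustrative
    return Object.assign({}, state, { email: event.payload.email.toLowerCase() });
  }
};

// ...and the next rebuild picks it up: the read model is derived state,
// so no custom migration script against the DB is needed
function rebuildReadModel(allEvents) {
  return allEvents.reduce((state, e) => handlers[e.name](state, e), {});
}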
@nanov
Also, such an approach solves our issue thenativeweb/node-cqrs-domain#153
I think you don't get how the current, or any classical cqrs/es, implementation works. The fundamental difference here is that you don't have an aggregate state store; state is recreated before each new command: welcome to event-sourcing. I am not saying that your implementation cannot work or has no uses, but it is different: you remove one component (es) and introduce two new ones (kafka, aggregate state store), and additionally you have to be absolutely sure that both are 100% synchronized before handling the next command on the same aggregate. Once you get it right, you will benefit from faster writes, which cqrs/es is not about, and lose many of the benefits that rebuilding the state out of the stream before handling a new command gives you. Additionally, with such a heavy stack one comes to the question whether getEventStore isn't a better fit here, as it gives you everything from all worlds.
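For reference, a sketch of the classical write path being described, with loadEvents, appendEvents, initialState and decide standing in for whatever event store and domain logic are configured (all names assumed):

// state is recreated from the aggregate's own ordered stream before every
// command; there is no long-lived aggregate state store to keep in sync
async function handleCommand(aggregateId, command) {
  const events = await loadEvents(aggregateId);            // assumed helper
  const state = events.reduce(applyEvent, initialState()); // rebuild
  const newEvents = decide(state, command);                // domain logic
  // an optimistic concurrency check makes concurrent writers safe; see the
  // later comment on why Kafka cannot provide this
  await appendEvents(aggregateId, newEvents, { expectedRevision: events.length });
}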
yes, I already use it:

var domain = require('cqrs-domain')({
//...
eventStore: createKafkaEventStore({
    // when we save events to Kafka we wait until we receive them back in the
    // consumer; only then is the save considered complete. If the consumer
    // doesn't receive them in time, we throw an exception.
    ackTimeout: 3000,
client: {kafkaHost: 'kafka:9092'},
sources: [
//offset: 0
{topic: 'events', partition: 0}
],
// See https://github.com/SOHU-Co/kafka-node for parameters description
consumer: {
      groupId: 'test', // consumer group id, default `kafka-node-group`
// Auto commit config
autoCommit: true,
autoCommitIntervalMs: 5000,
// The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms
fetchMaxWaitMs: 100,
// This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
fetchMinBytes: 1,
// The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
fetchMaxBytes: 1024 * 1024,
// If set true, consumer will fetch message from the given offset in the payloads
fromOffset: true,
// If set to 'buffer', values will be returned as raw buffer objects.
encoding: 'utf8',
keyEncoding: 'utf8'
}
})
//...
});
But isn't it technically the same? What's the difference between a) and b)? The solutions look exactly the same to me, except that "snapshots" are taken continuously on each event. Events are stored eventually in Kafka in this case, of course. It's not a bus, it's an event store.
Also, Kafka can "compact" topics, which works somewhat like snapshots (almost: it retains only the latest record per key rather than making a real snapshot of the aggregate) https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
Well, it's not the same (snapshots are optional, configurable, and generally avoidable); taking them out of the way, you recreate state and reapply event handlers before each command. It really is everything except the same. Plus, for snapshots you really don't care whether they are created or not; you don't have to wait for them or handle failures with them, whereas in your approach it is vital that the state store is updated before you can handle the next command, otherwise you'll get really unexpected results. At the end of the day, Kafka is not an event store for such long-living cqrs/es use-cases; it is great at streaming data in a partitioned manner, which is what it is made for.
The next command will be executed only when we are 100% sure that the aggregate was updated; it is solved at the code level - we execute
Why can't we create partitions per aggregateId? Then we read them per aggregateId / per partition / per event denormalizer; for example, 10 partitions read by 10 denormalizer instances (see the sketch below). Because the ordering of events across different aggregates (and hence partitions) is not important, we can read them in parallel with denormalizers.
Yes, how to repartition is a good question.
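A sketch of that layout with a kafka-node ConsumerGroup: instances sharing a groupId split the partitions among themselves, so ten denormalizer instances on a ten-partition topic read in parallel (topic, group id and updateReadModel are illustrative):

const kafka = require('kafka-node');

const consumerGroup = new kafka.ConsumerGroup(
  {
    kafkaHost: 'kafka:9092',
    groupId: 'denormalizers', // same id on every instance
    fromOffset: 'earliest',
    autoCommit: true
  },
  ['events'] // topic partitioned by aggregateId
);

consumerGroup.on('message', function (message) {
  // ordering holds within a partition (all events of one aggregate),
  // but not across partitions - which is the caveat raised below
  updateReadModel(JSON.parse(message.value)); // assumed projection handler
});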
But it is: read models can be cross-aggregate and even cross-context; losing the order here may be vital, depending on the design. The same thing applies to sagas.
EDIT: That's similar to what happens now with
Don't use Kafka as an EventStore. Kafka does not provide consistency guarantees (i.e. via an optimistic concurrency check). It will completely mess up your data if you have multiple concurrent processes rebuilding an aggregate and creating new events (which you will, if you just process http requests). See https://dyllanwli.github.io/2019/02/10/Using-Kafka-as-a-Event-Store/#:~:text=Kafka%20does%20not%20support%20optimistic,writer%20at%20the%20application%20level. The same is btw true for the current "published" flag on the event and the concept of publishing (as @nanov mentioned), with the difference that there you only end up with duplicated messages (at-least-once) that you need to handle, i.e. idempotency. Don't use it; it overcomplicates things. In order to have consistent consumers (which you do want), have your consumers only query the EventStore from their own last persisted position (pull). See https://twitter.com/ih8nickfinding/status/1285490013854208000
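A sketch of the suggested pull model, with positionStore, eventStore and projection as assumed interfaces; the consumer persists its own position and never relies on a published flag:

async function pollLoop(positionStore, eventStore, projection) {
  for (;;) {
    const position = await positionStore.load('my-projection');
    const events = await eventStore.getEventsSince(position); // assumed API
    for (const event of events) {
      await projection.handle(event);
      // persisting the position after the projection update means a crash
      // replays the event instead of skipping it (at-least-once, so the
      // projection must be idempotent)
      await positionStore.save('my-projection', event.position);
    }
    if (events.length === 0) {
      await new Promise((resolve) => setTimeout(resolve, 500)); // back off
    }
  }
}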
You can, and it's the best way to do something like that in Kafka (don't), but to rebuild a single aggregate (or a projection thereof), you need to fetch all events of the same aggregate type (= topic) and filter only the ones matching your aggregateId. This will quickly become a bottleneck or a PITA.
You noticed the issue and are unintentionally starting to implement a distributed transaction protocol (wrongly at first, of course). You would need to do this at every single arrow in your architecture plan where you move data outside a single thread of execution; your architecture will be much more complex if you deal with it. Keep data single-sourced and immutable (= read-only). That's the whole idea of EventSourcing, really.
Is there a plan to introduce Kafka as one of the event stores as well?