Kafka support? #105

Open
barathank opened this issue Mar 27, 2017 · 22 comments

@barathank

Is there a plan to introduce Kafka as one of the event stores as well?

@adrai
Contributor

adrai commented Mar 28, 2017

Not right now. But feel free to submit a PR :-)

@imissyouso

The idea of using Kafka as an event store seems really good. @adrai, what is your personal opinion about this feature? Maybe you see some obvious hidden pitfalls?

@adrai
Contributor

adrai commented Feb 13, 2020

Never used it 🤷‍♂️

@nanov
Contributor

nanov commented Feb 15, 2020

Using Kafka as an event store is a rather awful idea.

Each aggregate (not aggregate type, but aggregate instance, i.e. id) is a separate ordered stream that has to be streamed in order to rebuild the aggregate state (i.e. before each command is executed). Solving this with Kafka is not impossible, but it is highly complex and unperformant. You have two options: one partition for all events, then streaming them all and filtering on each command; or one partition per aggregate instance, which becomes unmanageable and resource-intensive over time. Most importantly, Kafka is not designed for this.

That does not mean that Kafka is a bad choice for the messaging layer between the parts of your system (which is out of scope for this library).
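To make the cost of the single-partition option concrete, here is a minimal sketch in plain JavaScript. An in-memory array stands in for the shared partition, and the event names and the `applyEvent` handler are hypothetical, not cqrs-domain's API. Rebuilding one aggregate means scanning every event in the partition, keeping only that aggregate's events, and folding them in log order.

```javascript
// Hypothetical events in one shared partition, in log order.
const partition = [
  { aggregateId: 'p1', name: 'ProductCreated', payload: { description: 'A' } },
  { aggregateId: 'p2', name: 'ProductCreated', payload: { description: 'B' } },
  { aggregateId: 'p1', name: 'ProductDescriptionChanged', payload: { description: 'A2' } },
];

// Hypothetical event handler: applies one event to the aggregate state.
function applyEvent(state, event) {
  switch (event.name) {
    case 'ProductCreated':
    case 'ProductDescriptionChanged':
      return { ...state, description: event.payload.description };
    default:
      return state;
  }
}

// Rebuild one aggregate: scan the whole partition, keep only this
// aggregate's events, and fold them in log order.
function rebuildFromSharedPartition(log, aggregateId) {
  let scanned = 0;
  const state = log
    .filter((e) => {
      scanned += 1;
      return e.aggregateId === aggregateId;
    })
    .reduce(applyEvent, {});
  return { state, scanned };
}

const { state, scanned } = rebuildFromSharedPartition(partition, 'p1');
console.log(state, scanned);
```

`scanned` grows with the size of the whole partition rather than with the aggregate's own history, and this work is repeated before every command.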

@imissyouso

imissyouso commented Feb 16, 2020

@nanov hi,
We don't need to rebuild the aggregate state on every command; we can cache it in an internal DB, as the Kafka community suggests doing with Kafka Streams (RocksDB) or something similar.

Kafka as an event store lets us use it as a bus and as a single event store at the same time. We no longer need to replay events when a new service cold-starts (for example, by requesting them over the bus from the domain services); we just read the events from Kafka starting at offset 0.
More advantages are described here https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/

Also see https://blog.softwaremill.com/event-sourcing-using-kafka-53dfd72ad45d
and a sceptical article https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c (the comments there are the most important part).

Furthermore, I have already implemented a very first version of a Kafka event store for the cqrs-domain lib, and it seems to work.
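The caching idea can be sketched as a consumer that reads the log from offset 0 and keeps a materialized state per aggregate, updated on every event. This is a minimal in-memory sketch under stated assumptions: a `Map` stands in for RocksDB/Redis, and the log entries, `consume`, and the merge-the-payload update rule are hypothetical.

```javascript
// Hypothetical log, read from offset 0.
const log = [
  { offset: 0, aggregateId: 'p1', payload: { description: 'A' } },
  { offset: 1, aggregateId: 'p1', payload: { description: 'A2' } },
  { offset: 2, aggregateId: 'p2', payload: { description: 'B' } },
];

// Stand-in for the state store (RocksDB in Kafka Streams, Redis in the PoC).
const stateStore = new Map();

// Apply each event to the cached state: effectively a snapshot per event.
function consume(events, store, fromOffset = 0) {
  for (const e of events) {
    if (e.offset < fromOffset) continue;
    const prev = store.get(e.aggregateId) || {};
    store.set(e.aggregateId, { ...prev, ...e.payload, lastOffset: e.offset });
  }
  return store;
}

consume(log, stateStore);
// A command handler would now read stateStore.get(id) instead of replaying events.
console.log(stateStore.get('p1'));
```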


@nanov
Contributor

nanov commented Feb 17, 2020

I do get the advantages regarding the other services (sagas/denormalizers).
What you are basically doing is taking a snapshot on every event. This could work, and depending on the use case it may even be a good idea, but can you ensure exactly-once semantics this way?

@imissyouso

imissyouso commented Feb 18, 2020

@nanov
Since version 0.11, Kafka provides exactly-once semantics between producer and broker out of the box (https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/). This means a message is stored only once, without duplication or event loss.
As I understand it, to get full exactly-once semantics between producer and consumer we would have to use the Transactions API on the consumer side too. It is fully implemented under the hood of the Kafka Streams Java library, but I don't think I will find a port of it written in JS (I don't trust https://github.com/nodefluent/kafka-streams :) ).
To summarize: in our scenario we can be sure that Kafka stores each event exactly once, but our consumers may receive it at least once.
But is that really bad?

  1. Duplicated events in the Kafka event store will never harm the final aggregate state (which is cached in Redis in my implementation), since the events are correctly ordered (Kafka guarantees ordering within each partition).
  2. On the denormalizer/saga side we use the RevisionGuard mechanism for deduplication.
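A minimal sketch of revision-based deduplication on the consumer side, written in the spirit of a revision guard but not the actual RevisionGuard implementation: the consumer remembers the last applied revision per aggregate and drops anything at or below it. The `revision` field, `handle`, and the `'gap'` result are assumptions for illustration.

```javascript
// Last applied revision per aggregate (a real setup would persist this).
const lastRevision = new Map();
const applied = [];

// Apply an event only if its revision is exactly the next expected one.
function handle(event) {
  const last = lastRevision.get(event.aggregateId) ?? 0;
  if (event.revision <= last) return 'duplicate'; // already applied: skip
  if (event.revision > last + 1) return 'gap'; // an earlier event is missing; a real guard might queue it
  applied.push(event);
  lastRevision.set(event.aggregateId, event.revision);
  return 'applied';
}

// At-least-once delivery: revision 2 arrives twice.
const deliveries = [
  { aggregateId: 'a1', revision: 1 },
  { aggregateId: 'a1', revision: 2 },
  { aggregateId: 'a1', revision: 2 },
  { aggregateId: 'a1', revision: 3 },
];
const results = deliveries.map(handle);
console.log(results);
```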

Does that mean at-least-once fully fits our requirements?

@imissyouso

imissyouso commented Feb 18, 2020

Proof of concept: https://github.com/imissyouso/domain-cqrs-kafka-eventstore
I am going to finish all the TODO steps listed in the README in the near future.

@nanov
Contributor

nanov commented Feb 18, 2020

Again, all of that is known; still, you are losing the main idea of event streams here and compensating for it with something like a snapshot store.

What happens if you want to review some events?
What happens if you want to change how eventhandlers behave?

All of this is pretty trivial right now and comes out of the box; in your case, you'll have to write some aggregate rebuild logic.

I definitely think Kafka is suitable as a broker here (especially for delivering commands to aggregates, as it can give you a huge amount of safe, ordered parallelism), but as an event store, in most cases, I would stick to something built for that.

@imissyouso

imissyouso commented Feb 18, 2020

@nanov
Thank you for the reply, but I don't think I understood this:

What happens if you want to change how eventhandlers behave?

Could you provide an example?
Thanks in advance for your time.

@nanov
Contributor

nanov commented Feb 19, 2020

It's pretty simple. Let's say you have an event ProductDescriptionChanged. Your first handler implementation just changes the description property on the aggregate's state. After some time, you decide you also want to record the timestamp of the change in your aggregate state in order to do some other business-logic validation.

The simple approach is to just change the way the handler behaves; on the next rebuild you would have everything in the state. With your approach, you would have to run some custom rebuild script on the aggregate in order to update your DB.
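The example can be made concrete with two versions of a hypothetical handler. Because event-sourced state is a fold over the stream, the new handler logic applies to all past events on the next rebuild; a state store filled under the old handler keeps the old shape until something recomputes it. Event shapes, timestamps, and handler names are illustrative only.

```javascript
const events = [
  { name: 'ProductDescriptionChanged', timestamp: '2020-02-01T10:00:00Z', payload: { description: 'v1' } },
  { name: 'ProductDescriptionChanged', timestamp: '2020-02-10T12:00:00Z', payload: { description: 'v2' } },
];

// First handler version: only tracks the description.
function handlerV1(state, e) {
  return { ...state, description: e.payload.description };
}

// Later version: also records when the description last changed.
function handlerV2(state, e) {
  return { ...state, description: e.payload.description, descriptionChangedAt: e.timestamp };
}

// State is a fold over the stream, so new handler logic covers past events too.
const rebuild = (handler) => events.reduce(handler, {});

// A state store populated under V1 keeps this shape until a custom rebuild runs.
const cachedUnderV1 = rebuild(handlerV1);
// The event-sourced path simply replays with V2 before the next command.
const replayedUnderV2 = rebuild(handlerV2);
console.log(cachedUnderV1, replayedUnderV2);
```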

@imissyouso

imissyouso commented Feb 19, 2020

@nanov
Thanks again for your reply. I think you did not understand my idea; to clarify, I drew this diagram:
[Diagram: Untitled Diagram (5)]
So I still do not see any fundamental logical contradiction that would prevent implementing these points:

What happens if you want to review some events?
What happens if you want to change how eventhandlers behave?

@imissyouso

imissyouso commented Feb 19, 2020

Also, this approach solves our issue thenativeweb/node-cqrs-domain#153.
Here we can simply use the Redis aggregate state store for unique-field validation in the 'preCondition' step, because the aggregate state in Redis is updated within the aggregate lock as the last step of command processing.

@nanov
Contributor

nanov commented Feb 21, 2020

I think you don't get how the current (or any classical CQRS/ES) implementation works.

The fundamental difference here is that you don't have an aggregate state store; the state is recreated before each new command. Welcome to event sourcing.
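The classic cycle described here (load the stream, rebuild the state, decide, append) can be sketched with an in-memory event store. This is a simplified model, not cqrs-domain's implementation: `streams`, `loadStream`, `appendToStream`, and the product events are hypothetical, and concurrency control is left out.

```javascript
// In-memory stand-in for an event store: one ordered stream per aggregate id.
const streams = new Map();

const loadStream = (id) => streams.get(id) || [];
const appendToStream = (id, events) => streams.set(id, [...loadStream(id), ...events]);

// Hypothetical event handler: folds events into state.
function apply(state, e) {
  if (e.name === 'ProductCreated') return { exists: true, description: e.payload.description };
  if (e.name === 'ProductDescriptionChanged') return { ...state, description: e.payload.description };
  return state;
}

// Every command starts from the stream; no aggregate state is stored anywhere.
function handleChangeDescription(id, description) {
  const state = loadStream(id).reduce(apply, { exists: false });
  if (!state.exists) throw new Error(`product ${id} does not exist`);
  if (state.description === description) return []; // nothing to change
  const newEvents = [{ name: 'ProductDescriptionChanged', payload: { description } }];
  appendToStream(id, newEvents);
  return newEvents;
}

appendToStream('p1', [{ name: 'ProductCreated', payload: { description: 'A' } }]);
handleChangeDescription('p1', 'B');
console.log(loadStream('p1').length);
```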

I am not saying that your implementation cannot work or has no uses, but it is different: you remove one component (the ES) but introduce two new ones (Kafka and an aggregate state store), and additionally you have to be absolutely sure that both are 100% synchronized before handling the next command on the same aggregate.

Once you get it right, you will benefit from faster writes (which is not what CQRS/ES is about) and lose many of the benefits that rebuilding the state from the stream before handling a new command gives you.

Additionally, when introducing such a heavy stack, one has to ask whether using getEventStore isn't a better fit here, as it gives you the best of all worlds.

@imissyouso

@nanov

Additionally, when introducing such a heavy stack, one has to ask whether using getEventStore isn't a better fit here, as it gives you the best of all worlds.

Yes, I already use getEventStore to inject my own event store implementation:

```javascript
var domain = require('cqrs-domain')({
  // ...
  eventStore: createKafkaEventStore({
    // When saving events to Kafka, wait until the consumer receives them; only then is
    // the save considered complete. If the consumer does not receive them, throw an exception.
    ackTimeout: 3000,
    client: { kafkaHost: 'kafka:9092' },
    sources: [
      // offset: 0
      { topic: 'events', partition: 0 }
    ],
    // See https://github.com/SOHU-Co/kafka-node for parameter descriptions
    consumer: {
      groupId: 'test', // consumer group id, default `kafka-node-group`
      // Auto-commit config
      autoCommit: true,
      autoCommitIntervalMs: 5000,
      // Maximum time in ms to block waiting if insufficient data is available when the request is issued, default 100 ms
      fetchMaxWaitMs: 100,
      // Minimum number of bytes of messages that must be available to give a response, default 1 byte
      fetchMinBytes: 1,
      // Maximum bytes to include in the message set for this partition; bounds the size of the response
      fetchMaxBytes: 1024 * 1024,
      // If true, the consumer fetches messages from the offset given in the payloads
      fromOffset: true,
      // If set to 'buffer', values are returned as raw Buffer objects
      encoding: 'utf8',
      keyEncoding: 'utf8'
    }
  })
  // ...
});
```

@nanov
Contributor

nanov commented Feb 21, 2020

This is what I meant. :)

@dmitry-textmagic

The fundamental difference here is that you don't have an aggregate state store, the state is recreated before each new command, welcome to event-sourcing.

But isn't it technically the same? What's the difference between:
a) recreating the aggregate state by picking up the last aggregate snapshot plus all events that went into the event store after that snapshot;
and b) recreating the aggregate state by loading it from the aggregate state store?

Solutions a) and b) look exactly the same to me, except that in b) "snapshots" are taken continuously, on every event.

In this case, of course, the events are ultimately stored in Kafka. It's not a bus, it's an event store.
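To compare the two options, here is a toy aggregate whose state is a running total. Assuming the snapshot is consistent with the stream, a) replaying only the events after the snapshot and b) a state store updated on every event both yield the same state as a full replay; the disagreement in the thread is about what happens when that assumption does not hold. The event shape and numbers are made up.

```javascript
// Hypothetical stream of events for one aggregate, with revisions.
const stream = [
  { revision: 1, delta: 5 },
  { revision: 2, delta: 3 },
  { revision: 3, delta: -2 },
  { revision: 4, delta: 10 },
];

const apply = (state, e) => ({ total: state.total + e.delta, revision: e.revision });

// (a) Snapshot taken at revision 2, then replay only the events after it.
const snapshot = { total: 8, revision: 2 };
const fromSnapshot = stream
  .filter((e) => e.revision > snapshot.revision)
  .reduce(apply, snapshot);

// (b) A state store updated on every event holds the result of this full fold,
// persisted after each step, so nothing is left to replay.
const fullReplay = stream.reduce(apply, { total: 0, revision: 0 });

console.log(fromSnapshot, fullReplay);
```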

@imissyouso

imissyouso commented Feb 21, 2020

Also, Kafka can "compact" events, which works similarly to snapshots (...almost: it actually keeps only the latest record for each key rather than making a real snapshot of the aggregate) https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
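A simplified model of what compaction keeps, assuming records are keyed by aggregateId: for each key, only the latest record is guaranteed to survive. The real log cleaner works on closed segments, never touches the active segment, and also handles tombstones; none of that is modelled here. If each record is a single event rather than the full aggregate state, compaction discards history instead of producing a snapshot.

```javascript
// Simplified log compaction: for each key keep only the record with the
// highest offset; offsets of surviving records are unchanged.
function compact(log) {
  const latestOffsetByKey = new Map();
  for (const rec of log) latestOffsetByKey.set(rec.key, rec.offset);
  return log.filter((rec) => latestOffsetByKey.get(rec.key) === rec.offset);
}

const log = [
  { offset: 0, key: 'p1', value: 'ProductCreated' },
  { offset: 1, key: 'p2', value: 'ProductCreated' },
  { offset: 2, key: 'p1', value: 'ProductDescriptionChanged' },
  { offset: 3, key: 'p1', value: 'ProductPriceChanged' },
];

// p1's creation and description change are gone; only its last event remains.
console.log(compact(log).map((r) => r.offset));
```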

@nanov
Contributor

nanov commented Feb 21, 2020

@dmitry-textmagic

but isn't it technically the same?

Well, it's not the same (snapshots are optional, configurable, and generally avoidable). Taking them out of the picture, you recreate the state and reapply the event handlers before each command; it really is anything but the same.

Plus, with snapshots you really don't care whether they are created or not; you don't have to wait for them or handle failures with them. In your approach, by contrast, it is vital that the state store is updated before you can handle the next command; otherwise you'll get really unexpected results.

At the end of the day, Kafka is not an event store for such long-lived CQRS/ES use cases. It is great at streaming data in a partitioned manner; that is what it is made for.
Sure, you can make it work like that, but I see neither the gain nor a real need: you don't solve atomicity or complexity problems, you just move them to another place. You either need one partition for all events or a partition per aggregateId; both approaches are suboptimal and have to scale dynamically. What if I want an ordered stream of ALL events for read-model purposes? In that case you'll need to store those events in the same partition, which results in a non-scalable solution.

@imissyouso

imissyouso commented Feb 21, 2020

Plus, with snapshots you really don't care whether they are created or not; you don't have to wait for them or handle failures with them. In your approach, by contrast, it is vital that the state store is updated before you can handle the next command; otherwise you'll get really unexpected results.

The next command will be executed only when we are 100% sure that the aggregate was updated. This is handled at the code level: we invoke the callback in the addEvents method only after receiving acks from Redis/Kafka; otherwise we throw a timeout exception and the command fails. If it fails by timeout, there could be situations where we published the event to Kafka but, because Redis failed, the state was not updated, so the saved state becomes inconsistent with Kafka. Agreed, we need to think about how to avoid such situations; that's an open question. Probably we need to save the state to Redis first, and only if that succeeds send the event to Kafka; but if sending fails, we should undo the changes in Redis...
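The ack-with-timeout flow can be sketched with `Promise.race`. `publishToKafka`, `updateRedisState`, and this promise-based `addEvents` are hypothetical stand-ins, not the proof of concept's code. Note that the timeout only fails the command; it does not undo a publish that already reached Kafka, which is exactly the inconsistency window described above.

```javascript
// Resolve with the acks, or reject if they do not all arrive within `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`ack timeout after ${ms} ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical stand-ins for the Kafka publish and the Redis state update.
const delay = (ms, value) => new Promise((resolve) => setTimeout(resolve, ms, value));
const publishToKafka = () => delay(10, 'kafka-ack');
const updateRedisState = (latencyMs) => delay(latencyMs, 'redis-ack');

// addEvents-style flow: the command succeeds only once both acks arrived in time.
// On timeout the Kafka publish is NOT rolled back.
function addEvents(redisLatencyMs, ackTimeoutMs) {
  return withTimeout(Promise.all([publishToKafka(), updateRedisState(redisLatencyMs)]), ackTimeoutMs);
}

addEvents(20, 3000).then((acks) => console.log('committed', acks));
addEvents(500, 100).catch((err) => console.log('command failed:', err.message));
```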

What if I want an ordered stream of ALL events for read-model purposes? In that case you'll need to store those events in the same partition, which results in a non-scalable solution.

Why can't we create partitions per aggregateId? Then we read them per aggregateId / per partition / per event denormalizer, for example: 10 partitions read by 10 denormalizer instances. Because the ordering of events across different aggregates (and thus partitions) is not important, the denormalizers can read them in parallel.
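In practice, "partition per aggregateId" usually means keying records by aggregateId so that every aggregate maps to one of a fixed number of partitions. The sketch uses a toy string hash as a stand-in for Kafka's default partitioner (which uses murmur2); `partitionFor` and the event shape are hypothetical.

```javascript
// Toy stand-in for key-based partitioning: the same key always maps to the same partition.
function partitionFor(key, numPartitions) {
  let h = 0;
  for (const ch of key) h = (h * 31 + ch.charCodeAt(0)) >>> 0; // 32-bit unsigned hash
  return h % numPartitions;
}

const NUM_PARTITIONS = 10;
const partitions = Array.from({ length: NUM_PARTITIONS }, () => []);

// Keyed by aggregateId: per-aggregate order is preserved within its partition.
const events = [
  { aggregateId: 'order-1', seq: 1 },
  { aggregateId: 'order-2', seq: 1 },
  { aggregateId: 'order-1', seq: 2 },
];
for (const e of events) partitions[partitionFor(e.aggregateId, NUM_PARTITIONS)].push(e);

// Each of the 10 denormalizer instances would consume one partition.
const p = partitionFor('order-1', NUM_PARTITIONS);
console.log(p, partitions[p].filter((e) => e.aggregateId === 'order-1').map((e) => e.seq));
```

Per-aggregate order survives because all of an aggregate's events share a partition, but the mapping depends on `NUM_PARTITIONS`: changing it moves keys to different partitions, which is why repartitioning is hard.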

and have to scale dynamically

Yes, how to repartition is a good question.

@nanov
Contributor

nanov commented Feb 21, 2020

Because the ordering of events across different aggregates (and thus partitions) is not important, the denormalizers can read them in parallel.

But it is: read models can be cross-aggregate and even cross-context, so losing the order here may be critical, depending on the design. The same applies to sagas.
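A minimal illustration of why cross-aggregate order can matter: a hypothetical read model that shows the product name on each order. Fed the events in their original global order, it resolves the name; fed an interleaving that separate partitions could produce, it does not.

```javascript
// Hypothetical read model joining two aggregates: orders display the product name.
function project(events) {
  const products = new Map();
  const orderRows = [];
  for (const e of events) {
    if (e.name === 'ProductCreated') products.set(e.productId, e.productName);
    if (e.name === 'OrderPlaced') {
      // Depends on the product event having been processed first.
      orderRows.push({ orderId: e.orderId, productName: products.get(e.productId) ?? null });
    }
  }
  return orderRows;
}

const productCreated = { name: 'ProductCreated', productId: 'p1', productName: 'Chair' };
const orderPlaced = { name: 'OrderPlaced', orderId: 'o1', productId: 'p1' };

// Global order as written vs. an interleaving two partitions could produce.
console.log(project([productCreated, orderPlaced]));
console.log(project([orderPlaced, productCreated]));
```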

EDIT

Agreed, we need to think about how to avoid such situations; that's an open question. Probably we need to save the state to Redis first, and only if that succeeds send the event to Kafka; but if sending fails, we should undo the changes in Redis...

That's similar to what happens now with the published field, except that right now publishing does not block the aggregate (worker) in any way: the aggregate does not depend on event publishing, as it will get the stream from the single source of truth with the next command.
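The published-flag approach can be modelled roughly like this: committing to the event store is all the aggregate waits for, and a separate publisher later sends unpublished events and marks them. This is a simplified in-memory model, not the eventstore library's dispatcher; `commit`, `publishPending`, and the simulated crash are hypothetical. A crash between sending and marking leads to redelivery, i.e. at-least-once.

```javascript
// Hypothetical in-memory event store with a `published` flag per event.
const store = [];

// Committing events is the only thing the aggregate waits for.
function commit(events) {
  for (const e of events) store.push({ ...e, published: false });
}

// A separate publisher sends unpublished events to the bus, then marks them.
// If it crashes between send and mark, the event is sent again next time.
function publishPending(send, crashBeforeMark = false) {
  for (const e of store.filter((x) => !x.published)) {
    send(e);
    if (crashBeforeMark) return; // simulated crash: e1 was sent but never marked
    e.published = true;
  }
}

const sent = [];
commit([{ id: 'e1' }, { id: 'e2' }]);
publishPending((e) => sent.push(e.id), true); // sends e1, "crashes" before marking it
publishPending((e) => sent.push(e.id)); // resends e1, then sends e2
console.log(sent);
```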

@albe
Contributor

albe commented Jul 24, 2020

Don't use Kafka as an event store. Kafka does not provide consistency guarantees (e.g. via an optimistic concurrency check). It will completely mess up your data if multiple concurrent processes rebuild an aggregate and create new events (which will happen if you just process HTTP requests). See https://dyllanwli.github.io/2019/02/10/Using-Kafka-as-a-Event-Store/#:~:text=Kafka%20does%20not%20support%20optimistic,writer%20at%20the%20application%20level.
Adding an additional (shared) snapshot per event will only lead to you having to implement a distributed transaction protocol to keep both in sync. This is super hard. Spare yourself the pain.
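The missing piece is a conditional append: write only if the stream is still at the version the writer read. A minimal in-memory sketch of that optimistic concurrency check; `StreamStore` and `ConcurrencyError` are hypothetical names, not any particular event store's API.

```javascript
class ConcurrencyError extends Error {}

// In-memory stream store with an expected-version check on append.
class StreamStore {
  constructor() {
    this.streams = new Map();
  }
  read(id) {
    return this.streams.get(id) || [];
  }
  append(id, expectedVersion, events) {
    const current = this.read(id);
    if (current.length !== expectedVersion) {
      throw new ConcurrencyError(`expected version ${expectedVersion}, stream is at ${current.length}`);
    }
    this.streams.set(id, [...current, ...events]);
    return current.length + events.length; // new version
  }
}

const store = new StreamStore();
store.append('cart-1', 0, [{ name: 'CartCreated' }]);

// Two concurrent requests both read version 1 ...
const versionSeenByA = store.read('cart-1').length;
const versionSeenByB = store.read('cart-1').length;

// ... A appends first; B's append is rejected instead of silently interleaving.
store.append('cart-1', versionSeenByA, [{ name: 'ItemAdded' }]);
let bRejected = false;
try {
  store.append('cart-1', versionSeenByB, [{ name: 'CartCheckedOut' }]);
} catch (err) {
  bRejected = err instanceof ConcurrencyError;
}
console.log(bRejected, store.read('cart-1').length);
```

On rejection, B would reload the stream, rebuild the state, and re-run its command against the new version.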

The same is true, by the way, for the current "published" flag on the event and the concept of publishing (as @nanov mentioned), with the difference that you only end up with duplicated messages (at-least-once) that you need to handle, i.e. through idempotency. Don't use it; it overcomplicates things if you want consistent consumers, which you do. Have your consumers only query the event store from their own last persisted position (pull). See https://twitter.com/ih8nickfinding/status/1285490013854208000
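A minimal sketch of the pull model: the consumer keeps its own checkpoint and repeatedly reads the events after it. The global `position` field, `poll`, and the batch size are assumptions; in a real system the checkpoint would be persisted atomically with the read model.

```javascript
// Global, append-only event log (stand-in for the event store).
const eventLog = [
  { position: 1, name: 'ProductCreated' },
  { position: 2, name: 'ProductDescriptionChanged' },
  { position: 3, name: 'ProductPriceChanged' },
];

// Each consumer owns its checkpoint alongside its read model.
function makeConsumer() {
  return { checkpoint: 0, handled: [] };
}

// Pull the next batch after the checkpoint, apply it, then advance the checkpoint.
function poll(consumer, log, batchSize = 2) {
  const batch = log.filter((e) => e.position > consumer.checkpoint).slice(0, batchSize);
  for (const e of batch) {
    consumer.handled.push(e.name);
    consumer.checkpoint = e.position;
  }
  return batch.length;
}

const projection = makeConsumer();
while (poll(projection, eventLog) > 0) {
  // keep pulling until caught up
}
console.log(projection.checkpoint, projection.handled);
```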

Why can't we create partitions per aggregateId?

You can, and it's the best way to do something like that in Kafka (don't), but to rebuild a single aggregate (or a projection thereof), you need to fetch all events of the same aggregate type (= topic) and keep only the ones matching your aggregateId. This will quickly become a bottleneck or a PITA.
See https://dyllanwli.github.io/2019/02/10/Using-Kafka-as-a-Event-Store/#:~:text=Lack%20of%20entity%20isolation

Agreed, we need to think about how to avoid such situations; that's an open question. Probably we need to save the state to Redis first, and only if that succeeds send the event to Kafka; but if sending fails, we should undo the changes in Redis...

You noticed the issue and are unintentionally starting to implement a distributed transaction protocol (wrongly at first, of course). You need to do this at every single arrow in your architecture diagram where you move data outside a single thread of execution. Your architecture will be much more complex if you have to deal with that. Keep data single-sourced and immutable (= read-only). That's really the whole idea of event sourcing.
