Do you really need exactly-once delivery?
In practice you may be better off using other options.
Delivery semantics is a very important topic when building data pipelines, and it’s even more important when building streaming data pipelines with a streaming platform like Apache Kafka. You never want to lose data. Ideally, you also want to avoid or minimize duplicates. So exactly-once delivery usually becomes a north star.
I had the same opinion a few years ago. And I was so excited to see the Exactly-Once Semantics announcement for Kafka in 2017. Nowadays, I think that exactly-once delivery is not always the best option, especially when using frameworks other than Kafka Streams: there are simpler and more reliable ways to achieve the same result.
Obligatory note: yes, true exactly-once delivery is impossible when the network is involved. So much is written on this topic that I don’t think any explanation is needed. Effectively-once delivery is what I really mean in this post, but “exactly-once” is still more popular as a concept name.
Exactly-once is still hard. You may actually lose data
Despite exactly-once delivery semantics being released in Kafka six years ago, it can still be hard to use.
It’s implemented using transactions and idempotent producers. Both help prevent emitting duplicates in case of a network issue: e.g. instead of simply retrying (which may cause duplicates), the whole transaction is aborted. And both have several gotchas that are easy to miss. This post from Ververica lists many best practices when using exactly-once delivery in Flink. Kafka transaction timeout is arguably the main source of concern:
[…] it is strongly recommended to configure the Kafka transaction timeout to a duration longer than the maximum checkpoint duration plus the maximum expected Flink job downtime.
So, do you know your maximum expected job downtime? 😬 Me neither. By default, the maximum timeout you can request is 15 minutes (the brokers’ transaction.max.timeout.ms), and raising it requires changing the configuration on all brokers. It has to be configured because a Kafka transaction is only committed when a Flink checkpoint completes, so the checkpoint must happen within the Kafka transaction timeout, otherwise data loss will occur.
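To make that concrete, here is a minimal sketch of what the sink side looks like with Flink’s KafkaSink (Flink 1.14+); the topic, broker address and transactional ID prefix are placeholders I made up, not values from this post:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch: an exactly-once Kafka sink whose transaction timeout must be tuned by hand.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-pipeline")
        // Should exceed max checkpoint duration + max expected downtime,
        // but cannot exceed the brokers' transaction.max.timeout.ms (15 min by default).
        .setProperty("transaction.timeout.ms", "900000")
        .build();
```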
But that’s not all! Currently, there are 59 (!) “exactly once” bugs reported in Flink JIRA. Many of them are closed, but some are still open and considered critical, e.g. causing OutOfMemoryErrors or commit failures. The sheer variety of issues paints a somewhat grim picture.
Apache Beam JIRA has 9 bugs reported for “kafka exactly once”.
What about Apache Spark Structured Streaming? Apparently, the project gave up on implementing exactly-once semantics for Kafka (PR)!
Finally, Kafka Streams must have excellent support for exactly-once semantics, right? After all, the feature was designed with Kafka Streams in mind. Well, Kafka JIRA has 67 bugs reported for the streams component (many related to state stores, some to rebalance / recovery). In fact, exactly-once support in Kafka Streams was re-implemented: there are several options for configuring the processing guarantee, including a few deprecated ones, and the latest, currently recommended one is exactly_once_v2.
Nevertheless, more recent versions of Kafka Streams support exactly-once semantics quite well. The Kafka transaction timeout is a lesser issue in this case, but it may still lead to stuck applications. Exactly-once is not supported for multiple clusters (which is a general Kafka Streams limitation). And, of course, exactly-once can affect performance, sometimes significantly. Check this blog if you want to learn more.
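For reference, turning the guarantee on in Kafka Streams is a one-line configuration change; here is a minimal sketch, with the application ID and broker address as placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Minimal Kafka Streams configuration enabling exactly-once (v2, available since Kafka 3.0).
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// "exactly_once" and "exactly_once_beta" are the older, now deprecated values.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```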
All of this is written not to diminish any of the work done by the different communities but to highlight the complexity of the current approach. Using it requires an excellent understanding of how it’s implemented on the Kafka side, in your framework of choice, and in everything in between.
Why do you need exactly-once?
To avoid duplicates, d'oh! But what kind of duplicates?
As we saw, exactly-once in Kafka prevents potential duplicates created by issues at the producer, e.g. network issues causing retries. However, if the message was already duplicated before it arrived at the Kafka producer (e.g. by an upstream process), exactly-once delivery won’t help!
So, isn’t it better to have a solution for any kind of duplicates? Yes, I think so too.
In my experience, you care about exactly-once delivery in two situations:
When you consume data from Kafka and write it to another data store, typically for querying it later.
When you consume data from Kafka in a stream processor and perform stateful computations like aggregations.
In both situations, duplicates will lead to incorrect results, especially when those results power any kind of analytics.
Better options
Upserts
Simply using Kafka for data ingestion and delivering the data to one or more data stores is still one of the most popular types of streaming data pipelines. In this case, try to choose data stores that support upserts or upsert-like semantics:
OLAP engines like Apache Pinot and Rockset support upserts natively. Apache Druid and ClickHouse do not (UPDATE: upserts are now possible in Apache Druid and can be achieved in ClickHouse using the ReplacingMergeTree engine).
Modern data lakehouse formats like Apache Iceberg, Apache Hudi and Delta Lake support upserts. Traditional data warehouses have historically struggled with them, but the situation is slowly improving. And, obviously, plain files written to a data lake on an object store like AWS S3 don’t support upserts at all.
Some other popular data stores for search and caching, like Elasticsearch or Redis, also support upsert semantics.
As you can see, there are plenty of data stores that can be used together with Kafka to achieve upsert semantics. Some may have different performance characteristics when performing upserts, but all of them should be generally fine with at least handling occasional duplicates.
One requirement for using upserts is having a unique message ID that can be used as a “primary key” when upserting. That’s usually not a bad idea by itself.
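As an illustration of the idea (not tied to any specific store from the list above), here is a sketch of a consumer writing to a relational table with a Postgres-style upsert keyed by the message ID; the table, columns and connection details are made up:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UpsertWriter {
    public static void main(String[] args) throws Exception {
        // Hypothetical target table: events(event_id PRIMARY KEY, payload).
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/analytics", "user", "password")) {
            String sql = "INSERT INTO events (event_id, payload) VALUES (?, ?) "
                       + "ON CONFLICT (event_id) DO UPDATE SET payload = EXCLUDED.payload";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                // The unique message ID acts as the primary key, so replaying
                // the same message overwrites the row instead of duplicating it.
                stmt.setString(1, "msg-123");
                stmt.setString(2, "{\"value\": 42}");
                stmt.executeUpdate();
            }
        }
    }
}
```

The same pattern maps to MERGE INTO statements in lakehouse formats like Iceberg or Delta Lake, or to a ReplacingMergeTree table in ClickHouse.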
Deduplication with stateful streaming
If you use a stateful stream processor like Apache Flink, instead of trying to achieve exactly-once delivery (which is no easy feat), you can implement a deduplication step. In fact, this was (and still is) a pretty common step in any modern data streaming pipeline.
However, because deduplication is a stateful operation, it adds performance and storage overhead. Exactly-once delivery was once considered a silver bullet that could save us from implementing deduplication, but, as I mentioned above, deduplication is likely still needed to catch other types of duplicates.
Implementing deduplication in a stateful framework like Flink just requires a state variable and a timer: use state to flag when a message with a certain ID first arrives, and a timer to implement a TTL mechanism for garbage collection so the state doesn’t grow indefinitely. This requires you to:
have a unique message ID (just like with upserting).
come up with a TTL value that covers a long enough window without using an excessive amount of state. The right value mostly depends on your data volume.
Here’s an example (using state TTL instead of a timer).
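For a rough idea of what that looks like, here is a sketch of my own (not the linked example), using Flink’s ValueState with state TTL; the stream is assumed to be keyed by the unique message ID, the payload is a plain String, and the 24-hour TTL is an arbitrary placeholder:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Assumes the stream is keyed by the unique message ID upstream (e.g. via keyBy).
public class DeduplicateFn extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        // TTL acts as garbage collection so the state doesn't grow indefinitely;
        // 24 hours is just an example, pick a window that matches your data volume.
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24)).build();
        descriptor.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (seen.value() == null) {  // first time this message ID is seen
            seen.update(true);
            out.collect(value);
        }
        // later occurrences of the same ID are silently dropped
    }
}
```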
Nowadays, deduplication can even be expressed with pure SQL thanks to window functions (examples: one, two).
Nice write-up!
I can relate to the same pain from my experience dealing with exactly-once Flink applications. If Flink fails to commit the transaction before the timeout for whatever reason, the app just gets stuck in a crashlooping state because it'll keep trying to commit a transaction that doesn't exist anymore.
Re: better solutions. I think upserts are generally the preferable option here. For ledger-like applications that have zero tolerance for duplicates, implementing a stateful deduplication process likely requires a global window (i.e. one without a TTL). This means you end up with an infinitely growing state size, which leads to a whole other set of problems, e.g. storage, recovery time, latency.
Wow, you set yourself a very high bar with the first post on this Substack 👏
Showing the number of bugs related to exactly-once in Flink/Spark/Beam/Kafka is an interesting and convincing angle. Just like 100% uptime, it's a goal, not really something that can be achieved in the real world. I am with you. Sending upserts downstream, or applying dedup during stream processing, will be much more practical. For streaming databases like ours, one or more columns can be set as primary key(s) for a data stream, so you can just send duplicated data in and write SQL in a clean way. No need to configure Kafka like a rocket scientist.