Saturday, July 22, 2017

Notes on Exactly-Once semantics in light of Kafka's 0.11 release.

The Exactly-Once (EO) semantics feature introduced in Apache Kafka (release 0.11) caused a tsunami in the academic and industrial communities. And why not? Neha Narkhede's blog post is titled "Exactly-once Semantics are Possible: Here’s How Kafka Does it", which is a very teasing title, as nearly everyone in this community holds it as doctrine that this is impossible. I confess that this title was a smart choice from a business perspective.

Technically, I was surprised by the amount of criticism from people who considered this a bold claim, especially those who cited FLP, the Two Generals problem, and even CAP! My conjecture is that most of these people have not read (very carefully) Neha's blog post, Kafka's design documentation, or the aforementioned academic papers.

Yet another blog post on Exactly-Once!

Despite the several blog posts on the subject, I noticed that some important points have not been fully discussed yet, which I try to address here. I'm particularly interested in this subject because we've recently designed a "non-classical" exactly-once delivery protocol (well, consider it another bold claim :) ) that we are planning to integrate with some messaging brokers (ZeroMQ, RabbitMQ, ActiveMQ, Kafka, ...); if you work on these projects and you're interested, please reach out. Given that, I was naturally very excited to read about Kafka's new feature.

Buzz words: Exactly-Once (EO) delivery, semantics, processing, effective...?

I would first like to clear up a confusion that people often fall into. There is a clear distinction between EO delivery and EO semantics. EO delivery is a guarantee at the message delivery layer, in particular, the end-to-end buffered plumbing. The buffer may be considered a naive application decoupled from delivery, but since it is common across all applications, it makes sense to consider it part of the plumbing. In contrast, EO semantics deals with many software layers (including the plumbing itself) to satisfy defined EO application semantics: read-and-execute one and only one copy of a message (reply, state, etc.). For instance, in Kafka's case, EO delivery (which they call Idempotence) is one part of the whole thing, i.e., Kafka's EO semantics.

But what really matters is EO semantics: it is a de facto requirement for many applications and it's what your customers require, regardless of EO delivery (as long as it is not very costly). I argue that many current services implicitly implement (not necessarily correctly) exactly-once semantics: services retain a "received messages log" to check for possible duplicates, or still favor TCP over UDP for this reason (though TCP guarantees exactly-once delivery only within a single connection incarnation!).

Some people believe that "EO processing" is the important subject, and consider that: 
"Exactly-once delivery, at the transport level, is impossible. It doesn’t exist in any meaningful way and isn’t all that interesting to talk about." 

I do agree that since EO delivery is, at best, hard to achieve (if not impossible), the only way to go is to tolerate this on another layer: the "processing" or application layer! Though practical, delegating EO delivery to another layer has meant that EO delivery itself has not been well thought through, and all existing solutions that I know of so far are "classical": store messages in a log and deduplicate, TCP-style! Downplaying this problem is understandable in the context of Kafka, as they invest a lot in other layers, but EO delivery is still interesting as a stand-alone layer if you're not interested in a complete end-to-end solution like Kafka.

"Effective EO" is what manifests on the application, which Kafka called "EO Semantics".

But first, is Exactly-Once (EO) delivery possible?

Yes, and no! 

EO delivery is what people consider impossible (or at least very hard to achieve). But wait, is there a proof? The paper by Attiya et al. on "The level of handshake required for managing a connection" is probably the most relevant paper on the subject (though not often mentioned). In a nutshell, EO delivery may be possible if you can tolerate infinite delays (for at-least-once) and you have infinite memory (for exactly-once); under these (impossible?) conditions, the best that you can have is a three-way handshake protocol (along with performance challenges).

In practice, systems are not carbon copies of theoretical papers. You know that consensus is impossible in asynchronous models even in the presence of a single crash failure (FLP); yet you are aware of (and maybe using) many consensus protocols (like Paxos and Raft) that achieve it. How? It has to do with the assumptions: they build on "partial synchrony" instead of full asynchrony to guarantee liveness without violating safety. You also know the dilemma of the CAP theorem, and yet you see many successful systems in production that are consistent (though eventually) and available, again because of the different assumptions. (You may have also read Eric Brewer's blog post about Google Spanner talking about CAP exceptions.)

The same holds for EO delivery: if you have a sufficiently large memory, a sufficiently connected network, and an efficient protocol (i.e., few handshakes), then EO delivery is possible. The subtle detail is what counts as "sufficient" and what counts as "efficient"!

What about Kafka's EO semantics?

As mentioned above, Kafka's new feature is about EO "semantics" and not EO "delivery"; that was the main confusion that led people to fire their guns at what they considered Kafka's "bold claim". What people know about the impossibility of EO delivery does not hold in the case of EO semantics; but yes, it is very hard to achieve. In my opinion, the design decisions Kafka's EO semantics team made are smart and can, theoretically, achieve their goal to a great extent (I only read Neha's blog post though).

But why not 100%? Again, due to the underlying assumptions throughout the different layers and, practically, due to the implementation flaws or bugs that might appear with time (the feature has not yet been exercised enough to reveal the implementation caveats of such a complex problem).

Pedantic details: how/why does Kafka achieve EO semantics?

In my opinion, Kafka's key design to achieve EO semantics is:

(1) The persistence of the log of messages with sequence numbers (they call this Idempotence).

The first point is to give the Producer's messages increment-only sequence numbers and retain them in a persistent log in the Kafka broker. The sequence number is simply used to check whether this very message has already been added to the persistent log. As Neha said, this is somewhat like TCP plus persistence (TCP retains an in-memory log). But is this enough to ensure EO delivery, a.k.a. Idempotence?
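To make the sequence-number check described above concrete, here is a minimal sketch in Python. It is my own toy model, not Kafka's implementation: the `Broker` class, its `append` method, and the in-memory list standing in for the persistent log are all hypothetical, and it assumes each producer sends its messages in order.

```python
from collections import defaultdict


class Broker:
    """Toy broker (hypothetical): appends a message only if its sequence number is new."""

    def __init__(self):
        self.log = []                            # stand-in for the persistent log
        self.last_seq = defaultdict(lambda: -1)  # highest sequence seen per producer

    def append(self, producer_id, seq, payload):
        # A retry carries the same sequence number, so it is recognised and dropped.
        if seq <= self.last_seq[producer_id]:
            return False
        self.log.append((producer_id, seq, payload))
        self.last_seq[producer_id] = seq
        return True


broker = Broker()
broker.append("p1", 0, "a")
broker.append("p1", 1, "b")
broker.append("p1", 1, "b")   # retried after a lost acknowledgement, ignored
```

The retry leaves the log untouched, which is the whole point; what the sketch hides is how large this bookkeeping gets and how long the lookup takes in a real deployment.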

The devil is in the detail! Again, this is conceptually correct as I've mentioned above, but it's not clear (to me) what are Kafka's assumptions regarding acceptable delays and storage? In particular, what is the size of the log? This not only has to do with storage, but also with performance: what is the lookup time in the log on each new message (to detect duplicates)?

Moreover, if you are only storing the last received sequence number, then you assume ordering, which EO delivery does not necessarily guarantee and which is not a requirement for all applications. The lookup cost will persist even in this case when you have billions of clients (which is not an exaggeration in the IoT era). On the other hand, if you don't care about message ordering, then you will need to retain all message sequence numbers, and hence the cost of lookup will be much higher. Notice the dilemma here: guaranteeing ordering is more costly in principle, but it makes EO delivery cheaper.
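The trade-off between the two bookkeeping strategies can be illustrated with a small Python sketch. Both classes and the `delivered` helper are hypothetical names of mine, chosen only for illustration; the arrival sequence simulates one delayed message and one retried message from a single producer.

```python
class LastSeqDedup:
    """Keeps one integer per producer; correct only if messages arrive in order."""

    def __init__(self):
        self.last = {}

    def accept(self, producer_id, seq):
        if seq <= self.last.get(producer_id, -1):
            return False
        self.last[producer_id] = seq
        return True


class SeenSetDedup:
    """Keeps every sequence number seen; tolerates reordering, grows without bound."""

    def __init__(self):
        self.seen = {}

    def accept(self, producer_id, seq):
        seqs = self.seen.setdefault(producer_id, set())
        if seq in seqs:
            return False
        seqs.add(seq)
        return True


def delivered(dedup, arrivals):
    """Return the sequence numbers that the given strategy lets through."""
    return [seq for seq in arrivals if dedup.accept("p1", seq)]


arrivals = [0, 2, 1, 2]   # message 1 is delayed, message 2 is retried
in_order_only = delivered(LastSeqDedup(), arrivals)   # [0, 2]: message 1 is lost
any_order = delivered(SeenSetDedup(), arrivals)       # [0, 2, 1]: only the retry is dropped
```

The last-sequence strategy needs constant memory per producer but silently drops the delayed message, while the set-based one delivers everything exactly once at the price of remembering every sequence number it has ever seen.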

This is exactly what I meant by "sufficient" and "efficient" when explaining the impossibility of EO delivery! I would like to see practical numbers on these, as they are not considered in Kafka's current evaluation (as of June 2017). (Note that measuring the Producer's throughput, as Kafka's current experiments do, is not as important as measuring the message delay from Producer to Consumer.)

(2) The persistence of the state and the offsets atomically (part of what they call Transactions).

Now, having unique messages in the broker is not enough, as the application (i.e., the Consumer) may reread the entire log, or a previously consumed message, if a fault happens. This requires persisting the offset (the pointer to the last message read), so you don't reprocess it once you recover. Even then, you are not safe if your offset is decoupled from the state you computed: a failure may leave one of them persisted and the other not (which leads to consuming a message either twice or never). That's why the offset and the state must be persisted atomically.
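As a rough illustration of persisting the offset and the state atomically, the following Python sketch stores both in one SQLite database and updates them inside a single transaction. This is not how Kafka does it; the table names, the `consume` function, and the `crash_at` parameter (which simulates a failure between the two writes) are assumptions of mine, and the "state" is just a running sum of the messages.

```python
import sqlite3


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS state (id INTEGER PRIMARY KEY, total INTEGER)")
    conn.execute("CREATE TABLE IF NOT EXISTS progress (id INTEGER PRIMARY KEY, next_offset INTEGER)")
    conn.execute("INSERT OR IGNORE INTO state VALUES (1, 0)")
    conn.execute("INSERT OR IGNORE INTO progress VALUES (1, 0)")
    conn.commit()
    return conn


def consume(conn, log, crash_at=None):
    """Apply every unread message; state and offset change in one transaction."""
    (offset,) = conn.execute("SELECT next_offset FROM progress WHERE id = 1").fetchone()
    for i in range(offset, len(log)):
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute("UPDATE state SET total = total + ? WHERE id = 1", (log[i],))
            if i == crash_at:
                raise RuntimeError("simulated crash between the two writes")
            conn.execute("UPDATE progress SET next_offset = ? WHERE id = 1", (i + 1,))


def total(conn):
    return conn.execute("SELECT total FROM state WHERE id = 1").fetchone()[0]


conn = open_store()
log = [5, 7, 11]
try:
    consume(conn, log, crash_at=1)   # fails while handling the second message
except RuntimeError:
    pass
consume(conn, log)                   # recovery: resumes from the stored offset
```

Because the half-applied update is rolled back together with the offset, the recovery run processes the second message once, and the final total counts each message exactly once. With two separate commits, the same crash would have counted the second message twice.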

Kafka does more than that in Transactions (with extra complexity) as they have the data sharded across multiple servers and non-deterministic operations exist; but this is not a requirement for all services.


It is interesting to see such a comprehensive EO design in practice; the corresponding investment is also an indicator that people started to care a lot about this problem. Given that Kafka is a complete system, it is still worth exploring other exactly-once semantics and delivery solutions to use elsewhere...

Last, but not least, stay tuned for our "new class" of Exactly-Once delivery protocols.

I would like to thank Joubin Houshyar (@SunOf27) for his helpful comments to improve this post.
