Saturday, January 3, 2026

Kafka Idempotence, Transactions, Offsets, and Exactly-Once Semantics

This document ties together the concepts we’ve been discussing: the idempotence protocol, sequence numbers, transactional producers, committing consumer offsets, and the classic failure scenarios that motivate Kafka’s exactly-once semantics (EOS). For a deeper background on EOS and how Kafka implements it with idempotent producers and transactions, see the Confluent article on exactly-once semantics and other EOS guides.

1. Idempotent producer: what it is and what it guarantees

1.1. Core idea

The idempotent producer is a Kafka feature that guarantees:

  • No duplicates: the same message is never written twice, even if the producer retries.
  • Ordering preserved (per partition): messages are written in the order the producer sends them.
  • Message loss is still possible: if an earlier message fails and later ones succeed, the earlier one can be dropped.

Idempotence is one of the building blocks Kafka uses to implement exactly-once semantics, but by itself it only guarantees “no duplicates + ordering,” not “no loss”.
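
To make this concrete, here is a minimal configuration sketch for an idempotent producer using the Java client. The broker address, topic, and record contents are placeholders; enable.idempotence is the default in recent client versions, but it is shown explicitly here.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Turn on the idempotence protocol (PID + sequence numbers).
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Idempotence requires acks=all and at most 5 in-flight requests per connection.
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("payments", "key-1", "value-1"));    // placeholder topic and record
producer.close();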

1.2. The idempotence protocol: PID and sequence numbers

The idempotence protocol is implemented via:

  • Producer ID (PID): assigned by the broker when the producer starts. It uniquely identifies a producer instance.
  • Per-partition sequence numbers: for each partition, the producer assigns a monotonically increasing sequence number to each batch of records.
  • Broker-side tracking: the broker tracks, for each (PID, partition), the highest sequence number it has accepted.

The broker then enforces strict rules:

  • Accept new message: if incoming_seq == highest_seq + 1.
  • Accept duplicate retry: if incoming_seq == highest_seq (duplicate is ignored or safely deduplicated).
  • Reject late retry: if incoming_seq < highest_seq (too old; would break ordering and idempotence).
  • Reject gaps: if incoming_seq > highest_seq + 1 (illegal gap in sequence).
Key insight: Idempotence depends on strict ordering. The broker must reject any sequence that would violate monotonic progression, even if the application “doesn’t care” about ordering.
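
As an illustration only (this is not the broker's actual code, just the four rules above restated in Java), the broker-side decision can be modeled like this:

// Simplified model of the per-(PID, partition) sequence check.
enum SeqDecision { ACCEPT_NEW, IGNORE_DUPLICATE, REJECT_TOO_OLD, REJECT_GAP }

static SeqDecision check(long incomingSeq, long highestSeq) {
    if (incomingSeq == highestSeq + 1) return SeqDecision.ACCEPT_NEW;       // next expected batch
    if (incomingSeq == highestSeq)     return SeqDecision.IGNORE_DUPLICATE; // retry of the last batch
    if (incomingSeq <  highestSeq)     return SeqDecision.REJECT_TOO_OLD;   // late retry (the seq=0 case in 1.3)
    return SeqDecision.REJECT_GAP;                                          // gap in the sequence
}

With this rule, check(0, 2) returns REJECT_TOO_OLD, which is exactly the seq=0 loss scenario described next.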

1.3. The seq=0 loss scenario

Consider this sequence on a single partition:

A → seq=0
B → seq=1
C → seq=2

Now imagine:

  1. A(seq=0) fails due to a transient network issue.
  2. B(seq=1) and C(seq=2) are successfully written and acknowledged.
  3. The producer retries A(seq=0) later.

At this point, the broker has:

highest_seq = 2
incoming_seq = 0

According to the idempotence rules:

  • incoming_seq < highest_seq → the broker must reject this message.

Result: A(seq=0) is lost. This is not a bug; it is the correct behavior for an idempotent producer. The broker cannot accept a late retry of an earlier sequence without breaking ordering and duplicate detection.

1.4. Why you cannot “fix” seq=0 by renumbering

A natural thought is: “If ordering doesn’t matter to my application, can I just resend A as a new message with a new sequence number (e.g., seq=3)?” The answer is no:

  • Sequence numbers are protocol metadata: they are not part of the Kafka log and are not controlled by your application.
  • The client library manages them: you cannot manually assign or override them.
  • Renumbering would break idempotence: the broker would treat the message as a new one, and you could end up with duplicates if the original write had partially succeeded.

Therefore, with idempotence alone, the seq=0 loss scenario is unavoidable in the presence of certain failure patterns.

2. Transactions: fixing the limitations of idempotence

2.1. What transactions add on top of idempotence

Kafka transactions build on the idempotent producer to provide full exactly-once semantics (EOS):

  • No duplicates (via idempotence).
  • Ordering preserved (per partition).
  • No message loss within a transaction.
  • Atomic multi-message, multi-partition, multi-topic writes.
  • Atomic commit of consumer offsets with output messages.

Transactions are enabled by configuring a transactional.id on the producer and using the transactional API: initTransactions(), beginTransaction(), commitTransaction(), and abortTransaction().

2.2. Transactional producer flow (high level)

initTransactions()
beginTransaction()

// 1. Read input (via consumer)
// 2. Process
// 3. Produce output records
// 4. Optionally send consumer offsets to the transaction

commitTransaction()   // or abortTransaction() on failure

All writes and offset commits inside a transaction are either fully committed or fully aborted. There are no partial writes visible to consumers in read_committed mode.
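
A minimal sketch of this flow with the Java client might look like the following (the broker address, transactional.id, topic names, and record contents are placeholders; setting a transactional.id implicitly enables idempotence):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");  // stable per producer instance

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("output-topic", "k1", "A"));
    producer.send(new ProducerRecord<>("output-topic", "k2", "B"));
    producer.commitTransaction();   // both records become visible atomically
} catch (KafkaException e) {
    // For fatal errors (e.g. the producer was fenced), close the producer instead of aborting.
    producer.abortTransaction();    // neither record becomes visible
} finally {
    producer.close();
}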

2.3. Why transactions eliminate the seq=0 loss scenario

Revisit the earlier example with A, B, C. With a transactional producer:

  1. The producer starts a transaction.
  2. It sends A, B, C as part of the same transaction.
  3. If A fails but B and C succeed, the transaction cannot be committed until A is successfully retried.
  4. If A never succeeds, the transaction is aborted, and none of A, B, or C become visible.

Result:

  • No partial writes.
  • No gaps like “B and C visible, A missing.”
  • No seq=0 loss within a committed transaction.

3. Committing consumer offsets and why it matters

3.1. What is a consumer offset?

A consumer offset is a “bookmark” that indicates how far a consumer has read in a partition. Committing an offset means:

“I have successfully processed all messages up to offset X. If I crash, resume from X+1.”

Kafka stores these offsets in an internal topic called __consumer_offsets. This is how Kafka tracks progress for each consumer group.
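
For example, with auto-commit disabled, a consumer commits its bookmark explicitly after processing. This is a sketch; the group id, topic, and the process() call are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pipeline-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit the bookmark ourselves

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("input-topic-A"));

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
    process(record);        // hypothetical application logic
}
consumer.commitSync();      // writes the new bookmark to __consumer_offsets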

3.2. The classic failure problem: read → process → write

Consider this pipeline:

Input Topic A → Consumer → Processor → Producer → Output Topic B

And this sequence:

  1. Consumer reads message A from topic A.
  2. Processor generates output message B for topic B.
  3. Consumer commits the offset for A (marking it as processed).
  4. Producer crashes before writing B.

Result:

  • A is marked as processed and will not be re-read.
  • B was never written.
  • Message A is effectively lost.

This is the fundamental exactly-once problem: if you commit offsets before writing output, you risk loss; if you write output before committing offsets, you risk duplicates. This is why idempotence alone is not enough for end-to-end exactly-once processing.

3.3. Transactions + sendOffsetsToTransaction()

Kafka solves this by allowing the producer to include the consumer’s offsets in the same transaction as the output writes:

beginTransaction()

// 1. Consumer reads A
// 2. Processor generates B
// 3. Producer sends B to output topic
// 4. Producer calls sendOffsetsToTransaction(offset of A, consumerGroupId)

commitTransaction()   // or abortTransaction()

Now:

  • If the transaction commits:
    • B is visible in the output topic.
    • The offset for A is committed.
  • If the transaction aborts:
    • B is discarded (never visible).
    • The offset for A is not committed; A will be re-read.

This atomicity is what gives you true exactly-once semantics in a read → process → write pipeline.
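
Put together, a read → process → write loop using the current Java API might look like the sketch below. It assumes the producer is configured with a transactional.id and the consumer with enable.auto.commit=false (and isolation.level=read_committed if the input topic is itself written transactionally); the topic names, ids, and the transform() step are placeholders.

// Reuses the producer/consumer construction from the earlier sketches.
// Extra imports: java.util.HashMap, java.util.Map,
// org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition.
consumer.subscribe(Collections.singletonList("input-topic-A"));
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> rec : records) {
            String result = transform(rec.value());   // hypothetical processing step
            producer.send(new ProducerRecord<>("output-topic-B", rec.key(), result));
            // The committed offset is the next record to read: rec.offset() + 1.
            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                        new OffsetAndMetadata(rec.offset() + 1));
        }
        // Output records and input offsets commit or abort together.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();   // offsets not committed; the input will be re-read
    }
}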

4. Failure scenarios in the read → process → write pipeline

4.1. Non-transactional pipeline (unsafe)

Scenario A — Commit offset before writing output (loss)

1. Consumer reads A
2. Processor generates B
3. Consumer commits offset for A
4. Producer fails before writing B

Result:

  • A is marked processed.
  • B is never written.
  • A is lost forever.

Scenario B — Write output before committing offset (duplicates)

1. Consumer reads A
2. Processor generates B
3. Producer writes B
4. Consumer crashes before committing offset for A
5. Consumer restarts and re-reads A
6. Processor generates B again
7. Producer writes B again

Result:

  • A is processed twice.
  • B is written twice (duplicate output).

Without transactions, you are forced to choose between possible loss or possible duplicates. You cannot avoid both simultaneously.

4.2. Transactional pipeline (safe)

Scenario C — Transaction commits successfully

1. beginTransaction()
2. Consumer reads A
3. Processor generates B
4. Producer writes B
5. Producer sendOffsetsToTransaction(offset of A)
6. commitTransaction()

Result:

  • B is visible in the output topic.
  • Offset for A is committed.
  • A is processed exactly once.
  • No duplicates, no loss.

Scenario D — Failure before commit → transaction aborts

1. beginTransaction()
2. Consumer reads A
3. Processor generates B
4. Producer writes B (pending, not visible)
5. Producer or broker fails before commitTransaction()
6. Transaction coordinator aborts the transaction

Result:

  • B is discarded (never visible to consumers in read_committed mode; see the consumer config sketch after this list).
  • Offset for A is not committed.
  • Consumer will re-read A.
  • No duplicates, no loss.
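
Note that aborted records are filtered out only for consumers running with isolation.level=read_committed; the default for the plain Java consumer is read_uncommitted, which would still surface them. A minimal sketch of the downstream consumer configuration (broker address, group id, and topic are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Skip records from aborted or still-open transactions.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("output-topic-B"));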

4.3. Summary table: non-transactional vs transactional

Aspect                               | Non-Transactional (Idempotence Only)           | Transactional (Exactly-Once)
No duplicates                        | Yes (for producer retries), but not end-to-end | Yes, end-to-end
Ordering preserved (per partition)   | Yes                                            | Yes
No message loss                      | No (seq=0 loss, offset-commit race)            | Yes (within transactions)
Atomic write + offset commit         | No                                             | Yes (via sendOffsetsToTransaction)
Exactly-once semantics               | No                                             | Yes

5. Transaction coordinator and broker crash behavior (high level)

5.1. Transaction coordinator state machine (conceptual)

The transaction coordinator manages transactional state for producers. Conceptually, it moves through states like:

  • NO_TXN / READY: no active transaction; producer has a valid PID + epoch.
  • ONGOING: a transaction is in progress; messages are written as pending/uncommitted.
  • PREPARE_COMMIT: coordinator writes a prepare-commit marker to the transaction log.
  • COMPLETE_COMMIT: coordinator writes a commit marker; messages become visible.
  • PREPARE_ABORT / COMPLETE_ABORT: similar flow for abort; pending messages are discarded.

On failures (producer crash, timeouts, fencing), the coordinator will typically move the transaction to ABORT, ensuring no partial writes are visible.

5.2. Broker crash and recovery

When a broker crashes and restarts, it:

  • Replays log segments to restore committed data and high watermarks.
  • Replays the transaction log to determine the state of in-flight transactions.
  • For each transaction:
    • If a prepare-commit marker is present but no final commit, it completes the commit.
    • If a prepare-abort marker or incomplete transaction is found, it aborts the transaction.

This ensures that after recovery, the broker presents a consistent view: no partial transactions, no duplicates, and no reordering for committed data.

6. The core use case: exactly-once processing, not “reading A and B in order”

In the canonical pipeline:

Input Topic A → Consumer → Processor → Producer → Output Topic B

The goal is not for a consumer to read both A and B in order. Typically:

  • One consumer group reads A from the input topic.
  • Another consumer group reads B from the output topic.

The real goals are:

  • Every A is processed exactly once.
  • Every A produces exactly one corresponding B (if that’s your business logic).
  • No A is lost.
  • No B is duplicated.

Committing consumer offsets inside the same transaction as writing B is what guarantees that you never end up in the “A marked processed but B never written” or “B written twice because A was re-read” situations. This is the essence of Kafka’s exactly-once semantics.

Follow-up: can a restarted producer resend the lost seq=0 message?

A natural follow-up to the scenario in section 1.3: if seq=0 is lost, seq=1 and seq=2 succeed, and then the producer crashes and restarts, can the new producer session send seq=0 again?

Short answer: no. Even after a crash and restart, Kafka will not accept the original seq=0; it is permanently lost. A new producer session gets a new Producer ID (PID) and resets its sequence numbers to 0, but the broker does not treat the old seq=0 retry as part of the new session.

Step 1 — Original producer session

A → seq=0 (fails)
B → seq=1 (success)
C → seq=2 (success)

Broker state for this producer:

PID = 123
highest_seq = 2

When the retry of seq=0 arrives, incoming_seq (0) < highest_seq (2), so the broker rejects it as a late retry. seq=0 is lost.

Step 2 — Producer crashes

The PID is tied to the old producer session. The broker keeps the PID state for a while (until it expires), but the session itself is gone.

Step 3 — Producer restarts

On restart, Kafka assigns a new identity:

NEW PID = 456
NEW epoch = 0
Sequence numbers reset to 0

This is a completely new producer session; it cannot reuse the old PID or the old sequence numbers.

Step 4 — Can the new producer send the old seq=0 message?

The new producer can send a message with sequence number 0, but the broker treats it as a new message, not as a retry of the old seq=0, because sequence tracking is per (PID, partition):

Old PID = 123 → highest_seq = 2
New PID = 456 → highest_seq = -1 (no messages yet)

If the new producer sends A' → seq=0, Kafka accepts it, but it belongs to a different PID, a different session, and a different sequence space. Kafka has no way to know it is "the same" message as the lost one.

Why Kafka cannot accept the old seq=0 after a restart

Allowing a retry from an old session to slip into a new sequence space would break:

  • Duplicate detection.
  • Per-partition ordering guarantees.
  • Zombie-producer protection (epoch fencing).
  • The overall correctness of the idempotence protocol.

The rule is therefore simple: a new producer session starts a new sequence space, and messages from the old session cannot be retried.

Conclusion

  • The original seq=0 is permanently lost once seq=1 and seq=2 are committed.
  • The new producer can send a new message with seq=0, but it is not the same message.
  • This is exactly why idempotence alone cannot guarantee no message loss, and why transactions are required to prevent this scenario.

7. Quick mental model recap

  • Idempotence protocol: PID + per-partition sequence numbers; guarantees no duplicates and ordering, but can drop messages (e.g., seq=0) in certain failure patterns.
  • Transactions: build on idempotence to add atomic commit/abort, no loss, and exactly-once semantics across reads, writes, and offset commits.
  • Committing consumer offsets: is how Kafka tracks progress; doing it inside a transaction with output writes is how you avoid both loss and duplicates.
  • Exactly-once semantics: achieved by combining idempotent producers, transactional writes, and atomic offset commits.
