Apache Kafka - Produce Request
The previous article explained how Kafka establishes connections and receives requests. This article focuses on how Kafka processes a write after receiving a Produce Request: it first introduces the Produce Request protocol, then explains the format in which records are stored on disk, and finally discusses how a topic partition leader temporarily holds requests that cannot yet be acknowledged when the producer requires the data to be written to enough replicas.
Produce Request Protocol
Just like in web backend development, when we study a new API, we usually start by examining the request and response data fields. It’s the same with Kafka — the data fields for a Produce Request are defined in ProduceRequest.json.
From the JSON below, you can see that each request has a unique apiKey. This corresponds to what was explained in the previous article, Apache Kafka — Network Architecture: after the Processor receives a NetworkReceive, it first parses the apiKey in the request header to determine which request type the body corresponds to. If the apiKey is 0, it is a ProduceRequest.
The listeners field contains only broker, which means this request can only be handled by the Broker server. If controller were also included, the Controller server could handle it as well.
The validVersions field is 3–13, meaning this Kafka version can only handle Produce Request versions 3 through 13. This version corresponds to the apiVersion in the request header; if the apiVersion falls outside the 3–13 range, Kafka returns an InvalidRequestException. Normally validVersions starts at 0, but since Produce Requests are 3–13, versions 0 through 2 have already been removed.
Each field in fields has its own versions attribute, specifying which apiVersions support that field. For example, the Name field is 0–12, meaning it is only supported in versions 0 through 12, while TopicId is 13+, meaning it is supported only from version 13 onward. This shows that Produce Requests are transitioning from identifying a topic partition by Topic Name + Partition Index to Topic Id + Partition Index.
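If you prefer to inspect this information programmatically rather than reading the JSON, the public ApiKeys enum in kafka-clients exposes the same mapping. The snippet below is only a minimal sketch; it assumes a recent kafka-clients dependency on the classpath, and the version range it prints depends on the Kafka version you compile against.
import org.apache.kafka.common.protocol.ApiKeys;

public class ProduceApiInfo {
    public static void main(String[] args) {
        // apiKey 0 in the request header identifies a Produce request.
        System.out.println("apiKey = " + ApiKeys.PRODUCE.id);

        // The supported version range; requests outside it are rejected as invalid.
        System.out.println("supported versions = "
                + ApiKeys.PRODUCE.oldestVersion() + "-" + ApiKeys.PRODUCE.latestVersion());
    }
}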
In Kafka, protocol changes are considered public changes, and every public change requires a Kafka Improvement Proposal (KIP). The comments in ProduceRequest.json also indicate what changed in each apiVersion and which KIP introduced the change.
{
"apiKey": 0,
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
// these versions have to be included in the api versions response (see KAFKA-18659), but are rejected otherwise.
// See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
//
// Version 1 and 2 are the same as version 0.
//
// Version 3 adds the transactional ID, which is used for authorization when attempting to write
// transactional data. Version 3 also adds support for Kafka Message Format v2.
//
// Version 4 is the same as version 3, but the requester must be prepared to handle a
// KAFKA_STORAGE_ERROR.
//
// Version 5 and 6 are the same as version 3.
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
//
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
//
// Version 10 is the same as version 9 (KIP-951).
//
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 12 is the same as version 11 (KIP-890). Note when produce requests are used in transaction, if
// transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a
// AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within
// a transaction.
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "3-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
"about": "The transactional ID, or null if the producer is not transactional." },
{ "name": "Acks", "type": "int16", "versions": "0+",
"about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." },
{ "name": "TimeoutMs", "type": "int32", "versions": "0+",
"about": "The timeout to await a response in milliseconds." },
{ "name": "TopicData", "type": "[]TopicProduceData", "versions": "0+",
"about": "Each topic to produce to.", "fields": [
{ "name": "Name", "type": "string", "versions": "0-12", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "13+", "mapKey": true, "ignorable": true, "about": "The unique topic ID" },
{ "name": "PartitionData", "type": "[]PartitionProduceData", "versions": "0+",
"about": "Each partition to produce to.", "fields": [
{ "name": "Index", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+",
"about": "The record data to be produced." }
]}
]}
]
}
Acks
Kafka is a system that gives users control over how writes are acknowledged. When writing data, some systems can tolerate data loss, while others require replication across multiple nodes before considering the write successful. Kafka does not enforce a single definition of “success”; instead, it delegates the decision to the user through the acks field.
There are only three valid values for acks. If any other value is used, Kafka returns INVALID_REQUIRED_ACKS:
1: The write is considered successful as long as the data is written to the leader.
0: The producer does not care about the result — success or failure is acceptable. However, if an error occurs during the write, the broker will close the connection to signal the producer that the write failed.
-1 (all): The write is only considered successful once at least min.insync.replicas replicas acknowledge it. Each topic can define its own min.insync.replicas, giving users control over how many replicas must store the data based on its importance (see the configuration sketch after this list).
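To make this concrete, here is a minimal producer configuration sketch using acks=all. The broker address, topic name, and serializers are placeholders, and min.insync.replicas appears only as a comment because it is a topic- or broker-level setting rather than a producer one.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksAllProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // acks=-1 (alias "all"): the leader only responds once at least
        // min.insync.replicas replicas in the ISR have the data.
        props.put("acks", "all");

        // Topic-side counterpart (set on the topic, not here):
        //   min.insync.replicas=2  -> at least 2 replicas must hold the write.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}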
Record & Batch
To improve write performance, a single Produce Request can carry multiple Records. Multiple Records in Kafka are called a Batch. A Batch is not just a conceptual grouping; it also has physical significance, because multiple Records share the metadata of the Batch. This approach saves storage space: if every Record stored its own copy of the Batch metadata, it would consume far more space. These metadata fields are defined in DefaultRecordBatch.java and include the following:
BaseOffset: Indicates the starting offset of this Batch.
Length: The number of bytes in the entire Batch, excluding the BaseOffset and Length fields themselves.
PartitionLeaderEpoch: The Leader Epoch of the Partition at the time this Batch was written. When conflicts occur between Replicas, the Partition Leader Epoch is used to determine which data should be discarded. (This will be explained further in the next article on Fetch Request & Replication Model.)
Magic: The version number of the Batch. Kafka currently has three versions (0, 1, 2). Only version 2 supports Transactions, and starting from Kafka 4.0, if the data written is not version 2, the Broker will convert it to version 2.
CRC: Used to verify that the data received by the Broker matches the data sent by the Producer. If they don't match, a CorruptRecordException will be returned. The CRC is calculated from the next field (Attributes) up to the end of the Batch.
Attributes: This field is 2 bytes, with each bit carrying a different meaning: Unused (15–7), Delete Horizon Flag (6), Control (5), Transactional (4), Timestamp Type (3), Compression Type (2–0).
LastOffsetDelta: The difference between the last offset in the Batch and the BaseOffset. Why not just store the LastOffset directly? Because storing the difference requires less space.
BaseTimestamp: The timestamp of the first Record in the Batch.
LastTimestamp: Depends on the Timestamp Type in Attributes. If set to LOG_APPEND_TIME, it is the system time when the Broker writes the data; if set to CREATE_TIME, it is the maximum timestamp among the Records in the Batch.
ProducerId / ProducerEpoch / BaseSequence: These three fields are related to transactions and idempotence. They will be explained in later articles.
RecordsCount: The number of Records contained in the Batch.
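To make the layout concrete, the sketch below reads the fixed-size v2 batch header from a ByteBuffer in the field order listed above. It is a simplified illustration of the on-disk layout rather than the actual DefaultRecordBatch code, and it assumes you already have the raw bytes of one batch.
import java.nio.ByteBuffer;

public class BatchHeaderReader {
    // Reads the magic v2 batch header fields in on-disk order.
    public static void dump(ByteBuffer buf) {
        long baseOffset = buf.getLong();            // BaseOffset
        int length = buf.getInt();                  // Length (bytes after this field)
        int partitionLeaderEpoch = buf.getInt();    // PartitionLeaderEpoch
        byte magic = buf.get();                     // Magic (2 for the current format)
        int crc = buf.getInt();                     // CRC over Attributes..end of batch
        short attributes = buf.getShort();          // Attributes bit field
        int lastOffsetDelta = buf.getInt();         // LastOffsetDelta
        long baseTimestamp = buf.getLong();         // BaseTimestamp
        long lastTimestamp = buf.getLong();         // LastTimestamp (max timestamp)
        long producerId = buf.getLong();            // ProducerId
        short producerEpoch = buf.getShort();       // ProducerEpoch
        int baseSequence = buf.getInt();            // BaseSequence
        int recordsCount = buf.getInt();            // RecordsCount, followed by the Records

        System.out.printf("offsets %d..%d, %d records, magic %d%n",
                baseOffset, baseOffset + lastOffsetDelta, recordsCount, magic);
    }
}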
As for the data of individual Records, it is defined in DefaultRecord.java. Essentially, this corresponds to the data that users provide when creating a ProducerRecord, such as the key, value, and user-defined headers.
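For reference, a record's user-visible content is exactly what you pass when building a ProducerRecord; the topic name, key, value, and header below are placeholders.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class RecordContents {
    public static void main(String[] args) {
        List<Header> headers = List.of(
                new RecordHeader("trace-id", "abc123".getBytes(StandardCharsets.UTF_8)));

        ProducerRecord<String, String> record = new ProducerRecord<>(
                "my-topic",            // topic name (placeholder)
                null,                  // partition: null lets the partitioner choose
                "order-42",            // key
                "{\"amount\": 100}",   // value
                headers);              // user-defined headers

        System.out.println(record);
    }
}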
Data Transformation
When a user uploads data to the Broker through a Produce Request, will the data be written to disk as-is, or will it be transformed first? The answer depends on the compression.type settings of both the producer and the topic.
By default, the producer's compression type is none, meaning no compression. The topic's default is producer, which means it inherits the compression type used by the producer. Therefore, under the default settings, no data transformation is needed on the broker.
However, if the compression.type settings of the producer and the topic differ, data transformation will occur in LogValidator#validateMessagesAndAssignOffsetsCompressed. This behavior reduces Kafka’s write performance.
You might wonder: if this behavior reduces performance, why does Kafka still support it? The reason is that not all machines running producers necessarily support compression, so the broker must be able to handle it. But if your producer supports compression, you should avoid relying on the broker for data transformation.
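If you want to make sure the broker never has to recompress, keep the topic's compression.type at producer (or set it to the same codec the producer uses). The sketch below shows one hypothetical way to check and align the topic-side setting with the Admin client; the broker address and topic name are placeholders.
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlignCompression {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

            // Print the topic's current compression.type.
            admin.describeConfigs(List.of(topic)).all().get()
                    .get(topic).entries().stream()
                    .filter(e -> e.name().equals("compression.type"))
                    .forEach(e -> System.out.println("topic compression.type = " + e.value()));

            // Set it back to "producer" so the broker keeps whatever the producer sends
            // (the producer itself sets compression.type=lz4, zstd, ... in its own config).
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("compression.type", "producer"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}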
Segment
After a series of data transformations and validations, where does Kafka actually write the data? Kafka creates a dedicated folder for each Topic Partition, named in the format {topic-name}-{partition-index}. For example, if the topic name is my-topic and the partition index is 0, the folder name is my-topic-0.
The previously mentioned Batches are written into files ending with .log under the topic partition folder. As more and more data is written, if everything were stored in a single file, read performance would gradually degrade. To address this, Kafka splits log files by offset. For instance, in a file named 00000000000001000334.log, the log offsets inside are at least 1000334. This way, Kafka can quickly determine which file contains a given offset simply by looking at the filename.
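The filename-based lookup can be illustrated in a few lines of code. The sketch below is not Kafka's implementation; it simply parses the base offset out of each .log filename in a hypothetical partition directory and floor-searches for the segment that would contain a target offset.
import java.io.File;
import java.util.TreeMap;

public class SegmentLookup {
    // Returns the .log file whose base offset is the largest one <= targetOffset.
    public static File findSegment(File partitionDir, long targetOffset) {
        TreeMap<Long, File> segments = new TreeMap<>();
        for (File f : partitionDir.listFiles((d, name) -> name.endsWith(".log"))) {
            long baseOffset = Long.parseLong(f.getName().replace(".log", ""));
            segments.put(baseOffset, f);
        }
        return segments.floorEntry(targetOffset).getValue();
    }

    public static void main(String[] args) {
        // e.g. a directory containing 00000000000000000000.log, 00000000000001000334.log, ...
        File dir = new File("/var/kafka-logs/my-topic-0");   // placeholder path
        System.out.println(findSegment(dir, 1_000_500L));    // -> 00000000000001000334.log
    }
}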
However, even a single .log file can still contain a large amount of data. How can Kafka quickly locate a desired offset within one log file? The answer is that, in addition to writing the log, Kafka also writes a mapping from offsets to their byte positions into an .index file. Not every offset is indexed, otherwise the index would grow almost as large as the log and defeat its purpose. Instead, Kafka lets users configure how frequently index entries are written via index.interval.bytes, which defaults to 4096 bytes. It's important to note that Kafka measures this interval in whole Batches: even if a single Batch is 12288 bytes (4096 * 3), the broker still writes only one index entry for it. The detailed logic can be found in LogSegment#append.
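Below is a simplified model of that batch-based interval check, loosely following the shape of LogSegment#append: bytes are accumulated per appended batch, and an index entry is written only once the accumulated size exceeds index.interval.bytes. The class and method names are illustrative, not Kafka's actual code.
public class SparseOffsetIndex {
    private final int indexIntervalBytes;   // index.interval.bytes, default 4096
    private int bytesSinceLastIndexEntry = 0;

    public SparseOffsetIndex(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    // Called once per appended batch, never per record.
    public void onBatchAppended(long largestOffsetInBatch, int physicalPosition, int batchSizeBytes) {
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            System.out.printf("index entry: offset=%d -> position=%d%n",
                    largestOffsetInBatch, physicalPosition);
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += batchSizeBytes;
    }

    public static void main(String[] args) {
        SparseOffsetIndex index = new SparseOffsetIndex(4096);
        // A single 12288-byte batch still produces at most one index entry afterwards.
        index.onBatchAppended(99, 0, 12288);       // no entry yet (counter was 0)
        index.onBatchAppended(150, 12288, 500);    // entry written, counter reset
    }
}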
In addition to the offset index, Kafka also maintains a timestamp index, stored in files ending with .timeindex. The reason is that Kafka provides a ListOffsets RPC that lets users query the log by timestamp. The write condition for the timestamp index is the same as for the offset index, but since record timestamps do not necessarily increase monotonically, Kafka skips writing an entry if the new timestamp is not greater than the latest timestamp already stored.
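On the client side, the timestamp index is what makes timestamp-based lookups cheap. A minimal sketch using the consumer's offsetsForTimes (which is served by the ListOffsets RPC) might look like this; the broker address, topic, and timestamp are placeholders.
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetForTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();

            // Ask the broker for the earliest offset whose timestamp is >= oneHourAgo.
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Map.of(tp, oneHourAgo));

            OffsetAndTimestamp ot = result.get(tp);
            if (ot != null) {
                System.out.println("offset for timestamp: " + ot.offset());
            }
        }
    }
}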
Acks = -1
The above introduces how a Produce Request is written to the leader, from the data format to the specific file it is written to. But you might still wonder: when Acks = -1, how does the leader know when to send a response? This involves the Fetch Request and the Replication Model, which will be explained in detail in the next article. For now, let’s first look at how the leader temporarily stores data that cannot yet trigger a response.
In Kafka, Produce Requests are not the only ones that cannot return immediately — Fetch Requests also support delayed responses. This type of operation is called a DelayedOperation, and the framework that supports it is called DelayedOperationPurgatory. Let’s take Produce Requests as an example to see how this kind of data is temporarily stored on the broker. When the leader receives a Produce Request with Acks = -1 and successfully writes the data, it will wrap the request into a DelayedProduce within ReplicaManager#maybeAddDelayedProduce and store it in a Map data structure.
The reason for using a Map is that a single Produce Request can include multiple topic partitions, but each topic partition may progress at a different replication pace. Therefore, the Key is the topic partition, making it easy to check which produce requests can now respond when that partition’s replication advances. The Value is a linked list that records the corresponding DelayedProduce instances. Since Kafka does not restrict a topic partition to only one producer, different producers may send requests with Acks = -1 to the same partition, resulting in multiple DelayedProduces for that partition. Hence, a linked list is used for storage.
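A highly simplified model of that bookkeeping is sketched below. The real DelayedOperationPurgatory adds watcher keys, expiration timers, and locking, so treat this purely as an illustration of the map-of-lists idea; the class names are hypothetical.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MiniProducePurgatory {
    // One pending acks=-1 produce request (a real one may cover several partitions).
    public static class PendingProduce {
        final long requiredOffset;            // offset that must be replicated first
        PendingProduce(long requiredOffset) { this.requiredOffset = requiredOffset; }
    }

    // Key: topic partition; value: all produce requests waiting on that partition.
    private final Map<String, Deque<PendingProduce>> watchers = new ConcurrentHashMap<>();

    public void watch(String topicPartition, PendingProduce produce) {
        watchers.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>()).add(produce);
    }

    // Called when the partition's replicated offset advances.
    public void onReplicationAdvanced(String topicPartition, long replicatedUpTo) {
        Deque<PendingProduce> pending = watchers.getOrDefault(topicPartition, new ArrayDeque<>());
        pending.removeIf(p -> {
            boolean complete = p.requiredOffset <= replicatedUpTo;
            if (complete) {
                System.out.println("respond to produce waiting for offset " + p.requiredOffset);
            }
            return complete;
        });
    }
}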
Takeaway
Kafka allows users to define their own conditions for a successful write, which can be flexibly adjusted through acks and min.insync.replicas.
Multiple Records are grouped into a Batch to improve write performance.
Starting from Kafka 4.0, the Batches written to disk are Magic version 2.
Avoid using different compression types between the producer and the topic.
When writing logs, Kafka also checks whether it needs to write an offset index and a timestamp index. If you want to speed up data lookup, you can adjust index.interval.bytes to write more index entries.
When acks = -1, Kafka temporarily stores Produce Requests that cannot yet respond as DelayedProduce.