Apache Kafka — Network Architecture
Apache Kafka’s network architecture uses the Reactor Pattern, specifically a Multiple Reactors setup. This means that the NIO selectors responsible for handling Accept and Read/Write events are separated. The benefit of this approach is that each reactor can focus on the specific events it monitors. For example, as shown in the diagram below, the mainReactor handles Accept events, while the subReactor handles Read/Write events. If you are not yet familiar with the Reactor Pattern, you may first refer to Doug Lea’s Scalable IO in Java.
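To make the split concrete, here is a minimal, single-threaded sketch of the Multiple Reactors idea using Python’s `selectors` module. This is illustrative only, not Kafka code: a real server would run each reactor in its own thread with its own event loop, whereas here the two selectors are driven sequentially over a loopback connection.

```python
import selectors
import socket

# Two selectors play the roles of mainReactor and subReactor:
# the first watches only for Accept events, the second only for
# Read events, mirroring the separation described above.
main_reactor = selectors.DefaultSelector()
sub_reactor = selectors.DefaultSelector()

server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen()
server.setblocking(False)
main_reactor.register(server, selectors.EVENT_READ)  # Accept events

# A client connects and sends a message.
client = socket.create_connection(server.getsockname())
client.sendall(b"ping")

# mainReactor: accept the new connection, hand it to the subReactor.
for key, _ in main_reactor.select():
    conn, _ = key.fileobj.accept()
    conn.setblocking(False)
    sub_reactor.register(conn, selectors.EVENT_READ)  # Read events

# subReactor: handle the Read event on the accepted connection.
received = b""
for key, _ in sub_reactor.select():
    received = key.fileobj.recv(1024)

client.close()
conn.close()
server.close()
print(received)  # b'ping'
```

The key point is that the listening socket and the accepted connection are registered with different selectors, so each reactor only ever sees the event type it is responsible for.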
The main entry point of Apache Kafka’s network component is the SocketServer, which works together with other components such as Acceptor, Processor, TransportLayer, RequestChannel, and so on. In the following sections, we will walk through the complete flow from a request to a response to see how each component participates in the process.
SocketServer
SocketServer is the primary component responsible for managing the network, so both BrokerServer and ControllerServer include a SocketServer. The SocketServer creates an Acceptor for each listener. Although Figure 2 shows only one Acceptor, if your configuration contains multiple listeners, such as PLAINTEXT://:9092,SSL://:9093, there will actually be multiple Acceptors in practice.
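The one-acceptor-per-listener mapping can be sketched with a toy parser. This is a hypothetical simplified parser for illustration, not Kafka’s actual configuration code; `parse_listeners` and the returned tuple shape are my own inventions.

```python
def parse_listeners(listeners: str):
    # Each entry looks like NAME://host:port; a SocketServer would
    # create one Acceptor per entry in this list.
    endpoints = []
    for entry in listeners.split(","):
        name, rest = entry.split("://", 1)
        host, _, port = rest.rpartition(":")
        endpoints.append((name, host or None, int(port)))
    return endpoints

print(parse_listeners("PLAINTEXT://:9092,SSL://:9093"))
# [('PLAINTEXT', None, 9092), ('SSL', None, 9093)]
```

Two entries in the listeners string means two acceptors, each bound to its own port.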
Acceptor (KAcceptor)
Here, we are referring to the Acceptor in Figure 2, which is slightly different from the acceptor in Figure 1. From here on, we will refer to the Acceptor in Figure 2 as KAcceptor. The KAcceptor is closer to the combination of the mainReactor and acceptor in Figure 1, because the SocketServer itself does not have an NIO selector.
The NIO selector of the KAcceptor listens for Accept events and manages multiple Processor instances (the number is defined by num.network.threads). When a new client connects, the KAcceptor establishes the connection and assigns it to one of its processors in round-robin fashion. Each processor has a queue of size 20 to record new connections; when a processor’s queue is full, the KAcceptor moves on to the next one. At this stage, within the KAcceptor, we have already established a connection with the client.
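The assignment logic above can be sketched as follows. This is a simplified model, not Kafka’s actual Acceptor or Processor code; the class and method names are stand-ins, and `num_network_threads` mirrors the num.network.threads setting.

```python
class Processor:
    # Each processor keeps a bounded queue of new connections,
    # mirroring the size-20 queue described above.
    def __init__(self, queue_size=20):
        self.new_connections = []
        self.queue_size = queue_size

    def accept(self, conn):
        if len(self.new_connections) >= self.queue_size:
            return False  # queue full; the acceptor tries the next processor
        self.new_connections.append(conn)
        return True


class KAcceptor:
    # Spreads new connections across its processors round-robin.
    def __init__(self, num_network_threads=3):
        self.processors = [Processor() for _ in range(num_network_threads)]
        self.index = 0

    def assign(self, conn):
        for _ in range(len(self.processors)):
            processor = self.processors[self.index]
            self.index = (self.index + 1) % len(self.processors)
            if processor.accept(conn):
                return True
        return False  # every processor's queue is full


acceptor = KAcceptor(num_network_threads=2)
for i in range(4):
    acceptor.assign(f"conn-{i}")
print([len(p.new_connections) for p in acceptor.processors])  # [2, 2]
```

Round-robin keeps the connection load roughly even across the network threads.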
Processor
The Processor also has an NIO selector, but in Kafka this NIO selector is wrapped by org.apache.kafka.common.network.Selector (hereafter referred to as KSelector). The reason for this is that Kafka defines its own transport interface at this layer, namely NetworkReceive and NetworkSend. As a result, components further down the stack handle data defined by Kafka, rather than dealing with raw zeros and ones.
TransportLayer
In addition to wrapping the NIO selector, Kafka also wraps the SocketChannel into a TransportLayer. The reason for this is that Kafka needs to support both Plaintext and SSL transmission, and the TransportLayer can handle the additional encryption and decryption processes.
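The idea of swapping transport implementations behind one interface can be sketched like this. Note this is purely illustrative: the XOR “cipher” is NOT real SSL and the class names only echo Kafka’s; it merely shows where the encrypt/decrypt hooks would live.

```python
class PlaintextTransportLayer:
    # Plaintext: bytes pass through unchanged.
    def wrap(self, data: bytes) -> bytes:
        return data

    def unwrap(self, data: bytes) -> bytes:
        return data


class SslTransportLayer:
    # Toy stand-in for encryption (a XOR mask, not real SSL);
    # wrap/unwrap mark the points where a real implementation
    # would encrypt and decrypt.
    def __init__(self, key: int = 0x5A):
        self.key = key

    def wrap(self, data: bytes) -> bytes:
        return bytes(b ^ self.key for b in data)

    def unwrap(self, data: bytes) -> bytes:
        return bytes(b ^ self.key for b in data)


for layer in (PlaintextTransportLayer(), SslTransportLayer()):
    assert layer.unwrap(layer.wrap(b"hello")) == b"hello"
```

Because both classes expose the same interface, the rest of the stack does not need to know which transport is in use.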
Transfer byte array to NetworkReceive / NetworkSend
Network transmission can only consist of 0s and 1s. When a socket receives a byte array, how can it determine where to split it and then assemble it into an object?
Kafka’s method for parsing packets is defined in NetworkReceive#readFrom. The first 4 bytes of a packet store the size of the data that follows. Therefore, when reading, Kafka first reads these 4 bytes, then uses the integer value they encode to determine how much additional data should be read next.
When Kafka finishes processing a request and needs to send back a response, it uses a similar approach to convert the response into a byte array. The specific process is defined in SendBuilder#buildSend: it first writes the total length of the entire message, followed by the request header, and finally the request body.
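The length-prefixed framing described above can be demonstrated in a few lines. This is a minimal sketch of the framing idea only; the helper names are my own and the real Kafka code also handles partial reads from the socket, which this buffer-based version skips.

```python
import struct

def encode_frame(payload: bytes) -> bytes:
    # Prefix the payload with its length as a 4-byte big-endian int,
    # the same framing the 4-byte size header provides.
    return struct.pack(">i", len(payload)) + payload

def decode_frames(buffer: bytes):
    # Read 4 bytes, interpret them as the size, then consume exactly
    # that many bytes; repeat until the buffer is exhausted.
    frames, offset = [], 0
    while offset + 4 <= len(buffer):
        (size,) = struct.unpack_from(">i", buffer, offset)
        offset += 4
        frames.append(buffer[offset:offset + size])
        offset += size
    return frames

wire = encode_frame(b"hello") + encode_frame(b"kafka")
print(decode_frames(wire))  # [b'hello', b'kafka']
```

The length prefix is what lets the receiver split a continuous byte stream back into discrete messages.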
NetworkReceive to Request
Although NetworkReceive reads the required data from the SocketChannel, how is this data converted from 0s and 1s into the fields of a request?
In Kafka, even the RPC framework is custom-designed. Each RPC request/response is defined in JSON format under clients/src/main/resources/common/message.
After the Processor obtains a NetworkReceive from the TransportLayer, it processes it in Processor#processCompletedReceives, first parsing the data into a request header as defined by RequestHeader.json. From the header, it can retrieve the API key and API version of the request, which makes it possible to determine which API the remaining data belongs to and how it should be converted.
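The header-first dispatch can be sketched with a toy format. This is a hypothetical two-field header (api_key and api_version as 16-bit ints), loosely modeled on Kafka’s RequestHeader; the real header also carries a correlation id and client id, and the parser registry stands in for the schemas generated from the JSON definitions.

```python
import struct

def parse_header(data: bytes):
    # First parse the fixed-shape header; only then do we know how
    # to interpret the remaining bytes.
    api_key, api_version = struct.unpack_from(">hh", data, 0)
    return api_key, api_version, data[4:]

# Toy registry mapping (api_key, api_version) to a body parser.
PARSERS = {
    (0, 0): lambda body: {"api": "Produce", "topic": body.decode()},
}

def parse_request(data: bytes):
    api_key, api_version, body = parse_header(data)
    return PARSERS[(api_key, api_version)](body)

msg = struct.pack(">hh", 0, 0) + b"events"
print(parse_request(msg))  # {'api': 'Produce', 'topic': 'events'}
```

The header acts as a routing key: once api key and version are known, the rest of the bytes have a well-defined schema.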
Request to Response
After the Processor finishes converting a request, it places it into the RequestChannel’s queue, whose size can be adjusted via queued.max.requests.
The RequestChannel is shared among all processors, meaning that processors under different KAcceptors eventually aggregate their requests into the same RequestChannel. At this point, the network’s read operation comes to an end. From here, the requests are handled by the I/O threads managed by KafkaRequestHandlerPool, and the number of I/O threads can be configured via num.io.threads.
Once a request is processed, the RequestChannel is used to identify the processor that originally generated the request, and the response is placed into that processor’s queue (the size of this queue cannot be configured via parameters). Finally, the processor uses the TransportLayer to send the response back.
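The fan-in of requests and fan-out of responses can be modeled in miniature. This is a simplified sketch, not Kafka’s RequestChannel: the class and method names are stand-ins, and tagging each request with its processor id is the mechanism that lets the response find its way back.

```python
import queue

class RequestChannel:
    # One shared request queue for all processors (bounded by
    # queued.max.requests), plus one response queue per processor.
    def __init__(self, num_processors, queued_max_requests=500):
        self.request_queue = queue.Queue(maxsize=queued_max_requests)
        self.response_queues = {i: queue.Queue() for i in range(num_processors)}

    def send_request(self, processor_id, request):
        # Processors tag requests with their id so the response can
        # later be routed back to the originating processor.
        self.request_queue.put((processor_id, request))

    def handle_one(self, handler):
        # An I/O thread takes a request, processes it, and places the
        # response into the originating processor's queue.
        processor_id, request = self.request_queue.get()
        self.response_queues[processor_id].put(handler(request))

channel = RequestChannel(num_processors=2)
channel.send_request(1, "metadata?")
channel.handle_one(lambda req: req + " -> ok")
print(channel.response_queues[1].get())  # metadata? -> ok
```

All processors write into one request queue, but each reads responses only from its own queue, matching the asymmetry described above.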
Takeaway
That’s the complete process in Kafka — from establishing a connection, to handling a request, and finally returning a response. Hopefully, after reading this article, the Kafka network will no longer feel like a black box.
To summarize:
- Each listener in the SocketServer corresponds to a KAcceptor.
- Each KAcceptor runs in its own thread and has num.network.threads processors. Therefore, the total number of network threads is: number of listeners + (number of listeners × num.network.threads).
- The SocketServer uses the KAcceptor to listen for Accept events and uses processors to handle Read/Write events.
- A SocketServer has only one RequestChannel, so regardless of how many processors it has, all requests ultimately aggregate into the same RequestChannel, and responses are placed into the individual queues of each processor. However, note that the broker and controller each have their own SocketServer. If a server acts as both a broker and a controller, it will have two RequestChannels.
- Kafka’s packet format is defined such that the first 4 bytes specify the length, followed by data whose size matches that length.
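The thread-count formula from the summary can be written out directly (the function name is my own, for illustration):

```python
def total_network_threads(num_listeners: int, num_network_threads: int) -> int:
    # One acceptor thread per listener, plus num.network.threads
    # processor threads per listener.
    return num_listeners + num_listeners * num_network_threads

# e.g. two listeners with num.network.threads=3:
print(total_network_threads(2, 3))  # 8
```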