Kafka的架構原理,你真的理解嗎?

Apache Kafka 最早是由 LinkedIn 開源出來的分布式消息系統,現在是 Apache 旗下的一個子項目,並且已經成為開源領域應用最廣泛的消息系統之一。

Kafka 社區非常活躍,從 0.9 版本開始,Kafka 的標語已經從「一個高吞吐量,分布式的消息系統」改為”一個分布式流平台”。

Kafka 和傳統的消息系統不同在於:

  • Kafka是一個分布式系統,易於向外擴展。

  • 它同時為發布和訂閱提供高吞吐量。

  • 它支持多訂閱者,當失敗時能自動平衡消費者。

  • 消息的持久化。

Kafka 和其他消息隊列的對比:

入門實例

生產者

代碼如下:

importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;publicclassUserKafkaProducerextendsThread{privatefinalKafkaProducer<Integer,String>producer;privatefinalStringtopic;privatefinalPropertiesprops=newProperties();publicUserKafkaProducer(Stringtopic){props.put("metadata.broker.list","localhost:9092");props.put("bootstrap.servers","master2:6667");props.put("retries",0);props.put("batch.size",16384);props.put("linger.ms",1);props.put("buffer.memory",33554432);props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");producer=newKafkaProducer<Integer,String>(props);this.topic=topic;}@Overridepublicvoidrun(){intmessageNo=1;while(true){StringmessageStr=newString("Message_"+messageNo);System.out.println("Send:"+messageStr);//返回的是Future<RecordMetadata>,異步發送producer.send(newProducerRecord<Integer,String>(topic,messageStr));messageNo++;try{sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}}}}

消費者

代碼如下:

Propertiesprops=newProperties();/*定義kakfa服務的地址,不需要將所有broker指定上*/props.put("bootstrap.servers","localhost:9092");/*制定consumergroup*/props.put("group.id","test");/*是否自動確認offset*/props.put("enable.auto.commit","true");/*自動確認offset的時間間隔*/props.put("auto.commit.interval.ms","1000");props.put("session.timeout.ms","30000");/*key的序列化類*/props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*value的序列化類*/props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*定義consumer*/KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);/*消費者訂閱的topic,可同時訂閱多個*/consumer.subscribe(Arrays.asList("foo","bar"));/*讀取數據,讀取超時時間為100ms*/while(true){ConsumerRecords<String,String>records=consumer.poll(100);for(ConsumerRecord<String,String>record:records)System.out.printf("offset=%d,key=%s,value=%s",record.offset(),record.key(),record.value());}

Kafka 架構原理

對於 Kafka 的架構原理,我們先提出如下幾個問題:

  • Kafka 的 topic 和分區內部是如何存儲的,有什麼特點?

  • 與傳統的消息系統相比,Kafka 的消費模型有什麼優點?

  • Kafka 如何做到分布式的數據存儲與數據讀取?

Kafka 架構圖

Kafka 名詞解釋

在一套 Kafka 架構中有多個 Producer,多個 Broker,多個 Consumer,每個 Producer 可以對應多個 Topic,每個 Consumer 只能對應一個 Consumer Group。

整個 Kafka 架構對應一個 ZK 集群,通過 ZK 管理集群配置,選舉 Leader,以及在 Consumer Group 發生變化時進行 Rebalance。

Topic 和 Partition

在 Kafka 中的每一條消息都有一個 Topic。一般來說在我們應用中產生不同類型的數據,都可以設置不同的主題。

一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。

Kafka 為每個主題維護了分布式的分區(Partition)日志文件,每個 Partition 在 Kafka 存儲層面是 Append Log。

任何發布到此 Partition 的消息都會被追加到 Log 文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是我們的 Offset。Offset 是一個 Long 型的數字。

我們通過這個 Offset 可以確定一條在該 Partition 下的唯一消息。在 Partition 下面是保證了有序性,但是在 Topic 下面沒有保證有序性。

在上圖中我們的生產者會決定發送到哪個 Partition:

  • 如果沒有 Key 值則進行輪詢發送。

  • 如果有 Key 值,對 Key 值進行 Hash,然後對分區數量取餘,保證了同一個 Key 值的會被路由到同一個分區;如果想隊列的強順序一致性,可以讓所有的消息都設置為同一個 Key。

消費模型

消息由生產者發送到 Kafka 集群後,會被消費者消費。一般來說我們的消費模型有兩種:

  • 推送模型(Psuh)

  • 拉取模型(Pull)

基於推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者後,標記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。

比如當我們已經把消息發送給消費者之後,由於消費進程掛掉或者由於網路原因沒有收到這條消息,如果我們在消費代理將其標記為已消費,這個消息就永久丟失了。

如果我們利用生產者收到消息後回復這種方法,消息代理需要記錄消費狀態,這種不可取。

如果採用 Push,消息消費的速率就完全由消費代理控制,一旦消費者發生阻塞,就會出現問題。

Kafka 採取拉取模型(Poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量進行消費。

比如消費者可以消費已經消費過的消息進行重新處理,或者消費最近的消息等等。

網路模型

Kafka Client:單線程 Selector

單線程模式適用於並發鏈接數小,邏輯簡單,數據量小的情況。在 Kafka 中,Consumer 和 Producer 都是使用的上面的單線程模式。

這種模式不適合 Kafka 的服務端,在服務端中請求處理過程比較複雜,會造成線程阻塞,一旦出現後續請求就會無法處理,會造成大量請求超時,引起雪崩。而在服務器中應該充分利用多線程來處理執行邏輯。

Kafka Server:多線程 Selector

在 Kafka 服務端採用的是多線程的 Selector 模型,Acceptor 運行在一個單獨的線程中,對於讀取操作的線程池中的線程都會在 Selector 註冊 Read 事件,負責服務端讀取請求的邏輯。

成功讀取後,將請求放入 Message Queue共享隊列中。然後在寫線程池中,取出這個請求,對其進行邏輯處理。

這樣,即使某個請求線程阻塞了,還有後續的線程從消息隊列中獲取請求並進行處理,在寫線程中處理完邏輯處理,由於註冊了 OP_WIRTE 事件,所以還需要對其發送響應。

高可靠分布式存儲模型

在 Kafka 中保證高可靠模型依靠的是副本機制,有了副本機制之後,就算機器宕機也不會發生數據丟失。

高性能的日志存儲

Kafka 一個 Topic 下面的所有消息都是以 Partition 的方式分布式的存儲在多個節點上。

同時在 Kafka 的機器上,每個 Partition 其實都會對應一個日志目錄,在目錄下面會對應多個日志分段(LogSegment)。

LogSegment 文件由兩部分組成,分別為「.index」文件和「.log」文件,分別表示為 Segment 索引文件和數據文件。

這兩個文件的命令規則為:Partition 全局的第一個 Segment 從 0 開始,後續每個 Segment 文件名為上一個 Segment 文件最後一條消息的 Offset 值,數值大小為 64 位,20 位數字字符長度,沒有數字用 0 填充。

如下,假設有 1000 條消息,每個 LogSegment 大小為 100,下面展現了 900-1000 的索引和 Log:

由於 Kafka 消息數據太大,如果全部建立索引,既占了空間又增加了耗時,所以 Kafka 選擇了稀疏索引的方式,這樣索引可以直接進入記憶體,加快偏查詢速度。

簡單介紹一下如何讀取數據,如果我們要讀取第 911 條數據首先第一步,找到它是屬於哪一段的。

根據二分法查找到它屬於的文件,找到 0000900.index 和 00000900.log 之後,然後去 index 中去查找 (911-900) = 11 這個索引或者小於 11 最近的索引。

在這里通過二分法我們找到了索引是 [10,1367],然後我們通過這條索引的物理位置 1367,開始往後找,直到找到 911 條數據。

上面講的是如果要找某個 Offset 的流程,但是我們大多數時候並不需要查找某個 Offset,只需要按照順序讀即可。

而在順序讀中,操作系統會在記憶體和磁盤之間添加 Page Cache,也就是我們平常見到的預讀操作,所以我們的順序讀操作時速度很快。

但是 Kafka 有個問題,如果分區過多,那麼日志分段也會很多,寫的時候由於是批量寫,其實就會變成隨機寫了,隨機 I/O 這個時候對性能影響很大。所以一般來說 Kafka 不能有太多的 Partition。

針對這一點,RocketMQ 把所有的日志都寫在一個文件里面,就能變成順序寫,通過一定優化,讀也能接近於順序讀。

大家可以思考一下:

  • 為什麼需要分區,也就是說主題只有一個分區,難道不行嗎?

  • 日志為什麼需要分段?

副本機制

Kafka 的副本機制是多個服務端節點對其他節點的主題分區的日志進行復制。

當集群中的某個節點出現故障,訪問故障節點的請求會被轉移到其他正常節點(這一過程通常叫 Reblance)。

Kafka 每個主題的每個分區都有一個主副本以及 0 個或者多個副本,副本保持和主副本的數據同步,當主副本出故障時就會被替代。

在 Kafka 中並不是所有的副本都能被拿來替代主副本,所以在 Kafka 的 Leader 節點中維護著一個 ISR(In Sync Replicas)集合。

翻譯過來也叫正在同步中集合,在這個集合中的需要滿足兩個條件:

  • 節點必須和 ZK 保持連接。

  • 在同步的過程中這個副本不能落後主副本太多。

另外還有個 AR(Assigned Replicas)用來標識副本的全集,OSR 用來表示由於落後被剔除的副本集合。

所以公式如下:ISR = Leader + 沒有落後太多的副本;AR = OSR+ ISR。

這里先要說下兩個名詞:HW(高水位)是 Consumer 能夠看到的此 Partition 的位置,LEO 是每個 Partition 的 Log 最後一條 Message 的位置。

HW 能保證 Leader 所在的 Broker 失效,該消息仍然可以從新選舉的 Leader 中獲取,不會造成消息丟失。

當 Producer 向 Leader 發送數據時,可以通過 request.required.acks 參數來設置數據可靠性的級別:

  • 1(默認):這意味著 Producer 在 ISR 中的 Leader 已成功收到的數據並得到確認後發送下一條 Message。如果 Leader 宕機了,則會丟失數據。

  • 0:這意味著 Producer 無需等待來自 Broker 的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性卻是最低的。

  • -1:Producer 需要等待 ISR 中的所有 Follower 都確認接收到數據後才算一次發送完成,可靠性最高。

    但是這樣也不能保證數據不丟失,比如當 ISR 中只有 Leader 時(其他節點都和 ZK 斷開連接,或者都沒追上),這樣就變成了 acks = 1 的情況。

高可用模型及冪等

在分布式系統中一般有三種處理語義:

at-least-once

至少一次,有可能會有多次。如果 Producer 收到來自 Ack的確認,則表示該消息已經寫入到 Kafka 了,此時剛好是一次,也就是我們後面的 Exactly-once。

但是如果 Producer 超時或收到錯誤,並且 request.required.acks 配置的不是 -1,則會重試發送消息,客戶端會認為該消息未寫入 Kafka。

如果 Broker 在發送 Ack 之前失敗,但在消息成功寫入 Kafka 之後,這一次重試將會導致我們的消息會被寫入兩次。

所以消息就不止一次地傳遞給最終 Consumer,如果 Consumer 處理邏輯沒有保證冪等的話就會得到不正確的結果。

在這種語義中會出現亂序,也就是當第一次 Ack 失敗準備重試的時候,但是第二消息已經發送過去了,這個時候會出現單分區中亂序的現象。

我們需要設置 Prouducer 的參數 max.in.flight.requests.per.connection,flight.requests 是 Producer 端用來保存發送請求且沒有響應的隊列,保證 Produce r端未響應的請求個數為 1。

at-most-once

如果在 Ack 超時或返回錯誤時 Producer 不重試,也就是我們講 request.required.acks = -1,則該消息可能最終沒有寫入 Kafka,所以 Consumer 不會接收消息。

exactly-once

剛好一次,即使 Producer 重試發送消息,消息也會保證最多一次地傳遞給 Consumer。該語義是最理想的,也是最難做到的。

在 0.10 之前並不能保證 exactly-once,需要使用 Consumer 自帶的冪等性保證。0.11.0 使用事務保證了。

如何做到 exactly-once

要做到 exactly-once 在 Kafka 0.11.0 中有兩個官方策略:

單 Producer 單 Topic

每個 Producer 在初始化的時候都會被分配一個唯一的 PID,對於每個唯一的 PID,Producer 向指定的 Topic 中某個特定的 Partition 發送的消息都會攜帶一個從 0 單調遞增的 Sequence Number。

在我們的 Broker 端也會維護一個維度為,每次提交一次消息的時候都會對齊進行校驗:

  • 如果消息序號比 Broker 維護的序號大一以上,說明中間有數據尚未寫入,也即亂序,此時 Broker 拒絕該消息,Producer 拋出 InvalidSequenceNumber。

  • 如果消息序號小於等於 Broker 維護的序號,說明該消息已被保存,即為重復消息,Broker 直接丟棄該消息,Producer 拋出 DuplicateSequenceNumber。

  • 如果消息序號剛好大一,就證明是合法的。

上面所說的解決了兩個問題:

  • 當 Prouducer 發送了一條消息之後失敗,Broker 並沒有保存,但是第二條消息卻發送成功,造成了數據的亂序。

  • 當 Producer 發送了一條消息之後,Broker 保存成功,Ack 回傳失敗,Producer 再次投遞重復的消息。

上面所說的都是在同一個 PID 下面,意味著必須保證在單個 Producer 中的同一個 Seesion 內,如果 Producer 掛了,被分配了新的 PID,這樣就無法保證了,所以 Kafka 中又有事務機制去保證。

事務

在 Kafka 中事務的作用是:

  • 做到 exactly-once 語義。

  • 保證操作的原子性,要麼全部成功,要麼全部失敗。

  • 有狀態的操作的恢復。

事務可以保證就算跨多個,在本次事務中的對消費隊列的操作都當成原子性,要麼全部成功,要麼全部失敗。

並且,有狀態的應用也可以保證重啟後從斷點處繼續處理,也即事務恢復。

在 Kafka 的事務中,應用程序必須提供一個唯一的事務 ID,即 Transaction ID,並且宕機重啟之後,也不會發生改變。

Transactin ID 與 PID 可能一一對應,區別在於 Transaction ID 由用戶提供,而 PID 是內部的做到對用戶透明。

為了 Producer 重啟之後,舊的 Producer 具有相同的 Transaction ID 失效,每次 Producer 通過 Transaction ID 拿到 PID 的同時,還會獲取一個單調遞增的 Epoch。

由於舊的 Producer 的 Epoch 比新 Producer 的 Epoch 小,Kafka 可以很容易識別出該 Producer 是老的,Producer 並拒絕其請求。

為了做到這一點,Kafka 0.11.0.0 引入了一個服務器端的模塊,名為 Transaction Coordinator,用於管理 Producer 發送的消息的事務性。

該 Transaction Coordinator 維護 Transaction Log,該 Log 存於一個內部的 Topic 內。

由於 Topic 數據具有持久性,因此事務的狀態也具有持久性。Producer 並不直接讀寫 Transaction Log,它與 Transaction Coordinator 通信,然後由 Transaction Coordinator 將該事務的狀態插入相應的 Transaction Log。

Transaction Log 的設計與 Offset Log 用於保存 Consumer 的 Offset 類似。

最後

關於消息隊列或者 Kafka 的一些常見的面試題,通過上面的文章可以提煉出以下幾個比較經典的問題,大部分問題都可以從上面總結後找到答案:

  • 為什麼使用消息隊列?消息隊列的作用是什麼?

  • Kafka 的 Topic 和分區內部是如何存儲的,有什麼特點?

  • 與傳統的消息系統相比,Kafka 的消費模型有什麼優點?

  • Kafka 如何做到分布式的數據存儲與數據讀取?

  • Kafka 為什麼比 RocketMQ 支持的單機 Partition 要少?

  • 為什麼需要分區,也就是說主題只有一個分區,難道不行嗎?

  • 日志為什麼需要分段?

  • Kafka 是依靠什麼機制保持高可靠,高可用?

  • 消息隊列如何保證消息冪等?

  • 讓你自己設計個消息隊列,你會怎麼設計,會考慮哪些方面?

作者:李釗

簡介:目前就職於美團點評餐飲生態技術部,喜歡鑽研閱讀開源源碼。招3年以上Java開發,請發送簡歷到郵箱:[email protected]

編輯:陶家龍、孫淑娟

出處:本文經授權轉載自咖啡拿鐵(ID:close_3092860495)微信公眾號

精彩文章推薦:

餓了麼容器平台的演進,看這篇文章就夠了!

NoSQL還是SQL?這一篇講清楚

第一次有人把「分布式事務」講的這麼簡單明了

[do_widget id=yuzo_widget-4] [do_widget id=yuzo_widget-9] 流行