一、groupid的定義

在使用Kafka的時候,我們經(jīng)常會看到group.id這個配置項(xiàng),它是一個字符串類型的配置項(xiàng)。具體來說,每個消費(fèi)者都有一個group id,一般情況下我們可以將同樣處理某個數(shù)據(jù)源的消費(fèi)者放置在一組中,使用group id進(jìn)行標(biāo)識。
舉個例子,如果你有一個在多個地方運(yùn)行的日志處理程序,每個程序都會處理某個topic的消息,那么你可以用相同的group id來標(biāo)識這個處理組,以確保傳遞給組中的每個處理程序的消息是唯一的。
二、groupid的作用
Kafka通過group id分配消費(fèi)者之間的消息,確保一個組內(nèi)的消費(fèi)者不會接收到相同的消息。當(dāng)同一個group id下的多個消費(fèi)者訂閱了同一個topic時,每個消息將只能被一個消費(fèi)者消費(fèi)。
在多個消費(fèi)者共同消費(fèi)一個topic的場景下,可以通過groupid來做load balance,即通過groupid的設(shè)置,部署多個消費(fèi)者實(shí)例來對消息進(jìn)行消費(fèi)。
三、groupid的注意事項(xiàng)
1、group id需要唯一
在同一個Kafka集群中,group id需要唯一,如果兩個group使用了相同的groupid,它們就會消費(fèi)相同的消息,造成消息的重復(fù)消費(fèi)。
2、重新啟動后,groupid也需要唯一
如果在同一個group中,消費(fèi)者重啟或新加入消費(fèi)者組,那么每次加入新消費(fèi)者之前,需要確保添加的消費(fèi)者的group id在之前沒有被使用過。
3、group id的更改會導(dǎo)致消費(fèi)者重新從頭開始消費(fèi)
Kafka集群會為group id下的每個消費(fèi)者保存消費(fèi)的偏移量,如果group id被更改,消費(fèi)者將會從頭開始消費(fèi)。
四、實(shí)例代碼
// 配置項(xiàng)
properties.put("group.id", "test-group");
// 創(chuàng)建消費(fèi)者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// 訂閱topic
consumer.subscribe(Arrays.asList("test-topic"));
// 消費(fèi)消息
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
五、小結(jié)
Kafka是一個分布式的消息隊(duì)列,通過group id來保證消費(fèi)者組內(nèi)的消息處理具有唯一性,可以做到消息的負(fù)載均衡和處理組內(nèi)消息的互斥性。在使用時需要注意group id的唯一性以及更改group id的影響等問題。

京公網(wǎng)安備 11010802030320號