1、消费组状态
消费组有5个状态(GroupState):
- Empty:当前无成员
- PreparingRebalance:准备重平衡
- AwaitingSync:等待 Leader 成员制定分配方案
- Stable:重平衡完成可正常工作
- Dead:当前无成员且元数据信息被删除
状态转换图如下:
2、消费者状态
消费组者有3个状态(MemberState):
- UNJOINED:客户端未加入组
- REBALANCING:正在重平衡
- STABLE:客户端已加入组并且发送心跳
3、重平衡机制
rebalance,重平衡是一个协议。
规定了一个消费者组下的所有消费者如何达成一致来分配订阅主题的所有分区。
比如一个消费組有5个消费者,订阅了一个有10个分区的主题,则每个消费者平均分配2个分区,这个分配的过程就是重平衡。
触发条件
rebalance的触发条件有下面3种:
- 组成员发生变更。(新 consumer 加入组、己有 consumer 离开组、己有 consumer 崩溃)
- 订阅的 topic 数发生变更。(使用正则表达式订阅主题,新创建的topic被匹配到)
- 订阅的 topic 的分区数发生变更。
4、分配策略
分区分配策略决定了订阅 topic 的每个分区会被分配给哪个消费者。
有以下三种策略:
- range
- round-robin
- ticky
1、range 策略
RangeAssignor,将单个 topic 的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段井依次分配给每个消费者。
比如主题t0有6个分区(t0-0, t0-1, t0-2, t0-3, t0-4, t0-5),3个消费者(c1、c2、c3),分配方案如下:
- c1:t0-0, t0-1
- c2:t0-2, t0-3
- c3:t0-4, t0-5
日志如下:
Finished assignment for group test-group: {c2-b43c8ddf-f4e7-4aa8-8e54-9f3caedd6839=Assignment(partitions=[t0-2, t0-3]), c3-7594a2c1-37e3-4b40-86fb-0603befecace=Assignment(partitions=[t0-4, t0-5]), c1-f55d10ae-1994-4ecc-8cee-d0a5ed5eca27=Assignment(partitions=[t0-0, t0-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
2、round-robin 策略
RoundRobinAssignor,会把所有 topic 的所有分区顺序摆开,然后轮询式地分配给各个消费者。
比如主题t0有6个分区(t0-0, t0-1, t0-2, t0-3, t0-4, t0-5),3个消费者(c1、c2、c3),分配方案如下:
- c1:t0-0, t0-3
- c2:t0-1, t0-4
- c3:t0-2, t0-5
日志如下:
Finished assignment for group test-group: {c2-b43c8ddf-f4e7-4aa8-8e54-9f3caedd6839=Assignment(partitions=[t0-2, t0-3]), c3-7594a2c1-37e3-4b40-86fb-0603befecace=Assignment(partitions=[t0-4, t0-5]), c1-f55d10ae-1994-4ecc-8cee-d0a5ed5eca27=Assignment(partitions=[t0-0, t0-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
3、sticky 策略
StickyAssignor,粘性分配策略,有效避免了以上两种策略完全无视历史分配方案的缺陷,分区分配尽可能与上次分配的保持相同。
比如主题t0有6个分区(t0-0, t0-1, t0-2, t0-3, t0-4, t0-5),3个消费者(c1、c2、c3)
若使用round-robin策略,分区分配如下:
- c1:t0-0, t0-3
- c2:t0-1, t0-4
- c3:t0-2, t0-5
当c2挂掉后,分区分配如下:
- c1:t0-0, t0-2, t0-4
- c3:t0-1, t0-3, t0-5
有5个分区t0-2、t0-3、t0-1, t0-4、t0-2发生了变化。
若使用sticky策略,分区分配如下:
- c1:t0-0, t0-1
- c2:t0-2, t0-4
- c3:t0-3, t0-5
当c2挂掉后,分区分配如下:
- c1:t0-0, t0-1, t0-2
- c3:t0-3, t0-5, t0-4
只有之前c2上的2个分区(t0-2, t0-4)发生了变化。
5、重平衡流程
主要涉及三个阶段:
- FIND_COORDINATOR,寻找管理该消费者组的 GroupCoordinator 所在broker。
- JOIN_GROUP,向消费者组加入新成员。
- SYNC_GROUP,向所有消费者组成员同步分区分配方案。
6、组件
1、ConsumerCoordinator
客户端的组件,负责 Consumer 与服务端 GroupCoordinator 之间的通信逻辑。
2、GroupCoordinator
服务端Broker上的组件,用于管理和协调消费者组的成员、状态、分区分配、偏移量等信息。
7、示例
消费端示例代码如下:
public static void main(String[] args) throws Exception {
// 1、配置参数
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2、创建实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 3、订阅主题
consumer.subscribe(Arrays.asList("topic_a"));
// 4、消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
8、原理
Consumer poll 方法的真正实现是在 pollOnce() 方法中,流程如下:
- 连接 GroupCoordinator,加入組,获取其分配的分区列表
- 查询消费組在每个分区的消费进度
- 向分配分区的所在节点发送fetch请求,拉取数据
- 如果group需要重平衡,直接返回空集合,这样可以便于group尽快达到一个稳定的状态
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// 1、连接 GroupCoordinator,加入組,获取其分配的分区列表
coordinator.poll(time.milliseconds());
// 2、查询消费組在每个分区的消费进度
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// 3、向分配分区的所在节点发送fetch请求,拉取数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
return !fetcher.hasCompletedFetches();
}
});
// 4、如果group需要重平衡,直接返回空集合,这样可以便于group尽快达到一个稳定的状态
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
连接 GroupCoordinator,并发送 join-group、sync-group 请求,加入 group 成功,并获取其分配的 tp 列表;
- 通过 subscribe() 方法订阅 topic,并且 coordinator 未知,初始化 Consumer Coordinator
- 判断是否需要重新加入 group
- 检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间
- 如果设置的是自动 commit,如果定时达到自动 commit
public void poll(long now) {
invokeCompletedOffsetCommitCallbacks();
// 1、若订阅模式是分区自动分配模式(AUTO_TOPICS、AUTO_PATTERN),并且组协调器未知
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
// 确保组协调器已准备就绪(FIND_COORDINATOR)
ensureCoordinatorReady();
now = time.milliseconds();
}
// 2、判断是否需要重新加入 group(订阅模式是分区自动分配模式的情况)
if (needRejoin()) {
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
// 新成员加入组(JOIN_GROUP、SYNC_GROUP)
ensureActiveGroup();
now = time.milliseconds();
}
// 3、检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间
pollHeartbeat(now);
// 4、如果设置的是自动 commit,如果定时达到自动 commit
maybeAutoCommitOffsetsAsync(now);
}
public void ensureActiveGroup() {
// 确保组协调器已准备就绪
ensureCoordinatorReady();
// 心跳线程不存在,则创建并开启
startHeartbeatThreadIfNeeded();
// 新成员加入组
joinGroupIfNeeded();
}
8.1、FIND_COORDINATOR
Find_Coordinator阶段是客户端向Broker发起GroupCoordinator请求,查找组协调器的过程,流程如下:
-
客户端选择一个负载最小的broker节点,发送 GroupCoordinator 请求
-
服务端接收 GroupCoordinator 请求
2.1. 计算该消费组的offset是由主题__consumer_offsets的哪个分区来保存的,算法:// groupMetadataTopicPartitionCount默认为50 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
2.2. 获取主题__consumer_offsets的元数据,若主题__consumer_offsets不存在则创建
2.3. 查询该消费组对应的分区的leader副本在哪个broker上
2.4. 将该broker的元数据(ip、端口等)返回客户端 -
客户端从返回结果解析出broker元数据,初始化tcp连接,更新心跳时间
8.1.1、客户端请求
客户端发送GroupCoordinator请求,在ensureCoordinatorReady方法中,逻辑如下:
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
while (coordinatorUnknown()) {
...
// 选择一个负载最小的节点,发送 GroupCoordinator 请求
RequestFuture<Void> future = lookupCoordinator();
client.poll(future, remainingMs);
...
}
return !coordinatorUnknown();
}
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// 1、选择一个负载最小的节点
Node node = this.client.leastLoadedNode();
// 2、发送 GroupCoordinator 请求
findCoordinatorFuture = sendGroupCoordinatorRequest(node);
}
return findCoordinatorFuture;
}
// 发送 GroupCoordinator请求
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
GroupCoordinatorRequest.Builder requestBuilder = new GroupCoordinatorRequest.Builder(this.groupId);
return client.send(node, requestBuilder).compose(new GroupCoordinatorResponseHandler());
}
// 对GroupCoordinator请求的返回结果进行处理
private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
...
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
...
}
}
8.1.2、服务端处理
服务端接收到GroupCoordinator请求后,处理逻辑如下:
-
计算该消费组的offset是由主题__consumer_offsets的哪个分区来保存的,算法如下:
// groupMetadataTopicPartitionCount默认为50 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
-
获取主题__consumer_offsets的元数据,若主题__consumer_offsets不存在则创建
-
查询该消费组对应的分区的leader副本在哪个broker上
-
将该broker的元数据(ip、端口等)返回客户端
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
....
// 1、计算该消费组的offset是由主题__consumer_offsets的哪个分区来保存的
// 算法:groupId的hashCode值对分区数(默认50)取余。[Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount]
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// 2、获取主题__consumer_offsets的元数据,若__consumer_offsets不存在则创建
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
} else {
// 3、该Group对应的分区的leader副本在哪个broker上
val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
.find(_.partition == partition)
.map(_.leader())
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
// 4、将该broker的元数据(ip、端口等信息)返回客户端
new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
case _ =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
}
}
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
8.1.3、客户端响应
// 对GroupCoordinator请求的响应进行处理
private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
...
// 1、从返回结果解析出组协调器所在的broker的元数据(host、port)
AbstractCoordinator.this.coordinator = new Node(
Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
// 2、初始化 tcp 连接
client.tryConnect(coordinator);
// 3、更新心跳时间
heartbeat.resetTimeouts(time.milliseconds());
...
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
clearFindCoordinatorFuture();
super.onFailure(e, future);
}
}
8.2、JOIN_GROUP
流程如下:
- 发送 JoinGroupRequest 请求前的准备工作(自动提交位移等)
- 发送 JoinGroupRequest 请求
2.1. 停止心跳线程
2.2. 客户端状态设置为 REBALANCING
2.3. 发送 JoinGroup 请求
2.4. 添加监听器(在重平衡结束后,会调用)
请求头
请求体
响应体
8.2.1、客户端请求
void joinGroupIfNeeded() {
while (needRejoin() || rejoinIncomplete()) {
...
// 1、发送 JoinGroupRequest 请求前的准备工作(自动提交位移等)
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
// 2、发送 JoinGroupRequest 请求
RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future);
resetJoinGroupFuture();
...
}
}
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
// 2.1、停止心跳线程
disableHeartbeatThread();
// 2.2、客户端状态设置为 REBALANCING
state = MemberState.REBALANCING;
// 2.3、发送 JoinGroup 请求
joinFuture = sendJoinGroupRequest();
// 2.4、添加监听器
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
// 若成功,将客户端状态设置为 STABLE,启动心跳线程
state = MemberState.STABLE;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
// 若失败,将客户端状态设置为 UNJOINED
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
// 2.3.1、构建JoinGroup请求
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId, // test-group (消费組id)
this.sessionTimeoutMs, // 10000 (客户端与Broker会话有效期, 超过这个时间没有任何心跳则可能发生重平衡,默认10秒)
this.generation.memberId, // "" (消费者的成员id, 默认是空字符串)
protocolType(), // consumer(协议类型)
metadata() // (设置的分区分配策略、订阅的主题)
).setRebalanceTimeout(this.rebalanceTimeoutMs); // 300000 (rebalance超时时间,默认5分钟)
// 2.3.2、向组协调器发送JoinGroup请求
return client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
}
public List<ProtocolMetadata> metadata() {
// 订阅的主题列表
this.joinedSubscription = subscriptions.subscription();
List<ProtocolMetadata> metadataList = new ArrayList<>();
// 分区分配策略(默认RangeAssignor)
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(joinedSubscription);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
return metadataList;
}
// JoinGroup响应处理器
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
...
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
AbstractCoordinator.this.rejoinNeeded = false;
if (joinResponse.isLeader()) {
// leader cousumer的处理逻辑
onJoinLeader(joinResponse).chain(future);
} else {
// follower cousumer的处理逻辑
onJoinFollower().chain(future);
}
...
}
}
sendJoinGroupRequest()
方法是由 initiateJoinGroup()
方法来调用的。流程如下:
- 停止心跳线程
- Consumer状态设置为 REBALANCING
- 发送 JoinGroup 请求
- 添加监听器
4.1. 若成功,将Consumer状态设置为 STABLE,启动心跳线程
4.2. 若失败,将Consumer状态设置为 UNJOINED
sendJoinGroupRequest()
方法是由 initiateJoinGroup()
方法来调用的。流程如下:
- 构建JoinGroup请求参数
- 向组协调器发送JoinGroup请求
- 对JoinGroup请求的返回结果进行处理
3.1. 如果是 leader cousumer
3.2. 如果是 follower cousumer
8.2.2、服务端处理
流程如下:
- 处理 JoinGroup 请求
1.1. 为该成员创建 memberId(命名规则:clientId + "-" + UUID)
1.2. 创建 MemberMetadata 实例
1.3. 将成员添加到Group
1.4. 准备开启Rebalance
1.4.1. 将Group状态转移到 PreparingRebalance
1.4.2. 延迟操作执行delayedRebalance
1.5. 初始化下个Generation对象
1.5.1. generationId 自增+1
1.5.2. 分区分配策略,从所有成员都支持的的策略中投票选举出一个票数最高的策略
1.5.3. 将Group状态转移到 AwaitingSync
1.6. 循环遍历消费組的所有成员,构建响应结果
■ memberId、leaderId、generationId、分区分配策略
■ 如果member是leader,会把Group的所有member元信息全部返回给它
■ 如果member是follower,返回空集合 - 调用回调函数,返回客户端结果
def handleJoinGroupRequest(request: RequestChannel.Request) {
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
// 1、处理 JoinGroup 请求
coordinator.handleJoinGroup(
joinGroupRequest.groupId,
joinGroupRequest.memberId,
request.header.clientId,
request.session.clientAddress.toString,
joinGroupRequest.rebalanceTimeout,
joinGroupRequest.sessionTimeout,
joinGroupRequest.protocolType,
protocols,
sendResponseCallback)
// 2、调用回调函数,返回客户端结果
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
}
def handleJoinGroup(groupId: String,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
...
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
...
}
private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
...
// 向消费者组添加成员和准备Rebalance
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
...
}
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) = {
// 1.1、为该成员创建 memberId(命名规则:clientId + "-" + UUID)
val memberId = clientId + "-" + group.generateMemberIdSuffix
// 1.2、创建 MemberMetadata 实例
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols)
member.awaitingJoinCallback = callback
// 1.3、将成员添加到Group
group.add(member)
// 1.4、准备开启Rebalance
maybePrepareRebalance(group)
member
}
private def maybePrepareRebalance(group: GroupMetadata) {
group synchronized {
if (group.canRebalance)
prepareRebalance(group)
}
}
private def prepareRebalance(group: GroupMetadata) {
...
// 1.4.1、将Group状态转移到 PreparingRebalance
group.transitionTo(PreparingRebalance)
val rebalanceTimeout = group.rebalanceTimeoutMs
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
val groupKey = GroupKey(group.groupId)
// 1.4.2、延迟执行delayedRebalance
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
以上的步骤,只是把Member加入到了Group中, 这不能算是Join成功了, Join成功与否需要把状态返回给消费者客户端,需要调用onCompleteJoin方法。
def onCompleteJoin(group: GroupMetadata) {
...
// 初始化下个Generation对象
group.initNextGeneration()
// 循环遍历消费組的所有成员
for (member <- group.allMemberMetadata) {
// 构建响应结果
val joinResult = JoinGroupResult(
// 如果member是leader,会把Group的所有member元信息全部返回给它
// 如果member是follower,返回空数据
members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
memberId=member.memberId,
generationId=group.generationId,
subProtocol=group.protocol,
leaderId=group.leaderId,
errorCode=Errors.NONE.code)
// 调用回调函数返回结果给客户端
member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
}
delayedStore.foreach(groupManager.store)
}
def initNextGeneration() = {
...
// generationId 自增+1
generationId += 1
// 分区分配策略,从所有成员都支持的的策略中投票选举出一个票数最高的策略
protocol = selectProtocol
// 将Group状态转移到 AwaitingSync
transitionTo(AwaitingSync)
...
}
8.2.3、客户端响应
从返回结果判断当前消费者是Leader还是Follower。流程如下:
- 如果是Leader
1.1. 制定分区分配方案
1.2. 发送 SyncGroup 请求(携带分区分配方案) - 如果是Follower
2.1. 发送 SyncGroup 请求(不携带分区分配方案,传空)
源码如下:
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
...
if (joinResponse.isLeader()) {
// 1、被组协调器指定为Leader
onJoinLeader(joinResponse).chain(future);
} else
// 2、被组协调器指定为Follower
onJoinFollower().chain(future);
}
...
}
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
...
// 1.1、制定分区分配方案
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
// 1.2、发送 SyncGroup 请求(携带分区分配方案)
sendSyncGroupRequest(requestBuilder);
...
}
// 当 consumer 为 follower 时,从 GroupCoordinator 拉取分配结果
private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
Collections.<String, ByteBuffer>emptyMap());
// 2.1、发送 SyncGroup 请求(不携带分区分配方案,传空)
return sendSyncGroupRequest(requestBuilder);
}
分配方案:
8.3、SYNC_GROUP
8.3.1、客户端请求
上一小节,JOIN_GROUP返回结果中会判断当前消费者是Leader还是Follower。如果是Leader,会先制定分区分配方案,然后把分配方案通过SyncGroup 请求发送到服务端; 如果是Follower,也同样会发送SyncGroup 请求,不过它携带的分配方案是空。发送请求代码如下:
// 发送 SyncGroup 请求,获取对 partition 分配的安排
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
...
future.complete(syncResponse.memberAssignment());
...
}
8.3.2、服务端处理
流程如下:
- 处理 SyncGroup 请求
- 回调函数,返回客户端结果
请求体
def handleSyncGroupRequest(request: RequestChannel.Request) {
val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
...
// 1、处理 SyncGroup 请求
coordinator.handleSyncGroup(
syncGroupRequest.groupId(),
syncGroupRequest.generationId(),
syncGroupRequest.memberId(),
syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
sendResponseCallback
)
...
// 2、回调函数,返回客户端结果
def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
requestChannel.sendResponse(new Response(request, responseBody))
}
}
def handleSyncGroup(groupId: String,
generation: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
...
doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
...
}
private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
var delayedGroupStore: Option[DelayedStore] = None
...
// 设置回调函数
group.get(memberId).awaitingSyncCallback = responseCallback
// leader成员发送的SyncGroupRequest请求需要特殊处理
if (memberId == group.leaderId) {
// 如果有成员没有分配到分区,则创建一个空的方案给它
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
group synchronized {
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (error != Errors.NONE) {
resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}
})
}
...
delayedGroupStore.foreach(groupManager.store)
}
8.3.3、客户端响应
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
// SyncGroup 成功,重平衡完成
future.complete(syncResponse.memberAssignment());
} else {
// SyncGroup 失败,重新加入组
requestRejoin();
...
}
}
}
SyncGroup
成功,则消费者就加入 group 成功了,之后,将会触发ConsumerCoordinator 的 onJoinComplete
方法如下:
// 加入 group 成功
protected void onJoinComplete(int generation, // 重平衡次数
String memberId, // 消费者标识
String assignmentStrategy, // 分配策略
ByteBuffer assignmentBuffer) { // 分配方案
...
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
subscriptions.needRefreshCommits();
// 1、更新新的分区分配方案到缓存
subscriptions.assignFromSubscribed(assignment.partitions());
// 2、更新元数据,确保在下一次循环中可以拉取数据
this.metadata.setTopics(subscriptions.groupSubscription());
client.ensureFreshMetadata();
assignor.onAssignment(assignment);
this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
ConsumerRebalanceListener listener = subscriptions.listener();
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
}
}
在之前发送JOIN_GROUP请求的第2.4步,注册了一个监听器,在重平衡完成后,会调用,如下:
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
// 成员状态置为 STABLE
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
// 开启心跳线程
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
// 成员状态置为 UNJOINED
state = MemberState.UNJOINED;
}
}
});
8.4、HEART_BEAT
Kafka中的心跳机制是消费者与GroupCoordinator之间保持联系的重要方式。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在拉取分区中的消息。如果消费者停止发送心跳的时间过长,GroupCoordinator会认为该消费者已离开消费组,并触发重平衡操作。因此,心跳机制对于维护消费者组的稳定性和避免不必要的重平衡非常重要。
8.4.1、客户端请求
Heartbeat类字段如下:
// 会话超时时间
private final long sessionTimeout;
// 发送心跳的时间间隔
private final long heartbeatInterval;
// 最大的poll时间间隔
private final long maxPollInterval;
// 重试时间
private final long retryBackoffMs;
// 上一次发送心跳的时间
private volatile long lastHeartbeatSend;
// 上一次接收心跳的响应时间
private long lastHeartbeatReceive;
// 心跳重置的时间
private long lastSessionReset;
// 上一次poll的时间
private long lastPoll;
// 是否成功
private boolean heartbeatFailed;
客户端开启心跳线程的入口在AbstractCoordinator类的startHeartbeatThreadIfNeeded方法,代码如下:
private synchronized void startHeartbeatThreadIfNeeded() {
// 心跳线程不存在,则创建并开启
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
}
}
心跳线程的核心逻辑在HeartbeatThread类的run方法中,是个while循环,流程如下:
- 是否关闭线程,若是,则退出
- 是否开启线程,若否,则进入下一次循环
- 若客户端的状态不是STABLE(可能是消费者离开消费组或者coordinator把它踢了),停止心跳线程,进入下一次循环
- 检测coordinator是否已连接,若没有连接,则查找coordinator
- 若会话超时,则将客户端持有的coordinator标记为空,重新寻找coordinator
- 若poll超时,则执行离组请求
- 若还没到下次发送心跳的时间,则等待
- 若到了下次发送心跳的时间,则发送心跳请求,并添加监听器
8.1. 若发送成功,更新lastHeartbeatReceive为当前时间(上一次接收心跳的响应时间)
8.2. 若发送失败
8.2.1. 若正在重平衡中,更新lastHeartbeatReceive为当前时间
8.2.2. 否则设置heartbeatFailed为true
HeartbeatThread心跳线程类代码如下:
private class HeartbeatThread extends KafkaThread {
private boolean enabled = false;
private boolean closed = false;
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
private HeartbeatThread() {
super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true);
}
// 开启心跳线程
public void enable() {
synchronized (AbstractCoordinator.this) {
this.enabled = true;
heartbeat.resetTimeouts(time.milliseconds());
AbstractCoordinator.this.notify();
}
}
// 停止心跳线程
public void disable() {
synchronized (AbstractCoordinator.this) {
log.trace("Disabling heartbeat thread for group {}", groupId);
this.enabled = false;
}
}
// 退出心跳线程
public void close() {
synchronized (AbstractCoordinator.this) {
this.closed = true;
AbstractCoordinator.this.notify();
}
}
private boolean hasFailed() {
return failed.get() != null;
}
private RuntimeException failureCause() {
return failed.get();
}
// 心跳线程处理逻辑
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
// 1、是否关闭线程,若是,则退出
if (closed)
return;
// 2、是否开启线程,若否,则进入下一次循环
if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}
// 3、若客户端的状态不是STABLE(可能是消费者离开消费组或者coordinator把它踢了),停止心跳线程,进入下一次循环
if (state != MemberState.STABLE) {
disable();
continue;
}
client.pollNoWakeup();
long now = time.milliseconds();
// 4、检测coordinator是否已连接,若没有连接,则查找coordinator
if (coordinatorUnknown()) {
if (findCoordinatorFuture == null)
lookupCoordinator();
else
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// 5、若会话超时,则将客户端持有的coordinator标记为空,重新寻找coordinator
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// 6、若poll超时,则执行离组请求
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// 7、若还没到下次发送心跳的时间,则等待
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
// 8、若到了下次发送心跳的时间,则发送心跳请求,并添加监听器
heartbeat.sentHeartbeat(now);
sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat(time.milliseconds());
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
heartbeat.receiveHeartbeat(time.milliseconds());
} else {
heartbeat.failHeartbeat();
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
this.failed.set(new RuntimeException(e));
} catch (RuntimeException e) {
log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
this.failed.set(e);
} finally {
log.debug("Heartbeat thread for group {} has closed", groupId);
}
}
}
客户端发送心跳请求,代码如下:
synchronized RequestFuture<Void> sendHeartbeatRequest() {
// 1、构建心跳请求
HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
// 2、发送心跳请求
return client.send(coordinator, requestBuilder).compose(new HeartbeatResponseHandler());
}
// 心跳请求回调函数
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
...
}
}
8.4.2、服务端处理
请求体
响应体
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
...
// 1、处理 Heartbeat 请求
coordinator.handleHeartbeat(
heartbeatRequest.groupId(),
heartbeatRequest.memberId(),
heartbeatRequest.groupGenerationId(),
sendResponseCallback)
...
// 2、回调函数,返回客户端结果
def sendResponseCallback(errorCode: Short) {
val response = new HeartbeatResponse(errorCode)
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
}
def handleHeartbeat(groupId: String,
memberId: String,
generationId: Int,
responseCallback: Short => Unit) {
if (!isActive.get) {
responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(Errors.NONE.code)
} else {
groupManager.getGroup(groupId) match {
case None =>
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case Some(group) =>
group synchronized {
group.currentState match {
case Dead =>
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case AwaitingSync =>
if (!group.has(memberId))
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
else
responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
case PreparingRebalance =>
if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else {
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
}
case Stable =>
if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else {
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE.code)
}
}
}
}
}
}
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
// 取消老的心跳超时任务
member.latestHeartbeat = time.milliseconds()
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
// 创建新的心跳超时任务
val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
心跳检测机制在completeAndScheduleNextHeartbeatExpiration方法中,流程如下:
- 开启一个延迟任务,延迟时间为心跳过期时间
- 心跳正常时,会在过期时间内接受到心跳,取消老的心跳超时任务,创建新的心跳超时任务
- 心跳超时时,心跳超时任务会被取出执行
心跳超时任务处理逻辑如下:
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
group synchronized {
if (!shouldKeepMemberAlive(member, heartbeatDeadline))
onMemberFailure(group, member)
}
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
// 把过期成员从组内移除
group.remove(member.memberId)
group.currentState match {
// 不做任何操作
case Dead | Empty =>
// 重平衡(组状态置为PreparingRebalance)
case Stable | AwaitingSync => maybePrepareRebalance(group)
// 检查JOIN_GROUP延迟操作是否完成,若已完成,返回JOIN_GROUP响应
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
8.4.3、客户端响应
服务端代码判断逻辑有14个分支,异常类型主要有以下几种:
- GROUP_COORDINATOR_NOT_AVAILABLE
- NOT_COORDINATOR_FOR_GROUP
- UNKNOWN_MEMBER_ID
- ILLEGAL_GENERATION
- REBALANCE_IN_PROGRESS
- NONE
客户端收到服务端的返回结果后,用HeartbeatResponseHandler来处理,不同的异常类型有不同的处理逻辑,对应关系如下:
若心跳正常,服务端返回的是 Errors.NONE,若心跳超时,服务端返回的是Errors.REBALANCE_IN_PROGRESS,需要重新加入组。客户端处理返回结果代码如下:
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
Errors error = Errors.forCode(heartbeatResponse.errorCode());
if (error == Errors.NONE) {
future.complete(null);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// 将客户端持有的coordinator标记为空,重新寻找coordinator
coordinatorDead();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// 需要重新加入组
requestRejoin();
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION) {
// 重置generation为NO_GENERATION、成员状态为UNJOINED,需要重新加入组
resetGeneration();
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// 重置generation为NO_GENERATION、成员状态为UNJOINED,需要重新加入组
resetGeneration();
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
// 将客户端持有的coordinator标记为空,重新寻找coordinator
protected synchronized void coordinatorDead() {
if (this.coordinator != null) {
log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
this.coordinator = null;
}
}
// 重置generation为NO_GENERATION、成员状态为UNJOINED,需要重新加入组
protected synchronized void resetGeneration() {
this.generation = Generation.NO_GENERATION;
this.rejoinNeeded = true;
this.state = MemberState.UNJOINED;
}
// 需要重新加入组
protected synchronized void requestRejoin() {
this.rejoinNeeded = true;
}
触发监听器:
sendHeartbeatRequest().addListener(new RequestFutureListener<Void
// 若成功,更新lastHeartbeatReceive为当前时间
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
// 更新lastHeartbeatReceive为当前时间
heartbeat.receiveHeartbeat(time.milliseconds());
}
}
// 若失败
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// 若正在重平衡中,更新lastHeartbeatReceive为当前时间
heartbeat.receiveHeartbeat(time.milliseconds());
} else {
// 否则设置heartbeatFailed为true
heartbeat.failHeartbeat();
// 唤醒线程
AbstractCoordinator.this.notify();
}
}
}
});
// 更新lastHeartbeatReceive为当前时间
public void receiveHeartbeat(long now) {
this.lastHeartbeatReceive = now;
}
// 设置heartbeatFailed为true
public void failHeartbeat() {
this.heartbeatFailed = true;
}
流程如下:
- 发送 JoinGroupRequest 请求前的准备工作(自动提交位移等)
- 发送 JoinGroupRequest 请求
2.1. 停止心跳线程
2.2. 客户端状态设置为 REBALANCING
2.3. 发送 JoinGroup 请求
2.4. 添加监听器(在重平衡结束后,会调用)
sendJoinGroupRequest()
方法是由 initiateJoinGroup()
方法来调用的。流程如下:
- 停止心跳线程
- Consumer状态设置为 REBALANCING
- 发送 JoinGroup 请求
- 添加监听器
4.1. 若成功,将Consumer状态设置为 STABLE,启动心跳线程
4.2. 若失败,将Consumer状态设置为 UNJOINED