
Kafka Message Sending Demystified: A Deep Dive into the Message Transfer Mechanism

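1. Create a Topic

First, use the Kafka script to create a topic named topic_a with 3 partitions and a replication factor of 1:

bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2184 --partitions 3 --replication-factor 1

2. Code Example

Create a producer and send a few messages: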

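import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// The class name is illustrative; the original shows only the main method
public class ProducerDemo {
    public static void main(String[] args) {
        // 1. Configure the producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        String topicName = "topic_a";

        // 2. Create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 3; i++) {
            String value = "hello" + i;
            System.out.printf("Sending message: %s%n", value);
            // 3. Create the record
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, value);
            // 4. Send it (asynchronously)
            producer.send(record);
        }
        // 5. Close the producer; close() blocks until buffered records have been sent
        producer.close();
    }
}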

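3. How It Works

3.1. Architecture

The producer is driven by two cooperating threads: the main thread and the Sender thread.

  • On the main thread, KafkaProducer creates the message and passes it through the interceptors, the serializers, and the partitioner, then buffers it in the record accumulator (RecordAccumulator).
  • The Sender thread takes messages from the record accumulator and sends them to the Kafka cluster.

The main flow is:

  1. Run the record through the interceptor chain.
  2. Fetch the cluster metadata from the broker and cache it.
  3. Serialize the key and the value with the configured serializers.
  4. Determine the target partition:
    4.1. If the record specifies a partition, use it.
    4.2. Otherwise, if the record has a key, hash the key and derive the partition from the hash.
    4.3. If neither is set, start from a random number, increment it by 1 on every call, and use it to pick one of the available partitions.
  5. Append the record to the accumulator queue of that partition, where it is buffered first.
  6. If the batch is full or a new batch was created, wake up the Sender thread, which takes data out of the accumulator and sends it.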
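
The hand-off between the two threads can be illustrated with a small, JDK-only sketch. It is not Kafka code: the class TwoThreadSketch, its methods, and the close marker are hypothetical names chosen for illustration, and a plain BlockingQueue stands in for the record accumulator. The main thread only buffers, while a separate sender thread drains the buffer in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the producer's two-thread design; not Kafka code.
final class TwoThreadSketch {
    // Sentinel that tells the sender thread to stop (an assumption of this sketch)
    private static final String CLOSE_MARKER = "__close__";

    // Plays the role of the record accumulator
    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    // Written only by the sender thread, read by the caller after join()
    private final List<String> delivered = new ArrayList<>();
    private final Thread sender = new Thread(this::runSender, "sender-sketch");

    TwoThreadSketch() {
        sender.start();
    }

    // Main thread: buffer the message and return immediately
    void send(String value) {
        accumulator.add(value);
    }

    // Main thread: stop the sender once it has drained everything queued so far
    List<String> close() {
        accumulator.add(CLOSE_MARKER);
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }

    // Sender thread: take buffered messages in order and "send" them
    private void runSender() {
        try {
            for (String next = accumulator.take(); !next.equals(CLOSE_MARKER); next = accumulator.take()) {
                delivered.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

As in the real producer, send() returns before anything reaches the network, which is why closing (or flushing) matters before the program exits.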

3.2. Message Data Structure

public class ProducerRecord<K, V> {
    // Topic
    private final String topic;
    // Partition (null if not specified)
    private final Integer partition;
    // Message headers
    private final Headers headers;
    // Key
    private final K key;
    // Value
    private final V value;
    // Timestamp
    private final Long timestamp;
}

3.3. Producer

Messages are sent by calling KafkaProducer's send method. The source is as follows:

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // 1. Run the interceptors
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 2. Fetch the cluster metadata from the broker and cache it
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // 3. Serialize the key
        byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

        // 4. Serialize the value
        byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

        // 5. Determine the partition (use the specified one if present, otherwise compute it)
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        ...

        // 6. Append the record to the record accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);

        // 7. If the batch is full or a new batch was created, wake up the Sender thread to send data
        if (result.batchIsFull || result.newBatchCreated) {
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // The specific catch blocks of the original source are elided here
        ...
    }
}

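3.4. Interceptors

Purpose

  1. Add custom logic before a message is sent (filter out messages that do not meet requirements, modify messages, and so on).
  2. Add custom logic after the result comes back.
  3. Multiple interceptors can form an interceptor chain that runs in the configured order.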
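
The chaining behavior can be sketched without any Kafka dependency. In this hypothetical example, InterceptorChainSketch is an illustrative name and each interceptor is modeled as a UnaryOperator over the message value; a real ProducerInterceptor works on ProducerRecord instead. The sketch also assumes, in line with the producer's documented behavior, that an interceptor that throws is skipped so that the rest of the chain still runs.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain; not the Kafka implementation.
final class InterceptorChainSketch {
    private final List<UnaryOperator<String>> interceptors;

    InterceptorChainSketch(List<UnaryOperator<String>> interceptors) {
        this.interceptors = interceptors;
    }

    // Apply every interceptor in configured order; each one sees the previous one's output
    String onSend(String value) {
        String current = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // A failing interceptor is skipped; the chain continues with the last good value
            }
        }
        return current;
    }
}
```

With interceptors that add the prefix "p1-", throw, and append "-p2" in that order, the value "hello0" leaves the chain as "p1-hello0-p2".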

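Custom Interceptor

1. Implement the ProducerInterceptor interface

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Called before the message is sent.
     *
     * @return the record to send, here a copy whose value carries a prefix
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Before send: adding a prefix to the value---");
        String newValue = "p1-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), newValue, record.headers());
    }

    /**
     * Called after the result comes back (acknowledgment or failure).
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("After the result is returned---");
        if (exception != null) {
            // On failure the metadata may not carry a valid offset
            System.out.printf("send failed: %s%n", exception.getMessage());
            return;
        }
        System.out.printf("topic = %s, partition = %d, offset = %d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }

    /**
     * Releases resources when the interceptor is closed.
     */
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}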

2. Configure the interceptor

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

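3.5. Fetching Cluster Metadata

Before the producer can append a message to the leader replica of a partition of the target topic, it needs to know how many partitions the topic has, compute (or be told) the target partition, and then connect to the broker that hosts that partition's leader and send the message. All the information needed along the way is metadata.

3.5.1. Metadata Structures

The main classes are:

Class            Description
Metadata         Metadata
Cluster          Cluster
TopicPartition   Topic partition
PartitionInfo    Partition details
Node             Node details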

The data structures are as follows:

// Metadata
public final class Metadata {
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
    // Cluster information
    private Cluster cluster;
    // Minimum interval between updates after a failed metadata update, to avoid refreshing too often (retry.backoff.ms, default 100 ms)
    private final long refreshBackoffMs;
    // Metadata expiry time (metadata.max.age.ms, default 5 * 60 * 1000 ms)
    private final long metadataExpireMs;
    // Incremented by 1 on every successful update; used to detect whether the metadata has been updated
    private int version;
    // Time of the last update attempt
    private long lastRefreshMs;
    // Time of the last successful update
    private long lastSuccessfulRefreshMs;
    // Whether an update is needed
    private boolean needUpdate;
    // Topic --> expiry time
    private final Map<String, Long> topics;
    // Event listeners
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    // Whether expired topics are removed periodically (default true)
    private final boolean topicExpiryEnabled;
}

// Cluster
public final class Cluster {
    // All nodes
    private final List<Node> nodes;
    // Topics the client is not authorized to access
    private final Set<String> unauthorizedTopics;
    // Internal topics
    private final Set<String> internalTopics;
    // Partition --> partition details
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // Topic --> partitions
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // Topic --> available partitions
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // Node id --> partitions
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // Node id --> Node
    private final Map<Integer, Node> nodesById;
    // Node that hosts the controller
    private final Node controller;
    private final ClusterResource clusterResource;
    private final boolean isBootstrapConfigured;
}

// Topic partition
public final class TopicPartition {
    // Topic
    private final String topic;
    // Partition
    private final int partition;
}

// Partition details
public class PartitionInfo {
    // Topic
    private final String topic;
    // Partition
    private final int partition;
    // Leader
    private final Node leader;
    // In-sync replicas (ISR)
    private final Node[] inSyncReplicas;
    // Assigned replicas (AR)
    private final Node[] replicas;
}

// Node details
public class Node {
    // Node id
    private final int id;
    private final String idString;
    // Host name or IP address
    private final String host;
    // Port
    private final int port;
    // Rack
    private final String rack;
}
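
3.5.2. Flow

The flow is:

  1. Add the topic to the cache (topic --> expiry time). If the topic was not cached yet, set lastRefreshMs = 0 and needUpdate = true.
  2. Look up the metadata in the cache first; if it is there, return it immediately.
  3. Otherwise loop, waiting for a metadata update, until it succeeds or times out (max.block.ms, default 60 * 1000 ms):
    3.1. Set needUpdate to true.
    3.2. Wake up the Sender thread, which sends a MetadataRequest to refresh the metadata.
    3.3. Block the main thread until the Sender thread has updated the metadata or the wait times out.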
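
The core of step 3 is a version counter combined with wait/notifyAll. The following JDK-only sketch shows that pattern in isolation. VersionedMetadataSketch is a hypothetical class, and unlike the real Metadata.awaitUpdate it reports a timeout by returning false rather than throwing TimeoutException; both are assumptions made to keep the example short.

```java
// Hypothetical model of the version-based wait used for metadata updates; not Kafka code.
final class VersionedMetadataSketch {
    private int version = 0;
    private boolean needUpdate = false;

    // Main thread: flag that an update is wanted and remember the current version
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Sender thread: record a successful update and wake up all waiting threads
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll();
    }

    // Main thread: block until the version moves past lastVersion or maxWaitMs elapses.
    // Returns true if an update arrived in time, false on timeout or interruption.
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return version > lastVersion;
    }
}
```

Comparing versions instead of waiting on a bare notification makes the wait robust: an update that completes before the waiter starts waiting is still observed, and spurious wake-ups simply re-enter the loop.
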

3.5.3. Source Code

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // 1. Add the topic to the cache (topic --> expiry time); if it was not cached yet, set lastRefreshMs = 0 and needUpdate = true
    metadata.add(topic);

    // 2. Look up the metadata in the cache first; if present, return immediately
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;

    // 3. Loop until the metadata update succeeds or maxWaitMs elapses (max.block.ms, default 60 * 1000 ms)
    do {
        metadata.add(topic);
        // 3.1. Set needUpdate to true
        int version = metadata.requestUpdate();
        // 3.2. Wake up the Sender thread so that it sends a MetadataRequest and refreshes the metadata
        sender.wakeup();
        try {
            // 3.3. Block the main thread until the Sender thread has updated the metadata or the wait times out
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Timed out: throw
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        // Timed out: throw
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
  
        remainingWaitMs = maxWaitMs - elapsed;
        // Look the topic up in the refreshed metadata; keep looping while it is still missing
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);

    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }
    return new ClusterAndWaitTime(cluster, elapsed);
}


 
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // After a successful metadata update the version is incremented; until then keep waiting
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            // Block using Object.wait()
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        // Timed out: throw
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}

The metadata update itself is performed on the Sender thread, which ultimately sends the request through NetworkClient using NIO. The logic is as follows:

public List<ClientResponse> poll(long timeout, long now) {
    ...
    // Check whether the metadata needs to be updated
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
  
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}


public long maybeUpdate(long now) {
    // Time until the next metadata update (0 means update now)
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

    // Not due yet: return the remaining time
    if (metadataTimeout > 0) {
        return metadataTimeout;
    }
    // Pick the least-loaded node
    Node node = leastLoadedNode(now);
    if (node == null) {
        return reconnectBackoffMs;
    }
    // Send a metadata request if the node can accept one
    return maybeUpdate(now, node);
}

// When the metadata should next be updated (immediately, or once it expires)
public synchronized long timeToNextUpdate(long nowMs) {
    // If needUpdate is true, update immediately.
    // Otherwise wait until the metadata expires, i.e. has gone metadata.max.age.ms (default 5 minutes) without a successful refresh.
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    // Back-off: after a refresh attempt, wait at least retry.backoff.ms (default 100 ms) so that failed updates are not retried too frequently
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}


private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        // Build the metadata request
        MetadataRequest.Builder metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.Builder.allTopics();
        else
            metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),metadata.allowAutoTopicCreation());
        // Send the metadata request
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return requestTimeoutMs;
    }
    ...
    return Long.MAX_VALUE;
}

private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
                                         String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    doSend(clientRequest, true, now);
}


private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
  
        AbstractResponse body = createResponse(responseStruct, req.header);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}


public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    this.metadataFetchInProgress = false;
    Cluster cluster = response.cluster();
    ...
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, response.unavailableTopics(), now);
    } else {
        this.metadata.failedUpdate(now);
    }
}


public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
    Objects.requireNonNull(cluster, "cluster should not be null");

    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Long> entry = it.next();
            long expireMs = entry.getValue();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                entry.setValue(now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                it.remove();
                log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
            }
        }
    }

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster, unavailableTopics);

    String previousClusterId = cluster.clusterResource().clusterId();

    if (this.needMetadataForAllTopics) {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
        this.needUpdate = false;
        this.cluster = getClusterForCurrentTopics(cluster);
    } else {
        this.cluster = cluster;
    }

    // The bootstrap cluster is guaranteed not to have any useful information
    if (!cluster.isBootstrapConfigured()) {
        String clusterId = cluster.clusterResource().clusterId();
        if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
            log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
        clusterResourceListeners.onUpdate(cluster.clusterResource());
    }

    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
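
To make the timing rule of timeToNextUpdate concrete, here is a minimal sketch that reproduces the same arithmetic as a pure function. The class name MetadataTimingSketch and the explicit parameter list are assumptions made for illustration; the real method reads these values from the fields of Metadata.

```java
// Hypothetical pure-function version of the timeToNextUpdate arithmetic; not Kafka code.
final class MetadataTimingSketch {
    static long timeToNextUpdate(boolean needUpdate,
                                 long lastRefreshMs,
                                 long lastSuccessfulRefreshMs,
                                 long metadataExpireMs,
                                 long refreshBackoffMs,
                                 long nowMs) {
        // 0 when an update was requested, otherwise the time left until the metadata expires
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // Time left in the back-off window after the last attempt (may be negative)
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

With the defaults quoted above (expiry 300000 ms, back-off 100 ms) and a last refresh at t = 1000 ms: a requested update at t = 1040 still waits 60 ms for the back-off to end, a requested update at t = 2000 is due immediately (result 0), and with no request pending at t = 61000 the next update is due in 240000 ms.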

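3.6. Serialization

3.6.1. Overview

The producer must use a serializer to turn objects into byte arrays before sending them over the network; the consumer then uses a deserializer to turn the received byte arrays back into objects.

The main serializers shipped with Kafka are StringSerializer, ByteArraySerializer, ByteBufferSerializer, BytesSerializer, DoubleSerializer, IntegerSerializer, and LongSerializer.

3.6.2. Source Code

The serialization source is as follows: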

// Serialize the key
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
// Serialize the value
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());


// With StringSerializer, the serialization logic is:
public byte[] serialize(String topic, String data) {
    ...
    if (data == null)
        return null;
    else
        return data.getBytes("UTF8");
    ...
}

3.6.3. Custom Serializer

1. Implement the Serializer interface

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException of getBytes("UTF-8")
        byte[] name = user.getName() != null ? user.getName().getBytes(StandardCharsets.UTF_8) : new byte[0];
        byte[] city = user.getCity() != null ? user.getCity().getBytes(StandardCharsets.UTF_8) : new byte[0];
        // Layout: [name length][name bytes][city length][city bytes]
        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + name.length + 4 + city.length);
        byteBuffer.putInt(name.length);
        byteBuffer.put(name);
        byteBuffer.putInt(city.length);
        byteBuffer.put(city);
        return byteBuffer.array();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
}

// Lombok's @Data generates the getters used above
@Data
public class User {
    private String name;
    private String city;
}

2. Configure the serializer

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
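
On the consumer side, a matching deserializer has to read the same length-prefixed layout back. The following JDK-only sketch shows the round trip. UserCodecSketch and its static methods are hypothetical names; a real implementation would put the decoding logic into a class that implements Kafka's Deserializer interface. The encoding mirrors UserSerializer (a 4-byte length followed by UTF-8 bytes, for name and then city).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical round trip for the [length][bytes][length][bytes] layout; not Kafka code.
final class UserCodecSketch {

    // Same layout as UserSerializer: name length, name bytes, city length, city bytes
    static byte[] encode(String name, String city) {
        byte[] n = name == null ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        byte[] c = city == null ? new byte[0] : city.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + n.length + 4 + c.length);
        buf.putInt(n.length).put(n).putInt(c.length).put(c);
        return buf.array();
    }

    // Reads the two length-prefixed fields back in the order they were written.
    // A null field was written as length 0, so it comes back as an empty string.
    static String[] decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] n = new byte[buf.getInt()];
        buf.get(n);
        byte[] c = new byte[buf.getInt()];
        buf.get(c);
        return new String[] {
            new String(n, StandardCharsets.UTF_8),
            new String(c, StandardCharsets.UTF_8)
        };
    }
}
```

One consequence of this layout is that a null field and an empty string are indistinguishable after decoding; a format that needs to tell them apart could, for example, write a negative length for null.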

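3.7. Partitioner

3.7.1. Rules

After serialization, the producer must decide which partition the message goes to. The rules are:

  1. If a partition number is specified, use it.
  2. If no partition is specified but there is a key, hash the key (MurmurHash2, which is fast and has a low collision rate) and derive the partition from the hash. Messages with the same key are written to the same partition.
  3. If there is neither a partition nor a key, start from a random number, increment it by 1 on every call, and use it to pick one of the available partitions in round-robin fashion. This is the behavior of the default partitioner in the client version shown here; since Kafka 2.4 the default uses a sticky strategy for records without a key.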
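
The three rules can be condensed into a few lines. The sketch below is JDK-only and deliberately simplified: PartitionChoiceSketch is a hypothetical class, Arrays.hashCode stands in for Kafka's MurmurHash2 (so the partition numbers it produces will differ from a real producer's), and it ignores the distinction between all partitions and currently available ones.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the partition selection rules; not the Kafka partitioner.
final class PartitionChoiceSketch {
    private final AtomicInteger counter;

    // Like the default partitioner, start the round-robin counter at a random value
    PartitionChoiceSketch() {
        this(ThreadLocalRandom.current().nextInt());
    }

    // Fixed start value, useful for deterministic demonstrations
    PartitionChoiceSketch(int start) {
        this.counter = new AtomicInteger(start);
    }

    // Clears the sign bit so the modulo below never yields a negative index
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                                        // rule 1
        }
        if (keyBytes != null) {
            // Stand-in hash (assumption): Kafka uses MurmurHash2 here
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;    // rule 2
        }
        return toPositive(counter.getAndIncrement()) % numPartitions;        // rule 3
    }
}
```

Because rule 2 depends on numPartitions, the key-to-partition mapping is only stable while the partition count of the topic stays the same.
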

3.7.2. Source Code

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // Use the specified partition if there is one; otherwise let the partitioner compute it
    return partition != null ?
    partition :
    partitioner.partition(
        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The default partitioner:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // 1. No key was specified
    if (keyBytes == null) {
        // 1.1. Start from a random number on the first call, then increment on every call
        int nextValue = nextValue(topic);
        // 1.2. Look up all available partitions of the topic
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        // 1.3. Pick one of the available partitions
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partition is available: fall back to any partition of the topic
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // 2. A key was specified: hash it (MurmurHash2, fast with a low collision rate) and derive the partition
        //    from the hash, so that messages with the same key go to the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        // First call: start from a random number
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    // Increment by 1 on every subsequent call
    return counter.getAndIncrement();
}

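3.8. Record Accumulator

Description

The record accumulator (RecordAccumulator) buffers messages so that they can be sent in batches, which reduces network overhead and improves performance. If the producer sends to many partitions, increasing buffer.memory can raise overall throughput.

Structure

  • Internally the accumulator keeps one double-ended queue (Deque) per partition; the elements of the queue are ProducerBatch objects.
    ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
    
  • Messages from the main thread are appended at the tail of a deque; the Sender thread reads from the head.
  • A ProducerBatch is a batch of messages (its size is set by batch.size) holding one or more messages.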
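
The per-partition deque structure can be sketched with JDK collections alone. In this hypothetical AccumulatorSketch, partitions are identified by a String, a batch is a List of String records, and a batch is "full" after a fixed number of records; the real accumulator keys by TopicPartition and bounds batches in bytes. The boolean returned by append plays the role of "batch is full or a new batch was created", the condition on which the main thread wakes up the Sender.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the accumulator's map of per-partition deques; not Kafka code.
final class AccumulatorSketch {
    private final int batchSize; // maximum records per batch (an assumption; Kafka counts bytes)
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Main thread: append at the tail. Returns true when the Sender should be woken up,
    // i.e. a batch is full, more than one batch is queued, or a new batch was created.
    boolean append(String partition, String record) {
        Deque<List<String>> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            List<String> last = dq.peekLast();
            boolean newBatchCreated = false;
            if (last == null || last.size() >= batchSize) {
                last = new ArrayList<>();
                dq.addLast(last);
                newBatchCreated = true;
            }
            last.add(record);
            boolean batchIsFull = dq.size() > 1 || last.size() >= batchSize;
            return batchIsFull || newBatchCreated;
        }
    }

    // Sender thread: take the oldest batch from the head, or null if nothing is queued
    List<String> drain(String partition) {
        Deque<List<String>> dq = batches.get(partition);
        if (dq == null) {
            return null;
        }
        synchronized (dq) {
            return dq.pollFirst();
        }
    }
}
```

Appending at the tail and draining from the head keeps the batches of a partition in arrival order, and locking each deque separately lets appends to different partitions proceed in parallel.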

Parameters

The total buffer size is set by buffer.memory, default 33554432 (32 MB).

3.8.1. Data Structures

RecordAccumulator

// One double-ended queue per partition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// Buffer pool
private final BufferPool free;
// Batch size (batch.size, default 16 KB)
private final int batchSize;
// Linger time (linger.ms, default 0)
private final int lingerMs;
// Retry back-off (retry.backoff.ms, default 100)
private final long retryBackoffMs;
// Compression type (compression.type, default none)
private final CompressionType compression;
// Whether the accumulator is closed
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final Time time;
private final ApiVersions apiVersions;
private final IncompleteBatches incomplete; 
private final Set<TopicPartition> muted;
private int drainIndex;
private final TransactionManager transactionManager;
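
ProducerBatch

Description

A ProducerBatch is a batch of messages holding one or more messages.

When a message reaches the RecordAccumulator, the accumulator first finds the deque of the target partition, then takes the ProducerBatch at the tail and checks its remaining space. If the message fits, it is written there; otherwise a new ProducerBatch is created. At creation time the message size is compared with batch.size: if it does not exceed batch.size, the ProducerBatch is created with a buffer of exactly batch.size, so that the buffer can later be reused through the BufferPool; if it exceeds batch.size, the buffer is sized to the message and will not be reused.

Parameters

The size is set by the batch.size parameter.

Source code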

final TopicPartition topicPartition;
int recordCount;
int maxRecordSize;
final ProduceRequestResult produceFuture;
private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
final long createdMs;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private String expiryErrorMessage;
private boolean retry;
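
BufferPool

Description

The Kafka client uses ByteBuffer to allocate and release message memory. Because allocating and releasing buffers frequently is expensive, the RecordAccumulator maintains a BufferPool that lets ByteBuffers be reused. The BufferPool only manages ByteBuffers of one specific size, given by batch.size (default 16384, i.e. 16 KB); buffers of any other size never enter the pool.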
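
The reuse rule can be shown with a much smaller pool than the real one. The sketch below is a simplification under stated assumptions: BufferPoolSketch is a hypothetical class, it returns null instead of blocking when memory is exhausted (the real pool waits up to max.block.ms), and it has no waiter queue or metrics. It keeps the two ideas that matter here: only buffers of the poolable size are recycled, and pooled buffers are released when a differently sized request needs their memory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, non-blocking model of the buffer pool; not the Kafka implementation.
final class BufferPoolSketch {
    private final int poolableSize;      // the one buffer size that is recycled (batch.size)
    private long availableMemory;        // memory not yet handed out and not held in the pool
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    BufferPoolSketch(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        // Fast path: reuse a pooled buffer of exactly the poolable size
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();
        }
        long pooledBytes = (long) free.size() * poolableSize;
        if (availableMemory + pooledBytes < size) {
            return null; // the real pool would block here until memory is released
        }
        // Release pooled buffers until the unallocated memory covers the request
        while (!free.isEmpty() && availableMemory < size) {
            availableMemory += free.pollLast().capacity();
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);                       // recycled for the next batch
        } else {
            availableMemory += buffer.capacity();   // only the accounting is returned
        }
    }

    synchronized int pooledBuffers() {
        return free.size();
    }

    synchronized long availableMemory() {
        return availableMemory;
    }
}
```

With a pool of 64 bytes and a poolable size of 16, a released 16-byte buffer is handed out again as the very same object, while a released 32-byte buffer merely increases the available memory.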

Source code

public class BufferPool {
    // Total size of the BufferPool
    private final long totalMemory;
    // Size of the memory that has not been allocated yet
    private long availableMemory;
    // Deque that caches ByteBuffer objects of the poolable size
    private final Deque<ByteBuffer> free;
    // Poolable size (default 16 KB)
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<Condition> waiters;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
}

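3.8.2. Flow

A message is added to the record accumulator as follows:

  1. Get the deque of the partition, creating it if it does not exist.
  2. Try to append the message to that deque:
    2.1. Get the last batch in the deque; if there is none, return null.
    2.2. Otherwise try to append to the last batch:
     2.2.1. Check whether the batch has enough room for the message; if not, return null.
     2.2.2. If it has, append the message to the batch.
  3. If the append succeeded, return; otherwise continue.
  4. Allocate memory for the message (batch.size, 16 KB by default, if the message is no larger than that; otherwise the message's own size):
    4.1. If the requested size exceeds the total memory (32 MB by default), throw an exception.
    4.2. If the requested size equals the poolable size (16 KB by default, configurable) and the pool is not empty, take a buffer straight from the pool.
    4.3. Otherwise new memory has to be allocated.
    4.4. Check whether the usable memory (unallocated memory plus the pool) covers the request:
     4.4.1. If so, allocate the requested size; while the unallocated memory is insufficient and the pool is not empty, release pooled buffers until there is enough.
     4.4.2. If not, block and wait.
  5. Create a new batch.
  6. Append the message to the new batch.
  7. Add the new batch to the deque of the partition.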

3.8.3. Source Code

The code that adds a message to the record accumulator is as follows:

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // Count this append as in progress; the finally block below decrements the counter
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    try {
        // 1. Get the deque of the partition, creating it if it does not exist
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // Lock the deque while operating on it, for thread safety
        synchronized (dq) {
            // 2. Try to append the message to the partition's deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            // 3. If the append succeeded, return
            if (appendResult != null)
                return appendResult;
        }
        ...
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        // 4. Allocate memory for the message (batch.size if the message fits, otherwise the message's own size)
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Another thread may have created a batch in the meantime, so try once more
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // 5. Create a new batch
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            // 6. Append the message to the new batch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            // 7. Add the new batch to the deque of the partition
            dq.addLast(batch);
            incomplete.add(batch);

            // The buffer now belongs to the batch, so the finally block must not release it
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}


private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    // 2.1. Get the last batch in the deque; if there is none, return null
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        // 2.2. Otherwise try to append to the last batch
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}


public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 2.2.1. Check whether the batch has enough room for the message; if not, return null
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2.2.2. There is room: append the message to the batch
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

The allocation code is as follows:

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // 1. If the requested size exceeds the total memory (32 MB by default), throw
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    this.lock.lock();
    try {
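        // 2. If the requested size equals the poolable size (16 KB by default, configurable) and the pool is not empty, take a buffer straight from the pool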
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();
  
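        // 3. Otherwise new memory has to be allocated
        // Total size of the buffers currently held in the pool
        int freeListSize = freeSize() * this.poolableSize;

        // 4. Check whether the usable memory (unallocated memory plus the pool) covers the request
        if (this.availableMemory + freeListSize >= size) {
            // 4.1. If so, allocate the requested size; while the unallocated memory is insufficient and the pool is not empty, release pooled buffers until there is enough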
            freeUp(size);
            ByteBuffer allocatedBuffer = allocateByteBuffer(size);
            this.availableMemory -= size;
            return allocatedBuffer;
        } else {
            // 4.2. If not, block and wait
            int accumulated = 0;
            ByteBuffer buffer = null;
            boolean hasError = true;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
                    if (waitingTimeElapsed) {
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
                    remainingTimeToBlockNs -= timeNs;
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
                if (buffer == null)
                    buffer = allocateByteBuffer(size);
                hasError = false;
                return buffer;
            } finally {
                if (hasError)
                    this.availableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        try {
            if (!(this.availableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }
}


private void freeUp(int size) {
    // While the pool is not empty and the unallocated memory is insufficient, release pooled buffers until there is enough
    while (!this.free.isEmpty() && this.availableMemory < size)
        this.availableMemory += this.free.pollLast().capacity();
}

The release code is as follows:

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // A buffer of the poolable size (16 KB by default) goes back into the pool for reuse
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Otherwise only return its size to the unallocated memory and leave the buffer itself to the GC
            this.availableMemory += size;
        }
        // 唤醒阻塞等待空间分配的线程
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
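
The accounting behind allocate, freeUp and deallocate can be hard to see through the locking code. The following is a minimal single-threaded sketch of the same bookkeeping, assuming no lock, no waiters and no blocking; `SimpleBufferPool` and its methods are hypothetical names for illustration, not Kafka classes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Single-threaded model of the pool's accounting: no lock, no blocking, no waiters. */
final class SimpleBufferPool {
    private final int poolableSize;                              // size of buffers worth caching (batch.size)
    private final Deque<ByteBuffer> free = new ArrayDeque<>();   // cached buffers of poolableSize
    private long availableMemory;                                // memory that is neither in use nor cached

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(int size) {
        // Fast path: a cached buffer of exactly the poolable size is reused as-is.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        // Otherwise turn cached buffers back into unallocated memory until the request fits.
        while (!free.isEmpty() && availableMemory < size)
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            throw new IllegalStateException("not enough memory; the real pool would block here");
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer, int size) {
        if (size == poolableSize && size == buffer.capacity()) {
            buffer.clear();
            free.add(buffer);          // keep the buffer itself for reuse
        } else {
            availableMemory += size;   // only the accounting is returned; the buffer is left to the GC
        }
    }

    long availableMemory() { return availableMemory; }

    int cachedBuffers() { return free.size(); }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(32, 16);
        ByteBuffer a = pool.allocate(16);
        pool.deallocate(a, 16);
        ByteBuffer b = pool.allocate(16);
        System.out.println("same buffer reused: " + (a == b));
    }
}
```

In this model, a request whose size differs from the poolable size never reuses a cached buffer; it can only reclaim the memory of cached buffers, which is why keeping batches at batch.size makes reuse possible.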

3.9、Sender

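When the producer sends a message to Kafka, it first appends the message to the record accumulator so that multiple messages can be sent to the broker as one batch. The linger.ms parameter controls how long a batch may wait for more messages. If linger.ms is 0, a batch is ready to be sent as soon as it contains data, so there is no deliberate delay; messages that reach the same partition before the Sender thread picks the batch up are still sent together. If linger.ms is greater than 0, a batch becomes ready once it reaches batch.size or has waited for linger.ms, whichever happens first, and the Sender thread then sends it.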
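
The interaction between batch.size and linger.ms can be sketched as a simple predicate. This is a simplified model under stated assumptions: `LingerSketch`, `sendable` and `batchingProps` are hypothetical names, the configuration values are only illustrative, and the real readiness check in the accumulator also takes memory exhaustion, flushing, closing and retry backoff into account.

```java
import java.util.Properties;

final class LingerSketch {
    /** Simplified readiness rule: a batch can be sent when it is full or has waited at least linger.ms. */
    static boolean sendable(boolean batchFull, long waitedMs, long lingerMs) {
        return batchFull || waitedMs >= lingerMs;
    }

    /** Producer settings that favour larger batches; the values are illustrative only. */
    static Properties batchingProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "32768"); // upper bound of one batch, in bytes
        props.setProperty("linger.ms", "10");     // wait up to 10 ms for a batch to fill
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sendable(false, 0, 0));   // true: linger.ms = 0, ready immediately
        System.out.println(sendable(false, 3, 10));  // false: not full and still lingering
        System.out.println(sendable(true, 3, 10));   // true: a full batch does not wait for linger.ms
        System.out.println(sendable(false, 10, 10)); // true: linger.ms has elapsed
    }
}
```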

3.9.1、Data structures

private final KafkaClient client;

// Record accumulator
private final RecordAccumulator accumulator;

// Cluster metadata
private final Metadata metadata;

// Maximum size of a single request in bytes (max.request.size), 1 MB by default
private final int maxRequestSize;

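/**
 * Acknowledgment level the producer requires from the Kafka broker after sending a message.
 * The default is 1 before Kafka 3.0 and all (-1) from Kafka 3.0 onwards.
 * Possible values:
 * 1) acks=1: the send is considered successful once the leader replica has written the message.
 * 2) acks=0: the send is considered successful as soon as the request is sent, whether or not it actually succeeded.
 * 3) acks=-1 (all): the send is considered successful only after all in-sync replicas of the partition have written the message.
 */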
private final short acks;

// Number of retries after a failed send. The default is 0 before Kafka 2.1 and Integer.MAX_VALUE from Kafka 2.1 onwards
private final int retries;

/* the clock instance used for getting the time */
private final Time time;

// Whether the Sender thread is running
private volatile boolean running;

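/**
 * Maximum time the producer waits for a broker response after sending a request.
 * If the timeout elapses and retries are configured, the producer sends the request again.
 * If the request still times out after the retries are exhausted, it is treated as failed.
 * The default is 30000 ms (30 seconds).
 */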
private final int requestTimeoutMs;

/**
 * Time to wait before resending a failed request.
 * A failed request may be retried; the backoff keeps retries from hitting the broker too quickly.
 * The default is 100 ms.
 */
private final long retryBackoffMs;

/**
 * Batches that are currently in flight.
 * key: partition, value: list of record batches
 */
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;


......
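
Several of these fields are populated from producer configuration. The sketch below shows the corresponding configuration keys with illustrative values, plus a hypothetical `parseAcks` helper that mirrors how the string "all" corresponds to the short value -1 held in the acks field. `SenderConfigSketch` and both method names are assumptions made for this example.

```java
import java.util.Properties;

final class SenderConfigSketch {
    /** Maps the acks setting to a short: "all" is an alias for -1 (hypothetical helper). */
    static short parseAcks(String acks) {
        String trimmed = acks.trim();
        return "all".equalsIgnoreCase(trimmed) ? (short) -1 : Short.parseShort(trimmed);
    }

    /** Configuration keys behind the Sender fields; the values are illustrative only. */
    static Properties senderProps() {
        Properties props = new Properties();
        props.setProperty("max.request.size", "1048576");  // maxRequestSize
        props.setProperty("acks", "all");                  // acks
        props.setProperty("retries", "3");                 // retries
        props.setProperty("request.timeout.ms", "30000");  // requestTimeoutMs
        props.setProperty("retry.backoff.ms", "100");      // retryBackoffMs
        return props;
    }

    public static void main(String[] args) {
        Properties props = senderProps();
        System.out.println("acks as short: " + parseAcks(props.getProperty("acks")));
    }
}
```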

3.9.2、Source code analysis

public void run() {
    ......
    while (running) {
       runOnce();
    }
    ......
}

void runOnce() {
    ......
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}



private long sendProducerData(long now) {
    // Fetch the cached cluster metadata
    Cluster cluster = metadata.fetch();
    // Determine which partitions have data that is ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // If the leader of any partition is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics){
            this.metadata.add(topic, now);
        }
        this.metadata.requestUpdate();
    }

    // Remove nodes that are not yet ready to receive requests
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // Convert the <partition, Deque<ProducerBatch>> layout into <node id, List<ProducerBatch>>
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    ......
    // Send the produce requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}
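
The regrouping step, from per-partition queues to per-node lists, can be sketched as follows. This is a simplified model, not the accumulator's drain logic: partitions and batches are plain strings, the request size limit is ignored, and `DrainSketch`, `drain` and `demo` are hypothetical names. It keeps three ideas from the code above: only ready nodes are served, muted partitions are skipped, and at most one batch is taken per partition in each pass.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class DrainSketch {
    /** Regroups the first batch of every eligible partition under the node id of its leader. */
    static Map<Integer, List<String>> drain(Map<String, Deque<String>> batchesByPartition,
                                            Map<String, Integer> leaderByPartition,
                                            Set<Integer> readyNodes,
                                            Set<String> mutedPartitions) {
        Map<Integer, List<String>> byNode = new TreeMap<>();
        for (Map.Entry<String, Deque<String>> entry : batchesByPartition.entrySet()) {
            String partition = entry.getKey();
            Integer leader = leaderByPartition.get(partition);
            // Skip partitions with an unknown leader, a leader that is not ready, or a muted partition.
            if (leader == null || !readyNodes.contains(leader) || mutedPartitions.contains(partition))
                continue;
            String batch = entry.getValue().pollFirst(); // at most one batch per partition per pass
            if (batch != null)
                byNode.computeIfAbsent(leader, id -> new ArrayList<>()).add(batch);
        }
        return byNode;
    }

    /** Three partitions of topic_a spread over two leaders, all nodes ready, nothing muted. */
    static Map<Integer, List<String>> demo() {
        Map<String, Deque<String>> batches = new LinkedHashMap<>();
        batches.put("topic_a-0", new ArrayDeque<>(Arrays.asList("b0a", "b0b")));
        batches.put("topic_a-1", new ArrayDeque<>(Arrays.asList("b1")));
        batches.put("topic_a-2", new ArrayDeque<>(Arrays.asList("b2")));
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("topic_a-0", 1);
        leaders.put("topic_a-1", 2);
        leaders.put("topic_a-2", 1);
        Set<Integer> readyNodes = new HashSet<>(Arrays.asList(1, 2));
        return drain(batches, leaders, readyNodes, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {1=[b0a, b2], 2=[b1]}
    }
}
```

When guaranteeMessageOrder is set, the partitions of the drained batches are muted, which in this model corresponds to adding them to mutedPartitions so that the next pass skips them until the in-flight batch completes.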

The source code for sending produce requests is as follows:

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}


private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }
        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(tp.partition())
                .setRecords(records));
        recordsByPartition.put(tp, batch);
    }

    ......

    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
            new ProduceRequestData()
                    .setAcks(acks)  // set the acks level
                    .setTimeoutMs(timeout) // set the request timeout
                    .setTransactionalId(transactionalId) // set the transactional id
                    .setTopicData(tpd));  // set the topic data
  
    // Callback invoked when the response arrives
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
    // Broker node id
    String nodeId = Integer.toString(destination);
    // Create the request; a response is expected only when acks != 0
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Send the request
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
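
Two steps of sendProduceRequest are easy to miss: picking the lowest magic version among the batches, and grouping partitions under their topic before building the request. The sketch below models only those two steps with plain Java collections; `ProduceRequestSketch`, its nested `Batch` class and the method names are hypothetical and stand in for the Kafka types used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProduceRequestSketch {
    /** Minimal stand-in for a batch: topic, partition index and record format (magic) version. */
    static final class Batch {
        final String topic;
        final int partition;
        final byte magic;

        Batch(String topic, int partition, int magic) {
            this.topic = topic;
            this.partition = partition;
            this.magic = (byte) magic;
        }
    }

    /** Starts from the highest usable magic and lowers it to the smallest magic found in the batches. */
    static byte minUsedMagic(byte maxUsableMagic, List<Batch> batches) {
        byte min = maxUsableMagic;
        for (Batch batch : batches) {
            if (batch.magic < min)
                min = batch.magic;
        }
        return min;
    }

    /** Groups partition indexes under their topic, creating the topic entry on first use. */
    static Map<String, List<Integer>> groupByTopic(List<Batch> batches) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (Batch batch : batches)
            byTopic.computeIfAbsent(batch.topic, name -> new ArrayList<>()).add(batch.partition);
        return byTopic;
    }

    static List<Batch> sampleBatches() {
        return Arrays.asList(new Batch("topic_a", 0, 2), new Batch("topic_b", 1, 1), new Batch("topic_a", 2, 2));
    }

    public static void main(String[] args) {
        List<Batch> batches = sampleBatches();
        System.out.println(minUsedMagic((byte) 2, batches)); // prints 1
        System.out.println(groupByTopic(batches));           // prints {topic_a=[0, 2], topic_b=[1]}
    }
}
```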