1. Create the topic
First, create a topic with the Kafka CLI script: topic_a, with 3 partitions and a replication factor of 1.
bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2181 --partitions 3 --replication-factor 1
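To confirm the topic and its partition assignment, the same script's --describe option can be used:
bin/kafka-topics.sh --describe --topic topic_a --zookeeper localhost:2181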
2. Code example
Create a producer and send a few messages. The code is as follows:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        // 1. Configure the producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        String topicName = "topic_a";
        // 2. Create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 3; i++) {
            String value = "hello" + i;
            System.out.printf("Sending message: %s%n", value);
            // 3. Create the message
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, value);
            // 4. Send the message
            producer.send(record);
        }
        // 5. Close the producer
        producer.close();
    }
}
3. How it works
3.1. Architecture
The producer is driven by two cooperating threads: the main thread and the Sender thread.
- On the main thread, KafkaProducer creates the message and passes it through the interceptors, the serializer, and the partitioner in turn, finally buffering it in the record accumulator.
- The Sender thread takes messages from the record accumulator and sends them to the Kafka cluster.
The main flow is as follows:
- Pass the record through the interceptor chain.
- Fetch the cluster metadata from the broker and cache it.
- Serialize the key and value with the configured serializers.
- Determine the target partition:
  4.1. If a partition was specified, use it.
  4.2. Otherwise, if the record has a key, hash the key and derive the partition number from the hash.
  4.3. If neither is present, generate a random number on the first call and increment it by 1 on each subsequent call, then pick one of the available partitions.
- Append the record to the corresponding partition's queue in the record accumulator, where it is buffered first.
- When a batch is full or a new batch has been created, wake up the Sender thread to take data from the accumulator and send it; the send result can be observed with a callback, as sketched below.
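The intro example ignores the result of send(). To see which partition and offset a record actually lands on, a Callback can be passed to send(); this is a minimal sketch reusing the producer and topicName from the example above:
producer.send(new ProducerRecord<>(topicName, "key-1", "hello"),
        (metadata, exception) -> {
            if (exception != null) {
                // Sending failed (e.g. timeout or authorization error)
                exception.printStackTrace();
            } else {
                // Invoked once the broker has acknowledged the record
                System.out.printf("topic=%s, partition=%d, offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });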
3.2. Message data structure
public class ProducerRecord<K, V> {
    // Topic
    private final String topic;
    // Partition
    private final Integer partition;
    // Message headers
    private final Headers headers;
    // Key
    private final K key;
    // Value
    private final V value;
    // Timestamp
    private final Long timestamp;
}
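Every field except the topic and value is optional and can be supplied through the various ProducerRecord constructors. For example (values are illustrative):
// Topic and value only: the partitioner picks the partition, the timestamp is assigned automatically
ProducerRecord<String, String> r1 = new ProducerRecord<>("topic_a", "hello");
// Fully specified: topic, partition, timestamp, key, value, headers
ProducerRecord<String, String> r2 = new ProducerRecord<>(
        "topic_a", 0, System.currentTimeMillis(), "user-1", "hello", null);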
3.3. Producer
Messages are sent by calling KafkaProducer's send method. The source is as follows:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // 1. Run the interceptors
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 2. Fetch the cluster metadata from the broker and cache it
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 3. Serialize the key
        byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        // 4. Serialize the value
        byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        // 5. Determine the partition (use the specified partition if there is one, otherwise compute it)
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        ...
        // 6. Append the message to the record accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        // 7. If the batch is full or a new batch was created, wake up the Sender thread to send the data
        if (result.batchIsFull || result.newBatchCreated) {
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        ...
    }
}
3.4. Interceptors
Purpose
- Add custom logic before a message is sent (filter out messages that don't meet requirements, modify messages, etc.).
- Add custom logic after the result comes back.
- Multiple interceptors can form an interceptor chain that runs in the configured order.
Custom interceptor
1. Implement the ProducerInterceptor interface
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
    /**
     * Called before the message is sent.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Before send: adding a prefix to the value---");
        String newValue = "p1-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), newValue, record.headers());
    }
    /**
     * Called after the result comes back.
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("After the result came back---");
        if (exception == null) {
            System.out.printf("topic = %s, partition = %d, offset = %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
    /**
     * Clean up resources when the interceptor is closed.
     */
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
2. Register the interceptor
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
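Since interceptor.classes takes a list, several interceptors can be chained; they run in the order configured. A sketch, assuming a second interceptor class MySecondInterceptor exists:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        Arrays.asList(MyProducerInterceptor.class.getName(), MySecondInterceptor.class.getName()));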
3.5. Fetching cluster metadata
Before the producer can append a message to the leader replica of some partition of the target topic, it needs to know the topic's partition count, compute (or take the specified) target partition, and then connect to the broker hosting that partition's leader to send the message. All of the information needed along the way is metadata.
3.5.1. Metadata structures
The main classes are:
Class | Description |
---|---|
Metadata | Metadata container |
Cluster | Cluster information |
TopicPartition | Topic partition |
PartitionInfo | Partition details |
Node | Node details |
The data structures are as follows:
// Metadata
public final class Metadata {
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
    // Cluster information
    private Cluster cluster;
    // Minimum interval between updates when a metadata update fails, to avoid refreshing too often (retry.backoff.ms, default 100 ms)
    private final long refreshBackoffMs;
    // Metadata expiry time (metadata.max.age.ms, default 5 * 60 * 1000 ms)
    private final long metadataExpireMs;
    // Incremented by 1 on every successful update; used to tell whether the metadata has changed
    private int version;
    // Time of the last update attempt
    private long lastRefreshMs;
    // Time of the last successful update
    private long lastSuccessfulRefreshMs;
    // Whether an update is needed
    private boolean needUpdate;
    // Topic --> expiry time
    private final Map<String, Long> topics;
    // Event listeners
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    // Whether expired topics are removed periodically; default true
    private final boolean topicExpiryEnabled;
}
// Cluster
public final class Cluster {
    // Set of nodes
    private final List<Node> nodes;
    // Set of unauthorized topics
    private final Set<String> unauthorizedTopics;
    // List of internal topics
    private final Set<String> internalTopics;
    // Partition --> partition details
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // Topic --> list of partitions
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // Topic --> list of available partitions
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // Node --> list of partitions
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // nodeId --> Node
    private final Map<Integer, Node> nodesById;
    // The node the Controller runs on
    private final Node controller;
    private final ClusterResource clusterResource;
    private final boolean isBootstrapConfigured;
}
// Topic partition
public final class TopicPartition {
    // Topic
    private final String topic;
    // Partition
    private final int partition;
}
// Partition details
public class PartitionInfo {
    // Topic
    private final String topic;
    // Partition
    private final int partition;
    // Node hosting the leader replica
    private final Node leader;
    // ISR (in-sync replicas) list
    private final Node[] inSyncReplicas;
    // AR (assigned replicas) list
    private final Node[] replicas;
}
// Node details
public class Node {
    // Node id
    private final int id;
    private final String idString;
    // Hostname or IP address
    private final String host;
    // Port
    private final int port;
    // Rack
    private final String rack;
}
3.5.2. Flow
The flow is as follows:
- Add the topic to the cache (topic --> expiry time); if the topic was not in the cache, set lastRefreshMs=0 and needUpdate=true.
- Look up the metadata in the cache first; if it is there, return it directly.
- Otherwise loop until the metadata update succeeds or times out (max.block.ms, default 60 * 1000 ms):
  3.1. Set needUpdate to true.
  3.2. Wake up the Sender thread so it sends a MetadataRequest and updates the metadata.
  3.3. Block the main thread until the Sender thread has updated the metadata, or until the timeout expires.
3.5.3. Source
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // 1. Add the topic to the cache (topic --> expiry time); if it was not there, set lastRefreshMs=0 and needUpdate=true
    metadata.add(topic);
    // 2. Look up the metadata in the cache first; if present, return it directly
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    // 3. Loop until the metadata update succeeds or the maximum wait time is exceeded (maxWaitMs = max.block.ms, default 60 * 1000 ms)
    do {
        metadata.add(topic);
        // 3.1. Set needUpdate to true
        int version = metadata.requestUpdate();
        // 3.2. Wake up the Sender thread so it sends a MetadataRequest and updates the metadata
        sender.wakeup();
        try {
            // 3.3. Block the main thread (until the Sender thread updates the metadata, or the timeout expires)
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Throw on timeout
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        // Throw on timeout
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        // Look up the metadata in the cache again; if it is still missing, keep looping
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }
    return new ClusterAndWaitTime(cluster, elapsed);
}
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // version is incremented after a successful metadata update; until then, keep waiting in the loop
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            // Block (via the thread wait mechanism)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        // Throw on timeout
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
The metadata update itself happens on the Sender thread, which ultimately sends the request through NetworkClient using NIO. The logic is as follows:
public List<ClientResponse> poll(long timeout, long now) {
    ...
    // Decide whether the metadata needs to be updated
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);
    return responses;
}
public long maybeUpdate(long now) {
    // Compute the time of the next metadata update (0 if it should happen immediately)
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    // If it is not time yet, return directly
    if (metadataTimeout > 0) {
        return metadataTimeout;
    }
    // Pick the least-loaded node
    Node node = leastLoadedNode(now);
    if (node == null) {
        return reconnectBackoffMs;
    }
    // If a metadata request can be sent, send one
    return maybeUpdate(now, node);
}
// Whether the metadata should be updated (immediately, or because it expired)
public synchronized long timeToNextUpdate(long nowMs) {
    // If the needUpdate flag is true, update immediately
    // If it is false, check whether the metadata has expired (i.e. has not been updated for longer than metadata.max.age.ms, default 5 minutes)
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    // Whether an update is allowed yet (after a failed update, wait at least retry.backoff.ms, default 100 ms, to avoid refreshing too often)
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        // Build the metadata request
        MetadataRequest.Builder metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.Builder.allTopics();
        else
            metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()), metadata.allowAutoTopicCreation());
        // Send the metadata request
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return requestTimeoutMs;
    }
    ...
    return Long.MAX_VALUE;
}
private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
                                         String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    doSend(clientRequest, true, now);
}
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
        AbstractResponse body = createResponse(responseStruct, req.header);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    this.metadataFetchInProgress = false;
    Cluster cluster = response.cluster();
    ...
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, response.unavailableTopics(), now);
    } else {
        this.metadata.failedUpdate(now);
    }
}
public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
    Objects.requireNonNull(cluster, "cluster should not be null");
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Long> entry = it.next();
            long expireMs = entry.getValue();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                entry.setValue(now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                it.remove();
                log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
            }
        }
    }
    for (Listener listener : listeners)
        listener.onMetadataUpdate(cluster, unavailableTopics);
    String previousClusterId = cluster.clusterResource().clusterId();
    if (this.needMetadataForAllTopics) {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
        this.needUpdate = false;
        this.cluster = getClusterForCurrentTopics(cluster);
    } else {
        this.cluster = cluster;
    }
    // The bootstrap cluster is guaranteed not to have any useful information
    if (!cluster.isBootstrapConfigured()) {
        String clusterId = cluster.clusterResource().clusterId();
        if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
            log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
        clusterResourceListeners.onUpdate(cluster.clusterResource());
    }
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
3.6. Serialization
3.6.1. Overview
The producer must first turn objects into byte arrays with a serializer before sending them over the network; the consumer then uses a deserializer to turn the received byte arrays back into objects.
The serializers that ship with Kafka include: StringSerializer, ByteArraySerializer, ByteBufferSerializer, BytesSerializer, DoubleSerializer, IntegerSerializer, and LongSerializer.
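These are registered through key.serializer / value.serializer, as in the intro example; equivalently, with the type-safe config constants:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());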
3.6.2. Source
The serialization source is as follows:
// Serialize the key
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
// Serialize the value
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// If StringSerializer is configured, the serialization logic is as follows:
public byte[] serialize(String topic, String data) {
    ...
    if (data == null)
        return null;
    else
        return data.getBytes("UTF8");
    ...
}
3.6.3. Custom serializer
1. Implement the Serializer interface
public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        try {
            byte[] name = user.getName() != null ? user.getName().getBytes("UTF-8") : new byte[0];
            byte[] city = user.getCity() != null ? user.getCity().getBytes("UTF-8") : new byte[0];
            // Length-prefixed layout: [nameLength][nameBytes][cityLength][cityBytes]
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + name.length + 4 + city.length);
            byteBuffer.putInt(name.length);
            byteBuffer.put(name);
            byteBuffer.putInt(city.length);
            byteBuffer.put(city);
            return byteBuffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
}
@Data
public class User {
    private String name;
    private String city;
}
2. Register the serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
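On the consumer side, a matching Deserializer reverses the same length-prefixed layout. A minimal sketch (UserDeserializer is not part of the original text; depending on the client version you may also need to override configure and close):
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

public class UserDeserializer implements Deserializer<User> {
    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // Read [nameLength][nameBytes][cityLength][cityBytes], mirroring UserSerializer
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        byte[] city = new byte[buffer.getInt()];
        buffer.get(city);
        User user = new User();
        user.setName(new String(name, StandardCharsets.UTF_8));
        user.setCity(new String(city, StandardCharsets.UTF_8));
        return user;
    }
}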
3.7. Partitioner
3.7.1. Rules
After serialization, the message's target partition must be determined. The selection rules are:
- If a partition number was specified, use it.
- If no partition was specified but the record has a key, hash the key (with MurmurHash2, which is fast and has a low collision rate) and derive the partition number from the hash; messages with the same key are written to the same partition.
- If there is neither a partition nor a key, generate a random number on the first call and increment it by 1 on each subsequent call, then choose among the available partitions.
3.7.2. Source
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // Use the specified partition if there is one; otherwise compute it with the partitioner
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
The default partitioner:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // 1. No key specified
    if (keyBytes == null) {
        // 1.1. Generate a random number the first time, then increment on each call
        int nextValue = nextValue(topic);
        // 1.2. Look up all available partitions of the topic
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        // 1.3. Choose one of the available partitions
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // 2. A key is present: hash it (MurmurHash2, fast with a low collision rate) and derive the partition from the hash, so records with the same key land in the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        // Generate a random starting value the first time
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    // Increment by 1 on every subsequent call
    return counter.getAndIncrement();
}
3.8. Record accumulator
Description
The record accumulator (RecordAccumulator) buffers messages so that they can be sent in batches, which cuts down network overhead and improves performance. If the producer sends to many partitions, increasing buffer.memory can raise overall throughput.
Structure
- Internally, the accumulator maintains a double-ended queue (Deque) per partition, holding ProducerBatch objects:
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
- Messages sent from the main thread are appended to the tail of the corresponding deque; the Sender thread reads messages from the head.
- A ProducerBatch is a batch of messages (its size is governed by the batch.size parameter) containing one or more records.
Parameters
The accumulator's size is configured via buffer.memory, default 33554432 (32 MB).
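As a hedged illustration (the values are examples, not recommendations), raising the buffer and bounding how long send() may block when the buffer is full looks like this:
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024L); // accumulator size: 64 MB
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000);             // max time send()/metadata fetch may block
When the accumulator is full, send() blocks for up to max.block.ms and then throws a TimeoutException.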
3.8.1. Data structures
RecordAccumulator
// A double-ended queue per partition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// Buffer pool
private final BufferPool free;
// Batch size (batch.size, default 16 KB)
private final int batchSize;
// Linger time (linger.ms, default 0)
private final int lingerMs;
// Retry backoff (retry.backoff.ms, default 100)
private final long retryBackoffMs;
// Compression type (compression.type, default none)
private final CompressionType compression;
// Whether the accumulator is closed
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final Time time;
private final ApiVersions apiVersions;
private final IncompleteBatches incomplete;
private final Set<TopicPartition> muted;
private int drainIndex;
private final TransactionManager transactionManager;
ProducerBatch
Description
A ProducerBatch is a message batch containing one or more records.
When a message flows into the RecordAccumulator, the accumulator first finds the deque for the target partition and takes the batch at the tail of the queue. If that batch still has room, the message is written into it; otherwise a new ProducerBatch is created. On creation, the accumulator checks whether the message is larger than batch.size: if not, the ProducerBatch is created with size batch.size, so it can later be reused through the BufferPool; if it is larger, the ProducerBatch is created at the actual message size and will not be reused.
Parameters
The batch size is configured via the batch.size parameter.
Source
final TopicPartition topicPartition;
int recordCount;
int maxRecordSize;
final ProduceRequestResult produceFuture;
private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
final long createdMs;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private String expiryErrorMessage;
private boolean retry;
BufferPool
Description
The Kafka client allocates and releases message memory through ByteBuffer. Since frequent allocation and release is expensive, the RecordAccumulator keeps an internal BufferPool that reuses ByteBuffer instances. The BufferPool only manages ByteBuffers of one specific size (set by the batch.size parameter, default 16384, i.e. 16 KB); buffers of any other size do not enter the pool.
Source
public class BufferPool {
    // Total size of the BufferPool
    private final long totalMemory;
    // Amount of unallocated memory
    private long availableMemory;
    // A deque caching ByteBuffer objects of the poolable size
    private final Deque<ByteBuffer> free;
    // Poolable size (default 16 KB)
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<Condition> waiters;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
}
3.8.2. Flow
A message is added to the record accumulator as follows:
- Get the deque for the partition, creating it if absent.
- Try to append the message to the partition's deque:
  2.1. Take the last batch in the queue; if there is none, return null.
  2.2. Otherwise try to append to the last batch:
    2.2.1. Check whether the current batch has enough room for the message; if not, return null.
    2.2.2. If it does, append to the current batch.
- If the append above succeeded, return immediately; otherwise continue with the steps below.
- Allocate a block of memory for the message (requests of at most 16 KB get a 16 KB block; larger requests get a block of the requested size):
  4.1. If the requested size exceeds the total memory (default 32 MB), throw an exception.
  4.2. If the requested size is 16 KB (configurable) and the free pool is non-empty, take a buffer straight from the pool.
  4.3. If the request is larger than 16 KB, a new block has to be allocated.
  4.4. Check whether the available space (unallocated memory + free pool) covers the request:
    4.4.1. If it does, allocate the block; when unallocated memory alone is insufficient and the free pool is non-empty, release pool buffers in a loop until there is enough unallocated space.
    4.4.2. If it does not, block and wait.
- Create a new batch.
- Append the message to the new batch.
- Add the new batch to the partition's deque.
3.8.3. Source
The source for adding a message to the record accumulator is as follows:
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    ByteBuffer buffer = null;
    try {
        // 1. Get the deque for the partition, creating it if absent
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // Lock the queue while operating on it, for thread safety
        synchronized (dq) {
            // 2. Try to append the message to the partition's deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            // 3. If the append succeeded, return immediately
            if (appendResult != null)
                return appendResult;
        }
        ...
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        // 4. Allocate a block of memory for the message (up to 16 KB requests get a 16 KB block; larger ones get a block of the requested size)
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                return appendResult;
            }
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // 5. Create a new batch
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            // 6. Append the message to the new batch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            // 7. Add the new batch to the partition's deque
            dq.addLast(batch);
            incomplete.add(batch);
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    // 2.1. Take the last batch in the queue; if there is none, return null
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        // 2.2. Otherwise try to append to the last batch
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 2.2.1. Check whether the current batch has enough room for the message; if not, return null
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2.2.2. If it does, append to the current batch
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                timestamp, checksum,
                key == null ? -1 : key.length,
                value == null ? -1 : value.length);
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
The memory allocation source is as follows:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // 1. If the requested size exceeds the total memory (default 32 MB), throw an exception
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                + " bytes, but there is a hard limit of "
                + this.totalMemory
                + " on memory allocations.");
    this.lock.lock();
    try {
        // 2. If the requested size equals 16 KB (configurable) and the free pool is non-empty, take a buffer straight from the pool
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();
        // 3. For requests larger than 16 KB, a new block has to be allocated
        // Size of the free pool
        int freeListSize = freeSize() * this.poolableSize;
        // 4. Check whether the available space (unallocated memory + free pool) covers the request
        if (this.availableMemory + freeListSize >= size) {
            // 4.1. If it does, allocate the block; when unallocated memory alone is insufficient, release pool buffers in a loop until there is enough
            freeUp(size);
            ByteBuffer allocatedBuffer = allocateByteBuffer(size);
            this.availableMemory -= size;
            return allocatedBuffer;
        } else {
            // 4.2. If it does not, block and wait
            int accumulated = 0;
            ByteBuffer buffer = null;
            boolean hasError = true;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
                    if (waitingTimeElapsed) {
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
                    remainingTimeToBlockNs -= timeNs;
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
                if (buffer == null)
                    buffer = allocateByteBuffer(size);
                hasError = false;
                return buffer;
            } finally {
                if (hasError)
                    this.availableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        try {
            if (!(this.availableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }
}
private void freeUp(int size) {
    // While the free pool is non-empty and unallocated memory is insufficient, keep releasing pool buffers until there is enough unallocated space
    while (!this.free.isEmpty() && this.availableMemory < size)
        this.availableMemory += this.free.pollLast().capacity();
}
The memory release source is as follows:
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // If the released ByteBuffer is 16 KB, put it back into the free pool for reuse
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Otherwise just grow the available memory and let GC reclaim the buffer
            this.availableMemory += size;
        }
        // Wake up a thread blocked waiting for memory
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
3.9. Sender
When the producer sends messages to Kafka, it first writes them into the record accumulator and waits a while so that several messages can go to the broker as one batch. The linger.ms parameter controls this wait. With linger.ms set to 0, the producer sends each message immediately without batching. With linger.ms greater than 0, the producer checks whether the buffered data has reached batch.size or the linger.ms deadline; once either condition is met, the Sender thread is woken up to send the messages.
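A hedged example of the trade-off (illustrative values): allowing a short linger usually improves batching at the cost of a little latency.
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);         // wait up to 10 ms for more records to batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // ...or send as soon as a 32 KB batch fills up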
3.9.1. Data structures
private final KafkaClient client;
// Record accumulator
private final RecordAccumulator accumulator;
// Cluster metadata
private final Metadata metadata;
// Maximum size of a single request (max.request.size), default 1 MB
private final int maxRequestSize;
/**
 * Acknowledgment level the producer requires from the Kafka broker after sending a message; the default is 1.
 * Possible values:
 * 1) acks=1: the producer considers the send successful once the leader replica acknowledges the write.
 * 2) acks=0: the producer considers the send successful as soon as the request is sent, regardless of the outcome.
 * 3) acks=-1: the producer considers the send successful only after all in-sync replicas of the partition acknowledge the write.
 */
private final short acks;
// Number of retries after a failed send; the default is 0
private final int retries;
/* the clock instance used for getting the time */
private final Time time;
// Whether the thread is running
private volatile boolean running;
/**
 * Maximum time the producer waits for a broker response after sending a request.
 * If the response does not arrive in time and retries are configured, the producer sends the request again.
 * If the request still times out after the retries are exhausted, it is treated as failed.
 * Default: 30000 ms (30 seconds).
 */
private final int requestTimeoutMs;
/**
 * Interval to wait before re-sending a failed request.
 * A failed request may be retried; the interval prevents retrying so fast that it overloads the broker.
 * Default: 100 ms.
 */
private final long retryBackoffMs;
/**
 * In-flight requests.
 * key: partition, value: list of message batches
 */
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
......
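These fields map directly to producer configs; for example (illustrative values):
props.put(ProducerConfig.ACKS_CONFIG, "all");                // same as -1: wait for all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3);                 // retry failed sends up to 3 times
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000); // per-request response timeout
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);      // wait between retries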
3.9.2. Source analysis
public void run() {
    ......
    while (running) {
        runOnce();
    }
    ......
}
void runOnce() {
    ......
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
    // Fetch the metadata
    Cluster cluster = metadata.fetch();
    // Get the list of partitions that are ready to send data
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // If the leader of any partition is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics) {
            this.metadata.add(topic, now);
        }
        this.metadata.requestUpdate();
    }
    // Remove any nodes that are not ready yet
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // Convert the original <partition, Deque<ProducerBatch>> mapping into <node, List<ProducerBatch>>
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    ......
    // Send the Produce requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}
The source for sending the Produce request is as follows:
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }
        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(tp.partition())
                .setRecords(records));
        recordsByPartition.put(tp, batch);
    }
    ......
    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
            new ProduceRequestData()
                    .setAcks(acks)                       // set acks
                    .setTimeoutMs(timeout)               // set the timeout
                    .setTransactionalId(transactionalId) // set the transactional id
                    .setTopicData(tpd));                 // set the topic data
    // Completion callback
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
    // Broker node id
    String nodeId = Integer.toString(destination);
    // Build the request
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Send the data
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}