When KafkaProducer sends a message via the send method, it first appends the message to a component called RecordAccumulator. The RecordAccumulator, also known as the message accumulator, can be thought of as KafkaProducer's message buffer: it caches messages in batches so that the Sender thread can transmit them in bulk, reducing network overhead and improving throughput.
```java
// KafkaProducer.java
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, interceptCallback, remainingWaitMs);
```
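For a sense of where this sits in application code, here is a minimal, illustrative producer-side sketch (the topic name, serializers, and callback are placeholders, not taken from the source above). The key point is that send() returns almost immediately because the record is merely appended to the accumulator; the actual network send happens later on the Sender thread:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send() only appends the record to the RecordAccumulator; the Sender thread
        // ships it to the broker later, together with other records of the same partition.
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("written to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });
        producer.close(); // flushes the accumulator, so the callback above will fire
    }
}
```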
In this chapter, I will walk through the internal structure of RecordAccumulator and how it caches messages in batches.
Let's start with the basic structure of RecordAccumulator:
```java
// RecordAccumulator.java
public final class RecordAccumulator {
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    // Buffer pool, holding a set of ByteBuffers
    private final BufferPool free;
    private final Time time;
    // Map from a partition to the queue of batches for that partition
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final IncompleteRecordBatches incomplete;
    private final Set<TopicPartition> muted;
    private int drainIndex;

    public RecordAccumulator(int batchSize, long totalSize, CompressionType compression, long lingerMs,
                             long retryBackoffMs, Metrics metrics, Time time) {
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        // Create the internal BufferPool
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteRecordBatches();
        this.muted = new HashSet<>();
        this.time = time;
        registerMetrics(metrics, metricGrpName);
    }
}
```
Two things here deserve attention:

- batches is a CopyOnWriteMap. Operations on partitions are mostly concurrent and heavily read-dominated with very few writes, which is exactly the workload the copy-on-write strategy suits.
- free is the BufferPool, the accumulator's internal memory buffer.

Let's look at BufferPool first. It is a block of buffer memory, 32MB by default, controlled by the buffer.memory parameter:
```java
// BufferPool.java
public final class BufferPool {
    // Total pool size, 32MB by default, controlled by buffer.memory
    private final long totalMemory;
    // Size of one batch, i.e. one ByteBuffer, 16KB by default, controlled by batch.size
    private final int poolableSize;
    private final ReentrantLock lock;
    // Free list of reusable ByteBuffers
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName,
                "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
    //...
}
```
Operating on the BufferPool essentially means operating on the ByteBuffers inside it. The BufferPool keeps a Deque that caches free, reusable ByteBuffers, i.e. a pool of pre-allocated memory blocks, each 16KB, the default batch size. The number of ByteBuffers in the Deque * 16KB is the pooled free space, and availableMemory is the remaining unpooled space; together they add up to at most 32MB, and each time a batch-sized buffer is handed out, the available space shrinks by batchSize (32MB - 16KB after the first allocation).
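A back-of-the-envelope sketch of this accounting, just to make the invariant explicit (plain Java, not Kafka code; the numbers are made up and use the defaults mentioned above):

```java
// Accounting sketch: suppose 50 batch-sized buffers have been carved out of the 32MB pool
// so far, and 10 of them have since been returned to the free deque.
long totalMemory     = 32L * 1024 * 1024;                 // buffer.memory, 32MB
int  poolableSize    = 16 * 1024;                         // batch.size, 16KB
long availableMemory = totalMemory - 50L * poolableSize;  // unpooled memory left
long freeListSize    = 10L * poolableSize;                // pooled, reusable ByteBuffers

// This is exactly the quantity allocate() compares against the requested size below:
long canStillAllocate = availableMemory + freeListSize;   // = 32MB - 40 * 16KB
System.out.println(canStillAllocate / 1024 + " KB still allocatable");
```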
In addition, when a caller asks for a ByteBuffer but the BufferPool does not have enough free space, the calling thread blocks, for at most the time configured by the max.block.ms parameter:
```java
// BufferPool.java
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");
    this.lock.lock();
    try {
        // A pooled buffer is available and the requested size is exactly poolableSize
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Compute the remaining allocatable space
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            //...
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
```
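The article only shows the allocation path, but the append flow further down also calls free.deallocate(buffer) when a freshly allocated buffer turns out not to be needed. The following is a rough sketch of what deallocation has to do, inferred from the allocate logic above rather than copied from any particular Kafka version, so treat the details as approximate: a batch-sized buffer is cleared and recycled into the free deque, anything else simply returns its bytes to availableMemory, and one waiting thread is woken up.

```java
// Deallocation sketch, mirroring the accounting used in allocate() above.
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            // Standard batch-sized buffer: recycle it into the free deque for reuse
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Odd-sized buffer: hand the memory back and let GC reclaim the ByteBuffer itself
            this.availableMemory += size;
        }
        // Wake up one thread that may be blocked in allocate() waiting for space
        Condition moreMemory = this.waiters.peekFirst();
        if (moreMemory != null)
            moreMemory.signal();
    } finally {
        lock.unlock();
    }
}
```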
RecordAccumulator groups messages by partition and packs the messages of the same partition into RecordBatch objects. Each RecordBatch may contain multiple messages, and those messages are laid out compactly, one after another, in memory according to a fixed format:
```java
// RecordBatch.java
public final class RecordBatch {
    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;
    private final List<Thunk> thunks = new ArrayList<>();
    // The in-memory record builder; this is the component that actually lays the messages out
    private final MemoryRecordsBuilder recordsBuilder;
    volatile int attempts;
    int recordCount;
    int maxRecordSize;
    long drainedMs;
    long lastAttemptMs;
    long lastAppendTime;
    private String expiryErrorMessage;
    private AtomicBoolean completed;
    private boolean retry;

    public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
        this.createdMs = now;
        this.lastAttemptMs = now;
        this.recordsBuilder = recordsBuilder;
        this.topicPartition = tp;
        this.lastAppendTime = createdMs;
        this.produceFuture = new ProduceRequestResult(topicPartition);
        this.completed = new AtomicBoolean();
    }

    // Append a message to this batch's in-memory buffer
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        // Not enough room left in the underlying buffer
        if (!recordsBuilder.hasRoomFor(key, value)) {
            return null;
        } else {
            // Append the record to memory via MemoryRecordsBuilder
            long checksum = this.recordsBuilder.append(timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                    timestamp, checksum,
                    key == null ? -1 : key.length,
                    value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
    //...
}
```
As you can see, the actual append is delegated to MemoryRecordsBuilder, which ultimately writes each record into the allocated ByteBuffer in a format like crc|magic|attributes|timestamp...:
```java
// MemoryRecordsBuilder.java
public long append(long timestamp, byte[] key, byte[] value) {
    return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
}

public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
    try {
        if (lastOffset >= 0 && offset <= lastOffset)
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s "
                    + "(Offsets must increase monotonically).", offset, lastOffset));

        int size = Record.recordSize(magic, key, value);
        // Write the log entry header; appendStream is a stream wrapped around the ByteBuffer
        LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);

        if (timestampType == TimestampType.LOG_APPEND_TIME)
            timestamp = logAppendTime;
        long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
        recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
        return crc;
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}
```
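To make "compactly concatenated in memory" concrete, here is a simplified, hypothetical layout sketch that writes length-prefixed key/value pairs into a ByteBuffer. It is not Kafka's actual record format (which also carries crc, magic, attributes and timestamp, and varies by message format version); it only illustrates the idea of packing records back to back in a single buffer with no per-record object overhead:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompactLayoutSketch {
    // Write one record as: key length | key bytes | value length | value bytes
    static void appendRecord(ByteBuffer buffer, byte[] key, byte[] value) {
        buffer.putInt(key.length);
        buffer.put(key);
        buffer.putInt(value.length);
        buffer.put(value);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); // one "batch" worth of space
        // Several records end up back to back in the same buffer
        appendRecord(buffer, "k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
        appendRecord(buffer, "k2".getBytes(StandardCharsets.UTF_8), "v2".getBytes(StandardCharsets.UTF_8));
        System.out.println("bytes used: " + buffer.position());
    }
}
```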
Now that we have seen the important components inside RecordAccumulator, let's look at the overall flow of caching a message.
When KafkaProducer sends a message, it internally calls RecordAccumulator.append. The message is appended to one of the double-ended queues (Deque) inside the RecordAccumulator, and multiple messages get packed into a single batch, a RecordBatch. This whole flow is implemented in the RecordAccumulator.append() method:
```java
// RecordAccumulator.java
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                 Callback callback, long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    try {
        // 1. Get or create the Deque for this partition from the internal CopyOnWriteMap
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // Try to append to the Deque's tail batch; this fails if there is no batch yet
            // or the tail batch has no room left
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)   // appended successfully
                return appendResult;
        }

        // 2. Reaching here means the Deque has no usable batch at its tail,
        // or the tail batch has no room left.
        // Compute the size of the batch to create
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // Get a free ByteBuffer from the BufferPool; this blocks if there is not enough space
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // Try to append again; this double-check handles the case where several threads
            // allocated ByteBuffers concurrently and another thread already created a usable batch
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // Another thread won the race, return our buffer to the pool
                free.deallocate(buffer);
                return appendResult;
            }

            // 3. Reaching here means we are the one adding a new batch to this Deque.
            // MemoryRecordsBuilder is what actually writes the records into the ByteBuffer
            MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
            // Create a RecordBatch holding the MemoryRecordsBuilder and enqueue it
            RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
```
```java
// RecordAccumulator.java
// Get the Deque for this partition, creating it if it does not exist yet
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    // Look it up in the internal CopyOnWriteMap first
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    // Not there yet: create one, but keep whichever instance wins the putIfAbsent race
    d = new ArrayDeque<>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}

// Try to append a message to the tail batch of the Deque
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    // Take the batch at the tail of the queue
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
```
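The getOrCreateDeque pattern above (optimistic get, then putIfAbsent, then keep the winner) is the standard way to lazily create per-key state on a concurrent map without locking. Here is a minimal standalone sketch of the same idiom, using ConcurrentHashMap purely for illustration; Kafka's CopyOnWriteMap offers the same putIfAbsent semantics but copies the whole map on writes, which is fine because new partitions appear rarely while reads happen on every send:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class GetOrCreateSketch {
    private final ConcurrentMap<String, Deque<String>> queues = new ConcurrentHashMap<>();

    Deque<String> getOrCreate(String key) {
        Deque<String> d = queues.get(key);           // fast path: no allocation, no contention
        if (d != null)
            return d;
        d = new ArrayDeque<>();                       // slow path: speculatively create
        Deque<String> previous = queues.putIfAbsent(key, d);
        // If another thread inserted first, our deque is discarded and we use theirs,
        // so every caller ends up appending to the same queue instance.
        return previous == null ? d : previous;
    }
}
```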
A RecordBatch holds one ByteBuffer, which may contain several messages; the batch size is controlled by the batch.size parameter and defaults to 16KB. So in a real production environment, the following parameters must be tuned:
You have to set max.request.size and batch.size according to the size of the messages you actually send. Otherwise, if messages frequently exceed batch.size, every batch ends up holding a single message and batching no longer improves throughput.
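A hedged example of what that tuning might look like on the producer side (the concrete values are illustrative placeholders, not recommendations; derive them from your own measured message sizes and latency budget):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningSketch {
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Total RecordAccumulator buffer (the BufferPool's totalMemory), default 32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
        // Per-batch ByteBuffer size, default 16KB; should comfortably fit several typical messages
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        // How long the Sender may wait for a batch to fill up before sending it anyway
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // Upper bound on a single request (and single message); must exceed your largest message
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4 * 1024 * 1024);
        // How long send() may block when the BufferPool has no free space
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000);
        return props;
    }
}
```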
In this chapter, I covered the in-memory structure of RecordAccumulator and the underlying mechanism it uses to cache messages. To sum up: messages are grouped per partition into a Deque of RecordBatch objects; each RecordBatch owns a ByteBuffer allocated from the BufferPool; and MemoryRecordsBuilder writes each record, in the format
offset | size | crc | magic | attributes | timestamp | key size | key | value size | value
into that underlying ByteBuffer.