PooledByteBufAllocator初始化
PoolThreadCache初始化
PoolAerna初始化
PoolChunk初始化
PoolSubpage初始化
PooledUnsafeDirectByteBuf初始化
分配微小型PooledByteBuf(未命中缓存场景)
分配小型PooledByteBuf(未命中缓存场景)
分配普通型PooledByteBuf(未命中缓存场景)
PoolChunkList源码解析
ReferenceCountUpdater源码解析
Recycler及基内部类初始化
Recycler.Stack<T> 压入对象
Recycler.Stack<T> 弹出对象
PooledByteBuf的回收
PoolSubpage boolean free(PoolSubpage<T> head, int bitmapIdx)
//subpage在使用中(在链表中)则返回true,否则返回false
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
//计算bitmap下标
int q = bitmapIdx >>> 6;
//计算在位图中的下标
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
//将位图中对应的位置为0
bitmap[q] ^= 1L << r;
//设置下一个可用
setNextAvail(bitmapIdx);
//可用数量为0的时候subpage已经不在链表中,需要将其加入链表中
if (numAvail ++ == 0) {
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// Subpage 未被使用 (numAvail == maxNumElems)
if (prev == next) {
// 如果链表中只有一个subpage,那么不要删除它
return true;
}
// 如果链表中还有其它subpage,那么从链表中删除当前subpage
doNotDestroy = false;
removeFromPool();
return false;
}
}
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}
private void removeFromPool() {
assert prev != null && next != null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
}
PoolChunk free(long handle, ByteBuffer nioBuffer)
void free(long handle, ByteBuffer nioBuffer) {
//handle的低32位是memoryMapIdx
int memoryMapIdx = memoryMapIdx(handle);
//handle的高32们是bitmapIdx
int bitmapIdx = bitmapIdx(handle);
// bitmapIdx !=0 表明分配的是subpage
if (bitmapIdx != 0) {
//根据 memoryMapIdx 获取subpages中的某个subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
// 获取PoolArena拥有的PoolSubPage池的头并在其上进行同步。
// 这是必需的,因为我们可能会重新添加它,从而更改链表的结构。
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
//前面已分析过 注意如果subpage已经不在链表中,那么接着处理下面的逻辑
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
}
//当前chunk的可用字节数增加
freeBytes += runLength(memoryMapIdx);
//设置当前节点为可用
setValue(memoryMapIdx, depth(memoryMapIdx));
//设置当前节点的父节点为可用
updateParentsFree(memoryMapIdx);
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
private static int memoryMapIdx(long handle) {
return (int) handle;
}
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}
//PoolArena的findSubpagePoolHead
//根据elemSize获取某个PoolSubpage
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(elemSize)) { // < 512
tableIdx = elemSize >>> 4;
table = tinySubpagePools;
} else {
tableIdx = 0;
elemSize >>>= 10;//除以1024
while (elemSize != 0) {
elemSize >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
return table[tableIdx];
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
return 1 << log2ChunkSize - depth(id);
}
private void setValue(int id, byte val) {
memoryMap[id] = val;
}
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // 在第一个迭代中等于log,随后在向上遍历时从logChild中减少1
//如果val1 == val2 == logChild,则表明两个子节点均可用,那么父节点的值就是其depth
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
} else {
//否则存在一个子节点不可用,父节点的值就是可用子节点的depth
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
id = parentId;
}
}
PoolChunkList free(PoolChunk<T> chunk, long handle, ByteBuffer nioBuffer)
boolean free(PoolChunk<T> chunk, long handle, ByteBuffer nioBuffer) {
//前面已分析
chunk.free(handle, nioBuffer);
//如果chunk的使用率小于当前chunkList的使用率
if (chunk.usage() < minUsage) {
//将chunk从链表中删除
remove(chunk);
// 将PoolChunk向下移动到PoolChunkList链接列表中。
return move0(chunk);
}
return true;
}
private void remove(PoolChunk<T> cur) {
if (cur == head) {
head = cur.next;
if (head != null) {
head.prev = null;
}
} else {
PoolChunk<T> next = cur.next;
cur.prev.next = next;
if (next != null) {
next.prev = cur.prev;
}
}
}
private boolean move0(PoolChunk<T> chunk) {
if (prevList == null) {
// 前面没有PoolChunkList,所以返回false会导致PoolChunk被销毁,并且与PoolChunk关联的所有内存都将被释放。
assert chunk.usage() == 0;
return false;
}
return prevList.move(chunk);
}
private boolean move(PoolChunk<T> chunk) {
assert chunk.usage() < maxUsage;
if (chunk.usage() < minUsage) {
// 将PoolChunk向下移动到PoolChunkList链接列表中。
return move0(chunk);
}
// PoolChunk fits into this PoolChunkList, adding it here.
add0(chunk);
return true;
}
void add0(PoolChunk<T> chunk) {
chunk.parent = this;
if (head == null) {
head = chunk;
chunk.prev = null;
chunk.next = null;
} else {
chunk.prev = null;
chunk.next = head;
head.prev = chunk;
head = chunk;
}
}
PoolThreadCache.add 需要再熟悉PoolThreadCache初始化
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
//获取MemoryRegionCache
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
//缓存
return cache.add(chunk, nioBuffer, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
//普通型 >=8192 小于16MB
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
//numShiftsNormalDirect=13 1<<13 = 8192
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}
//小型 >=512 小于8192 smallSubPageDirectCaches的长度为4
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.smallIdx(normCapacity);
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
}
return cache(smallSubPageHeapCaches, idx);
}
static int smallIdx(int normCapacity) {
int tableIdx = 0;
// 1<<10 = 1024
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
//微小型 小于512 tinySubPageHeapCaches的长度为32 下标的计算为normCapacity >>> 4
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
Entry<T> entry = newEntry(chunk, nioBuffer, handle);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle();
}
return queued;
}
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.nioBuffer = nioBuffer;
entry.handle = handle;
return entry;
}
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
@SuppressWarnings("unchecked")
@Override
public Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
});
PoolArena.free
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
//非池化
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
//池化
SizeClass sizeClass = sizeClass(normCapacity);
//先尝试缓存,前面已分析
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
//未缓存则释放
freeChunk(chunk, handle, sizeClass, nioBuffer, false);
}
}
PoolArena freeChunk
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
final boolean destroyChunk;
synchronized (this) {
我们仅在由于PoolThreadCache终结器而未调用freeChunk的情况下才调用此方法,否则由于延迟类加载而可能失败,例如tomcat。
if (!finalizer) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
}
// PoolChunkList.free 前面已分析
destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
}
if (destroyChunk) {
// 保持同步锁时不需要调用destroyChunk。
destroyChunk(chunk);
}
}
protected abstract void destroyChunk(PoolChunk<T> chunk); PoolArena子类实现此方法
DirectArena
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
}
HeapArena
@Override
protected void destroyChunk(PoolChunk<byte[]> chunk) {
// 依靠GC.
}
PooledByteBuf父类AbstractReferenceCountedByteBuf
初始化
属性
static final long REFCNT_FIELD_OFFSET | ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt") | AbstractReferenceCountedByteBuf类refCnt属性的偏移量 |
---|
static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER | AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt") | AbstractReferenceCountedByteBuf类refCnt属性的原子性操作updater |
static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater |
new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
@Override
protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
return AIF_UPDATER;
}
@Override
protected long unsafeOffset() {
return REFCNT_FIELD_OFFSET;
}
}; | 一个引用计数器,若真实的引用计数=1 则回收此ByteBuf |
volatile int refCnt | updater.initialValue() | 引用计数,初始值为2 |
构造方法
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
}
回收
@Override
public boolean release() {
return handleRelease(updater.release(this));
}
updater.release(this) 参考 ReferenceCountUpdater源码解析
private boolean handleRelease(boolean result) {
if (result) {
deallocate();
}
return result;
}
//抽象方法,在子类PooledByteBuf中实现
protected abstract void deallocate();
PooledByteBuf deallocate() 解除分配
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
//前面已分析
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
//回收对象
recycle();
}
}
先看recycle() stack.push(this) 参考 Recycler及基内部类初始化 Recycler.Stack<T> 压入对象
private void recycle() {
recyclerHandle.recycle(this);
}
//DefaultHandle的recycle方法
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
②
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)