NettyBufferPool.java
3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package io.mycat.buffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PoolChunkListMetric;
import io.netty.buffer.PoolChunkMetric;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 封装netty pooled Direct Memory 接口,为mycat提供内存分配功能
* 由于Mycat目前使用ByteBuffer,而Netty分配的是ByteBuf,为了管理ByteBuf
* 在MyCatMemoryAllocator中定义recycleMaps ByteBuffer(address) -->ByteBuf
* 的映射关系,通过address来回收ByteBuf.
*
* @author zagnix
* @create 2017-04-13
*/
public class NettyBufferPool implements BufferPool {
MyCatMemoryAllocator allocator;
private int chunkSize = 0;
public NettyBufferPool(int chunkSize) {
allocator = MyCatMemoryAllocator.getINSTANCE();
this.chunkSize = chunkSize;
}
@Override
public ByteBuffer allocate(int size) {
ByteBuf byteBuf = allocator.directBuffer(size);
ByteBuffer byteBuffer = byteBuf.nioBuffer(0, size);
allocator.recycleMaps.put(PlatformDependent.directBufferAddress(byteBuffer), byteBuf);
return byteBuffer;
}
@Override
public void recycle(ByteBuffer byteBuffer) {
ByteBuf byteBuf =
allocator.recycleMaps.get(PlatformDependent.directBufferAddress(byteBuffer));
if (byteBuf != null) {
byteBuf.release();
allocator.recycleMaps.remove(PlatformDependent.directBufferAddress(byteBuffer));
}
}
/**
* return memory allocator
*
* @return
*/
public MyCatMemoryAllocator getAllocator() {
return allocator;
}
/**
* TODO
* 下面函数需要将netty相关内存信息导出处理,然后实现
* 计算逻辑就是,
* 1.先计算PoolChunk分配的页,表示已经消耗的内存,
* 2.然后计算小于一页情况,记录小于一页内存使用情况,
* 上面二者合起来就是整个netty 使用的内存,
* 已经分配了,但是没有使用的内存的情况
*/
@Override
public long capacity() {
return size();
}
@Override
public long size() {
List<PoolArenaMetric> list = allocator.getAlloc().directArenas();
long chunkSizeBytes = allocator.getChunkSize();
int chunkCount = 0;
synchronized (this) {
/**PoolArenas*/
for (PoolArenaMetric pool : list) {
List<PoolChunkListMetric> pcks = pool.chunkLists();
/**针对PoolChunkList*/
for (PoolChunkListMetric pck : pcks) {
Iterator<PoolChunkMetric> it = pck.iterator();
while (it.hasNext()) {
PoolChunkMetric p = it.next();
chunkCount++;
}
}
}
}
return chunkCount * chunkSizeBytes;
}
@Override
public int getConReadBuferChunk() {
return 0;
}
@Override
public int getSharedOptsCount() {
return 0;
}
@Override
public int getChunkSize() {
return chunkSize;
}
@Override
public ConcurrentHashMap<Long, Long> getNetDirectMemoryUsage() {
return null;
}
@Override
public BufferArray allocateArray() {
return new BufferArray(this);
}
}