// LevelDBPool.java
package io.mycat.cache.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import org.iq80.leveldb.DB;
import io.mycat.cache.CachePool;
import io.mycat.cache.CacheStatic;
/**
 * {@link CachePool} implementation that stores its entries in a LevelDB {@link DB}.
 *
 * <p>Keys and values are turned into byte arrays with Java object serialization
 * ({@link #toByteArray(Object)} / {@link #toObject(byte[])}) before being handed to
 * LevelDB, so both must be serializable. Objects that cannot be serialized are logged
 * and then ignored: a put is skipped and a lookup is reported as a miss.
 *
 * <p>NOTE(review): the check-then-put in {@link #putIfAbsent(Object, Object)} is not
 * atomic; whether that matters depends on how callers share this pool across threads.
 */
public class LevelDBPool implements CachePool {
    private static final Logger LOGGER = LoggerFactory.getLogger(LevelDBPool.class);

    /** Underlying LevelDB handle; supplied by the caller and never closed here. */
    private final DB cache;
    /** Counters reported through {@link #getCacheStatic()}. */
    private final CacheStatic cacheStati = new CacheStatic();
    /** Pool name, used only as a prefix in log messages. */
    private final String name;
    /** Configured maximum size, reported as-is by {@link #getMaxSize()}. */
    private final long maxSize;

    /**
     * Creates a pool on top of an already opened LevelDB handle.
     *
     * @param name    pool name used in log output
     * @param db      LevelDB handle that stores the serialized entries
     * @param maxSize maximum size recorded in the statistics and returned by
     *                {@link #getMaxSize()}; this class does not enforce it
     */
    public LevelDBPool(String name, DB db, long maxSize) {
        this.cache = db;
        this.name = name;
        this.maxSize = maxSize;
        cacheStati.setMaxSize(maxSize);
    }

    /**
     * Stores {@code value} under {@code key} unless an entry for that key already exists.
     *
     * <p>If either argument cannot be serialized the call is a no-op (the serialization
     * error has already been logged by {@link #toByteArray(Object)}). The put counter is
     * only incremented when an entry is actually written.
     *
     * @param key   cache key; must be serializable
     * @param value cache value; must be serializable
     */
    @Override
    public void putIfAbsent(Object key, Object value) {
        byte[] keyBytes = toByteArray(key);
        byte[] valueBytes = toByteArray(value);
        if (keyBytes == null || valueBytes == null) {
            // Passing null to the store would fail; a cache may simply decline the entry.
            LOGGER.warn("{} skip leveldb cache put, serialization failed ,key:{}", name, key);
            return;
        }

        // Honour the "if absent" contract: an empty stored value is what get() treats as
        // a miss, so only a non-empty value counts as an existing entry.
        byte[] existing = cache.get(keyBytes);
        if (existing != null && existing.length > 0) {
            return;
        }

        cache.put(keyBytes, valueBytes);
        cacheStati.incPutTimes();
        LOGGER.debug("{} add leveldb cache ,key:{} value:{}", name, key, value);
    }

    /**
     * Looks up the value stored under {@code key}.
     *
     * <p>On a hit {@code incHitTimes()} is called on the statistics object, on a miss
     * {@code incAccessTimes()} is called. A key that cannot be serialized, or a stored
     * value that cannot be deserialized, is treated as a miss.
     *
     * @param key cache key
     * @return the deserialized value, or {@code null} when there is no usable entry
     */
    @Override
    public Object get(Object key) {
        byte[] keyBytes = toByteArray(key);
        Object value = (keyBytes == null) ? null : toObject(cache.get(keyBytes));
        if (value != null) {
            LOGGER.debug("{} hit cache ,key:{}", name, key);
            cacheStati.incHitTimes();
            return value;
        }
        LOGGER.debug("{} miss cache ,key:{}", name, key);
        cacheStati.incAccessTimes();
        return null;
    }

    /**
     * Resets the statistics counters.
     *
     * <p>NOTE(review): no entries are removed from the LevelDB store here, so values put
     * before this call can still be returned by {@link #get(Object)} afterwards. Confirm
     * whether callers expect the stored data to be wiped as well.
     */
    @Override
    public void clearCache() {
        LOGGER.info("clear cache {}", name);
        cacheStati.reset();
    }

    /**
     * Returns the statistics object for this pool.
     *
     * <p>The item size is set to the number of puts performed since the last reset; it
     * is an approximation, not a count read from LevelDB (an earlier note in this file
     * said an exact size would require modifying the leveldb code).
     *
     * @return the live statistics object of this pool (not a copy)
     */
    @Override
    public CacheStatic getCacheStatic() {
        cacheStati.setItemSize(cacheStati.getPutTimes());
        return cacheStati;
    }

    /**
     * @return the maximum size passed to the constructor
     */
    @Override
    public long getMaxSize() {
        return maxSize;
    }

    /**
     * Serializes an object with Java object serialization.
     *
     * @param obj object to serialize; {@code null} is serialized like any other value
     * @return the serialized bytes, or {@code null} if serialization failed (the error
     *         is logged)
     */
    public byte[] toByteArray(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream also closes the wrapped byte stream.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            bytes = bos.toByteArray();
        } catch (IOException ex) {
            LOGGER.error("toByteArrayError", ex);
        }
        return bytes;
    }

    /**
     * Deserializes an object previously produced by {@link #toByteArray(Object)}.
     *
     * <p>NOTE(review): this uses Java native deserialization without an input filter; it
     * is only safe as long as nothing untrusted can write into the LevelDB files.
     *
     * @param bytes serialized form; may be {@code null} or empty
     * @return the deserialized object, or {@code null} if {@code bytes} is {@code null}
     *         or empty or deserialization failed (the error is logged)
     */
    public Object toObject(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        Object obj = null;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            obj = ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            LOGGER.error("toObjectError", ex);
        }
        return obj;
    }
}