AIOSocketWR.java
5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package io.mycat.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import io.mycat.util.TimeUtil;
/**
 * {@link SocketWR} implementation that performs the reads and writes of one
 * {@link AbstractConnection} through an {@link AsynchronousSocketChannel}.
 *
 * <p>Reads are handed to a shared {@code AIOReadHandler}; writes are handed to
 * a shared {@code AIOWriteHandler}. The {@link #writing} flag is acquired with
 * {@code compareAndSet} before a write is issued and is released either by the
 * write handler or by {@code write0()} when it finds nothing left to send.
 */
public class AIOSocketWR extends SocketWR {

    /** Completion handler passed to every read issued by this class. */
    private static final AIOReadHandler aioReadHandler = new AIOReadHandler();

    /** Completion handler passed to every write issued by this class. */
    private static final AIOWriteHandler aioWriteHandler = new AIOWriteHandler();

    /** Channel of the owning connection, cast once in the constructor. */
    private final AsynchronousSocketChannel channel;

    /** Connection whose buffers and write queue this object operates on. */
    protected final AbstractConnection con;

    /** {@code true} from the moment a write is claimed until it is released. */
    protected final AtomicBoolean writing = new AtomicBoolean(false);

    /**
     * Binds this reader/writer to the given connection.
     *
     * @param conn connection whose channel must be an
     *             {@link AsynchronousSocketChannel}
     */
    public AIOSocketWR(AbstractConnection conn) {
        this.con = conn;
        this.channel = (AsynchronousSocketChannel) conn.getChannel();
    }

    /**
     * Issues one asynchronous read into the connection's read buffer. When the
     * connection has no read buffer yet, one chunk is allocated from the
     * processor's buffer pool and stored on the connection first.
     *
     * @throws IllegalArgumentException if the existing read buffer has no
     *                                  space left
     */
    @Override
    public void asynRead() {
        ByteBuffer target = con.readBuffer;
        if (target == null) {
            target = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
            con.readBuffer = target;
        } else if (!target.hasRemaining()) {
            throw new java.lang.IllegalArgumentException("full buffer to read ");
        }
        channel.read(target, this, aioReadHandler);
    }

    /**
     * Flips the buffer and issues an asynchronous write of its content.
     *
     * @param buffer buffer positioned at the end of the data to send
     */
    private void asynWrite(final ByteBuffer buffer) {
        buffer.flip();
        channel.write(buffer, this, aioWriteHandler);
    }

    /**
     * Tries to claim the {@link #writing} flag and push the next piece of data.
     *
     * @return {@code true} when nothing more is pending (the queue was empty,
     *         or a zero-limit buffer was polled and the connection was closed
     *         with "quit cmd"); {@code false} when the flag was already held
     *         or when an asynchronous write has been issued
     */
    private boolean write0() {
        if (!writing.compareAndSet(false, true)) {
            return false;
        }

        final ByteBuffer pending = con.writeBuffer;
        if (pending != null && pending.hasRemaining()) {
            // Unsent bytes remain: move them to the front and write again.
            pending.compact();
            asynWrite(pending);
            return false;
        }

        // Previous write is finished; a non-null buffer still has to be recycled.
        if (pending != null) {
            con.recycle(pending);
            con.writeBuffer = null;
        }

        final ByteBuffer next = con.writeQueue.poll();
        if (next == null) {
            // Queue is empty: release the flag.
            writing.set(false);
            return true;
        }

        if (next.limit() == 0) {
            // A zero-limit buffer makes the connection close with "quit cmd".
            con.recycle(next);
            con.writeBuffer = null;
            con.close("quit cmd");
            writing.set(false);
            return true;
        }

        con.writeBuffer = next;
        asynWrite(next);
        return false;
    }

    /**
     * Records the outcome of a completed write and continues sending.
     *
     * @param result number of bytes written by the completed operation
     */
    protected void onWriteFinished(int result) {
        con.netOutBytes += result;
        con.processor.addNetOutBytes(result);
        con.lastWriteTime = TimeUtil.currentTimeMillis();

        if (this.write0()) {
            this.doNextWriteCheck();
        }
    }

    /**
     * Attempts a write; if that attempt reports no more data while the write
     * queue is nevertheless non-empty, attempts once more.
     */
    public void doNextWriteCheck() {
        if (this.write0() && !con.writeQueue.isEmpty()) {
            this.write0();
        }
    }
}
/**
 * Completion handler for the asynchronous writes issued by
 * {@link AIOSocketWR}. Both callbacks clear the attachment's {@code writing}
 * flag before doing anything else.
 */
class AIOWriteHandler implements CompletionHandler<Integer, AIOSocketWR> {

    /**
     * Clears the writing flag, then either forwards the byte count to
     * {@code onWriteFinished} or, for a negative result, closes the
     * connection. Any exception raised while doing so is logged as a warning.
     *
     * @param result value reported by the completed write
     * @param wr     the {@link AIOSocketWR} that issued the write
     */
    @Override
    public void completed(final Integer result, final AIOSocketWR wr) {
        try {
            wr.writing.set(false);
            if (result < 0) {
                wr.con.close("write erro " + result);
                return;
            }
            wr.onWriteFinished(result);
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("caught aio process err:", e);
        }
    }

    /**
     * Clears the writing flag and closes the connection, including the
     * failure cause in the close reason.
     *
     * @param exc cause of the failed write
     * @param wr  the {@link AIOSocketWR} that issued the write
     */
    @Override
    public void failed(Throwable exc, AIOSocketWR wr) {
        wr.writing.set(false);
        wr.con.close("write failed " + exc);
    }
}
/**
 * Completion handler for the asynchronous reads issued by
 * {@link AIOSocketWR}.
 */
class AIOReadHandler implements CompletionHandler<Integer, AIOSocketWR> {

    /**
     * Handles a completed read.
     *
     * <ul>
     * <li>A positive count is passed to the connection's {@code onReadData}
     * and the next read is requested.</li>
     * <li>{@code -1} closes the connection with "client closed".</li>
     * <li>Any other value is ignored, as before.</li>
     * </ul>
     *
     * <p>Both {@link IOException} and {@link RuntimeException} raised while
     * processing the data or requesting the next read close the connection.
     * Previously only {@code IOException} was caught, so a runtime exception
     * (for example the {@code IllegalArgumentException} that
     * {@code AIOSocketWR.asynRead()} throws for a full buffer) escaped this
     * handler with the connection neither closed nor re-armed for reading.
     *
     * @param i  number of bytes read, or {@code -1}
     * @param wr the {@link AIOSocketWR} that issued the read
     */
    @Override
    public void completed(final Integer i, final AIOSocketWR wr) {
        if (i > 0) {
            try {
                wr.con.onReadData(i);
                wr.con.asynRead();
            } catch (IOException | RuntimeException e) {
                // Without a new pending read the connection would be dead
                // anyway; close it so its resources are released.
                wr.con.close("handle err:" + e);
            }
        } else if (i == -1) {
            wr.con.close("client closed");
        }
    }

    /**
     * Closes the connection, using the failure's string form as the reason.
     *
     * @param exc cause of the failed read
     * @param wr  the {@link AIOSocketWR} that issued the read
     */
    @Override
    public void failed(Throwable exc, AIOSocketWR wr) {
        wr.con.close(exc.toString());
    }
}