// EngineCtx.java
package io.mycat.sqlengine;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.net.mysql.EOFPacket;
import io.mycat.net.mysql.ResultSetHeaderPacket;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.server.NonBlockingSession;
import io.mycat.server.ServerConnection;
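/**
 * Execution context for a batch of native SQL jobs fanned out across data
 * nodes. It hands jobs to a {@link BatchSQLJob}, tracks job and packet id
 * sequences, serializes result-set writes to the front-end connection behind
 * a single write lock, and fires the registered
 * {@link AllJobFinishedListener} exactly once after the last job completes.
 */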
public class EngineCtx {
    public static final Logger LOGGER = LoggerFactory.getLogger(EngineCtx.class);
    private final BatchSQLJob batchJob;
    private final AtomicInteger jobId = new AtomicInteger(0);
    final AtomicInteger packetId = new AtomicInteger(0);
    private final NonBlockingSession session;
    private final AtomicBoolean finished = new AtomicBoolean(false);
    // set before jobs complete; volatile so IO threads see the latest value
    private volatile AllJobFinishedListener allJobFinishedListener;
    private final AtomicBoolean headerWritten = new AtomicBoolean();
    private final ReentrantLock writeLock = new ReentrantLock();
    private volatile boolean hasError = false;
    public EngineCtx(NonBlockingSession session) {
        this.batchJob = new BatchSQLJob();
        this.session = session;
    }
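
    /**
     * Returns the next MySQL packet sequence id. The cast to byte gives the
     * one-byte wrap-around the protocol expects.
     */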
    public byte incPackageId() {
        return (byte) packetId.incrementAndGet();
    }
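
    /**
     * Submits one copy of the given SQL per data node, to be executed
     * sequentially (each job is queued with the parallel flag set to false).
     */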
    public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
            SQLJobHandler jobHandler) {
        for (String dataNode : dataNodes) {
            SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
                    jobHandler, this);
            batchJob.addJob(job, false);
        }
    }
    public ReentrantLock getWriteLock() {
        return writeLock;
    }

    public void setAllJobFinishedListener(
            AllJobFinishedListener allJobFinishedListener) {
        this.allJobFinishedListener = allJobFinishedListener;
    }
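
    /**
     * Submits one copy of the given SQL per data node, to be executed in
     * parallel (each job is queued with the parallel flag set to true).
     */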
    public void executeNativeSQLParallJob(String[] dataNodes, String sql,
            SQLJobHandler jobHandler) {
        for (String dataNode : dataNodes) {
            SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
                    jobHandler, this);
            batchJob.addJob(job, true);
        }
    }
    /**
     * Signals that no more jobs will be added to this batch, allowing the
     * all-finished callback to fire once every submitted job completes.
     */
    public void endJobInput() {
        batchJob.setNoMoreJobInput(true);
    }
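
    /**
     * Writes a merged result-set header built from two field lists, e.g. for
     * a two-table join: all of afields plus bfields minus its first entry.
     * Guarded by headerWritten so the header is sent at most once.
     */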
    public void writeHeader(List<byte[]> afields, List<byte[]> bfields) {
        if (headerWritten.compareAndSet(false, true)) {
            writeLock.lock();
            try {
                // write the result-set header; bfields.get(0) is skipped
                // below, hence the -1 in the field count
                ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
                headerPkg.fieldCount = afields.size() + bfields.size() - 1;
                headerPkg.packetId = incPackageId();
                LOGGER.debug("packet id {}", headerPkg.packetId);
                ServerConnection sc = session.getSource();
                ByteBuffer buf = headerPkg.write(sc.allocate(), sc, true);
                // write the a-table field packets, renumbering their
                // sequence ids (byte 3 of a MySQL packet)
                for (byte[] field : afields) {
                    field[3] = incPackageId();
                    buf = sc.writeToBuffer(field, buf);
                }
                // write the b-table field packets, skipping the first one
                for (int i = 1; i < bfields.size(); i++) {
                    byte[] bfield = bfields.get(i);
                    bfield[3] = incPackageId();
                    buf = sc.writeToBuffer(bfield, buf);
                }
                // terminate the field list with an EOF packet
                EOFPacket eofPckg = new EOFPacket();
                eofPckg.packetId = incPackageId();
                buf = eofPckg.write(buf, sc, true);
                sc.write(buf);
            } finally {
                writeLock.unlock();
            }
        }
    }
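
    /**
     * Writes a result-set header from a single field list, at most once per
     * result set.
     */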
    public void writeHeader(List<byte[]> afields) {
        if (headerWritten.compareAndSet(false, true)) {
            writeLock.lock();
            try {
                // write the result-set header
                ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
                headerPkg.fieldCount = afields.size();
                headerPkg.packetId = incPackageId();
                LOGGER.debug("packet id {}", headerPkg.packetId);
                ServerConnection sc = session.getSource();
                ByteBuffer buf = headerPkg.write(sc.allocate(), sc, true);
                // write the field packets, renumbering their sequence ids
                for (byte[] field : afields) {
                    field[3] = incPackageId();
                    buf = sc.writeToBuffer(field, buf);
                }
                // terminate the field list with an EOF packet
                EOFPacket eofPckg = new EOFPacket();
                eofPckg.packetId = incPackageId();
                buf = eofPckg.write(buf, sc, true);
                sc.write(buf);
            } finally {
                writeLock.unlock();
            }
        }
    }
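
    /**
     * Writes one row packet to the client, renumbering its packet id under
     * the shared write lock so rows from concurrent jobs do not interleave.
     */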
    public void writeRow(RowDataPacket rowDataPkg) {
        ServerConnection sc = session.getSource();
        writeLock.lock();
        try {
            rowDataPkg.packetId = incPackageId();
            // write the complete row to the client
            ByteBuffer buf = rowDataPkg.write(sc.allocate(), sc, true);
            sc.write(buf);
        } finally {
            writeLock.unlock();
        }
    }
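
    /**
     * Writes the final EOF packet that terminates the row stream, e.g. from
     * an AllJobFinishedListener once every job has reported back.
     */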
    public void writeEof() {
        ServerConnection sc = session.getSource();
        EOFPacket eofPckg = new EOFPacket();
        eofPckg.packetId = incPackageId();
        ByteBuffer buf = eofPckg.write(sc.allocate(), sc, false);
        sc.write(buf);
        LOGGER.info("write eof, packet id {}", eofPckg.packetId);
    }
    public NonBlockingSession getSession() {
        return session;
    }
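
    /**
     * Callback invoked as each SQLJob completes. When the batch reports that
     * all jobs are done, the compare-and-set on finished guarantees the
     * all-finished path runs exactly once; the listener is only notified if
     * no job raised an error.
     */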
    public void onJobFinished(SQLJob sqlJob) {
        boolean allFinished = batchJob.jobFinished(sqlJob);
        if (allFinished && finished.compareAndSet(false, true)) {
            if (!hasError) {
                LOGGER.info("all jobs finished for front connection: {}",
                        session.getSource());
                allJobFinishedListener.onAllJobFinished(this);
            } else {
                LOGGER.info("all jobs finished with error for front connection: {}",
                        session.getSource());
            }
        }
    }
    public boolean isHasError() {
        return hasError;
    }

    public void setHasError(boolean hasError) {
        this.hasError = hasError;
    }
}
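
/*
 * A minimal usage sketch, not part of the original file: the class name
 * EngineCtxUsageSketch, the "select 1" statement, and the listener body are
 * illustrative only. It assumes AllJobFinishedListener declares a single
 * void onAllJobFinished(EngineCtx) method, matching the call made in
 * onJobFinished() above.
 */
class EngineCtxUsageSketch {
    static void fanOut(NonBlockingSession session, String[] dataNodes,
            SQLJobHandler jobHandler) {
        EngineCtx ctx = new EngineCtx(session);
        // once every job reports back, terminate the row stream
        ctx.setAllJobFinishedListener(new AllJobFinishedListener() {
            @Override
            public void onAllJobFinished(EngineCtx finishedCtx) {
                finishedCtx.writeEof();
            }
        });
        // one SQLJob per data node, executed in parallel
        ctx.executeNativeSQLParallJob(dataNodes, "select 1", jobHandler);
        // no further jobs will be submitted for this batch
        ctx.endJobInput();
    }
}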