...
|
...
|
@@ -90,23 +90,24 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
for (CanalConfig.CanalInstance instance : canalInstancesList) {
|
|
|
threadPool.execute(() -> {
|
|
|
Thread.currentThread().setName(Thread.currentThread().getName() + "-" + instance.getDestination());
|
|
|
initConnector(instance);
|
|
|
while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
|
|
|
try {
|
|
|
try {
|
|
|
initConnector(instance);
|
|
|
while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
|
|
|
|
|
|
handleCanalMessage(canalInstConnectors.get(instance.getDestination()), instance);
|
|
|
} catch (CanalClientException e) {
|
|
|
//TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
|
|
reConnectCanal(instance);
|
|
|
} catch (Throwable e) {
|
|
|
// 避免未捕获的异常导致连接丢失
|
|
|
// TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
}catch (CanalClientException e) {
|
|
|
//TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
|
|
reConnectCanal(instance);
|
|
|
}catch (Throwable e) {
|
|
|
// 避免未捕获的异常导致连接丢失
|
|
|
// TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
});
|
...
|
...
|
|