...
|
...
|
@@ -54,7 +54,7 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
|
|
|
@Override
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
List<CanalConfig.CanalInstance> canalInstancesList = canalConfig.getCanalInstance();
|
|
|
List<CanalConfig.CanalInstance> canalInstancesList = canalConfig.getCanalInstance();
|
|
|
if (!CollectionUtils.isEmpty(canalInstancesList)) {
|
|
|
threadPool = Executors.newFixedThreadPool(canalInstancesList.size());
|
|
|
logger.info("CanalInstance total count is [{}]", canalInstancesList.size());
|
...
|
...
|
@@ -85,7 +85,7 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
}
|
|
|
|
|
|
private void startClient() {
|
|
|
List<CanalConfig.CanalInstance> canalInstancesList = canalConfig.getCanalInstance();
|
|
|
List<CanalConfig.CanalInstance> canalInstancesList = canalConfig.getCanalInstance();
|
|
|
for (CanalConfig.CanalInstance instance : canalInstancesList) {
|
|
|
threadPool.execute(() -> {
|
|
|
Thread.currentThread().setName(Thread.currentThread().getName() + "-" + instance.getDestination());
|
...
|
...
|
@@ -95,11 +95,11 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
|
|
|
handleCanalMessage(canalInstConnectors.get(instance.getDestination()), instance);
|
|
|
}
|
|
|
}catch (CanalClientException e) {
|
|
|
} catch (CanalClientException e) {
|
|
|
//TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
|
|
reConnectCanal(instance);
|
|
|
}catch (Throwable e) {
|
|
|
} catch (Throwable e) {
|
|
|
// 避免未捕获的异常导致连接丢失
|
|
|
// TODO 事件上报
|
|
|
logger.error("CanalClientHandler invoke thread error!", e);
|
...
|
...
|
@@ -113,7 +113,7 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
}
|
|
|
}
|
|
|
|
|
|
private void initConnector(CanalConfig.CanalInstance instance) throws CanalClientException{
|
|
|
private void initConnector(CanalConfig.CanalInstance instance) throws CanalClientException {
|
|
|
//暂且canal server端单实例 TODO 集群
|
|
|
logger.info("Start to connect to canal server for instance: {}", instance);
|
|
|
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getSingleSocketAddress(), canalConfig.getSingleSocketPort()),
|
...
|
...
|
@@ -173,7 +173,7 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
long nowtimemillis = System.currentTimeMillis();
|
|
|
//TODO 上报生产者的数量至influxdb
|
|
|
logger.info("handle canal config from [{}],currentTimeMillis is [{}],reportCount is [{}]",
|
|
|
config.getDestination(),nowtimemillis, reportCount);
|
|
|
config.getDestination(), nowtimemillis, reportCount);
|
|
|
} catch (CanalClientException e) {
|
|
|
// 由外围处理进行重连
|
|
|
throw e;
|
...
|
...
|
@@ -211,7 +211,7 @@ public class CanalClientHandler implements InitializingBean, ApplicationEventPub |
|
|
int reportCount = 0;
|
|
|
for (RowData rowData : rowChage.getRowDatasList()) {
|
|
|
//TODO MQ发送
|
|
|
producerRabbitMqMessageHelper.sendRabbitMqMessage(eventType,dbName,tableName, rowData);
|
|
|
producerRabbitMqMessageHelper.sendRabbitMqMessage(eventType, dbName, tableName, rowData);
|
|
|
reportCount++;
|
|
|
}
|
|
|
return reportCount;
|
...
|
...
|
|