1
|
package com.yoho.datasync.producer.canal;
|
1
|
package com.yoho.datasync.producer.canal;
|
2
|
|
2
|
|
|
|
3
|
+import com.sun.deploy.util.StringUtils;
|
|
|
4
|
+import com.yoho.datasync.core.base.message.TableConfig;
|
|
|
5
|
+import com.yoho.datasync.core.base.message.TableConfigLoader;
|
3
|
import lombok.Data;
|
6
|
import lombok.Data;
|
4
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
7
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
5
|
import org.springframework.stereotype.Component;
|
8
|
import org.springframework.stereotype.Component;
|
6
|
|
9
|
|
|
|
10
|
+import javax.annotation.PostConstruct;
|
|
|
11
|
+import javax.annotation.Resource;
|
|
|
12
|
+import java.util.ArrayList;
|
7
|
import java.util.List;
|
13
|
import java.util.List;
|
|
|
14
|
+import java.util.stream.Collectors;
|
8
|
|
15
|
|
9
|
@Component
|
16
|
@Component
|
10
|
@ConfigurationProperties(prefix = "canal")
|
17
|
@ConfigurationProperties(prefix = "canal")
|
|
@@ -16,6 +23,9 @@ public class CanalConfig { |
|
@@ -16,6 +23,9 @@ public class CanalConfig { |
16
|
|
23
|
|
17
|
private List<CanalInstance> canalInstance ;
|
24
|
private List<CanalInstance> canalInstance ;
|
18
|
|
25
|
|
|
|
26
|
+ @Resource
|
|
|
27
|
+ TableConfigLoader tableConfigLoader;
|
|
|
28
|
+
|
19
|
public List<CanalInstance> getCanalInstance() {
|
29
|
public List<CanalInstance> getCanalInstance() {
|
20
|
return canalInstance;
|
30
|
return canalInstance;
|
21
|
}
|
31
|
}
|
|
@@ -56,7 +66,27 @@ public class CanalConfig { |
|
@@ -56,7 +66,27 @@ public class CanalConfig { |
56
|
|
66
|
|
57
|
String filter;
|
67
|
String filter;
|
58
|
|
68
|
|
|
|
69
|
+ String dbname;
|
|
|
70
|
+
|
59
|
int fetchSize;
|
71
|
int fetchSize;
|
60
|
|
72
|
|
61
|
}
|
73
|
}
|
|
|
74
|
+
|
|
|
75
|
+ @PostConstruct
|
|
|
76
|
+ void buildInstanceFilter(){
|
|
|
77
|
+ canalInstance.forEach(instance -> {
|
|
|
78
|
+ String dbName = instance.getDbname();
|
|
|
79
|
+ String filter = buildCanalFilter(dbName);
|
|
|
80
|
+ instance.setFilter(filter);
|
|
|
81
|
+ });
|
|
|
82
|
+ }
|
|
|
83
|
+
|
|
|
84
|
+ private String buildCanalFilter(String dbName) {
|
|
|
85
|
+ List<TableConfig> dbTableConfigList = tableConfigLoader.getTableConfigs().stream().filter(a -> a.getDbName().equalsIgnoreCase(dbName)).collect(Collectors.toList());
|
|
|
86
|
+ List<String> filters = new ArrayList<>();
|
|
|
87
|
+ for (TableConfig tableConfig : dbTableConfigList) {
|
|
|
88
|
+ filters.add(dbName + "." + tableConfig.getTableName());
|
|
|
89
|
+ }
|
|
|
90
|
+ return StringUtils.join(filters, ",");
|
|
|
91
|
+ }
|
62
|
} |
92
|
} |