MigrateMainRunner.java
3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package io.mycat.migrate;
import io.mycat.MycatServer;
import io.mycat.backend.datasource.PhysicalDBPool;
import io.mycat.backend.datasource.PhysicalDatasource;
import io.mycat.config.model.DBHostConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by magicdoom on 2016/12/8.
*/
public class MigrateMainRunner implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MigrateMainRunner.class);
private String dataHost;
private List<MigrateTask> migrateTaskList;
public MigrateMainRunner(String dataHost, List<MigrateTask> migrateTaskList) {
this.dataHost = dataHost;
this.migrateTaskList = migrateTaskList;
}
@Override public void run() {
AtomicInteger sucessTask=new AtomicInteger(0);
CountDownLatch downLatch=new CountDownLatch(migrateTaskList.size()) ;
for (MigrateTask migrateTask : migrateTaskList) {
MycatServer.getInstance().getBusinessExecutor().submit( new MigrateDumpRunner(migrateTask,downLatch,sucessTask)) ;
}
try {
downLatch.await(2, TimeUnit.HOURS) ;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
//同一个dataHost的任务合并执行,避免过多流量浪费
if(sucessTask.get()==migrateTaskList.size())
{
long binlogFileNum=-1;
String binlogFile="";
long pos=-1;
for (MigrateTask migrateTask : migrateTaskList) {
if(binlogFileNum==-1){
binlogFileNum=Integer.parseInt(migrateTask.getBinlogFile().substring(migrateTask.getBinlogFile().lastIndexOf(".")+1)) ;
binlogFile=migrateTask.getBinlogFile();
pos=migrateTask.getPos();
} else{
int tempBinlogFileNum=Integer.parseInt(migrateTask.getBinlogFile().substring(migrateTask.getBinlogFile().lastIndexOf(".")+1)) ;
if(tempBinlogFileNum<=binlogFileNum&&migrateTask.getPos()<=pos) {
binlogFileNum=tempBinlogFileNum;
binlogFile=migrateTask.getBinlogFile();
pos=migrateTask.getPos();
}
}
}
String taskPath=migrateTaskList.get(0).getZkpath();
taskPath=taskPath.substring(0,taskPath.lastIndexOf("/"));
String taskID=taskPath.substring(taskPath.lastIndexOf('/')+1,taskPath.length());
//开始增量数据迁移
PhysicalDBPool dbPool= MycatServer.getInstance().getConfig().getDataHosts().get(dataHost);
PhysicalDatasource datasource = dbPool.getSources()[dbPool.getActivedIndex()];
DBHostConfig config = datasource.getConfig();
BinlogStream stream=new BinlogStream(config.getUrl().substring(0,config.getUrl().indexOf(":")),config.getPort(),config.getUser(),config.getPassword());
try {
stream.setSlaveID(migrateTaskList.get(0).getSlaveId());
stream.setBinglogFile(binlogFile);
stream.setBinlogPos(pos);
stream.setMigrateTaskList(migrateTaskList);
BinlogStreamHoder.binlogStreamMap.put(taskID,stream);
stream.connect();
} catch (IOException e) {
LOGGER.error("error:",e);
}
}
}
}