Authored by qinchao

nginx 同步功能 去掉ops-agent项目

... ... @@ -78,14 +78,14 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<!--<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
... ...
... ... @@ -96,6 +96,12 @@
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
package com.monitor.nginxsync.job;
import lombok.Getter;
import org.apache.commons.exec.LogOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by yoho on 2016/8/22.
*/
public class CollectionLogOutputStream extends LogOutputStream {
@Getter
private String taskId;
private final List<String> lines = new CopyOnWriteArrayList<String>();
//public static final String LOCK = "LOCK";
public CollectionLogOutputStream(String taskId) {
this.taskId = taskId;
}
@Getter
private final AtomicInteger isFinished = new AtomicInteger(0);
@Override
protected void processLine(String s, int i) {
synchronized (taskId.intern()) {
lines.add(s);
}
}
public void addErrorLog(String s) {
synchronized (taskId.intern()) {
lines.add(s);
}
}
//截包
public List<String> getLog() {
List<String> logList = new ArrayList<String>();
synchronized (taskId.intern()) {
if (!lines.isEmpty()) {
String str = "";
for (int i = 0; i < lines.size(); i++) {
int length = lines.get(i).length() + str.length();
if (length > 924) {
logList.add(String.copyValueOf(str.toCharArray()));
str = "";
}
str += (lines.get(i) + "\r\n");
}
logList.add(str);
lines.clear();
}
}
return logList;
}
}
... ...
package com.monitor.nginxsync.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.monitor.common.util.SpringContextUtils;
import com.monitor.nginxsync.constant.Constants;
import com.monitor.nginxsync.nio.model.LogMsg;
import com.monitor.nginxsync.nio.service.LogService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Created by yoho on 2016/8/22.
*/
public class LogJob implements Callable {
public static final Logger DEBUG = LoggerFactory.getLogger(LogJob.class);
private CollectionLogOutputStream outputStream;
private String taskId;
public LogJob(CollectionLogOutputStream outputStream) {
this.outputStream = outputStream;
this.taskId = outputStream.getTaskId();
}
@Override
public Object call() {
while (true) {
//判断任务是否结束
if (1 == outputStream.getIsFinished().get()) {
break;
}
List<String> msgList = outputStream.getLog();
try {
sendByHttp( msgList);
//1s 间歇
Thread.sleep(1000);
} catch (Exception e) {
DEBUG.error("Failed to send log ,error {}", e);
}
}
//补发剩余的日志
List<String> msgList = outputStream.getLog();
sendByHttp( msgList);
//发送结束日志
LogMsg logMsg = new LogMsg();
logMsg.setMsg("");
logMsg.setIsFinished(1);
logMsg.setTaskId(taskId);
try {
//再一次send
sendMsg(logMsg);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private void sendByHttp(List<String> msgList) {
for (String msg : msgList) {
if (StringUtils.isNotBlank(msg)) {
while (msg.length() < 1024) {
msg = msg + " ";
}
LogMsg logMsg = new LogMsg();
logMsg.setMsg(msg);
logMsg.setIsFinished(0);
logMsg.setTaskId(taskId);
try {
//
sendMsg(logMsg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void sendMsg(LogMsg logMsg){
LogService logService=SpringContextUtils.getBeanByClass(LogService.class);
logService.inLogMsg(logMsg);
}
}
... ...
package com.monitor.nginxsync.job;
import com.monitor.common.ProjectConstant;
import com.monitor.nginxsync.model.TaskInfo;
import org.apache.commons.exec.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by yoho on 2016/8/22.
*/
public class ShellJob implements Callable {
public ExecutorService service = Executors.newSingleThreadExecutor();
public TaskInfo cmdInfo;
public ShellJob(TaskInfo cmdInfo) {
this.cmdInfo = cmdInfo;
}
@Override
public Object call() throws Exception {
//ssh master@10.xxx /bin/bash /home/master/nginx-sync/xxx.sh args
CommandLine commandLine = CommandLine.parse("ssh master@"+ ProjectConstant.DEPLOY_IP+" "+cmdInfo.getCmd());
DefaultExecutor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(5 * 60000);
CollectionLogOutputStream outputStream = new CollectionLogOutputStream(cmdInfo.getTaskId());
PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(outputStream, outputStream);
executor.setStreamHandler(pumpStreamHandler);
executor.setWatchdog(watchdog);
executor.setExitValue(0);
service.submit(new LogJob(outputStream));
executor.execute(commandLine, new ExitHandler(outputStream, service));
return null;
}
public static class ExitHandler implements ExecuteResultHandler {
private CollectionLogOutputStream outputStream;
private ExecutorService service;
public ExitHandler(CollectionLogOutputStream outputStream, ExecutorService service) {
this.outputStream = outputStream;
this.service = service;
}
@Override
public void onProcessComplete(int i) {
outputStream.getIsFinished().set(1);
service.shutdown();
}
@Override
public void onProcessFailed(ExecuteException e) {
//执行异常的日志
outputStream.addErrorLog(e.getMessage());
outputStream.getIsFinished().set(1);
service.shutdown();
}
}
}
... ...
package com.monitor.nginxsync.service;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.monitor.nginxsync.job.ShellJob;
import com.monitor.nginxsync.model.TaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* 执行shell 命令服务
*
* @author halo_
* @creat 2016-08-2016/8/22 13:18
**/
@Service
public class NginxSyncCmdServices {
public static final Logger logger = LoggerFactory.getLogger("switchLogger");
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("NginxSync-Orders-%d")
.setDaemon(true)
.build();
private int INIT_THREADS = 10;
private final ExecutorService EXECUTORSER = Executors.newFixedThreadPool(INIT_THREADS, threadFactory);
public void doHandle(TaskInfo taskInfo) {
logger.info("NginxSyncCmdServices do handle {} ",taskInfo);
EXECUTORSER.submit(new ShellJob(taskInfo));
}
}
... ...
... ... @@ -25,18 +25,14 @@ import org.springframework.web.client.RestTemplate;
*/
@RestController
@RequestMapping(value = "/nginxsync")
public class NginxSyncService {
public static final Logger DEBUG = LoggerFactory.getLogger(NginxSyncService.class);
@Value("${nginxsync.agent}")
String agent;
public class NginxSyncCtrl {
public static final Logger DEBUG = LoggerFactory.getLogger(NginxSyncCtrl.class);
@Autowired
LogService logService;
@Autowired
RestTemplate restTemplate;
NginxSyncCmdServices nginxSyncCmdServices;
@RequestMapping(value = "/pushtask")
public BaseResponse pushTask(@RequestBody TaskInfo taskInfo) {
... ... @@ -132,24 +128,12 @@ public class NginxSyncService {
cmdTaskInfo.setTaskId(String.valueOf(System.currentTimeMillis()));
BaseResponse response = new BaseResponse();
taskInfo.setTaskId(cmdTaskInfo.getTaskId());
response.setData(taskInfo);
response.setCode(200);
try {
restTemplate.postForObject("http://" + agent + "/handle", cmdTaskInfo, String.class);
} catch (Exception e) {
DEBUG.error("Failed to dispatch nginx sync task {}...error {}", cmdTaskInfo, e);
response.setCode(500);
response.setMessage("agent error: " + e.getMessage());
}
nginxSyncCmdServices.doHandle(taskInfo);
response.setCode(200);
//返回结果
return response;
}
... ... @@ -175,8 +159,6 @@ public class NginxSyncService {
logMsgResp.setTaskId(logMsg.getTaskId());
String str = StringUtils.EMPTY;
logMsgResp.setLog(logMsg.getMsg());
return logMsgResp;
... ...