Authored by jack

修改netty拆包问题

... ... @@ -6,6 +6,7 @@ import com.monitor.nginxsync.nio.service.LogService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -21,23 +22,14 @@ public class LogMsgHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
ByteBuf result = (ByteBuf) msg;
String recvMsg = StringUtils.trim((String) msg);
byte[] resultWarp = new byte[result.readableBytes()];
DEBUG.info("recv log msg {}", recvMsg);
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(resultWarp);
DEBUG.info("recv log msg {}", new String(resultWarp));
LogMsg logMsg = Constants.OBJECT_MAPPER.readValue(resultWarp, LogMsg.class);
LogMsg logMsg = Constants.OBJECT_MAPPER.readValue(recvMsg, LogMsg.class);
// 插入日志管理
LOG_SERVICE.inLogMsg(logMsg);
// 释放资源,这行很关键
result.release();
}
@Override
... ...
... ... @@ -15,5 +15,5 @@ public class LogMsg {
int isFinished ;
List<String> msgList = new ArrayList<>();
String msg ;
}
... ...
... ... @@ -33,10 +33,9 @@ public class LogService {
List<String> logList = LOGMAPPER.get(taskId);
for (String logItem : logMsg.getMsgList()) {
logList.add(logItem);
}
logList.add(logMsg.getMsg());
FINISHMAPPER.put(taskId, logMsg.getIsFinished());
}
... ... @@ -63,7 +62,14 @@ public class LogService {
logMsg.setTaskId(taskId);
logMsg.setMsgList(logList);
String str = "";
for (String log : logList) {
str = str + log;
}
logMsg.setMsg(str);
if (!FINISHMAPPER.containsKey(taskId)) {
logMsg.setIsFinished(0);
... ...
... ... @@ -10,6 +10,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
... ... @@ -49,6 +51,10 @@ public class NIOService {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
// 注册handler
ch.pipeline().addLast(new LogMsgHandler());
}
... ...
... ... @@ -138,17 +138,7 @@ public class NginxSyncService {
String str = StringUtils.EMPTY;
if (null != logMsg.getMsgList()) {
for (String logItem : logMsg.getMsgList()) {
if (StringUtils.isEmpty(logItem)) {
continue;
}
str += "\r\n" + logItem;
}
}
logMsgResp.setLog(str);
logMsgResp.setLog(logMsg.getMsg());
return logMsgResp;
}
... ...