...
|
...
|
@@ -19,6 +19,7 @@ import properties.PropertiesFactory; |
|
|
import properties.SparkProperties;
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
...
|
...
|
@@ -27,8 +28,8 @@ import java.util.*; |
|
|
public class TraceExecutor {
|
|
|
private static final Logger logger = LoggerFactory.getLogger(TraceExecutor.class);
|
|
|
|
|
|
private static final String CHECKPOINT_FS_DIR = "/spark/checkpoint/trace";
|
|
|
|
|
|
private static final String CHECKPOINT_FS_DIR = "/spark/checkpoint/trace";
|
|
|
// private static final String CHECKPOINT_FS_DIR = "/Users/apple/Documents/trace";
|
|
|
private static final String APP_NAME = "trace-analyzer";
|
|
|
|
|
|
public static void main(String[] args) {
|
...
|
...
|
@@ -46,6 +47,7 @@ public class TraceExecutor { |
|
|
|
|
|
private static JavaStreamingContext getStreamContext(final KafkaProperties kafkaPro, final SparkProperties sparkPro) {
|
|
|
SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
|
|
|
// SparkConf sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local[4]");
|
|
|
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf,
|
|
|
Durations.seconds(sparkPro.getDuration()));
|
|
|
|
...
|
...
|
@@ -66,10 +68,14 @@ public class TraceExecutor { |
|
|
ObjectMapper jacksonMapper = new ObjectMapper();
|
|
|
while (msgIterator.hasNext()) {
|
|
|
Tuple2<byte[], byte[]> msgTuple2 = msgIterator.next();
|
|
|
if (msgTuple2._2() != null && msgTuple2._2().length > 0) {
|
|
|
byte[] newPayload =extractHeaders(msgTuple2._2());
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("-------接受kafka数据===》{}", new String(newPayload));
|
|
|
}
|
|
|
if (newPayload != null && newPayload.length > 0) {
|
|
|
Spans spans = null;
|
|
|
try {
|
|
|
spans = jacksonMapper.readValue(msgTuple2._2(), Spans.class);
|
|
|
spans = jacksonMapper.readValue(newPayload, Spans.class);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("{}", e);
|
|
|
}
|
...
|
...
|
@@ -99,4 +105,15 @@ public class TraceExecutor { |
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private static byte[] extractHeaders(byte[] payload){
|
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(payload);
|
|
|
byteBuffer.get();
|
|
|
byteBuffer.get();
|
|
|
byte[] newPayload = new byte[byteBuffer.remaining()];
|
|
|
byteBuffer.get(newPayload);
|
|
|
return newPayload;
|
|
|
}
|
|
|
|
|
|
|
|
|
} |
...
|
...
|
|