Authored by markeloff

kafka消息类型String改为byte

... ... @@ -4,8 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.yoho.trace.handler.TraceHandlerStarter;
import com.yoho.trace.sleuth.Spans;
import kafka.serializer.StringDecoder;
import org.apache.commons.lang3.StringUtils;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
... ... @@ -55,19 +54,19 @@ public class TraceExecutor {
kafkaParamMap.put("fetch.message.max.bytes", "104857600");
kafkaParamMap.put("group.id", kafkaPro.getGroup());
HashSet<String> topics = Sets.newHashSet(kafkaPro.getTopic());
JavaPairInputDStream<String, String> kafkaPairInputStream = KafkaUtils.createDirectStream(streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
JavaPairInputDStream<byte[], byte[]> kafkaPairInputStream = KafkaUtils.createDirectStream(streamingContext,
byte[].class,
byte[].class,
DefaultDecoder.class,
DefaultDecoder.class,
kafkaParamMap,
topics);
JavaDStream<Spans> kafkaMsgDStream = kafkaPairInputStream.mapPartitions(msgIterator -> {
List<Spans> messages = new ArrayList<>();
ObjectMapper jacksonMapper = new ObjectMapper();
while (msgIterator.hasNext()) {
Tuple2<String, String> msgTuple2 = msgIterator.next();
if (StringUtils.isNoneBlank(msgTuple2._2())) {
Tuple2<byte[], byte[]> msgTuple2 = msgIterator.next();
if (msgTuple2._2() != null && msgTuple2._2().length > 0) {
Spans spans = null;
try {
spans = jacksonMapper.readValue(msgTuple2._2(), Spans.class);
... ...
package com.yoho.trace.handler;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.sleuth.TraceStoreUtils;
import com.yoho.trace.sleuth.util.TraceStoreUtils;
import org.apache.spark.streaming.api.java.JavaDStream;
/**
... ...
... ... @@ -16,6 +16,7 @@
package com.yoho.trace.sleuth;
import com.yoho.trace.sleuth.util.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import java.util.Collection;
... ...
... ... @@ -15,6 +15,7 @@
*/
package com.yoho.trace.sleuth;
import com.yoho.trace.sleuth.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.BinaryAnnotation;
... ...
... ... @@ -19,6 +19,7 @@ package com.yoho.trace.sleuth;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.yoho.trace.sleuth.util.StringUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
... ...
... ... @@ -14,7 +14,7 @@
* limitations under the License.
*/
package com.yoho.trace.sleuth;
package com.yoho.trace.sleuth.util;
import org.apache.commons.collections.CollectionUtils;
... ...
package com.yoho.trace.sleuth;
package com.yoho.trace.sleuth.util;
import com.yoho.trace.sleuth.Spans;
import com.yoho.trace.sleuth.ZipkinESStoreConfiguration;
import com.yoho.trace.sleuth.ZipkinMessageListener;
import org.apache.commons.collections.CollectionUtils;
/**
... ...