Authored by LUOXC

Merge branch 'dev-monitor' into test6.8.5

package com.yoho.order.dal;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author LUOXC
* @date 2019/1/7 11:04
*/
public interface OrderStatusMonitorMapper {
List<Long> selectBuyerOrderCodeForBuyerOrderWithoutPaidTimeoutCancelFail(@Param("startTime") long startTime, @Param("endTime") long endTime);
}
... ...
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.yoho.order.dal.OrderStatusMonitorMapper">
<select id="selectBuyerOrderCodeForBuyerOrderWithoutPaidTimeoutCancelFail" resultType="java.lang.Long">
SELECT
order_code
FROM
buyer_order
WHERE
create_time > #{startTime}
AND create_time <![CDATA[<= ]]> #{endTime}
AND STATUS = 0
</select>
</mapper>
\ No newline at end of file
... ...
... ... @@ -42,6 +42,12 @@
<groupId>com.yoho.core</groupId>
<artifactId>yoho-core-rest-client-simple</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.3.RELEASE</version>
</dependency>
</dependencies>
</project>
... ...
package com.yoho.ufo.order.monitor;
import com.yoho.core.redis.cluster.annotation.Redis;
import com.yoho.core.redis.cluster.operations.nosync.YHHashOperations;
import com.yoho.core.redis.cluster.operations.nosync.YHValueOperations;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.order.dal.OrderStatusMonitorMapper;
import com.yoho.quartz.annotation.JobType;
import com.yoho.quartz.annotation.MisfiredPolicy;
import com.yoho.quartz.annotation.YhJobDef;
import com.yoho.quartz.domain.JobProcessResult;
import com.yoho.quartz.domain.JobResultCode;
import com.yoho.quartz.job.YhJob;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
/**
* @author LUOXC
* @date 2019/1/7 11:02
*/
@Component
@Slf4j
@YhJobDef(desc = "卖家未支付订单超时取消监控",
jobName = "BuyerOrderWithoutPaidTimeoutCancelMonitor",
cron = "0 0/5 * * * ?",
misfiredPolicy = MisfiredPolicy.CRON_DO_NOTHING,
jobType = JobType.CRON,
jobGroup = "ufoPlatform",
needUpdate = true)
public class BuyerOrderWithoutPaidTimeoutCancelMonitor implements YhJob {
@Redis("yohoNoSyncRedis")
private YHValueOperations yhValueOperations;
@Redis("yohoNoSyncRedis")
private YHHashOperations hashOperations;
@Autowired
private OrderStatusMonitorMapper orderStatusMonitorMapper;
private RedisKeyBuilder lastMonitorEndTimeKey = RedisKeyBuilder.newInstance()
.appendFixed("ufo:platform:order:monitor:buyerOrderWithoutPaidTimeoutCancelLastMonitorEndTime");
private RedisKeyBuilder alertKey = RedisKeyBuilder.newInstance()
.appendFixed("ufo:platform:order:monitor:buyerOrderWithoutPaidTimeoutCancelAlert");
@Override
public JobProcessResult process(String s) {
monitor();
JobProcessResult result = new JobProcessResult();
result.setJobResultCode(JobResultCode.SUCCESS);
return result;
}
private void monitor() {
Mono.zip(getLastMonitorEndTime(), getThisMonitorEndTime())
.doOnNext(this::cacheThisMonitorEndTime)
.flatMapIterable(monitorTime ->
orderStatusMonitorMapper.selectBuyerOrderCodeForBuyerOrderWithoutPaidTimeoutCancelFail(monitorTime.getT1(), monitorTime.getT2())
)
.filter(this::cacheIfNotAlert)
.map(orderCode -> Tuples.of("卖家未支付订单超时取消异常", "订单号" + orderCode))
.subscribe(
message -> alert(message),
e -> log.warn("Buyer order without paid timeout cancel monitor fail", e),
() -> log.info("Buyer order without paid timeout cancel monitor complete")
);
}
private Mono<Long> getLastMonitorEndTime() {
return Mono.fromSupplier(() -> yhValueOperations.get(lastMonitorEndTimeKey))
.doOnError(e -> log.warn("get last monitor end time fail", e))
.onErrorReturn(StringUtils.EMPTY)
.map(NumberUtils::toLong);
}
private Mono<Long> getThisMonitorEndTime() {
return Mono.just(Instant.now())
.map(now -> now.minus(20, ChronoUnit.MINUTES).toEpochMilli());
}
private void cacheThisMonitorEndTime(Tuple2<Long, Long> monitorTime) {
try {
yhValueOperations.set(lastMonitorEndTimeKey, String.valueOf(monitorTime.getT1()), 20, TimeUnit.MINUTES);
} catch (Exception e) {
log.warn("cache this monitor end time fail", e);
}
}
private Boolean cacheIfNotAlert(Long orderCode) {
try {
return hashOperations.putIfAbsent(
alertKey,
orderCode.toString(),
LocalDateTime.now().toString());
} catch (Exception e) {
log.warn("cache if not alert fail for {}", orderCode, e);
return true;
}
}
private void alert(Tuple2<String, String> message) {
val title = message.getT1();
val body = message.getT2();
log.warn("Buyer order without paid timeout cancel fail, title:{},body:", title, body);
}
}
... ...
package com.reactorcode.test;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author LUOXC
* @date 2019/1/7 19:02
*/
public class ReactorCoreTests {
@Test
public void testOnErrorReturn() {
Mono.just("3")
.map(this::mapToNull)
.onErrorReturn(StringUtils.EMPTY)
.map(NumberUtils::toLong)
.subscribe(System.out::println, System.err::println);
}
private String mapToNull(String str){
throw new RuntimeException();
}
}
... ...
... ... @@ -38,6 +38,7 @@ datasources:
- com.yoho.order.dal.QiniuLiveRecordMapper
- com.yoho.order.dal.CameraRecordMapper
- com.yoho.order.dal.UserCameraRecordMapper
- com.yoho.order.dal.OrderStatusMonitorMapper
ufo_resource:
servers:
... ...
... ... @@ -7,7 +7,7 @@ redis:
yohoNoSyncRedis :
servers:
- 192.168.102.45:6379
auth: redis9646
auth: redis9646
unionRedisTemplate :
servers:
... ...
... ... @@ -38,6 +38,7 @@ datasources:
- com.yoho.order.dal.QiniuLiveRecordMapper
- com.yoho.order.dal.CameraRecordMapper
- com.yoho.order.dal.UserCameraRecordMapper
- com.yoho.order.dal.OrderStatusMonitorMapper
ufo_resource:
servers:
... ...