Authored by LUOXC

添加价格校验

... ... @@ -55,6 +55,7 @@ import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@RestController
... ... @@ -601,14 +602,35 @@ public class ProductController {
private void clearBatchProductCache(List<Integer> skupList) {
try {
LOG.info("in clearBatchProductCache skupList = {}", skupList);
List<StoragePrice> spList = productService.getStoragePriceBySkupList(skupList);
List<Integer> productIdList = spList.stream().map(StoragePrice::getProductId).distinct().collect(Collectors.toList());
List<Integer> storageIdList = spList.stream().map(StoragePrice::getStorageId).distinct().collect(Collectors.toList());
List<StoragePrice> storagePriceList = productService.getStoragePriceBySkupList(skupList);
List<Integer> productIdList = storagePriceList.stream().map(StoragePrice::getProductId).distinct().collect(Collectors.toList());
List<Integer> storageIdList = storagePriceList.stream().map(StoragePrice::getStorageId).distinct().collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(productIdList)) {
for (Integer productId : productIdList) {
LOG.info("Batch clearCache queryProductDetailById productId = {}, ", productId);
storagePriceService.publishPriceUpdateEvent(productId);
storagePriceService.publishPriceUpdateEventIf(productId, oldPrices -> {
// 缓存不存在,需要更新
if (CollectionUtils.isEmpty(oldPrices)) {
return true;
}
// 比最低价要低,需要更新
if(storagePriceList.stream()
.filter(e -> Objects.equals(e.getProductId(), productId))
.anyMatch(newPrice -> oldPrices.stream()
.filter(oldPrice -> Objects.equals(newPrice.getStorageId(), oldPrice.getStorageId()))
.anyMatch(oldPrice -> newPrice.getPrice().compareTo(oldPrice.getPrice()) < 0))){
return true;
}
// 如果当前skup为缓存中商品,需要更新
if(storagePriceList.stream()
.filter(e -> Objects.equals(e.getProductId(), productId))
.anyMatch(newPrice -> oldPrices.stream()
.anyMatch(oldPrice -> Objects.equals(newPrice.getSkup(), oldPrice.getSkup())))){
return true;
}
return false;
});
//商品详情
cacheAop.clearCache(
... ... @@ -659,7 +681,7 @@ public class ProductController {
}
}
Map<String, StoragePrice> distinctStorage = new HashMap<>();
for(StoragePrice sp : spList) {
for(StoragePrice sp : storagePriceList) {
Integer skup = sp.getSkup();
Integer storageId = sp.getStorageId();
if (sp.getPreSaleFlag() != null && (sp.getPreSaleFlag() == 5 || sp.getPreSaleFlag() == 6)) {
... ...
... ... @@ -13,11 +13,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
... ... @@ -97,6 +97,13 @@ public class StoragePriceService {
return true;
}
public void publishPriceUpdateEventIf(Integer productId, Predicate<List<StoragePrice>> predicate) {
List<StoragePrice> sizePriceCacheList = productCacheService.getListCacheByString(UfoProductCacheKeyEnum.STORAGE_PRICE_IN_STOCK_INFO_KEY, StoragePrice.class, productId);
if (predicate.test(sizePriceCacheList)) {
publishPriceUpdateEvent(productId);
}
}
public void publishPriceUpdateEvent(Integer productId) {
LOGGER.info("method com.yohoufo.product.service.impl.StoragePriceService.publishPriceUpdateEvent in productId is 【{}】", productId);
... ...
package com.yohoufo.product.util;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolFactory {
private final static ExecutorService cacheCleanExecutorService;
static {
int numberOfProcessorsAvailable = Runtime.getRuntime().availableProcessors();
cacheCleanExecutorService = new ThreadPoolExecutor(
numberOfProcessorsAvailable,
numberOfProcessorsAvailable * 2,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
NamedThreadFactory.newThreadFactory("cache-clean"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public static ExecutorService cacheCleanExecutorService() {
return cacheCleanExecutorService;
}
public static class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final ThreadGroup group;
public static ThreadFactory newThreadFactory(String bizName) {
return new NamedThreadFactory(bizName);
}
private NamedThreadFactory(String bizName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "product-pool-" + bizName + "-processor-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
... ...