1
|
package com.yoho.search.consumer.index.increment;
|
1
|
package com.yoho.search.consumer.index.increment;
|
2
|
|
2
|
|
3
|
-import java.util.ArrayList;
|
|
|
4
|
-import java.util.HashMap;
|
|
|
5
|
-import java.util.List;
|
|
|
6
|
-import java.util.Map;
|
|
|
7
|
-import java.util.UUID;
|
|
|
8
|
-
|
|
|
9
|
-import org.apache.commons.lang.StringUtils;
|
|
|
10
|
-import org.slf4j.Logger;
|
|
|
11
|
-import org.slf4j.LoggerFactory;
|
|
|
12
|
-import org.springframework.amqp.core.Message;
|
|
|
13
|
-import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
|
|
|
14
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
15
|
-import org.springframework.stereotype.Component;
|
|
|
16
|
-
|
|
|
17
|
import com.alibaba.fastjson.JSONObject;
|
3
|
import com.alibaba.fastjson.JSONObject;
|
18
|
import com.rabbitmq.client.Channel;
|
4
|
import com.rabbitmq.client.Channel;
|
19
|
import com.yoho.error.event.SearchEvent;
|
5
|
import com.yoho.error.event.SearchEvent;
|
|
@@ -24,122 +10,177 @@ import com.yoho.search.consumer.common.CostStatistics; |
|
@@ -24,122 +10,177 @@ import com.yoho.search.consumer.common.CostStatistics; |
24
|
import com.yoho.search.consumer.index.increment.bulks.StorageSkuIndexBulkService;
|
10
|
import com.yoho.search.consumer.index.increment.bulks.StorageSkuIndexBulkService;
|
25
|
import com.yoho.search.consumer.index.increment.rule.AbstractStorageRelatedMqListener;
|
11
|
import com.yoho.search.consumer.index.increment.rule.AbstractStorageRelatedMqListener;
|
26
|
import com.yoho.search.consumer.service.base.GoodsService;
|
12
|
import com.yoho.search.consumer.service.base.GoodsService;
|
|
|
13
|
+import com.yoho.search.consumer.service.base.ProductService;
|
27
|
import com.yoho.search.consumer.service.bo.StorageSkuBO;
|
14
|
import com.yoho.search.consumer.service.bo.StorageSkuBO;
|
28
|
import com.yoho.search.consumer.service.logic.StorageSkuLogicService;
|
15
|
import com.yoho.search.consumer.service.logic.StorageSkuLogicService;
|
29
|
import com.yoho.search.core.es.utils.IgnoreSomeException;
|
16
|
import com.yoho.search.core.es.utils.IgnoreSomeException;
|
30
|
import com.yoho.search.dal.model.Goods;
|
17
|
import com.yoho.search.dal.model.Goods;
|
|
|
18
|
+import com.yoho.search.dal.model.Product;
|
|
|
19
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
20
|
+import org.apache.commons.lang.StringUtils;
|
|
|
21
|
+import org.slf4j.Logger;
|
|
|
22
|
+import org.slf4j.LoggerFactory;
|
|
|
23
|
+import org.springframework.amqp.core.Message;
|
|
|
24
|
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
|
|
|
25
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
26
|
+import org.springframework.stereotype.Component;
|
|
|
27
|
+
|
|
|
28
|
+import java.util.*;
|
31
|
|
29
|
|
32
|
@Component
|
30
|
@Component
|
33
|
public class GoodsMqListener extends AbstractStorageRelatedMqListener implements ChannelAwareMessageListener {
|
31
|
public class GoodsMqListener extends AbstractStorageRelatedMqListener implements ChannelAwareMessageListener {
|
34
|
|
32
|
|
35
|
- private static final Logger logger = LoggerFactory.getLogger(GoodsMqListener.class);
|
|
|
36
|
- @Autowired
|
|
|
37
|
- private GoodsService goodsService;
|
|
|
38
|
- @Autowired
|
|
|
39
|
- private StorageSkuIndexBulkService storageSkuIndexService;
|
|
|
40
|
- @Autowired
|
|
|
41
|
- private StorageSkuLogicService storageSkuLogicService;
|
|
|
42
|
-
|
|
|
43
|
- public void onMessage(Message message, Channel channel) throws Exception {
|
|
|
44
|
- try {
|
|
|
45
|
- final String key = UUID.randomUUID().toString();
|
|
|
46
|
- String messageStr = new String(message.getBody(), "UTF-8");
|
|
|
47
|
- logger.info("[model=GoodsMqListener][key={}][message={}]", key, messageStr);
|
|
|
48
|
- // 如果在重建索引等待
|
|
|
49
|
- this.waitingRebuildingIndex();
|
|
|
50
|
- JSONObject json = JSONObject.parseObject(messageStr);
|
|
|
51
|
- if (ISearchConstants.ACTION_DELETE.equals(json.getString("action"))) {
|
|
|
52
|
- deleteData(json.getString("data"), key);
|
|
|
53
|
- } else if (ISearchConstants.ACTION_UPDATE.equals(json.getString("action"))) {
|
|
|
54
|
- updateData(json.getObject("data", Map.class), key);
|
|
|
55
|
- } else {
|
|
|
56
|
- updateData(json.getObject("data", Map.class), key);
|
|
|
57
|
- }
|
|
|
58
|
- } catch (Exception e) {
|
|
|
59
|
- publisher.publishEvent(new SearchEvent(EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getFunctionName(),
|
|
|
60
|
- EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
|
|
|
61
|
- Thread.sleep(1000);
|
|
|
62
|
- throw e;
|
|
|
63
|
- }
|
|
|
64
|
- }
|
|
|
65
|
-
|
|
|
66
|
- public void deleteData(final String id, final String key) {
|
|
|
67
|
- if (StringUtils.isBlank(id)) {
|
|
|
68
|
- return;
|
|
|
69
|
- }
|
|
|
70
|
- long begin = System.currentTimeMillis();
|
|
|
71
|
- Goods goods = goodsService.getById(Integer.parseInt(id));
|
|
|
72
|
- if (goods == null || goods.getId() == null) {
|
|
|
73
|
- return;
|
|
|
74
|
- }
|
|
|
75
|
- // 1、获取StorageSku
|
|
|
76
|
- List<Integer> goodsIds = new ArrayList<>();
|
|
|
77
|
- goodsIds.add(goods.getId());
|
|
|
78
|
- List<StorageSkuBO> storageSkuBOList = storageSkuLogicService.getStorageSkuIndexBySkcs(goodsIds);
|
|
|
79
|
-
|
|
|
80
|
- // 2、删除数据库数据
|
|
|
81
|
- goodsService.delete(Integer.valueOf(id));
|
|
|
82
|
-
|
|
|
83
|
- // 3、更新productIndex索引
|
|
|
84
|
- if (goods != null && goods.getProductId() != null) {
|
|
|
85
|
- updateIndexNew(goods.getProductId(), begin, key);
|
|
|
86
|
- }
|
|
|
87
|
-
|
|
|
88
|
- // 4、更新storageSku索引
|
|
|
89
|
- for (StorageSkuBO storageSkuBO : storageSkuBOList) {
|
|
|
90
|
- storageSkuBO.setGoodsStatus(0);
|
|
|
91
|
- }
|
|
|
92
|
- storageSkuIndexService.updateStorageSkuIndexByList(storageSkuBOList);
|
|
|
93
|
-
|
|
|
94
|
- logger.info("[func=deleteData][step=success][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id,
|
|
|
95
|
- (System.currentTimeMillis() - begin));
|
|
|
96
|
- }
|
|
|
97
|
-
|
|
|
98
|
- @SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
99
|
- public void updateData(final Map data, final String key) {
|
|
|
100
|
- CostStatistics costStatistics = new CostStatistics();
|
|
|
101
|
- // 1.对象转义
|
|
|
102
|
- Goods goods = new Goods();
|
|
|
103
|
- goods = (Goods) ConvertUtils.toJavaBean(goods, data);
|
|
|
104
|
- if (goods == null || goods.getId() == null) {
|
|
|
105
|
- return;
|
|
|
106
|
- }
|
|
|
107
|
- Integer productId = goods.getProductId();
|
|
|
108
|
- if (productId == null) {
|
|
|
109
|
- return;
|
|
|
110
|
- }
|
|
|
111
|
- logger.info("[func=updateData][step1=covertFromMap][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
112
|
-
|
|
|
113
|
- // 2.保存到数据库
|
|
|
114
|
- goodsService.saveOrUpdate(goods);
|
|
|
115
|
- logger.info("[func=updateData][step2=saveToBb][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
116
|
-
|
|
|
117
|
- // 3.更新到ES
|
|
|
118
|
- updateIndexNew(productId, System.currentTimeMillis(), key);
|
|
|
119
|
- logger.info("[func=updateData][step3=updateProductIndex][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
120
|
-
|
|
|
121
|
- // 4.更新storagesku
|
|
|
122
|
- storageSkuIndexService.updateStorageSkuIndexByGoodsId(goods.getId(), System.currentTimeMillis(), key);
|
|
|
123
|
- logger.info("[func=updateData][step4=updateStorageSkuIndex][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
124
|
-
|
|
|
125
|
- logger.info("[func=updateData][step5=success][goodId={}][totalCost={}ms][costStatistics={}]", goods.getId(), costStatistics.getTotalCost(),
|
|
|
126
|
- costStatistics.getCostStatisticsString());
|
|
|
127
|
- }
|
|
|
128
|
-
|
|
|
129
|
- private void updateIndexNew(Integer productId, long begin, final String key) {
|
|
|
130
|
- CostStatistics costStatistics = new CostStatistics();
|
|
|
131
|
- Map<String, Object> indexData = new HashMap<String, Object>();
|
|
|
132
|
- logger.info("[model=GoodsMqListener_UpdateIndexNew][step=getGoodsListByProductId][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
133
|
- //第一步:获取goodsList,组装goods数据
|
|
|
134
|
- fillColor(indexData,productId);
|
|
|
135
|
- logger.info("[model=GoodsMqListener_UpdateIndexNew][step=genGoodsList][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
136
|
-
|
|
|
137
|
- // 第二步、组装库存数据
|
|
|
138
|
- this.fillStorageNumAndSizeInfo(productId, indexData);
|
|
|
139
|
- logger.info("[model=GoodsMqListener_UpdateIndexNew][step=fillProductStorage][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
140
|
-
|
|
|
141
|
- // 第三步:更新索引
|
|
|
142
|
- this.updateProductIndexWithDataMap(indexData, productId, key, begin);
|
|
|
143
|
- logger.info("[model=GoodsMqListener_UpdateIndexNew][step=updateProductIndexWithDataMap][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
144
|
- }
|
33
|
+ private static final Logger logger = LoggerFactory.getLogger(GoodsMqListener.class);
|
|
|
34
|
+ @Autowired
|
|
|
35
|
+ private GoodsService goodsService;
|
|
|
36
|
+ @Autowired
|
|
|
37
|
+ private StorageSkuIndexBulkService storageSkuIndexService;
|
|
|
38
|
+ @Autowired
|
|
|
39
|
+ private StorageSkuLogicService storageSkuLogicService;
|
|
|
40
|
+ @Autowired
|
|
|
41
|
+ private ProductService productService;
|
|
|
42
|
+
|
|
|
43
|
+ public void onMessage(Message message, Channel channel) throws Exception {
|
|
|
44
|
+ try {
|
|
|
45
|
+ final String key = UUID.randomUUID().toString();
|
|
|
46
|
+ String messageStr = new String(message.getBody(), "UTF-8");
|
|
|
47
|
+ logger.info("[model=GoodsMqListener][key={}][message={}]", key, messageStr);
|
|
|
48
|
+ // 如果在重建索引等待
|
|
|
49
|
+ this.waitingRebuildingIndex();
|
|
|
50
|
+ JSONObject json = JSONObject.parseObject(messageStr);
|
|
|
51
|
+ if (ISearchConstants.ACTION_DELETE.equals(json.getString("action"))) {
|
|
|
52
|
+ deleteData(json.getString("data"), key);
|
|
|
53
|
+ } else if (ISearchConstants.ACTION_UPDATE.equals(json.getString("action"))) {
|
|
|
54
|
+ updateData(json.getObject("data", Map.class), key);
|
|
|
55
|
+ } else {
|
|
|
56
|
+ updateData(json.getObject("data", Map.class), key);
|
|
|
57
|
+ }
|
|
|
58
|
+ } catch (Exception e) {
|
|
|
59
|
+ publisher.publishEvent(new SearchEvent(EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getFunctionName(),
|
|
|
60
|
+ EventReportEnum.GOODSMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
|
|
|
61
|
+ Thread.sleep(1000);
|
|
|
62
|
+ throw e;
|
|
|
63
|
+ }
|
|
|
64
|
+ }
|
|
|
65
|
+
|
|
|
66
|
+ public void deleteData(final String id, final String key) {
|
|
|
67
|
+ if (StringUtils.isBlank(id)) {
|
|
|
68
|
+ return;
|
|
|
69
|
+ }
|
|
|
70
|
+ long begin = System.currentTimeMillis();
|
|
|
71
|
+ Goods goods = goodsService.getById(Integer.parseInt(id));
|
|
|
72
|
+ if (goods == null || goods.getId() == null) {
|
|
|
73
|
+ return;
|
|
|
74
|
+ }
|
|
|
75
|
+ // 1、获取StorageSku
|
|
|
76
|
+ List<Integer> goodsIds = new ArrayList<>();
|
|
|
77
|
+ goodsIds.add(goods.getId());
|
|
|
78
|
+ List<StorageSkuBO> storageSkuBOList = storageSkuLogicService.getStorageSkuIndexBySkcs(goodsIds);
|
|
|
79
|
+
|
|
|
80
|
+ //判断skn状态
|
|
|
81
|
+ Map<String, Object> indexData = new HashMap<String, Object>();
|
|
|
82
|
+ Integer productId = goods.getProductId();
|
|
|
83
|
+ this.fillSknStatus(productId, indexData);
|
|
|
84
|
+ this.updateProductIndexWithDataMap(indexData, productId, key, begin);
|
|
|
85
|
+
|
|
|
86
|
+ // 2、删除数据库数据
|
|
|
87
|
+ goodsService.delete(Integer.valueOf(id));
|
|
|
88
|
+
|
|
|
89
|
+ // 3、更新productIndex索引
|
|
|
90
|
+ if (goods != null && goods.getProductId() != null) {
|
|
|
91
|
+ updateIndexNew(goods.getProductId(), begin, key);
|
|
|
92
|
+ }
|
|
|
93
|
+
|
|
|
94
|
+ // 4、更新storageSku索引
|
|
|
95
|
+ for (StorageSkuBO storageSkuBO : storageSkuBOList) {
|
|
|
96
|
+ storageSkuBO.setGoodsStatus(0);
|
|
|
97
|
+ }
|
|
|
98
|
+ storageSkuIndexService.updateStorageSkuIndexByList(storageSkuBOList);
|
|
|
99
|
+
|
|
|
100
|
+ logger.info("[func=deleteData][step=success][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id,
|
|
|
101
|
+ (System.currentTimeMillis() - begin));
|
|
|
102
|
+ }
|
|
|
103
|
+
|
|
|
104
|
+ @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
105
|
+ public void updateData(final Map data, final String key) {
|
|
|
106
|
+ CostStatistics costStatistics = new CostStatistics();
|
|
|
107
|
+ // 1.对象转义
|
|
|
108
|
+ Goods goods = new Goods();
|
|
|
109
|
+ goods = (Goods) ConvertUtils.toJavaBean(goods, data);
|
|
|
110
|
+ if (goods == null || goods.getId() == null) {
|
|
|
111
|
+ return;
|
|
|
112
|
+ }
|
|
|
113
|
+ Integer productId = goods.getProductId();
|
|
|
114
|
+ if (productId == null) {
|
|
|
115
|
+ return;
|
|
|
116
|
+ }
|
|
|
117
|
+ logger.info("[func=updateData][step1=covertFromMap][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
118
|
+
|
|
|
119
|
+ // 2.保存到数据库
|
|
|
120
|
+ goodsService.saveOrUpdate(goods);
|
|
|
121
|
+ logger.info("[func=updateData][step2=saveToBb][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
122
|
+
|
|
|
123
|
+ // 3.更新到ES
|
|
|
124
|
+ updateIndexNew(productId, System.currentTimeMillis(), key);
|
|
|
125
|
+ logger.info("[func=updateData][step3=updateProductIndex][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
126
|
+
|
|
|
127
|
+ // 4.更新storagesku
|
|
|
128
|
+ storageSkuIndexService.updateStorageSkuIndexByGoodsId(goods.getId(), System.currentTimeMillis(), key);
|
|
|
129
|
+ logger.info("[func=updateData][step4=updateStorageSkuIndex][goodId={}][cost={}ms]", goods.getId(), costStatistics.getCost());
|
|
|
130
|
+
|
|
|
131
|
+ logger.info("[func=updateData][step5=success][goodId={}][totalCost={}ms][costStatistics={}]", goods.getId(), costStatistics.getTotalCost(),
|
|
|
132
|
+ costStatistics.getCostStatisticsString());
|
|
|
133
|
+ }
|
|
|
134
|
+
|
|
|
135
|
+ private void updateIndexNew(Integer productId, long begin, final String key) {
|
|
|
136
|
+ CostStatistics costStatistics = new CostStatistics();
|
|
|
137
|
+ Map<String, Object> indexData = new HashMap<String, Object>();
|
|
|
138
|
+ logger.info("[model=GoodsMqListener_UpdateIndexNew][step=getGoodsListByProductId][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
139
|
+ //第一步:获取goodsList,组装goods数据
|
|
|
140
|
+ fillColor(indexData, productId);
|
|
|
141
|
+ logger.info("[model=GoodsMqListener_UpdateIndexNew][step=genGoodsList][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
142
|
+
|
|
|
143
|
+ // 第二步、组装库存数据
|
|
|
144
|
+ this.fillStorageNumAndSizeInfo(productId, indexData);
|
|
|
145
|
+ logger.info("[model=GoodsMqListener_UpdateIndexNew][step=fillProductStorage][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
146
|
+
|
|
|
147
|
+ // 第三步,判断skn状态
|
|
|
148
|
+ this.fillSknStatus(productId, indexData);
|
|
|
149
|
+
|
|
|
150
|
+ logger.info("[model=GoodsMqListener_UpdateIndexNew][step=fillSknStatus][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
151
|
+
|
|
|
152
|
+
|
|
|
153
|
+ // 第四步:更新索引
|
|
|
154
|
+ this.updateProductIndexWithDataMap(indexData, productId, key, begin);
|
|
|
155
|
+ logger.info("[model=GoodsMqListener_UpdateIndexNew][step=updateProductIndexWithDataMap][productId={}][cost={}]", productId, costStatistics.getCost());
|
|
|
156
|
+ }
|
|
|
157
|
+
|
|
|
158
|
+
|
|
|
159
|
+ /**
|
|
|
160
|
+ * 如果skn下面所有skc的状态都是0,那skn的状态设为0,更新和删除都需要验证
|
|
|
161
|
+ */
|
|
|
162
|
+ private void fillSknStatus(Integer productId, Map<String, Object> indexData) {
|
|
|
163
|
+ List<Integer> ids = new ArrayList<>();
|
|
|
164
|
+ ids.add(productId);
|
|
|
165
|
+ List<Goods> goodsList = goodsService.getListByProductId(ids);
|
|
|
166
|
+ if (CollectionUtils.isEmpty(goodsList)) {
|
|
|
167
|
+ return;
|
|
|
168
|
+ }
|
|
|
169
|
+ boolean sknInvalid = true;
|
|
|
170
|
+ for (Goods goods : goodsList) {
|
|
|
171
|
+ if (goods.getStatus() == 1) {
|
|
|
172
|
+ sknInvalid = false;
|
|
|
173
|
+ }
|
|
|
174
|
+ }
|
|
|
175
|
+ if (sknInvalid) {
|
|
|
176
|
+ indexData.put("status", 0);
|
|
|
177
|
+ } else {
|
|
|
178
|
+ //需要检查product表的status字段,如果为1才更新状态,为0则以product表的状态为准
|
|
|
179
|
+ Product product = productService.getById(productId);
|
|
|
180
|
+ if (product != null && product.getStatus() == 1) {
|
|
|
181
|
+ indexData.put("status", 1);
|
|
|
182
|
+ }
|
|
|
183
|
+ }
|
|
|
184
|
+ }
|
|
|
185
|
+
|
145
|
} |
186
|
} |