...
|
...
|
@@ -8,6 +8,7 @@ import com.yoho.error.event.SearchLogsEvent; |
|
|
import com.yoho.search.base.monitor.PerformanceMonitor;
|
|
|
import com.yoho.search.base.utils.ISearchConstants;
|
|
|
import com.yoho.search.base.utils.MoudleEnum;
|
|
|
import com.yoho.search.consumer.common.ConsumerDynamicConfigService;
|
|
|
import com.yoho.search.consumer.common.YohoConcurrentLRUHashMap;
|
|
|
import com.yoho.search.consumer.index.increment.bulks.CommonBulkService;
|
|
|
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
|
...
|
...
|
@@ -55,6 +56,8 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa |
|
|
private TableConfigLoader tableConfigLoader;
|
|
|
@Autowired
|
|
|
private SearchMessageHelper searchMessageHelper;
|
|
|
@Autowired
|
|
|
private ConsumerDynamicConfigService consumerDynamicConfigService;
|
|
|
|
|
|
protected ApplicationEventPublisher publisher;
|
|
|
|
...
|
...
|
@@ -68,11 +71,13 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa |
|
|
*/
|
|
|
@Override
|
|
|
public void onMessage(Message message) {
|
|
|
limiter.acquire();
|
|
|
if (ignoreIncrease) {
|
|
|
logger.warn("be ignoreIncreasing ,please close the flag after successed ");
|
|
|
return;
|
|
|
}
|
|
|
if(consumerDynamicConfigService.increaseRateLimiterOpen()){
|
|
|
limiter.acquire();
|
|
|
}
|
|
|
try {
|
|
|
rebuildFlagService.waitingRebuildingIndex();
|
|
|
this.doBeforeConsume();
|
...
|
...
|
|