Authored by all4you

update

... ... @@ -143,6 +143,16 @@ public class SocialUserIndexBuilder extends IIndexBuilder {
return dataList;
}
/**
* 获取变更的数据总量
* @return
* @throws Exception
*/
@Override
public int getUpdatedTotalCount() throws Exception{
// TODO
return 0;
}
/**
* 获取所有变更的数据
... ... @@ -150,7 +160,7 @@ public class SocialUserIndexBuilder extends IIndexBuilder {
* @throws Exception
*/
@Override
public List<?> getUpdatedLists() throws Exception{
public List<?> getUpdatedLists(int offset, int limit) throws Exception{
List<UserFollower> followers = socialUserDetailMapper.selectUserFollower(null);
List<SocialUser> socialUsers = new ArrayList<>();
// 合并socialUser的s_follower等属性
... ...
... ... @@ -190,26 +190,26 @@ public class YohoIndexDataLoader implements ApplicationContextAware {
result = result & futureResult.get();
}
List<?> dataList = indexBuilder.getUpdatedLists();
int resultSize = dataList != null ? dataList.size() : 0;
INDEX_REBUILD_LOG.info("loadUpdatedData [yohoIndexName=[{}]],[{resultSize={}]", yohoIndexName, resultSize);
if (resultSize > 0) {
performanceMonitor.addVisitCount();
long begin = System.currentTimeMillis();
INDEX_REBUILD_LOG.debug("[addDataToEs begin][yohoIndexName=[{}]],[selectCount={}][begin={}]", yohoIndexName, dataList.size(), begin);
List<ESBluk> bluks = new ArrayList<>(dataList.size());
for(Object ob: dataList){
bluks.add(new ESBluk(JSONObject.toJSONString(ob),indexBuilder.getId(ob),yohoIndexName,yohoIndexName,false));
}
BulkResponse bulkResponse = client.bulk(bluks);
if(bulkResponse.hasFailures()){
INDEX_REBUILD_LOG.error("addIndexDataBean has fail,[yohoIndexName=[%s]],[failureMessage=%s]", yohoIndexName,bulkResponse.buildFailureMessage());
throw new Exception(String.format("addIndexDataBean has fail,[yohoIndexName=[%s]],[failureMessage=%s]", yohoIndexName,bulkResponse.buildFailureMessage()));
}
long end = System.currentTimeMillis();
INDEX_REBUILD_LOG.debug("[addDataToEs begin][yohoIndexName=[{}]],[selectCount={}][cost={}ms]", yohoIndexName, dataList.size(), end-begin);
performanceMonitor.addCost(end-begin);
}
// List<?> dataList = indexBuilder.getUpdatedLists();
// int resultSize = dataList != null ? dataList.size() : 0;
// INDEX_REBUILD_LOG.info("loadUpdatedData [yohoIndexName=[{}]],[{resultSize={}]", yohoIndexName, resultSize);
// if (resultSize > 0) {
// performanceMonitor.addVisitCount();
// long begin = System.currentTimeMillis();
// INDEX_REBUILD_LOG.debug("[addDataToEs begin][yohoIndexName=[{}]],[selectCount={}][begin={}]", yohoIndexName, dataList.size(), begin);
// List<ESBluk> bluks = new ArrayList<>(dataList.size());
// for(Object ob: dataList){
// bluks.add(new ESBluk(JSONObject.toJSONString(ob),indexBuilder.getId(ob),yohoIndexName,yohoIndexName,false));
// }
// BulkResponse bulkResponse = client.bulk(bluks);
// if(bulkResponse.hasFailures()){
// INDEX_REBUILD_LOG.error("addIndexDataBean has fail,[yohoIndexName=[%s]],[failureMessage=%s]", yohoIndexName,bulkResponse.buildFailureMessage());
// throw new Exception(String.format("addIndexDataBean has fail,[yohoIndexName=[%s]],[failureMessage=%s]", yohoIndexName,bulkResponse.buildFailureMessage()));
// }
// long end = System.currentTimeMillis();
// INDEX_REBUILD_LOG.debug("[addDataToEs begin][yohoIndexName=[{}]],[selectCount={}][cost={}ms]", yohoIndexName, dataList.size(), end-begin);
// performanceMonitor.addCost(end-begin);
// }
return true;
} catch (Exception e) {
// 如果有异常,则处理一下,并等待30s执行下一次
... ...