Authored by all4you

update

... ... @@ -23,53 +23,63 @@ public class RunnableQueueFactory {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* add task
* @param taskType
* @param task
* add a task
* @param type the queue type
* @param task the task
*/
public void addTask(String taskType, Runnable task){
BlockingQueue<Runnable> queue = queueMap.get(taskType);
public void addTask(String type, Runnable task){
BlockingQueue<Runnable> queue = queueMap.get(type);
if(queue==null){
queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
putTask(queue,taskType,task);
queueMap.putIfAbsent(taskType,queue);
initQueue(taskType);
putTask(queue,type,task);
queueMap.putIfAbsent(type,queue);
initQueue(type);
}else{
putTask(queue,taskType,task);
putTask(queue,type,task);
}
}
private void putTask(BlockingQueue<Runnable> queue,String taskType,Runnable task){
/**
* put a task to the queue
* @param queue the queue
* @param type the queue type
* @param task the task
*/
private void putTask(BlockingQueue<Runnable> queue,String type,Runnable task){
try {
queue.put(task);
LOGGER.info("addTask success,with taskType=[{}]",taskType);
LOGGER.info("put a task with type=[{}] to queue successfully",type);
} catch (InterruptedException e) {
LOGGER.error("put task error,with taskType=[{}],cause:",taskType,e);
LOGGER.error("put a task with type=[{}] to queue error,cause:",type,e);
}
}
private void initQueue(String taskType){
/**
* init the queue
* @param type the queue type
*/
private void initQueue(String type){
executorService.execute(new Runnable() {
@Override
public void run() {
BlockingQueue<Runnable> queue = queueMap.get(taskType);
BlockingQueue<Runnable> queue = queueMap.get(type);
if(queue!=null){
LOGGER.info("initQueue success,with taskType=[{}]",taskType);
LOGGER.info("initQueue with type=[{}] successfully",type);
try {
for(;;) {
LOGGER.info("try to take a task from queue,with taskType=[{}]",taskType);
LOGGER.info("try to take a task with type=[{}] from queue",type);
Runnable task = queue.take();
if(task!=null) {
LOGGER.info("take a task success,with taskType=[{}],will run it directly",taskType);
LOGGER.info("take a task with type=[{}] from queue successfully,will run it directly",type);
task.run();
}
}
}catch(Exception e){
LOGGER.error("take task error,with taskType=[{}],cause:",taskType,e);
LOGGER.error("take task with type=[{}] error,cause:",type,e);
}
}else{
LOGGER.error("initQueue failed,with queue=null and taskType=[{}]",taskType);
LOGGER.error("initQueue with type=[{}] failed,because the queue is null ",type);
}
}
});
... ...