public interface Worker {
* Worker的入队方法,入队之后直接开始运作,
* 实现类的构造或初始化方法自动启动Worker
* @param o
*/
void startAppendingQueue(Object o);
}
public abstract class BaseWorker<T> implements Worker {
protected Logger log = Logger.getLogger(getClass());
private Queue<T> jobQueue = new LinkedBlockingQueue<T>();
private Queue<List<T>> thread_Queue = new LinkedBlockingQueue<List<T>>();
private ThreadPoolExecutor threadPool = null;
private int coreThreadSize = 5;
private int waitingQueueSize = 50;
private int queueThreadSleepTime = 500;
private int fetchThreadSleepTime = 0;
private int queueBatchNum = 1000;
private String workerName;
private boolean work_done_flag = false;
* 启动
*/
@PostConstruct public void start() {
threadPool = new ThreadPoolExecutor(coreThreadSize, coreThreadSize, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(waitingQueueSize));
startWorker();
log.info(String.format("%s初始化成功,核心线程数:%d",getWorkerName(),coreThreadSize));
}
private void startWorker() {
new FetchTaskThread().start();
log.info(getWorkerName() + "开启主线程成功");
}
* 调用入队操作
*/
public void startAppendingQueue(Object o) {
new QueueTaskThread(o).start();
}
* 工作处理线程,控制线程池
*
* @author pingansheng
*/
private class FetchTaskThread extends Thread {
public void run() {
while (!work_done_flag) {
List<T> jobs = null;
BaseRunner runner = null;
try {
selectWorkerElements();
if (CollectionUtils.isNotEmpty(thread_Queue)) {
log.info("拉取线程队列成功,拉取数" + thread_Queue.size());
CountDownLatch doneSingal = new CountDownLatch(thread_Queue.size());
for (List<T> thread_jobs : thread_Queue) {
jobs = thread_jobs;
log.info("拉取任务数成功,拉取数" + jobs.size());
runner = new BaseRunner(jobs, doneSingal);
threadPool.execute(runner);
log.info("入线程池成功等待信号,当前线程池容量:" + threadPool.getActiveCount());
}
thread_Queue.clear();
doneSingal.await(15 * 1000, TimeUnit.MILLISECONDS);
}
Thread.sleep(fetchThreadSleepTime);
} catch (RejectedExecutionException e) {
threadPool_retry(runner);
log.error(workerName + "Worker-主线程捕获线程池拒绝异常", e);
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程异常", e);
}
}
}
}
* 线程池尝试再次入队
*
* @param runner
*/
private void threadPool_retry(BaseRunner runner) {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(3000);
threadPool.execute(runner);
}
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程重入线程池异常", e);
}
}
* 入队控制线程
*
* @author pingansheng
*/
private class QueueTaskThread extends Thread {
private Object obj = null;
public QueueTaskThread(Object queueObj) {
obj = queueObj;
}
public void run() {
try {
insertToQueue(obj);
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程入队线程异常", e);
}
}
}
* 选择元素进行处理
*
* @return
*/
private void selectWorkerElements() {
for (int thread_count = 0; thread_count < coreThreadSize; thread_count++) {
List<T> list = new ArrayList<T>();
for (int i = 0; i < queueBatchNum; i++) {
T o = jobQueue.poll();
if (null == o) {
break;
} else {
list.add(o);
}
}
if (CollectionUtils.isNotEmpty(list)) {
thread_Queue.offer(list);
}
}
}
* 线程执行类
*/
private class BaseRunner extends Thread {
private List<T> job;
private CountDownLatch doneSingal;
public BaseRunner(List<T> job, CountDownLatch doneSingal) {
this.job = job;
this.doneSingal = doneSingal;
}
@Override public void run() {
try {
doSth(this.job);
} catch (Exception e) {
doSth_with_exception();
log.error(getWorkerName() + "执行Runner异常", e);
} finally {
doneSingal.countDown();
}
}
}
* 获取线程模型还未执行的任务数量
*
* @return
*/
protected int getUnDoTaskCount() {
return this.getJobQueue().size() +
getThreadPool().getQueue().size() + getThreadPool().getActiveCount();
}
* 子类需要实现,主线程异常执行的方法
*/
protected abstract void doSth_with_exception();
* 子类需要实现,线程执行方法
*/
protected abstract void doSth(List<T> job) throws Exception;
* 子类需要实现,入队方法,注意此方法体内需根据具体业务进行休眠以防止拒绝入线程池
*/
protected abstract void insertToQueue(Object job) throws Exception;
public Queue<T> getJobQueue() {
return jobQueue;
}
public void setJobQueue(Queue<T> jobQueue) {
this.jobQueue = jobQueue;
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
public int getCoreThreadSize() {
return coreThreadSize;
}
public void setCoreThreadSize(int coreThreadSize) {
this.coreThreadSize = coreThreadSize;
}
public int getWaitingQueueSize() {
return waitingQueueSize;
}
public void setWaitingQueueSize(int waitingQueueSize) {
waitingQueueSize = waitingQueueSize;
}
public String getWorkerName() {
return workerName;
}
public void setWorkerName(String workerName) {
this.workerName = workerName;
}
public int getQueueThreadSleepTime() {
return queueThreadSleepTime;
}
public void setQueueThreadSleepTime(int queueThreadTime) {
this.queueThreadSleepTime = queueThreadTime;
}
public int getQueueBatchNum() {
return queueBatchNum;
}
public void setQueueBatchNum(int queueBatchNum) {
this.queueBatchNum = queueBatchNum;
}
public boolean isWork_done_flag() {
return work_done_flag;
}
public void setWork_done_flag(boolean work_done_flag) {
this.work_done_flag = work_done_flag;
}
public int getFetchThreadSleepTime() {
return fetchThreadSleepTime;
}
public void setFetchThreadSleepTime(int fetchThreadSleepTime) {
this.fetchThreadSleepTime = fetchThreadSleepTime;
}
}
public class MyWorker extends BaseWorker<Integer> {
@Override
protected void doSth_with_exception() {
log.error("exception");
}
@Override
protected void doSth(List<Integer> job) throws Exception {
log.info("处理批次任务:"+job.size());
}
@Override
protected void insertToQueue(Object job) throws Exception {
while (true){
this.getJobQueue().offer(1);
}
}
}
public class App {
public static void main(String[] args) {
Worker worker=new MyWorker();
((BaseWorker)worker).start();
worker.startAppendingQueue(null);
}
}