文章目录
  1. 模板方法模式

模板方法模式

  • 定义一个操作中的算法的骨架,而将一些步骤延迟到子类中。TemplateMethod使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
    TemplateMethod

较为常见,以一个实际的Worker示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
public interface Worker {
/**
* Worker的入队方法,入队之后直接开始运作,
* 实现类的构造或初始化方法自动启动Worker
* @param o
*/
void startAppendingQueue(Object o);
}
public abstract class BaseWorker<T> implements Worker {
protected Logger log = Logger.getLogger(getClass());
private Queue<T> jobQueue = new LinkedBlockingQueue<T>();
private Queue<List<T>> thread_Queue = new LinkedBlockingQueue<List<T>>();
// 线程池
private ThreadPoolExecutor threadPool = null;
private int coreThreadSize = 5;
private int waitingQueueSize = 50;
//队列入队休眠时间
private int queueThreadSleepTime = 500;
//抓取队列休眠时间
private int fetchThreadSleepTime = 0;
//每次周期处理的队列容量
private int queueBatchNum = 1000;
private String workerName;
private boolean work_done_flag = false;
/**
* 启动
*/
@PostConstruct public void start() {
threadPool = new ThreadPoolExecutor(coreThreadSize, coreThreadSize, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(waitingQueueSize));
startWorker();
log.info(String.format("%s初始化成功,核心线程数:%d",getWorkerName(),coreThreadSize));
}
private void startWorker() {
new FetchTaskThread().start();
log.info(getWorkerName() + "开启主线程成功");
}
/**
* 调用入队操作
*/
public void startAppendingQueue(Object o) {
new QueueTaskThread(o).start();
}
/**
* 工作处理线程,控制线程池
*
* @author pingansheng
*/
private class FetchTaskThread extends Thread {
public void run() {
while (!work_done_flag) {
List<T> jobs = null;
BaseRunner runner = null;
try {
selectWorkerElements();
if (CollectionUtils.isNotEmpty(thread_Queue)) {
log.info("拉取线程队列成功,拉取数" + thread_Queue.size());
CountDownLatch doneSingal = new CountDownLatch(thread_Queue.size());
for (List<T> thread_jobs : thread_Queue) {
jobs = thread_jobs;
log.info("拉取任务数成功,拉取数" + jobs.size());
runner = new BaseRunner(jobs, doneSingal);
threadPool.execute(runner);
log.info("入线程池成功等待信号,当前线程池容量:" + threadPool.getActiveCount());
}
thread_Queue.clear();
doneSingal.await(15 * 1000, TimeUnit.MILLISECONDS);
}
Thread.sleep(fetchThreadSleepTime);
} catch (RejectedExecutionException e) {
//线程等待一秒再次进行尝试入线程池
threadPool_retry(runner);
log.error(workerName + "Worker-主线程捕获线程池拒绝异常", e);
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程异常", e);
}
}
}
}
/**
* 线程池尝试再次入队
*
* @param runner
*/
private void threadPool_retry(BaseRunner runner) {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(3000);
threadPool.execute(runner);
}
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程重入线程池异常", e);
}
}
/**
* 入队控制线程
*
* @author pingansheng
*/
private class QueueTaskThread extends Thread {
private Object obj = null;
public QueueTaskThread(Object queueObj) {
obj = queueObj;
}
public void run() {
try {
insertToQueue(obj);
} catch (Exception e) {
doSth_with_exception();
log.error(workerName + "Worker-主线程入队线程异常", e);
}
}
}
/**
* 选择元素进行处理
*
* @return
*/
private void selectWorkerElements() {
for (int thread_count = 0; thread_count < coreThreadSize; thread_count++) {
List<T> list = new ArrayList<T>();
for (int i = 0; i < queueBatchNum; i++) {
T o = jobQueue.poll();
if (null == o) {
break;
} else {
list.add(o);
}
}
if (CollectionUtils.isNotEmpty(list)) {
thread_Queue.offer(list);
}
}
}
/**
* 线程执行类
*/
private class BaseRunner extends Thread {
private List<T> job;
private CountDownLatch doneSingal;
public BaseRunner(List<T> job, CountDownLatch doneSingal) {
this.job = job;
this.doneSingal = doneSingal;
}
@Override public void run() {
try {
doSth(this.job);
} catch (Exception e) {
doSth_with_exception();
log.error(getWorkerName() + "执行Runner异常", e);
} finally {
doneSingal.countDown();
}
}
}
/**
* 获取线程模型还未执行的任务数量
*
* @return
*/
protected int getUnDoTaskCount() {
return this.getJobQueue().size() +
getThreadPool().getQueue().size() + getThreadPool().getActiveCount();
}
/**
* 子类需要实现,主线程异常执行的方法
*/
protected abstract void doSth_with_exception();
/**
* 子类需要实现,线程执行方法
*/
protected abstract void doSth(List<T> job) throws Exception;
/**
* 子类需要实现,入队方法,注意此方法体内需根据具体业务进行休眠以防止拒绝入线程池
*/
protected abstract void insertToQueue(Object job) throws Exception;
public Queue<T> getJobQueue() {
return jobQueue;
}
public void setJobQueue(Queue<T> jobQueue) {
this.jobQueue = jobQueue;
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
public int getCoreThreadSize() {
return coreThreadSize;
}
public void setCoreThreadSize(int coreThreadSize) {
this.coreThreadSize = coreThreadSize;
}
public int getWaitingQueueSize() {
return waitingQueueSize;
}
public void setWaitingQueueSize(int waitingQueueSize) {
waitingQueueSize = waitingQueueSize;
}
public String getWorkerName() {
return workerName;
}
public void setWorkerName(String workerName) {
this.workerName = workerName;
}
public int getQueueThreadSleepTime() {
return queueThreadSleepTime;
}
public void setQueueThreadSleepTime(int queueThreadTime) {
this.queueThreadSleepTime = queueThreadTime;
}
public int getQueueBatchNum() {
return queueBatchNum;
}
public void setQueueBatchNum(int queueBatchNum) {
this.queueBatchNum = queueBatchNum;
}
public boolean isWork_done_flag() {
return work_done_flag;
}
public void setWork_done_flag(boolean work_done_flag) {
this.work_done_flag = work_done_flag;
}
public int getFetchThreadSleepTime() {
return fetchThreadSleepTime;
}
public void setFetchThreadSleepTime(int fetchThreadSleepTime) {
this.fetchThreadSleepTime = fetchThreadSleepTime;
}
}
public class MyWorker extends BaseWorker<Integer> {
@Override
protected void doSth_with_exception() {
log.error("exception");
}
@Override
protected void doSth(List<Integer> job) throws Exception {
log.info("处理批次任务:"+job.size());
}
@Override
protected void insertToQueue(Object job) throws Exception {
while (true){
this.getJobQueue().offer(1);
}
}
}
public class App {
public static void main(String[] args) {
Worker worker=new MyWorker();
((BaseWorker)worker).start();//一般使用Spring注解PostConstruct
worker.startAppendingQueue(null);
}
}
文章目录
  1. 模板方法模式