工厂方法模式

  • 定义一个用于创建对象的接口,让子类决定实例化哪一个类,使一个类的实例化延迟到子类
    Factory Method
  • 一个手机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Phone {
public Phone() {
}
public void start() {
System.out.println("手机开机");
}
public void close() {
System.out.println("手机关机");
}
@Override public String toString() {
return "这是一部手机";
}
}
  • 手机工厂
1
2
3
4
5
6
7
8
9
10
11
12
13
public class PhoneFactory {
public PhoneFactory() {
}
public Phone CreatePhone() {
return new Phone();
}
public void SellPhone() {
}
}
  • 智能手机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SmartPhone extends Phone {
public SmartPhone() {
}
public void start() {
System.out.println("智能手机开机");
}
public void close() {
System.out.println("智能手机关机");
}
@Override public String toString() {
return "这是一部智能手机";
}
}
  • 智能手机工厂
1
2
3
4
5
6
7
8
9
10
public class SmartPhoneFactory extends PhoneFactory {
public SmartPhoneFactory() {
}
public Phone CreatePhone() {
return new SmartPhone();
}
}
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class App {
/**
* 工厂方法模式:定义一个用于创建对象的接口,子类决定具体实现哪一个类,类的实例化延迟到子类
* 缺点,为了拿到一个特定的SmartPhone,必须额外构造一个SmartPhoneFactory的,而这个类接下来可能用不到
* @param args
*/
public static void main(String[] args) {
designpattern.factorymethod.PhoneFactory factory=new SmartPhoneFactory();
Phone phone=factory.CreatePhone();
phone.start();
phone.close();
System.out.println(phone);
}
}

第五章 Java中的锁

5.1 Lock接口

1、可以尝试非阻塞获取锁(lockInterruptibly())
2、能被中断地获取锁
3、超时获取锁

5.2 队列同步器(AQS)-面向自定义锁的实现者

1、同步队列

同步队列的基本结构

  • 设置尾节点为原子方式
  • 头节点线程释放同步状态后,唤醒后继结点,由于只有一个线程能够获取到同步状态(当前的头节点),所有设置头节点不需要CAS保证

2、独占式同步状态获取与释放

通过几个死循环的方式保证节点添加的并发请求串行化
节点自旋获取同步状态

  • 节点进入同步队列后,自省观察并获取同步状态,过程中头节点不为空,enq方法保证,第一次enq方法后(enq中会有两次循环),队列状态如下:
    入参节点node的前驱是一个新节点,在方法enq中new出,此时新的线程在公平锁的获取流程中,hasQueuedPredecessors()返回ture,后继节点node再继续获取锁的流程中(注意此时图中node的线程并没有返回,因而获取锁过程没有结束),多个线程竞争依靠compareAndSetState的CAS方式保证一致性。
    同步队列的初始化
    自省的实现方式-死循环
    独占式获取同步状态的流程

    释放调用release(int)方法进行释放,其中唤醒后继节点

    3、一个示例,TwinsLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package concurrent.concurrentArt.chapter5;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 一种基于AQS的自定义锁,最多两个线程进入
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/8/26
* Time: 11:13
*/
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static class Sync extends AbstractQueuedSynchronizer {
Sync(Integer count) {
if (count < 0) {
throw new IllegalArgumentException("count must larger than zero.");
}
setState(count);
}
@Override protected int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override protected boolean tryReleaseShared(int returnCount) {
for (; ; ) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override public void lock() {
sync.acquireShared(1);
}
@Override public void unlock() {
sync.releaseShared(1);
}
@Override public void lockInterruptibly() throws InterruptedException {
}
@Override public boolean tryLock() {
return false;
}
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override public Condition newCondition() {
return null;
}
}

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package concurrent.concurrentArt.chapter5;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* 验证twinsLock
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/8/30
* Time: 14:05
*/
public class TwinsLockTest {
public static void main(String[] args) throws Exception{
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
} finally {
lock.unlock();
}
break;
}
}
}
for(int i=0;i<10;i++){
Worker w=new Worker();
// w.setDaemon(true);
w.start();
}
for(int i=0;i<10;i++){
Thread.sleep(1000);
System.out.println();
}
}
}

5.3 重入锁

1、获取同步状态时,如果该线程与当前持有线程相同,则进行同步状态值增加返回true,表示获取成功
2、公平锁与非公平锁:前者FIFO(获取锁时会检查当前同步队列时候有等待的线程节点,见上文红色),后者CAS竞争,前者代价是进行大量的线程切换(切换锁),非公平锁可能造成饥饿,但是线程切换很少(耗时94倍,切换次数133倍)

5.4 读写锁

1、同一时刻允许多个读线程访问,写线程访问时,读写均阻塞
2、一般情况下,读写锁的性能都会比排它锁好,大多数情景读是多于写的
3、读写锁实现分析:整形按位切割,32位前16位用于标记读状态,后16位标记写状态,写状态=S & 0X0000FFFF,读状态=S>>>16,所以,S不等于0时候,写状态=0则读状态>0,即表示读锁已经获取
4、锁降级:把持住写锁,再获取到读锁,随后释放写锁

5.5-5.6 Condition接口

1、Condition对象:依赖Lock接口实现一套等待通知机制
2、一个示例,有界队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package concurrent.concurrentArt.chapter5;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/9/8
* Time: 16:53
*/
public class ConditionTest {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws Exception {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws Exception {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws Exception {
final BoundedQueue<Integer> queue = new BoundedQueue<>(10);
new Thread() {
@Override public void run() {
try {
while (true) {
System.out.println("remove:" + queue.remove());
}
} catch (Exception e) {
}
}
}.start();
while (true) {
queue.add(1);
System.out.println("add:"+1);
}
}
private static class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws Exception {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T remove() throws Exception {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
}

3、如果一个线程调用Condition.await(),则其释放锁、构造为节点加入等待队列并进入等待状态
4、等待await:同步队列中的首节点构造为新节点,加入等待队列,等待队列新增节点的线程不需要CAS,因为调用await()的线程必定获取了锁,由锁来保证线程安全
5、通知signal:一起唤醒唤醒等待时间最长的,即首个等待节点,加入同步队列中由locksupport唤醒节点中的线程
Condition的结构与等待通知示意图

1、线程池的数量

$$N_{threads}=N_{cpu}*U_{cpu}*(1+\frac{W}{C})$$

U: 目标CPU使用率 [0,1]
W: wait time
C: compute time

2、任务独立与工作队列界限

  • 任务独立时,设置线程池的工作队列界限才合理,如果任务之间存在依赖性,则可能导致线程“饥饿死锁”,应使用无界线程池,如newCachedThreadPool

    3、单线程的Executor可能发生死锁

  • 单线程的Executor可能发生死锁,newSingleThreadExecutor 对象同时执行父任务与子任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /**
    * Created with IntelliJ IDEA.
    * User: pingansheng
    * Date: 2016/6/6
    * Time: 15:43
    */
    public class SingleThreadExecutorDeadLock {
    ExecutorService exec = Executors.newSingleThreadExecutor() ;
    class MyCall implements Callable<String> {
    @Override public String call () throws Exception {
    Future<String> f1 = exec.submit( new MyThread()) ;
    Future<String> f2 = exec.submit(new MyThread()) ;
    System. out.println(" 任务提交结束,等待两个子任务返回 ");
    // 主线程在单线程线程池中,所以 f1与f2 均在等待队列中,永远无法执行。
    return f1.get() + f2.get();
    }
    }
    class MyThread implements Callable<String> {
    @Override public String call () throws Exception {
    System.out.println( "子任务结束") ;
    return "RES";
    }
    }
    public static void main(String[] args) throws Exception {
    SingleThreadExecutorDeadLock lock = new SingleThreadExecutorDeadLock() ;
    Future<String> f3 = lock.exec .submit(lock.new MyCall()) ;
    try {
    System.out.println(f3.get()) ;
    } finally {
    lock.exec.shutdown() ;
    }
    }
    }

4、一种线程安全的缓存计算工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 一种带有缓存的计算工具
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/1
* Time: 14:29
*/
public class CachedCompute {
interface Computable<A, V> {
V compute(A args) throws InterruptedException , ExecutionException;
}
class CacheComputer<A, V> implements Computable< A, V > {
// 缓存
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>() ;
private Computable< A, V > computer;
public CacheComputer(Computable< A, V > c) {
this .computer = c ;
}
@Override public V compute (A args) throws InterruptedException , ExecutionException {
Future<V> f = cache.get(args);
if (null == f) {
Callable<V> callable = new Callable< V>() {
@Override public V call() throws Exception {
return computer .compute(args) ;
}
};
FutureTask<V > ft = new FutureTask< V>(callable);
f = cache .putIfAbsent(args, ft) ;
if (null == f) {
System. out.println(" 缓存放置成功 ");
f = ft;
ft.run();
}
}
try {
return f.get() ;
} catch (CancellationException e) {
cache.remove(args , f);
} catch (ExecutionException e) {
throw e ;
}
return null;
}
}
CacheComputer cache = new CacheComputer<String , String>( new Computable<String, String>() {
@Override public String compute (String args)
throws InterruptedException , ExecutionException {
return "计算结果";
}
});
public static void main (String[] args) throws Throwable {
CachedCompute compute = new CachedCompute();
System. out.println(compute.cache .compute("key")) ;
}
}

5、一种This指针逃逸

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/5/11
* Time: 17:08
*/
public class ThisEscape {
private String name;
public ThisEscape() throws Throwable{
new Thread(new EscapeRunnable()).start() ;
Thread. sleep( 1000);
name ="123";
}
private class EscapeRunnable implements Runnable {
@Override
public void run() {
// 通过ThisEscape.this就可以引用外围类对象 , 但是此时外围类对象可能还没有构造完成 , 即发生了外围类的this引用的逃逸,构造函数未完成之前不应该暴露this指针
System. out.println(ThisEscape.this. name); //可能会出现令人疑惑的错误 name=null
}
}
public static void main(String[] args) throws Throwable{
new ThisEscape();
}
}

6、使用Semaphore控制任务提交速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 使用 semaphore控制任务的提交速度
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/7
* Time: 10:24
*/
public class BoundedExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BoundedExecutor(Executor exe , int bound) {
this .executor = exe ;
this. semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore .acquire();
System. out.println(" 信号量获取成功,当前剩余数: " + semaphore .availablePermits()) ;
try {
executor .execute(new Runnable() {
@Override public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore .release();
}
}
public static void main(String[] args) throws Exception {
ExecutorService es=Executors.newCachedThreadPool() ;
BoundedExecutor exe = new BoundedExecutor(es , 100) ;
for ( int i = 0 ; i < 50 ; i++) {
exe.submitTask(new Runnable() {
@Override public void run() {
try {
System. out.println(" 任务执行 ");
Thread.sleep(1000) ;
} catch (Throwable e) {
}
}
});
}
System.out.println( "提交50 个任务结束 ");
es.shutdown() ;
}
}

7、修改标准工厂创建的executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 强制类型转换重新设置 Executor的线程池参数
* newSingleThreadExecutor 除外,不是线程工厂直接创建,而是通过包装类
* public static ExecutorService newSingleThreadExecutor() {
* return new FinalizableDelegatedExecutorService
* (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
* new LinkedBlockingQueue<Runnable>()));
* }
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/7
* Time: 10:24
*/
public class ExecutorForceSet {
private static final ExecutorService executor = Executors.newFixedThreadPool( 1);
// 使用此方法包装后可以避免被修改
// private static final ExecutorService executor =Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(1));
public static void main(String[] args) throws Exception {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor ).setCorePoolSize(100) ;
((ThreadPoolExecutor) executor ).setMaximumPoolSize( 100);
} else {
System.out.println( "转换出错,非线程工厂创建 ");
}
for (int i = 0; i < 50; i++) {
//lambda
executor.execute(() -> {
try {
System. out.println(" 任务执行 ");
Thread.sleep (2000 );
} catch (Throwable e) {
}
});
}
System.out.println( "提交50 个任务结束 ");
executor.shutdown() ;
}
}

8、Amdahl定律

  • 串行部分越小加速比越大(单核运行时间/多核运行时间)$$Speedup=\frac{1}{F+\frac{(1-F)}{N}}$$

F: 必须串行部分的比例
N: CPU个数

9、增加系统的伸缩性

  • 伸缩性:增加计算资源时,吞吐量和处理能力相应增加
  • 缩小锁的范围synchronized方法变为synchronized代码块(锁真正关键的地方)
  • 缩小锁的粒度synchronized方法(锁对象,static锁Class对象)变为synchronized代码块(锁变量或对象)
  • 锁分段:多个锁保护不同的区域,如10个对象数组保护10个数据片段(每片10个),通过取余获取相应的锁,(key.hashCode % length) % lockSize

10、一种非阻塞的计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* CasCounter
* <p/>
* Nonblocking counter using CAS
*
* @author Brian Goetz and Tim Peierls
*/
@ThreadSafe
public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
//非阻塞一般使用底层的并发原语操作
return v + 1;
}
}