第六章、Java并发容器和框架

6.1、ConcurrentHashMap的实现与使用

1、线程不安全的HashMap

HashMap 并非线程安全:多线程同时触发扩容时可能丢失数据,甚至产生循环引用导致死循环。下面截取 resize() 中迁移链表的片段进行分析:

do {
    next = e.next;
    // 假设当前Thread在此挂起,而Thread1完成扩容,分析以下的逻辑如图所示
    if ((e.hash & oldCap) == 0) {
        if (loTail == null)
            loHead = e;
        else
            loTail.next = e;
        loTail = e;
    }
    else {
        if (hiTail == null)
            hiHead = e;
        else
            // 完成循环引用
            hiTail.next = e;
        hiTail = e;
    }
    // e与next交替前移
} while ((e = next) != null);
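
下面给出一个最小的复现示意(假设性的演示代码,线程数与数据量为随意取值):多个线程无同步地并发写同一个 HashMap,最终 size 往往小于实际写入的键数,说明发生了丢失更新;在 JDK 1.7 中并发扩容还可能形成环形链表导致 get 死循环。

import java.util.HashMap;
import java.util.Map;

public class UnsafeHashMapDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, Integer> map = new HashMap<>(16); // 初始容量较小,迫使运行期间发生多次扩容
        Thread[] threads = new Thread[8];
        for (int t = 0; t < threads.length; t++) {
            final int offset = t * 10_000;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    map.put(offset + i, i); // 无任何同步,存在竞态
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // 正确结果应为80000,实际运行通常小于该值(丢失更新)
        System.out.println("size = " + map.size());
    }
}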

2、线程安全的ConcurrentHashMap(CHM)

  • Hashtable对所有方法加synchronized保证线程安全,并发效率低
  • 1.6/1.7版本的CHM使用分段锁(Segment)实现;1.8版本改为用synchronized锁住hash桶的首节点(1.8对synchronized做了大量优化,其性能已能满足Doug Lea的需求,本质上仍是一种更细粒度的分段思想~),基本用法见下面的示例
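
把上面示例中的HashMap换成ConcurrentHashMap即可避免丢失更新。下面再给出一个并发计数的简单用法示意(演示代码,单词表与线程数为假设取值),其中merge保证了“读-改-写”这类复合操作的原子性:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class WordCountDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");

        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (String w : words) {
                    // merge是原子操作:key不存在则放入1,存在则在旧值上累加
                    // 若换成“get再put”的写法,在并发下会丢失更新
                    counts.merge(w, 1, Integer::sum);
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // 4个线程各统计一遍words:形如 {a=12, b=8, c=4}
        System.out.println(counts);
    }
}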

1.8 CHM的数据结构示意图

0) init

  • sizeCtl变量的含义:
    • -1:表示table正在初始化
    • -N:表示有N-1个线程正在进行扩容操作
    • 其他情况:table尚未初始化时表示初始化要使用的容量;初始化或扩容完成后表示下一次触发扩容的阈值,即0.75倍容量(n - (n >>> 2));默认值为0
/**
 * 初始化表,延迟加载,在put等方法中被调用
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // sizeCtl小于0,说明其他线程已经获取了初始化权利
            // 本线程让出CPU即可
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS把sizeCtl改为-1表示正在初始化,只有一个线程能成功,获得初始化权利
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // sc > 0表示构造时指定过初始容量,否则使用默认容量16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 计算0.75倍容量,无符号右移2位相当于除以4
                    sc = n - (n >>> 2);
                }
            } finally {
                // 把sizeCtl赋值为0.75倍容量,即下一次扩容的阈值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

1) put

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 对hashCode做二次散列(spread),使散列值分布更均匀,减少冲突
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; // n:table长度;i:目标桶的索引;f:桶的首节点
        if (tab == null || (n = tab.length) == 0)
            // hash表为空则初始化
            tab = initTable();
        // tabAt使用Unsafe.getObjectVolatile直接从主内存读取,保证拿到最新的桶首节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 以volatile方式取hash桶的首节点,index = (n - 1) & hash
            // 如果桶为空则直接CAS插入新节点,不需要加锁,成功后break返回
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 正在扩容,则帮助其扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                // 锁住桶的首节点
                if (tabAt(tab, i) == f) {
                    // 再次验证首节点没有发生变化
                    if (fh >= 0) {
                        // hash >= 0表示普通链表节点,负数表示一些特殊状态的节点
                        binCount = 1;
                        // binCount记录该hash桶中链表的长度
                        // 达到阈值TREEIFY_THRESHOLD后将链表转换为红黑树
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 当前循环到的节点e与要插入的key相同,命中
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    // onlyIfAbsent为true表示不存在才插入;为false则已存在时更新value
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 未命中且下一个节点为null,说明链表中不存在该key,在尾部插入新节点
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 首节点是红黑树节点(TreeBin),走红黑树插入逻辑
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表长度 >= 8(TREEIFY_THRESHOLD)则转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 更新元素数量,并检测是否需要扩容
    addCount(1L, binCount);
    return null;
}
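
结合onlyIfAbsent参数可以看到,put与putIfAbsent最终都调用putVal,区别只在onlyIfAbsent的取值。下面是一段简单的对照示例(仅作示意):

import java.util.concurrent.ConcurrentHashMap;

public class PutValDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

        // put对应putVal(key, value, false):key已存在时覆盖旧值,并返回旧值
        map.put("k", "v1");
        String old = map.put("k", "v2");              // old = "v1",值被覆盖为"v2"

        // putIfAbsent对应putVal(key, value, true):key已存在时不覆盖,返回已存在的值
        String existing = map.putIfAbsent("k", "v3"); // existing = "v2",值仍为"v2"

        System.out.println(old + " " + existing + " " + map.get("k")); // v1 v2 v2
    }
}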

2) get

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            // 桶的首节点即命中
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            // hash < 0表示特殊节点(如TreeBin、ForwardingNode),调用各自覆写的find方法查找
            // 例如TreeBin覆写的find走红黑树查找
            return (p = e.find(h, key)) != null ? p.val : null;
        // 普通链表则沿着next遍历
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

3) size

  • 1.8版本用baseCount保存元素个数;并发更新baseCount失败的线程改用counterCells数组分散记录各自的增量(思路与LongAdder相同),size()时再把两者累加
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 总数 = baseCount + 所有CounterCell中记录的增量
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

/**
 * 修改元素个数与扩容检测
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 首先尝试CAS更新baseCount,竞争时只有一个线程成功,失败的线程转而用counterCells记录增量
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // counterCells为空、随机选中的格子为空,或CAS更新该格子失败(仍有竞争)时,
            // 进入fullAddCount自旋完成:初始化counterCells、新建CounterCell,或退回去修改baseCount
            // ThreadLocalRandom.getProbe() & m用线程各自的探针值随机选一个索引,减少冲突
            // uncontended表示本次CAS是否未发生竞争,默认为true
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // 扩容检测部分,与size无关
    ... // 略
}

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // counterCells已初始化时走此分支
            if ((a = as[(n - 1) & h]) == null) {
                // 随机选中的格子为空,尝试新建一个CounterCell放进去
                if (cellsBusy == 0) {       // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        // CAS抢占cellsBusy这个自旋锁,只有一个线程能进入
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把新建的CounterCell放入rs[(m - 1) & h]
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            // 创建成功则跳出
                            break;
                        // 格子已被其他线程占用,继续重试
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                // 选中的a = as[(n - 1) & h]不为空,尝试CAS更新其值,成功则跳出
                break;
            else if (counterCells != as || n >= NCPU)
                // counterCells != as说明表已经被扩容过
                // n >= NCPU把counterCells的大小限制在CPU核心数以内,达到后不再扩容
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                // 扩容counterCells,增加格子数量,减少冲突
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            // counterCells为空时走初始化逻辑,这里的CAS同样只有一个线程能进入
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    // h & 1的取值只会是0或1,按线程探针值选一个索引写入
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            // 没有抢到初始化counterCells的权利,则退而尝试直接CAS修改baseCount
            // 成功则返回,否则继续自旋
            break;                          // Fall back on using base
    }
}
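
可以看到size()只是把baseCount与counterCells做一次累加,在并发修改期间得到的只是一个近似值;另外size()会把超出int范围的结果截断,JDK 8新增的mappingCount()返回long,Javadoc建议优先使用它。简单示意如下:

import java.util.concurrent.ConcurrentHashMap;

public class SizeDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            map.put(i, i);
        }
        // size()内部对求和结果做了截断:小于0返回0,大于Integer.MAX_VALUE返回Integer.MAX_VALUE
        System.out.println(map.size());         // 100
        // mappingCount()返回long,不会截断
        System.out.println(map.mappingCount()); // 100
    }
}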

4) resize

  • 可结合参考其他同学的分析: http://www.daozhihun.com/p/2186 http://www.jianshu.com/p/f6730d5784ad
/**
 * 修改元素个数与扩容检测
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */
private final void addCount(long x, int check) {
    ... // 略(计数部分见上一节)
    // 扩容检测部分
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 元素总数s达到扩容阈值sizeCtl且容量未达上限时,尝试扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc < 0说明已有线程在扩容:先判断扩容是否已结束或无法加入,是则退出,
                // 否则CAS把扩容线程数加1,加入帮助扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 第一个发起扩容的线程,把sizeCtl置为负的扩容标记
                transfer(tab, null);
            s = sumCount();
        }
    }
}

/**
 * 扩容
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride表示每个线程一次要负责迁移的桶数,最小为MIN_TRANSFER_STRIDE
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;       // subdivide range
    if (nextTab == null) {                  // initiating
        try {
            // 此处没有并发控制,由调用方保证只有一个线程创建nextTable
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {            // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;                  // 从后往前遍历进行迁移
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;                 // 当前桶是否已处理完,可以去领取下一个桶
    boolean finishing = false;              // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            // 当前桶处理完后进入该循环选择下一个桶,即更新i
            // i:当前处理的桶下标;bound:本线程领取区间的下边界
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // CAS把transferIndex更新为新的边界值,本线程领取[nextBound, nextIndex)这一段桶
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // 全部迁移完成:用nextTab替换table,并把sizeCtl设为1.5n,即2n的0.75倍
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 本线程退出扩容,扩容线程数减1;若不是最后一个退出的线程则直接返回
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n;                      // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // 当前桶为空,放入ForwardingNode表示已处理
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // 其他线程发现当前桶首节点是ForwardingNode,说明已处理过,直接跳过
            advance = true;                 // already processed
        else {
            synchronized (f) {
                // 锁住桶的首节点
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        // 链表节点:根据hash & n把节点分为低位(ln)和高位(hn)两条链
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                // 找到最后一段runBit相同的连续节点lastRun,这一段可以整体复用,不必逐个新建
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // lastRun之前的节点逐个新建并头插(相对原顺序是倒序),lastRun及其之后整体不动
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 低位链放在nextTab的i处,高位链放在i + n处,原桶放入ForwardingNode
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树节点:同样按hash & n拆成lo、hi两部分
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 拆分后节点数 <= UNTREEIFY_THRESHOLD则退化回链表,否则重建TreeBin(另一半为空时可直接复用原树)
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                             (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                             (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

5) 红黑树

以节点的hash值作为排序依据的红黑树结构(TreeBin/TreeNode)
参见 红黑树

6.2、ConcurrentLinkedQueue

1、offer

首次增加节点示意图
第二次增加节点示意图

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 失败即重试(CAS自旋),直到入队成功
    for (Node<E> t = tail, p = t;;) {
        // 初始时 t = tail, p = t,p用于寻找真正的尾节点
        Node<E> q = p.next;
        if (q == null) {
            // q为null说明p是最后一个节点
            if (p.casNext(null, newNode)) {
                // CAS把新节点挂到p的后面,成功即入队完成
                if (p != t)
                    // p != t说明新节点与tail之间已经隔了一个节点(hop达到2)
                    // 此时才更新tail的指向(失败也没关系,其他线程会继续推进)
                    // 这种“隔一个节点才更新tail”的做法减少了对volatile变量tail的CAS写次数
                    // 即便并发修改下tail暂时落后,也能通过p.next找到真正的尾节点
                    casTail(t, newNode);    // Failure is OK.
                return true;
            }
            // CAS竞争失败,说明其他线程抢先入队,重新读取next再试
        }
        else if (p == q)
            // p == q说明p所在的节点已经出队(next指向自身),尾节点所在的部分已被删除
            // 先判断tail是否已被其他线程修改:是则直接用新的tail,
            // 否则从head重新开始,因为此时从旧tail已经找不到有效节点
            p = (t != (t = tail)) ? t : head;
        else
            // 还没找到最后一个节点
            // 如果p != t且tail已被其他线程更新,则跳到新的tail,否则p向后移动到q
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

2、poll

POLL第一轮示意图
POLL第二轮示意图

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 当前节点item不为空,则CAS把item置为null表示逻辑删除,节点本身仍留在队列中
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h)
                    // 与offer同理,p向后迭代过一次之后才考虑更新head,减少对volatile变量head的写次数
                    // 更新head时优先指向p.next(即q);若q为null则只能指向p,
                    // 因为head不能为null,而p此时一定不为null
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                // p.next为null说明队列已空
                // 此时仍然更新head:若有线程在上面的if中置空item后、更新head前挂起,
                // head与真正的队首之间会隔着item为null的节点,这里顺手把head推进到p
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                // 当前遍历到的节点已经脱离队列(next指向自身),
                // 说明head早已被其他线程推进,只能从新的head重新开始,如图第二轮中的NULL节点
                continue restartFromHead;
            else
                // p向后迭代
                p = q;
        }
    }
}
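
下面是一个简单的多生产者/多消费者使用示意(演示代码,线程数与数据量均为假设取值):offer与poll都是无锁的,poll返回null只表示调用那一刻队列为空。

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        AtomicInteger consumed = new AtomicInteger();

        // 两个生产者各offer 1000个元素
        Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                queue.offer(i);
            }
        };
        // 两个消费者各尝试poll 1000次,poll返回null并不代表生产已经结束
        Runnable consumer = () -> {
            for (int i = 0; i < 1000; i++) {
                if (queue.poll() != null) {
                    consumed.incrementAndGet();
                }
            }
        };

        Thread p1 = new Thread(producer), p2 = new Thread(producer);
        Thread c1 = new Thread(consumer), c2 = new Thread(consumer);
        p1.start(); p2.start(); c1.start(); c2.start();
        p1.join(); p2.join(); c1.join(); c2.join();

        // 已消费数 + 队列剩余数 = 总生产数2000
        System.out.println("consumed = " + consumed.get() + ", remaining = " + queue.size());
    }
}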

6.3、几种阻塞队列

1、ArrayBlockingQueue

  • 由数组结构组成的有界阻塞队列,默认按FIFO原则对元素排序

2、LinkedBlockingQueue

  • 由链表结构组成的有界阻塞队列,默认容量为Integer.MAX_VALUE

3、PriorityBlockingQueue

  • 支持优先级排序的无界阻塞队列

4、DelayQueue

  • 队列中的元素需要实现Delayed接口,只有到期的元素才能被取出(示例见下)
  • 可用于缓存系统中缓存有效期的设计
  • 可用于定时任务的调度设计
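
下面是一个实现Delayed接口的“缓存过期元素”的简单示意(假设性的演示类,字段与时间取值均为举例):getDelay返回剩余延迟,compareTo按到期时间排序,元素到期后才能被take取出。

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // 假设的缓存过期项:key + 绝对过期时间
    static class ExpiringKey implements Delayed {
        final String key;
        final long expireAtMillis;

        ExpiringKey(String key, long ttlMillis) {
            this.key = key;
            this.expireAtMillis = System.currentTimeMillis() + ttlMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // 剩余延迟,<= 0表示已到期
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // 按剩余延迟排序,最先到期的排在队首
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ExpiringKey> queue = new DelayQueue<>();
        queue.offer(new ExpiringKey("a", 500));
        queue.offer(new ExpiringKey("b", 100));
        // take会阻塞到队首元素到期,"b"先过期先被取出
        System.out.println(queue.take().key); // b
        System.out.println(queue.take().key); // a
    }
}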

5、SynchronousQueue

  • 不存储元素,每一个put操作必须等待一个take操作,反之亦然
  • 吞吐量高于ArrayBlockingQueue和LinkedBlockingQueue

6、LinkedTransferQueue

  • transfer方法会把元素直接交给等待中的消费者;没有等待的消费者时会阻塞,直到元素被消费才返回
  • tryTransfer方法立即返回,返回值表示是否成功交给了消费者(示意见下)
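
一个transfer/tryTransfer的简单示意(演示代码):tryTransfer在没有等待中的消费者时立即返回false且不入队,transfer则阻塞直到元素被消费者接收。

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // 此刻没有消费者在等待,tryTransfer立即返回false,元素"x"不会入队
        System.out.println(queue.tryTransfer("x")); // false

        // 启动一个稍后take的消费者
        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
                System.out.println("took: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer会阻塞,直到上面的消费者take走"y"才返回
        queue.transfer("y");
        System.out.println("transfer returned");
        consumer.join();
    }
}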

7、LinkedBlockingDeque

  • 由链表结构组成的双向阻塞队列
  • 插入和移除操作都提供了作用于队头/队尾两端的版本(addFirst、addLast、pollFirst、pollLast等)
  • 可用于“工作窃取”模式(空闲线程从其他线程的双端队列里窃取任务执行)

6.4、ForkJoin

  • 把一个大任务分割成若干个小任务
  • 并行执行子任务,最后汇总(join)各子任务的结果
  • 一般使用其子类:RecursiveTask用于有返回值的任务;RecursiveAction用于无返回值的任务(完整示例见下方代码)
package concurrent.concurrentArt.chapter6;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Created with IntelliJ IDEA.
 * Date: 2016/10/20
 * Time: 23:30
 */
public class ForkJoinTest {

    /**
     * RecursiveTask用于有返回值的任务
     */
    static class CountTask extends RecursiveTask<Integer> {
        static final int THRESHOLD = 2; // 阈值:任务足够小时直接计算
        List<Integer> nums;

        public CountTask(List<Integer> nums) {
            this.nums = nums;
        }

        @Override
        protected Integer compute() {
            if (nums.size() <= THRESHOLD) {
                // 任务足够小,直接求和
                return nums.parallelStream().collect(Collectors.summingInt(p -> p));
            } else {
                // 任务太大,从中间一分为二,fork两个子任务,再join合并结果
                int wall = nums.size() / 2;
                List<Integer> left = nums.subList(0, wall);
                List<Integer> right = nums.subList(wall, nums.size());
                CountTask taskLeft = new CountTask(left);
                CountTask taskRight = new CountTask(right);
                taskLeft.fork();
                taskRight.fork();
                int leftRes = taskLeft.join();
                int rightRes = taskRight.join();
                return leftRes + rightRes;
            }
        }
    }

    interface MyJob {
        void doSth();
    }

    static class Job implements MyJob {
        @Override
        public void doSth() {
            System.out.println("do");
        }
    }

    /**
     * RecursiveAction用于无返回值的任务
     */
    static class ForkJoinAction extends RecursiveAction {
        static final int THRESHOLD = 1; // 阈值
        List<MyJob> jobs;

        public ForkJoinAction(List<MyJob> jobs) {
            this.jobs = jobs;
        }

        @Override
        protected void compute() {
            if (jobs.size() <= THRESHOLD) {
                jobs.get(0).doSth();
            } else {
                int wall = jobs.size() / 2;
                List<MyJob> left = jobs.subList(0, wall);
                List<MyJob> right = jobs.subList(wall, jobs.size());
                ForkJoinAction taskLeft = new ForkJoinAction(left);
                ForkJoinAction taskRight = new ForkJoinAction(right);
                taskLeft.fork();
                taskRight.fork();
                taskLeft.join();
                taskRight.join();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();

        ForkJoinTask<Integer> task = new CountTask(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        Future<Integer> res = pool.submit(task);
        if (!task.isCompletedAbnormally()) {
            System.out.println(res.get()); // 1+2+...+10 = 55
        } else {
            System.out.println(task.getException());
        }

        Job job = new Job();
        ForkJoinTask<Void> action = new ForkJoinAction(Arrays.asList(job, job, job, job, job));
        pool.submit(action);
        action.join(); // 等待无返回值任务全部执行完,再检查是否发生异常
        if (!action.isCompletedAbnormally()) {
            System.out.println("DONE");
        } else {
            System.out.println(action.getException());
        }
    }
}

第七章、Java中的13个原子操作类

7.1、原子更新基本类型

1、AtomicBoolean
2、AtomicInteger
3、AtomicLong

  • 以上都是基于CAS实现的原子更新类,并发计数等场景中优先使用它们即可,方法列表参见源码(下面给出一个简单示例)
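
一个AtomicInteger的简单用法示意(演示代码,线程数与循环次数为假设取值),与非原子的i++作对照:

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger atomicCount = new AtomicInteger(0);
        int[] plainCount = {0};

        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    atomicCount.incrementAndGet(); // CAS自旋,原子自增
                    plainCount[0]++;               // 非原子的读-改-写,存在丢失更新
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("atomic = " + atomicCount.get()); // 恒为40000
        System.out.println("plain  = " + plainCount[0]);     // 通常小于40000
    }
}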

7.2、原子更新数组

1、AtomicIntegerArray
2、AtomicLongArray
3、AtomicReferenceArray

  • 以原子方式更新数组里的某个元素;注意构造时会复制传入的数组,对原子数组的修改不会影响原数组(示例见下)
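
一个AtomicIntegerArray的简单示意(演示代码):

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicArrayDemo {
    public static void main(String[] args) {
        int[] src = {1, 2, 3};
        AtomicIntegerArray array = new AtomicIntegerArray(src); // 构造时会复制src

        array.getAndSet(0, 100);          // 原子地把下标0的元素改为100
        array.getAndAdd(1, 10);           // 原子地把下标1的元素加10

        System.out.println(array.get(0)); // 100
        System.out.println(array.get(1)); // 12
        System.out.println(src[0]);       // 仍为1,原数组不受影响
    }
}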

7.3、原子更新引用类型

1、AtomicReference
2、AtomicReferenceFieldUpdater
3、AtomicMarkableReference

7.4、原子更新字段

1、AtomicIntegerFieldUpdater
2、AtomicLongFieldUpdater
3、AtomicStampedReference

  • 字段更新器用于原子地更新某个类中volatile修饰的字段,需通过静态方法newUpdater创建;AtomicStampedReference通过“引用 + 版本号(stamp)”解决CAS的ABA问题(示例见下)
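
下面是字段更新器与带版本号CAS的简单示意(User为假设的演示类):

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicStampedReference;

public class FieldUpdaterDemo {
    // 被更新的字段必须是volatile int,且对更新器所在类可见(这里用public)
    static class User {
        public volatile int age;
    }

    private static final AtomicIntegerFieldUpdater<User> AGE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    public static void main(String[] args) {
        User user = new User();
        AGE_UPDATER.incrementAndGet(user);  // 原子地把user.age加1
        System.out.println(user.age);       // 1

        // AtomicStampedReference:引用 + 版本号,CAS时两者都要匹配,从而规避ABA问题
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
        int stamp = ref.getStamp();
        boolean ok = ref.compareAndSet("A", "B", stamp, stamp + 1);
        System.out.println(ok + " " + ref.getReference()); // true B
    }
}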