The Art of Java Concurrency Programming III
第六章、Java并发容器和框架
6.1、ConcurrentHashMap的实现与使用
1、线程不安全的HashMap
2、线程安全的ConcurrentHashMap(CHM)
- HashTable使用synchronized方法保证线程安全,效率低
- 1.6版本代码CHM使用锁分段实现,1.8版本则使用synchronized锁hash桶首节点(1.8对synchronized进行了优化,其效率可以满足Doug Lea的需求,其实也是一种锁分区方式~)
0) init
- SIZECTL标量的含义:
- -1:表示表正在初始化
- -N:表示有N-1个线程正在进行扩容操作
- 其他情况:如果表未初始化则表示表的大小,否则标识table的容量,默认为0.75倍容量(n-(n>>>2))1234567891011121314151617181920212223242526272829303132/*** 初始化表,延时加载于put方法中*/private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)// sizeCtl已经小于0,其他线程已经获取了初始化权利//本线程让出CPU即可Thread.yield();else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//获取到初始化权利的线程CAS修改sizeCtl为-1标识正在初始化,只有一个线程成功try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//sc其他线程可能会改成-1,参见上一个IF"unchecked")(Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//计算0.75倍容量,无符号右移2位相当于除以4sc = n - (n >>> 2);}} finally {//赋值sizeCtl为0.75倍容量sizeCtl = sc;}break;}}return tab;}
1) put
|
|
2) get
|
|
3) size
- 1.8版本使用basecount保存元素个数,同时利用counterCells数组保存并发情况下的元素个数的变化123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}final long sumCount() {CHM.CounterCell[] as = counterCells; CHM.CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}/*** 修改元素个数与扩容检测* @param x the count to add* @param check if <0, don't check resize, if <= 1 only check if uncontended*/private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//首先更新baseCount,只有一个线程成功,其他线程继续逻辑使用counterCells保存变化数CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {//counterCells如果为空或者随机取得的数据索引为空,或者并发更新失败(只有一个线程成功),则进行fullAddCount去自旋初始化或cas修改basecount或CounterCell//ThreadLocalRandom.getProbe() & m随机找一个索引,不同的线程使用不同的种子//uncontended表示是否是冲突而造成的走逻辑,默认为true即非冲突fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}//扩容检测部分,与SIZE无关...//略}// See LongAdder version for explanationprivate final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;if ((as = counterCells) != null && (n = as.length) > 0) {//counterCells初始化成功走此逻辑if ((a = as[(n - 1) & h]) == null) {//随机选择一个如果为空if (cellsBusy == 0) { // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic createif (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//只有一个线程进入boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;//创建将新的CounterCell赋值rs[(m - 1) & h]created = true;}} finally {cellsBusy = 0;}if (created)//是创建逻辑则跳出break;//不是创建逻辑则继续continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//取得的a = as[(n - 1) & h]不为空则尝试cas更新其值,成功则跳出break;else if (counterCells != as || n >= NCPU)//counterCells != as说明这个表已经被扩容过了//n >= NCPU标识限制最大为cpu核心数,此时之后就无法进入扩容逻辑collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//扩容,增加坑位,减少冲突try {if (counterCells == as) {// Expand table unless staleCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//counterCells为空初始化时走此逻辑,此处cas也只有一个线程进来boolean init = false;try { // Initialize tableif (counterCells == as) {CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);//h & 1取值应该只有0或1,随机选择一个索引进行数据写入counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))//其他线程没有获得初始化counterCells权利则尝试直接修改basecount//成功返回,否则自选修改counterCells的值break; // Fall back on using base}}
4) resize
- 可结合参考其他同学的分析: http://www.daozhihun.com/p/2186 http://www.jianshu.com/p/f6730d5784ad123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180/*** 修改元素个数与扩容检测* @param x the count to add* @param check if <0, don't check resize, if <= 1 only check if uncontended*/private final void addCount(long x, int check) {...//略//扩容检测部分if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}/*** 扩容* Moves and/or copies the nodes in each bin to new table. See* above for explanation.*/private final void transfer(CHM.Node<K,V>[] tab, CHM.Node<K,V>[] nextTab) {int n = tab.length, stride;//stride标识每个线程要处理转移的节点数if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiatingtry {"unchecked")(//此处没有并发控制,调用方控制CHM.Node<K,V>[] nt = (CHM.Node<K,V>[])new CHM.Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;//标识从后往前遍历进行扩容}int nextn = nextTab.length;CHM.ForwardingNode<K,V> fwd = new CHM.ForwardingNode<K,V>(nextTab);boolean advance = true;//标识是否处理过boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {CHM.Node<K,V> f; int fh;while (advance) {//advance标识当前节点结束之后才进入该循环进行下个节点的选择,即更新i//i指当前处理的数组编号//bound指处理的边界int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {//将transferIndex更新至边界值bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)//当前位置为空插入ForwardingNode标识处理过了advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)//别的线程如果发现当前节点HASH是ForwardingNode的Hash,则跳过advance = true; // already processedelse {synchronized (f) {//锁节点if (tabAt(tab, i) == f) {CHM.Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;//根据runBit将节点分为两类CHM.Node<K,V> lastRun = f;for (CHM.Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {//找到最后一个需要处理的链表节点,该节点之后的runBit都一样,整体转移即可runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}//倒序lastRun部分整体不动for (CHM.Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new CHM.Node<K,V>(ph, pk, pv, ln);elsehn = new CHM.Node<K,V>(ph, pk, pv, hn);}//两个链表分别放置setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof CHM.TreeBin) {CHM.TreeBin<K,V> t = (CHM.TreeBin<K,V>)f;CHM.TreeNode<K,V> lo = null, loTail = null;CHM.TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (CHM.Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;CHM.TreeNode<K,V> p = new CHM.TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new CHM.TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new CHM.TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
4) 红黑树
以Hash值作为节点Value的红黑树结构
参见 红黑树
6.2、ConcurrentLinkedQueue
1、offer
2、poll
6.3、几种阻塞队列
1、ArrayBlockingQueue
- 数组组成的
2、LinkedBlockingQueue
- 链表组成的
3、PriorityBlockingQueue
4、DelayQueue
- 元素需要实现DelayQueue接口
- 缓存有效期的设计
- 定时任务的调度设计
5、SynchronousQueue
- 不存储元素,每一个put必须等待一个take
- 吞吐量不低,高于上二者
6、LinkedTransferQueue
- transfer方法等待消费者消费才返回
- trytransfer方法立即返回
7、LinkedBlockingDeque
- 链表组成的双向阻塞队列
- 所有操作可以指定两端
- 可用于工作窃取模式(某个线程从其他队列里面窃取任务执行)
6.4、ForkJoin
- 分割任务
- 执行并合并
- 一般使用子类:RecursiveTask用于有返回型;RecursiveAction用于无返回型123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112package concurrent.concurrentArt.chapter6;import java.util.ArrayList;import java.util.Arrays;import java.util.Collection;import java.util.List;import java.util.concurrent.*;import java.util.stream.Collector;import java.util.stream.Collectors;/*** Created with IntelliJ IDEA.* Date: 2016/10/20* Time: 23:30*/public class ForkJoinTest {/*** RecursiveTask用于有返回的任务*/static class CountTask extends RecursiveTask<Integer> {List<Integer> nums;static final int THRESHOLD = 2;//阀值public CountTask(List<Integer> nums) {this.nums = nums;}protected Integer compute() {if (nums.size() <= THRESHOLD) {return nums.parallelStream().collect(Collectors.summingInt(p -> p));} else {int wall = nums.size() / 2;List left = nums.subList(0, wall);List right = nums.subList(wall, nums.size());CountTask taskLeft = new CountTask(left);CountTask taskRight = new CountTask(right);taskLeft.fork();taskRight.fork();int leftRes = taskLeft.join();int rightRes = taskRight.join();return leftRes + rightRes;}}}interface MyJob {void doSth();}static class Job implements MyJob {public void doSth() {System.out.println("do");}}/*** RecursiveAction用于无返回任务*/static class ForkJoinAction extends RecursiveAction {List<MyJob> jobs;static final int THRESHOLD = 1;//阀值public ForkJoinAction(List<MyJob> jobs) {this.jobs = jobs;}protected void compute() {if (jobs.size() <= THRESHOLD) {jobs.get(0).doSth();} else {int wall = jobs.size() / 2;List left = jobs.subList(0, wall);List right = jobs.subList(wall, jobs.size());ForkJoinAction taskLeft = new ForkJoinAction(left);ForkJoinAction taskRight = new ForkJoinAction(right);taskLeft.fork();taskRight.fork();taskLeft.join();taskRight.join();}}}public static void main(String[] args) throws Exception {ForkJoinPool pool = new ForkJoinPool();ForkJoinTask task = new CountTask(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));Future res = pool.submit(task);if (!task.isCompletedAbnormally()) {System.out.println(res.get());} else {System.out.println(task.getException());}Job job = new Job();ForkJoinTask action = new ForkJoinAction(Arrays.asList(job, job, job, job, job));pool.submit(action);if (!action.isCompletedAbnormally()) {System.out.println("DONE");} else {System.out.println(action.getException());}}}
第7章、Java中的13个原子操作类
7.1、原子更新基本类型
1、AtomicBoolean
2、AtomicInteger
3、AtomicLong
- 以上都是以CAS为原理进行的原子操作,项目中尽量使用即可,方法列表参见源码
7.2、原子更新数组
1、AtomicIntegerArray
2、AtomicLongArray
3、AtomicReferenceArray
- 原子更新数组中某个值
7.3、原子更新引用类型
1、AtomicReference
2、AtomicReferenceFieldUpdater
3、AtomicMarkableReference
7.4、原子更新字段
1、AtomicIntegerFieldUpdater
2、AtomicLongFieldUpdater
3、AtomicStampedReference