Java并发相关工具类 AQS 相关 AbstractQueuedSynchronizer AQS AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器。
核心工作流程:
使用volatile修饰的statue变量表示共享资源的状态。如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程(Thread exclusiveOwnerThread
),并且将共享资源设置为锁定状态。
如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)
队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)
通过自旋+CAS获取共享资源,如果获取失败则调用调用 native
方法 进入 park
状态。
1 2 3 4 5 6 7 8 9 10 11 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
使用模板模式提供了 锁与同步的框架,将一些实现延迟到子类中实现。 需要具体子类实现的方法:
1 2 3 4 5 6 7 8 9 isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
比如独占锁相关的方法调用框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class AbstractQueuedSynchronizer { public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } }
比如ReentrantLock 实现了独占方式的锁及用到了condition
AQS 相关方法 获取锁的框架方法
1 2 3 4 5 6 7 8 9 public class AbstractQueuedSynchronizer { public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } }
addWaiter:用于添加节点到队尾
如果队尾节点存在直接CAS添加
如果队尾节点不存在,使用for自旋先添加空的头节点 ,再添加当前线程的队尾节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class AbstractQueuedSynchronizer { final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (; ; ) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } }
acquireQueued
:CLH节点休眠与被唤醒后的主要处理逻辑
进入一段自旋
节点正常添加到队尾后,如果当前节点的前驱为头节点,使用CAS尝试获取。获取成功后设置当前节点为头结点。之前的头节点让GC回收
获取失败,则进入shouldParkAfterFailedAcquire方法。
shouldParkAfterFailedAcquire
方法:
正常的尾节点添加,需要使用CAS先把前驱节点的状态变成signal,通过acquireQueued的自旋,再进入到挂起的状态。
若前驱节点声明为取消CANCELLED状态,则需要找到非CANCELLED的前驱节点并连接上,取消的节点排除在双链表外。
parkAndCheckInterrupt方法:正常尾结点添加完成之后,进入到挂起的逻辑。
释放锁的框架方法 1 2 3 4 5 6 7 8 9 10 11 12 13 public class AbstractQueuedSynchronizer { public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } }
tryRelease方法:需要自定义,正常这步已经把锁给释放了,即修改状态为0,资源当前线程为null
unparkSuccessor:资源释放后的队列抢资源逻辑
定位到头结点,CAS修改状态为0
从尾到头寻找最早一个需要唤醒的线程节点,独占锁模式正常定位到头结点的下一个节点。
执行唤醒逻辑,线程重新进入到acquireQueued
条件队列 condition:条件队列的实现,常可以用在生产者-消费者的场景中。在所对象中内置一个newCondition方法,用于创建一个条件队列。 condition 的方法主要就两个await
等待 、signal
唤醒
条件队列与阻塞队列
阻塞队列为Lock中获取共享资源的CLH阻塞双向链表队列 ,AQS中设置head和tail变量。
条件队列为conditionObject中维护的一个单向链表
1 2 3 4 5 6 7 8 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; }
AbstractQueueSynchronizer中的Node节点,分别代表上述两数据结构的节点
条件队列流程简叙:
生产者condition,首先需要获取当前锁,调用await方法。
await方法中会创建condition state 的node 节点,加入到condition的条件单向链表中。
释放当前持有的锁资源,进入循环判断节点是否进入到AQS阻塞队列中。没有则挂起等待唤醒。
若有另一个线程唤醒了生产者,唤醒signal的逻辑中会把唤醒的节点,从当前条件队列中移除并添加到阻塞队列中
await的线程被唤醒后,进入阻塞队列的队列获取锁逻辑,如果前驱节点为头结点,尝试去获取锁资源,否则修改前驱节点为带唤醒状态。等待锁资源释放后的调用。
注意点:在条件队列中,等待的线程节点,即使发生了中断,节点依然会转移到阻塞队列。主要通过是唤醒之后,执行检查中断状态为true的transferAfterCancelledWait实现。 这里描绘了一个场景,本来有个线程,它是排在条件队列的后面的,但是因为它被中断了,那么它会被唤醒,然后它发现自己不是被 signal 的那个,但是它会自己主动去进入到阻塞队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; }
await方法 interruptMode 中断状态:
REINTERRUPT(1)
: 代表 await 返回的时候,需要重新设置中断状态
THROW_IE(-1)
: 代表 await 返回的时候,需要抛出 InterruptedException 异常
0
:说明在 await 期间,没有发生中断
源码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class AbstractQueuedSynchronizer { public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } }
signal方法 源码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class AbstractQueuedSynchronizer { public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ((firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; } }
ReentrantLock 定义基于AQS实现独占锁及condition。ReentrantLock定义了抽象静态父类Sync提供基础的一些实现。
AQS独占锁获取流程简述:
非公平锁: 获取资源会先直接CAS去获取,没获取到进入acquire模板逻辑,若发现此时状态为0,会再进行一次CAS获取。
公平锁: 获取资源,会先检查是否有CLH队列存在,直接返回,不进行获取的逻辑。
以下共同逻辑
首先自旋+CAS操作添加当前线程节点到队尾。(包括初始化逻辑)。
接着判断前驱节点是否为头结点,如果为头结点就尝试一次获取。否则就修改前驱节点的状态为signal待唤醒状态,并挂起线程。
当有其他线程唤醒时,同样判断前驱是否为头结点,为头节点才获取。获取成功后,修改自身为头结点。脱离双向链表结构。
unlock逻辑:状态减1,当状态为零时,释放共享资源。CAS替换头结点状态为0,从尾到头找第一个状态<0的节点,唤醒线程。
非公平锁 源码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ReentrantLock { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
公平锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class ReentrantLock { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
CountdownLatch 定义:实现了AQS的共享锁,初始化的时候设置了AQS的state的数量。主要方法是await 和 countdown方法
await
实际调用AQS的acquireShared模板方法
如果state为0,表示数全部被countdown了,不阻塞方法。
数量不为0,新建的Node节点添加CLH队列中,更新前缀节点为-1。
countdown()
方法:CAS+自旋扣减statue状态。当状态为0时,唤醒await等待的线程。
countdown 使用自旋加CAS更新状态,状态为0时,更新等待队列头结点为0,唤醒头结点后的第一个待唤醒节点。
唤醒后的节点自己设置为头节点,更新状态为0,并依次唤醒后序节点。
countDown Demo :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class TestCountdownLatch { static CountDownLatch countDownLatch = new CountDownLatch (2 ); public static void main (String[] args) throws InterruptedException { Runnable runnable = () -> { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": countDown complete" ); }; new Thread (runnable).start(); new Thread (runnable).start(); TimeUnit.SECONDS.sleep(1 ); countDownLatch.countDown(); countDownLatch.countDown(); countDownLatch.await(); } }
CyclicBarrier CyclicBarrier (可重复使用的栅栏): 内部使用ReentrantLock 非公平锁,每次await时,加锁扣减数量,使用condition的await等待唤醒。CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
数量扣减为0时,如果有定义栅栏开始的方法则执行,并调用condition的signAll,条件单链表逐个唤醒。 generation 代表栅栏重复使用的一代或者一个周期。
什么时候栅栏会被打破,总结如下:
中断,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
超时,打破栅栏,同时抛出 TimeoutException 异常;
指定执行的操作抛出了异常。
源码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class CyclicBarrier { public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } } private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (; ; ) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } } } finally { lock.unlock(); } } }
CyclicBarrier Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class TestCyclicBarrier { private static CyclicBarrier cyclicBarrier = new CyclicBarrier (4 , () -> log.info("barrier on going" )); public static void main (String[] args) throws InterruptedException { Runnable runnable = () -> { String name = Thread.currentThread().getName(); try { log.info("{} cyclicBarrier await, parties:{}, waitingNumber:{}" , name, cyclicBarrier.getParties(), cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } log.info("{} : start to work" , name); }; workProcess(runnable); TimeUnit.SECONDS.sleep(2 ); log.info("============= next loop ===========" ); cyclicBarrier.reset(); workProcess(runnable); } private static void workProcess (Runnable runnable) throws InterruptedException { for (int i = 0 ; i < 4 ; i++) { new Thread (runnable).start(); TimeUnit.SECONDS.sleep(1 ); } } }
Semaphore 基于AQS区分公平锁与非公平锁
acquire:获取锁CAS+ 自旋 获取锁,如果发现资源为0,进入队列等待。
release:自旋+CAS释放锁,如果释放成功,唤醒队列的节点起来获取。
1 2 3 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Slf4j public class TestSemaphore { private static Semaphore semaphore = new Semaphore (3 , true ); public static void main (String[] args) { createThread1(); createThread2(); createThread3(); } private static void createThread1 () { new Thread (() -> { while (!Thread.currentThread().isInterrupted()) { try { semaphore.acquire(2 ); log.info("AAA-" +semaphore.availablePermits()); TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(2 ); } } }).start(); } private static void createThread2 () { new Thread (() -> { while (!Thread.currentThread().isInterrupted()) { try { semaphore.acquire(); log.info("BBB-" + semaphore.availablePermits()); TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } }).start(); } private static void createThread3 () { new Thread (() -> { while (!Thread.currentThread().isInterrupted()) { try { semaphore.acquire(3 ); log.info("CCC-" + semaphore.availablePermits()); TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(3 ); } } }).start(); } }
ReentrantReadWriteLock 内部分别定义了读锁与写锁。
读锁共享锁实现 。若当前头结点为状态0,则更新状态为propagate,保证共享锁的传播特性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class ReentrantReadWriteLock { protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { } } }
独占锁与共享锁 独占锁:独占锁模式下,每次只能有一个线程能持有锁, ReentrantLock 就是以独占方式实现的互斥锁。
独占锁是一种悲观保守的加锁策略,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。
共享锁则允许多个线程同时获取锁,并发访问 共享资源,如: ReadWriteLock。 共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。
AQS 的内部类 Node 定义了两个常量 SHARED 和 EXCLUSIVE,他们分别标识 AQS 队列中等待线程的锁获取模式。
java 的并发包中提供了 ReadWriteLock,读-写锁。它允许一个资源可以被多个读操作访问,或者被一个 写操作访问,但两者不能同时进行。
线程池中的Worker(独占锁实现) 详见Java线程章节
Atomic 原子类 原子类主要基于CAS操作实现,同时使用 volatile 保证可见性。
原子类型 使用原子的方式更新基本类型
AtomicInteger
:整型原子类
AtomicLong
:长整型原子类
AtomicBoolean
:布尔型原子类
AtomicInteger
类常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public final int get() //获取当前的值 public final int getAndSet(int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement()//获取当前的值,并自增 public final int getAndDecrement() //获取当前的值,并自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update),添加失败返回false public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
数组类型 数组类型:使用原子的方式更新数组里的某个元素
AtomicIntegerArray
:整型数组原子类
AtomicLongArray
:长整型数组原子类
AtomicReferenceArray
:引用类型数组原子类
AtomicIntegerArray
类常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public final int get(int i) //获取 index=i 位置元素的值 public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增 public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减 public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值 boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update) public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
引用类型 引用类型
AtomicReference
:引用类型原子类
AtomicMarkableReference
:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
AtomicStampedReference
:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
AtomicReference 类使用示例
1 2 3 4 5 6 7 8 9 10 11 12 public class Solution { public void test () { AtomicReference<Person> ar = new AtomicReference <Person>(); Person person = new Person ("SnailClimb" , 22 ); ar.set(person); Person updatePerson = new Person ("Daisy" , 20 ); ar.compareAndSet(person, updatePerson); System.out.println(ar.get().getName()); System.out.println(ar.get().getAge()); } }
AtomicStampedReference 类使用示例
1 2 3 4 5 6 7 // 实例化、取当前值和 stamp 值 final Integer initialRef = 0, initialStamp = 0; final AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(initialRef, initialStamp); // compare and set 操作 final Integer newReference = 666, newStamp = 999; final boolean casResult = asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
对象的属性修改类型 如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类。
AtomicIntegerFieldUpdater
:原子更新整型字段的更新器
AtomicLongFieldUpdater
:原子更新长整型字段的更新器
AtomicReferenceFieldUpdater
:原子更新引用类型里的字段
1 2 3 4 AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age"); User user = new User("Java", 22); System.out.println(a.getAndIncrement(user));// 22
线程安全集合
ConcurrentHashMap
: 线程安全的 HashMap
CopyOnWriteArrayList
: 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector.
ConcurrentLinkedQueue
: 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。
BlockingQueue
: 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
ConcurrentSkipListMap
: 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
blockingQueue 阻塞队列方法区分:
take()
和put()
是阻塞的获取和存储元素 的方法,
poll()
和offer()
是不阻塞的获取元素和存储元素 的方法,并且poll和offer可以指定超时时间。
add()
和remove()
存取元素,队列满时add抛异常,队列空时remove抛异常
ArrayBlockingQueue ArrayBlockingQueue
是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue
一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
ArrayBlockingQueue
默认情况下不能保证线程访问队列的公平性。因为底层使用一个ReentrantLock,因此可以设置公平锁和非公平锁。
LinkedBlockingQueue LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列 来使用,同样满足 FIFO 的特性。 而 LinkedBlockingQueue
之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 使用两个ReentrantLock,takeLock和putLock两把锁,分别用于阻塞队列的读写线程,也就是说,读线程和写线程可以同时运行,在多线程高并发场景,应该可以有更高的吞吐量,性能比单锁更高。
PriorityBlockingQueue PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator
来指定排序规则。
PriorityBlockingQueue
并发控制采用的是 ReentrantLock
,队列为无界队列
SynchronousQueue SynchronousQueue
是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用 , SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和ArrayBlockingQueue。
DelayQueue DelayQueue
延迟队列实现使用数据结构使用PriorityQueue,线程安全协作 使用的是ReentrantLock 与 Condition 条件队列实现。关键的实现在take方法的available.awaitNanos(delay);
队列中的元素必须是Delayed的实现类 延迟队列:可应用于缓存失效及定时任务中。
take()
方法源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TestDelayQueue { public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for ( ; ; ) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } } }
相关资料