02同步容器
同步容器
同步容器的单个操作都是线程安全的,但是组合操作需要手动加锁,例如遍历删除。
同步容器由于对所有的方法都加了锁会有效率问题, 所以推荐使用并发容器。
同步容器包装类
1 |
|
- SynchronizedList中的listIterator和listIterator(int index) 方法并没有做同步处理,而是快速失败机制,所以在进行遍历时,需要开发者手动加锁
并发容器
- 特点以Concurrent开头或者CopyOnWriteArrayList/Set
并发MAP
ConcurrentMap接口
- putIfAbsent:与原有put方法不同的是,putIfAbsent方法中如果插入的key相同,则不替换原有的value值;
- remove:与原有remove方法不同的是,新remove方法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;
- replace(K,V,V):增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进行替换操作;
- replace(K,V):与上面的replace不同的是,此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;
ConcurrentHashMap
- 具体解析在JAVA基础中的java集合篇
ConcurrentNavigableMap接口与ConcurrentSkipListMap类
- ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。
- ConcurrentSkipListMap是通过跳表实现的
- 调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出他的优势。
并发Queue
阻塞队列和非阻塞队列
- 阻塞队列,当取出或者添加元素时,操作会阻塞住,直到操作被成功执行
- TransferQueue和BlockingQueue
- 向TransferQueue添加元素会一直阻塞知道这个元素被其他消费者消费
ConcurrentLinkedQueue
- 基于链接节点的无界线程安全队列。
- 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,
ConcurrentLinkedQueue
是一个恰当的选择 - 此队列不允许使用
null
元素。 - ConcurrentLinkedQueue使用CAS非阻塞算法实现使用CAS解决了当前节点与next节点之间的安全链接和对当前节点值的赋值
BlockingQueue
- 七种实现
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的可选有界阻塞队列。如果未指定容量,那么容量将等于
Integer.MAX_VALUE
。 2 ^ 31-1 - PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列,,只有在延迟期满时才能从中提取元素。
- SynchronousQueue:一个不存储元素、没有内部容量的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞TransferQueue队列。
- LinkedBlockingDeque:一个由链表结构组成的可选范围双向阻塞队列。如果未指定容量,那么容量将等于
Integer.MAX_VALUE
。 2 ^31-1
- 特点
支持两个附加操作的
Queue
,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用(阻塞)。BlockingQueue
方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null
或false
,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞BlockingQueue
不接受null
元素。试图add
、put
或offer
一个null
元素时,某些实现会抛出NullPointerException
。null
被用作指示poll
操作失败的警戒值BlockingQueue
可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity
,超出此容量,便无法无阻塞地put
附加元素。没有任何内部容量约束的BlockingQueue
总是报告Integer.MAX_VALUE
的剩余容量。BlockingQueue
实现主要用于生产者-使用者队列,BlockingQueue
可以安全地与多个生产者和多个使用者一起使用。但它另外还支持Collection
接口。因此,举例来说,使用remove(x)
从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。BlockingQueue
实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll
、containsAll
、retainAll
和removeAll
)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c
中的一些元素后,addAll(c)
有可能失败(抛出一个异常)。BlockingQueue
实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。内存一致性效果:当存在其他并发 collection 时,将对象放入
BlockingQueue
之前的线程中的操作 happen-before 随后通过另一线程从BlockingQueue
中访问或移除该元素的操作。
原理
- 利用了Lock锁的多条件(Condition)阻塞控制
- 首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。
- PUT流程
- 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
- 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
- 如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
- 唤醒一个标记为notEmpty(消费者)的线程。
- TAKE流程
- 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
- 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
- 如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
- 唤醒一个标记为notFull(生产者)的线程。
- 注意
- put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
- 就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
- 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁,拿到锁了再while判断队列是否可用(这也是为什么不用if判断,而使用while判断的原因)
ArrayBlockingQueue
- 特点
- 一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
- 是否采用公平锁,默认是非公平锁
- 这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中
放入
元素会导致操作受阻塞;试图从空队列中提取
元素将导致类似阻塞。 - 此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为
true
而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。 - ArrayBlockingQueue内部有个循环数组items用来存放队列元素,putIndex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。
- 有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,
- notEmpty,notFull条件变量用来进行出入队的同步。另外构造函数必须传入队列大小参数,所以为有界队列,默认是Lock为非公平锁。
- offer操作:在队尾插入元素,如果队列满则返回false,否者入队返回true。由于在操作共享变量前加了锁【final ReentrantLock lock = this.lock; 获取独占锁 lock.lock();】,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。另外这个队列是使用循环数组实现,所以计算下一个元素存放下标时候有些特殊。【i=putIndex; return(++i == items.length) ? 0 : i;】另外insert后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。
- put操作:在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回。【final ReentrantLock lock = this.lock; 获取可被中断锁 lock.lockInterruptibly();】如果队列满了那么当前线程会阻塞,直到出队操作调用了notFull.signal方法激活该线程。
- size操作:获取队列元素个数,非常精确,因为计算size时候加了独占锁,其他线程不能入队或者出队或者删除元素
- ArrayBlockingQueue通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加synchronized的意味。其中offer,poll操作通过简单的加锁进行入队出队操作,而put,take则使用了条件变量实现如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。
LinkedBlockingQueue
- 特点
- 一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
- 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于
Integer.MAX_VALUE
。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。 - LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。
PriorityBlockingQueue
- 特点
- 一个基于优先级堆的无界阻塞队列。,它使用与类
PriorityQueue
相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致OutOfMemoryError
)。 - 此类不允许使用
null
元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出ClassCastException
)。 - 此类及其迭代器可以实现
Collection
和Iterator
接口的所有可选 方法。iterator()
方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。如果需要有序地进行遍历,则应考虑使用Arrays.sort(pq.toArray())
。此外,可以使用方法drainTo
按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。 - 在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则需要插入一个新的
FIFOEntry(anEntry)
来替换普通的条目对象。 - PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLockOffset是用来在扩容队列时候做cas的,目的是保证只有一个线程可以进行扩容。
- 由于这是一个优先级队列所以有个比较器comparator用来比较元素大小。lock独占锁对象用来控制同时只能有一个线程可以进行入队出队操作。notEmpty条件变量用来实现take方法阻塞模式。这里没有notFull 条件变量是因为这里的put操作是非阻塞的,为啥要设计为非阻塞的是因为这是无界队列。
- 最后PriorityQueue q用来搞序列化的。如下构造函数,默认队列容量为11,默认比较器为null;
- 一个基于优先级堆的无界阻塞队列。,它使用与类
DelayQueue
- 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
- DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
- 特点
Delayed
元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的Delayed
元素。如果延迟都还没有期满,则队列没有头部,并且poll
将返回null
。- 当一个元素的
getDelay(TimeUnit.NANOSECONDS)
方法返回一个小于等于 0 的值时,将发生到期。即使无法使用take
或poll
移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size
方法同时返回到期和未到期元素的计数。 - 此队列不允许使用 null 元素。
- DelayQueue中内部使用的是PriorityQueue存放数据,使用ReentrantLock实现线程同步,可知是阻塞队列。另外队列里面的元素要实现Delayed接口,一个是获取当前剩余时间的接口,一个是元素比较的接口,因为这个是有优先级的队列。
SychronousQueue
- 一个不存储元素、没有内部容量的阻塞同步队列。
- SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
- 特点
- 一个不存储元素、没有内部容量的阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行
peek
,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且poll()
将会返回null
。对于其他Collection
方法(例如contains
),SynchronousQueue
作为一个空 collection。 - 此队列不允许
null
元素。 - 同步队列类似于 CSP 和 Ada 中使用的 简单聚集(rendezvous)机制信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
- 对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为
true
所构造的队列可保证线程以 FIFO 的顺序进行访问。 - SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
- 一个不存储元素、没有内部容量的阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行
LinkedTransferQueue
- 特点
- LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。
- 相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。
- transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
- tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
- 对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
并发Deque
- 一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数
Deque
实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。
LinkList
- LinkedList是基于链表实现的,从源码可以看出是一个双向链表。除了当做链表使用外,它也可以被当作堆栈、队列或双端队列进行操作。
ConcurrentLinkedDeque
- 并发队列ConcurrentLinkedDeque,这是一个非阻塞,无锁,无界 ,线程安全双端操作的队列。简单说就是ConcurrentLinkedQueue的升级版,在JDK7之后才提供。该队列也不允许空元素,而且size方法并不是常量时间,其需要遍历链表,此时并发修改链表会造成统计size不正确。同样,bulk操作和equal以及toArray方法不保证原子性。
- 适合“多生产,多消费”的场景。
- 内存一致性遵循对 ConcurrentLinkedDeque 的插入操作先行发生于(happen-before)访问或移除操作。
LinkedBlockingDeque
- 一个基于已链接节点的、任选范围的阻塞双端队列。
- 可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于
Integer.MAX_VALUE
。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。 - 大多数操作都以固定时间运行(不计阻塞消耗的时间)。异常包括
remove
、removeFirstOccurrence
、removeLastOccurrence
、contains
、iterator.remove()
以及批量操作,它们均以线性时间运行。
Copy On Write
copy on write 机制
- 写入时复制思想,当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改,并且写时会加锁,保证同一时刻只有一个调用者进行写操作
- CopyOnWrite容器即写时复制的容器,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。
- 好处:
- 我们可以在并发的场景下对容器进行”读操作”而不需要”加锁”,从而达到读写分离的目的。从JDK 1.5 开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器 ,分别是CopyOnWriteArrayList和CopyOnWriteArraySet 。我们着重给大家介绍一下CopyOnWriteArrayList。
CopyOnWriteArrayList
- 优点:CopyOnWriteArrayList经常被用于“读多写少”的并发场景,是因为CopyOnWriteArrayList无需任何同步措施,大大增强了读的性能。在Java中遍历线程非安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器进行修改,那么会抛出ConcurrentModificationException异常。CopyOnWriteArrayList由于其”读写分离”,遍历和修改操作分别作用在不同的List容器,所以在使用迭代器遍历的时候,则不会抛出异常。
- 缺点:
- CopyOnWriteArrayList每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁Full GC(ZGC因为没有使用Full GC)。比如这些对象占用的内存200M左右,那么再写入100M数据进去,内存就会多占用300M。
- CopyOnWriteArrayList由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。