参数
Builder
ConcurrentLinkedHashMap 构造方法私有,只能通过其静态内部类
Builder
来进行实例化
一个完全支持并发检索的哈希表,可调节更新预期并发度,以及限制其最大容量
该实现与 ConcurrentHashMap
的不同之处在于维护了一个页面替换算法(page replacement
algorithm),用于在 map 超出容量时删除元素
这个 map 实现没有共有的构造器,它的实例是通过 Builder
创建的
Builder
支持链式赋值,使用如下
1 2 3 4 5 6 7 8 public static void main (String[] args) { ConcurrentLinkedHashMap<Integer, String> map = new ConcurrentLinkedHashMap .Builder<Integer, String>() .maximumWeightedCapacity(2 ) .weigher(Weighers.singleton()) .listener((key, value) -> System.out.println("元素被丢弃了 key=" + key + " value=" + value)) .concurrencyLevel(32 ) }
调用 build()
方法进行
ConcurrentLinkedHashMap
的实例化
1 2 3 4 public ConcurrentLinkedHashMap<K, V> build () { checkState(capacity >= 0 ); return new ConcurrentLinkedHashMap <K, V>(this ); }
这里可以看到如果 capacity
,即
maximumWeightedCapacity
未进行赋值,则会抛出异常,其他参数均有默认值
1 2 3 4 5 6 7 8 public Builder () { capacity = -1 ; weigher = Weighers.entrySingleton(); initialCapacity = DEFAULT_INITIAL_CAPACITY; concurrencyLevel = DEFAULT_CONCURRENCY_LEVEL; listener = (EvictionListener<K, V>) DiscardingListener.INSTANCE; }
maximumWeightedCapacity
表示该 map
最大能够允许的最大重量,并且有可能暂时超过它(我理解可能驱逐元素是一个惰性过程)
在实例化 ConcurrentLinkedHashMap
时,使用原子类作为容量的记录器
1 2 3 4 5 private ConcurrentLinkedHashMap (Builder<K, V> builder) { capacity = new AtomicLong (Math.min(builder.capacity, MAXIMUM_CAPACITY)); }
涉及 capacity
的方法:
capacity:获取当前 capacity
setCapacity:设置 capacity
hasOverflowed:判断当前重量大小是否超过了 capacity
initialCapacity
表示初始化底层存放元素的 map 容量,默认为 16
1 2 3 4 5 private ConcurrentLinkedHashMap (Builder<K, V> builder) { data = new ConcurrentHashMapV8 <K, Node<K, V>>(builder.initialCapacity, 0.75f , concurrencyLevel); }
ConcurrentLinkedHashMap 底层实现存储的结构是
ConcurrentHashMapV8
,看到其参数和 JUC 下面的
ConcurrentHashMap
一样大致就可以猜到该参数的作用,本质上也是在预估数据量来避免频繁 resize
操作
对于该参数是如何用于实例化 ConcurrentHashMapV8
,在后面的 concurrencyLevel
参数进行展示
该参数只用于实例化 ConcurrentHashMapV8
concurrencyLevel
预估的并发线程数
和 initialCapacity
参数一样,都是为了实例化
ConcurrentHashMapV8
,其参数的作用也和
ConcurrentHashMap
一样(作者都是 Doug Lea)
ConcurrentHashMap
提高并发性能的思想就是 cell
化,按照底层 Entry 数组头元素作为并发粒度,那么
concurrencyLevel
和 initialCapacity
这两个参数必然同时影响底层数组的大小创建
1 2 3 4 5 6 7 8 9 10 public ConcurrentHashMapV8 (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException (); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + initialCapacity / loadFactor); int cap = (size >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; }
weigher
重量器,用来衡量一个元素占用了多少重量,默认为
SingletonWeigher
因为 ConcurrentLinkedHashMap 的核心功能是在限制 map
内元素的占用,那么衡量一个元素的权重就被抽象为了
Weigher
其中除了 Weigher
还提供了 EntryWeigher
的抽象,用于对整个 KV 进行重量的衡量
对于 Weigher
实现,会被转换为 EntryWeigher
实现(统一入口)
1 2 3 4 5 6 public Builder<K, V> weigher (Weigher<? super V> weigher) { this .weigher = (weigher == Weighers.singleton()) ? Weighers.<K, V>entrySingleton() : new BoundedEntryWeigher <K, V>(Weighers.asEntryWeigher(weigher)); return this ; }
其中 EntryWeigherView
就是用来将 Weigher
包装为 EntryWeigher
的实现
列举一些自带的实现
Weigher
SingletonWeigher:一个 value 的重量视为 1
CollectionWeigher:value 为集合类型,重量为集合的 size
ListWeigher:value 为 list 类型,重量为 list 的 size
MapWeigher:value 为 map 类型,重量为 map 的 size
EntryWeigher
BoundedEntryWeigher:内部为 EntryWeigher
实现,要求重量大于等于 1
SingletonEntryWeigher:一个 KV 的重量视为 1
EntryWeigherView:用于包装 Weigher
的实现
1 2 3 4 5 6 private ConcurrentLinkedHashMap (Builder<K, V> builder) { weigher = builder.weigher; }
该参数会在 put 元素以及一系列获取 ordered 视图操作中被使用
listener
EvictionListener
的实现,用于驱逐元素时调用该实现的
onEviction(K key, V value)
方法进行通知
默认值为 DiscardingListener.INSTANCE
1 2 3 4 5 6 7 8 9 private ConcurrentLinkedHashMap (Builder<K, V> builder) { listener = builder.listener; pendingNotifications = (listener == DiscardingListener.INSTANCE) ? (Queue<Node<K, V>>) DISCARDING_QUEUE : new ConcurrentLinkedQueue <Node<K, V>>(); }
这里可以看出,如果 listener 是一个默认值,则
pendingNotifications
也会使用默认队列,其实现就是空实现
1 2 3 4 5 6 7 8 9 static final class DiscardingQueue extends AbstractQueue <Object> { @Override public boolean add (Object e) { return true ; } @Override public boolean offer (Object e) { return true ; } @Override public Object poll () { return null ; } @Override public Object peek () { return null ; } @Override public int size () { return 0 ; } @Override public Iterator<Object> iterator () { return emptyList().iterator(); } }
驱逐元素时不会进行通知,使用了空对象模式
在空对象模式(Null Object Pattern)中,一个空对象取代 NULL
对象实例的检查
Null 对象不是检查空值,而是反应一个不做任何动作的关系,这样的 Null
对象也可以在数据不可用的时候提供默认的行为
添加元素
put
和 putIfAbsent
最终都调用
put(K key, V value, boolean onlyIfAbsent)
方法
1 2 3 4 5 6 7 8 9 @Override public V put (K key, V value) { return put(key, value, false ); } @Override public V putIfAbsent (K key, V value) { return put(key, value, true ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 V put (K key, V value, boolean onlyIfAbsent) { checkNotNull(key); checkNotNull(value); final int weight = weigher.weightOf(key, value); final WeightedValue<V> weightedValue = new WeightedValue <V>(value, weight); final Node<K, V> node = new Node <K, V>(key, weightedValue); for (;;) { final Node<K, V> prior = data.putIfAbsent(node.key, node); if (prior == null ) { afterWrite(new AddTask (node, weight)); return null ; } else if (onlyIfAbsent) { afterRead(prior); return prior.getValue(); } for (;;) { final WeightedValue<V> oldWeightedValue = prior.get(); if (!oldWeightedValue.isAlive()) { break ; } if (prior.compareAndSet(oldWeightedValue, weightedValue)) { final int weightedDifference = weight - oldWeightedValue.weight; if (weightedDifference == 0 ) { afterRead(prior); } else { afterWrite(new UpdateTask (prior, weightedDifference)); } return oldWeightedValue.value; } } } }
计算权重
经过参数校验后,首先要进行权重的计算和包装
1 2 3 4 5 final int weight = weigher.weightOf(key, value);final WeightedValue<V> weightedValue = new WeightedValue <V>(value, weight);final Node<K, V> node = new Node <K, V>(key, weightedValue);
权重的计算就是调用了 EntryWeigher
实现的
weightOf()
方法
随后将权重值和 value 包装进 WeightedValue
,并包装为
Node
Node
即为链表的节点
1 static final class Node <K, V> extends AtomicReference <WeightedValue<V>> implements Linked <Node<K, V>>
整个保存 KV 的核心结构(上面提到的
ConcurrentHashMapV8
)保存的其实是 key 和 Node
的数据:final ConcurrentMap<K, Node<K, V>> data;
无冲突
随后调用 data
的 putIfAbsent
方法
1 final Node<K, V> prior = data.putIfAbsent(node.key, node);
根据返回的 prior 节点情况,判断后续流程
1 2 3 4 5 6 7 8 9 if (prior == null ) { afterWrite(new AddTask (node, weight)); return null ; } else if (onlyIfAbsent) { afterRead(prior); return prior.getValue(); }
这两种情况均可以视为无冲突情况:
不存在 prior
存在 prior 但是 onlyIfAbsent = true
随后执行相应的 Add 或者 Read 任务(后面解释任务的操作)
需要更新
当 key 已经存在,那么上述 put 方法其实并未完成,只是返回了 K 对应的
value 对象(节点)
可以猜测下面流程需要做的操作:
替换 key 对应的 value;因为 value
和权重是绑定关系,也即替换了权重
进行 Task 流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 for (;;) { final WeightedValue<V> oldWeightedValue = prior.get(); if (!oldWeightedValue.isAlive()) { break ; } if (prior.compareAndSet(oldWeightedValue, weightedValue)) { final int weightedDifference = weight - oldWeightedValue.weight; if (weightedDifference == 0 ) { afterRead(prior); } else { afterWrite(new UpdateTask (prior, weightedDifference)); } return oldWeightedValue.value; } }
首先获取 prior 节点的 WeightedValue
结构,当该结构
isAlive()
为 false 时,说明现在的 prior
节点在并发环境下已经成为过去式了,需要跳出循环,重新进行最开始的获取流程
当旧节点没问题,就会调用 CAS 方法来替换新的 weight 和
value,如果替换成功则会根据权重差值来进行 Read 或者 Update 任务
如果 CAS 失败,则会继续对节点进行判断,重复流程
删除元素
按 K 删除
1 2 3 4 5 6 7 8 9 10 11 @Override public V remove (Object key) { final Node<K, V> node = data.remove(key); if (node == null ) { return null ; } makeRetired(node); afterWrite(new RemovalTask (node)); return node.getValue(); }
首先依赖 data
的 remove(key)
方法,拿到节点后进行判断,如果等于 null,说明该 key
没有对应的值,就不需要考虑后续流程了
如果节点不等 null,需要进行
makeRetired()
,本质是修改节点 WeightedValue
的权重为负数(在添加流程中,依靠权重是否为负作为 value
被删除的标志)
随后进行 Removal 任务
按 KV 删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public boolean remove (Object key, Object value) { final Node<K, V> node = data.get(key); if ((node == null ) || (value == null )) { return false ; } WeightedValue<V> weightedValue = node.get(); for (;;) { if (weightedValue.contains(value)) { if (tryToRetire(node, weightedValue)) { if (data.remove(key, node)) { afterWrite(new RemovalTask (node)); return true ; } } else { weightedValue = node.get(); if (weightedValue.isAlive()) { continue ; } } } return false ; } }
在删除的基础上多了对 value 的比较
当 key 存在时,这里用了多个嵌套判断来实现目的或保证并发:
value 是否能对应上,对应不上直接返回删除失败
让 value 退休,调用 tryToRetire(node, weightedValue)
方法,如果失败就重新获取节点的
value,进行存活判断,如果依然存活则重复整个流程(从 1 开始)
调用 data.remove(key, node)
方法,如果成功则进行
Removal 任务,失败则重新整个流程
tryToRetire
在上述删除流程中,都需要调用 tryToRetire
方法
本质上是将节点置为失效状态,失效状态根据节点 value 中的
WeightedValue
的 weight
来标示
1 2 3 4 5 6 7 boolean tryToRetire (Node<K, V> node, WeightedValue<V> expect) { if (expect.isAlive()) { final WeightedValue<V> retired = new WeightedValue <V>(expect.value, -expect.weight); return node.compareAndSet(expect, retired); } return false ; }
后置读写
在上述流程中,各种操作都离不开调用 afterRead
和
afterWrite
方法
后置读写的操作本质上就是在调整链表结构来实现
LRU,对元素进行写入、驱逐和排序
afterWrite
1 2 3 4 5 6 7 8 9 10 void afterWrite (Runnable task) { writeBuffer.add(task); drainStatus.lazySet(REQUIRED); tryToDrainBuffers(); notifyListener(); }
其中 drainStatus 是一个被 AtomicReference
包装的
DrainStatus
,代表当前缓冲操作的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 enum DrainStatus { IDLE { @Override boolean shouldDrainBuffers (boolean delayable) { return !delayable; } }, REQUIRED { @Override boolean shouldDrainBuffers (boolean delayable) { return true ; } }, PROCESSING { @Override boolean shouldDrainBuffers (boolean delayable) { return false ; } }; abstract boolean shouldDrainBuffers (boolean delayable) ; }
当 drainStatus 被并发安全并 lazy 设置为 REQUIRED
后,tryToDrainBuffers()
方法来执行对缓冲区的操作
tryToDrainBuffers
1 2 3 4 5 6 7 8 9 10 11 void tryToDrainBuffers () { if (evictionLock.tryLock()) { try { drainStatus.lazySet(PROCESSING); drainBuffers(); } finally { drainStatus.compareAndSet(PROCESSING, IDLE); evictionLock.unlock(); } } }
首先获取锁资格,evictionLock 是一个
ReentrantLock
:evictionLock = new ReentrantLock();
再将 drainStatus 状态设置为 PROCESSING
;最后调用核心的
drainBuffers
方法
当执行结束后将 drainStatus CAS 修改为 IDLE
,进行解锁
afterRead
1 2 3 4 5 6 7 8 9 10 11 void afterRead (Node<K, V> node) { final int bufferIndex = readBufferIndex(); final long writeCount = recordRead(bufferIndex, node); drainOnReadIfNeeded(bufferIndex, writeCount); notifyListener(); }
其中 recordRead
的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 long recordRead (int bufferIndex, Node<K, V> node) { final AtomicLong counter = readBufferWriteCount[bufferIndex]; final long writeCount = counter.get(); counter.lazySet(writeCount + 1 ); final int index = (int ) (writeCount & READ_BUFFER_INDEX_MASK); readBuffers[bufferIndex][index].lazySet(node); return writeCount; }
读取 readBufferWriteCount 的 counter 的 value 后 +1 写入
这里注释中说明了这个操作不是原子的,意味着可以并发读取,并且会导致
buffer 的重叠和覆盖,还不清楚为什么这样处理
根据后续代码这里记录的 count 主要是用于控制读缓冲区的 drain
操作,可能不需要那么准确,猜测这样处理的原因是因为没必要为了并发安全牺牲性能
随后将 node 记录在 readBuffers 中
drainOnReadIfNeeded
该方法就是用于 drain 读缓冲区
1 2 3 4 5 6 7 8 9 10 11 void drainOnReadIfNeeded (int bufferIndex, long writeCount) { final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get()); final boolean delayable = (pending < READ_BUFFER_THRESHOLD); final DrainStatus status = drainStatus.get(); if (status.shouldDrainBuffers(delayable)) { tryToDrainBuffers(); } }
根据读 count 计算 pending 数量,根据 pending 值来判断是否执行 drain
操作
其中对于操作的判断来自于
DrainStatus
,即状态枚举实现的抽象方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 enum DrainStatus { IDLE { @Override boolean shouldDrainBuffers (boolean delayable) { return !delayable; } }, REQUIRED { @Override boolean shouldDrainBuffers (boolean delayable) { return true ; } }, PROCESSING { @Override boolean shouldDrainBuffers (boolean delayable) { return false ; } }; abstract boolean shouldDrainBuffers (boolean delayable) ; }
可以看到:
对于
REQUIRED
,说明需要立即刷新缓冲区(这个状态最直接是来自于写操作)
对于
PROCESSING
,说明当前其他线程正在处理,所以不需要该线程再进行操作了
对于 IDLE
,则根据 pending 数量得到的 delayable
来判断
最后也是调用 tryToDrainBuffers
方法来对缓冲区进行 drain
操作
drainWriteBuffer
1 2 3 4 5 6 7 8 9 void drainWriteBuffer () { for (int i = 0 ; i < WRITE_BUFFER_DRAIN_THRESHOLD; i++) { final Runnable task = writeBuffer.poll(); if (task == null ) { break ; } task.run(); } }
获取一定数量的 task,然后执行 task 的 run 方法
每一次 drain 操作都有一个阈值(WRITE_BUFFER_DRAIN_THRESHOLD =
16),应该是为每个线程分摊工作量,在 read 中也是这样类似的设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 final class AddTask implements Runnable { final Node<K, V> node; final int weight; AddTask(Node<K, V> node, int weight) { this .weight = weight; this .node = node; } @Override @GuardedBy("evictionLock") public void run () { weightedSize.lazySet(weightedSize.get() + weight); if (node.get().isAlive()) { evictionDeque.add(node); evict(); } } } final class RemovalTask implements Runnable { final Node<K, V> node; RemovalTask(Node<K, V> node) { this .node = node; } @Override @GuardedBy("evictionLock") public void run () { evictionDeque.remove(node); makeDead(node); } } final class UpdateTask implements Runnable { final int weightDifference; final Node<K, V> node; public UpdateTask (Node<K, V> node, int weightDifference) { this .weightDifference = weightDifference; this .node = node; } @Override @GuardedBy("evictionLock") public void run () { weightedSize.lazySet(weightedSize.get() + weightDifference); applyRead(node); evict(); } }
对于 task 的实现,有 3 种:
AddTask
调整 weight
evictionDeque
添加节点(尾插)
调用驱逐方法 evict
RemovalTask
evictionDeque
移除节点
设置节点 dead(包括调整 weight)
UpdateTask
调整 weight
调整 node 到尾部
调用驱逐方法 evict
为什么 UpdateTask
也需要驱逐元素? 因为
weight 的计算方法是由实现类提供的,可能根据不同的情况计算
weight,更新操作也可能更新 weight,所以需要进行驱逐操作
驱逐方法 evict
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void evict () { while (hasOverflowed()) { final Node<K, V> node = evictionDeque.poll(); if (node == null ) { return ; } if (data.remove(node.key, node)) { pendingNotifications.add(node); } makeDead(node); } }
该方法就是为了实现 LRU,如果超出了 weight,则会从
evictionDeque
中 poll
出元素后,记录待通知队列,标记删除
标记删除删除的是 Map 结构中的元素(修改其 weight 标记删除),其中
node 也是 Map 中持有的引用
drainReadBuffers
1 2 3 4 5 6 7 void drainReadBuffers () { final int start = (int ) Thread.currentThread().getId(); final int end = start + NUMBER_OF_READ_BUFFERS; for (int i = start; i < end; i++) { drainReadBuffer(i & READ_BUFFERS_MASK); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void drainReadBuffer (int bufferIndex) { final long writeCount = readBufferWriteCount[bufferIndex].get(); for (int i = 0 ; i < READ_BUFFER_DRAIN_THRESHOLD; i++) { final int index = (int ) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK); final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index]; final Node<K, V> node = slot.get(); if (node == null ) { break ; } slot.lazySet(null ); applyRead(node); readBufferReadCount[bufferIndex]++; } readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount); }
上述操作可以概括成几个要点:
每一次 drain 操作分配了 buffer 操作的阈值
获取 node,释放 node
将 node 移动到队尾
记录 readBufferDrainAtWriteCount
驱逐监听器
最后就是 notifyListener
方法调用
EvictionListener
的实现来进行元素驱逐的通知功能
相关的方法在
afterRead
、afterWrite
、setCapacity
中被调用
1 2 3 4 5 6 void notifyListener () { Node<K, V> node; while ((node = pendingNotifications.poll()) != null ) { listener.onEviction(node.key, node.getValue()); } }
遍历 pendingNotifications 中被驱逐的 node,调用 listener 的
onEviction
方法
参考
ben-manes/concurrentlinkedhashmap:
A ConcurrentLinkedHashMap for Java (github.com)
https://code.google.com/p/concurrentlinkedhashmap/