Snatcher 是 Reflow 框架中一个极其重要的 抗阻塞 线程同步工具。贯穿于Reflow
框架的各个角落,是Reflow
无锁(lock-free
)的基础。
你可能立马要问了:java
有丰富的锁实现,如最基础的关键字synchronized
,以及ReentrantLock
、ReadWriteLock
、StampedLock
等。你为何自作聪明要实现一套呢?这里暂重申关键词:
无锁(
lock-free
)
为了更形象地介绍Snatcher
组件,下面举一个假想场景的例子以帮助更好地理解。现假设:
有一个理想状态下的工厂。每当工人完成一项工作任务
T
,就需要去一个固定的地点P
打卡交付任务,并去下一个地点领取下一项任务T1
的安排及必要的工具和物资,或临时休息(可不领取物资)。T1
是不可预测的,需要根据上一个领取的任务T0
决定。报酬会按每项任务的难度和时长合理分配,很公平(不用担心领取任务的“优劣”)。而打卡交付的顺序也是不可预测的,因为每个任务的时长不同。基于上述原因,这个地点P
有下列问题:
- 基于功能和职责考虑,必须有且只有一个
P
地点;- 是一个房间,工作内容
T'
是根据屏幕显示的指令C
处理交付的物品(C
需要根据实时的资源状况和从传送带输入的物品实时计算得来,不可预知),只需一个工人在此处工作即可;- 人流量很随机,有时很拥挤,有时没人,有时断断续续有零星几个人前来打卡交付物资;
- 没人愿意在此地工作,因为不能自由安排自己的时间,报酬还不一定多。因为每天工人领到的任务时长也很随机,导致可能某天打卡交付的总体人流量很少,而工作
T'
的报酬也是按劳分配,没有例外。
- 这也显示了整体工作量不够饱和,浪费人力资源。
现在问:
如何合理地解决在打卡地
P
的工人安排的问题,而且最好是最优解(使效率最大化,整体工作量饱和)?
- 重申以下前提:
- 工厂里的每个员工都能够自由地安排自己的时间;
- 工厂里的每个员工都能胜任任何工作(工作
T'
也很简单,而且每处理 一个人交付的物品 的时长也极短,不会大量堆积); - 工厂里的每个工作任务都是按劳分配报酬(不会因为任务特殊或没人愿意干而增加报酬)。
基于以上条件,现设计出如下规则和流程:
- 每个人
W
在到达地点P
后,先按下一个信号灯S0
(打卡和信号合二为一,房间内会亮灯,表示有人来,房间外的大按钮只能点亮);- 然后
W
需要看P
房间里有没有人(门口外面有灯S1
,房间只有一个门),此时有两种情况:
- a. 有人(灯
S1
亮):W
可以走了(去领取下一个任务,或休息);- b. 没人(
S1
不亮):W
需要进入房间并关好门(点亮S1
),并接手工作T'
。步骤:
- ⅰ. 先把灯
S0
熄灭(只能在房间内关闭该灯),以免误判又有人来了(可减少不必要的等待);- ⅱ. 根据屏幕显示的指令操作,直到传送带上 没有 物品为止;
- ⅲ. 看灯
S0
是否点亮,又有两种情况:
- ①. 亮:重复上述
2.b.ⅰ ~ 2.b.ⅲ
的操作;- ②. 灭:打开房门(灯
S1
熄灭),并回头看一眼S0
:
- 亮:回到上述
2.b
分支(即:重新关好房门,并重复上述2.b.ⅰ ~ 2.b.ⅲ
的操作);- 灭:回到上述
2.a
分支(即:离开)。
- 该流程没有要求工人交付任务时必须排队,只要保证把物品放入传送带,并按下
S0
。这样做的目的是使效率最大化:工人可以及时领取下一个任务或休息。
- 如果有人
W'
不看灯S1
,并且直接走了,那么也有几种情况:S1
亮:有人处理,W'
获得该任务报酬;S1
灭:W'
之后没有其他人(当天):W'
的任务得不到真正交付,得不到报酬;W'
在打卡时人很多:别人会看灯S1
并保证有人处理,W'
获得该任务报酬;W'
在打卡时只有自己,但过了一会又有人W''
来打卡:W''
看灯S1
并保证有人处理,W'
获得该任务报酬;W''
不看灯,成为W'
,又回到开头的 case。
- 在重复
2.b.ⅰ ~ 2.b.ⅲ
的操作,执行到步骤2.b.ⅱ
时,发现传送带上并没有物品。其实这是可预见的,也是规则设定的一部分,所以无妨。这说明在上一轮的步骤2.b.ⅱ
都已经处理完(不考虑工人作弊的情况,有其它机制处理作弊的工人,但与进入房间接手工作T'
的人W
无关),如果接下来S0
也没有被点亮,就意味着W
此时已经顺利完成工作T'
,并可以获得相应的报酬。 - 如果
W
看到S0
未被重新点亮,打开房门(灯S1
熄灭),而 不 回头看一眼S0
,直接离开。会出现什么问题?- a. 没问题。回头看一眼
S0
,大多数情况是不亮的(和不看没区别);即使亮了,外面的人此时也会看到S1
是熄灭的状态,进而尝试进来(与正要出来的W
有个沟通决定谁进去); - b. 临界情况。外面只有一个人
W'
(后面很久没人来),且看到S1
是亮的,所以走了。但与此同时,W
已经看完S0
(未被重新点亮),准备开门还未打开(刚达到步骤2.b.ⅲ.②
还未执行,且动作稍慢)。接下来,S0
亮(因为W'
总是先点亮S0
再看S1
,但动作比较快,几乎同时),W
开门,直接离开。现按时间先后顺序梳理一下发生的动作:W
看S0
(熄灭),W'
点亮S0
,W'
看S1
(亮灯),W'
离开(跑的飞快),W
开门,W
离开。 都走了(重申:后面很久没人来),就漏掉了对W'
交付的处理。所以这个W
回头看一眼S0
的规则设定很有必要:- 大多数情况下,走上述
3.a
的 case; - 再看
3.b
,时间线变成了:W
看S0
(熄灭),W'
点亮S0
,W'
看S1
(亮灯),W'
离开(跑的飞快),W
开门,W
回头看一眼S0
,发现灯亮,重新关好房门继续工作。
- 大多数情况下,走上述
- a. 没问题。回头看一眼
- 如果不按上述规则和流程走,而是各自进入房间处理自己交付的物品,会有什么问题?显然会堵塞拥挤不堪,原本人流量不密集的,这时也大概率会拥堵。因为工人进进出出,既大幅占用总体时间,也有巨大的沟通摩擦成本。势必效率低下。
到此,是不是完美地解决了在打卡地的工人安排的问题?!
而且房间里处理交付物品的人W
丝毫不需要多余的等待,干完活就走,也使效率最大化了。至于人流拥挤时,W
工作时长多一些,但多劳多得(这时可以变通一下,把门打开或把S1
按灭,如果有人进来,W
可以商量换人;没有人进来,则说明传送带上已经没有多少物品需要处理了)。
现在回过头来类比到Snatcher
组件的源代码。
// 对应上述信号灯`S0`
private val signature = new AtomicBoolean(false)
// 对应上述信号灯`S1`
private val scheduling = new AtomicBoolean(false)
…
def snatch(): Boolean = {
// 外面的人按下信号灯`S0`(true 表示亮灯)
signature.set(true)
// 具体看下面的实现
tryLock()
}
…
private def tryLock() =
// 这是一个合成操作,也是原子操作:
// 保证了“看灯、推门进入、关门(点亮`S1`)”这三个动作
// 一气呵成,不会有第二个人同时进入房间(如果灯`S1`是熄灭状态),
// 对应于上述步骤`2.a`和`2.b`,不过还未到`2.b.ⅰ`。
// 这里第一个参数`false`,表示“如果灯`S1`是熄灭状态”,第二个参数`true`表示“那就点亮`S1`吧”,合起来就是:
// 如果灯`S1`是熄灭状态,那就点亮吧(代码层面其实没有“门”,只有信号。但有个抢的概念,抢到为上,如果
// 符合本条代码所表示的规则而且执行返回`true`,就意味着抢到了。代码会严格按规则行事,不会像现实中的人那样不遵守规定乱来)。
if (scheduling.compareAndSet(false, true)) {
// 到步骤`2.b.ⅰ`了:熄灭`S0`(false 表示熄灯)。
signature.set(false)
// 暂不用管
thread.set(Thread.currentThread)
// 因为上面已经抢占成功,就还是返回抢占成功的标识。
true
} else {
// 到步骤`2.a`了,与上面的 case 对应,返回表示抢占失败的标识。
false
}
…
// Snatcher 组件的使用入口(理解了逻辑,也可以根据需要自己改写)。
def tryOn(doWork: => Unit, …): Boolean = {
…
// 这里返回的就是上述的抢占成功或失败
if (snatch()) { // 如果抢占成功
breakable { while (true) {
// 干活,对应上述步骤`2.b.ⅱ`。
doWork
// 干完活,看灯`S0`,对应上述步骤`2.b.ⅲ`的所有 case,以决定是继续重复干活,还是开门离开。
// 具体看下面的实现
if (!glance()) {
break // 表示开门离开
}
// 没走`break` case,那就继续走`while (true)`循环,重复`2.b.ⅰ ~ 2.b.ⅲ`的操作。
} }
true
} else false
}
…
def glance(): Boolean =
// 看灯`S0`,对应步骤`2.b.ⅲ`。
// 也是个合成的原子操作,意思是说:如果灯`S0`被点亮,那就按灭,并继续干活;否则开门…
if (signature.compareAndSet(true, false)) {
// 指示继续干活的标识,对应步骤`2.b.ⅲ.①`。
true.ensuring(scheduling.get, "xxx")
} else {
// 暂不管它
thread.set(null)
// 对应步骤`2.b.ⅲ.②`:打开房门(灯`S1`熄灭)。
scheduling.set(false)
// 对应步骤`2.b.ⅲ.②`:回头看一眼`S0`。如果灯`S0`灭,则直接返回指示“离开”的标识(即:false,也正是灯灭的信号)。
signature.get
// 对应步骤`2.b.ⅲ.②`下面的灯`S0`亮分支。这里与现实中稍有不同的是:因为刚刚打开房门了,需要重新抢占
//(重申:代码层面没有“门”的概念),抢占成功,则返回指示“继续”的标识,否则返回指示“离开”的标识。
&& tryLock() // 具体实现,前面已经分析过(字面上也形成闭环了)。
}
Snatcher
整个代码里没有类似synchronized
、ReentrantLock
之类的同步锁机制,仅用了cas
操作实现了两个信号的原子切换,所以,是无锁(lock-free
)的,事实上,还是wait-free
的。
这就是无锁吗?总感觉哪里不对(我读书少,你不要骗我 😂)。
现在大概理解了
Snatcher
的实现逻辑或原理。但跟Reflow
的无锁有什么关系?这样就能说Reflow
是无锁的吗?
众所周知,lock-free/wait-free
是多线程/并发编程的最高境界,是高性能程序的基石,也是程序员不懈追求的目标。lock-free
的程序意味着每个线程都能够畅通无阻地跑满(或空闲),而对共享对象的访问操作依然井然有序,不会对它们造成破坏性影响;而不用时不时等待(暂停,工作不饱和)其它线程释放锁,从而自己能够获得锁,进而得以执行某些原子操作。而且有锁的代码,有潜在死锁的风险。
- 关于
lock-free/wait-free
更专业的说法,一搜一大把,就不赘述了。 - 为什么要尽可能跑满?就像工人的工作量不饱和,就会导致整体效率低下。对于特定的一台设备,
CPU
等硬件资源是固定的,如果不能使线程跑满,想要达到相同的效果,就需要启动更多的线程,但又会消耗内存和CPU
时间分片,显然不如跑满。
前面说了:
Snatcher
贯穿于Reflow
框架的各个角落,是Reflow
无锁(lock-free
)的基础。
Reflow
里有许多类似上述 打卡地P
房间 的操作:某些事只需要一个人(线程)干就好了,其它人(线程)可以马上略过去干其它事情,没必要在这干等着。
- 线程的调度完全可以类比为 某 leader 指挥某 team 的所有人干活。如果团队里总有一些人偷懒,或由于某些原因 需要等待别人完成某任务后 自己才能继续,显然工作量不饱和,效率较低。
例如,Reflow
里有这样一个场景:从四个优先级桶中取出某桶里的某个任务丢进线程池,需要有人(线程)干这件事,那派谁呢?事实上,有启动Reflow
工作流的线程做这件事;也有线程池中干完了某 任务 的线程做这件事;可能线程池里的多个线程同时干完自己当前的 任务,然后都同时干这件事。像极了 P
房间 的场景,Snatcher
组件就派上用场了,再也不用纠结到底派哪个线程干,调用Snatcher
的tryOn()
方法即可。在这里,各个线程就像工厂模型
里面的工人,到达房间后只需 按灯S0、看灯S1,或进入房间,或直接略过。畅通无阻,丝毫不需等待停留。写法如下:
// 用法特别简单
snatcher.tryOn {
// “从四个优先级桶中取出某桶里的某个任务丢进线程池”的一系列操作
}
但假如用传统的多线程编程方式,一般有如下写法:
void xxx() {
…
synchronized(object) {
// “从四个优先级桶中取出某桶里的某个任务丢进线程池”的一系列操作
}
…
}
// 或:
def xxx() {
…
lock.lock()
try {
// “从四个优先级桶中取出某桶里的某个任务丢进线程池”的一系列操作
} finally {
lock.unlock()
}
…
}
无论哪种写法,当某线程到达synchronized
或lock.lock()
时,都要看其它线程是否同时到达这里或其内部(还未出synchronized
代码块,或lock.unlock()
),没有很好(该线程直接进入),但如果有,则需要等那个线程出synchronized
代码块,或lock.unlock()
。显然,线程的工作量没有无锁的饱和,效率没有最大化。就相当于在工厂模型
中,每个打卡交付物品的工人都得自己进入P
房间,处理自己的物品,然后出来,另一个人再进房间 这样一个逻辑,显然效率极低。
- 顺便说一句,同步块语句(或方法)保证
原子性
的底层逻辑,是 happens-before 原则。而该原则又基于内存可见性
概念(用visible
解释happens-before
)。
最后的重要又关键的问题:snatcher.tryOn()
有没有原子性
或happens-before
保证,又是如何保证的?
- 有!!! 不过
原子性
取决于用法和界定,类比锁机制来说,也是如此(必须公用同一个锁对象)。 - 对于
happens-before
,两个信号灯就是保证,具体来说:
// 对应信号灯`S0`
val signature = new AtomicBoolean(false)
// 对应信号灯`S1`
val scheduling = new AtomicBoolean(false)
AtomicBoolean
内部有个volatile
变量,信号灯就是修改的这个变量。volatile
在java 内存模型
中,有明确的happens-before
支持:原生的 可见性 保证。但这还不够,因为仅仅是针对访问volatile
变量的。我要的是对snatcher.tryOn(doWork)
代码块也有happens-before
保证,这恰好涉及到volatile
的另一个能力:禁止指令重排序,配合可见性,就实现了该保证。具体来说:
以信号
S0
开始,保证了 工人把物品放入传送带后打卡(按灯) 的操作结果(程序中的数据或对象)对之后的工作T'
(doWork
)可见,而以信号S1
收尾又保证了工作T'
的结果对所有人(线程)可见。
对于Snatcher
的使用,需要特别注意是:
tryOn()
的参数doWork
(即:具体工作内容的代码块),不能是带上下文参数的,必须是固定的操作(字面量是相同的,且对于任何线程都一样)。如果需要带上下文参数,可使用 Snatcher.ActionQueue。
Snatcher
实现起来比较简单,也有较大的缺点,没理解透彻就很容易出错,这可能是没有把它纳入到 java 标准库的原因。