你会怎么实现下面这个场景?应用首页有三个优先级从高到低的弹窗,展示内容依赖网络请求,若低优先级弹窗请求先返回则需等待,让高优先级先展示。

串行请求是最容易想到的解决方案,即先请求最高优先级的弹窗,当它返回展示后再请求第二优先级弹窗。但这样会拉长所有弹窗的展示时间。

性能更好的方案是同时并行发出三个请求,但网络请求时长的不确定性使得最高优先级的弹窗不一定优先返回,所以得设计一种优先级阻塞机制。本文使用 协程 + 链式队列 实现这个机制。

# 异步任务链

把单个异步任务进行抽象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 单个异步任务
class Item {
companion object {
// 默认异步优先级
const val PRIORITY_DEFAULT = 0
}

// 异步操作:耗时的异步操作
var suspendAction: (suspend () -> Any?)? = null
set(value) {
field = value
value?.let {
// 启动协程执行异步操作
GlobalScope.launch { deferred = async { it.invoke() } }
}
}
// 异步响应:异步操作结束后要做的事情
var resumeAction: ((Any?) -> Unit)? = null
// 异步结果:异步操作返回值
var deferred: Deferred<*>? = null
// 异步优先级
var priority: Int = PRIORITY_DEFAULT
}

异步任务有三个主要的属性,分别是异步操作 suspendAction 、异步响应 resumeAction 、异步结果 deferred 。每当异步操作被赋值时,就启动协程执行它,并将其返回值保存在 deferred 中。

为了确保异步任务的优先级,把多个异步任务用链的方式串起来,组成异步任务链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Item {
companion object {
const val PRIORITY_DEFAULT = 0
}

var suspendAction: (suspend () -> Any?)? = null
set(value) {
field = value
value?.let {
GlobalScope.launch { deferred = async { it.invoke() } }
}
}

var resumeAction: ((Any?) -> Unit)? = null
var deferred: Deferred<*>? = null
var priority: Int = PRIORITY_DEFAULT

// 异步任务前结点(Item 包含 Item)
internal var next: Item? = null
// 异步任务后结点(Item 包含 Item)
internal var pre: Item? = null

// 在当前结点后插入新结点
internal fun addNext(item: Item) {
next?.let {
it.pre = item
item.next = it
item.pre = this
this.next = item
} ?: also { // 尾结点插入
this.next = item
item.pre = this
item.next = null
}
}

// 在当前结点前插入新结点
internal fun addPre(item: Item) {
pre?.let {
it.next = item
item.pre = it
item.next = this
this.pre = item
} ?: also { // 头结点插入
item.next = this
item.pre = null
this.pre = item
}
}
}

自己包含自己 的方式就能实现链式结构。Android 消息列表也用同样的结构:

1
2
3
4
public final class Message implements Parcelable {
Message next;
...
}

链必须有个头结点,存放头结点的类就是存放整个链的类,就好像消息列表 MessageQueue 一样:

1
2
3
public final class MessageQueue {
Message mMessages;
}

模仿消息列表,写一个异步任务链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 异步任务链
class SuspendList {
// 头结点
private var head: Item = emptyItem()

// 向异步任务链插入结点
fun add(item: Item) {
// 从头结点向后查找插入位置,找到再后插入
head.findItem(item.priority).addNext(item)
}

// 根据优先级向后查找插入位置(优先级升序)
private fun Item.findItem(priority: Int): Item {
// 当前结点
var p: Item? = this
// 当前结点的后续结点
var next: Item? = p?.next
// 从当前结点开始向后遍历异步任务链
while (next != null) {
// 若优先级介于当前结点和其后续结点之间,则表示找到插入位置
if (priority in p!!.priority until next.priority) {
break
}
p = p.next
next = p?.next
}
return p!!
}

// 观察异步任务链并按优先级阻塞
fun observe() = GlobalScope.launch(Dispatchers.Main) {
// 从头结点向后遍历异步任务链
var p: Item? = head.next
while (p != null) {
// 在每个异步结果上阻塞,直到异步任务完成后执行异步响应
p.resumeAction?.invoke(p.deferred?.await())
p = p.next
}
}

// 异步任务(已讲解不再赘述)
class Item { ... }
}

// 空结点(头结点)
fun emptyItem(): SuspendList.Item = SuspendList.Item().apply { priority = -1 }

SuspendList 持有链的头结点,为了使 “头插入” 和 “中间插入” 复用一套代码,将头结点设置为 “空结点”。

异步任务链上的任务按优先级升序排列(优先级数字越小优先级越高)。这保证了优先级最高的异步任务总是在链表头。

优先级阻塞:当所有异步任务都被添加到链之后,调用 observe() 观察整个异步任务链。该方法启动了一个协程,在协程中从头结点向后遍历链,并在每个异步任务的 Deferred 上阻塞。因为链表已按优先级排序,所以阻塞时也是按优先级从高到低进行的。

# 全局作用域

真实业务场景中,需要统一安排优先级的异步任务可能是跨界面的。这就要求异步任务链能全局访问,单例是一个最直接的选择,但它限制了整个 App 中异步任务链的数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 私有构造函数
class SuspendList private constructor() {
companion object {
// 静态 map 存放所有异步任务链
var map = ArrayMap<String, SuspendList>()
// 根据 key 构建异步任务链
fun of(key: String): SuspendList = map[key] ?: let {
val p = SuspendList()
map[key] = p
p
}
}
...
}

然后就可以像这样使用异步任务链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 构建异步任务链
SuspendList.of("dialog").apply {
// 向链添加异步任务1
add(Item {
suspendAction = { fetchUser() }
resumeAction = { user: Any? -> onUserResume(user) }
priority = 3
})
// 向链添加异步任务2
add(Item {
suspendAction = { fetchActivity() }
resumeAction = { activity: Any? -> onActivityResume(activity) }
priority = 1
})
}.observe()

suspend fun fetchUser(): String {
delay(4000)
return "User Taylor"
}

suspend fun fetchActivity(): String {
delay(5000)
return "Activity Bonus"
}

private fun onActivityResume(activity: Any?) {
Log.v("test", "activity(${activity.toString()}) resume")
}

private fun onUserResume(user: Any?) {
Log.v("test", "user(${user.toString()}) resume")
}

上述代码构建了一个名为 dialog 的异步任务链,向其中添加了两个异步任务,并按优先级观察它们的结果。

其中 Item() 是一个顶层方法,用于构建单个异步任务:

1
fun Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init)

这是构建对象 DSL 的标准写法,详细讲解可以参见这里

运用 DSL 的思路还可以进一步将构建代码简化成这样:

1
2
3
4
5
6
7
8
9
10
11
12
SuspendList.of("dialog") {
Item {
suspendAction = { fetchUser() }
resumeAction = { user: Any? -> onUserResume(user) }
priority = 3
}
Item {
suspendAction = { fetchActivity() }
resumeAction = { activity: Any? -> onActivityResume(activity) }
priority = 1
}
}.observe()

不过需要对原先的 of()Item() 做一些调整:

1
2
3
4
5
6
7
8
9
10
11
// 新增接收者为SuspendList的 lambda 参数,为构建异步任务提供外层环境
fun of(key: String, init: SuspendList.() -> Unit): SuspendList = (map[key] ?: let {
val p = SuspendList()
map[key] = p
p
}).apply(init)

// 将构建异步任务声明为SuspendList的扩展方法
// 构建异步任务后将其插入到异步任务链中
fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
SuspendList.Item().apply(init).also { add(it) }

# 异步超时

若某个高优先级的异步任务迟迟不能结束,其它任务只能都被阻塞?

得加个超时参数:

1
2
3
4
5
6
7
class Item {
// 为异步操作赋值时,不再立刻构建协程
var suspendAction: (suspend () -> Any?)? = null
// 超时时长
var timeout: Long = -1
...
}

为单个异步任务添加超时时长参数,还得重构一下异步任务的构建函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
SuspendList.Item().apply {
// 为异步任务设置各种参数
init()
// 启动协程
GlobalScope.launch {
// 将异步任务结果包装成 Deferred
deferred = async {
// 若需要超时机制
if (timeout > 0) {
withTimeoutOrNull(timeout) { suspendAction?.invoke() }
}
// 不需要超时机制
else {
suspendAction?.invoke()
}
}
}
}.also { add(it) }

原本在 suspendAction 赋值时就立马启动协程,现在将其延后,等所有参数都设置完毕后才启动。这样可以避免 “先为 suspendAction 赋值,再为 timeout 赋值”case 下超时无效的 bug。

使用 withTimeoutOrNull() 实现超时机制,当超时发生时,业务会从 resumeAction 中获得 null

# 异步任务生命周期管理

构建异步任务链时使用了 GlobalScope.launch() 启动协程,其创建的协程不符合 structured-concurrency 。所以需要手动管理生命周期:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class SuspendList private constructor() {
class Item {
// 为异步任务新增 Job 属性,指向其对应的协程
internal var job:Job? = null
...
}

// observer()返回类型为 Job,业务层可以在需要的时候取消它
fun observe() = GlobalScope.launch(Dispatchers.Main) {
var p: Item? = head.next
while (p != null) {
p.resumeAction?.invoke(p.deferred?.await())
// 当异步任务响应被处理后,取消其协程以释放资源
p.job?.cancel()
p = p.next
}
}
}

fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
SuspendList.Item().apply {
init()
// 将该异步任务的协程存储在 job 中
job = GlobalScope.launch {
deferred = async {
if (timeout > 0) {
withTimeoutOrNull(timeout) { suspendAction?.invoke() }
} else {
suspendAction?.invoke()
}
}
}
}.also { add(it) }

# Talk is cheap, show me the code

本篇的完整源码可以点击这里