你会怎么实现下面这个场景?应用首页有三个优先级从高到低的弹窗,展示内容依赖网络请求,若低优先级弹窗请求先返回则需等待,让高优先级先展示。
串行请求是最容易想到的解决方案,即先请求最高优先级的弹窗,当它返回展示后再请求第二优先级弹窗。但这样会拉长所有弹窗的展示时间。
性能更好的方案是同时并行发出三个请求,但网络请求时长的不确定性使得最高优先级的弹窗不一定优先返回,所以得设计一种优先级阻塞机制 。本文使用 协程 + 链式队列 实现这个机制。
# 异步任务链
把单个异步任务进行抽象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class Item { companion object { const val PRIORITY_DEFAULT = 0 } var suspendAction: (suspend () -> Any?)? = null set (value) { field = value value?.let { GlobalScope.launch { deferred = async { it.invoke() } } } } var resumeAction: ((Any?) -> Unit )? = null var deferred: Deferred<*>? = null var priority: Int = PRIORITY_DEFAULT }
异步任务有三个主要的属性,分别是异步操作 suspendAction
、异步响应 resumeAction
、异步结果 deferred
。每当异步操作被赋值时,就启动协程执行它,并将其返回值保存在 deferred
中。
为了确保异步任务的优先级,把多个异步任务用链的方式串起来,组成异步任务链 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class Item { companion object { const val PRIORITY_DEFAULT = 0 } var suspendAction: (suspend () -> Any?)? = null set (value) { field = value value?.let { GlobalScope.launch { deferred = async { it.invoke() } } } } var resumeAction: ((Any?) -> Unit )? = null var deferred: Deferred<*>? = null var priority: Int = PRIORITY_DEFAULT internal var next: Item? = null internal var pre: Item? = null internal fun addNext (item: Item ) { next?.let { it.pre = item item.next = it item.pre = this this .next = item } ?: also { this .next = item item.pre = this item.next = null } } internal fun addPre (item: Item ) { pre?.let { it.next = item item.pre = it item.next = this this .pre = item } ?: also { item.next = this item.pre = null this .pre = item } } }
用 自己包含自己 的方式就能实现链式结构。Android 消息列表也用同样的结构:
1 2 3 4 public final class Message implements Parcelable { Message next; ... }
链必须有个头结点,存放头结点的类就是存放整个链的类,就好像消息列表 MessageQueue
一样:
1 2 3 public final class MessageQueue { Message mMessages; }
模仿消息列表,写一个异步任务链 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class SuspendList { private var head: Item = emptyItem() fun add (item: Item ) { head.findItem(item.priority).addNext(item) } private fun Item.findItem (priority: Int ) : Item { var p: Item? = this var next: Item? = p?.next while (next != null ) { if (priority in p!!.priority until next.priority) { break } p = p.next next = p?.next } return p!! } fun observe () = GlobalScope.launch(Dispatchers.Main) { var p: Item? = head.next while (p != null ) { p.resumeAction?.invoke(p.deferred?.await()) p = p.next } } class Item { ... } } fun emptyItem () : SuspendList.Item = SuspendList.Item().apply { priority = -1 }
SuspendList
持有链的头结点,为了使 “头插入” 和 “中间插入” 复用一套代码,将头结点设置为 “空结点”。
异步任务链 上的任务按优先级升序排列(优先级数字越小优先级越高)。这保证了优先级最高的异步任务总是在链表头。
优先级阻塞:当所有异步任务都被添加到链之后,调用 observe()
观察整个异步任务链。该方法启动了一个协程,在协程中从头结点向后遍历链,并在每个异步任务的 Deferred
上阻塞。因为链表已按优先级排序,所以阻塞时也是按优先级从高到低进行的。
# 全局作用域
真实业务场景中,需要统一安排优先级的异步任务可能是跨界面的。这就要求异步任务链能全局访问,单例是一个最直接的选择,但它限制了整个 App 中异步任务链的数量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class SuspendList private constructor () { companion object { var map = ArrayMap<String, SuspendList>() fun of (key: String ) : SuspendList = map[key] ?: let { val p = SuspendList() map[key] = p p } } ... }
然后就可以像这样使用异步任务链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 SuspendList.of("dialog" ).apply { add(Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 }) add(Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 }) }.observe() suspend fun fetchUser () : String { delay(4000 ) return "User Taylor" } suspend fun fetchActivity () : String { delay(5000 ) return "Activity Bonus" } private fun onActivityResume (activity: Any ?) { Log.v("test" , "activity(${activity.toString()} ) resume" ) } private fun onUserResume (user: Any ?) { Log.v("test" , "user(${user.toString()} ) resume" ) }
上述代码构建了一个名为 dialog 的异步任务链,向其中添加了两个异步任务,并按优先级观察它们的结果。
其中 Item()
是一个顶层方法,用于构建单个异步任务:
1 fun Item (init : SuspendList .Item .() -> Unit ) : SuspendList.Item = SuspendList.Item().apply(init )
这是构建对象 DSL 的标准写法,详细讲解可以参见这里 。
运用 DSL 的思路还可以进一步将构建代码简化成这样:
1 2 3 4 5 6 7 8 9 10 11 12 SuspendList.of("dialog" ) { Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 } Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 } }.observe()
不过需要对原先的 of()
和 Item()
做一些调整:
1 2 3 4 5 6 7 8 9 10 11 fun of (key: String , init : SuspendList .() -> Unit ) : SuspendList = (map[key] ?: let { val p = SuspendList() map[key] = p p }).apply(init ) fun SuspendList.Item (init : SuspendList .Item .() -> Unit ) : SuspendList.Item = SuspendList.Item().apply(init ).also { add(it) }
# 异步超时
若某个高优先级的异步任务迟迟不能结束,其它任务只能都被阻塞?
得加个超时参数:
1 2 3 4 5 6 7 class Item { var suspendAction: (suspend () -> Any?)? = null var timeout: Long = -1 ... }
为单个异步任务添加超时时长参数,还得重构一下异步任务的构建函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 fun SuspendList.Item (init : SuspendList .Item .() -> Unit ) : SuspendList.Item = SuspendList.Item().apply { init () GlobalScope.launch { deferred = async { if (timeout > 0 ) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } else { suspendAction?.invoke() } } } }.also { add(it) }
原本在 suspendAction
赋值时就立马启动协程,现在将其延后,等所有参数都设置完毕后才启动。这样可以避免 “先为 suspendAction 赋值,再为 timeout 赋值”case 下超时无效的 bug。
使用 withTimeoutOrNull()
实现超时机制,当超时发生时,业务会从 resumeAction
中获得 null
。
# 异步任务生命周期管理
构建异步任务链时使用了 GlobalScope.launch()
启动协程,其创建的协程不符合 structured-concurrency
。所以需要手动管理生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class SuspendList private constructor () { class Item { internal var job:Job? = null ... } fun observe () = GlobalScope.launch(Dispatchers.Main) { var p: Item? = head.next while (p != null ) { p.resumeAction?.invoke(p.deferred?.await()) p.job?.cancel() p = p.next } } } fun SuspendList.Item (init : SuspendList .Item .() -> Unit ) : SuspendList.Item = SuspendList.Item().apply { init () job = GlobalScope.launch { deferred = async { if (timeout > 0 ) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } else { suspendAction?.invoke() } } } }.also { add(it) }
# Talk is cheap, show me the code
本篇的完整源码可以点击这里