Kotlin协程:MutableSharedFlow的实现原理

一.MutableSharedFlow接口的实现

1.MutableSharedFlow方法

在Koltin协程:异步热数据流的设计与使用中,提到了可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:

@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    // 参数检查
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    
    // 相加计算缓存容量
    val bufferCapacity0 = replay + extraBufferCapacity
    // 如果缓存容量小于0,则设置缓存容量为Int类型的最大值
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
    // 创建一个SharedFlowImpl类型的对象并返回
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

在MutableSharedFlow方法中,首先将参数replay与参数extraBufferCapacity相加,计算缓存容量,这与上一篇里提到的“参数replay与参数extraBufferCapacity共同决定缓存最大容量”的设计思想一致。接着判断缓存容量是否溢出,即小于0,如果小于零则将缓存容量设置为Int类型的最大值。最后根据参数,创建并返回一个SharedFlowImpl类型的对象。

二.SharedFlowImpl类

SharedFlowImpl类是MutableSharedFlow接口的核心实现,它的继承关系如下图所示: Kotlin协程:MutableSharedFlow的实现原理

  • AbstractSharedFlow类:提供了对订阅者进行管理的方法。
  • CancellableFlow接口:用于标记SharedFlowImpl类型的Flow对象是可取消的。
  • MutableSharedFlow接口:表示SharedFlowImpl类型的Flow对象是一个热流。
  • FusibleFlow接口:表示SharedFlowImpl类型的Flow对象是可融合的。

1.发射数据的管理

在SharedFlowImpl类中,维护了一个缓存数组,用于保存emit方法发射数据,数据缓存数组分成了buffered values和queued emitters两部分,它的结构如下所示: Kotlin协程:MutableSharedFlow的实现原理

  • buffered values:表示当前缓存数据的大小,最大容量为SharedFlowImp类构造方法中bufferCapacity。buffered values由extraBuffer和replayCache两部分构成:
    • replayCache的最大容量由MutableSharedFlow方法中参数replay决定。
    • extraBuffer的最大容量由MutableSharedFlow方法中参数extraBufferCapacity决定。
  • queued emitters:通常情况下,当调用emit方法发射数据时,如果缓存数组的buffered values未达到最大容量,则发射的数据将保存到缓存中,并立即返回emit方法。如果缓存数组的buffered values已达到最大容量,则调用emit方法的协程会被立即挂起,并且它的续体和数据会被封装成一个Emitter类型的对象,保存到缓存数组的queued emitters中。
  • 数据缓存的移动:假设上图中,当buffered values中位置为0的数据被所有的订阅者都处理后,buffered values会前移动一位。这时,queued emitters中位置为7的Emitter类型的对象就会被“拆箱”,将其中保存的数据存放到位置7,同时恢复其中保存的emit方法所在续体的执行。之后,位置7将作为buffered values的一部分。

为了实现上述模型的运行,在SharedFlowImpl类中使用了很多的全局变量,代码如下:

private class SharedFlowImpl<T>(
    private val replay: Int, // replayCache的最大容量
    private val bufferCapacity: Int, // buffered values的最大容量
    private val onBufferOverflow: BufferOverflow // 溢出策略
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    // 缓存数组,用于保存emit方法发射的数据,在需要时进行初始化
    private var buffer: Array<Any?>? = null
    // 新的订阅者从replayCache中获取数据的起始位置
    private var replayIndex = 0L
    // 当前所有的订阅者从缓存数组中获取的数据中,对应位置最小的索引
    // 如果没有订阅者,则minCollectorIndex的值等于replayIndex
    private var minCollectorIndex = 0L
    // 缓存数组中buffered values缓存数据的数量
    private var bufferSize = 0
    // 缓存数组中queued emitters缓存数据的数量
    private var queueSize = 0

    // 当前缓存数组的起始位置
    private val head: Long get() = minOf(minCollectorIndex, replayIndex)
    // 当前缓存数组中replayCache缓存数据的数量
    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
    // 当前缓存数组中已经缓存的数据的数量
    private val totalSize: Int get() = bufferSize + queueSize
    // 当前缓存数组中buffered values的最末尾位置索引的后一位
    private val bufferEndIndex: Long get() = head + bufferSize
    // 当前数组中queued emitters的最末尾位置索引的后一位
    private val queueEndIndex: Long get() = head + bufferSize + queueSize

    ...
}

上面代码中的全局变量对应到数组中的位置如下图所示: Kotlin协程:MutableSharedFlow的实现原理

2.订阅者的管理

在SharedFlowImpl中,AbstractSharedFlow类与AbstractSharedFlowSlot类实现了对订阅者的管理,这两个类都是抽象类。在AbstractSharedFlow类中维护了一个订阅者数组,数组中每一个元素都是一个AbstractSharedFlowSlot类型的对象。

1)AbstractSharedFlowSlot类与SharedFlowSlot类

在AbstractSharedFlowSlot类中,定义了allocateLocked方法与freeLocked方法,用于实现订阅者数组中AbstractSharedFlowSlot类型对象的复用,代码如下:

internal abstract class AbstractSharedFlowSlot<F> {
    // 用于新订阅者申请使用当前AbstractSharedFlowSlot类型的对象
    // 返回true代表申请成功,返回false代表申请失败
    abstract fun allocateLocked(flow: F): Boolean
    // 用于订阅者释放当前使用的AbstractSharedFlowSlot类型得到对象,
    // 并以数组的形式返回待恢复的续体
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}

在SharedFlowImpl中,当有新的订阅者出现时,会为它在订阅者数组中分配一个类型为SharedFlowSlot的对象。SharedFlowSlot类继承自AbstractSharedFlowSlot类,代码如下:

private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    // 表示将要在处理的数据在数组中的索引
    // 如果为-1,表示当前可用
    @JvmField
    var index = -1L

    // 用来保存等待新数据发送的订阅者的续体
    @JvmField
    var cont: Continuation<Unit>? = null

    // 重写
    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        // 如果已经被其他订阅者使用,则返回false
        if (index >= 0) return false
        // 走到这里说明没有被其他订阅者使用,分配成功
        // 获取当前的新订阅者应该从缓存数组获取数据的初始位置
        index = flow.updateNewCollectorIndexLocked()
        return true
    }

    // 重写
    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
        assert { index >= 0 }
        val oldIndex = index
        // 清除索引,表示可用
        index = -1L
        // 清除续体
        cont = null
        return flow.updateCollectorIndexLocked(oldIndex)
    }
}

SharedFlowSlot类在AbstractSharedFlowSlot类的基础上,加入了全局变量index和cont。

当index大于等于0时,表示订阅者应该从缓存数组index对应的位置中获取数据,而当index小于0时,则表示当前SharedFlowSlot类型的对象没有被任何订阅者使用。

当订阅者处理完应该处理的所有数据时,订阅者所在的协程会被挂起,它的续体就会被保存在全局变量cont中。

2)AbstractSharedFlow类

AbstractSharedFlow类内部维护了一个订阅者数组,并管理订阅者数组中AbstractSharedFlowSlot类型的对象。

AbstractSharedFlow类定义了两个抽象方法:createSlot方法与createSlotArray方法,createSlot方法用于创建一个类型为AbstractSharedFlowSlot的对象,createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组,代码如下:

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    @Suppress("UNCHECKED_CAST")
    // 存放订阅者的数组,在必要时进行初始化
    protected var slots: Array<S?>? = null
        private set
    // 用于记录订阅者的数量
    protected var nCollectors = 0
        private set
    // 用于保存在订阅者数组中查找空位时,下一次要查找的位置
    private var nextIndex = 0
    // 订阅者数量的状态流,当订阅者数量发生变化时,会进行回调
    private var _subscriptionCount: MutableStateFlow<Int>? = null    
    val subscriptionCount: StateFlow<Int>
        // 加锁
        get() = synchronized(this) {
            // 对_subscriptionCount初始化,nCollectors为初始值
            _subscriptionCount ?: MutableStateFlow(nCollectors).also {
                _subscriptionCount = it
            }
        }

    ...
    // 创建一个类型为AbstractSharedFlowSlot的对象
    protected abstract fun createSlot(): S
    // createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组
    protected abstract fun createSlotArray(size: Int): Array<S?>
    
    ...    
}

AbstractSharedFlow类继承自SynchronizedObject类,SynchronizedObject类实际是Any类的别名,代码如下:

@InternalCoroutinesApi
public actual typealias SynchronizedObject = Any

a)allocateSlot方法

allocateSlot方法用于为新的订阅者在订阅者数组中分配一个可使用的AbstractSharedFlowSlot类型对象,代码如下:

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    ...
     
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        // 在锁外创建一个状态流
        var subscriptionCount: MutableStateFlow<Int>? = null
        // 加锁
        val slot = synchronized(this) {
            // 对订阅者数组进行判断处理
            val slots = when (val curSlots = slots) {
                // 为空则初始化,初始化大小为2,并保存到全局变量
                null -> createSlotArray(2).also { slots = it }
                // 如果容量已满,则进行扩容,扩容前大小为扩容后大小的2倍,并保存到全局变量
                else -> if (nCollectors >= curSlots.size) {
                    curSlots.copyOf(2 * curSlots.size).also { slots = it }
                } else {
                    curSlots
                }
            }
            
            // 获取全局变量
            var index = nextIndex
            var slot: S
            
            // 遍历
            while (true) {
                // 获取index位置的AbstractSharedFlowSlot类型的对象,
                // 如果为空,则调用createSlot方法创建一个,并保存到订阅者数组中
                slot = slots[index] ?: createSlot().also { slots[index] = it }
                // 自增
                index++
                // 如果遍历到数组的最后一个元素,则从头开始
                if (index >= slots.size) index = 0
                // 尝试对AbstractSharedFlowSlot类型的对象分配订阅者,
                // 分配成功则跳出循环
                if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break
            }
            // 走到这里说明已经分配成功
            
            // 将下一次要遍历的位置保存到全局变量
            nextIndex = index
            // 订阅者数量自增
            nCollectors++
            // 获取全局变量
            subscriptionCount = _subscriptionCount
            // 返回分配的AbstractSharedFlowSlot类型的对象
            slot
        }
        // 订阅者状态流增加1,此方法会触发回调通知
        subscriptionCount?.increment(1)
        // 返回
        return slot
    }
    
    ...
}

b)freeSlot方法

freeSlot方法用于释放已分配给订阅者的AbstractSharedFlowSlot类型的对象,代码如下:

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    ...

    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        // 在锁外创建一个状态流
        var subscriptionCount: MutableStateFlow<Int>? = null
        // 加锁
        val resumes = synchronized(this) {
            // 订阅者数量自减
            nCollectors--
            // 获取全局变量
            subscriptionCount = _subscriptionCount
            // 如果订阅者为0,说明订阅者数组里没有订阅者,则下一次从头开始
            if (nCollectors == 0) nextIndex = 0
            // 释放已分配的AbstractSharedFlowSlot类型对象
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        
        // 对释放后返回的续体进行遍历,恢复续体
        for (cont in resumes) cont?.resume(Unit)
        // 订阅者状态流减1,此方法会触发回调通知
        subscriptionCount?.increment(-1)
    }

    // 用于遍历订阅者数组
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        // 如果没有订阅者,则直接返回
        if (nCollectors == 0) return
        // 遍历订阅者数组
        slots?.forEach { slot ->
            if (slot != null) block(slot)
        }
    }
    
    ...
}

3.数据的接收

当调用SharedFlow类型对象的collect方法,会触发订阅过程,接收emit方法发送的数据,这部分在 SharedFlowImpl中实现,代码如下:

@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
    // 为当前的订阅者分配一个SharedFlowSlot类型的对象
    val slot = allocateSlot()
    try {
        // 如果collector类型为SubscribedFlowCollector,
        // 说明订阅者监听了订阅过程的启动,则先回调
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        // 获取订阅者所在的协程
        val collectorJob = currentCoroutineContext()[Job]
        // 死循环
        while (true) {
            var newValue: Any?
            // 死循环
            while (true) {
                // 从缓存数组中获取数据
                newValue = tryTakeValue(slot)
                // 如果获取数据成功,则跳出循环
                if (newValue !== NO_VALUE) break
                // 走到这里,说明获取数据失败,
                // 挂起订阅者所在协程,等待新数据的到来
                awaitValue(slot)
            }
            // 走到这里,说明已经获取到了数据
            // 判断订阅者所在协程是否是存活的,如果不是则抛出异常
            collectorJob?.ensureActive()
            // 进行类型转换,并向下游发射数据
            collector.emit(newValue as T)
        }
    } finally {
        // 释放已分配的SharedFlowSlot类型的对象
        freeSlot(slot)
    }
}


@SharedImmutable
@JvmField
internal val NO_VALUE = Symbol("NO_VALUE")

1)数据的获取

在collect方法中,通过tryTakeValue方法获取数据,代码如下:

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    // 加锁
    val value = synchronized(this) {
        // 从slot中获取index
        // index表示当前应该从缓存数组的index位置中获取数据
        val index = tryPeekLocked(slot)
        // 如果index小于0,说明没有数据
        if (index < 0) {
            // 返回空数据标识
            NO_VALUE
        } else { // 如果有数据
            // 获取当前的slot的index
            val oldIndex = slot.index
            // 从缓存数组的index处获取数据
            val newValue = getPeekedValueLockedAt(index)
            // 计算下一次获取数据的位置,并保存到slot中
            slot.index = index + 1
            // 更新缓存数组的位置,并获取缓存数组与订阅者数组中可恢复的续体
            resumes = updateCollectorIndexLocked(oldIndex)
            // 返回获取的数据
            newValue
        }
    }
    // 遍历,恢复续体
    for (resume in resumes) resume?.resume(Unit)
    // 返回获取的数据
    return value
}

@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)

a)数据获取策略

在tryTakeValue方法,获取数据之前,首先会调用tryPeekLocked方法,判断数据所在的位置是否符合要求,代码如下:

private fun tryPeekLocked(slot: SharedFlowSlot): Long {
    // 从slot中获取index
    val index = slot.index
    // 如果是在buffered values中获取,则直接返回
    if (index < bufferEndIndex) return index
    
    // 走到这里说明是要在queued emitters中获取,
    // 如果buffered values的最大容量大于0,则返回-1
    // 在buffered values可以存在的情况下,禁止发射者和订阅者接触
    if (bufferCapacity > 0) return -1L
    
    // 走到这里说明要在queued emitters中获取,同时buffered values的最大容量为0
    // 这种情况缓存数组只能有queued emitters,
    // 因此,只能处理queued emitters中的第一个Emitter类型的对象
    // 如果当前订阅者想要处理下一个Emitter类型的对象,则返回-1
    if (index > head) return -1L
    
    // 走到这里说明要在queued emitters中获取,同时buffered values的最大容量为0
    // 并且要获取当前的正在处理的Emmiter类型的对象
    // 如果queued emitters为空,说明当前没有Emmiter类型的对象,则返回-1
    if (queueSize == 0) return -1L
    // 满足上述要求,返回index
    return index
}

在允许bufferd values存在的情况下,只能从bufferd values获取数据。在不允许bufferd values存在的情况下,只能处理queued emitters的第一个Emitter类型的对象。

b)获取数据

如果数据所在的位置符合要求,则会调用getPeekedValueLockedAt方法获取数据,代码如下:

private fun getPeekedValueLockedAt(index: Long): Any? =
    // 从缓存数组中index位置获取数据
    when (val item = buffer!!.getBufferAt(index)) {
        // 如果是Emitter类型的,则进行拆箱,获取数据
        is Emitter -> item.value
        // 直接返回
        else -> item
    }

Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit方法所在的协程后,对emit方法发射的数据及挂起后的续体进行封装,代码如下:

private class Emitter(
    @JvmField val flow: SharedFlowImpl<*>,
    @JvmField var index: Long, // 当前对象在缓存数组中的位置
    @JvmField val value: Any?,// emit方法发射的数据
    @JvmField val cont: Continuation<Unit> // 挂起的续体
) : DisposableHandle {
    override fun dispose() = flow.cancelEmitter(this)
}

2)订阅者协程的挂起

在collect方法中,当订阅者无数据可获取时,则会调用awaitValue方法,挂起订阅者所在的协程,代码如下:

private suspend fun awaitValue(slot: SharedFlowSlot): Unit = 
  // 直接挂起订阅者所在的协程
  suspendCancellableCoroutine { cont ->
    // 加锁
    synchronized(this) lock@{
        // 再次检查当前的index是否满足要求
        val index = tryPeekLocked(slot)
        // 如果确实不满足要求
        if (index < 0) {
            // 保存续体到slot中
            slot.cont = cont
        } else { // 如果再次检查发现index这时满足要求
            // 则恢复挂起,并返回
            cont.resume(Unit)
            return@lock
        }
        // 保存续体到slot中
        slot.cont = cont
    }
}

4.数据的发射

当需要发射数据时,会调用SharedFlowImpl类的emit方法,代码如下:

override suspend fun emit(value: T) {
    // 首先尝试调用不需要挂起的tryEmit方法,如果发射成功,则返回
    if (tryEmit(value)) return
    
    // 走到这里说明需要挂起,则调用emitSuspend方法
    emitSuspend(value)
}

1)以不挂起的方式发射数据

SharedFlowImpl类中实现了MutableSharedFlow接口中tryEmit方法,用于以不挂起的方式发射数据,代码如下:

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    // 加锁
    val emitted = synchronized(this) {
        // 尝试发射数据,如果发射成功
        if (tryEmitLocked(value)) {
            // 收集已经挂起的订阅者的续体
            resumes = findSlotsToResumeLocked(resumes)
            // 返回true
            true
        } else { // 发射失败
            // 返回false
            false
        }
    }
    // 唤起挂起的订阅者
    for (cont in resumes) cont?.resume(Unit)
    // 返回结果
    return emitted
}

tryEmit方法中通过tryEmitLocked方法尝试对数据进行发射,如果发射成功,会调用findSlotsToResumeLocked方法收集已经挂起的订阅者的续体,并唤醒订阅者去接收消费数据。

tryEmitLocked方法代码如下:

@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
    // 如果当前没有订阅者,则调用tryEmitNoCollectorsLocked处理,并返回
    // 该方法永远返回true
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
    // 如果当前有订阅者,同时buffered values已达到最大容量
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        // 根据溢出策略进行判断
        when (onBufferOverflow) {
            // 如果是挂起,则返回fasle
            BufferOverflow.SUSPEND -> return false
            // 如果是丢掉最新的数据,则返回true
            BufferOverflow.DROP_LATEST -> return true
            // 如果是丢掉最旧的数据,则暂不作处理
            BufferOverflow.DROP_OLDEST -> {}
        }
    }
    
    // 走到这里,有两种情况:
    // 情况1:buffered values还可以继续添加数据
    // 情况2:buffered values已达到最大容量,同时溢出策略为DROP_OLDEST
    
    // 将数据加入到缓存数组中
    // 这里因为tryEmit方法不会挂起emit方法所在的协程,
    // 所以value没有被封装成Emitter类型的对象
    enqueueLocked(value)
    // buffered values的数据数量加1
    bufferSize++
    // 如果buffered values的数据数量超过最大容量的限制,
    // 说明此时为情况2,则调用dropOldestLocked方法,丢弃最旧的数据
    if (bufferSize > bufferCapacity) dropOldestLocked()
    // 如果replayCache中数据的数量超过了最大容量
    if (replaySize > replay) {
        // 更新replayIndex的值,replayIndex向前移动一位
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    // 返回true
    return true
}

buffered values已达到最大容量,同时溢出策略为DROP_OLDEST情况下数据发射图解Kotlin协程:MutableSharedFlow的实现原理     在tryEmitLocked方法中,如果当前没有订阅者时,会调用tryEmitNoCollectorsLocked方法,代码如下:

// 如果当前没有订阅者,会调用tryEmitNoCollectorsLocked方法
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    assert { nCollectors == 0 }
    // 如果不允许有replayCache,则不处理,直接返回true
    if (replay == 0) return true
    // 走到这里说明可以有replayCache
    // 加入到缓存数组中
    enqueueLocked(value)
    // buffered values的数据数量加1
    bufferSize++
    // 如果buffered values的数据数量超过了replayCache的最大容量
    // 则丢弃最旧的数据
    // 因为新订阅者只会从replayCache中取数据,
    // 如果没有订阅者,buffered values的数据数量超过replayCache的最大容量没有意义
    if (bufferSize > replay) dropOldestLocked()
    // 重新计算minCollectorIndex
    minCollectorIndex = head + bufferSize
    // 返回true
    return true
}

在tryEmitNoCollectorsLocked方法中,如果发现当前buffered values的数据数量超过了replayCache的最大容量,则会丢弃最旧的数据,保持buffered values中数据的数量最大为replay。因为当有新的订阅者出现时,首先会从replayCache中获取数据,因此在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存超过replayCache最大容量的数据只会占用更多内存,是没有意义的。

通过对tryEmitLocked方法与tryEmitNoCollectorsLocked方法的分析,可以知道数据的发射最终都调用了enqueueLocked方法,代码如下:

private fun enqueueLocked(item: Any?) {
    // 获取当前缓存数组中缓存的数量
    val curSize = totalSize
    // 判断
    val buffer = when (val curBuffer = buffer) {
        // 缓存数组为空,则进行初始化,初始化容量为2
        null -> growBuffer(null, 0, 2)
        // 如果超过了当前缓存数组的最大容量,则进行扩容,新的缓存数组的容量为之前的2倍
        // growBuffer方法会把原来缓存数组的数据填充到新的缓存数组中
        else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
    }
    // 将要发射的数据填充的缓存数组的head + curSize位置
    buffer.setBufferAt(head + curSize, item)
}

enqueueLocked方法内部将要发射的数据填充的缓存数组的顺序位置,最终完成了数据发射的过程。通过分析可以知道,数据发射的实质就是将数据添加到缓存数组中。

2)以挂起的方式发射数据

SharedFlowImpl类中实现了MutableSharedFlow接口中emit方法。在上面分析的emit方法中,首先会尝试通过tryEmit方法发射数据,如果发射失败,说明发射过程需要挂起,这时会调用emitSuspend方法,代码如下:

private suspend fun emitSuspend(value: T) = 
  // 直接挂起emit方法所在的协程,获取续体
  suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    // 加锁
    val emitter = synchronized(this) lock@{
        // 这里再次尝试以tryEmit的方式发射数据
        if (tryEmitLocked(value)) {
            // 如果发射成功,则恢复续体的执行
            cont.resume(Unit)
            // 收集已经挂起的订阅者的续体
            resumes = findSlotsToResumeLocked(resumes)
            // 返回
            return@lock null
        }
        // 将续体、待发射的数据等封装成Emitter类型的对象
        Emitter(this, head + totalSize, value, cont).also {
            // 加入到缓存数组中
            enqueueLocked(it)
            // queued emitters的数据的数量加1
            queueSize++
            // 如果buffered values的最大容量为0,即不存在
            // 则收集已经挂起的订阅者的续体,保存到局部变量resumes中
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    // emitter对象监听emit方法所在协程的取消
    // 发生取消时会调用emitter对象的dispose方法
    emitter?.let { cont.disposeOnCancellation(it) }
    // 遍历,唤起挂起的订阅者
    for (cont in resumes) cont?.resume(Unit)
}

3)唤醒挂起的订阅者

无论是在tryEmit方法,还是在emit方法,当发射数据成功后,都会调用findSlotsToResumeLocked方法,获取已经挂起的订阅者的续体,然后恢复订阅者所在协程的执行,代码如下:

private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
    // 引用参数中的续体数组
    var resumes: Array<Continuation<Unit>?> = resumesIn
    // 用于记录需要恢复的续体的数量
    var resumeCount = resumesIn.size
    // 遍历订阅者数组
    forEachSlotLocked loop@{ slot ->
        // 获取续体,如果续体为空,说明对应订阅者的协程没有挂起,本次循环返回
        val cont = slot.cont ?: return@loop
        // 判断slot中index是否符合要求
        // 如果不符合要求,则本次循环返回
        if (tryPeekLocked(slot) < 0) return@loop
        // 如果需要恢复的续体的数量超过续体数组的容量,则进行扩容
        // 新的续体数组的容量是之前续体数组容量的2倍
        if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
        // 保存续体到续体数组中
        resumes[resumeCount++] = cont
        // 清空slot中保存的续体
        slot.cont = null
    }
    // 返回收集完的续体数组
    return resumes
}

5.新订阅者获取缓存数据

SharedFlowImpl类实现了SharedFlow接口,重写了其中的常量replayCache,当有新订阅者出现时,如果replayCache存在,并且有缓存数据,则优先从replayCache中获取,代码如下:

override val replayCache: List<T>
    // 只能获取,不能设置,加锁
    get() = synchronized(this) {
        // 获取当前replayCache中缓存数据的数量
        val replaySize = this.replaySize
        // 如果数量为0,则返回一个空列表
        if (replaySize == 0) return emptyList()
        // 若数量不为0,则根据容量创建一个列表
        val result = ArrayList<T>(replaySize)
        // 获取缓存数组
        val buffer = buffer!!
        // 遍历replayCache,将数据进行类型转换,并添加到列表中
        @Suppress("UNCHECKED_CAST")
        for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
        // 返回列表
        result
    }

6.热流的融合

SharedFlowImpl类实现了FusibleFlow接口,重写了其中的fuse方法,代码如下:

// 调用了fuseSharedFlow方法实现
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
    fuseSharedFlow(context, capacity, onBufferOverflow)

...

internal fun <T>  SharedFlow<T>.fuseSharedFlow(
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
): Flow<T> {
    // 如果容量为0或默认值,同时溢出策略为SUSPEND
    if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
        // 返回自身
        return this
    }
    // 其他情况,将当前的SharedFlow对象包装成ChannelFlowOperatorImpl类型的对象
    return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
}

在Kotlin协程:Flow的融合、Channel容量、溢出策略中提到过,当对类型为SharedFlowImpl的对象使用某些操作符时,会触发fuse方法的执行。fuse方法默认的容量为OPTIONAL_CHANNEL,默认的溢出策略为SUSPEND,返回自身,因此融合后还是SharedFlowImpl类型的对象。

如果容量为RENDEZVOUS,同时溢出策略为SUSPEND时,也会返回自身。RENDEZVOUS表示容量为0,无论SharedFlowImpl类型的对象的buffered values最大容量是否为0,在外面再套一层RENDEZVOUS是没有意义的。

其他情况下,SharedFlowImpl类型的对象会被封装成一个类型为ChannelFlowOperatorImpl的对象,根据Kotlin协程:flowOn与线程切换讲过的,之后向下游发射的数据会通过Channel来发送。

7.只读热流

调用MutableSharedFlow方法,可以得到一个类型为MutableSharedFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅接收,也可以调用它的emit方法来发射数据。但大多数的时候,我们需要统一数据的发射过程,因此需要对外暴露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接暴露MutableSharedFlow类型的对象。

根据上面代码的介绍,订阅的过程实际上是从缓存数组中读取数据的,而发射的过程实际上是向缓存数据中写数据,因此如果一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。

事实上,根据在Koltin协程:异步热数据流的设计与使用中对接口的分析可以发现,MutableSharedFlow接口继承了FlowCollector接口和SharedFlow接口,emit方法定义在FlowCollector中。SharedFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableSharedFlow接口指向的对象转换为SharedFlow接口指向的对象就可以将读写流转换为只读流。

在代码中,对MutableSharedFlow类型的对象调用asSharedFlow方法恰好可以实现将读写流转换为只读流,代码如下:

// 该方法调用了ReadonlySharedFlow方法,返回一个类型为SharedFlow的对象
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
    // 传入当前的MutableSharedFlow类型的对象
    ReadonlySharedFlow(this)

// 实现了FusibleFlow接口,
// 实现了SharedFlow接口,并且使用上一步传入的MutableSharedFlow类型的对象作为代理
private class ReadonlySharedFlow<T>(
    flow: SharedFlow<T>
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    // 用于流融合,也是通过fuseSharedFlow方法实现
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)
}
免责声明:
1.本站所有内容由本站原创、网络转载、消息撰写、网友投稿等几部分组成。
2.本站原创文字内容若未经特别声明,则遵循协议CC3.0共享协议,转载请务必注明原文链接。
3.本站部分来源于网络转载的文章信息是出于传递更多信息之目的,不意味着赞同其观点。
4.本站所有源码与软件均为原作者提供,仅供学习和研究使用。
5.如您对本网站的相关版权有任何异议,或者认为侵犯了您的合法权益,请及时通知我们处理。
火焰兔 » Kotlin协程:MutableSharedFlow的实现原理