collect通知flow执行
- public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
- collect(object : FlowCollector<T> {
- override suspend fun emit(value: T) = action(value)
- })
flow是冷流,只有调用collect{}方法时才能触发flow代码块的执行。还有一点要注意,collect{}方法是个suspend声明的方法,需要在协程作用域的范围能调用。
除此之外,collect{}方法的参数是一个被crossinline修饰的函数类型,旨在加强内联,禁止在该函数类型中直接使用return关键字(return@标签除外)。
- fun main() {
- GlobalScope.launch {
- flow {
- emit("haha")
- }.collect {
- }
- }
- }
launchIn()指定协程作用域通知flow执行
- public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
- collect() // tail-call
- }
这个方法允许我们直接传入一个协程作用域的参数,不需要直接在外部开启一个协程执行。本质上就是使用我们传入的协程作用域手动开启一个协程代码块调用collect{}通知协程执行。
这里看官方的源码有个tail-call的注释,也就是尾调用的意思,猜测这里可能官方会在这里进行了优化,减少了栈中方法调用的层级,降低栈溢出的风险。
- fun main() {
- flow {
- emit("haha")
- }.launchIn(GlobalScope)
- }
catch{}捕捉异常
- public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
- flow {
- val exception = catchImpl(this)
- if (exception != null) action(exception)
- }
这个就是用来捕捉异常的,不过注意,只能捕捉catch()之前的异常,下面来个图阐述下:

即,只能捕捉第一个红框中的异常,而不能捕捉第二个红框中的异常。
merge()合流
- public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
最终的实现类如下:

请注意,这个合流的每个流可以理解为是并行执行的,而不是后一个流等待前一个流中的flow代码块中的逻辑执行完毕再执行,这样做的目的可以提供合流的每个流的执行效果。
测试代码如下:
- fun main() {
- GlobalScope.launch {
- merge(flow {
- delay(1000)
- emit(4)
- }, flow {
- println("flow2")
- delay(2000)
- emit(20)
- }).collect {
- println("collect value: $it")
- }
- }
- }
输出日志如下:

map{}变换发送的数据类型
- public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
- return@transform emit(transform(value))
- }
这个api没什么可将的,很多的地方比如集合、livedata中都有它的影子,它的作用就是将当前数据类型变换成另一种数据类型(可以相同)。
- fun main() {
- GlobalScope.launch {
- flow {
- emit(5)
- }.map {
- "ha".repeat(it)
- }.collect {
- println("collect value: $it")
- }
- }
- }
总结
本篇文章介绍了flow常见的api,接下来还会有一些列文章用来介绍flow的其他api,更多关于Android开发flow常见API的资料请关注w3xue其它相关文章!