Kotlin协程

文章说明

本文内容基于Kotlin官方文档总结翻译而成,旨在为自己学习Kotlin使用。文中的代码示例、技术概念和最佳实践均参考自Kotlin官方资源和相关技术文档,如需获取最新或更详细的信息,建议查阅官方文档。


1. 协程基础

协程概念

协程是一个可挂起计算的实例。从概念上讲,协程类似于线程,都是执行与其他代码并发运行的代码块。但是,协程并不绑定到特定线程。协程可以在一个线程中挂起执行,然后在另一个线程中恢复。

可以将协程视为轻量级线程,但它们与真正的线程有很多重要区别,使得在实际使用中有明显不同。

下面是一个简单的协程示例:

import kotlinx.coroutines.*

fun main() = runBlocking {
// 启动一个新协程
launch {
delay(1000L) // 非阻塞延迟1秒
println("World!")
}
println("Hello")
}

运行结果:

Hello
World!

这段代码的工作原理:

  • launch 是一个协程构建器,它启动一个新协程与其他代码并发运行。因此,”Hello”会先被打印。
  • delay 是一个特殊的挂起函数,它会在指定时间内挂起协程。挂起协程并不会阻塞底层线程,而是允许其他协程运行和使用底层线程。
  • runBlocking 也是一个协程构建器,它桥接了常规fun main()的非协程世界和协程代码内部的runBlocking { ... }花括号。

挂起函数

通过将launch块中的代码提取到单独的函数中,可以创建一个挂起函数。挂起函数是通过suspend修饰符标记的,可以在协程内部使用,就像常规函数一样,但它们的额外特性是可以使用其他挂起函数(如本例中的delay)来挂起协程的执行。

import kotlinx.coroutines.*

fun main() = runBlocking {
launch {
doWorld()
}
println("Hello")
}

// 第一个挂起函数
suspend fun doWorld() {
delay(1000L)
println("World!")
}

结构化并发

协程遵循结构化并发原则,这意味着新协程只能在特定的CoroutineScope中启动,该范围界定了协程的生命周期。在上面的例子中,runBlocking建立了相应的作用域,这就是为什么前面的例子会等待”World!”在一秒延迟后打印,然后才退出。

在实际应用中,会启动大量协程。结构化并发确保协程不会丢失或泄漏。外部作用域只有在所有子协程完成后才能完成。结构化并发也确保代码中的任何错误都能被正确报告,永远不会丢失。

协程作用域构建器

除了由不同构建器提供的协程作用域外,还可以使用coroutineScope构建器声明自己的作用域。它创建一个协程作用域,并且直到所有启动的子协程完成后才会完成。

runBlockingcoroutineScope构建器看起来相似,因为它们都会等待它们的主体和所有子协程完成。主要区别在于runBlocking方法会阻塞当前线程等待,而coroutineScope只是挂起,释放底层线程以供其他用途。由于这种差异,runBlocking是一个常规函数,而coroutineScope是一个挂起函数。

import kotlinx.coroutines.*

fun main() = runBlocking {
doWorld()
}

suspend fun doWorld() = coroutineScope {
launch {
delay(1000L)
println("World!")
}
println("Hello")
}

并发执行

coroutineScope构建器可以在任何挂起函数中使用,以执行多个并发操作。以下代码在doWorld挂起函数内启动两个并发协程:

import kotlinx.coroutines.*

fun main() = runBlocking {
doWorld()
println("Done")
}

suspend fun doWorld() = coroutineScope {
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}

launch { ... }块内的两段代码会并发执行,先打印”World 1”,然后打印”World 2”。doWorld中的coroutineScope只有在两者都完成后才会完成,所以doWorld返回并允许”Done”字符串打印出来:

Hello
World 1
World 2
Done

显式Job

launch协程构建器返回一个Job对象,它是已启动协程的句柄,可用于显式等待其完成。例如,可以等待子协程完成,然后再打印”Done”字符串:

import kotlinx.coroutines.*

fun main() = runBlocking {
val job = launch {
delay(1000L)
println("World!")
}
println("Hello")
job.join() // 等待子协程完成
println("Done")
}

输出结果:

Hello
World!
Done

协程的轻量性

协程比JVM线程消耗的资源少。使用线程时会耗尽JVM可用内存的代码可以使用协程表达而不会触及资源限制。例如,以下代码启动了50,000个独立的协程,每个都等待5秒,然后打印一个句点(“.”),而消耗很少的内存:

import kotlinx.coroutines.*

fun main() = runBlocking {
repeat(50_000) {
launch {
delay(5000L)
print(".")
}
}
}

如果用线程编写相同的程序(移除runBlocking,用thread替换launch,用Thread.sleep替换delay),它将消耗大量内存,甚至可能导致内存不足错误。

2. 协程上下文与调度器

协程上下文

协程总是在由CoroutineContext类型的值表示的上下文中执行。协程上下文是各种元素的集合,主要元素是协程的Job和其调度器。

调度器和线程

协程上下文包括一个协程调度器(见CoroutineDispatcher),它确定相应协程使用哪个线程或哪些线程执行。协程调度器可以将协程执行限制在特定线程中,将其分派到线程池,或者让它不受限制地运行。

所有协程构建器(如launchasync)都接受一个可选的CoroutineContext参数,可用于明确指定新协程的调度器。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
launch { // 父协程的上下文,主runBlocking协程
println("main runBlocking: 运行在线程 ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限制 -- 将与主线程一起工作
println("Unconfined: 运行在线程 ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将被分派到DefaultDispatcher
println("Default: 运行在线程 ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 将获得自己的新线程
println("newSingleThreadContext: 运行在线程 ${Thread.currentThread().name}")
}
}

不同类型的调度器:

  1. 当没有参数的launch { ... }使用时,它继承启动它的CoroutineScope的上下文(因此也继承了调度器)。
  2. Dispatchers.Unconfined是一个特殊的调度器,它看起来也在主线程中运行,但实际上是一种不同的机制。
  3. 默认调度器是当在作用域中没有明确指定其他调度器时使用的。它由Dispatchers.Default表示,并使用共享的后台线程池。
  4. newSingleThreadContext为协程创建一个线程。专用线程是一种非常昂贵的资源,在实际应用中,必须在不再需要时通过close函数释放它,或者存储在顶层变量中并在整个应用程序中重用。

非受限与受限调度器

Dispatchers.Unconfined协程调度器在调用者线程中启动协程,但仅在第一个挂起点之前。挂起后,它在完全由被调用的挂起函数决定的线程中恢复协程。对于既不消耗CPU时间也不更新任何特定线程中的共享数据(如UI)的协程来说,非受限调度器是合适的。

另一方面,默认情况下,调度器是从外部CoroutineScope继承的。特别是runBlocking协程的默认调度器被限制在调用者线程,所以继承它具有将执行限制在这个线程的效果,具有可预测的FIFO调度。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
launch(Dispatchers.Unconfined) { // 不受限制 -- 将与主线程一起工作
println("Unconfined: 运行在线程 ${Thread.currentThread().name}")
delay(500)
println("Unconfined: 延迟后在线程 ${Thread.currentThread().name}")
}
launch { // 父协程的上下文,主runBlocking协程
println("main runBlocking: 运行在线程 ${Thread.currentThread().name}")
delay(1000)
println("main runBlocking: 延迟后在线程 ${Thread.currentThread().name}")
}
}

线程之间的跳转

协程可以在不同的线程间切换。以下示例展示了如何使用withContext切换上下文:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("在ctx1中开始")
withContext(ctx2) {
log("在ctx2中工作")
}
log("回到ctx1")
}
}
}
}

这个例子演示了两种协程使用的新技术:

  1. 第一种技术展示了如何使用具有指定上下文的runBlocking
  2. 第二种技术涉及调用withContext,它可能挂起当前协程并切换到新的上下文。

上下文中的Job

协程的Job是其上下文的一部分,可以使用coroutineContext[Job]表达式从中检索:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
println("当前job是 ${coroutineContext[Job]}")
}

CoroutineScope中的isActive实际上只是coroutineContext[Job]?.isActive == true的一个方便的快捷方式。

协程的子项

当一个协程在另一个协程的CoroutineScope中启动时,它会通过CoroutineScope.coroutineContext继承其上下文,新协程的Job成为父协程job的子项。当父协程被取消时,所有其子项也会递归地被取消。

然而,这种父子关系可以通过两种方式明确地覆盖:

  1. 当启动协程时明确指定不同的作用域(例如,GlobalScope.launch),它不会从父作用域继承Job
  2. 当为新协程传递一个不同的Job对象作为上下文时(如下面的例子所示),它会覆盖父作用域的Job
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
// 启动协程来处理某种传入请求
val request = launch {
// 它产生两个其他job
launch(Job()) {
println("job1: 我在自己的Job中运行,独立执行!")
delay(1000)
println("job1: 我不受request取消的影响")
}
// 另一个继承父上下文
launch {
delay(100)
println("job2: 我是request协程的子项")
delay(1000)
println("job2: 如果我的父request被取消,我不会执行这一行")
}
}
delay(500)
request.cancel() // 取消请求的处理
println("main: 谁在request取消后存活?")
delay(1000) // 延迟主线程一秒钟,看看会发生什么
}

父级责任

父协程始终等待所有子协程完成。父级不必显式跟踪它启动的所有子项,不必在最后使用Job.join等待它们:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
// 启动协程来处理某种传入请求
val request = launch {
repeat(3) { i ->
launch {
delay((i + 1) * 200L) // 可变延迟200ms、400ms、600ms
println("协程 $i 完成")
}
}
println("request: 我完成了,不显式加入我仍然活跃的子协程")
}
request.join() // 等待请求完成,包括所有其子协程
println("现在请求处理完成")
}

调试协程

协程可以挂起在一个线程上并在另一个线程上恢复。甚至使用单线程调度器,也可能很难弄清楚协程在哪里、何时以及做什么,除非有特殊工具。

使用IntelliJ IDEA调试

Kotlin插件的Coroutine Debugger简化了在IntelliJ IDEA中调试协程的过程。Debug工具窗口包含Coroutines选项卡,可以找到当前运行和挂起的协程信息。

使用协程调试器,可以:

  • 检查每个协程的状态
  • 查看运行和挂起协程的局部变量和捕获变量的值
  • 查看完整的协程创建堆栈以及协程内部的调用堆栈
  • 获取包含每个协程状态及其堆栈的完整报告

使用日志调试

另一种调试带有线程的应用程序的方法是在每条日志语句上打印线程名称。使用协程时,仅线程名称本身不会提供太多上下文,因此kotlinx.coroutines包含调试工具使其更容易。

使用-Dkotlinx.coroutines.debug JVM选项运行代码:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
val a = async {
log("我正在计算答案的一部分")
6
}
val b = async {
log("我正在计算答案的另一部分")
7
}
log("答案是 ${a.await() * b.await()}")
}

命名协程进行调试

自动分配的id对于经常记录的协程来说是好的,只需要关联来自同一协程的日志记录。然而,当协程与特定请求的处理或执行特定后台任务相关联时,最好为调试目的显式命名它。CoroutineName上下文元素与线程名称服务于相同的目的,当打开调试模式时,它包含在执行此协程的线程名称中。

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
log("开始主协程")
// 运行两个后台值计算
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("计算v1")
6
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("计算v2")
7
}
log("v1 * v2的答案是 ${v1.await() * v2.await()}")
}

组合上下文元素

有时需要为协程上下文定义多个元素。可以使用+运算符来实现。例如,可以同时显式指定调度器和显式指定名称来启动协程:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
launch(Dispatchers.Default + CoroutineName("test")) {
println("我在线程 ${Thread.currentThread().name} 中工作")
}
}

协程作用域

如果有一个带有生命周期的对象,但该对象不是协程,可以创建与对象生命周期相关联的CoroutineScope实例。例如,在Android应用程序中,可能在Android活动的上下文中启动各种协程来执行异步操作,当活动被销毁时,这些协程必须被取消以避免内存泄漏。

class Activity {
private val mainScope = MainScope() // MainScope适用于UI应用程序

fun destroy() {
mainScope.cancel()
}

fun doSomething() {
// 启动10个协程,每个工作不同的时间
repeat(10) { i ->
mainScope.launch {
delay((i + 1) * 200L) // 可变延迟200ms、400ms等
println("协程 $i 完成")
}
}
}
}

fun main() = runBlocking<Unit> {
val activity = Activity()
activity.doSomething() // 运行测试函数
println("启动了协程")
delay(500L) // 延迟半秒
println("销毁活动!")
activity.destroy() // 取消所有协程
delay(1000) // 视觉上确认它们不工作
}

线程本地数据

有时能够在协程之间传递一些线程本地数据是很方便的。对于ThreadLocalasContextElement扩展函数可以帮助解决这个问题。它创建一个额外的上下文元素,保存给定ThreadLocal的值,并在协程每次切换上下文时恢复它。

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // 声明线程局部变量

fun main() = runBlocking<Unit> {
threadLocal.set("main")
println("预主线程:${Thread.currentThread()},线程本地值:'${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
println("启动开始,当前线程:${Thread.currentThread()},线程本地值:'${threadLocal.get()}'")
yield()
println("yield之后,当前线程:${Thread.currentThread()},线程本地值:'${threadLocal.get()}'")
}
job.join()
println("后主线程:${Thread.currentThread()},线程本地值:'${threadLocal.get()}'")
}

3. 通道与流

Flow基础

挂起函数异步返回单个值,但如何返回多个异步计算的值?Kotlin Flow用于此目的。

表示多个值

在Kotlin中,可以使用Flow类型来表示异步计算的值流,就像使用Sequence类型表示同步计算的值一样:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow { // flow构建器
for (i in 1..3) {
delay(100) // 假设正在进行有用的处理
emit(i) // 发出下一个值
}
}

fun main() = runBlocking<Unit> {
// 启动一个并发协程,检查主线程是否被阻塞
launch {
for (k in 1..3) {
println("我没有被阻塞 $k")
delay(100)
}
}
// 收集流
simple().collect { value ->
println(value)
}
}

这段代码在不阻塞主线程的情况下,每100毫秒打印一个数字。这通过从单独的协程打印”我没有被阻塞”来验证,该协程每100毫秒在主线程中运行一次:

我没有被阻塞 1
1
我没有被阻塞 2
2
我没有被阻塞 3
3

注意Flow与前面示例的以下区别:

  • Flow类型的构建器函数称为flow
  • flow { ... }构建器块内的代码可以挂起。
  • simple函数不再标记为suspend修饰符。
  • 从流中使用emit函数发出值。
  • 使用collect函数从流中收集值。

Flow是冷的

Flow是冷流,类似于序列 - flow构建器内的代码直到流被收集时才运行。以下示例清楚地展示了这一点:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}

fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value ->
println(value)
}
println("Calling collect again...")
flow.collect { value ->
println(value)
}
}

输出如下:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

这是simple函数(返回流)没有标记为suspend修饰符的关键原因。simple()调用本身会快速返回,不等待任何东西。每次收集时,流都会重新启动,这就是为什么每次再次调用collect时都会看到”Flow started”。

流运算符

可以使用运算符转换流,方式类似于转换集合和序列。中间运算符应用于上游流并返回下游流。这些运算符是冷的,就像流一样。调用此类运算符不是挂起函数本身,它可以快速工作,返回新转换流的定义。

基本运算符具有熟悉的名称,如mapfilter。与序列的重要区别在于,这些运算符内部的代码块可以调用挂起函数。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
delay(1000) // 模拟长时间运行的异步工作
return "response $request"
}

fun main() = runBlocking<Unit> {
(1..3).asFlow() // 请求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}

这会产生以下三行,每行在前一行之后一秒钟出现:

response 1
response 2
response 3

transform运算符

在流转换运算符中,最通用的一个叫做transform。它可以用来模拟简单的转换如mapfilter,以及实现更复杂的转换。使用transform运算符,可以发出任意次数的任意值。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
delay(1000) // 模拟长时间运行的异步工作
return "response $request"
}

fun main() = runBlocking<Unit> {
(1..3).asFlow() // 请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}

输出:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

流的限制大小操作符

限制大小的中间操作符如take在达到相应限制时取消流的执行。协程中的取消总是通过抛出异常来执行,因此所有资源管理函数(如try { ... } finally { ... }块)在取消的情况下也能正常操作:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}

fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只取前两个
.collect { value -> println(value) }
}

输出:

1
2
Finally in numbers

流的终端操作符

流上的终端操作符是启动流收集的挂起函数。collect操作符是最基本的一个,但还有其他使流变得更容易的终端操作符:

  • 转换为各种集合,如toListtoSet
  • 获取first值并确保流发出single值的操作符
  • 使用reducefold将流缩减为一个值

例如:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // 1到5的平方
.reduce { a, b -> a + b } // 求和(终端操作符)
println(sum)
}

打印单个数字:

55

流的上下文

流的收集总是在调用协程的上下文中发生。这称为上下文保存属性,意味着无论流的实现细节如何,simple流的代码都在调用者指定的上下文中运行:

withContext(context) {
simple().collect { value ->
println(value) // 在指定的上下文中运行
}
}

默认情况下,flow { ... }构建器中的代码在由相应流的收集器提供的上下文中运行。

使用flowOn更改上下文

flowOn操作符可用于更改流发射的上下文:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假设我们在CPU密集型方式中计算它
log("发射 $i")
emit(i) // 发出下一个值
}
}.flowOn(Dispatchers.Default) // 在flow构建器中更改CPU密集型代码的上下文的正确方法

fun main() = runBlocking<Unit> {
simple().collect { value ->
log("收集 $value")
}
}

输出显示flow { ... }在后台线程中工作,而收集发生在主线程:

[DefaultDispatcher-worker-1 @coroutine#2] 发射 1
[main @coroutine#1] 收集 1
[DefaultDispatcher-worker-1 @coroutine#2] 发射 2
[main @coroutine#1] 收集 2
[DefaultDispatcher-worker-1 @coroutine#2] 发射 3
[main @coroutine#1] 收集 3

flowOn操作符改变了流的默认顺序性质。现在收集发生在一个协程中(”coroutine#1”),而发射发生在另一个协程中(”coroutine#2”),它在另一个线程中与收集协程并发运行。

4. 协程实战

缓冲流

在不同协程中运行流的不同部分对于整体收集流所需的时间可能是有帮助的,特别是在涉及长时间运行的异步操作时。例如,考虑一个简单流的发射很慢,需要100毫秒来产生一个元素;而收集器也很慢,需要300毫秒来处理一个元素。让我们看看收集这样一个带有三个数字的流需要多长时间:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假设我们异步等待100毫秒
emit(i) // 发出下一个值
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假设我们处理它300毫秒
println(value)
}
}
println("收集耗时 $time 毫秒")
}

它会产生类似这样的结果,整个收集大约需要1200毫秒(三个数字,每个400毫秒):

1
2
3
收集耗时 1220 毫秒

可以使用buffer操作符在流上运行simple流的发射代码与收集代码并发,而不是顺序运行它们:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假设我们异步等待100毫秒
emit(i) // 发出下一个值
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // 缓冲发射,不等待
.collect { value ->
delay(300) // 假设我们处理它300毫秒
println(value)
}
}
println("收集耗时 $time 毫秒")
}

它会产生相同的数字,但更快,因为我们有效地创建了一个处理流水线,只需为第一个数字等待100毫秒,然后每个数字只需花费300毫秒处理。这样运行大约需要1000毫秒:

1
2
3
收集耗时 1071 毫秒

协调多个流

有很多方法可以组合多个流。

zip操作符

就像Kotlin标准库中的Sequence.zip扩展函数一样,流有zip操作符,它结合两个流的对应值:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // 数字1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合成单个字符串
.collect { println(it) } // 收集并打印
}

这个例子打印:

1 -> one
2 -> two
3 -> three

combine操作符

当流表示一个变量或操作的最新值时,可能需要执行一个计算,该计算依赖于相应流的最新值,并在任何上游流发出值时重新计算它。相应的操作符系列称为combine

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // 每300毫秒数字1..3
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每400毫秒字符串
val startTime = System.currentTimeMillis() // 记住开始时间
nums.combine(strs) { a, b -> "$a -> $b" } // 用"combine"组合成单个字符串
.collect { value -> // 收集并打印
println("$value 在开始后 ${System.currentTimeMillis() - startTime} 毫秒")
}
}

我们得到了非常不同的输出,在来自numsstrs流的每次发出时都打印一行:

1 -> one 在开始后 452 毫秒
2 -> one 在开始后 651 毫秒
2 -> two 在开始后 854 毫秒
3 -> two 在开始后 952 毫秒
3 -> three 在开始后 1256 毫秒

扁平化流

流表示异步接收的值序列,所以很容易进入每个值触发另一个值序列请求的情况。例如,可以有以下函数,它返回两个字符串的流,间隔500毫秒:

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待500毫秒
emit("$i: Second")
}

可能需要不同的扁平化模式,因此流有一系列扁平化操作符:

flatMapConcat

流的流的连接由flatMapConcatflattenConcat操作符提供。它们是对应序列操作符的最直接类似物。它们等待内部流完成,然后开始收集下一个流,如以下示例所示:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待500毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记住开始时间
(1..3).asFlow().onEach { delay(100) } // 每100毫秒发出一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value 在开始后 ${System.currentTimeMillis() - startTime} 毫秒")
}
}

flatMapConcat的顺序特性在输出中清晰可见:

1: First 在开始后 121 毫秒
1: Second 在开始后 622 毫秒
2: First 在开始后 727 毫秒
2: Second 在开始后 1227 毫秒
3: First 在开始后 1328 毫秒
3: Second 在开始后 1829 毫秒

flatMapMerge

另一种扁平化操作是并发收集所有传入流并将它们的值合并到单个流中,以便尽快发出值。它由flatMapMergeflattenMerge操作符实现。它们都接受一个可选的concurrency参数,限制同时收集的并发流数量(默认等于DEFAULT_CONCURRENCY)。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待500毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记住开始时间
(1..3).asFlow().onEach { delay(100) } // 每100毫秒一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value 在开始后 ${System.currentTimeMillis() - startTime} 毫秒")
}
}

flatMapMerge的并发性质是显而易见的:

1: First 在开始后 136 毫秒
2: First 在开始后 231 毫秒
3: First 在开始后 333 毫秒
1: Second 在开始后 639 毫秒
2: Second 在开始后 732 毫秒
3: Second 在开始后 833 毫秒

flatMapLatest

与之前描述的collectLatest操作符类似,有一个相应的”Latest”扁平化模式,一旦发出新流,前一个流的收集就会被取消。它由flatMapLatest操作符实现。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待500毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记住开始时间
(1..3).asFlow().onEach { delay(100) } // 每100毫秒一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value 在开始后 ${System.currentTimeMillis() - startTime} 毫秒")
}
}

这个例子的输出很好地展示了flatMapLatest的工作方式:

1: First 在开始后 142 毫秒
2: First 在开始后 322 毫秒
3: First 在开始后 425 毫秒
3: Second 在开始后 931 毫秒

5. 协程高级模式

流异常处理

当发射器或操作符内的代码抛出异常时,流收集可能会以异常完成。有几种处理这些异常的方法。

收集器try和catch

收集器可以使用Kotlin的try/catch块来处理异常:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("发射 $i")
emit(i) // 发出下一个值
}
}

fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "收集了 $value" }
}
} catch (e: Throwable) {
println("捕获到 $e")
}
}

此代码成功捕获了collect终端操作符中的异常,如我们所见,在此之后不会发出更多值:

发射 1
1
发射 2
2
捕获到 java.lang.IllegalStateException: 收集了 2

异常透明性

但是发射器代码如何封装其异常处理行为?

流必须对异常透明,从try/catch块内的flow { ... }构建器中emit值违反了异常透明性。这保证了抛出异常的收集器始终可以使用try/catch捕获它,如前面的例子所示。

发射器可以使用catch操作符来保持这种异常透明性,并允许封装其异常处理。catch操作符的主体可以分析异常并根据捕获的异常以不同方式做出反应:

  • 可以使用throw重新抛出异常。
  • 可以使用catch主体中的emit将异常转换为值发出。
  • 可以忽略、记录或处理异常。

例如,让我们在捕获异常时发出文本:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> = flow {
for (i in 1..3) {
println("发射 $i")
emit(i) // 发出下一个值
}
}
.map { value ->
check(value <= 1) { "崩溃于 $value" }
"字符串 $value"
}

fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("捕获到 $e") } // 在异常上发出
.collect { value -> println(value) }
}

即使我们在代码周围没有try/catch,示例的输出也是相同的。

流的完成

当流集合完成(正常或异常)时,可能需要执行一个动作。可以用两种方式来完成:命令式或声明式。

命令式finally块

除了try/catch外,收集器还可以使用finally块在collect完成时执行一个动作。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("完成")
}
}

这段代码打印simple流产生的三个数字,后跟”完成”字符串:

1
2
3
完成

声明式处理

对于声明式方法,流有onCompletion中间操作符,当流完全收集时它会被调用。

前面的例子可以使用onCompletion操作符重写,产生相同的输出:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("完成") }
.collect { value -> println(value) }
}

onCompletion的主要优势是它有一个空的Throwable参数,可以用来确定流集合是正常完成还是异常完成。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}

fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause ->
if (cause != null) println("流异常完成")
}
.catch { cause -> println("捕获到异常") }
.collect { value -> println(value) }
}

如预期的那样,它打印:

1
流异常完成
捕获到异常

catch不同,onCompletion操作符不处理异常。异常仍然向下游流动,可以由catch操作符处理。

声明式与命令式

现在我们知道了如何收集流,并以命令式和声明式方式处理它的完成和异常。自然的问题是,哪种方法更可取,为什么?作为一个库,我们不提倡任何特定的方法,认为这两种选择都是有效的,应该根据自己的喜好和代码风格来选择。

流的启动

使用流来表示来自某个源的异步事件很容易。在这种情况下,我们需要addEventListener函数的类似物,它注册一段代码以对传入事件做出反应并继续进一步工作。onEach操作符可以扮演这个角色。然而,onEach是一个中间操作符。我们还需要一个终端操作符来收集流。否则,仅仅调用onEach是没有效果的。

如果使用collect终端操作符之后使用onEach,则它之后的代码将等到流被收集完毕:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 模拟事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("事件: $event") }
.collect() // <--- 收集流,等待
println("完成")
}

它打印:

事件: 1
事件: 2
事件: 3
完成

launchIn终端操作符在这里很有用。通过用launchIn替换collect,可以在单独的协程中启动流的收集,以便立即继续执行后续代码:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 模拟事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("事件: $event") }
.launchIn(this) // <--- 在单独的协程中启动流
println("完成")
}

它打印:

完成
事件: 1
事件: 2
事件: 3

launchIn所需的参数必须指定在其中启动收集流的协程的CoroutineScope。在上面的例子中,这个作用域来自runBlocking协程构建器,因此流运行时,这个runBlocking作用域等待其子协程完成,并防止主函数返回和终止这个例子。

在实际应用中,作用域将来自寿命有限的实体。一旦该实体的寿命终止,相应的作用域被取消,取消相应流的收集。这种方式下,onEach { ... }.launchIn(scope)的组合就像addEventListener。然而,不需要相应的removeEventListener函数,因为取消和结构化并发服务于这个目的。

注意,launchIn还返回一个Job,可用于仅取消相应的流收集协程而不取消整个作用域,或者join它。

流取消检查

为了方便,flow构建器在每个发出的值上执行额外的ensureActive检查以进行取消。这意味着从flow { ... }发射的忙循环是可取消的:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("发射 $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}

我们只得到最多3的数字,然后尝试发射数字4后出现CancellationException

发射 1
1
发射 2
2
发射 3
3
发射 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

然而,大多数其他流操作符出于性能原因自身不执行额外的取消检查。例如,如果使用IntRange.asFlow扩展来编写相同的忙循环,并且不在任何地方挂起,那么就没有检查取消:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}

所有从1到5的数字都被收集,只有在runBlocking返回之前才检测到取消:

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

使忙碌流可取消

在有协程的忙循环情况下,必须显式检查取消。可以添加.onEach { currentCoroutineContext().ensureActive() },但有一个现成的cancellable操作符来做这件事:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}

使用cancellable操作符,只收集1到3的数字:

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

参考链接