<>5. 线程控制:Scheduler (二)

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

5.1 Scheduler 的 API (二)

前面讲到了,可以利用subscribeOn()结合observeOn()来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了map()
flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。因为observeOn()指定的是Subscriber的线程,而这个 Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()参数中的Subscriber,而是 observeOn()执行时的当前
Observable 所对应的Subscriber,即它的直接下级 Subscriber。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()即可。上代码:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers
.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由
observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由
observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber);
// Android 主线程,由 observeOn() 指定
如上,通过observeOn()的多次调用,程序实现了线程的多次切换。不过,不同于observeOn(),subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。又有好事的(其实还是当初的我)问了:如果我非要调用多次subscribeOn()呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

5.2 Scheduler 的原理(二)

其实,subscribeOn()和observeOn()的内部实现,也是用的lift()。具体看图(不同颜色的箭头表示不同的线程):



observeOn()原理图:



从图中可以看出,subscribeOn()和observeOn()都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是,
subscribeOn()的线程切换发生在OnSubscribe中,即在它通知上一级OnSubscribe时,这时事件还没有开始发送,因此
subscribeOn()的线程控制可以从事件发出的开端就造成影响;而observeOn()的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级Subscriber发送事件时,因此observeOn()控制的是它后面的线程。

最后,我用一张图来解释当多个 subscribeOn() 和 observeOn()
混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):



图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个observeOn()
的影响,运行在绿色线程;⑤处受第二个onserveOn()影响,运行在紫色线程;而第二个subscribeOn(),由于在通知过程中线程就被第一个
subscribeOn()截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()的时候,只有第一个
subscribeOn()起作用。

5.3 延伸:doOnSubscribe()

然而,虽然超过一个的subscribeOn()对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲Subscriber的时候,提到过Subscriber的onStart()可以用作流程开始前的初始化。然而onStart()由于在
subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在subscribe()被调用时的线程。这就导致如果onStart()
中含有对线程有要求的代码(例如在界面上显示一个ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe()
将会在什么线程执行。

而与Subscriber.onStart()相对应的,有一个方法 Observable.doOnSubscribe()。它和
Subscriber.onStart()同样是在subscribe()调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下,
doOnSubscribe()执行在subscribe()发生的线程;而如果在doOnSubscribe()之后有subscribeOn()
的话,它将执行在离它最近的subscribeOn()所指定的线程。

示例代码:
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new
Action0() { @Override public void call() { progressBar.setVisibility(View.
VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
如上,在doOnSubscribe()的后面跟一个subscribeOn(),就能指定准备工作的线程了。

<>RxJava 的适用场景和使用方式

1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit
的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

2. RxBinding

RxBinding <https://link.jianshu.com?t=https://github.com/JakeWharton/RxBinding>
是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓
Binding,就是类似设置 OnClickListener、设置 TextWatcher这样的注册绑定对象的 API。

3. 各种异步操作

前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成
Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava
来实现,有了之前几章的例子,这里应该不用再举例了。

4. RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto
或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了
Otto ,目前为止没有不良反应。

<>最后

对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android
开发者来说有太多陌生的概念了,可是它真的很牛逼。希望此文能给始终搞不明白什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava
但仍然心存疑惑的人看到一些更深入的解析。无论如何,只要能给各位同为 Android 工程师的你们提供一些帮助,该系列的目的就达到了。
希望大家多多支持,关注我后续还有更多Android开发经验分享。

<>最后

在这里我总结出了互联网公司Android程序员面试简历模板,面试涉及到的绝大部分面试题及答案做成了文档和架构视频资料免费分享给大家【
包括高级UI、性能优化、架构师课程、NDK、Kotlin、混合式开发(ReactNative+Weex)、Flutter等架构技术资料
】,希望能帮助到您面试前的复习且找到一个好的工作,也节省大家在网上搜索资料的时间来学习。

<>资料获取方式:加入Android架构交流QQ群聊:513088520 ,进群即领取资料!!!

<>点击链接加入群聊【Android移动架构总群】:加入群聊 <https://jq.qq.com/?_wv=1027&k=5V5U98t>


友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信