文章目录
- RxJava2 依赖
- 作用 - 异步
- 模式 - 观察者模式
- 结构 - 响应式编程
- 优势 - 逻辑简洁
- RxJava2 入门
- 事件产生
- 事件消费
- 事件订阅
- 区分回调动作
- 入门示例
- RxJava 进阶
- Scheduler线程控制
- 变换
- map操作符
- flatMap操作符
- RxJava 其他常用操作符
- 1.from系列
- 2.just
- 3.filter
- 4.take
- 5.doOnNext
- 6.debounce
- 7.merge
- 8.concat
- 9.compose
- 10.first
- 11.timer
- 12.interval
- 13.throttleFirst
- 14.Single
- 15.Subject
- RxJava取消订阅
- 总结
RxJava2 依赖 RxJava 在Github Repo上给的解释是:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
在gradle中做以下依赖以使用RxJava2:
implementation "io.reactivex.rxjava2:rxjava:2.2.5"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
作用 - 异步
上面 这段解释,重点就在于
异步
!RxJava精华可以浓缩为异步
两个字,其核心的东西不外乎两个:1.Observable(被观察者)
2.Observer/Subscriber(观察者)
Observables可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer/Subscriber 的回调处理。
模式 - 观察者模式
观察者模式是一种对象的行为模式,是 Java 设计模式中很常用的一个模式。
RxJava 是一种扩展的观察者模式!从windows的理解角度来说说,它非常类似于REDM中的
Event
绑定事件,只是通过模板化加了更多的控制。结构 - 响应式编程
响应式?顾名思义,就是“你变化,我响应”。举个栗子,
a = b + c;
这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:
Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber
这个流程,可以简单的理解为:
- Observable 广播一系列事件,他是事件的产生(发送)者。
- Subscriber 负责监听处理事件,他是事件的消费(监听)者。
- Operator 是对 Observable 发出的事件在接收前进行修改和变换。
- 若事件从产生(发送)到消费(监听)不需要其他处理,则可以省略掉中间的 Operator,从而流程变为
Obsevable -> Subscriber
。 - Subscriber 通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的事务处理则交给 Operator。
Rx 优势可以概括为四个字,那就是
逻辑简洁
。然而,逻辑简洁并不意味着代码简洁,但是,由于链式结构,一条龙,你可以从头到尾,从上到下,很清楚的看到这个链式结构的执行顺序。对于开发人员来说,代码质量并不在于代码量,而在于逻辑是否清晰简洁,可维护性如何,代码是否健壮!举个简单栗子,暂时不需要过多理解,后面会一一道来:
Observable.just("Hello World!")
.map(new Function() {
@Override
public String apply(String s) throws Exception {
return s + "I am hgy413!";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Toast.makeText(Main2Activity.this, s, Toast.LENGTH_SHORT).show();
}
});
效果图:

文章图片
RxJava2 入门 前面讲了那么多,大家在概念上对RxJava有一个初步的认识就好,接下来,将为您解开RxJava神秘的面纱~~
无需过分纠结于“事件”这个词,暂时可以简单的把“事件”看成是一个值,或者一个对象。
- 事件产生,就是构造要传递的对象。
2.事件处理变换,就是改变传递的对象,可以改变对象的值,或是干脆创建个新对象,新对象类型也可以与源对象不一样。
3.事件处理,就是接收到对象后要做的事。
事件产生
RxJava创建一个事件比较简单,由 Observable 通过 create 操作符来创建。举个栗子,还是经典的 HelloWorld:
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// 发送一个 Hello World 事件
emitter.onNext("Hello World!");
// 事件发送完成
emitter.onComplete();
}
});
这段代码可以理解为, Observable 发出了一个类型为 String ,值为 “Hello World!” 的事件,仅此而已。
ObservableEmitter 可以理解为发射器,是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的
onNext(T value)
、onComplete()
和onError(Throwable error)
可以分别发出next事件、complete事件和error事件。 如果只关心next事件的话,只需单独使用onNext()
即可。需要特别注意,emitter的
onComplete()
调用后,Consumer不再接收任何onNext(T value)
事件,在onComplete()
调用前,onNext(T value)
可多次调用。上面这段代码,也可以通过
just
操作符进行简化。RxJava常用操作符后面会详细介绍,这里先有个了解。 Observable observable = Observable.just("Hello World!");
这样,是不是简单了许多?
事件消费
有事件产生,自然也要有事件消费。RxJava 可以通过 subscribe 操作符,对上述事件进行消费。首先,先创建一个观察者。
Observer observer = new Observer() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}@Override
public void onNext(String s) {
if (.....)// 为异常数据时,解除订阅
disposable.dispose();
}
}@Override
public void onError(Throwable e) {
}@Override
public void onComplete() {
}
};
第一个回调方法
onSubscribe
, 传递参数为Disposable
,用于解除订阅,如上代码所示:disposable.dispose()
。事件订阅
最后,我们可以调用 subscribe 操作符, 进行事件订阅。
observable.subscribe(observer);
在 Observer实现的另三个方法中,顾名思义,对应三种不同状态:
onComplete()
: 事件全部处理完成后回调。onError(Throwable t)
: 事件处理异常回调。onNext(T t)
: 每接收到一个事件,回调一次。
区分回调动作
对于
事件消费
与事件订阅
来说,好像为了打印一个 “Hello World!” 要费好大的劲… 其实,RxJava 自身提供了精简回调方式,我们可以为 Observer 中的三种状态根据自身需要分别创建一个回调动作 Action
:Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "complete");
}
};
Consumer onNextAction = new Consumer() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
};
Consumer onErrorAction = new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
}
};
那么,RxJava 的事件订阅支持以下三种不完整定义的回调。
observable.subscribe(onNextAction);
observable.subscribe(onNextAction,onErrorAction);
observable.subscribe(onNextAction,onErrorAction,onCompleteAction);
我们可以根据当前需要,传入对应的 Action, RxJava 会相应的自动创建 Observer。
1.Action
表示一个无回调参数的Action。
2.Consumer
表示一个含有一个回调参数的Action。
3.BiConsumer
表示一个含有二个回调参数的Action。
4.Consumer
表示一个含有N个回调参数的Action。
入门示例
前面讲解了事件的产生到消费、订阅的过程,下面就举个完整的例子。从res/mipmap中取出一张图片,显示在ImageView上。
final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new ObservableOnSubscribe(){@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// 从mipmap取出一张图片作为Drawable对象
Drawable drawable = getResources().getDrawable(R.mipmap.ic_launcher);
// 把Drawable对象发送出去
emitter.onNext(drawable);
emitter.onComplete();
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}@Override
public void onNext(Drawable drawable) {
// 接收到Drawable对象,显示在ImageView上
ivLogo.setImageDrawable(drawable);
}@Override
public void onError(Throwable e) {
Log.i(TAG, e.toString());
}@Override
public void onComplete() {
}
});
效果图:

文章图片
上面示例是 RxJava 最基本的一个用法。稍微消化一下,继续~~
RxJava 进阶
Scheduler线程控制
默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程。
那么问题来了,假如事件产生的过程是耗时操作,比如网络请求,结果显示在UI中,这个时候在主线程执行对于网络请求就不合适了,而在子线程执行,显示结果需要进行UI操作,同样不合适~~
所以,RxJava 的第一个牛逼之处在于可以自由切换线程!那么,如何做?
在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了调度器,分别是:
Schedulers.io()
: 用于IO密集型的操作,例如读写SD卡文件,读写文件,查询数据库,访问网络等 ,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。 不要在此调度程序上执行计算工作,以免产生不必要的线程。Schedulers.newThread()
: 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io(?)
的地方,都可以使用Schedulers.newThread(?)
,但是,Schedulers.newThread(?)
的效率没有Schedulers.io(?)
高。Schedulers.computation()
:用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。Schedulers.trampoline()
:在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。Schedulers.single()
:拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。AndroidSchedulers.mainThread()
:在Android UI线程中执行任务,为Android开发定制。
subscribeOn()
和 observeOn()
这两个方法来进行线程调度。举个栗子:依然还是显示一张图片,不同的是,这次是从网络上加载图片
final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
try {
Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
emitter.onNext(drawable);
} catch (IOException e) {
emitter.onError(e);
}
}
})
// 指定 subscribe(ObservableEmitter emitter)调用的线程
.subscribeOn(Schedulers.io())
// 指定 onCompleted, onError, onNext回调的线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer(){
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}@Override
public void onNext(Drawable drawable) {
// 接收到Drawable对象,显示在ImageView上
ivLogo.setImageDrawable(drawable);
}@Override
public void onError(Throwable e) {
disposable.dispose();
}@Override
public void onComplete() {}
});
效果图:

文章图片
所以,这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。
变换
变换的概念不好理解吧?举个简单的栗子,我们对上述示例进行改写。
map操作符
final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new ObservableOnSubscribe() {@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG");
}
}).map(new Function() {
@Override
public Drawable apply(String s) throws Exception {
try {
Drawable drawable = Drawable.createFromStream(new URL(s).openStream(), "src");
return drawable;
} catch (IOException e) {}
return null;
}
})
// 指定 subscribe(ObservableEmitter emitter)调用的线程
.subscribeOn(Schedulers.io())
// 指定 onCompleted, onError, onNext回调的线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}@Override
public void onNext(Drawable drawable) {
// 接收到Drawable对象,显示在ImageView上
ivLogo.setImageDrawable(drawable);
}@Override
public void onError(Throwable e) {
disposable.dispose();
}@Override
public void onComplete() {}
});
经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个
url
,通过 map
操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是:Observable --> map变换 --> Observable
那么,
Function
是什么呢?与 前面的ActionX
类似,不同的是 Function
有返回值,而 ActionX
没有。为什么需要返回值呢?目的就在于对象的变换,由String对象转换为Drawable对象。1.Function
表示一个无参数的Function。
2.BiFunction
表示一个含有1个参数的Function。
3.Function3
-Function9
表示一个含有2-8参数的Function。
flatMap操作符 不难发现,上述的
map
操作符,是一对一的变换,并且返回的是变换后的对象。而 flatMap
操作符可以适应一对多,并且返回的是一个 Observable 。public void onTest5(View view) {List list = Arrays.asList(1, 2, 3);
Observable.fromIterable(list)
.flatMap(new Function() {
@Override
public ObservableSource apply(Integer integer) throws Exception {
Log.i(TAG, "开始执行,第" + integer + "圆球的任务 |ThreadName=" + Thread.currentThread().getName());
return getObservable(integer);
}
})
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "已完成" + s + " |ThreadName=" +Thread.currentThread().getName());
}
});
}public static Observable getObservable(final int integer) {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("第" + integer + "圆球的第1个棱形任务");
if(integer != 1) {
// 第2和第3个圆球的第二个任务延时。
Thread.sleep(5 * 1000);
}
emitter.onNext("第" + integer + "圆球的第2个棱形任务");
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());
}
运行结果:
Main2Activity: 开始执行,第1圆球的任务 |ThreadName=main
Main2Activity: 开始执行,第2圆球的任务 |ThreadName=main
Main2Activity: 开始执行,第3圆球的任务 |ThreadName=main
Main2Activity: 已完成第1圆球的第1个棱形任务 |ThreadName=RxNewThreadScheduler-1
已完成第1圆球的第2个棱形任务 |ThreadName=RxNewThreadScheduler-1
Main2Activity: 已完成第2圆球的第1个棱形任务 |ThreadName=RxNewThreadScheduler-2
Main2Activity: 已完成第3圆球的第1个棱形任务 |ThreadName=RxNewThreadScheduler-3
Main2Activity: 已完成第3圆球的第2个棱形任务 |ThreadName=RxNewThreadScheduler-3
Main2Activity: 已完成第2圆球的第2个棱形任务 |ThreadName=RxNewThreadScheduler-2
从打印的结果可以看到,FlatMap,首先是从1到3,即从左到右,执行任务的。其中,1、2、3又各自包含2个子任务。但是,很明显,未必就是2先执行完毕才执行3。反而是,2、3的第一个任务先完成,然后,才是2、3的第二个任务完成。
从这个例子,我们得出这样一个结论:FlatMap执行流程是:
先将所有一级任务,平铺成所有二级任务。再依照,从左到右到执行次序,执行任务。但是,任务成功的回调,却不是从左到右的。而是,谁先完成谁先回调,这样,有的任务,可能会因耗时而慢回调。从而导致,先执行,后回调的现象,简言之,即:执行次序是一定的,完成次序是不确定的。
ObservableSource
是Observable
的基类。通过上面的代码可以看出,
map
与 flatMap
这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:flatMap
返回的是一个Observable
对象,而map
返回的是一个普通转换后的对象。flatMap
返回的Observable
对象并不是直接发送到Subscriber
的回调中,而是重新创建一个Observable
对象,并激活这个Observable
对象,使之开始发送事件;而map
变换后返回的对象直接发到Subscriber
回调中。flatMap
变换后产生的每一个Observable
对象发送的事件,最后都汇入同一个Observable
,进而发送给Subscriber
回调。map
返回类型 与flatMap
返回的Observable
事件类型,可以与原来的事件类型一样。- 可以对一个
Observable
多次使用map
和flatMap
。
flatMap
自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess()
回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!以下为结合
OkHttp3
的网络请求示例:Observable.create(new ObservableOnSubscribe() {@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Request request = new Request.Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get()//默认就是GET请求,可以不写
.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
emitter.onNext(response);
}
})
.flatMap(new Function() {
@Override
public ObservableSource apply(Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
return Observable.just(body.string());
}
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Toast.makeText(Main2Activity.this,s,Toast.LENGTH_SHORT).show();
}
});
}
效果图:

文章图片
RxJava 其他常用操作符
1.from系列
接收一个集合作为输入,然后每次输出一个元素给subscriber,对应有
fromIterable
,fromArray
。 List list = Arrays.asList(1,2,3,4,5);
Observable.fromIterable(list)
.subscribe(new Consumer() {
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
注意:如果from里面执行了耗时操作,即使使用了
subscribeOn(Schedulers.io())
,仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…)
。2.just
接收一个可变参数作为输入,最终也是生成数组,调用from系列的
fromArray
,然后每次输出一个元素给subscriber。// Observable.just(T... params),params的个数为1 ~ 10
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Consumer() {
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
3.filter
条件过滤,去除不符合某些条件的事件。举个栗子:
Observable.fromArray(1, 2, 3, 4, 5)
.filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
// 偶数返回true,则表示剔除奇数,留下偶数
}
})
.subscribe(new Consumer() {
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
输出:
01-04 07:17:34.503 4730-4730/com.hgy.rxjavademo I/Main2Activity: number:2
01-04 07:17:34.504 4730-4730/com.hgy.rxjavademo I/Main2Activity: number:4
4.take
最多发送的事件数。
Observable.fromArray(1, 2, 3, 4, 5)
.take(3) // 最多调用onNext三次
.subscribe(new Consumer() {
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
输出:
number:1
number:2
number:3
5.doOnNext
在处理下一个事件之前要做的事。
Observable.fromArray(1, 2, 3, 4, 5)
.doOnNext(new Consumer() {//
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "hahcode = " + number.hashCode() + "");
}
})
.subscribe(new Consumer() {
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
输出:
hahcode = 1
number:1
hahcode = 2
number:2
hahcode = 3
number:3
hahcode = 4
number:4
hahcode = 5
number:5
调用流程是
onNext-->doOnNext-->subscribe
。6.debounce
防抖。简单的说,就是在特定的时间间隔中只有一个事件发送是有效的,其余的发送都无效,发送完事件后,重新计算时间间隔。debounce可以指定这个时间间隔!
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
// skip
Thread.sleep(400);
emitter.onNext(2);
// deliver
Thread.sleep(505);
emitter.onNext(3);
// skip
Thread.sleep(100);
emitter.onNext(4);
// deliver
Thread.sleep(605);
emitter.onNext(5);
// deliver
Thread.sleep(510);
emitter.onComplete();
}
})
.debounce(500, TimeUnit.MILLISECONDS)// 设置时间为0.5秒
.subscribe(new Consumer(){
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
输出:
01-04 07:40:45.605 5163-5187/com.hgy.rxjavademo I/Main2Activity: number:2
01-04 07:40:46.214 5163-5187/com.hgy.rxjavademo I/Main2Activity: number:4
01-04 07:40:46.821 5163-5187/com.hgy.rxjavademo I/Main2Activity: number:5
部分在间隔内的
onNext
被忽略了。关键看被观察者这块:
1.
事件1
发送后休眠了400毫秒后就会发送事件2
,小于500毫秒,重新计时。2.
事件2
发出后休眠了505毫秒,超过了500毫秒,所以事件2
被发射成功。3.
事件3
发出后休眠了100毫秒就会发送事件4
,所以事件3
被遗弃。4.
事件4
发出后休眠了605毫秒,超过了500毫秒,所以事件4
被发射成功。5.
事件5
发出后0.5秒内也没有再发出别的事件,所以最终事件5
也被发射成功。类似一个弹簧,如果一个事件相当于挤压它一下的话,它回到初始状态需要一段时间,那如果一直有事件不断的挤压它,那它一直回不到初始状态,就一个事件也弹不出来。一旦有一段时间里面没有人挤压它,他就把最后一个弹出来了。周而复始。
7.merge
用于合并两个Observable为一个Observable。输出顺序不固定。
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable aObservable = Observable.fromArray(aStrings);
final Observable bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable,bObservable)
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
});
输出:
A1 A2 A3 A4 B1 B2 B3
合并的Observable类型不一定相同,如果不同,则以Serializable表示
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final Observable aObservable = Observable.fromArray(aStrings);
final Observable bObservable = Observable.fromArray(1,2,3);
Observable.merge(aObservable,bObservable)
.subscribe(new Consumer() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.i(TAG, serializable.toString());
}
});
输出:
A1 A2 A3 A4 1 2 3
8.concat
顺序执行
多个Observable,个数为1 ~ 9。注意是顺序执行
,而前面的merge输出顺序不固定。代码和merge类似,就不再贴了。
9.compose
与
flatMap
类似,都是进行变换,返回Observable对象,激活并发送事件。compose
是唯一一个能够从数据流中得到原始Observable
的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()
和observeOn()
)需要使用compose
来实现。相较而言,如果在flatMap()
中使用subscribeOn()
或者observeOn()
,那么它仅仅对在flatMap
中创建的Observable
起作用,而不会对剩下的流产生影响。compose
是对Observable
整体的变换,换句话说,flatMap
转换Observable
里的每一个事件,而compose
转换的是整个Observable
数据流。flatMap
每发送一个事件都创建一个Observable
,所以效率较低。而compose
操作符只在主干数据流上执行操作。
List list = Arrays.asList(1, 2, 3);
Observable.fromIterable(list)
.compose(new MyTransformer());
class MyTransformer implements ObservableTransformer{
@Override
public ObservableSource apply(Observable upstream) {
return upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
如上,我们在
MyTransformer
中封装了常用的subscribeOn
+observeOn
操作,通过 .compose(new MyTransformer())
我们可以让所有的Observable来重用。10.first
只发送符合条件的第一个事件。
11.timer
可以做定时操作,换句话讲,就是延迟执行。事件间隔由timer控制。举个栗子:两秒后输出
“Hello World!”
Observable.timer(2,TimeUnit.MILLISECONDS)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {}@Override
public void onNext(Long aLong) {
Log.i(TAG, "Hello World!");
}@Override
public void onError(Throwable e) {}@Override
public void onComplete() {}
});
12.interval
定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制。举个栗子:每隔两秒输出 “Hello World!”, 将前面的代码中
timer
换成interval
,不贴代码了。13.throttleFirst
【Android-0.RxJava2简介】与debounce类似,也是时间间隔太短,就丢弃事件。可以用于防抖操作。
允许设置一个时间长度,之后它会发送固定时间长度内的
第一个事件
,而屏蔽其它事件,在间隔达到设置的时间后,可以再发送下一个事件。Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
// skip
Thread.sleep(400);
emitter.onNext(2);
// deliver
Thread.sleep(505);
emitter.onNext(3);
// skip
Thread.sleep(100);
emitter.onNext(4);
// deliver
Thread.sleep(605);
emitter.onNext(5);
// deliver
Thread.sleep(510);
emitter.onComplete();
}
})
.throttleFirst(500, TimeUnit.MILLISECONDS)// 设置时间为0.5秒
.subscribe(new Consumer(){
@Override
public void accept(Integer number) throws Exception {
Log.i(TAG, "number:" + number);
}
});
输出:
01-04 09:38:02.183 5750-5750/com.hgy.rxjavademo I/Main2Activity: number:1
01-04 09:38:03.092 5750-5750/com.hgy.rxjavademo I/Main2Activity: number:3
01-04 09:38:03.802 5750-5750/com.hgy.rxjavademo I/Main2Activity: number:5
和debounce的输出结果可以做个类比。
14.Single
Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。
Single.create(new SingleOnSubscribe
15.Subject
Subject这个类,既是Observable又是Observer,啥意思呢?就是它自身既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出。举个栗子:PublishSubject
Subject subject = PublishSubject.create();
// 1.由于Subject是Observable,所以进行订阅
subject.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.i(TAG, o.toString());
}
});
// 2.由于Subject同时也是Observer,所以可以调用onNext发送数据
subject.onNext("world");
RxJava取消订阅
Disposable
可以通过以下来获取:1.
subscribe
时返回了 Disposable
: Disposable disposable =Observable.just(1,2,3).subscribe(...);
2.从
observer
的 onSubscribe
中获取: .subscribe(new Observer() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
然后直接调用
disposable.dispose();
取消订阅。但是,如果有很多个数据源,那岂不是要取消很多次?当然不是的,可以利用
CompositeDisposable
, 相当于一个 Disposable
集合。CompositeDisposable list = new CompositeDisposable();
list.add(disposable1);
list.add(disposable2);
// 统一调用,就可以把所有的订阅都取消
list.dispose();
当然,也可以使用第三方库,如
rxlifecycle
总结
- 首先是创建被观察者,可以用Observable的create/just/from等方法来创建。
- 通过filter/debounce等操作符,进行自定义事件过滤。
- 通过Schedules进行事件发送和订阅的线程控制,也就是
subscribeOn()
和observeOn()
。 - 通过map/flatMap/compose等操作符,进行事件的变换。
- 调用subscribe进行事件订阅。
- 最后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用
dispose()
,以免引发内存泄漏。
https://blog.csdn.net/yyh352091626/article/details/53304728
https://gank.io/post/560e15be2dca930e00da1083
https://www.jianshu.com/p/d53463e1c3d6
https://www.cnblogs.com/zhujiabin/p/8183827.html