用RxJava完成轮询请求

最近做一个软件,需要后台服务轮询请求数据,这里我是用RxJava来实现轮询。这里我用金山词霸的接口来做测试。

一般的做法

为什么说这是一般的做法,是因为我们平时一般就是使用嵌套的形式,就是外面是一个轮询,内部嵌套一个网络请求。其中intervalRxJava的一个操作符,我们用interval举一个例子,具体的代码如下:

1
2
3
4
5
6
7
8
Observable
.interval(2,1,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d("InternetService","---->"+aLong);
}
});

输出的效果如下:

接着我的网络请求如下,我网络请求用的是RetrofitRxJava来请求,具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();

GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

Observable<Translation> observable = request.getCall();

observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onNext(Translation result) {
result.show() ;
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败");
}

@Override
public void onComplete() {

}
});

通过上面的代码我们需要注意的是我们需要通过addCallAdapterFactory()添加RxJava2CallAdapterFactory,这样就可以让RxJava可以直接跟Retrofit搭配使用,Retrofit的初始化我在之前的文章里面谈过,其中subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程,RxJava为我们内置了很多线程选项,具体的选项如下:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

链式做法

RxJava最大的好处就是逻辑清楚,上面的例子是通过嵌套来实现轮询请求的,所以我们可以借助RxJava的操作符来完成上面同样的效果,这里我们用flatMap这个操作符把long类型转换成我们想要的Observable<T>类型,具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable
.interval(2,5, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
Log.d("InternetService", "第 " + integer + " 次轮询" );


}
})
.subscribeOn(Schedulers.io())
.flatMap(new Function<Long, ObservableSource<Translation>>() {
@Override
public ObservableSource<Translation> apply(Long aLong) throws Exception {
return request.getCall();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {

Log.d("InternetService",translation.getContent().getOut());

}
});

通过上面可以看到flatMap这个操作符把long转换成Observable<Translation>,这个返回的就是转换的类型,同时也可以体现出链式操作。

停止轮询

现在问题解决了但我们还剩下一个问题就是怎么停止轮询,首先链式操作返回的是一个Disposable,然后我们调用disposable.dispose()来断开上游与下游的联系。

本文标题:用RxJava完成轮询请求

文章作者:袁来

发布时间:2018年05月23日 - 22:05

最后更新:2018年06月07日 - 15:06

原始链接:http://yoursite.com/2018/05/23/用RxJava完成轮询请求/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

显示 Gitment 评论