RxJava以及扩展的RxAndroid已经越来越火热了,于是我也便在最近的项目中试水了下。当然这方面的文章也有不少了,最为著名的就是朱凯写的 『给 Android 开发者的 RxJava 详解』, 还有就是 Android开发前的RxJava系列文章

我也是从这两篇文章中学习,并在实践中运用,然后得到一点自己的理解。在我理解看来RxAndroid主要有两个部分,一个是其观察者模式,一个是其调度器部分。

首先要在项目中使用RxAndroid,需要现在build.gradle中引入依赖

compile 'io.reactivex:rxjava:1.0.14' 
compile 'io.reactivex:rxandroid:1.0.1'

一、Observable与Observer

RxJava也是用的观察者模式,观察者模式简单的的说来就是观察者和被观察者之间预先建立一种订阅的管理,然后在被观察者发生变化的时候,通知观察者自己发生变化了。Observable 名为 可观察的,就是这里的被观察者,Observer 就是这里的观察者。

先来看看被观察者的创建,被观察者的创建就是根据一些事件源来创建的。比如:

Observable.from()

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);

Observable<Integer> observableString = Observable.from(items);

这样创建的被观察者对象就会发射4次事件,内容依次是1,10,100,200。当然被观察者创建的形式不止一种。

Observable.just()

Observable<String> observableString = Observable.just(helloWorld());

这样创建的被观察者对象只会发射1次事件。很容易理解不吧。那么我们再来看看观察者。

创建Observer对象就是实现Observer的接口。

Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no! Something wrong happened!");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

上面的代码就是创建了一个观察者对象,并把它与一开始Observable.from()产生的被观察者之间建立订阅关系。整个列子来看就是:别观察者会发送4次事件,在观察者这边就会得到响应,分别回调 onNext,onNext,onNext,onNext,然后onCompleted。如果有异常情况发生错误就会回调onError。好了以上就是一个简单的 RxJava版本的观察者模式实现。

至于被观察者发射的4次事件,是怎么在在观察者对象中得到的响应的。这个我们先看下被观察者对象的另一个创建方式。

Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            for (int i = 0; i < 5; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        }
});

这是被观察者最原始最基本的创建方式,上面列子是创建了一个被观察者对象,发射了5次事件。事件源分别是0,1,2,3,4。所以很容易猜测Observable.from()创建被观察者对象也是用的最原始的 Observable.create()的方式,并在其内部的call方法中依次回调观察者的onNext,onCompleted以及onError方法。

当然讲到这里,就是观察者模式实现。接下来从源码层面看下这内部事件的调用是如何实现的?

其实在每个事件源(被观察者对象)内部有个内部类OnSubscribe的对象,每当被观察者对象被一个Action或者Subscribe对象订阅的时候,都会从观察者对象的onStart()方法开始整个调用链,然后调用内部类OnSubscribe对象的call,这里是真正的操作的地方,可能是异步的网络操作,也可能是异步耗时操作,在执行的同时回调观察者对象的onNext(),onComplete(),onError()的接口。

除了实现了观察者模式,RxJava还提供了复杂的被观察者对象的各种变化,即对被观察者的事件源做各种变换过滤处理。这里讲一个简单的例子。

just()

List<AppInfo> apps = ApplicationsList.getInstance().getList();
AppInfo appOne = apps.get(0);
AppInfo appTwo = apps.get(10);
AppInfo appThree = apps.get(24);
Observable.just(appOne,appTwo,appThree)
            .subscribe(new Observable<AppInfo>() {

                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(AppInfo appInfo) {

                }
            });

上面的列子意思是,开始的数据源可能是apps的list,可能有很多,之间使用的话,会发射list.size() 次事件,使用just,标识只发射0,10,24的三次。

类似的变换还有repeat(),defer(),range(),interval(),和timer()。不一一讲了。

除了这种变换,还有事件源过滤。

Observable.from(apps)
        .filter((appInfo) ->
        appInfo.getName().startsWith("C"))
        .subscribe(new Observable<AppInfo>() {
        ....

这里的 .fliter((appInfo -> appInfo.getName().startsWith("C"))是使用了lambda表达式,扩展开来的写法是:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo.getName().startsWith("C");
    }
})

即将以 C字母开头的过滤出来,发射事件。

除此之外还有 take(),takeLast(),distinct(),last(),skip(),skipLast()等等各种过滤方式。根据自己的使用场景去选择合适的过滤操作。

当然RxJava还提供了一些组合Observable的方法,比如merge(),join(),可以将两个Observable甚至多个Observable组合成一个Observable。

二 调度器 Schedulers

我们都知道有些复杂耗时的操作不应该放在主线程来操作,所以在RxJava中提供了多种调度器来处理这种异步任务的。

Schedulers.io() IO调度器 用来处理复杂的IO操作
Schedulers.computation() 计算工作默认的调度器 用来处理复杂的计算操作
Schedulers.immediate() 这个调度器允许你立即在当前线程执行你指定的工作
Schedulers.newThread() 它为指定任务启动一个新的线程

RxAndroid里面还提供了一种常用的调度器,AndroidSchedulers.mainThread()运行在UI主线程的。

有了这些调度器,我们就可以指定 Observable,Observer的操作执行在哪个调度器中。看这样一个列子。

private Observable<AppInfo> getObservableApps(List<AppInfo> apps) {
    return Observable .create(subscriber -> {
        for (double i = 0; i < 1000000000; i++) {
            double y = i * i;
        }
        for (AppInfo app : apps) {
            subscriber.onNext(app);
        }
        subscriber.onCompleted(); 
    });
}
getObservableApps(apps)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {

这个例子就是把getObservableApps的复杂耗时操作放在computation调度器中执行,把观察者的操作放在 主线程操作。即subscribeOn指定被观察者Observable操作执行的调度器,observeOn即指定观察者Observer操作执行的调度器。

讲到这里RxJava实现基本的异步操作就完成了。

三 RxJava配合Retrofit的使用场景

这个使用场景可以参考朱凯的那篇文章 给 Android 开发者的 RxJava 详解

也可参考这篇文章Retrofit and RxJava, Android multi-threaded REST requests

public interface GithubService {  
    String SERVICE_ENDPOINT = "https://api.github.com";

    @GET("/users/{login}")
    Observable<Github> getUser(@Path("login") String login);
}
GithubService service = ServiceFactory.createRetrofitService(GithubService.class, GithubService.SERVICE_ENDPOINT);  
for(String login : Data.githubList) {  
    service.getUser(login)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Github>() {
            @Override
            public final void onCompleted() {
                // do nothing
            }

            @Override
            public final void onError(Throwable e) {
                Log.e("GithubDemo", e.getMessage());
            }

            @Override
            public final void onNext(Github response) {
                mCardAdapter.addData(response);
            }
        });
}

通过Retrofit2请求返回Gson序列化成一个Github对象事件源的被观察者对象,然后在观察者这得到相应。其中service.getUser就是被观察者的网络操作,放在Schedulers.newThread()调度器中执行,观察者的回调往往会用来更新UI,所以放在主线程调度器中执行。