Managing State With RxJava

这篇文章根据 Jake Wharton 的 Managing State with RxJava 演讲视频里的设计思路完成的。视频中 Jake 讲解了如何使用 RxJava 更优雅的去管理 UI 各种状态的切换,整个视频结合例子,由浅入深。

Why Reactive?

如果不使用 RxJava 除非可以把系统设计成同步模式,否则一个或者多个异步操作混合在一起会使得代码很难去维护。
假如有一个类管理当前登录用户的状态,所有操作都是同步操作

然而当这些操作变成了异步之后,问题就会随之而来,比如更新当前用户到文件系统或者写入数据库,客户端需要请求后端 API 和后端服务器进行交互。在异步过程中会遇到网络问题或者并发操作问题,然而你却得不到任何反馈,无能为力,只能祈求它们是可以正常运行的。

1
2
3
4
5
6
7
8
9
10
interface UserManager {
User getUser();
void setName(String name); // <-- now Async
void setAge(int age); // <-- now Async
}
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe");
System.out.println(um.getUser());

所以你给这些异步操作加了回调函数,当操作成功后输出新的用户信息,当操作失败后也执行相应的方法,这基本上解决了上面所说的异步问题。如下面执行了两个异步操作就已经定义了4个回调函数,可想而知如果有更多的异步方法或者回调嵌套,会让人抓狂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe",new UserManager.Listener(){
@Override public void success(){
System.out.println(um.getUser());
}
@Override public void failure(IOException e){
//TODO show the error...
}
});
um.setAge(40,new UserManager.Listener(){
@Override public void success(){
System.out.println(um.getUser());
}
@Override public void failure(IOException e){
//TODO show the error...
}
});

现在我们把上面的案例放在 Android 的 Activity 中,我们还需要注意更多的东西,比如回调回来后的线程切换,用户的输入操作,Activity 的生命周期(当屏幕旋转,切换 app ,收到通知,任何其他的 configuration change 都会在没有警告的情况下触发)。但是上面的这些问题正是 RxJava 所擅长的。

Being Reactive

RxJava 把网络请求,数据库操作,生命周期监听分成了互相独立的部分,然后这些单独的部分直接进行交互,去掉了我们的代码在中心直接管理操作一切的职责。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//onCreate
disposables.add(um.getUser()
.observaOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<User>(){
@Override public void onNext(User user){
tv.setText(user.toString());
}
@Override public void onComplete(){/* ignored */}
@Override public void onError(Throwable t){/* show throwable */}
}));
//button click listener
disposables.add(um.setName("Jane Doe")
.observaOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<User>(){
@Override public void onComplete(){
//success
}
@Override public void onComplete(){/* ignored */}
@Override public void onError(Throwable t){
//retry or show
}
}));
//onDestroy
disposables.dispose();

Managing State


这是一个用户修改昵称的例子,在 EditText 中输入昵称,然后点提交按钮。

1
2
3
4
5
6
7
8
9
10
11
12
disposables.add(RxView.clicks(submitView)
.doOnNext(ignored -> {
submitView.setEnabled(false);
progressView.setVisibility(VISIBLE);
})
.flatMap(ignored -> service.setName(nameView.getText().toString()))
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(ignored -> progressView.setVisibility(GONE))
.subscribe(s -> finish(), t -> {
submitView.setEnabled(true);
Toast.makeText(this,"Failed to set name",LENGTH_SHORT).show;
}));

上面的代码中 doOnNext()flatMap() 跳出了整个流,转向 去操作 UI 和从 UI 中获取数据,然后再回到 RxJava 的流中。还有就是 doOnNext() 只会在成功的时候才会调用,当发生 error 后是不会调用的,如果忘了切换到主线程直接操作 UI 又会报错。也不利于测试,因为已经脱离出整个流和 UI 耦合在了一起。因为向服务器提交的数据不一定是来自于 UI ,响应状态变化的也不应该是 UI ,UI 只是我们把各种状态翻译成能和用户交互的一种方式,状态变化也可以仅仅是一个日志输出。如果我们可以直接把各种状态变化直接传到用户的大脑中,我们可以不需要 UI。

如果我们把需要从 UI 中获取的所有数据(包括点击提交按钮事件和用户输入的数据)打包成一个单独的对象发射出去,我们就不需要跳出流去获取数据了,然后我们回到 subscribe() 方法,现在变成了事件流,事件可以有多个,但是每个事件代表的不是最终的 UI 展示,而是代表了 UI 不同的状态。

现在整个流程变成了一个单向数据流,我们接收来自 UI 的事件进入一个单向流,然后相反方向的数据流把代表不同状态的对象推向 UI。

Reactive State

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SubmitUiModel {
public final boolean inProgress;
public final boolean success;
public final String errorMessage;
protected SubmitUiModel(boolean inProgress, boolean success, String errorMessage) {
this.inProgress = inProgress;
this.success = success;
this.errorMessage = errorMessage;
}
public static SubmitUiModel inProgress() {
return new SubmitUiModel(true, false, null);
}
public static SubmitUiModel success() {
return new SubmitUiModel(false, true, null);
}
public static SubmitUiModel failure(String message) {
return new SubmitUiModel(false, false, message);
}
}

所以整个功能改造后的样子如下

1
2
3
4
5
6
7
8
9
10
11
12
Observable<SubmitEvent> events = RxView.clicks(submitView)
.map(ignored -> new SubmitEvent(nameView.getText().toString()));
disposables.add(events.flatMap(event -> service.setName(event.name)
.map(response -> SubmitUiModel.success())
.onErrorReturn(t -> SubmitUiModel.failure(t.getMessage()))
.onBserveOn(AndroidSchedulers.mainThread())
.startWith(SubmitUiModel.inProgress()))
.subscribe(model -> {
submitView.setEnabled(!model.inprogress);
progressView.setVisibility(model.inProgress ? VISIBLE : GONE);
}))

但是又有更复杂的需求来了,那就是当用户输入昵称的时候需要向服务器发起请求验证昵称的可用性,需要满足如下细节:

  1. 如果用户在输入了一个字符后的200毫秒内又输入了字符则取消上次请求
  2. 如果后面的请求已经回来了,则取消前面的请求

思路就是通过 sample 操作符延时 200ms 用switchMap 操作符转换和取消请求,用 merge 操作符把用户输入字符的事件和 submit 的事件合并成一个流,然后使用 typeOf 操作符区分出不同事件分别用对应的 Transformer 进行转换,然后再用 merge 操作符合并成同一个流。

具体代码就不往下贴了,请查看 Demo ManagingStateWithRxJava