函数式编程RxJava操作实例
2015-09-12 / modified at 2022-04-04 / 2.3k words / 10 mins
️This article has been over 2 years since the last update.

通过几个实际的例子,加深对RxJava的理解

在阅读本文之前,建议阅读RxJava入门与Lambda表达式入门

处理字符串

用RxJava可以处理很多算法笔试题目,但是它作为第三方的库是不能用的(泪),下面的例子是对字符串的简单操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//输入一段数字,用空格隔开;然后过滤出小于2的元素;接着进行排序,最后获得了排序后的列表;
Scanner scanner = new Scanner(System.in);//2 2 0 7 3 2 2 4 9 1 4
Observable.from(scanner.nextLine().split("\s+")).map(new Func1<String, Integer>() {
@Override public Integer call(String s) {
return Integer.valueOf(s);
}
}).filter(new Func1<Integer, Boolean>() {
@Override public Boolean call(Integer integer) {
return integer > 2;//然后大于2的元素才能通过,类似于高通滤波器
}
}).toSortedList().subscribe(new Action1<List<Integer>>() {
@Override public void call(List<Integer> integers) {
System.out.println("integers = " + integers.toString());
}
});

如果换成Lambda表达式后,代码量与脚本语言持平

1
2
3
4
5
6
Scanner scanner = new Scanner(System.in);//eg.2 2 0 7 3 2 2 4 9 1 4
Observable.from(scanner.nextLine().split("\s+"))
.map(Integer::valueOf)
.filter(integer -> integer > 2)
.toSortedList()
.subscribe(System.out::println);

当然如果同时学swift可以写成这样

1
2
3
4
5
6
7
8
//swift sample
var s = "2 2 0 7 3 2 2 4 9 1 4"
s.componentsSeparatedByString(" ")
    .map({ (_s) -> Int in
        Int(_s)!
    })
    .filter({$0>2})
    .sort()

RxJava配合Retrofit2.0使用

新的Retrofit2.0简直就是设计模式的教科书典范,同时对Rx的支持也更加友好,本例子为查询ip获取地理信息,并过滤掉失败信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//使用Rxjava配合Retrofit解析json数据,注意这里全是电脑运行的,没有分开线程订阅
public static void main(String[] args) throws Exception{
OkHttpClient client = new OkHttpClient();
client.interceptors().add(new LoggingInterceptor());//log for okhttp

Retrofit retrofit = new Retrofit.Builder().baseUrl(IPService.END).client(client)
.addConverterFactory(GsonConverterFactory.create())//对Response进行adapter转换
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())//对转换后的数据进行再包装
.build();

retrofit.create(IPService.class)//动态代理生成class
//直接操作json数据,这里可不是一个好的习惯,真正应该是DTO对象的
.getIPInfo("58.19.239.11")
.filter(jsonObject -> jsonObject.get("code").getAsInt()==0)
//转换数据类型
.map(jsonObject1 -> jsonObject1.get("data"))
//输出结果
.subscribe(System.out::println);
}

//retrofit定义的接口
interface IPService {
String END = "http://ip.taobao.com";
//建议写成dto对象,博主只是为了演示filter就把这里JsonObject了
@GET("/service/getIpInfo.php") Observable<JsonObject> getIPInfo(@Query("ip") String ip);
}

/**
* Retrofit2.0已经把网络部分剥离了,所以需要自己实现Log
*/
static class LoggingInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();

long t1 = System.nanoTime();
System.out.println(
String.format("Sending request %s on %s%n%s", request.url(), chain.connection(),
request.headers()));

Response response = chain.proceed(request);

long t2 = System.nanoTime();
System.out.println(
String.format("Received response for %s in %.1fms%n%s", response.request().url(),
(t2 - t1) / 1e6d, response.headers()));

return response;
}

完整代码在Gist托管

对遗留代码进行再包装

很多软件公司由于人事项目等原因,留下了很多历史代码,而重构代码肯定是费时费力而且不容易测试的,这时候我们可以再次对代码封装一遍,实现程序的健壮运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//使用RxJava对历史代码进行再包装,使它更加健壮,同时不需要重构或者继承SDK
public static void main(String[] args) throws Exception {
Observable.create(new Observable.OnSubscribe<Map>() {
@Override public void call(Subscriber<? super Map> subscriber) {
fuckSDKSample(new BadCallback() {
@Override public void onSuccess(Map<String, String> s) {
subscriber.onNext(s);
subscriber.onCompleted();
}

@Override public void onFail() {
subscriber.onError(new IllegalStateException("fuck sdk failed"));
}
});
}
})//重试3次
.retry((integer, throwable) -> integer < 3 && throwable instanceof IllegalStateException)
//过滤掉不满足的值
.filter(map -> map.get("status").equals("1"))
//转换并获取到接口中真正有用的东西
.map(map1 -> Integer.valueOf((String) map1.get("value")))
.subscribe(System.out::println);
}


/**
* 某个既没有异常回掉,没有注释,返回值模糊不清的大爷代码
*
* @param callback 奇葩回掉
*/
public static void fuckSDKSample(BadCallback callback) {
int rand = new Random().nextInt(2);
System.out.println("rand = " + rand);
//模拟某些遗留代码的Bug,这里可以自行调整概率
if (rand > 1) {
callback.onFail();
} else {
//MagicNumber回掉
Map<String, String> stringMap = new HashMap<>();
stringMap.put("status", "1");
stringMap.put("value", "23333");
//注意这里是非ui线程回调哦
callback.onSuccess(stringMap);
}
}

/**
* 某个奇葩的回掉接口
*/
interface BadCallback {

void onSuccess(Map<String, String> s);

void onFail();
}

最后,我们实现了

  • 不破坏原有代码结构
  • 提高了历史代码异常捕捉能力
  • 通过重试与过滤,一定程度上提高代码运行成功率(前提是没有事务问题)
  • 对返回值进行了转换,可以解决MagicNumber的问题

过滤

过滤器,同电路中的滤波器。

注意filter只会过滤掉对象,如果全部过滤掉的话,会返回empty对象,直接调用了onComplete,而不会抛出异常;而使用single的话会抛出NoSuchElementException的异常。示例如下,在实际使用中,博主更喜欢用single一些。

1
2
3
4
5
6
7
8
9
Observable.just(1,2,3,4)
.filter(integer -> integer > 10)
//现在一个都过滤没了,返回的是一个empty对象,直接执行onComplete
.subscribe(System.out::println);

Observable.just(1,2,3,4)
.single(integer -> integer > 10)
//没有元素直接抛出异常
.subscribe(System.out::println);

迭代

RxJava的迭代不同于常见的C语言迭代,RxJava是通过已经存在的值作为scan函数的参数而转换为另一个值。下面例子是求数列{1,2,3,4…}的和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override public void call(Subscriber<? super Integer> subscriber) {
for (int i = 1; i < 200; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).scan(new Func2<Integer, Integer, Integer>() {
//1 2 3 4 5(before)
//1 3 6 10 15(after)
@Override public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(System.out::println);

抽样(基于时间)

buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.create(subscriber -> {
//产生10个事件
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(990);
subscriber.onNext(String.valueOf(new Random().nextInt(10)));
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
subscriber.onCompleted();
//每3秒合并一个Integer事件
}).buffer(3, TimeUnit.SECONDS).forEach(System.out::println);

return

1
2
3
4
[3, 9, 6]
[6, 2, 3]
[6, 4, 3]
[4]

sample

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(990);
subscriber.onNext(String.valueOf(new Random().nextInt(10)));
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
subscriber.onCompleted();
}
}).doOnNext(s -> {
System.out.println("doOnNext " + s + " at " + System.currentTimeMillis());
}).sample(3, TimeUnit.SECONDS).forEach(each -> {
System.out.println("forEach " + each + " at " + System.currentTimeMillis());
});

return

1
2
3
4
5
6
7
8
9
10
11
12
13
doOnNext 0 at 1440944616333
doOnNext 3 at 1440944617326
doOnNext 7 at 1440944618318
forEach 7 at 1440944618341
doOnNext 9 at 1440944619312
doOnNext 4 at 1440944620304
doOnNext 8 at 1440944621297
forEach 8 at 1440944621327
doOnNext 4 at 1440944622291
doOnNext 9 at 1440944623286
doOnNext 4 at 1440944624279
forEach 4 at 1440944624327
doOnNext 5 at 1440944625273

flatmap的探究

与Map不同,它一般用于一对多,或者多对一。最终Stream流的总长度将会改变

1
2
3
4
5
6
7
8
9
10
11
//[[1,2,3],[4,5,6],[7,8,9]]
List<Integer> a = Arrays.asList(1, 2, 3);
List<Integer> b = Arrays.asList(4, 5, 6);
List<Integer> c = Arrays.asList(7, 8, 9);
List<Integer> out = Stream.of(a,b,c).flatMap(new Function<List<Integer>, Stream<Integer>>() {
@Override
public Stream<Integer> apply(List<Integer> integers) {
return integers.stream();
}
}).collect(Collectors.toList());
//out = [1, 2, 3, 4, 5, 6, 7, 8, 9]

一个简易flatMap的源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StreamCompat<T> {

Collection<T> origin;

private StreamCompat(Collection<T> origin) {
this.origin = origin;
}

public <R> StreamCompat<R> map(Function<? super T, ? extends R> mapper) {
Collection<R> mapped = new ArrayList<R>();
for (T t : origin) {
mapped.add(mapper.apply(t));
}
return new StreamCompat<R>(mapped);
}

public <R> StreamCompat<R> flatMap(Function<? super T, ? extends StreamCompat<? extends R>> mapper) {
Collection<R> flatted = new ArrayList<R>();
for (T t : origin) {
StreamCompat<? extends R> result = mapper.apply(t);
flatted.addAll(result.origin);
}
return new StreamCompat<R>(flatted);
}
}

Summary

RxJava的确是一种新的开发方式,能够方便的进行测试,以下是个人对RxJava的一些看法

优点:

  • RxJava跨平台很棒,服务器,iOS与Android可以拥有相同的逻辑,复用相同的代码,这点非常棒
  • RxJava可以包装大量历史遗留代码,简化开发流程,提高调试速度
  • RxJava可以按照函数依次进行测试,因为所有功能都是解耦的
  • RxJava是测试维护人员的惊喜,也有可能是维护人员的噩梦(比如Lambda表达式)
  • RxJava实现了jdk7上的Closures(闭包)

不足:

  • RxJava的学习成本比较高,需要国内先导者进行翻译与传教,需要说服开发团队学习使用,目前来看不会的都找不到工作了
  • RxJava的全局try/cathc,还有包装类的存在,担心在性能上有损失(虽然目前还没有看出差别,而且全局try/catch只有在发生异常才有损失)

Refference

  1. http://www.devtf.cn/?p=174
  2. http://stackoverflow.com/questions/22847105/when-do-you-use-map-vs-flatmap-in-rxjava

RxJava2的flatMap操作