️This article has been over 2 years since the last update. 通过几个实际的例子,加深对RxJava的理解
在阅读本文之前,建议阅读RxJava入门与Lambda表达式入门
处理字符串 用RxJava可以处理很多算法笔试题目,但是它作为第三方的库是不能用的(泪),下面的例子是对字符串的简单操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Scanner scanner = new Scanner (System.in);Observable.from(scanner.nextLine().split("\s+" )).map(new Func1 <String, Integer>() { @Override public Integer call (String s) { return Integer.valueOf(s); } }).filter(new Func1 <Integer, Boolean>() { @Override public Boolean call (Integer integer) { return integer > 2 ; } }).toSortedList().subscribe(new Action1 <List<Integer>>() { @Override public void call (List<Integer> integers) { System.out.println("integers = " + integers.toString()); } });
如果换成Lambda表达式后,代码量与脚本语言持平
1 2 3 4 5 6 Scanner scanner = new Scanner(System.in);//eg.2 2 0 7 3 2 2 4 9 1 4 Observable.from(scanner.nextLine().split("\s+")) .map(Integer::valueOf) .filter(integer -> integer > 2) .toSortedList() .subscribe(System.out::println);
当然如果同时学swift可以写成这样
1 2 3 4 5 6 7 8 var s = "2 2 0 7 3 2 2 4 9 1 4" s.componentsSeparatedByString(" " ) .map({ (_s) -> Int in Int (_s)! }) .filter({$0 > 2 }) .sort()
RxJava配合Retrofit2.0使用 新的Retrofit2.0简直就是设计模式的教科书典范,同时对Rx的支持也更加友好,本例子为查询ip获取地理信息,并过滤掉失败信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public static void main (String[] args) throws Exception{OkHttpClient client = new OkHttpClient ();client.interceptors().add(new LoggingInterceptor ()); Retrofit retrofit = new Retrofit .Builder().baseUrl(IPService.END).client(client) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); retrofit.create(IPService.class) .getIPInfo("58.19.239.11" ) .filter(jsonObject -> jsonObject.get("code" ).getAsInt()==0 ) .map(jsonObject1 -> jsonObject1.get("data" )) .subscribe(System.out::println); } interface IPService { String END = "http://ip.taobao.com" ; @GET("/service/getIpInfo.php") Observable<JsonObject> getIPInfo (@Query("ip") String ip) ; } static class LoggingInterceptor implements Interceptor {@Override public Response intercept (Chain chain) throws IOException { Request request = chain.request(); long t1 = System.nanoTime(); System.out.println( String.format("Sending request %s on %s%n%s" , request.url(), chain.connection(), request.headers())); Response response = chain.proceed(request); long t2 = System.nanoTime(); System.out.println( String.format("Received response for %s in %.1fms%n%s" , response.request().url(), (t2 - t1) / 1e6d , response.headers())); return response; }
完整代码在Gist 托管
对遗留代码进行再包装 很多软件公司由于人事项目等原因,留下了很多历史代码,而重构代码肯定是费时费力而且不容易测试的,这时候我们可以再次对代码封装一遍,实现程序的健壮运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public static void main (String[] args) throws Exception { Observable.create(new Observable .OnSubscribe<Map>() { @Override public void call (Subscriber<? super Map> subscriber) { fuckSDKSample(new BadCallback () { @Override public void onSuccess (Map<String, String> s) { subscriber.onNext(s); subscriber.onCompleted(); } @Override public void onFail () { subscriber.onError(new IllegalStateException ("fuck sdk failed" )); } }); } }) .retry((integer, throwable) -> integer < 3 && throwable instanceof IllegalStateException) .filter(map -> map.get("status" ).equals("1" )) .map(map1 -> Integer.valueOf((String) map1.get("value" ))) .subscribe(System.out::println); } public static void fuckSDKSample (BadCallback callback) { int rand = new Random ().nextInt(2 ); System.out.println("rand = " + rand); if (rand > 1 ) { callback.onFail(); } else { Map<String, String> stringMap = new HashMap <>(); stringMap.put("status" , "1" ); stringMap.put("value" , "23333" ); callback.onSuccess(stringMap); } } interface BadCallback { void onSuccess (Map<String, String> s) ; void onFail () ; }
最后,我们实现了
不破坏原有代码结构 提高了历史代码异常捕捉能力 通过重试与过滤,一定程度上提高代码运行成功率(前提是没有事务问题) 对返回值进行了转换,可以解决MagicNumber的问题 过滤 过滤器,同电路中的滤波器。
注意filter只会过滤掉对象,如果全部过滤掉的话,会返回empty对象,直接调用了onComplete,而不会抛出异常 ;而使用single的话会抛出NoSuchElementException
的异常。示例如下,在实际使用中,博主更喜欢用single一些。
1 2 3 4 5 6 7 8 9 Observable.just(1 ,2 ,3 ,4 ) .filter(integer -> integer > 10 ) .subscribe(System.out::println); Observable.just(1 ,2 ,3 ,4 ) .single(integer -> integer > 10 ) .subscribe(System.out::println);
迭代 RxJava的迭代不同于常见的C语言迭代,RxJava是通过已经存在的值作为scan函数的参数而转换为另一个值。下面例子是求数列{1,2,3,4…}的和。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Observable.create(new Observable .OnSubscribe<Integer>() { @Override public void call (Subscriber<? super Integer> subscriber) { for (int i = 1 ; i < 200 ; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }).scan(new Func2 <Integer, Integer, Integer>() { @Override public Integer call (Integer integer, Integer integer2) { return integer + integer2; } }).subscribe(System.out::println);
抽样(基于时间)
buffer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Observable.create(subscriber -> { for (int i = 0 ; i < 10 ; i++) { try { Thread.sleep(990 ); subscriber.onNext(String.valueOf(new Random ().nextInt(10 ))); } catch (InterruptedException e) { e.printStackTrace(); subscriber.onError(e); } } subscriber.onCompleted(); }).buffer(3 , TimeUnit.SECONDS).forEach(System.out::println);
return
1 2 3 4 [3, 9, 6] [6, 2, 3] [6, 4, 3] [4]
sample code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 10; i++) { try { Thread.sleep(990); subscriber.onNext(String.valueOf(new Random().nextInt(10))); } catch (InterruptedException e) { e.printStackTrace(); subscriber.onError(e); } } subscriber.onCompleted(); } }).doOnNext(s -> { System.out.println("doOnNext " + s + " at " + System.currentTimeMillis()); }).sample(3, TimeUnit.SECONDS).forEach(each -> { System.out.println("forEach " + each + " at " + System.currentTimeMillis()); });
return
1 2 3 4 5 6 7 8 9 10 11 12 13 doOnNext 0 at 1440944616333 doOnNext 3 at 1440944617326 doOnNext 7 at 1440944618318 forEach 7 at 1440944618341 doOnNext 9 at 1440944619312 doOnNext 4 at 1440944620304 doOnNext 8 at 1440944621297 forEach 8 at 1440944621327 doOnNext 4 at 1440944622291 doOnNext 9 at 1440944623286 doOnNext 4 at 1440944624279 forEach 4 at 1440944624327 doOnNext 5 at 1440944625273
flatmap的探究 与Map不同,它一般用于一对多,或者多对一。最终Stream流的总长度将会改变
1 2 3 4 5 6 7 8 9 10 11 List<Integer> a = Arrays.asList(1 , 2 , 3 ); List<Integer> b = Arrays.asList(4 , 5 , 6 ); List<Integer> c = Arrays.asList(7 , 8 , 9 ); List<Integer> out = Stream.of(a,b,c).flatMap(new Function <List<Integer>, Stream<Integer>>() { @Override public Stream<Integer> apply (List<Integer> integers) { return integers.stream(); } }).collect(Collectors.toList());
一个简易flatMap的源码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class StreamCompat <T> { Collection<T> origin; private StreamCompat (Collection<T> origin) { this .origin = origin; } public <R> StreamCompat<R> map (Function<? super T, ? extends R> mapper) { Collection<R> mapped = new ArrayList <R>(); for (T t : origin) { mapped.add(mapper.apply(t)); } return new StreamCompat <R>(mapped); } public <R> StreamCompat<R> flatMap (Function<? super T, ? extends StreamCompat<? extends R>> mapper) { Collection<R> flatted = new ArrayList <R>(); for (T t : origin) { StreamCompat<? extends R > result = mapper.apply(t); flatted.addAll(result.origin); } return new StreamCompat <R>(flatted); } }
Summary RxJava的确是一种新的开发方式,能够方便的进行测试,以下是个人对RxJava的一些看法
优点:
RxJava跨平台很棒,服务器,iOS与Android可以拥有相同的逻辑,复用相同的代码,这点非常棒 RxJava可以包装大量历史遗留代码,简化开发流程,提高调试速度 RxJava可以按照函数依次进行测试,因为所有功能都是解耦的 RxJava是测试维护人员的惊喜,也有可能是维护人员的噩梦(比如Lambda表达式) RxJava实现了jdk7上的Closures(闭包) 不足:
RxJava的学习成本比较高,需要国内先导者进行翻译与传教,需要说服开发团队学习使用,目前来看不会的都找不到工作了 RxJava的全局try/cathc,还有包装类的存在,担心在性能上有损失(虽然目前还没有看出差别,而且全局try/catch只有在发生异常才有损失) Refference http://www.devtf.cn/?p=174 http://stackoverflow.com/questions/22847105/when-do-you-use-map-vs-flatmap-in-rxjava RxJava2的flatMap操作