码迷,mamicode.com
首页 > 编程语言 > 详细

RxJava 变换操作符 map flatMap concatMap buffer

时间:2018-09-21 23:09:23      阅读:216      评论:0      收藏:0      [点我收藏+]

标签:next   完全   lov   complete   模拟   ids   z-index   使用   回调   


常用的变换操作符

  • map:【数据类型转换将被观察者发送的事件转换为另一种类型事件
  • flatMap:【化解循环嵌套和接口嵌套将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送
  • concatMap:【有序】与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致
  • flatMapIterable:相当于对 flatMap 的数据进行了二次扁平化
  • buffer:定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射

map

Observable.just(new Date()) // Date 类型
      .map(Date::getTime) // long 类型
      .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
      .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
      .subscribe(this::log);
5
5
 
1
Observable.just(new Date()) // Date 类型
2
      .map(Date::getTime) // long 类型
3
      .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
4
      .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
5
      .subscribe(this::log);

flatMap concatMap flatMapIterable

基础用法:化解循环嵌套

flatMap 使用一个指定的函数对原始 Observable 发射的每一项数据之行相应的变换操作,这个函数返回一个本身也发射数据的 Observable,然后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果当做它自己的数据序列发射。
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
      .subscribe(this::log);
4
4
 
1
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
2
      .map(person -> person.loves)
3
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
4
      .subscribe(this::log);
篮球,22:56:43 009,true
足球,22:56:43 010,true
排球,22:56:43 010,true
画画,22:56:43 011,true
跳舞,22:56:43 012,true
5
 
1
篮球,22:56:43 009,true
2
足球,22:56:43 010,true
3
排球,22:56:43 010,true
4
画画,22:56:43 011,true
5
跳舞,22:56:43 012,true
flatMap() 执行的过程:
  • 使用传入的事件对象创建一个 Observable 对象;
  • 并不发送这个 Observable,而是将它激活,于是它开始发送事件;
  • 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable,而这个 Observable 负责将这些事件统一交给 Observer 的回调方法。

注意:如果任何一个通过这个 flatMap 操作产生的单独的 Observable 调用 onError 异常终止了,这个 Observable 自身会立即调用 onError 并终止。例如:
Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
      .subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));
4
4
 
1
Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))
2
      .map(person -> person.loves)
3
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
4
      .subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));
篮球,00:20:14 762,true
onError:The iterator returned a null value,00:20:14 767,true
2
 
1
篮球,00:20:14 762,true
2
onError:The iterator returned a null value,00:20:14 767,true

flatMap 和 concatMap

concatMap 操作符的功能和 flatMap 非常相似,只不过经过 flatMap 操作变换后,最后输出的序列有可能是交错的(flatMap最后合并结果采用的是 merge 操作符),而 concatMap 最终输出的数据序列和原数据序列是一致的。
long start = System.currentTimeMillis();
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
      .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
      .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
      .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
      .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
7
7
 
1
long start = System.currentTimeMillis();
2
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
3
      .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
4
      .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
5
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
6
      .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
7
      .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
f:4,23:21:07 944,false   //flatMap后,订阅者首先接收到的事件是【4】而不是【1】
f:5,23:21:07 945,false
f:1,23:21:08 942,false
f:2,23:21:08 943,false
f:3,23:21:08 943,false
f耗时3025,23:21:08 945,false    //flatMap耗时3秒

c:A,23:21:08 949,false   //concatMap后,订阅者首先接收到的事件是【1】
c:B,23:21:08 950,false
c:C,23:21:08 950,false
c:D,23:21:10 953,false
c:E,23:21:10 953,false
c耗时5034,23:21:10 954,false    //concatMap耗时5秒
13
 
1
f:4,23:21:07 944,false   //flatMap后,订阅者首先接收到的事件是【4】而不是【1】
2
f:5,23:21:07 945,false
3
f:1,23:21:08 942,false
4
f:2,23:21:08 943,false
5
f:3,23:21:08 943,false
6
f耗时3025,23:21:08 945,false    //flatMap耗时3秒
7
8
c:A,23:21:08 949,false   //concatMap后,订阅者首先接收到的事件是【1】
9
c:B,23:21:08 950,false
10
c:C,23:21:08 950,false
11
c:D,23:21:10 953,false
12
c:E,23:21:10 953,false
13
c耗时5034,23:21:10 954,false    //concatMap耗时5秒

扩展用法:化解接口嵌套

可以利用 flatMap 操作符实现网络请求依次依赖,即:第一个接口的返回值包含第二个接口请求需要用到的数据。

首先是两个请求网络的操作:
private Observable<String> firstRequest(String parameter) {
   return Observable.create(emitter -> {
      SystemClock.sleep(2000);//模拟网络请求
      emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
      emitter.onComplete();
   });
}
7
7
 
1
private Observable<String> firstRequest(String parameter) {
2
   return Observable.create(emitter -> {
3
      SystemClock.sleep(2000);//模拟网络请求
4
      emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
5
      emitter.onComplete();
6
   });
7
}
private Observable<String> secondRequest(String parameter) {
   return Observable.create(emitter -> {
      SystemClock.sleep(3000);//模拟网络请求
      emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
      emitter.onComplete();
   });
}
7
7
 
1
private Observable<String> secondRequest(String parameter) {
2
   return Observable.create(emitter -> {
3
      SystemClock.sleep(3000);//模拟网络请求
4
      emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
5
      emitter.onComplete();
6
   });
7
}
然后可以通过 flatMap 将两者串联起来:
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
      .subscribeOn(Schedulers.io()) // 在io线程进行网络请求
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
      .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
      .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
      .flatMap(this::secondRequest)//实现多个网络请求依次依赖
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
      .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
8
8
 
1
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
2
      .subscribeOn(Schedulers.io()) // 在io线程进行网络请求
3
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
4
      .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
5
      .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
6
      .flatMap(this::secondRequest)//实现多个网络请求依次依赖
7
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
8
      .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
打印结果为:
【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true
【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true
2
 
1
【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true
2
【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true

简化形式的Demo代码为:
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
      .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
      .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
6
6
 
1
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
2
      .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
3
      .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
4
      .subscribeOn(Schedulers.io())
5
      .observeOn(AndroidSchedulers.mainThread())
6
      .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true

当然这种情况下使用 concatMap 的效果也是完全一样的,然而因为 concatMap 的核心是用来保证在合并时"有序"的,而这两种情况根本就没涉及到合并,所以这些情况下使用 concatMap是没有任何意义的。

flatMap 和 flatMapIterable

flatMapIterable 与 flatMap 在流程上大体都相同,唯一不同的是,flatMap 是将一个 Observable 转换成多个 Observables,每一个Observable 最后又得返回一个 Observable。而 flatMapIterable 在将一个 Observable 转换成多个 Observables 后,每一个 Observable 只能返回一个 Iterable 而不是另一个 Observable。

案例1:
Observable.just(Arrays.asList("篮球1", "足球1"))
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .subscribe(string -> log("" + string));
Observable.just(Arrays.asList("篮球2", "足球2"))
      .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
      .subscribe(string -> log("" + string));
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
      .subscribe(string -> log("" + string));
8
8
 
1
Observable.just(Arrays.asList("篮球1", "足球1"))
2
      .flatMap(Observable::fromIterable) //返回一个 Observable
3
      .subscribe(string -> log("" + string));
4
Observable.just(Arrays.asList("篮球2", "足球2"))
5
      .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
6
      .subscribe(string -> log("" + string));
7
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
8
      .subscribe(string -> log("" + string));
篮球1,01:00:39 493,true
足球1,01:00:39 494,true
篮球2,01:00:39 496,true
足球2,01:00:39 496,true
篮球3,01:00:39 499,true
足球3,01:00:39 499,true
6
 
1
篮球1,01:00:39 493,true
2
足球1,01:00:39 494,true
3
篮球2,01:00:39 496,true
4
足球2,01:00:39 496,true
5
篮球3,01:00:39 499,true
6
足球3,01:00:39 499,true

案例2:
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
      .subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
      .subscribe(array -> log(Arrays.toString(array)));
10
10
 
1
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
2
      .map(person -> person.loves)
3
      .flatMap(Observable::fromIterable) //返回一个 Observable
4
      .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
5
      .subscribe(array -> log(Arrays.toString(array)));
6
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
7
      .map(person -> person.loves)
8
      .flatMap(Observable::fromIterable) //返回一个 Observable
9
      .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
10
      .subscribe(array -> log(Arrays.toString(array)));
[包, 青, 天],01:23:27 376,true
[哈, 哈],01:23:27 376,true
[白, 乾, 涛],01:23:27 377,true
[你, 好],01:23:27 377,true
[广, 州],01:23:27 380,true
[上, 海],01:23:27 380,true
[武, 汉],01:23:27 381,true
[长, 沙],01:23:27 382,true
8
 
1
[, , ],01:23:27 376,true
2
[, ],01:23:27 376,true
3
[, , ],01:23:27 377,true
4
[, ],01:23:27 377,true
5
[广, ],01:23:27 380,true
6
[, ],01:23:27 380,true
7
[, ],01:23:27 381,true
8
[, ],01:23:27 382,true

buffer

定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射
技术分享图片
请注意,如果源ObservableSource发出onError通知,则事件会立即传递而不是首先发到缓冲区[without first emitting the buffer it is in the process of assembling.]。

buffer(count)

每接收到 count 个数据包裹,将这 count 个包裹打包,发送给订阅者
一次订阅2个:
Observable.range(1, 5)
    .buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
    .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
1
Observable.range(1, 5)
2
    .buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
3
    .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
一次全部订阅(将所有元素组装到集合中的效果):
Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
x
1
Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

buffer(count, skip)

生成的ObservableSource每隔 skip 项就会 emits buffers,每个 buffers 都包含 count 个 items。
队列效果(先进先出):
Observable.range(1, 5)
      .buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)
      .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
 
1
Observable.range(1, 5)
2
      .buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)
3
      .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
每次剔除一个效果:
Observable.range(1, 5).buffer(5, 1)
    .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
x
1
Observable.range(1, 5).buffer(5, 1)
2
    .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
只取奇数个效果
Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]
x
1
Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]

buffer(timespan, unit)

持续收集直到指定的每隔时间后,然后发射一次并清空缓存区。
周期性订阅多个结果:
Observable.create(emitter -> {
   for (int i = 0; i < 8; i++) {
      SystemClock.sleep(100);//模拟耗时操作
      emitter.onNext(i);
   }
}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
x
 
1
Observable.create(emitter -> {
2
   for (int i = 0; i < 8; i++) {
3
      SystemClock.sleep(100);//模拟耗时操作
4
      emitter.onNext(i);
5
   }
6
}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE
7
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]

buffer(timespan, unit, count)

当达到指定时间【或】缓冲区中达到指定数量时发射
Observable.create(emitter -> {
   for (int i = 0; i < 8; i++) {
      SystemClock.sleep(100);//每个对象均延迟后再单独发出去
      emitter.onNext(i);
   }
}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
x
1
Observable.create(emitter -> {
2
   for (int i = 0; i < 8; i++) {
3
      SystemClock.sleep(100);//每个对象均延迟后再单独发出去
4
      emitter.onNext(i);
5
   }
6
}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程
7
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]

完整测试案例

public class TransformOperatorActivity extends ListActivity {
	private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());
	
	protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		String[] array = {"0、map",
				"1、flatMap 基础用法",
				"2、flatMap和concatMap的区别",
				"3、flatMap 实现多个网络请求依次依赖",
				"4、flatMap 实现多个网络请求依次依赖简化代码",
				"5、flatMapIterable 案例1",
				"6、flatMapIterable 案例2",
				"7、buffer(int count)",
				"8、buffer(count, skip)",
				"9、buffer(timespan, unit, count)",
		};
		setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));
	}
	
	private int i;
	
	@Override
	protected void onListItemClick(ListView l, View v, int position, long id) {
		i++;
		switch (position) {
			case 0:
				Observable.just(new Date()) // Date 类型
						.map(Date::getTime) // long 类型
						.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
						.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
						.subscribe(this::log);
				break;
			case 1:
				Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
						.subscribe(this::log);
				break;
			case 2:
				long start = System.currentTimeMillis();
				Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
						.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
						.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
				Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
						.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
						.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
				break;
			case 3:
				firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
						.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
						.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
						.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
						.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
						.flatMap(this::secondRequest)//实现多个网络请求依次依赖
						.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
						.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
				break;
			case 4:
				Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
						.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
						.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
						.subscribeOn(Schedulers.io())
						.observeOn(AndroidSchedulers.mainThread())
						.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
				break;
			case 5:
				Observable.just(Arrays.asList("篮球1", "足球1"))
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.subscribe(string -> log("" + string));
				Observable.just(Arrays.asList("篮球2", "足球2"))
						.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
						.subscribe(string -> log("" + string));
				Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
						.subscribe(string -> log("" + string));
				break;
			case 6:
				Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
						.subscribe(array -> log(Arrays.toString(array)));
				Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
						.subscribe(array -> log(Arrays.toString(array)));
				break;
			case 7:
				if (i % 2 == 0) {
					Observable.range(1, 5).buffer(2)  //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
							.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
				} else {
					Observable.range(1, 10).buffer(10)  //将所有元素组装到集合中的效果
							.subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
				}
				break;
			case 8:
				if (i % 3 == 0) {
					Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)
							.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
				} else if (i % 3 == 1) {
					Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果
							.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
				} else {
					Observable.range(1, 5).buffer(1, 2) //只取奇数个效果
							.subscribe(list -> log(list.toString()));//[1],[3],[5]
				}
				break;
			case 9:
				Observable<Integer> observable = Observable.create(emitter -> {
					for (int i = 0; i < 8; i++) {
						SystemClock.sleep(100);//模拟耗时操作
						emitter.onNext(i);
					}
					emitter.onComplete();
				});
				if (i % 3 == 0) { //周期性订阅多个结果:
					observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE
							.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
				} else { //当达到指定时间【或】缓冲区中达到指定数量时发射
					observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程
							.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
				}
				break;
		}
	}
	
	private Observable<String> firstRequest(String parameter) {
		return Observable.create(emitter -> {
			SystemClock.sleep(2000);//模拟网络请求
			emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
			emitter.onComplete();
		});
	}
	
	private Observable<String> secondRequest(String parameter) {
		return Observable.create(emitter -> {
			SystemClock.sleep(3000);//模拟网络请求
			emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
			emitter.onComplete();
		});
	}
	
	private void log(String s) {
		String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());
		Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
	}
}
148
 
1
public class TransformOperatorActivity extends ListActivity {
2
    private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());
3
    
4
    protected void onCreate(Bundle savedInstanceState) {
5
        super.onCreate(savedInstanceState);
6
        String[] array = {"0、map",
7
                "1、flatMap 基础用法",
8
                "2、flatMap和concatMap的区别",
9
                "3、flatMap 实现多个网络请求依次依赖",
10
                "4、flatMap 实现多个网络请求依次依赖简化代码",
11
                "5、flatMapIterable 案例1",
12
                "6、flatMapIterable 案例2",
13
                "7、buffer(int count)",
14
                "8、buffer(count, skip)",
15
                "9、buffer(timespan, unit, count)",
16
        };
17
        setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));
18
    }
19
    
20
    private int i;
21
    
22
    @Override
23
    protected void onListItemClick(ListView l, View v, int position, long id) {
24
        i++;
25
        switch (position) {
26
            case 0:
27
                Observable.just(new Date()) // Date 类型
28
                        .map(Date::getTime) // long 类型
29
                        .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
30
                        .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
31
                        .subscribe(this::log);
32
                break;
33
            case 1:
34
                Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
35
                        .map(person -> person.loves)
36
                        .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
37
                        .subscribe(this::log);
38
                break;
39
            case 2:
40
                long start = System.currentTimeMillis();
41
                Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
42
                        .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
43
                        .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
44
                Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
45
                        .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
46
                        .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
47
                break;
48
            case 3:
49
                firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
50
                        .subscribeOn(Schedulers.io()) // 在io线程进行网络请求
51
                        .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
52
                        .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
53
                        .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
54
                        .flatMap(this::secondRequest)//实现多个网络请求依次依赖
55
                        .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
56
                        .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
57
                break;
58
            case 4:
59
                Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
60
                        .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
61
                        .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
62
                        .subscribeOn(Schedulers.io())
63
                        .observeOn(AndroidSchedulers.mainThread())
64
                        .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
65
                break;
66
            case 5:
67
                Observable.just(Arrays.asList("篮球1", "足球1"))
68
                        .flatMap(Observable::fromIterable) //返回一个 Observable
69
                        .subscribe(string -> log("" + string));
70
                Observable.just(Arrays.asList("篮球2", "足球2"))
71
                        .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
72
                        .subscribe(string -> log("" + string));
73
                Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
74
                        .subscribe(string -> log("" + string));
75
                break;
76
            case 6:
77
                Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
78
                        .map(person -> person.loves)
79
                        .flatMap(Observable::fromIterable) //返回一个 Observable
80
                        .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
81
                        .subscribe(array -> log(Arrays.toString(array)));
82
                Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
83
                        .map(person -> person.loves)
84
                        .flatMap(Observable::fromIterable) //返回一个 Observable
85
                        .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
86
                        .subscribe(array -> log(Arrays.toString(array)));
87
                break;
88
            case 7:
89
                if (i % 2 == 0) {
90
                    Observable.range(1, 5).buffer(2)  //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
91
                            .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
92
                } else {
93
                    Observable.range(1, 10).buffer(10)  //将所有元素组装到集合中的效果
94
                            .subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
95
                }
96
                break;
97
            case 8:
98
                if (i % 3 == 0) {
99
                    Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)
100
                            .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
101
                } else if (i % 3 == 1) {
102
                    Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果
103
                            .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
104
                } else {
105
                    Observable.range(1, 5).buffer(1, 2) //只取奇数个效果
106
                            .subscribe(list -> log(list.toString()));//[1],[3],[5]
107
                }
108
                break;
109
            case 9:
110
                Observable<Integer> observable = Observable.create(emitter -> {
111
                    for (int i = 0; i < 8; i++) {
112
                        SystemClock.sleep(100);//模拟耗时操作
113
                        emitter.onNext(i);
114
                    }
115
                    emitter.onComplete();
116
                });
117
                if (i % 3 == 0) { //周期性订阅多个结果:
118
                    observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE
119
                            .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
120
                } else { //当达到指定时间【或】缓冲区中达到指定数量时发射
121
                    observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程
122
                            .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
123
                }
124
                break;
125
        }
126
    }
127
    
128
    private Observable<String> firstRequest(String parameter) {
129
        return Observable.create(emitter -> {
130
            SystemClock.sleep(2000);//模拟网络请求
131
            emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
132
            emitter.onComplete();
133
        });
134
    }
135
    
136
    private Observable<String> secondRequest(String parameter) {
137
        return Observable.create(emitter -> {
138
            SystemClock.sleep(3000);//模拟网络请求
139
            emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
140
            emitter.onComplete();
141
        });
142
    }
143
    
144
    private void log(String s) {
145
        String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());
146
        Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
147
    }
148
}
2018-9-18

RxJava 变换操作符 map flatMap concatMap buffer

标签:next   完全   lov   complete   模拟   ids   z-index   使用   回调   

原文地址:https://www.cnblogs.com/baiqiantao/p/9688484.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!