标签:
对rxJava不了解的同学可以先看
RxJava 和 RxAndroid 一 (基础)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和内存优化)
RxJava 和 RxAndroid 四(RxBinding的使用)
本文将有几个例子说明,rxjava线程调度的正确使用姿势。
例1
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
结果
/rx_call: main -- 主线程
/rx_map: main -- 主线程
/rx_subscribe: main -- 主线程
例2
new Thread(new Runnable() {
@Override
public void run() {
Logger.v( "rx_newThread" , Thread.currentThread().getName() );
rx();
}
}).start();
void rx(){
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
}
结果
/rx_newThread: Thread-564 -- 子线程
/rx_call: Thread-564 -- 子线程
/rx_map: Thread-564 -- 子线程
/rx_subscribe: Thread-564 -- 子线程
例3
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
结果
/rx_call: RxCachedThreadScheduler-1 --io线程
/rx_map: main --主线程
/rx_subscribe: main --主线程
例4
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
结果
/rx_call: RxCachedThreadScheduler-1 --io线程
/rx_map: RxCachedThreadScheduler-1 --io线程
/rx_subscribe: main --主线程
map() , flapMap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消费
事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程
事件消费:默认运行在当前线程,可以有observeOn() 自定义
例5 多次切换线程
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.observeOn( Schedulers.newThread() ) //新线程
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.observeOn( Schedulers.io() ) //io线程
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
Logger.v( "rx_filter" , Thread.currentThread().getName() );
return s != null ;
}
})
.subscribeOn(Schedulers.io()) //定义事件产生线程:io线程
.observeOn(AndroidSchedulers.mainThread()) //事件消费线程:主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
结果
/rx_call: RxCachedThreadScheduler-1 -- io 线程
/rx_map: RxNewThreadScheduler-1 -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2 -- io线程
/rx_subscribe: main -- 主线程
标签:
原文地址:http://www.cnblogs.com/zhaoyanjun/p/5624395.html