β

rxjava2——线程切换

Harries Blog™ 286 阅读

简介

对rx java 有一个简单的学习之后,笔者还是很难 理解rxjava 在 服务端 的使用,感觉学习了hystrix 之后,这块的理解会更深刻一些。

首先对于 同步 调用,rxjava的作用有限,而对于异步调用,对于类似于netty这种 方法直接返回future的,rxjava也套不上。其所谓异步调用,通常是另起 线程 执行一个同步调用(从驱动线程的角度看,这就是一个异步调用了),由此成为一个 多线程 代码 ,解决多线程环境下的 数据 流控制问题。

观察者模式的推和拉

模型

class Subject {
	private List<Observer> observers = new ArrayList<Observer>();
	public void notify(){
		for(Observer observer : observers){
			observer.update(data);	// data 是Subject 全部或部分信息
		}
	}
}

拉模型

class Subject {
	private List<Observer> observers = new ArrayList<Observer>();
	public void notify(){
		for(Observer observer : observers){
			observer.update(this); // observer 看情况通过 Subject 引用获取数据
		}
	}
}

线程切换

线程控制绝对是RxJava的重点之一。在不指定线程的情况下,RxJava遵循的是线程不变的原则,在哪个线程调用subscribe(),就在哪个线程生产、消费事件。

线程控制的 本质 还是 将 当前 Observable 转换为 另一个Observable,具体的说是转换Observable的onSubscribe 方法,跟filter 等普通的数据转换一样一样的。明面上是线程切换,其实是函数 包装。

public Observable<T> observeOn(Scheduler scheduler) {
  return observeOn(this, scheduler);
}
public Observable<T> subscribeOn(Scheduler scheduler) {
  return subscribeOn(this, scheduler);
}
public Observable<T> filter(Func1<T, Boolean> predicate) {
  return filter(this, predicate);
}

谜之RxJava (三)update 2 —— subscribeOn 和 observeOn 的区别

笔者最早找到 支持observeOn 的版本 0.10.0

从0.10.0 可以看到,无论是observeOn 还是subscribeOn, 参数 都是Scheduler,都会导致 代码切换到 另一个线程(由Scheduler 实现类决定)执行。只是observeOn 只是 表示 其之后的操作,由observeOn 指定的Scheduler执行。subscribeOn 则是 之前及之后的操作 都由subscribeOn 指定的Scheduler 执行,直到遇到observeOn。

subscribeOn

Func1<Observer<T>, Subscr ip tion>
叫 onSubscribe,Subscribe 是 Subscribe ,别弄混onSubscribe和Subscribe。

public static <T> Func1<Observer<T>, Subscription> subscribeOn(Observable<T> source, Scheduler scheduler) {
  return new SubscribeOn<T>(source, scheduler);
}

private static class SubscribeOn<T> implements Func1<Observer<T>, Subscription> {
  private final Observable<T> source;
  private final Scheduler scheduler;

  public SubscribeOn(Observable<T> source, Scheduler scheduler) {
    this.source = source;
    this.scheduler = scheduler;
  }

  @Override
  public Subscription call(final Observer<T> observer) {
    return scheduler.schedule(new Func0<Subscription>() {
      @Override
      public Subscription call() {
        return new ScheduledSubscription(source.subscribe(observer), scheduler);
      }
    });
  }
}

Observable.subscribeOn 的逻辑链条,根据 当前Observable 和 scheduler 创建一个新的 Func1<Observer<T>, Subscription>
onSubscribe (学名叫subscribeOn )并基于此创建新的 Observable。 转换 onSubscribe 过程涉及到 几个Subscription 的转换

 1. 当前 Observable.subscribe(observer) 返回 Subscribe
 2. 将 Subscribe 封装为 ScheduledSubscription
 3. 将 ScheduledSubscription 封装为 SafeObservableSubscription

以NewThreadScheduler 为例

Observable.filter()				
			.map1()		
			.subscribeOn(NewThreadScheduler)
			.map2()
			.subscribe(xx)

以filter操作为例

// class Observable
public Observable<T> filter(Func1<T, Boolean> predicate) {
  return filter(this, predicate);
}
public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> predicate) {
  return create(OperationFilter.filter(that, predicate));
}
// class OperationFilter
public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Func1<T, Boolean> predicate) {
  return new Filter<T>(that, predicate);
}
 	private static class Filter<T> implements Func1<Observer<T>, Subscription> {
  private final Observable<T> that;
  private final Func1<T, Boolean> predicate;
  public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
    this.that = that;
    this.predicate = predicate;
  }
  public Subscription call(final Observer<T> observer) {
    ...
   	that.subscribe(new Observer<T>() {
      public void onNext(T value) {
        try {
          if (predicate.call(value)) {
            observer.onNext(value);
          }
        } catch (Throwable ex) {
          observer.onError(ex);
          ...
        }
      }
      public void onError(Throwable ex) {
        observer.onError(ex);
      }
      public void onCompleted() {
        observer.onCompleted();
      }
    });
    ...
  }
}
 1. filter 时的 Observable 和 最后 subscribe 当时的 Observable 已经不是同一个了。filter 时的observer 是 new 出来的,跟最后subscribe 方法参数的 observer 也不是同一个。

  动作 源Observable 对应observer
  filter Observable observer3.onNext
  map1 Observable1 observer2.onNext
  subscribeOn Observable2 observer1.onNext 只是异步驱动了一下
  map2 Observable3 observer1.onNext
  subscribe Observable4 observer.onNext

  rxjava 通过封装,只将原始的Observable 和 observer 暴露给了用户。

 2. 下一个Observable 简介持有 上一个 Observable 的引用
 3. 最新的Observable4.subscribe 驱动整个逻辑 开始 执行,具体的说 是驱动 其对应的 Func1<Observer<T>, Subscription>
  的执行。
 4. Observable4.subscribe 实现是 Observable4. onSubscribe.call ,方法执行链条为

  Observable4.subscribe ==> 
   Observable4.onSubscribe.call ==> 	
   Observable3.subscribe ==> 
   Observable3.subscribeOn.call ==> 驱动线程执行完毕,切换thread 
   Observable2.subscribe ==> 
   Observable2.onSubscribe.call ==> 
   Observable1.subscribe ==> 
   Observable1.onSubscribe.call ==> 
   observer3.onNext ==> 
   filter ==> 
   observer2.onNext ==> 
   ...
   observer.onNext

对于这个方法执行链

RxJava for 100% beginners (part3-switching threads)
subscribeOn()
change the thread for emitting the source Observable’s elements, no matter where you put it in your “chain”.

用一张图解释RxJava中的线程控制
则将这个方法链分为两个阶段

 1. 驱动阶段,从下游到上游,反向驱动
 2. 事件发射阶段。第一个Observable开始产生事件,然后事件流就开始正向传递

这也就解答了笔者的一个疑惑, 为什么subscribeOn 放在任何位置 对“副作用函数” 都有效?
因为线程的切换 在事件驱动阶段,而副作用函数的执行 在事件发射阶段。

observeOn

以下列代码为例

Observable.filter()				
			.map1()		
			.observerOn(NewThreadScheduler)
			.map2()
			.subscribe(xx)

分析下 ObserveOn 源码

// OperationObserveOn
  	public static <T> Func1<Observer<T>, Subscription> observeOn(Observable<T> source, Scheduler scheduler) {
  return new ObserveOn<T>(source, scheduler);
}

private static class ObserveOn<T> implements Func1<Observer<T>, Subscription> {
  private final Observable<T> source;
  private final Scheduler scheduler;

  public ObserveOn(Observable<T> source, Scheduler scheduler) {
    this.source = source;
    this.scheduler = scheduler;
  }

  @Override
  public Subscription call(final Observer<T> observer) {
    if (scheduler instanceof ImmediateScheduler) {
      // do nothing if we request ImmediateScheduler so we don't invoke overhead
      return source.subscribe(observer);
    } else {
      return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
    }
  }
}

分析Observable 与 observer 的 变换

动作 源Observable 对应observer
filter Observable observer4.onNext
map1 Observable1 observer3.onNext
subscribeOn Observable2 observer2.onNext 变成了ScheduledObserver
map2 Observable3 observer1.onNext
subscribe Observable4 observer.onNext

分析方法执行链

Observable4.subscribe ==> 
Observable4.onSubscribe.call ==> 	
Observable3.subscribe ==> 
Observable3.subscribeOn.call ==> 
Observable2.subscribe ==> 
Observable2.onSubscribe.call ==> 
Observable1.subscribe ==> 
Observable1.onSubscribe.call ==> 
observer4.onNext ==> 
filter ==> 
observer3.onNext ==> 
map ==>
observer2.onNext ==> // 提交事件,驱动线程执行完毕,另一个线程执行下面的逻辑(接收事件并驱动后续执行)

...
observer.onNext

小结

形式上顺序执行filter、map 等,从上到下,实际上是subscribe 才真正触发执行,但最后还是按照filter、map 的顺序 执行业务逻辑——代码腾挪的艺术。

突然奇想对照下 build er 模式,示例代码可以类比为

Observable.setFilter(filterFunction)	
			.setMap1(map1Function)		
			.subscribeOn(NewThreadScheduler)
			.setMap2(map2Function)
			.setObserver(observer)
			.build()

类似于 函数式编程 ,返回函数 或者 函数接口的,一定要小心, 代码写在哪里 跟 代码什么时候执行 没啥关系
, 经常违反直觉。

原文

http ://qiankunli. git hub.io/2018/07/31/rxjava2.html

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。 PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis, Netty 源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处: Harries Blog™ » rxjava2——线程切换

作者:Harries Blog™
追心中的海,逐世界的梦
原文地址:rxjava2——线程切换, 感谢原作者分享。