码迷,mamicode.com
首页 > 其他好文 > 详细

Hystrix源码

时间:2019-10-19 21:06:46      阅读:91      评论:0      收藏:0      [点我收藏+]

标签:操作   feign   去除   eth   dev   context   timer   允许   run   

HystrixInvocationHandler.invoke()--->HystrixCommand.execute()--->queue()--->toObservable().toBlocking.toFuture()--->toFuture方法中that.single().subscribe()订阅subscriber

而生成Observable的逻辑是:toObservable--->applyHystrixSemantics(cmd)--->executeCommandAndObserve()--->executeCommandWithSpecifiedIsolation()--->getUserExecutionObservable()

--->getExecutionObservable()--->Observable.just(run())/Observable.error(ex); run方法中是feign和ribbon的请求逻辑。

去除掉一些空方法或者无用的逻辑以及defer的部分,生成的Observable就是Observable.just(run())

.doOnTerminate().doOnUnsubscribe().subscribeOn()---在executeCommandWithSpecifiedIsolation方法中

.lift(new HystrixObservableTimeoutOperator<R>(_cmd)).doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext)---executeCommandAndObserve方法中

.doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease)---applyHystrixSemantics方法中

.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook)---toObservable方法中

toObservable方法中,经过上面的十多个方法,一层一层的装饰justObservable。然后在toObservable().subscribe(原始subscriber)方法中,一层一层的剥离得到justObservable的同时,一层层终极subscriber(简称终极subscriber)。最后会师的时候,执行

justObservable.onSubscribe的call(终极的subscriber) ,调用终极的subscriber的链式的一路setProducer(WeakSingleProducer producer)下去,直到原始subscriber没有子subscriber为止(注意中间有可能

被lift给截断),就开始调用producer的request方法,因为producer里面有终极的subscriber的引用,request开始调用终极的subscriber(没被取消订阅unsubscribe的)的onNext,

onComplete方法一路再这样链式一直到原始subscriber。
-------------------------------------------------------------------------------------------------------------------------------------------
Hystrix的超时原理:

在上面的lift(new HystrixObservableTimeoutOperator<R>(_cmd))中,HystrixObservableTimeoutOperator的call方法中,新建一个TimerListener,开启一个定时任务,放入HystrixTimer这个线程池中,这个设计有意思,这个定时任务在

getIntervalTimeInMilliseconds也就是originalCommand.properties.executionTimeoutInMilliseconds的时间后,执行一个originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)

的操作,如果成功了,说明正常的just(run)方法的后续操作还没有到达这一步,也就是说明超时了。如果超时了,会执行两个方法,先说第一个:

CompositeSubscription s.unsubscribe,这是在新开的线程的1秒钟以后的定时任务方法中,这个方法执行之前,异步的“主”线程中已经执行了s.add(parent),这个parent是Subscriber链中的一员。

s.unsubscribe会调用parent的unsubscribe,在上面说的subscriber链式调用中,开始的时候是WeakSingleProducer的request方法,判断,终极subscriber.isUnsubscribed,如果是,返回不执行下面的onNext和onCompleted,

那么问题来了,刚才说的CompositeSubscription的unsubscribe只是解除了parent的订阅,而parent只是终极的subscriber的递归链中的一员,为什么parent的订阅的解除可以引起终极的subscriber的订阅的解除呢?

注意subscriber的装饰过程中,构造方法的shareSubscriptions总是true,也就是subscriptions是共享的,链中所有的subscriber的subscriptions都指向同一个对象,而且subscriptions的unsubscribed是volatile的,

所以是线程可见的。s.add(parent)可以被子线程的s.unsubscribe看到是因为对subscriptionList的操作都是在synchronized内部的。
-------------------------------------------------------------------------------------------------------------------------------------------
如果在1秒钟内完成请求,就会调用onNext一路然后再onCompleted一路,注意调用onNext的是上面说的终极subscriber,现在开始一层一层剥离了,上面14个封装里的onNext,最先被调用的是最外层的

.doOnCompleted(fireOnCompletedHook),不过它的onNext是空,直到executeCommandAndObserve方法中的execution.doOnNext(markEmits),里面有circuitBreaker.markSuccess(),就是如果断路器现在是开着

的,当前的访问是漏网之鱼,那么就把断路器关掉,这个暂时不管,然后到最后也没什么主要的逻辑。再看onCompleted,首先执行terminateCommandCleanup,handleCommandEnd,metrics.markCommandDone

,HystrixThreadEventStream.getInstance().executionDone,writeOnlyCommandCompletionSubject.onNext(event)。这个是告诉hystrix的统计功能,这次请求的结果类型是成功了还是失败了,是用来计算成功率以

决定断路器要不要开启。
-------------------------------------------------------------------------------------------------------------------------------------------
circuitBreaker的获取是从一个静态concurrentHashMap,key是feignClient名+方法名,也就是说断路器是方法级别的。
-------------------------------------------------------------------------------------------------------------------------------------------
HystrixCommand是请求级别的,每一次请求都会实例化一个
-------------------------------------------------------------------------------------------------------------------------------------------
判断断路器有没有开启是在applyHystrixSemantics中circuitBreaker.allowRequest(),点进去

允许请求的话是判断断路器是不是没有开启:!isOpen(),或者另外一种情况:在断路器开启的情况下,每隔circuitBreakerSleepWindowInMilliseconds的时间要试探性的访问一次

先看isOpen的逻辑:从metrics取出metrics.getHealthCounts(),判断getTotalRequests和getErrorPercentage,总访问量和失败比例,点进getHealthCounts,healthCountsStream.getLatest()。

healthCountsStream的生成是在metrics的构造方法中,HealthCountsStream.getInstance(key, properties); 点进去看,

HealthCountsStream继承了BucketedRollingCounterStream,BucketedRollingCounterStream继承BucketedCounterStream,并在构造方法中生成一个HystrixCommandCompletionStream。

BucketedRollingCounterStream的核心属性sourceStream是父类BucketedCounterStream的核心属性bucketedStream经过一系列的封装而来,而bucketedStream又是上面说的HystrixCommandCompletionStream

经过一系列的封装而来。

healthCountsStream.getLatest()调用的是counterSubject.getValue(),而new HealthCountsStream的时候调用的startCachingStreamValuesIfUnstarted方法中,BucketedRollingCounterStream的sourceStream

属性会subscribe(counterSubject),这样HealthCounts就和BucketedRollingCounterStream联系上了。

每次请求成功后,终极Subscriber的onCompleted会调用handleCommandEnd,metrics.markCommandDone,会调用HystrixThreadEventStream的writeOnlyCommandCompletionSubject.onNext(event);

HystrixThreadEventStream的构造方法中,会让writeOnlyCommandCompletionSubject.doOnNext(writeCommandCompletionsToShardedStreams),writeCommandCompletionsToShardedStreams是一个Action,

其call方法中,调用HystrixCommandCompletionStream的write方法,又知道HystrixCommandCompletionStream是BucketedRollingCounterStream的核心。这样请求成功就可以反映到HealthCounts上了。

而BucketedCounterStream中bucketedStream初始化的时候,inputEventStream.window方法中,启动一个定时任务定时获取数据进行统计。

Hystrix源码

标签:操作   feign   去除   eth   dev   context   timer   允许   run   

原文地址:https://www.cnblogs.com/chuliang/p/11705028.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!