码迷,mamicode.com
首页 > 其他好文 > 详细

hystrix总结之请求批量执行

时间:2018-01-12 13:22:10      阅读:168      评论:0      收藏:0      [点我收藏+]

标签:tco   let   hold   call   parent   extends   for   res   alt   

  hystrix可以将同一个命令的多次执行合并到一起执行。

public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> {
    private String name;
    public HelloWorldCommandCollapser(String name){
        this.name = name;
    }
    @Override
    public String getRequestArgument() {
        return name;
    }
    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
        return new BatchHystrixCommand(collapsedRequests);
    }
    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
        int i =0;
        for(CollapsedRequest collapsedRequest:collapsedRequests){
            collapsedRequest.setResponse(batchResponse.get(i));
            i++;
        }
    }
    private class BatchHystrixCommand extends HystrixCommand{
        private Collection<CollapsedRequest<String, String>> collapsedRequests;
        public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.collapsedRequests =collapsedRequests;
        }
        @Override
        protected Object run() throws Exception {
            List<String> result = new ArrayList<String>();
            for(CollapsedRequest collapsedRequest:collapsedRequests){
                result.add("helloworld:"+collapsedRequest.getArgument());
            }
            return result;
        }
    }

  方法调用

HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try{
            String result1 = new HelloWorldCommandCollapser("one").execute();
            String result2 = new HelloWorldCommandCollapser("two").execute();
            String result3 = new HelloWorldCommandCollapser("three").execute();
            String result4 = new HelloWorldCommandCollapser("four").execute();
            String result5 = new HelloWorldCommandCollapser("five").execute();
            String result6 = new HelloWorldCommandCollapser("six").execute();
            System.out.println(result1);
            System.out.println(result2);
            System.out.println(result3);
            System.out.println(result4);
            System.out.println(result5);
            System.out.println(result6);
        }finally {
            context.shutdown();
        }

  继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。

  getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。

  createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。

  mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。

  本质原理如下:

  当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。

public Observable<ResponseType> toObservable(Scheduler observeOn) {
        return Observable.defer(new Func0<Observable<ResponseType>>() {
            @Override
            public Observable<ResponseType> call() {
               ...
                RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
                Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
...
                return response;
            }
        });
    }

  RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。  

  当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。

public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
        ...
        while (true) {
            final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
            ...final Observable<ResponseType> response;
            if (arg != null) {
                response = b.offer(arg);
            } else {
                response = b.offer( (RequestArgumentType) NULL_SENTINEL);
            }
            //如果到达一定数量,respose返回null
            if (response != null) {
                return response;
            } else {
                //执行批量
                createNewBatchAndExecutePreviousIfNeeded(b);
            }
        }
    }

  RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。  

private class CollapsedTask implements TimerListener {
        final Callable<Void> callableWithContextOfParent;
        CollapsedTask() {
            callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ...
            RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
            if (currentBatch != null && currentBatch.getSize() > 0) {
              createNewBatchAndExecutePreviousIfNeeded(currentBatch);
            }
            ... } }); } @Override public void tick() { ...
        callableWithContextOfParent.call();
       ...
} @Override public int getIntervalTimeInMilliseconds() { return properties.timerDelayInMilliseconds().get(); } }

  批量执行

public void executeBatchIfNotAlreadyStarted() {
     ...
            try {
              Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                    try {
                        Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果
              //批量执行结果映射到执行请求中
                        commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
               ...
}).doOnCompleted(new Action0() {                ... }).subscribe(); } catch (Exception e) { ... } } } catch (Exception e) { ... } finally { batchLock.writeLock().unlock(); } } }

 

hystrix总结之请求批量执行

标签:tco   let   hold   call   parent   extends   for   res   alt   

原文地址:https://www.cnblogs.com/zhangwanhua/p/8271618.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!