1 public class CommandHelloWorld extends HystrixCommand<String> {
 2  
 3     private final String name;
 4  
 5     public CommandHelloWorld(String name) {
 6         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必须
 7                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
 8                         .withExecutionTimeoutInMilliseconds(500))  //超时时间
 9                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //可选,默认 使用 this.getClass().getSimpleName();
10                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));
11  
12         this.name = name;
13     }
14  
15     @Override
16     protected String run() throws InterruptedException {
17         System.out.println("running");
18         TimeUnit.MILLISECONDS.sleep(1000);
19         return "Hello " + name + "!";
20     }
21  
22     @Override
23     protected String getFallback() {
24         return "Hello "+"Fallback";
25     }
26 }
27   
28 @Test
29 public void fallbackTest(){
30     assertEquals("Hello Fallback",new CommandHelloWorld("World").execute());
31 }
 
 Q2:什么情况下会触发fallback?
简单来说,有以下几种情况:run方法抛出异常、执行超时、线程池/信号量拒绝(reject)、断路器打开(短路)
 
 
 
 
 
 
以下为测试的主程序:
 1 public class CommandHelloFailure extends HystrixCommand<String> {
 2  
 3     private final String name;
 4  
 5     public CommandHelloFailure(String name) {
 6         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必须
 7                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
 8                         .withExecutionTimeoutInMilliseconds(1000))  //超时时间
 9                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
10                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));
11  
12         this.name = name;
13     }
14  
15     @Override
16     protected String run() throws InterruptedException {
17         String theadName = this.getThreadPoolKey().name();
18         String cmdKey=this.getThreadPoolKey().name();
19         System.out.println("running begin , threadPool="+theadName+" cmdKey="+cmdKey+" name="+name);
20  
21         if("Exception".equals(name)) {
22             throw new RuntimeException("this command always fails");
23         }else if("Timeout".equals(name)){
24             TimeUnit.SECONDS.sleep(2);
25         }else if("Reject".equals(name)){
26             TimeUnit.MILLISECONDS.sleep(800);
27         }
28         System.out.println(" run end");
29  
30         return "Hello " + name + "!";
31     }
32  
33     @Override
34     protected String getFallback() {
35         StringBuilder sb = new StringBuilder("running fallback");
36         boolean isRejected = isResponseRejected();
37         boolean isException = isFailedExecution();
38         boolean isTimeout= isResponseTimedOut();
39         boolean isCircut = isCircuitBreakerOpen();
40  
41         sb.append(", isRejected:").append(isRejected);
42         sb.append(", isException:"+isException);
43         if(isException){
44             sb.append(" msg=").append(getExecutionException().getMessage());
45         }
46         sb.append(",  isTimeout: "+isTimeout);
47         sb.append(",  isCircut:"+isCircut);
48  
49         sb.append(", group:").append(this.getCommandGroup().name());
50         sb.append(", threadpool:").append(getThreadPoolKey().name());
51         System.out.println(sb.toString());
52  
53         String msg="Hello Failure " + name + "!";
54         return msg;
55     }
56 }
 
 
FAILURE 
测试由异常导致的fallback
1 @Test
2 public void expTest() {
3     assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
4 }
5   
 
//控制台输出
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception
running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
 
TIMEOUT
测试由超时导致的fallback
 
/** A run() that sleeps past the execution timeout must be answered with the fallback greeting. */
@Test
public void timeOutTest() {
    String result = new CommandHelloFailure("Timeout").execute();
    assertEquals("Hello Failure Timeout!", result);
}
  
 
//控制台输出
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout
running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
 
THREAD_POOL_REJECTED
并发执行的任务数超过线程池和队列之和会被reject,导致fallback
1 @Test
2 public void rejectTest() throws InterruptedException {
3     int count = 5;
4     while (count-- > 0){
5         new CommandHelloFailure("Reject").queue();
6         TimeUnit.MILLISECONDS.sleep(100);
7     }
8 }
 
//控制台输出
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
SEMAPHORE_REJECTED  与 THREAD_POOL_REJECTED 类似,不再演示
 
SHORT_CIRCUITED
在一定时间内,用户请求超过一定的比例失败时(timeout, failure, reject),断路器就会打开;断路器打开后所有请求直接走fallback
参数设置
 
 
 
 
 
 
 
 
一般配置如下:
1 Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //required
2         .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
3                 .withExecutionTimeoutInMilliseconds(50)//execution timeout (ms)
4                 .withCircuitBreakerRequestVolumeThreshold(5) //minimum number of requests before the circuit may trip
5                 .withCircuitBreakerSleepWindowInMilliseconds(1000) //time the circuit stays open before a trial request (ms)
6                 .withCircuitBreakerErrorThresholdPercentage(50)) //error percentage that trips the circuit
7         .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //optional; when omitted the thread pool is derived from the group key
8         .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));
 
以上配置的含义是: 在10s内,如果请求数达到5个及以上,且失败比例达到50%,则开启断路器;断路器开启1000ms后进入半开状态,放行一个请求,成功则关闭断路器,失败则继续保持开启
断路器的工作机制,引用自官方文档:
The precise way that the circuit opening and closing occurs is as follows:
1. Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...
2. And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...
3. Then the circuit-breaker transitions from CLOSED to OPEN.
4. While it is open, it short-circuits all requests made against that circuit-breaker.
5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.
 
Q3:fallback时我们应该怎么办?
一般有以下几种策略:
1、不实现getFallback方法:依赖调用失败时直接抛出异常
2、实现getFallback方法,返回默认值:这是一种常见的策略
3、实现getFallback方法,走降级方案
此外,生产环境中,fallback时,一般需要打点记录
请求合并
简单来说,就是将一段时间内的多次请求合并为一次请求,常用于网络IO中,能减少IO次数,缺点是增加平均延迟

以下是测试代码主程序:
 1 public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
 2  
 3     private final Integer key;
 4  
 5     public CommandCollapserGetValueForKey(Integer key) {
 6         super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
 7                 .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
 8                         .withMaxRequestsInBatch(3)
 9                 .withTimerDelayInMilliseconds(10)));
10         this.key = key;
11     }
12  
13     @Override
14     public Integer getRequestArgument() {
15         return key;
16     }
17  
18     @Override
19     protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
20         return new BatchCommand(requests);
21     }
22  
23     @Override
24     protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
25         int count = 0;
26         for (CollapsedRequest<String, Integer> request : requests) {
27             request.setResponse(batchResponse.get(count++));
28         }
29     }
30  
31     private static final class BatchCommand extends HystrixCommand<List<String>> {
32         private final Collection<CollapsedRequest<String, Integer>> requests;
33  
34         private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
35             super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
36                     .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
37             this.requests = requests;
38         }
39  
40         @Override
41         protected List<String> run() {
42             System.out.println("BatchCommand run  "+requests.size());
43             ArrayList<String> response = new ArrayList<String>();
44             for (CollapsedRequest<String, Integer> request : requests) {
45                 // artificial response for each argument received in the batch
46                 response.add("ValueForKey: " + request.getArgument());
47             }
48             return response;
49         }
50     }
51 }
52   
53   
54 @Test
55 public void testCollapser() throws Exception {
56     HystrixRequestContext context = HystrixRequestContext.initializeContext();
57     try {
58         Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
59         Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
60         Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
61         Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
62  
63  
64         assertEquals("ValueForKey: 1", f1.get());
65         assertEquals("ValueForKey: 2", f2.get());
66         assertEquals("ValueForKey: 3", f3.get());
67         assertEquals("ValueForKey: 4", f4.get());
68  
69         // assert that the batch command ‘GetValueForKey‘ was in fact
70         // executed and that it executed only once
71         assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
72         HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
73         // assert the command is the one we‘re expecting
74         assertEquals("GetValueForKey", command.getCommandKey().name());
75         // confirm that it was a COLLAPSED command execution
76         assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
77         // and that it was successful
78         assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
79     } finally {
80         context.shutdown();
81     }
82 }
83   
84 //控制台输出
85 BatchCommand run  3
86 BatchCommand run  1
 
执行流程:

 
使用该特性
1、必须继承HystrixCollapser类,
2、实现以下方法:
getRequestArgument: 返回请求参数对象
createCommand : 返回BatchCommand
mapResponseToRequests:实现Response和Request的映射
3、创建对应的BatchCommand类:批量请求的具体实现
 
参数配置:
 
 
 
 
 
 
一般配置如下
Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
       .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
               .withMaxRequestsInBatch(3) // at most 3 requests are collapsed into one batch
       .withTimerDelayInMilliseconds(5)); // timer delay of 5 ms; presumably the collapsing window - confirm against the Hystrix docs
 
 
请求cache
 1 public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
 2     private final int value;
 3      
 4     public CommandUsingRequestCache(int value) {
 5         super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
 6         this.value = value;
 7     }
 8  
 9     @Override
10     public Boolean run() {
11         return value == 0 || value % 2 == 0;
12     }
13  
14    //使用cache功能,必须实现该方法
15     @Override
16     public String getCacheKey() {
17         return String.valueOf(value);
18     }
19 }
20   
21 @Test
22 public void testWithCacheHits() {
23     HystrixRequestContext context = HystrixRequestContext.initializeContext();
24     try {
25         CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
26         CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
27  
28         assertTrue(command2a.execute());
29         //第一次请求,没有cache
30         assertFalse(command2a.isResponseFromCache());
31  
32         assertTrue(command2b.execute());
33         // 第二次请求,从cache中拿的结果
34         assertTrue(command2b.isResponseFromCache());
35     } finally {
36         context.shutdown();
37     }
38  
39     context = HystrixRequestContext.initializeContext();
40     try {
41         CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
42         assertTrue(command3b.execute());
43         // this is a new request context so this
44         //new了新的 request context后,之前的cache失效
45         assertFalse(command3b.isResponseFromCache());
46     } finally {
47         context.shutdown();
48     }
49 }
 
Hystrix Context
 Global Context
UserRequest Context
 
使用与监控
1、工程中使用
使用Hystrix很简单,只需要添加相应依赖即可,以Maven为例:
 
 1 <!-- hystrix 依赖 -->
 2 <dependency>
 3     <groupId>com.netflix.hystrix</groupId>
 4     <artifactId>hystrix-core</artifactId>
 5     <version>1.5.9</version>
 6 </dependency>
 7 <dependency>
 8     <groupId>com.netflix.hystrix</groupId>
 9     <artifactId>hystrix-metrics-event-stream</artifactId>
10     <version>1.5.9</version>
11 </dependency>
 
2、DashBoard使用 
web.xml中配置相应的Servlet
1 <servlet>
2           <display-name>HystrixMetricsStreamServlet</display-name>
3           <servlet-name>HystrixMetricsStreamServlet</servlet-name>
4           <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
5 </servlet>
6 <servlet-mapping>
7           <servlet-name>HystrixMetricsStreamServlet</servlet-name>
8           <url-pattern>/hystrix.stream</url-pattern>
9    </servlet-mapping>
 
下载附件中的war文件和jar文件到任意目录,执行
java -jar jetty-runner-9.2.10.v20150310.jar --port 8410 hystrix-dashboard-1.5.1.war
然后在浏览器中打开:http://localhost:8410/  ,在输入框中填写 http://hostname:port/application/hystrix.stream, 点击 Add Stream ,然后再点击 Monitor Stream, 看到如下图:

每个指标对应的含义:

一般来说: Thread-pool Rejections 和 Failures/Exceptions 应该是0,Thread timeouts 是个很小的值。
代码结构
附件
1、启动脚本 start.sh
2、hystrix-dashboard-1.5.1.war
3、jetty-runner-9.2.10.v20150310.jar