转载自爱编码真是太好了
在Hystrix上篇中我们介绍了Hystrix一些基本的特性,在下篇中我们主要来介绍它的一些高级特性Request Context、Request Cache和Request Collapsing。
RequestCache和RequestCollapsing是Hystrix高级的特性,一般来说,我们会在Request范围内使用这些高级特性,需要先初始化HystrixRequestContext进行生命周期的管理,在标准Web应用中我们可以借助过滤器来解决这个问题。
引入依赖
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.9</version>
</dependency>
HystrixRequestContext对象初始化、释放方式
在web.xml中添加过滤器相关配置:
<filter>
<display-name>HystrixRequestContextServletFilter</display-name>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
增加自定义过滤器
public classHystrixRequestContextServletFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response,FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}
在应用中使用过滤器的目的是在执行每次请求的时候能及时对HystrixRequestContext进行初始化,在当前请求结束时关闭HystrixRequestContext,及时释放资源。
RequestCache实现及使用方式
假设两个线程发起相同的HTTP请求,Hystrix会把请求参数初始化到ThreadLocal中,两个Command异步执行,每个Command会把请求参数从ThreadLocal中拷贝到Command所在自身的线程中,Command在执行的时候会通过CacheKey优先从缓存中尝试获取是否已有缓存结果,如果命中,直接从HystrixRequestCache返回,如果没有命中,那么需要进行一次真实调用,然后把结果回写到缓存中,在请求范围内共享响应结果。
RequestCache主要有三个优点:
1.在当次请求内对同一个依赖进行重复调用,只会真实调用一次。
2.在当次请求内数据可以保证一致性。
3.可以减少不必要的线程开销。
具体使用:
public classHelloHystrixRequestCacheCommand extends HystrixCommand<Boolean> {
private final int value;
public HelloHystrixRequestCacheCommand(final int value) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("hello")));
this.value = value;
}
protected Boolean run() throws Exception {
return value == 0 || value % 2 == 0;
}
@Override
protected String getCacheKey() {
return String.valueOf(value) ;
}
}
单元测试
@Test
public void testRequestCacheCommand() {
HystrixRequestContext context =HystrixRequestContext.initializeContext();
try {
HelloHystrixRequestCacheCommand command2a = new HelloHystrixRequestCacheCommand(2);
HelloHystrixRequestCacheCommandcommand2b = new HelloHystrixRequestCacheCommand(2);
assertTrue(command2a.execute());
assertFalse(command2a.isResponseFromCache());
assertTrue(command2b.execute());
assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
//重新初始化HystrixRequestContext
context = HystrixRequestContext.initializeContext();
try {
HelloHystrixRequestCacheCommand command3b = newHelloHystrixRequestCacheCommand(2);
assertTrue(command3b.execute());
assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
Hystrix通过实现getCacheKey方法来激活缓存机制,我们通过测试用例可以看出,command2a执行时没有命中缓存,而command2b命中缓存,直接从缓存中获取结果,接着对HystrixRequestContext对象执行shutdown,本次请求的生命周期也就随之结束了,因此缓存失效,command3b对象已经不能命中缓存结果。
RequestCollapser的实现及使用方式
使用和不使用请求合并两种场景下线程和网络连接情况:
为什么要进行请求合并?举个例子,有个矿山,每过一段时间都会生产一批矿产出来(质量为卡车载重量的1/100),卡车可以一等到矿产生产出来就马上运走矿产,也可以等到卡车装满再运走矿产,前者一次生产对应卡车一次往返,卡车需要往返100次,而后者只需要往返一次,可以大大减少卡车往返次数。显而易见,利用请求合并可以减少线程和网络连接,开发人员不必单独提供一个批量请求接口就可以完成批量请求。
在Hystrix中进行请求合并也是要付出一定代价的,请求合并会导致依赖服务的请求延迟增高,延迟的最大值是合并时间窗口的大小,默认为10ms,当然我们也可以通过hystrix.collapser.default.timerDelayInMilliseconds属性进行修改,如果请求一次依赖服务的平均响应时间是20ms,那么最坏情况下(合并窗口开始是请求加入等待队列)这次请求响应时间就会变成30ms。在Hystrix中对请求进行合并是否值得主要取决于Command本身,高并发度的接口通过请求合并可以极大提高系统吞吐量,从而基本可以忽略合并时间窗口的开销,反之,并发量较低,对延迟敏感的接口不建议使用请求合并。
请求合并执行流程:
可以看出Hystrix会把多个Command放入Request队列中,一旦满足合并时间窗口周期大小,Hystrix会进行一次批量提交,进行一次依赖服务的调用,通过充写HystrixCollapser父类的mapResponseToRequests方法,将批量返回的请求分发到具体的每次请求中。
具体使用:
public class HelloHystrixCollapserCommand extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public HelloHystrixCollapserCommand(Integer key) {
this.key = key;
}
@Override
public Integer getRequestArgument() {
return key;
}
@Override
protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
return new BatchCommand(collapsedRequests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
int count = 0;
for (CollapsedRequest<String, Integer> collapsedRequest : collapsedRequests) {
collapsedRequest.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("hello"))
.andCommandKey(HystrixCommandKey.Factory.asKey("hello_collapser")));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, Integer> request : requests) {
response.add("ValueForKey: " + request.getArgument());
}
return response;
}
}
}
单元测试:
@Test
public void testHelloHystrixCollapser() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new HelloHystrixCollapserCommand(1).queue();
Future<String> f2 = new HelloHystrixCollapserCommand(2).queue();
Future<String> f3 = new HelloHystrixCollapserCommand(3).queue();
Future<String> f4 = new HelloHystrixCollapserCommand(4).queue();
assertEquals("ValueForKey: 1", f1.get());
assertEquals("ValueForKey: 2", f2.get());
assertEquals("ValueForKey: 3", f3.get());
assertEquals("ValueForKey: 4", f4.get());
assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest()
.getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
assertEquals("hello_collapser", command.getCommandKey().name());
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
} finally {
context.shutdown();
}
}
遇到的问题:
官方提供的单元测试是assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()),但是在我执行的时候发现测试用例不通过,总是会生成两个Command,通过查看源码发现:
原来在RequestCollapser类中提交请求的时候,第一次请求被添加的时候Hystrix会为CollapserTimer注册一个CollapsedTask对象,立即触发一次CollapsedTask,之后每次达到定时器执行周期并且批量提交的请求数量不为0,那么会触发一次CollapsedTask,然后会调用开发者重写的createCommand方法进行Command的创建。
关于Hystrix的特性,差不多上下篇基本已经涵盖了,在实际生产环境中场景比较复杂,需要根据实际情况进行调整,达到一个最佳的使用效果。
参考
1.https://github.com/Netflix/Hystrix/wiki/How-it-Works#flow3