SpringCloud 04 —— Hystrix(上)原生使用

HyStrix 介绍

很多系统在架构的时候都需要考虑单点故障和单点故障等问题,对于一个很大的分布式系统来说服务挂掉、机器损坏、响应延迟等问题不可避免,如何减少故障对整个分布式集群的影响成为一个重要的研究课题。
那么我们如何去解决呢?Spring Cloud 提供的解决的方案是Hystrix。

Hystrix 的历史

说起Hystrix 的历史,先了解一下Netflix这家神奇的公司。
想必很多人都看过美剧《纸牌屋》,不过应该很少有人知道它的出品方就是Netflix,可以说是《纸牌屋》之父。作为一家在线影片租赁提供商,Netflix成立于1997年,在美国、加拿大提供互联网随选流媒体播放,定制DVD、蓝光光碟在线出租业务,经过多次商业模式的变革,Netflix成为了在线内容的霸主。据统计,Netflix在高峰期间的下载流量可以占到北美地区的1/3。

这里不得不提一点的是,从09年开始,Netflix逐渐把它的IT系统迁移到AWS云平台上,并开始业务的转型,从DVD租赁演变为在线视频供应商,依托于强大的AWS,这也给AWS带来了巨大挑战。

Netflix在AWS运行多年期间,总结了不少实践经验,比如必须考虑到故障的可能性,在AWS云平台上进行架构设计的一个经验法则是要作为一个悲观主义者来设计应用架构:假设会出问题。

我们知道硬件总会发生故障,服务器会发生宕机,唯一不确定的就是在什么时候发生,所以在应用架构上要进行高可用设计,比如需要有一个清晰的数据备份和恢复机制。

Netflix API团队在2011年启动了弹性工程工作,即Hystrix,旨在通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力,提供了熔断、隔离、Fallback、cache、监控等功能,能够在一个、或多个依赖同时出现问题时保证系统依然可用,目前它在Netflix每天处理着数百亿的隔离线程以及数千亿的隔离信号调用。

Hystrix是基于Apache License 2.0协议的开源的程序库,目前托管在GitHub上。Spring Cloud 目前生态里目前已经集成了Hystrix,所以我们我可以更方便的使用Hystrix 相关的API。

Hystrix的作用是什么?

Hystrix旨在执行以下操作:

  • 提供保护并控制通过第三方客户端库访问(通常是通过网络)的依赖项带来的延迟和失败。
  • 停止复杂的分布式系统中的级联故障。
  • 快速失败并快速恢复。
  • 回退并在可能的情况下正常降级。
  • 启用近乎实时的监视,警报和操作控制。

Hystrix解决什么问题?

复杂分布式体系结构中的应用程序具有数十种依赖关系,每种依赖关系不可避免地会在某个时刻失败。如果主机应用程序未与这些外部故障隔离开来,则可能会被淘汰。

例如,对于依赖于30个服务的应用程序,其中每个服务的正常运行时间为99.99%,您可以期望:

99.9930= 99.7%的正常运行时间
10亿个请求中的0.3%= 3,000,000个故障
/每月2小时以上的停机时间,即使所有依赖项都具有出色的正常运行时间。

现实通常更糟。

即使您没有对整个系统进行永续性设计,即使所有依赖项都能很好地执行,即使0.01%的停机时间对数十种服务中的每一项的总体影响,也可能相当于每月停机数小时。

当一切正常时,请求流如下所示:

当许多后端系统之一变得潜在时,它可以阻止整个用户请求:

随着高流量,单个后端依赖关系变得潜在,这可能导致所有服务器上的所有资源在几秒钟内变得饱和。

应用程序中可能会导致网络请求通过网络或客户端库传播的每个点都是潜在失败的根源。比故障更糟糕的是,这些应用程序还可能导致服务之间的延迟增加,从而备份队列,线程和其他系统资源,从而导致整个系统出现更多级联故障。

当通过第三方客户端执行网络访问时,这些问题会更加严重。第三方客户端是一个“黑匣子”,实现细节被隐藏并且可以随时更改,并且每个客户端库的网络或资源配置都不相同,并且通常难以监控和更改。

更糟糕的是,传递依赖关系会执行潜在的昂贵或易出错的网络调用,而不会被应用程序明确调用。

网络连接失败或降级。服务和服务器出现故障或变慢。新的库或服务部署会更改行为或性能特征。客户端库有错误。

所有这些都代表需要隔离和管理的故障和延迟,以使单个故障依赖项无法关闭整个应用程序或系统。

Hystrix的工作原理

  • 防止任何单个依赖项耗尽所有容器(例如Tomcat)用户线程。
  • 减少负载并快速失败,而不是排队。
  • 在可行的情况下提供备用,以保护用户免受故障的影响。
  • 使用隔离技术(例如隔板,泳道和断路器模式)来限制任何一种依赖关系的影响。
  • 通过近实时指标,监视和警报优化发现时间
  • 通过在Hystrix的大多数方面中以低延迟传播配置更改来优化恢复时间,并支持动态属性更改,这使您可以通过低延迟反馈回路进行实时操作修改。
  • 防止整个依赖性客户端执行失败,而不仅仅是网络流量失败。

Hystrix通过以下方式做到这几点:

  • 将对外部系统(或“依赖项”)的所有调用包装在通常在单独线程中执行的HystrixCommand或HystrixObservableCommand对象中(这是命令模式的示例)。
  • 超时呼叫花费的时间超过您定义的阈值。有一个默认的,而是由“属性”,使它们比测量的99.5略高的方式对大多数依赖你自定义设置这些超时个百分点每个依存性的性能。
  • 为每个依赖项维护一个小的线程池(或信号灯);如果已满,发往该依赖项的请求将立即被拒绝,而不是排队。
  • 测量成功,失败(客户端抛出的异常),超时和线程拒绝。
  • 如果该服务的错误百分比超过阈值,则使断路器跳闸,以在一段时间内手动或自动停止所有对特定服务的请求。
  • 当请求失败,被拒绝,超时或短路时执行回退逻辑。
  • 几乎实时监控指标和配置更改。

使用Hystrix封装每个基础依赖项时,如上图所示的体系结构将更改为类似于下图。每个依赖项彼此隔离,受到延迟时发生饱和的资源的限制,并包含回退逻辑,该逻辑决定了在依赖项中发生任何类型的故障时做出什么响应:

非SpringCloud

Hystrix 基础使用

新建项目cloud-hystrix-without-spring(此项目是不在Spring Cloud下运行的,仅仅演示Hystrix的特性,后续会专门讲解和Spring Cloud的使用。)

导入maven依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.netflix.hystrix/hystrix-core -->
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.18</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
    </dependencies>

编写一个HystrixCommand的实现类,通过设置一个Groupkey。具体的逻辑卸载run()方法中,并在方法中输出当前的线程名,本节我们都将通过main()方法调用。

命令组名称(Groupkey) 是必传的,默认情况下全局维护的线程池Map以该值作为Key,该Map的Value为执行命令的线程池。

public class HystrixCommond extends HystrixCommand<String> {

    private final String name;

    protected HystrixCommond(String name) {
        //创建一个组名
        super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return this.name + ":" + Thread.currentThread().getName();
    }

    //    非异步执行
//    public static void main(String[] args) {
//        String test = new FwHystrixCommond("test").execute();
//        System.out.println(test);
//    }
//    异步执行
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Future<String> test = new HystrixCommond("test").queue();
        System.out.println(test.get());
    }
}

右键运行之后,可以看到正常的输出,并且我们的组名变成了线程的名字test:hystrix-myGroup-1

Hystrix 回退

上面的简单使用可能很多人觉得有点懵逼,哎,这玩意跟Hystrix有啥关系?请不要懵逼,下面一步一步介绍,请耐心。

在实现HystrixCommand方法中,有一个方getFallback()法我们需要重写,当系统断路器被打开时、实际执行命令失败时等就会触发这个方法。我们可以在这个方法中定义自己的回退逻辑。

新建HystrixCommondFallBack类

我们在run()方法中设置延迟10秒钟,web默认1秒钟没有响应寄回发生超时异常。因此在运行的时候回触发getFallback()方法

public class HystrixCommondFallBack extends HystrixCommand<String> {

    private final String name;

    protected HystrixCommondFallBack(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return this.name + ":" + Thread.currentThread().getName();
    }

    /**
     * 默认情况下调用没在1秒内响应,就会触发回退
     *
     * @return
     */
    @Override
    protected String getFallback() {
        return "当前调用失败";
    }

    public static void main(String[] args) {
        String test = new HystrixCommondFallBack("test").execute();
        System.out.println(test);
    }
}

运行main 方法可以看到运行后输出了已下日志:当前调用失败

Hystrix 信号量隔离

Hystrix提供了两种隔离机制,信号量(SEMAPHORE)线程(THREAD)隔离,我们先看信号量的隔离
当请求的并发数高于设定的阀值时,就不会再执行命令。相对于线程池机制,信号量的开销较小,但是信号量机制不支持超时和异步,除非对调用的服务有足够的信任,否则不建议使用信号量机制进行隔离。

注意:如果依赖关系被信号量隔离,然后变为潜在状态,则父线程将保持阻塞状态,直到基础网络调用超时为止。
一旦达到限制,信号灯拒绝将开始,但是填充信号量的线程无法得到释放.

接下来我们在代码中看看信号量机制怎么进行隔离的

新建HystrixCommondSemaphore类
需要在构造方法中指定SEMAPHORE机制,这里我们用默认的最大并发数(10)和回退并发数(10),开20线程进行测试。

@Slf4j
public class HystrixCommondSemaphore extends HystrixCommand<String> {
    private final String name;

    protected HystrixCommondSemaphore(String name) {
        super(HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("myGroup"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                                HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE
                        )
                ));
        this.name = name;
    }

    @Override
    protected String getFallback() {
        log.info(this.name + ":" + Thread.currentThread().getName() + "异常");
        return this.name + ":" + Thread.currentThread().getName();
    }

    @Override
    protected String run() throws Exception {
        log.info(this.name + ":" + Thread.currentThread().getName() + "成功");
        return this.name + ":" + Thread.currentThread().getName();
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 20; i++) {
            final int index = i;
            Thread t = new Thread() {
                @Override
                public void run() {
                    HystrixCommondSemaphore test = new HystrixCommondSemaphore("test" + index);
                    test.execute();
                }
            };
            t.start();
        }
        Thread.sleep(5000);
    }
}

运行.

Hystrix 线程隔离

前面说了Hystrix 隔离机制中的一种信号量机制,现在我们来说一下线程隔离的机制。

线程池隔离的优缺点

线程池的好处

  • 应用程序会被完全保护起来,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。
  • 我们给应用程序引入一个新的风险较低的客户端lib的时候,如果发生问题,也是在本lib中,并不会影响到其他内容,因此我们可以大胆的引入新lib库。
  • 当依赖的一个失败的服务恢复正常时,应用程序会立即恢复正常的性能。
  • 如果我们的应用程序一些参数配置错误了,线程池的运行状况将会很快显示出来,比如延迟、超时、拒绝等。同时可以通过动态属性实时执行来处理纠正错误的参数配置。
  • 如果服务的性能有变化,从而需要调整,比如增加或者减少超时时间,更改重试次数,就可以通过线程池指标动态属性修改,而且不会影响到其他调用请求。
  • 除了隔离优势外,hystrix拥有专门的线程池可提供内置的并发功能,使得可以在同步调用之上构建异步的外观模式,这样就可以很方便的做异步编程(Hystrix引入了Rxjava异步框架)

注意:尽管有单独的线程提供了隔离,但您的基础客户端代码也应具有超时和/或对线程中断的响应,因此它不能无限期地阻塞并使Hystrix线程池饱和。

线程池的缺点

线程池的主要缺点是它们增加了计算开销。每个命令执行都涉及在单独的线程上运行命令所涉及的队列,调度和上下文切换。
Netflix在设计此系统时,决定接受此方式,以换取其提供的好处,并认为它很小,不会对成本或性能造成重大影响。

线程成本

Hystrix测量在子线程上执行construct()orrun()方法时的延迟以及父线程上的总的端到端时间。这样,您可以看到Hystrix开销(线程,度量,日志记录,断路器等)的成本。

Netflix API使用线程隔离每天处理10+亿次Hystrix Command执行。每个API实例有40多个线程池,每个线程池中有5-20个线程(大多数设置为10)。

下图表示一个HystrixCommand在单个API实例上以每秒60个请求的速度执行的情况(每个服务器每秒约350个线程执行总数):

新建HystrixCommondThread 类

在这里我们需要设置隔离级别为THREAD,并且为线程池的大小为3,但是在代码里面运行了6个线程,所以只可能有3个成功,剩下的回退。

@Slf4j
public class HystrixCommondThread extends HystrixCommand<String> {
    private final String name;

    protected HystrixCommondThread(String name) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("myGroup"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                                HystrixCommandProperties.ExecutionIsolationStrategy.THREAD
                        )
                ).andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)
                ));
        this.name = name;
    }

    @Override
    protected String getFallback() {
        log.info(this.name + ":" + Thread.currentThread().getName() + "异常");
        return this.name + ":" + Thread.currentThread().getName();
    }

    @Override
    protected String run() throws Exception {
        log.info(this.name + ":" + Thread.currentThread().getName() + "成功");
        return this.name + ":" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 6; i++) {
            final int index = i;
            Thread t = new Thread() {
                @Override
                public void run() {
                    HystrixCommondThread test = new HystrixCommondThread("test" + index);
                    test.execute();
                }
            };
            t.start();
        }
        Thread.sleep(5000);
    }
}

其他参数可自行了解:

Hystrix 结果缓存

缓存的在开发中经常被使用,不管是本地缓存、Redis缓存、Guava、ECache、MemCache等第三方缓存,在目前的目分布式系统中,尤其是对并发要求高的系统,缓存所占的地位非常重要。

hystrix支持将一个请求结果缓存起来,下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销。要使用hystrix cache功能

  1. 重写getCacheKey(),用来构造cache key;
  2. 构建context,如果请求B要用到请求A的结果缓存,A和B必须同处一个context

通过HystrixRequestContext.initializeContext()和context.shutdown()可以构建一个context,这两条语句间的所有请求都处于同一个context。

新建HystrixCommondCache类这里面我们重写了getCacheKey()方法,并已穿进去的参数作为缓存的key,我在run()方法中加了一个log日志,方便看到run()方法被调用到了几次

@Slf4j
public class HystrixCommondCache extends HystrixCommand<String> {
    private final String name;

    protected HystrixCommondCache(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("myGrop"));
        this.name = name;
    }

    @Override
    protected String run() {
        log.info("get data,{}", this.name);
        return this.name + ":" + Thread.currentThread().getName();
    }

    @Override
    protected String getCacheKey() {
        return this.name;
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        for (int i = 0; i < 5; i++) {
            HystrixCommondCache test = new HystrixCommondCache("test");
            log.info(test.execute());
        }
        context.shutdown();
    }
}

运行main方法可以看到run()方法只被执行了一次

Hystrix 清除缓存

有缓存设置就必然会有缓存清除,当代数据发生变动的时候,需要清除缓存,不让会造成脏数据。

新建HystrixCommondFlushCache类方法里面设置了HystrixCommandKey 用来指明删除缓存的Key。通过手动执行HystrixCommondFlushCache.flushCache("test");可以实现将缓存删除

@Slf4j
public class HystrixCommondFlushCache extends HystrixCommand<String> {

    public static final HystrixCommandKey TEST_KEY = HystrixCommandKey.Factory.asKey("TestKey");
    private final String name;

    protected HystrixCommondFlushCache(String name) {
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("myGroup"))
                .andCommandKey(TEST_KEY));
        this.name = name;
    }

    @Override
    protected String run() {
        log.info("get data,{}", this.name);
        return this.name + ":" + Thread.currentThread().getName();
    }

    /**
     * 清理缓存
     *
     * @param name
     */
    private static void flushCache(String name) {
        HystrixRequestCache.getInstance(TEST_KEY,
                HystrixConcurrencyStrategyDefault.getInstance()).clear(name);
    }

    @Override
    protected String getCacheKey() {
        return this.name;
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        for (int i = 0; i < 5; i++) {
            HystrixCommondFlushCache test = new HystrixCommondFlushCache("test");
            log.info(test.execute());
            HystrixCommondFlushCache.flushCache("test");
        }
        context.shutdown();
    }
}

执行main方法,如果我们在将方法HystrixCommondFlushCache.flushCache("test");注释掉,可以看到run()方法仍然被执行一次,因此清缓存OK

Hystrix 断路器开启

在命令结果没有缓存命中的时候,Hystrix在执行命令前需要检查断路器是否为打开状态:
如果断路器是打开的,那么Hystrix不会执行命令,而是转接到fallback处理逻辑.
如果断路器是关闭的,那么Hystrix调到第5步(线程池/请求队列/信号量是否占满),检查是否有可用资源来执行命令

断路器开启:

  1. 整个链路达到一定的阈值,默认情况下,10秒内产生超过20次请求,则符合第一个条件
  2. 满足第一个条件的情况下,如果请求的错误百分比大于阈值,则会打开断路器,默认50%

断路器一旦开启就会执行回退方法,不在执行目标方法,而且也不会更新链路的健康信息。

下图显示了通过Hystrix向服务依赖项请求时发生的情况

  1. 构造一个HystrixCommand或HystrixObservableCommand对象
  2. 执行命令
  3. 响应是否已缓存?
  4. 断路器开了吗?
  5. 线程池/队列/信号量是否已满?
  6. HystrixObservableCommand.construct() 要么 HystrixCommand.run()
  7. 计算电路健康
  8. 获取后备
  9. 返回成功的回应

接下来我们来开启断路器.新建类HystrixCommondCircuitEnable,在代码里设置了10秒内有10次请求,操作这个即满足第一个条件,设置操作时间为500毫秒,但是在run()方法中sleep(800),这样请求都会超时。开启断路器

@Slf4j
public class HystrixCommondCircuitEnable {

    public static void main(String[] args) {
        //10秒内有10次请求满足第一个条件
        ConfigurationManager.getConfigInstance().setProperty(
                "hystrix.command.default.circuitBreaker.requestVolumeThreshold", 10);
        for (int i = 0; i < 15; i++) {
            ErrorCommand c = new ErrorCommand();
            c.execute();
            if (c.isCircuitBreakerOpen()) {
                log.info("当前断路器被打开,在第{}索引", i);
            }
        }
    }

    static class ErrorCommand extends HystrixCommand<String> {
        public ErrorCommand() {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(500)));
        }

        protected String run() throws InterruptedException {
            Thread.sleep(800);
            return "success";
        }

        protected String getFallback() {
            return "fallback";
        }
    }
}

运行main方法可以看到从第10 个索引开始,断路器被打开.

Hystrix 断路器关闭

上面我们分析了断路器的开启,那么断路器的状态是如何变化的,怎样让断路器关闭已实现正常的系统访问。

断路器有三个状态 :OPEN、 CLOSED(默认状态) 、HALF_OPEN状态,这些状态在什么情况下出现呢?

当断路器打开后,对应接口的请求会有段休眠期,这个休眠期内接口请求不会被正真的执行,但是如果休眠期时间过了,
这个时候断路器的状态就到了HALF_OPEN状态,这个时候断路器允许一次真实的接口请求,如果这次请求失败,则断路
器打开OPEN,循环上面的动作,如果请求成功则断路器关CLOSED。

enum Status {
        CLOSED, OPEN, HALF_OPEN;
    }

新建HystrixCommondCircuitClose类,这里设置一个变量,初始化为true,并且在run()方法中我们将根据传入的值设置时间,先让其超时,开启断路器,然后休眠6秒后,调用的时间减少值不超时。断路器关闭,并且10内满足3个请求就会触发断路器第一个条件。

@Slf4j
public class HystrixCommondCircuitClose {
    public static void main(String[] args) throws InterruptedException {
        ConfigurationManager.getConfigInstance().setProperty(
                "hystrix.command.default.circuitBreaker.requestVolumeThreshold", 3);
        boolean isTimeout = true;
        for (int i = 0; i < 10; i++) {
            TestCommand c = new TestCommand(isTimeout);
            c.execute();

            HystrixCommandMetrics.HealthCounts hc = c.getMetrics().getHealthCounts();
            System.out.println("健康信息:" + hc.getTotalRequests());
            if (c.isCircuitBreakerOpen()) {
                isTimeout = false;
                log.info("断路器打开了,第{}索引,等待休眠期结束", i);
                log.info("休眠6秒");
                Thread.sleep(6000);
            }
        }
    }

    static class TestCommand extends HystrixCommand<String> {
        private boolean isTimeout;

        public TestCommand(boolean isTimeout) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(500)));
            this.isTimeout = isTimeout;
        }

        protected String run() throws InterruptedException {
            if (isTimeout) {
                Thread.sleep(800);
            } else {
                Thread.sleep(200);
            }
            return "";
        }

        protected String getFallback() {
            return "fallback";
        }
    }
}

运行main方法

Hystrix 合并请求

从前面的内容可以得知,Hystrix 会为方法执行分配线程,线程的切换会消耗服务器的性能,Hystrix 提供了合并请求的功能,再一次请求的过程中,可以将一段时间内相同的请求合并到一个命令中执行,方法中允许不同的参数。合并后可以减少网络的请求,进而提升性。

合并请求需要实现以下功能:

  • 可以整理请求过来的参数
  • 可以将多个请求合并的处理器

新建HystrixCollapser类这里我们需要实现集成HystrixCollapser抽象类,并实现里面的方法,按照上面的设想,我们需要实现请求的合并和参数的整理,以实现最终只调用一次请求。

@Slf4j
public class HystrixCollapser extends com.netflix.hystrix.HystrixCollapser<List<String>, String, String> {

    private final String name;

    public HystrixCollapser(String name) {
        this.name = name;
    }

    @Override
    public String getRequestArgument() {
        return this.name;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
        return new HystrixCollapser.BatchCommand(collapsedRequests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
        int count=0;
        for (CollapsedRequest<String, String> request : collapsedRequests) {
            request.setResponse(batchResponse.get(count++));
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        Future<String> tesFuture1 = new HystrixCollapser("test1").queue();
        Future<String> tesFuture2 = new HystrixCollapser("test2").queue();
        log.info(tesFuture1.get());
        log.info(tesFuture2.get());
        context.shutdown();
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {

        private Collection<CollapsedRequest<String, String>> requests;

        protected BatchCommand(Collection<CollapsedRequest<String, String>> requests) {
            super(Setter.withGroupKey(
                    HystrixCommandGroupKey.Factory.asKey("testGroup")
                    ).andCommandKey(
                    HystrixCommandKey.Factory.asKey("testKey")
                    )
            );
            this.requests=requests;
        }

        @Override
        protected List<String> run() throws Exception {
            log.info("real request");
            List<String> response=new ArrayList<>();
            for (CollapsedRequest<String, String> request : requests) {
                response.add("result:"+request.getArgument());
            }
            return response;
        }
    }
}

运行main方法可以看到run()方法只执行了一次

更新时间:2020-04-15 21:09:05

本文由 寻非 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://www.zhouning.group/archives/springcloud04hystrix自我保护
最后更新:2020-04-15 21:09:05

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×