Soul网关Hystrix插件6


前面几篇都聊了Hystrix的基本原理,今天回归主题,看看soul是如何使用Hystrix的。

首先使用Hystrix就要定义Command,嘿嘿,一看,果然soul里面定义了Command。

public interface Command {
    /**
     * wrap fetch Observable in {@link HystrixCommand} and {@link HystrixCommandOnThread}.
     *
     * @return {@code Observable<R>} that executes and calls back with the result of command execution
     *         or a fallback if the command fails for any reason.
     */
    Observable<Void> fetchObservable();

    /**
     * whether the 'circuit-breaker' is open.
     *
     * @return boolean
     */
    boolean isCircuitBreakerOpen();

    /**
     * generate a error when some error occurs.
     *
     * @param exchange  the exchange
     * @param exception exception instance
     * @return error which be wrapped by {@link SoulResultWrap}
     */
    default Object generateError(ServerWebExchange exchange, Throwable exception) {
        Object error;
        if (exception instanceof HystrixRuntimeException) {
            HystrixRuntimeException e = (HystrixRuntimeException) exception;
            if (e.getFailureType() == HystrixRuntimeException.FailureType.TIMEOUT) {
                exchange.getResponse().setStatusCode(HttpStatus.GATEWAY_TIMEOUT);
                error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
            } else {
                exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
                error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
            }
        } else if (exception instanceof HystrixTimeoutException) {
            exchange.getResponse().setStatusCode(HttpStatus.GATEWAY_TIMEOUT);
            error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
        } else {
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
        }
        return error;
    }

    /**
     * do fall back when some error occurs on hystrix execute.
     * @param exchange {@link ServerWebExchange}
     * @param exception {@link Throwable}
     * @return {@code Mono<Void>} to indicate when request processing is complete.
     */
    default Mono<Void> doFallback(ServerWebExchange exchange, Throwable exception) {
        if (Objects.isNull(getCallBackUri())) {
            Object error;
            error = generateError(exchange, exception);
            return WebFluxResultUtils.result(exchange, error);
        }
        DispatcherHandler dispatcherHandler =
            SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
        ServerHttpRequest request = exchange.getRequest().mutate().uri(getCallBackUri()).build();
        ServerWebExchange mutated = exchange.mutate().request(request).build();
        return dispatcherHandler.handle(mutated);
    }

    /**
     * get call back uri.
     * @return when some error occurs in hystrix invoke it will forward to this
     */
    URI getCallBackUri();

}

soul定义了一个Command接口,定义了fetchObservable()、isCircuitBreakerOpen()、getCallBackUri()三个接口,generateError()、doFallback()两个默认方法。fetchObservable()用来包装command的执行。isCircuitBreakerOpen()用来判断熔断器是否打开。getCallBackUri()用来获取快速失败回退降级的url。generateError()判断了失败的类型返回一个响应。doFallback()方法则是调用远程降级回退接口获取响应。

HystrixCommand、HystrixCommandOnThread两个Command都实现了Command接口,并分别继承了Hystrix的HystrixObservableCommand、HystrixCommand。

public class HystrixCommandOnThread extends HystrixCommand<Mono<Void>> implements Command {

    private final ServerWebExchange exchange;

    private final SoulPluginChain chain;

    private final URI callBackUri;

    /**
     * Instantiates a new Http command.
     *
     * @param setter      the setter
     * @param exchange    the exchange
     * @param chain       the chain
     * @param callBackUri the call back uri
     */
    public HystrixCommandOnThread(final HystrixCommand.Setter setter,
                          final ServerWebExchange exchange,
                          final SoulPluginChain chain,
                          final String callBackUri) {
        super(setter);
        this.exchange = exchange;
        this.chain = chain;
        this.callBackUri = UriUtils.createUri(callBackUri);
    }

    @Override
    protected Mono<Void> run() {
        RxReactiveStreams.toObservable(chain.execute(exchange)).toBlocking().subscribe();
        return Mono.empty();
    }

    @Override
    protected Mono<Void> getFallback() {
        if (isFailedExecution()) {
            log.error("hystrix execute have error: ", getExecutionException());
        }
        final Throwable exception = getExecutionException();
        return doFallback(exchange, exception);
    }

    @Override
    public Observable<Void> fetchObservable() {
        return RxReactiveStreams.toObservable(this.execute());
    }

    @Override
    public URI getCallBackUri() {
        return callBackUri;
    }
}

HystrixCommand、HystrixCommandOnThread的区别就是HystrixObservableCommand、HystrixCommand两者的区别。HystrixCommandOnThread类Hystrix会从线程池中取一个线程以非阻塞方式执行run(),调用线程不必等待run();HystrixCommand类将以调用线程堵塞执行construct(),调用线程需等待construct()执行完才能继续往下走。

找到了Command命令,那么接下来应该看看是如何配置Command的。

public class HystrixBuilder {

    /**
     * this is build HystrixObservableCommand.Setter.
     *
     * @param hystrixHandle {@linkplain HystrixHandle}
     * @return {@linkplain HystrixObservableCommand.Setter}
     */
    public static HystrixObservableCommand.Setter build(final HystrixHandle hystrixHandle) {
        initHystrixHandleOnRequire(hystrixHandle);
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
        HystrixCommandProperties.Setter propertiesSetter =
                HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
                        .withCircuitBreakerEnabled(true)
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(hystrixHandle.getMaxConcurrentRequests())
                        .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
                        .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
                        .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
        return HystrixObservableCommand.Setter
                .withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andCommandPropertiesDefaults(propertiesSetter);
    }

    /**
     * this is build HystrixCommand.Setter.
     * @param hystrixHandle {@linkplain HystrixHandle}
     * @return {@linkplain HystrixCommand.Setter}
     */
    public static HystrixCommand.Setter buildForHystrixCommand(final HystrixHandle hystrixHandle) {
        initHystrixHandleOnRequire(hystrixHandle);
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
        HystrixCommandProperties.Setter propertiesSetter =
                HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
                        .withCircuitBreakerEnabled(true)
                        .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
                        .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
                        .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
        HystrixThreadPoolConfig hystrixThreadPoolConfig = hystrixHandle.getHystrixThreadPoolConfig();
        HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter =
                HystrixThreadPoolProperties.Setter()
                        .withCoreSize(hystrixThreadPoolConfig.getCoreSize())
                        .withMaximumSize(hystrixThreadPoolConfig.getMaximumSize())
                        .withMaxQueueSize(hystrixThreadPoolConfig.getMaxQueueSize())
                        .withKeepAliveTimeMinutes(hystrixThreadPoolConfig.getKeepAliveTimeMinutes())
                        .withAllowMaximumSizeToDivergeFromCoreSize(true);
        return HystrixCommand.Setter
                .withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andCommandPropertiesDefaults(propertiesSetter)
                .andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter);
    }

    private static void initHystrixHandleOnRequire(final HystrixHandle hystrixHandle) {
        if (hystrixHandle.getMaxConcurrentRequests() == 0) {
            hystrixHandle.setMaxConcurrentRequests(Constants.MAX_CONCURRENT_REQUESTS);
        }
        if (hystrixHandle.getErrorThresholdPercentage() == 0) {
            hystrixHandle.setErrorThresholdPercentage(Constants.ERROR_THRESHOLD_PERCENTAGE);
        }
        if (hystrixHandle.getRequestVolumeThreshold() == 0) {
            hystrixHandle.setRequestVolumeThreshold(Constants.REQUEST_VOLUME_THRESHOLD);
        }
        if (hystrixHandle.getSleepWindowInMilliseconds() == 0) {
            hystrixHandle.setSleepWindowInMilliseconds(Constants.SLEEP_WINDOW_INMILLISECONDS);
        }
        if (Objects.isNull(hystrixHandle.getHystrixThreadPoolConfig())) {
            hystrixHandle.setHystrixThreadPoolConfig(new HystrixThreadPoolConfig());
        }
        HystrixThreadPoolConfig hystrixThreadPoolConfig = hystrixHandle.getHystrixThreadPoolConfig();
        if (hystrixThreadPoolConfig.getCoreSize() == 0) {
            hystrixThreadPoolConfig.setCoreSize(Constants.HYSTRIX_THREAD_POOL_CORE_SIZE);
        }
        if (hystrixThreadPoolConfig.getMaximumSize() == 0) {
            hystrixThreadPoolConfig.setMaximumSize(Constants.HYSTRIX_THREAD_POOL_MAX_SIZE);
        }
        if (hystrixThreadPoolConfig.getMaxQueueSize() == 0) {
            hystrixThreadPoolConfig.setMaxQueueSize(Constants.HYSTRIX_THREAD_POOL_QUEUE_SIZE);
        }
    }
}

因为Command的配置很多,所以此处使用了建造者模式通过建造者进行配置。好了,前面学到的两种资源隔离模式来了。两个build方法分别构建了线程隔离模式、信号量隔离模式。而其它的配置全部来自admin同步过来的数据配置。

万事俱备只欠东风,Hystrix的东西都齐活了,那就等上帝之手把它串联运用起来了。HystrixPlugin就是上帝之手。所有的逻辑都在doExecute中进行串联。

public class HystrixPlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
        if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
            hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
        }
        if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
            hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
        }
        Command command = fetchCommand(hystrixHandle, exchange, chain);
        return Mono.create(s -> {
            Subscription sub = command.fetchObservable().subscribe(s::success,
                    s::error, s::success);
            s.onCancel(sub::unsubscribe);
            if (command.isCircuitBreakerOpen()) {
                log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
            }
        }).doOnError(throwable -> {
            log.error("hystrix execute exception:", throwable);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
            chain.execute(exchange);
        }).then();
    }

    private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) {
        if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
            return new HystrixCommand(HystrixBuilder.build(hystrixHandle),
                exchange, chain, hystrixHandle.getCallBackUri());
        }
        return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),
            exchange, chain, hystrixHandle.getCallBackUri());
    }

    @Override
    public String named() {
        return PluginEnum.HYSTRIX.getName();
    }

    @Override
    public int getOrder() {
        return PluginEnum.HYSTRIX.getCode();
    }

}

doExcuete中根据配置创建一个基于线程隔离还是信号量隔离的Command,执行command的fetchObservable()方法。这样就把Hystrix使用起来了。

问题

default Mono<Void> doFallback(ServerWebExchange exchange, Throwable exception) {
        if (Objects.isNull(getCallBackUri())) {
            Object error;
            error = generateError(exchange, exception);
            return WebFluxResultUtils.result(exchange, error);
        }
        DispatcherHandler dispatcherHandler =
            SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
        ServerHttpRequest request = exchange.getRequest().mutate().uri(getCallBackUri()).build();
        ServerWebExchange mutated = exchange.mutate().request(request).build();
        return dispatcherHandler.handle(mutated);
    }

doFallback是否应该包装在Command中执行,防止降级服务也跨了引起问题。


文章作者: maybe
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 maybe !