Soul网关Divide插件


divide plugin是什么?

  1. divide插件是网关处理 http协议请求的核心处理插件。
  2. divide插件是进行http正向代理的插件,所有http类型的请求,都是由该插件进行负载均衡的调用。

divide plugin如何工作?

soul plugin 通过职责链模式串联。初始化网关的web服务SoulWebHandler时候加载了所有配置的插件保存在
DefaultSoulPluginChain职责链对象中。当请求来了之后,按照插件顺序依次执行。

/**
 * Init SoulWebHandler.
 *
 * @param plugins this plugins is All impl SoulPlugin.
 * @return {@linkplain SoulWebHandler}
 */
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
    List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
    final List<SoulPlugin> soulPlugins = pluginList.stream()
            .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
    soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
    return new SoulWebHandler(soulPlugins);
}
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
    MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
    Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
    // 依次执行插件处理
    return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
            .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
public class DividePlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        // 获取上下文信息(参数透传的一种方式)
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // 规则处理反序列化
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        // 根据selector id从本地缓存中获取被代理的服务
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            // 如果没有一个被代理的服务就返回错误
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 获取请求目标ip地址
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        // 使用选择器做第一次流量的筛选(这里通过spi加载扩展的负载均衡算法进行处理)
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            // 筛选后找不到一个被代理的服务就返回错误
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        // 拼接出最终目标url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        // 把加工的信息写入请求ServerWebExchange中
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        // 调用下一个插件处理
        return chain.execute(exchange);
    }
}

文章作者: maybe
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 maybe !