divide plugin是什么?
- divide插件是网关处理http协议请求的核心处理插件。
- divide插件是进行http正向代理的插件,所有http类型的请求,都是由该插件进行负载均衡的调用。
divide plugin如何工作?
soul 的插件通过职责链模式串联。初始化网关的web服务SoulWebHandler的时候,会加载所有已配置的插件,按顺序排序后保存在SoulWebHandler中。
当请求来了之后,用这些插件创建DefaultSoulPluginChain职责链对象,并按照插件顺序依次执行。
/**
 * Creates the {@link SoulWebHandler} bean registered under the name {@code webHandler}.
 *
 * <p>The available plugins are sorted in ascending {@link SoulPlugin#getOrder()} order,
 * each one is logged, and the sorted list is handed to the new handler.
 *
 * @param plugins provider of the {@link SoulPlugin} list; an empty list is used when none is available
 * @return a {@link SoulWebHandler} built from the ordered plugin list
 */
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
    final List<SoulPlugin> available = plugins.getIfAvailable(Collections::emptyList);
    final Comparator<SoulPlugin> byOrder = Comparator.comparingInt(SoulPlugin::getOrder);
    final List<SoulPlugin> ordered = available.stream()
            .sorted(byOrder)
            .collect(Collectors.toList());
    // Log every plugin in the order it will be handed to the handler.
    for (SoulPlugin plugin : ordered) {
        log.info("load plugin:[{}] [{}]", plugin.named(), plugin.getClass().getName());
    }
    return new SoulWebHandler(ordered);
}
/**
 * Handles one request by running it through the plugin chain.
 *
 * <p>Increments the total-request counter and starts the request-latency histogram timer,
 * then executes a new {@link DefaultSoulPluginChain} over {@code plugins}, subscribed on
 * {@code scheduler}. The timer, if one was started, is observed when the chain completes
 * successfully.
 *
 * @param exchange the current server exchange
 * @return the {@code Mono} produced by the plugin chain, with the latency hook attached
 */
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
    MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
    final Optional<HistogramMetricsTrackerDelegate> latencyTimer =
            MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
    // Run the plugins one after another, in list order.
    final Mono<Void> chainResult = new DefaultSoulPluginChain(plugins)
            .execute(exchange)
            .subscribeOn(scheduler);
    return chainResult.doOnSuccess(ignored ->
            latencyTimer.ifPresent(timer -> MetricsTrackerFacade.getInstance().histogramObserveDuration(timer)));
}
public class DividePlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取上下文信息(参数透传的一种方式)
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 规则处理反序列化
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
// 根据selector id从本地缓存中获取被代理的服务
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
// 如果没有一个被代理的服务就返回错误
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取请求目标ip地址
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 使用选择器做第一次流量的筛选(这里通过spi加载扩展的负载均衡算法进行处理)
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(divideUpstream)) {
// 筛选后找不到一个被代理的服务就返回错误
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// set the http url
// 拼接出最终目标url
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
// 把加工的信息写入请求ServerWebExchange中
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// 调用下一个插件处理
return chain.execute(exchange);
}
}