Sentinel 是面向分布式服务架构的轻量级流量控制框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助您保护服务的稳定性。
1. Sentinel资源&规则
我们说的资源,可以是任何东西,服务,服务里的方法,甚至是一段代码。使用 Sentinel 来进行资源保护,主要分为两个步骤:
* 定义资源
* 定义规则
先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。
2. Demo分析
以QPS流控为分析样例
定义规则
private static void initFlowQpsRule() { //可以看出规则是个链表,那么意味着可以一个资源对应多个规则
List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new
FlowRule(); //规则设置资源名字 rule1.setResource("abc"); //设置现在qps为20
rule1.setCount(20); //设置流控的规则的以QPS为准,还有以线程为准
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); //根据调用方进行流量控制,默认就是全部生效
rule1.setLimitApp("default"); rules.add(rule1); //添加进流控规则管理中
FlowRuleManager.loadRules(rules); }
定义资源
Entry entry = null; try { entry = SphU.entry(”abc“); //意味着通过 } catch
(BlockException e1) { //意味着限流了 } catch (Exception e2) { // 业务异常 } finally {
//确保这里一定要执行 if (entry != null) { entry.exit(); } }
可以看到这个限制了只能20个pass,其他block
Debug分析
entry = SphU.entry(”abc“);
SphU.entry
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
这里是sph.entry实际上是CtSph.entry方法
CtSph.entry
public Entry entry(String name, EntryType type, int count, Object...
args) throws BlockException { StringResourceWrapper resource
= new StringResourceWrapper(name, type); return entry(resource, count,
args); }
这里根据name和type创建个StringResourceWrapper,name是我们传递进去的abc,type是EntryType.OUT表示出站流量Outbound
traffic
CtSph.entry
public Entry entry(ResourceWrapper resourceWrapper, int count, Object...
args) throws BlockException { return entryWithPriority(resourceWrapper,
count, false, args); }
CtSph.entryWithPriority
private Entry entryWithPriority(ResourceWrapper
resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException { Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
//进入这里代表Context超过阈值,这里只初始化entry,没有规则检验
return new CtEntry(resourceWrapper, null, context); }
if (context == null) { //使用默认的context context =
MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "",
resourceWrapper.getType()); } // 全局开关是不是关闭了,不然也不会进行规则检查
if (!Constants.ON) { return new CtEntry(resourceWrapper, null,
context); } //这里生成一个链表,责任链的体现 ProcessorSlot<Object> chain =
lookProcessChain(resourceWrapper);
//在生成chain的里面有个判断,如果chainMap.size大于一个值就返回null,也不进行规则检测 if (chain
== null) { return new CtEntry(resourceWrapper, null, context); }
//下面这里才真正开始,生成个entry Entry e = new CtEntry(resourceWrapper, chain,
context); try { //开始检验规则 chain.entry(context,
resourceWrapper, null, count, prioritized, args); } catch (BlockException
e1) { //限流了,往上抛,这里exit了,而外界finally需要判空再exit e.exit(count,
args); throw e1; } catch (Throwable e1) { // This should
not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1); }
//正常pass情况 return e; }
这里分为几个步骤
* 做一些检查,全局开关,是不是超过Context阈值,entry数量是不是超过阈值等等,符合就返回个CtEntry不做后面的规则检查
* 根据resourceWrapper生成个slot责任链
* 如果抛出BlockException,entry就是exit然后抛给上层异常,让上层感知到block了,限流了
* 正常通过那就返回entry,上层就知道没有被限流
CtSph. lookProcessChain
//这里是线程不安全的Map,而且加了volatile private static volatile Map<ResourceWrapper,
ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper,
ProcessorSlotChain>(); ProcessorSlot<Object> lookProcessChain(ResourceWrapper
resourceWrapper) { //先从缓存中获取,看能不能获取到 ProcessorSlotChain chain =
chainMap.get(resourceWrapper); //这里做了double check
单例,所以HashMap没有线程不安全,加volatile是为了让其他线程立刻看到 if (chain == null) {
synchronized (LOCK) { chain =
chainMap.get(resourceWrapper); if (chain == null) {
//chainMap大小大于一个值,也就是entry数量大小限制了,一个chain对应一个entry
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null; } //新建Chain
chain = SlotChainProvider.newSlotChain();
//这里是逻辑是,新建一个Map大小是oldMap+1
Map<ResourceWrapper, ProcessorSlotChain> newMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
//然后先整体放入oldMap,再放新建的chain
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain); //替换
chainMap = newMap;
//这里的逻辑,应该是避免频繁的扩容!因为Constants.MAX_SLOT_CHAIN_SIZE默认6000,扩容一次的消耗太大
} } } return chain; }
SlotChainProvider.newSlotChain
public static ProcessorSlotChain newSlotChain() { if (builder != null) {
return builder.build(); } resolveSlotChainBuilder();
if (builder == null) { RecordLog.warn("[SlotChainProvider] Wrong
state when resolving slot chain builder, using default"); builder
= new DefaultSlotChainBuilder(); } return builder.build(); }
这里是SPI扩展点,如果自己扩展,那么builder就是自己的,不走DefaultSlotChainBuilder,先不关注SPI,关注默认的DefaultSlotChainBuilder
DefaultSlotChainBuilder.build
public ProcessorSlotChain build() { ProcessorSlotChain chain
= new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot()); return chain; }
很明显Chain是一个责任链模式,本质上是个链表,添加很多的slot
然后我们得进入DefaultProcessorSlotChain看看
3. DefaultProcessorSlotChain
3.1. 初始化
在这里层次图中,以Slot结尾的是Chain中的元素,也就是一个一个的handler,这里叫slot而已,
这里的模式很有意思
在DefaultProcessorSlotChain中有两个元素first和end两个引用,类型是AbstractLinkedProcessorSlot,实际上指向是那些AbstractLinkedProcessorSlot的子类,FlowSlot之类
AbstractLinkedProcessorSlot<?> first
= new AbstractLinkedProcessorSlot<Object>() { @Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object
t, int count, boolean prioritized, Object... args) throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
} @Override public void exit(Context context, ResourceWrapper
resourceWrapper, int count, Object... args) { super.fireExit(context,
resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end =
first;
刚刚开始的情况,frist和end都指向一个匿名内部类
添加Slot
最后分析的结果和debug的结果相同
SlotChain的entry方法
下面开始看一个entry走过的流程
CtSph.entryWithPriority
chain.entry(context, resourceWrapper, null, count, prioritized, args);
这里进入的是DefaultProcessorSlotChain的entry方法
public void entry(Context context, ResourceWrapper resourceWrapper, Object
t, int count, boolean prioritized, Object... args) throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized,
args); }
transformEntry方法在AbstractLinkedProcessorSlot中被定义
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object
o, int count, boolean prioritized, Object... args) throws Throwable { T
t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); }
这里entry方法是AbstractLinkedProcessorSlot$1这个匿名内部类重写的那个方法
@Override public void entry(Context context, ResourceWrapper resourceWrapper,
Object t, int count, boolean prioritized, Object... args) throws Throwable
{ super.fireEntry(context, resourceWrapper, t, count, prioritized, args); }
然后又是调用父类方法super.fireEntry
@Override public void fireEntry(Context context, ResourceWrapper
resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable { if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized,
args); } }
这里就开始调用下一个slot执行逻辑了
这里需要关注一点
entry方法 各个Slot自己实现
fireEntry方法 AbstractLinkedProcessorSlot定义好了 如果next不为空触发transformEntry方法
transformEntry方法 AbstractLinkedProcessorSlot定义好了 触发自定义的entry方法
到此,Sentinel的工作流程架构就梳理完成
具体的功能是Slot的部分
可以看下Sentinel自带提供了那些Slot
* NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
* ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count
等等,这些信息将用作为多维度限流,降级的依据;
* StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
* FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
* AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
* DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
* SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
下一章:构建节点资源树 <https://blog.csdn.net/qq_33330687/article/details/86567955>
热门工具 换一换