Sentinel 是面向分布式服务架构的轻量级流量控制框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助您保护服务的稳定性。

 

1. Sentinel资源&规则

我们说的资源,可以是任何东西,服务,服务里的方法,甚至是一段代码。使用 Sentinel 来进行资源保护,主要分为两个步骤:

* 定义资源
* 定义规则

先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。

2. Demo分析

以QPS流控为分析样例

定义规则
private static void initFlowQpsRule() { //可以看出规则是个链表,那么意味着可以一个资源对应多个规则
List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new
FlowRule(); //规则设置资源名字 rule1.setResource("abc"); //设置现在qps为20
rule1.setCount(20); //设置流控的规则的以QPS为准,还有以线程为准
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); //根据调用方进行流量控制,默认就是全部生效
rule1.setLimitApp("default"); rules.add(rule1); //添加进流控规则管理中
FlowRuleManager.loadRules(rules); }
定义资源
Entry entry = null; try { entry = SphU.entry(”abc“); //意味着通过 } catch
(BlockException e1) { //意味着限流了 } catch (Exception e2) { // 业务异常 } finally {
//确保这里一定要执行 if (entry != null) { entry.exit(); } }


可以看到这个限制了只能20个pass,其他block

Debug分析
entry = SphU.entry(”abc“);
SphU.entry
public static Entry entry(String name) throws BlockException {
    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
这里是sph.entry实际上是CtSph.entry方法

CtSph.entry
public Entry entry(String name, EntryType type, int count, Object...
args) throws BlockException {     StringResourceWrapper resource
= new StringResourceWrapper(name, type);     return entry(resource, count,
args); }

这里根据name和type创建个StringResourceWrapper,name是我们传递进去的abc,type是EntryType.OUT表示出站流量Outbound
traffic

CtSph.entry
public Entry entry(ResourceWrapper resourceWrapper, int count, Object...
args) throws BlockException {     return entryWithPriority(resourceWrapper,
count, false, args); }
CtSph.entryWithPriority
private Entry entryWithPriority(ResourceWrapper
resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {     Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        //进入这里代表Context超过阈值,这里只初始化entry,没有规则检验
        return new CtEntry(resourceWrapper, null, context);     }
    if (context == null) {         //使用默认的context         context =
MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "",
resourceWrapper.getType());     }     // 全局开关是不是关闭了,不然也不会进行规则检查
    if (!Constants.ON) {         return new CtEntry(resourceWrapper, null,
context);     }     //这里生成一个链表,责任链的体现     ProcessorSlot<Object> chain =
lookProcessChain(resourceWrapper);
    //在生成chain的里面有个判断,如果chainMap.size大于一个值就返回null,也不进行规则检测     if (chain
== null) {         return new CtEntry(resourceWrapper, null, context);     }
    //下面这里才真正开始,生成个entry     Entry e = new CtEntry(resourceWrapper, chain,
context);     try {         //开始检验规则         chain.entry(context,
resourceWrapper, null, count, prioritized, args);     } catch (BlockException
e1) {         //限流了,往上抛,这里exit了,而外界finally需要判空再exit         e.exit(count,
args);         throw e1;     } catch (Throwable e1) {         // This should
not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);     }
    //正常pass情况     return e; }
这里分为几个步骤

* 做一些检查,全局开关,是不是超过Context阈值,entry数量是不是超过阈值等等,符合就返回个CtEntry不做后面的规则检查
* 根据resourceWrapper生成个slot责任链
* 如果抛出BlockException,entry就是exit然后抛给上层异常,让上层感知到block了,限流了
* 正常通过那就返回entry,上层就知道没有被限流
 

CtSph. lookProcessChain
//这里是线程不安全的Map,而且加了volatile private static volatile Map<ResourceWrapper,
ProcessorSlotChain> chainMap     = new HashMap<ResourceWrapper,
ProcessorSlotChain>(); ProcessorSlot<Object> lookProcessChain(ResourceWrapper
resourceWrapper) {     //先从缓存中获取,看能不能获取到     ProcessorSlotChain chain =
chainMap.get(resourceWrapper);     //这里做了double check
单例,所以HashMap没有线程不安全,加volatile是为了让其他线程立刻看到     if (chain == null) {
        synchronized (LOCK) {             chain =
chainMap.get(resourceWrapper);             if (chain == null) {
                //chainMap大小大于一个值,也就是entry数量大小限制了,一个chain对应一个entry
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;                 }                 //新建Chain
                chain = SlotChainProvider.newSlotChain();
                //这里是逻辑是,新建一个Map大小是oldMap+1
                Map<ResourceWrapper, ProcessorSlotChain> newMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                //然后先整体放入oldMap,再放新建的chain
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);                 //替换
                chainMap = newMap;
                //这里的逻辑,应该是避免频繁的扩容!因为Constants.MAX_SLOT_CHAIN_SIZE默认6000,扩容一次的消耗太大
            }         }     }     return chain; }
SlotChainProvider.newSlotChain
public static ProcessorSlotChain newSlotChain() {     if (builder != null) {
        return builder.build();     }     resolveSlotChainBuilder();
    if (builder == null) {         RecordLog.warn("[SlotChainProvider] Wrong
state when resolving slot chain builder, using default");         builder
= new DefaultSlotChainBuilder();     }     return builder.build(); }

这里是SPI扩展点,如果自己扩展,那么builder就是自己的,不走DefaultSlotChainBuilder,先不关注SPI,关注默认的DefaultSlotChainBuilder

DefaultSlotChainBuilder.build
public ProcessorSlotChain build() {     ProcessorSlotChain chain
= new DefaultProcessorSlotChain();     chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());     chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());     chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());     chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());     return chain; }
很明显Chain是一个责任链模式,本质上是个链表,添加很多的slot

然后我们得进入DefaultProcessorSlotChain看看

3. DefaultProcessorSlotChain

3.1. 初始化



在这里层次图中,以Slot结尾的是Chain中的元素,也就是一个一个的handler,这里叫slot而已,

这里的模式很有意思


在DefaultProcessorSlotChain中有两个元素first和end两个引用,类型是AbstractLinkedProcessorSlot,实际上指向是那些AbstractLinkedProcessorSlot的子类,FlowSlot之类
AbstractLinkedProcessorSlot<?> first
= new AbstractLinkedProcessorSlot<Object>() {     @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object
t, int count, boolean prioritized, Object... args)         throws Throwable {
        super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
    }     @Override     public void exit(Context context, ResourceWrapper
resourceWrapper, int count, Object... args) {         super.fireExit(context,
resourceWrapper, count, args);     } }; AbstractLinkedProcessorSlot<?> end =
first;


刚刚开始的情况,frist和end都指向一个匿名内部类

 添加Slot





最后分析的结果和debug的结果相同

SlotChain的entry方法

下面开始看一个entry走过的流程

CtSph.entryWithPriority
chain.entry(context, resourceWrapper, null, count, prioritized, args);
这里进入的是DefaultProcessorSlotChain的entry方法
public void entry(Context context, ResourceWrapper resourceWrapper, Object
t, int count, boolean prioritized, Object... args)     throws Throwable {
    first.transformEntry(context, resourceWrapper, t, count, prioritized,
args); }
transformEntry方法在AbstractLinkedProcessorSlot中被定义
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object
o, int count, boolean prioritized, Object... args)     throws Throwable {     T
t = (T)o;     entry(context, resourceWrapper, t, count, prioritized, args); }
这里entry方法是AbstractLinkedProcessorSlot$1这个匿名内部类重写的那个方法
@Override public void entry(Context context, ResourceWrapper resourceWrapper,
Object t, int count, boolean prioritized, Object... args)     throws Throwable
{     super.fireEntry(context, resourceWrapper, t, count, prioritized, args); }
然后又是调用父类方法super.fireEntry
@Override public void fireEntry(Context context, ResourceWrapper
resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {     if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, prioritized,
args);     } }
这里就开始调用下一个slot执行逻辑了

这里需要关注一点

entry方法          各个Slot自己实现

fireEntry方法     AbstractLinkedProcessorSlot定义好了   如果next不为空触发transformEntry方法

transformEntry方法    AbstractLinkedProcessorSlot定义好了  触发自定义的entry方法



到此,Sentinel的工作流程架构就梳理完成

具体的功能是Slot的部分

可以看下Sentinel自带提供了那些Slot

* NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
* ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count
等等,这些信息将用作为多维度限流,降级的依据;
* StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
* FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
* AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
* DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
* SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
 

下一章:构建节点资源树 <https://blog.csdn.net/qq_33330687/article/details/86567955>

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信