消息:分布式工作流任务调度系统Easy Scheduler开源地址:https://github.com/analysys/EasyScheduler
<https://github.com/analysys/EasyScheduler> , 欢迎感兴趣的伙伴参与!谢谢!

部署了一个单机版本的小demo,想体验的伙伴,请访问 http://52.82.13.76:8888 <http://52.82.13.76:8888>,
涉及多人修改的问题,管理员登录暂不开放!

<>承载每天10万级任务调度系统的架构设计

<>导语


EasyScheduler(易调度)是易观数据平台研发的大数据分布式调度系统。主要解决数据处理中错综复杂的依赖关系,而不能直观监控任务健康状态等问题。EasyScheduler以DAG流式的方式将Task组装起来,可实时监控任务的运行状态,同时支持重试、从指定节点恢复失败、暂停及Kill任务等操作

<>背景


任务调度系统在大数据平台当中是一个核心的基础设施,由于数据处理流程常常具有很长的依赖链条,因此依赖单机的crontab等单纯依赖时间调度的方式,往往存在很大的弊端,如依赖不清晰,出错难以查找等问题,因此,我们调研了市面上流行的调度系统:



鉴于易观日处理数据30多TB,复杂的ETL依赖关系、易用性、可维护性及方便二次开发等综合原因,我们开发了自己的大数据分布式调度系统 Easy
Scheduler。

其主要目标如下:

* 以DAG图的方式将Task按照任务的依赖关系关联起来,可实时可视化监控任务的运行状态
*
支持丰富的任务类型:Shell、MR、Spark、SQL(mysql、postgresql、hive/impala、sparksql、oracle)、Clickhouse、Python、Sub_Process、Procedure、DEPENDENT(依赖)等任务类型
* 支持工作流定时调度、依赖调度、手动调度、手动暂停/停止/恢复,同时支持失败重试/告警、从指定节点恢复失败、Kill任务等操作
* 支持工作流优先级、任务优先级及任务的故障转移及任务超时告警/失败
* 支持工作流全局参数及节点自定义参数设置
* 支持资源文件的在线上传/下载,管理等,支持在线文件创建、编辑
* 支持任务日志在线查看及滚动、在线下载日志等
* 实现集群HA,通过Zookeeper实现Master集群和Worker集群去中心化
* 支持对Master/Worker cpu load,memory,cpu在线查看
* 支持工作流运行历史树形/甘特图展示、支持任务状态统计、流程状态统计
* 支持补数
* 支持多租户
* 支持国际化
* 任务类型插件化Plugin(计划中)
* 支持按日历调度(feature idea)
今天我们就来分享一下Easy Scheduler调度系统的架构实现,在对调度系统架构说明之前,我们先来认识一下调度系统常用的名词

<>1 名词解释

DAG: 全称Directed Acyclic
Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止。举例如下图:
<https://analysys.github.io/EasyScheduler/zh_CN/images/dag_examples_cn.jpg>

流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG

流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成

任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态

任务类型:
目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子
SUB_PROCESS 也是一个单独的流程定义,是可以单独启动执行的

调度方式:
系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中
恢复被容错的工作流 和 恢复等待线程 两种命令类型是由调度内部控制使用,外部无法调用

定时调度:系统采用 quartz 分布式调度器,并同时支持cron表达式可视化的生成

依赖:系统不单单支持 DAG 简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖

优先级 :支持流程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,则默认是先进先出

邮件告警:支持 SQL任务 查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知

失败策略:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,继续是指不管并行运行任务的状态,直到流程失败结束。结束
是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束

补数:补历史数据,支持区间并行和串行两种补数方式

<>2 EasyScheduler系统架构

<>2.1 EasyScheduler系统架构图

<https://github.com/analysys/EasyScheduler/>

<>2.2 EasyScheduler架构说明

*
MasterServer

MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG
任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。
MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。

<>该服务内主要包含:

*
Distributed Quartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作

*
MasterSchedulerThread是一个扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的业务操作

*
MasterExecThread主要是负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理

*
MasterTaskExecThread主要负责任务的持久化

*
WorkerServer


WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

<>该服务包含:

*
FetchTaskThread主要负责不断从Task Queue中领取任务,并根据不同任务类型调用TaskScheduleThread对应执行器。

*
LoggerServer是一个RPC服务,提供日志分片查看、刷新和下载等功能

*
ZooKeeper


ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。
我们也曾经基于Redis实现过队列,不过我们希望EasyScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现。

*
Task Queue


提供任务队列的操作,目前队列也是基于Zookeeper来实现。由于队列中存的信息较少,不必担心队列里数据过多的情况,实际上我们压测过百万级数据存队列,对系统稳定性和性能没影响。

*
Alert

提供告警相关接口,接口主要包括告警两种类型的告警数据的存储、查询和通知功能。其中通知功能又有邮件通知和**SNMP(暂未实现)**两种。

*
API

API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。
接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

*
UI

系统的前端页面,提供系统的各种可视化操作界面,详见**系统使用手册
<https://blog.csdn.net/oDaiLiDong/article/details/%E7%B3%BB%E7%BB%9F%E4%BD%BF%E7%94%A8%E6%89%8B%E5%86%8C.md>
**部分。

<>2.3 EasyScheduler架构设计思想

<>一、去中心化vs中心化
<>中心化思想
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:
<https://analysys.github.io/EasyScheduler/zh_CN/images/master_slave.png>

* Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
* Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。
中心化思想设计存在的问题:

*
一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
*
另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
<>去中心化
<https://github.com/analysys/EasyScheduler/>

*

在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。

*
去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在”
管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠行,则大大增加了上述功能的实现难度。

*

实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。

*

EasyScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务。

<>二、分布式锁实践

EasyScheduler使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler,或者只有一台Worker执行任务的提交。

* 获取分布式锁的核心流程算法如下
<https://github.com/analysys/EasyScheduler/>

* EasyScheduler中Scheduler线程分布式锁实现流程图:
<https://github.com/analysys/EasyScheduler/>
<>三、线程不足循环等待问题

* 如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败。
* 如果一个大的DAG中嵌套了很多子流程,如下图则会产生“死等”状态:
<https://github.com/analysys/EasyScheduler/>
上图中MainFlowThread等待SubFlowThread1结束,SubFlowThread1等待SubFlowThread2结束,
SubFlowThread2等待SubFlowThread3结束,而SubFlowThread3等待线程池有新线程,则整个DAG流程不能结束,从而其中的线程也不能释放。这样就形成的子父流程循环等待的状态。此时除非启动新的Master来增加线程来打破这样的”僵局”,否则调度集群将不能再使用。

对于启动新Master来打破僵局,似乎有点差强人意,于是我们提出了以下三种方案来降低这种风险:

*
计算所有Master的线程总和,然后对每一个DAG需要计算其需要的线程数,也就是在DAG流程执行之前做预计算。因为是多Master线程池,所以总线程数不太可能实时获取。
* 对单Master线程池进行判断,如果线程池已经满了,则让线程直接失败。
* 增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起。这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行。
注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的。

于是我们选择了第三种方式来解决线程不足的问题。

<>四、容错设计

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
<>1. 宕机容错
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

<https://github.com/analysys/EasyScheduler/>

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。

* Master容错流程图:
<https://github.com/analysys/EasyScheduler/>
ZooKeeper Master容错完成之后则重新由EasyScheduler中Scheduler线程调度,遍历 DAG
找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task
Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

* Worker容错流程图:
<https://github.com/analysys/EasyScheduler/>

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。

注意:由于”
网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
<>2.任务失败重试
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

* 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
* 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
* 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行
接下来说正题,我们将工作流中的任务节点分了两种类型。

*
一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。

*
还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作

<>五、任务优先级设计


在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:

* 按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。
*
具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id
信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务

*
其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
<https://github.com/analysys/EasyScheduler/>
- 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
<https://github.com/analysys/EasyScheduler/>

<>六、Logback和gRPC实现日志访问

*
由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:

*
将日志放到ES搜索引擎上

*
通过gRPC通信获取远程日志信息

*
介于考虑到尽可能的EasyScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

<https://github.com/analysys/EasyScheduler/>

* 我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。
* FileAppender主要实现如下: /** * task log appender */ public class TaskLogAppender
extends FileAppender<ILoggingEvent { ... @Override protected void append(
ILoggingEvent event) { if (currentlyActiveFile == null){ currentlyActiveFile =
getFile(); } String activeFile = currentlyActiveFile; // thread name:
taskThreadName-processDefineId_processInstanceId_taskInstanceId String
threadName= event.getThreadName(); String[] threadNameArr = threadName.split("-"
); // logId = processDefineId_processInstanceId_taskInstanceId String logId =
threadNameArr[1]; ... super.subAppend(event); } }
以/流程定义id/流程实例id/任务实例id.log的形式生成日志

*
过滤匹配以TaskLogInfo开始的线程名称:

*
TaskLogFilter实现如下:
/** * task log filter */ public class TaskLogFilter extends Filter<
ILoggingEvent{ @Override public FilterReply decide(ILoggingEvent event) { if (
event.getThreadName().startsWith("TaskLogInfo-")){ return FilterReply.ACCEPT; }
return FilterReply.DENY; } }
<>3 近期EasyScheduler的工作进展

EasyScheduler于5.27号发布了自开源后的第四个迭代版本-- 1.0.3。

新版本更新内容:

** 新特性 ** :

* [EasyScheduler-254] 流程定义删除和批量删除
* [EasyScheduler-347] 任务依赖增加“今日”
* [EasyScheduler-273] sql任务添加title
* [EasyScheduler-247] API在线文档
* [EasyScheduler-319] 单机容错
* [EasyScheduler-253] 项目增加流程定义统计和运行流程实例统计
* [EasyScheduler-292] 启用SSL的邮箱发送邮件
* [EasyScheduler-77] 定时管理、工作流定义添加删除功能
* [EasyScheduler-380] 服务监控功能
* [EasyScheduler-380] 项目增加流程定义统计和运行流程实例统计
** 增强 ** :

* [EasyScheduler-192] 租户删除前可以考虑校验租户和资源
* [EasyScheduler-376] 删除实例时候,没有删除对应zookeeper队列里的任务
* [EasyScheduler-185] 项目删除工作流定义还存在
* [EasyScheduler-206] 优化部署,完善docker化支持
* [EasyScheduler-381] 前端一键部署脚本支持ubuntu
** 修复** :

* [EasyScheduler-255] 子父流程全局变量覆盖,子流程继承父流程全局变量并可以重写
* [EasyScheduler-256] 子父流程参数显示异常
* [EasyScheduler-186] 所有查询中只要输入%会返回所有数据
* [EasyScheduler-185] 项目删除工作流定义还存在
* [EasyScheduler-266] Stop process return: process definition 1 not on line
* [EasyScheduler-300] 超时告警时间单位
* [EasyScheduler-235] nginx超时连接问题修复
* [EasyScheduler-272] 管理员不能生成token
* [EasyScheduler-272] save global parameters error
* [EasyScheduler-183] 创建中文名称的Worker分组报错
* [EasyScheduler-377] 资源文件重命名只修改描述时会报名称已存在错误
* [EasyScheduler-235] 创建spark数据源,点击“测试连接”,系统回退回到登入页面
* [EasyScheduler-83] 1.0.1版本启动api server报错
* [EasyScheduler-379] 跨天恢复执行定时任务时,时间参数不对
* [EasyScheduler-383] sql邮件不显示前面的空行
感谢
最后但最重要的是,没有以下伙伴的贡献就没有新版本的诞生:

Baoqi, jimmy201602, samz406, petersear, millionfor, hyperknob, fanguanqun,
yangqinlong, qq389401879, feloxx, coding-now, hymzcn, nysyxxg, chgxtony……


这里也要感谢EasyScheduler社区里数百位发现问题、给出建议以及代码贡献的伙伴们,非常感谢!社区才是发展的基石,我们会持续跟进社区反馈,跟社区进行深度协同,进而实现由社区驱动!

<>综述


本文着重介绍了大数据分布式工作流调度系统–EasyScheduler的架构原理及实现思路,后续我们会深入分析数据任务流向架构。EasyScheduler在经过易观生产环境近2年的稳定运行和开源早期用户的打磨,据愿意公开使用EasyScheduler情况的统计,已经有十多家用户部署使用(
Who is using EasyScheduler <https://github.com/analysys/EasyScheduler/issues/57>
),非常感谢伙伴们这么热情和信任我们,我们会和大家一道继续奔走在使调度系统开箱即用这条大道上,为使"数据能力平民化"添砖加瓦。同时,Easy
Scheduler使用了很多优秀的开源项目,比如google的guava、guice、grpc,netty,ali的bonecp,quartz,以及apache的众多开源项目等等,正是由于站在这些开源项目的肩膀上,才有Easy
Scheduler的诞生的可能。我们希望自己不仅是开源的受益者,也能成为开源的贡献者,也希望对开源调度系统有同样热情和信念的伙伴加入进来,一起为开源献出一份力!为数据时代贡献自己的激情和汗水!

附:
开源github地址:https://github.com/analysys/EasyScheduler
<https://github.com/analysys/EasyScheduler> (欢迎star, 欢迎fork)
在线文档地址:https://analysys.github.io/easyscheduler_docs_cn/
<https://analysys.github.io/easyscheduler_docs_cn/>

我们也成立了钉钉群(微信群由于人数已超500人,多群方式也不合适,微信群正逐步转移到钉钉中),欢迎加钉钉群(群号:
23107445),加时请注明公司+姓名,谢谢!!

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信