azkaban:azkaban-common:Project

azkaban源码:azkaban-common:Project

由于种种原因,要基于azkaban二次开发一个适合自身的调度框架。所以进行源码的学习。方便后续开发。

源码构建

  • build
    • gradlew clean
    • Gradlew build
    • gradlew installDist
  • create db
    • CREATE DATABASE azkaban;
    • run sql script
    • download jdbc jar

Project

该模块主要包含了project的一系列操作。

假设我们现在有一个项目的zip文件。如何解决如下问题:

  • 如何创建一个project?
  • 一个project里面包含flow。这个flow是如何构建的?
  • 一个flow里面应该有多个job或者说node。又是如何构建的?
  • project flow 这些概念持久化存储在哪?
  • 如何查询project flow 信息?
  • 一个project中的flow如何调度?

当然这里面也包含了user,permission等概念。

首先 我们来看一下项目创建的流程。这里面只是创建project。还没有上传zip。

  • User 提交数据
  • ProjectManagerServlet 处理请求。
  • 调用ProjectManager创建project。
  • 调用ProjectLorder创建project。
  • 调用ProjectLorder实现类JDBCProjectImpl创建project
  • 调用DataBaseOperator提供的方法插入到数据库。
版面 11

接下来 我们来看一下项目上传的流程。==解析flow等信息都是在上传的过程中完成的。==

  • user提交zip
  • ProjectManagerServlet处理请求
  • 调用ProjectManager 上传project
  • 调用AzkabanProjectLoader上传project
    • 解压zip
    • 调用FlowLoaderFactory创建FlowLoader,创建Flow,并将Flow设置为Project里面的属性。
    • 调用StorageManager,内部调用projectloader上传信息到db。
    • 调用projectLoader,执行各种方法,持久化相关信息到DB。
版面 12

最后看一下其他查询请求的处理。

handleGet

  • handleAJAXAction
    • ajaxFetchProjectLogEvents: projectManager.getProjectEventLogs(project, num, skip); 获取项目日志信息
    • ajaxFetchFlow
    • ajaxFetchFlowDetails
    • ajaxFetchFlowGraph
    • ajaxFetchFlowNodeData
    • ajaxFetchProjectFlows
    • ajaxChangeDescription
    • ajaxGetPermissions
    • ajaxGetGroupPermissions
    • ajaxGetProxyUsers
    • ajaxChangePermissions
    • ajaxAddPermission
    • ajaxAddProxyUser
    • ajaxRemoveProxyUser
    • ajaxFetchFlowExecutions
    • ajaxFetchLastSuccessfulFlowExecution
    • ajaxFetchJobInfo
    • ajaxSetJobOverrideProperty
  • handleProjectLogsPage
  • handlePermissionPage
  • handlePropertyPage
  • ...

基本上类似上述过程。再次不做重复。

通过上述的表述,我们可以发现。主要的project处理的的逻辑都集中在ProjectManager上。下面我们将详细的学习分析ProjectManager。

ProjectManager

project层次上的管理。单例模式。

==主要包含各种姿势的project的查询设置创建等。属于对外暴漏提供服务的类==。里面内部实现是由AzkabanProjectLoader,ProjectLoader等类提供服务实现的。

getProjectNames()
getUserProjects()
getGroupProjects()
getUserProjectsByRegex()
getProjects()
getProjectsByRegex()
isActiveProject()
getProject()
createProject()
purgeProject()
removeProject()
updateProjectDescription()
...

整个调用的层次结构大概如下:

  • projectManager
    • AzkabanProjectLoader
      • ProjectLoader
      • StorageManager
        • ProjectLoader
        • Storage
      • FlowLoaderFactory
        • FlowLoader
    • ProjectLoader
      • JdbcProjectImpl
      • ...
版面 8

AzkabanProjectLoader

Handles the downloading and uploading of projects.

主要处理project的上传和下载。

核心方法==uploadProject==过程。

  • 解压缩 file = unzipProject(archive, fileType);
  • 验证有效性 reports = validateProject(project, archive, file, prop);
  • 创建flowloader loader = this.flowLoaderFactory.createFlowLoader(file);
    • ==工厂模式==:根据AZKABAN_FLOW_VERSION创建不同的flowloader,即DirectoryYamlFlowLoader或者DirectoryFlowLoader
  • 上传project到db persistProject(project, loader, archive, file, uploader);
  • 清空目录 FlowLoaderUtils.cleanUpDir(file);
  • 返回reports

ProjectLoader

定义了各种接口。其应该有各种实现,默认使用jdbcprojectimpl实现。

JdbcProjectImpl

ProjectLoader的一种实现,This class implements ProjectLoader using new azkaban-db code to allow DB failover. 单例模式。

其主要功能是提供存储project各种信息的持久化方法。以及这新信息获取的方法。

具体的方法:

  • fetchAllActiveProjects:获取所有active的项目
  • fetchProjectById:通过projectid 获取project
  • fetchProjectByName:通过project name 获取project
  • createNewProject:根据名字 描述等创建新项目.
  • uploadProjectFile: 上传项目文件,这个函数挺有意思的。里面是读这个文件,然后将它分成每10M一个chuck,然后插入到数据库中,并更新相应的表。
  • addProjectVersion: 添加projectversion
  • fetchProjectMetaData:获取上传的项目元数据。返回类型是ProjectFileHandler,具体数据为里面的属性。
  • getUploadedFile:获取上传的file,返回类型是ProjectFileHandler,具体数据为里面的属性。其中查处所有的chuck,然后写入到文件中,再计算生成文件的md5与元数据中的md5是否一致,一直返回文件。不一致抛出异常。
  • changeProjectVersion:修改项目版本。
  • updateProjectSettings:更改项目配置。
  • removeProject:为假删除。SET active=false
  • postEvent
  • getProjectEvents
  • updateDescription
  • getLatestProjectVersion
  • uploadFlows: 该方法主要是上传一个project的flow。首先讲一个flow对象 转换成一个Map<String, Object>,再然后转换成json,再然后转换成byte[] 存入到数据库中。
  • fetchAllProjectFlows:查询一个project的所有flow。
  • uploadProjectProperty:上传project properties。
  • fetchProjectProperty
  • cleanOlderProjectVersion
  • uploadFlowFile 上传flow file 类似于uploadFlows
  • getUploadedFlowFile 获取flow file 类似于fetchAllProjectFlows
  • getLatestFlowVersion
  • isFlowFileUploaded

StorageManager

StorageManager manages and coordinates all interactions with the Storage layer. This also includes bookkeeping like updating DB with the new versionm, etc

azkaban提供了多种存储层的实现方式,包括database,hdfs,localfile等。也可以自定义自己的存储层。默认使用的是database进行存储。

其中这里面最重要的对外的类应该是==StorageManager==。主要提供了uploadproject和getprojectfile等方法。

FlowLoaderFactory

Factory class to generate flow loaders.

flowLoader

Interface to load project flows.

包含以下重要接口:

  • ValidationReport loadProjectFlow(final Project project, final File projectDir);
  • Map<String, Flow> getFlowMap();

更直白一点,就是提供如何从一个project中构建flow的功能。

根据不同版本,flowloader提供了两种实现方式。

  • DirectoryYamlFlowLoader:flow信息存储在yaml文件中。
  • DirectoryFlowLoader:flow信息以directory的方式存在。
版面 4

NodeBeanLoader

其实该部分 应该是如何构建flow,包括node,什么的。最终对外输出的是==FlowLoaderUtils==。里面包含了yaml方式的构建和一些公用的方法。

  • AzkabanNode
    • A unit of execution that could be either a job or a flow.
  • AzkabanFlow
    • Flow level definition of the DAG.
    • Contains a list of AzkabanNodes and related flow properties.
  • AzkabanJob
    • The smallest individual unit of execution in Azkaban.
    • Contains information about job type and related properties.
版面 3

数据结构

  • node的内部属性:
    • level: 距离起始节点的距离(多条路径的话取最大值),起始节点level为0
    • jobSource
    • propsSource
  • Edge的内部属性:
    • id:getSourceId() + ">>" + getTargetId()
    • sourceId
    • source
    • targetId
    • target
    • error
  • flow的内部属性:
    • inEdges:key为jobName,value为节点的入边集合
    • outEdges:key为jobName,value为节点的出边集合
    • startNodes:起始Node集合,没有入边的谓之起始节点
    • endNodes:结束Node结合,没有出边的谓之结束节点
    • 每个flow里能配置报警等信息

AZ.Storage

Storage:
/**
 * The Azkaban Storage interface would facilitate getting and putting objects into a storage
 * mechanism of choice. By default, this is set to the MySQL database. However, users can have the
 * ability to choose between multiple storage types in future.
 *
 * This is different from storing Azkaban state in MySQL which would typically be maintained in a
 * different database.
 *
 * Note: This is a synchronous interface.
 */
版面 7

Az.DB

该模块主要是azkaban db相关代码。包含以下功能。

  • 建立jdbc连接,获取datasource
  • 执行sql操作接口。

如需扩展其他链接,需要继承==AzkabanDataSource==。实现相应功能。另外,对外使用 主要使用AzkabanDataSource 和DatabaseOperator两个类。

版面 5
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容