异步任务框架Machinery简介及工程实践

背景

在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,其具有松耦合、易扩展的特性。我当前开发使用的语言是Golang,需要类似python下的celery,PHP中laraval框架的Queues等异步任务框架或任务队列系统。

异步框架Machinery简介

简介

Machinery是一个第三方开源的基于分布式消息分发的异步任务队列,有着以下特性:

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

当前Machinery在v1 stable版本,作者在文档中说明“请注意使用V2版本直到其准备完成为止,因为V2正在开发之中且有可能发生重大更改”,所以在实际项目中,我们也是采用v1稳定版本。可以通过go get github.com/RichardKnop/machinery/v1获取。

组件构成

Machinery中的各个组件,组成了一个放大的“生产者-消费者”模型,由服务端生成任务并放进队列中,由执行端充当消费者从队列中领取任务并执行,框架由以下组件和概念构成:

  • Server:处理客户端同步请求,并生成异步任务,放置进任务队列;
  • Worker:工作线程,从任务队列中消费任务并执行;
  • Broker:任务队列,存储序列化后的任务,支持多种类型如Redis, AMQP, AWS SQS
  • Backend:后端存储,存储任务执行状态和结果,支持多种类型如Redis, Memcache, AMQP, MongoDB
  • Signature:任务实体,用来描述任务的执行过程、所需参数等信息。

工作流程

machinery-workflow

基本的工作流程如下:

  1. 由server生成并发布任务,推送到broker中;
  2. worker通过key向broker订阅任务,当key相同的任务到达时,worker消费任务;
  3. worker执行任务;
  4. worker将执行结果(SUCCESS、FAILURE)存储至backend。

状态维护

Machinery中每个任务状态有PENDING、RECEIVED、STARTED、RETRY、SUCCESS和FAILURE几种,任务在生成和处理的不同阶段状态不同:

  1. server插入任务时状态为PENDING;
  2. worker收到任务时状态为RECEIVED;
  3. worker开始执行任务将状态修改为STARTED;
  4. 当设置retry标志时,任务一旦失败,状态修改为RETRY,等待后续轮转执行;
  5. 任务最终的执行结果为SUCCESS或FAILURE,表示成功或失败,失败时伴随失败原因存储至backend。
    状态流转示意图如下:

machinery-state-flow

关于Machinery详细用法,请参考官方文档,本文不做赘述。

工程应用

当前我负责产品的数据中心模块支持多种数据源的数据接入、多种存储介质的数据存储、数据集管理、数据导出等功能。

支持多种结构化/非结构化数据源及数据存储的适配,支持:

  • 配置结构化存储,如Mysql;
  • 非结构化存储,如腾讯cos、开源ceph、hdfs;
  • 块存储,如nas;

作为平台存储。

同时支持对接多种外部数据源获取训练样本及业务生产数据,如腾讯cos,开源ceph,大数据hdfs存储,Hive数据仓库及结构化mysql数据库。

异步任务场景

在上述业务功能和逻辑实现过程中,伴随着大量耗时操作如数据集导入、数据导出、异步下载、异步删除等场景:

  • 数据集导入:客户端上传数据集压缩包后,发送创建数据集请求,后台异步完成数据集导入逻辑;
  • 数据集导出:根据客户端选中数据集标签,筛选多个数据集的数据文件和标签,生成压缩文件导出;
  • 异步下载:后台拉取数据集并生成压缩包后,生成下载地址给客户端下载;
  • 异步删除:客户端发起删除请求后,同步删除数据集记录,并异步清理数据集文件;
    ……(等其他场景)
    上述业务操作均需要通过异步任务的方式来增加系统的吞吐量,减少同步请求的耗时。

Machinery实践应用

由于数据中心业务逻辑较复杂,涉及的操作比较多,无法一一列举,这里以数据集异步删除为例,说明Machinery在项目过程中的实践应用。

异步删除流程图

dataset-delete-workflow

该工作流程解释如下:

  1. server端收到请求并处理;
  2. server端发送删除文件异步任务;
  3. server端删除数据集记录;
  4. 返回请求处理成功,无耗时操作,同步返回;
  5. 多worker实例分取1个删除文件任务;
  6. 每个worker实例执行删除任务。

示例代码如下。

Server端代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// server端注册删除文件任务
server.RegisterTask("delete-file", DeleteFile)

// 生成删除文件任务
signature := &tasks.Signature{
UUID: "delete-file-1234567",
Name: "delete-file",
RoutingKey: "delete-file",
Args: []tasks.Arg{
{
Name: "filePath"
Type: "string",
Value: "/data/dataset/1234567",
},
},
}

// 发送任务至broker
asyncResult, err := server.SendTask(signature)

Worker端代码示例

1
2
3
4
5
6
7
8
9
10
11
12
// 获取任务执行worker,此处删除文件队列名为"delete-file-queue"
worker := server.NewCustomQueueWorker("delete-file"/*RoutingKey*/, 2/*worker并发数*/, "delete-file-queue")
// 启动worker
errorsChan := make(chan error)
worker.LaunchAsync(errorsChan)

// delete-file执行方法(已由server端注册)
// Machinery执行方法返回的最后一个参数必须为error
func DeleteFile(filePath string) error {
// 简单示例
os.RemoveAll(filePath)
}

总结

本文介绍了Machinery异步框架的组件构成、工作原理、状态流转等基本内容,阐述了该框架在实际项目实际开发中的应用,并给出了实际业务场景中的简化实现。