CCTools调度框架介绍
CCTools
应用
- Makeflow 是生物信息常用的一种流程组织工具。详细信息可以参考其官方网站。其优点在于代码可以在本地机器执行的同时也可以按在集群环境进行执行。【参考文献:http://www.xtaohub.com/Getting-Started/Makeflow-best-pra】
特点
实际任务黑盒化
Bash 命令描述所有的逻辑
任务的数据依赖需要文件系统的接入
支持简单的资源管理?
- 是否有负载均衡算法或者资源调度算法?
提供 C 和 Python 接口
可以和 Parsl 结合使用: Parsl 在上层
几大组件
TaskVine
- TaskVine 是一个用于构建在高性能计算 (HPC) 集群、GPU 集群、云服务提供商和其他分布式计算系统上运行的大规模数据密集型动态工作流程的框架。工作流程是以图形结构组织的程序和文件的集合,允许工作流程的各个部分以并行、可重现的方式运行:TaskVine 工作流程需要一个管理器和大量工作进程。应用程序生成大量小任务,并将其分发给 worker。随着任务访问外部数据源并产生自己的输出,越来越多的数据被拉入集群节点上的本地存储。这些数据用于加速未来的任务并避免重新计算现有结果。应用程序“像藤蔓一样”通过集群逐渐生长。
- TaskVine 管理器应用程序可以在 Linux 或 OSX 平台上用 Python 或 C 编写。单个任务可以是简单的 Python 函数、复杂的 Unix 应用程序或无服务器函数调用。关键思想是声明文件对象,然后声明使用它们并生成新文件对象的任务。例如,此代码片段从项目存储库中提取一个输入文件,并运行一个任务来搜索字符串“needle”,生成文件 output.txt
f = m.declare_url("https://www.gutenberg.org/cache/epub/2600/pg2600.txt")
g = m.declare_file("myoutput.txt")
t = Task("grep needle warandpeace.txt > output.txt")
t.add_input(f, "warandpeace.txt")
t.add_output(g, "outfile.txt")
- 任务共享一组公共选项。每个任务都可以用其执行所需的资源(CPU 核心、GPU 设备、内存、磁盘空间)进行标记。这允许每个Worker打包适当数量的任务。例如,在 64 核计算机上运行的工作程序可以运行 32 个双核任务、16 个四核任务或总计为 64 个核心的任何其他组合。如果您不知道所需的资源,可以通过 Resource Monitor **来自动跟踪、报告和分配每个任务使用的资源。**
t.set_cores(2)
t.set_memory(4096)
t.set_tag("config-4.5.0")
# this can once again be done at task declaration as well:
t = vine.Task(
command = "./gzip < my-file > my-file.gz",
cores = 2,
memory = 4096,
tag = "config-4.5.0"
)
- TaskVine 很容易部署在现有的 HPC 和云设施上。工作进程是独立的可执行文件,TaskVine 安排将所有必要的任务依赖项移至工作进程,使系统能够自托管。应用程序通常包含数以百万计的任务,这些任务运行在数千名 worker 上。提供的工具可轻松在 HTCondor、SLURM 和 Grid Engine 上部署 worker。
三种任务形式
- 标准任务由 Unix 命令行组成
- Python 任务由 Python 函数及其依赖项组成
def my_sum(x, y):
return x+y
# task to execute x = my_sum(1, 2)
t = vine.PythonTask(my_sum, 1, 2)
- 无服务器任务由远程库中的函数调用组成
def my_sum(x, y):
return x+y
def my_mul(x, y):
return x*y
libtask = m.create_library_from_functions("my_library", my_sum, my_mul)
t = vine.FunctionCall("my_library", "my_mul", 20, 30);
t.set_cores(1)
t.set_memory(100)
t.set_disk(100)
m.submit(t)
TaskVine Online Status Display
TaskVine Factory
- 可以给每个 worker 配置不同的参数种类!
- 如果您可以访问其他机器,您可以简单地在那里进行 ssh 并运行工作程序。一般来说,启动的 worker 越多,工作完成的速度就越快。如果 worker 失败,TaskVine 基础设施将在其他地方重试工作,因此将许多 worker 提交给不可靠的系统是安全的。
workers = vine.Factory("condor", "myproject")
workers.cores = 4
workers.memory = 4000
workers.disk = 5000
workers.max_workers = 20
with workers:
while not m.empty():
t = m.wait(5)
...
更高效的文件管理机制
缓存机制 => 具体实现技术没有很理解
支持执行上下文
- 已经非常类似容器的管理方案了,可以为每一个执行的进程都配置自己所需要的上下文
{
"conda": {
"channels": [
"conda-forge"
],
"dependencies": [
"python=3.10",
"numpy=1.24.2"
]
}
}
更高效的任务处理机制
安全 => 不关心
失败重试机制
- 在 TaskVine 中,设计了失败重试机制!
t.set_retries(5) # Task will be try at most 6 times (5 retries).
# this can be done at task declaration as well:
t = vine.Task(
command = ...,
retries = 5
)
流水线提交
- 如果您有大量任务要运行,则可能无法提交所有任务,然后等待所有任务。相反,提交少量任务,然后交替等待和提交,以在管理器中保持恒定的数量。饥饿者会告诉您是否需要更多提交:
磁盘文件的自动垃圾回收
- 对于在计算出最终结果后生成不再需要的部分结果的工作流程,当应用程序指示不再需要它们时,TaskVine 可以自动从磁盘中删除它们
partial_result = m.declare_file("my_partial_result", unlink_when_done=True)
t1 = Task(...)
t1.add_output(partial_result, "my_partial_result")
...
t2 = Task(...)
t2.add_input(partial_result, "my_partial_result")
...
# once t2 is done, the following call will remove the file from the
# taskvine workflow. Further, when no task refers to the file, the file
# will be removed from the manager's disk because of unlink_when_done=True
# at its declaration.
m.undeclare_file(partial_result)
自动断开慢的 workers
- 大型计算通常会因落后者而减慢速度。如果您有大量需要很短时间的小任务,那么自动断开慢速 worker 的连接会有所帮助。启用此功能后,将保留任务执行时间的统计信息并终止统计异常值。如果同一 worker 中取消了两个不同的任务,则该 worker 将被断开连接并列入黑名单。
支持任务的取消: 适合流式任务场景
- 此功能在存在冗余任务或随着其他任务完成而变得过时的任务的工作流中非常有用。可以通过 task_id 或 tag 来删除任务。以这种方式删除的任务仍将通过等待以通常的方式返回,结果为 VINE_RESULT_CANCELLED。例如:
# create task as usual and tag it with an arbitrary string.
t = vine.Task(...)
t.set_tag("my-tag")
# or set tag in task declaration
t = vine.Task(
command = ...,
tag = "my-tag"
)
taskid = m.submit(t)
# cancel task by id.
m.cancel_by_taskid(taskid)
# or cancel task by tag.
m.cancel_by_tasktag("my-tag")
资源管理模块
Task Resources
- 要在 worker 中运行多个任务,每个任务都必须有其使用的资源的描述,包括核心、内存、磁盘和 GPU。虽然时间不完全是一种资源,但指定任务的运行时间通常有助于将任务映射到 worker。可以按以下示例指定这些资源:
- t.set_cores(1) => CPU 核心
- t.set_memory(1024) => Mem
- t.set_disk(4096) => Disk 空间的配置
- t.set_gpus(0) => 配置需要的 GPU 数量
- t.set_time_max(100) => 任务可以执行的最大时间,实际执行时超过这个时间的任务会被 kill
- t.set_time_min(10) => 用户设置的一个最短时间,用于辅助任务在系统中找到合适的 worker
t.set_cores(1) # task needs one core
t.set_memory(1024) # task needs 1024 MB of memory
t.set_disk(4096) # task needs 4096 MB of disk space
t.set_gpus(0) # task does not need a gpu
t.set_time_max(100) # task is allowed to run in 100 seconds
t.set_time_min(10) # task needs at least 10 seconds to run (see vine_worker --wall-time option above)
t.add_feature("NVIDIA RTX A2000") # task requires this specific GPU type
# these can be set when the task is declared as well:
t = vine.Task(
command = "./gzip < my-file > my-file.gz",
cores = 1,
memory = 1024,
disk = 4096,
gpus = 0,
time_max = 100,
time_min = 10
)
- 资源分配的主要原则
- 如果任务没有指定任何资源,那么它会被分配一个完整的 worker。
- 任务将分配至少与指定资源值一样多的资源。例如,指定两个核心的任务将被分配至少两个核心。
- 如果 GPU 仍未指定,则为任务分配零个 GPU。
- 如果任务指定 GPU,但未指定核心,则为该任务分配零个核心???
- 在所有其他情况下,worker 的核心、内存和磁盘将根据指定任务需求占 worker 资源的最大比例进行平均分配。
Worker Resources
- 默认情况下,worker 尝试使用其正在运行的机器的所有资源。 Worker 启动时会显示检测到的资源。也可以可以手动调整由 worker 管理的资源
Factory Resources
- vine_factory 接受参数 –cores、–memory、–disk 和 –gpus 来设置 worker 的资源配置。
GPU Type 选择
t.add_feature("NVIDIA RTX A2000") # task requires worker with this feature
监控资源的运行情况
t = m.wait(5)
if t:
print("Task used {} cores, {} MB memory, {} MB disk",
t.resources_measured.cores,
t.resources_measured.memory,
t.resources_measured.disk)
print("Task was allocated {} cores, {} MB memory, {} MB disk",
t.resources_requested.cores,
t.resources_requested.memory,
t.resources_requested.disk)
if t.limits_exceeded and t.limits_exceeded.cores > -1:
print("Task exceeded its cores allocation.")
把具有类似资源需求的任务组合起来
- 多个任务通常共享相同的资源描述,为此,TaskVine 允许您将任务分为称为类别的组。您可以将资源描述附加到每个类别,然后标记任务以将其设置为类别的一部分。
- 我们可以创建一些类别及其资源描述,如下所示:
# memory and disk values in MB.
m.set_category_resources_max('my-category-a', {'cores': 2, 'memory': 1024, 'disk': 2048, 'gpus': 0})
m.set_category_resources_max('my-category-b', {'cores': 1})
m.set_category_resources_max('my-category-c', {})
自动资源管理
- 如果类别使用的资源未知,则可以引导 TaskVine 找到有效的资源值,以最大化吞吐量或最小化资源浪费。在这些模式中,如果使用 set_resources_max 设置资源值,则将其用作理论最大值。
- 自动计算资源时,如果 set_resources_max 中未指定任何核心、内存或磁盘,那么 TaskVine 将使用整个 worker 运行一些任务来收集一些资源使用统计信息。如果设置了所有核心、内存和磁盘,则 TaskVine 将使用这些最大值,而不是使用整个 worker。和之前一样,未指定的 GPU 默认为 0。
- 一旦一些统计数据可用,如果这种更改会增加吞吐量,则可以使用较小的分配来运行更多任务。如果任务耗尽其资源,将使用 set_resources_max 的值或整个 worker 重试,如前所述。
性能管理模块
Workflow Integration
和 Parsl 集成
- TaskVine 可以用作 Parsl 工作流的工作流执行引擎,补充 Parsl 的资源管理能力
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor
from parsl.executors.taskvine import TaskVineFactoryConfig
from parsl.executors.taskvine import TaskVineManagerConfig
config = Config(
executors=[
TaskVineExecutor(
factory_config=TaskVineFactoryConfig(
batch_type="condor",
min_workers=1,
max_workers=1,
cores=12,
),
manager_config=TaskVineManagerConfig(
project_name="taskvine_parsl",
)
)
]
)
parsl.load(Config)
l = ["Cooperative", "Computing", "Lab"]
@python_app
def hello_taskvine(x, l=l):
return l[x]
futures = []
for i in range(3):
futures.append(hello_taskvine(i))
for i in futures:
print(i.result())
和 Dask 集成
- TaskVine 可用于使用管理器作为 Dask 调度程序来执行 Dask 工作流程。 DaskVine 类实现了一个 TaskVine 管理器,它有一个 get 方法
Work Queue
工作队列(Work Queue)是一个用于构建大型管理者-工作者应用程序的框架。使用工作队列库,你可以创建一个自定义管理程序,定义并提交大量小型任务。每个任务都分配给一个远程工作进程,由其执行并返回结果。随着结果的产生,管理器可能会生成更多任务来执行。编写将数百万个任务分配给数千个远程工作者的程序并不罕见。
Makeflow
Makeflow 是用于大规模分布式计算的工作流引擎。它接受要执行的大量工作的规范,并尽可能在远程计算机上并行运行。此外,Makeflow 是容错的,因此您可以使用它来协调非常大的任务,这些任务可能在出现故障时运行数天或数周。Makeflow 的设计与 Make 类似,因此如果可以编写 Makefile,那么就可以编写 Makeflow。
Makeflow 使得将大量工作从一个设施移动到另一个设施变得很容易。在编写了一个工作流之后,您可以在本地笔记本电脑上进行测试,然后在您的大学计算中心运行它,将其转移到类似 XSEDE 这样的国家级计算设施,然后再转到商业云系统。使用(捆绑的)工作队列系统,您甚至可以同时跨多个系统运行。无论您在何处运行任务,工作流语言都保持不变。
Makeflow 在生产中用于支持科学和工程中的大规模问题。生物信息学、生物特征学、地理学和高能物理等领域的研究人员都使用 Makeflow 从现有应用程序中组合工作流。
Makeflow 可以将您的作业发送到各种各样的服务,例如批处理系统(HTCondor、SGE、PBS、Torque)、集群管理器(Mesos 和 Kubernetes)、云服务(amazonec2 或 Lambda)和容器环境(如 Docker 和 Singularity)。批处理系统支持部分提供了每个系统的详细信息。
Resource Monitor: 低配版 Automap
- resources_monitor 是一个工具,用于监视由作为参数给出的命令及其所有后代创建的进程所使用的计算资源。监视器间接工作,即通过观察进程运行时环境如何变化,因此报告的所有信息都应被视为估计。
-
resource_monitor 最多生成三个日志文件:
- 一个 JSON 编码的摘要文件,其中包含所用资源的最大值及其发生时间
- a time-series that shows the resources used at given time intervals
- 执行期间打开的文件列表: resource_monitor 可以设置为根据某些文件中的事件(例如,当创建、删除文件或文件中出现正则表达式模式时)生成测量快照。最大资源限制可以以文件或命令行给出的字符串的形式指定。如果其中一项资源超出了指定的限制,则监视器将终止任务,并报告哪个资源超出了相应的限制。
- JSON 编码的摘要文件包含的信息
- 时间序列日志每个时间样本都有一行。对于每一行,各列具有以下含义:
JX Language
- 支持使用 JX 工作流语言描述,使得整个工作流的描述非常便捷
{
"define" : {
"RANGE" : range(1,4),
"FILELIST" : [ "output." + N + ".txt" for N in RANGE ],
},
"categories" : {
"simulate" : {
"resources" : { "cores" : 4, "memory" : 512, "disk" : 1024 }
},
"collect" : {
"resources" : { "cores" : 1, "memory" : 512, "disk" : 8192 }
}
},
"rules" : [
{
"command" : "python ./simulate.py --parameter " + N + " > output."+N+".txt",
"inputs" : [ "simulate.py" ],
"outputs" : [ "output." + N + ".txt" ],
"category" : "simulate"
} for N in RANGE,
{
"command" : "/bin/cat " + join(FILELIST," ") + " > output.all.txt",
"inputs" : FILELIST,
"outputs" : [ "output.all.txt" ],
"category" : "collect"
}
]
}