Snakemake pipeline 搭建的进阶教程
关于 Snakemake
Snakemake 是一个工作流管理系统,可用于创建易于迁移和复现的数据分析流程(pipeline),因此被广泛应用于生物学分析中。虽然 Snakemake 算是一个独立的编程语言,但其本质是基于 Python 搭建的,因此如果你对 Python 有一定了解,那么上手 Snakemake 就会更容易很多。并且 Python 技能的掌握对于某些情况下 Pipeline 下游分析部分的拓展是有必要的。
本文将结合多个近些年生物论文中公开的 Snakemake Pipeline,介绍如何搭建一个具有一定功能的 Pipeline。
Snakemake 安装:https://snakemake.readthedocs.io/en/stable/getting_started/installation.html
Snakemake Pipeline 示例教程:https://snakemake.readthedocs.io/en/stable/tutorial/basics.html
有利于提升阅读体验的一些条件:
- 对于 Python 编程已经具备一定的了解,包括文件的一些基本读写操作和 lambda 表达式的应用。
- 具有初步的 Snakemake Pipeline 制作经验。
阅读后有望获得的一些能力:
- 知晓 Snakemake 中一些基本的语法和操作。
- 编写具有明确结构的 Snakemake Pipeline。
搭建 Pipeline
注意,该 Pipeline 的完整版本在以下 github repository 的 Tutorial-Pipeline 可见,建议结合完整 Pipeline 阅读文章以避免不必要的混淆:
https://github.com/JuseTiZ/Blog-Snakemake-pipeline
该 Pipeline 的运行环境也可通过以上 repository 中的指引进行安装和配置。
Pipeline 要求
目标:搭建一个可以根据 SRR accession number 自动下载数据、处理数据和进行表达定量的 Pipeline。
流程:
prefetch
下载数据,该步骤还需配置pigz
(多线程压缩)及fasterq-dump
(提取 fastq)。trim_galore
过滤数据salmon
进行定量
功能:①、Pipeline 可以识别来自不同样本的数据,并把同一样本的多个数据进行合并;②、Pipeline 可以分为单端和双端模式进行处理;③、可以自定义各个软件运行时使用的参数;④、下载数据时如果遇到网络错误应当可以自行重续下载。
需要提前准备的:①、指定自行构建的 salmon 索引;②、已安装好的并放置于环境变量中的软件。
如果你想将软件的版本控制也考虑进 Pipeline 中,可见本文 Pipeline 拓展 部分。
你可以通过下述命令进行 hg38 salmon 索引的构建以在后文中使用,或者你可以在阅读本文后将该步骤嵌入 Pipeline 流程中(但并不建议,因为该步骤并不是每一次分析都需要复用,这种情况下最好作为参数指定):
1 | mkdir hg38_salmon_index |
Pipeline 管理
首先,一个清晰的文件夹结构对于 Pipeline 的管理和拓展是有帮助的,因此请 cd
到拟存储 Pipeline 的路径,并输入以下命令:
1 | cd [path] |
这些文件夹的用途将在后续搭建过程中一一介绍。
Pipeline 搭建
阅读时请留意每个代码块指定的文件,文件头中的 #
仅作文件名说明用,在实际文件中请将该标头去除。
1、输出目标文件确定
搭建一个 Pipeline 具有一定的前置条件,首先你需要对每一个步骤会产生什么样的文件有具体的了解,因此建议完全跑过一套流程以后再进行这一步骤。
按照上述目标,我们首先希望 Pipeline 能够做到以下事情:
①、下载 SRR id 对应的原始数据。
②、经过一系列处理以后,将这些数据在样本层级进行定量。
已知第一步我们会得到一些以 .sra
结尾的文件,而完成 ② 后我们会得到所有样本的 quant.sf
文件,这些将决定我们如何定义 Snakemake 的输出目标。
因此我们先创建一个包含我们定量所用 sra id 及其对应的样本名的文件,命名为 group.txt
,以下分析将以一个人类海拉细胞示例数据集为例进行:
1 | # group.txt |
假设每个 sra 文件我们打算放置于 rawfastq/[srrid]
中,样本的定量结果我们打算放置于 results/[sample]
中,那我们预期得到的文件应该有:
1 | rawfastq/SRR25601734/SRR25601734.sra |
首先,我们需要一个方法指定 group.txt
能让 Snakemake 识别,请在 config/config.yaml
文件中添加以下内容:
1 | inputfile: |
这是一个 yaml 数据格式的文件,我们将在 Snakefile 中读取它并确定相关参数。在之后每次 Pipeline 的复用中一般只需要调整该文件。
现在,请打开之前新建的 Snakefile
文件,输入以下内容:
1 | configfile: "config/config.yaml" |
逐段解释:
首先我们通过
configfile
指定配置文件,该命令行会将config/config.yaml
读取为字典并赋予给config
变量。此后我们通过 Python 语法读取了先前在 config.yaml 中指定的
group.txt
,并分别将 SRR id 和 Sample 的集合传入给了srrlist
和samples
两个变量。通过
rule all
语句,确定 Snakemake 最终检查的文件(用于判断该 workflow 是否运行完成),其中:expand
为 Snakemake 的特殊语法,本质是遍历并得到一个新列表,即expand("{x}", x=xlist)
功能上等价于[f"{x}" for x in xlist]
。此处使用
srrlist
和samples
通过expand
语法将先前预期得到的文件列表传递给了 Snakemake,以使其监控工作流的运行结果。
这里需要注意的是,虽然我们最终的目的仅是得到每个样本的 quant.sf
,但是由于后续 Snakemake 的运行中,其是通过 rule all
中的 input
为不同 rule 的 input 分配通配符的,因此如果不在 input 中给定 srr 文件相关的信息,它无法判断后续我们每个 rule 运行的意图,具体可见后文介绍。
2、工作流框架实现
到这里,我们已经为工作流指定了最终的目标文件,现在我们可以开始搭建得到这一系列文件所需要的框架,接下来的步骤将主要在 rule
文件夹下进行,并涉及到一些 config.yaml
中参数的补充。
由于我们想要开发的 Pipeline 能够根据单端和双端数据进行调整,因此我们现在 config.yaml
文件中添加一个新的参数:
1 | mode: "paired" |
该 mode 参数也可改为 single,后续我们框架中每个 rule 的行为将根据该 mode 的指定发生变化。
1 | cd $tutorial_path/rules |
根据先前拟定的 Pipeline 流程,我们首先创建一个下载规则文件:
1 | touch 01_prefetch.smk |
下载部分中我们可能需要调整的参数:
- 可以同时并行的下载任务数量。该点将通过运行 Snakemake 时指定
--resources
实现。 - 下载如果失败(因为网络原因)重试的次数。该点将通过在
config.yaml
中添加参数实现。
config.yaml
新添参数以让下载失败时重新运行,最多运行三次:
1 | download: |
01_prefetch.smk
内容:
1 | rule download: |
逐段解释:
output
段用于定义该 rule 用于产生什么文件,Snakemake 会自行创建相应目录(如果不存在)。运行完该 rule 后 Snakemake 会检查是否有产生output
中指定的输出文件,如果没有则判断其运行失败。上例中该段指定了一个输出文件srafile
,其中通配符{srr}
的识别方式可见下文。log
段用于定义该 rule 的日志文件,可以在相应命令行中将屏幕输出记录在该文件中。params
段用于定义该 rule 使用的参数。threads
段用于定义该 rule 使用的线程数(核数)。retries
段用于定义该 rule 失败后重新运行的次数,如果不指定则该 rule 失败后即会停止 Pipeline 的运行(正在进行的任务不会中断)。可以在 snakemake 运行时通过--keep-going
使工作流继续运行其他独立的任务。resources
段用于定义该 rule 所占用的资源,可以在 snakemake 运行时通过--resources
指定资源总量。以该 rule 为例,每个下载任务在 rule 中指定会占用 1 个下载槽(download_slots),因此可以通过--resources download_slots=4
分配四个下载槽,使下载任务最多同时进行四个。shell
段用于定义该 rule 执行的 shell 命令行,内容解释如下:{output.srafile}
将替换为 output 段的 srafile 字段。首先,让其检查是否存在.lock
文件(prefetch
在下载过程中会产生的文件,如果不移除则无法重续下载),这一步是为失败以后重试而准备的。此后通过prefetch
下载,将日志输出到{log}
中。下载结束后进行检查,将后缀统一为.sra
(部分 sra 文件以.sralite
结尾)。
对于一个 rule 而言,最少仅需要 output
和指令(如 shell
、script
、run
)部分,其他都是可选字段(增强 rule 的功能和可管理性)。
下载结束后就是我们还需要进行处理,即将 sra
文件提取为 fastq 文件(使用 fasterq-dump
),考虑到这也属于原始数据的处理,因此新的规则可以继续添加在 01_prefetch.smk
中,首先我们确定一些要提前确定的参数:
fasterq-dump
和pigz
使用的线程数。
1 | # config.yaml |
此外,这一步中单端时仅产生一个文件,双端时产生两个文件。因此规则需要根据 config
中指定的 mode
进行行为调整:
1 | # 01_prefetch.smk |
逐段解释:
input
段指定该 rule 需要的输入文件,snakemake 将进行监测直到这些文件出现后再开始运行该 rule。这里涉及到 snakemake 中最重要的一个概念:- Snakemake 将依据各个 rule 的 input 和 output 确认它们彼此之间的依赖关系,构建一个工作流的有向无环图,此后按照图的拓扑顺序运行,确保某一规则在依赖的其他规则结束后才会执行。同样也是在这个过程中,Snakemake 会自动根据目标文件向各规则的 input 和 output 中进行通配符填补(即上述的
{srr}
)。以本文拟构建的 Pipeline 为例(snakemake --dag | dot -Tpng > dag.png
):
- Snakemake 将依据各个 rule 的 input 和 output 确认它们彼此之间的依赖关系,构建一个工作流的有向无环图,此后按照图的拓扑顺序运行,确保某一规则在依赖的其他规则结束后才会执行。同样也是在这个过程中,Snakemake 会自动根据目标文件向各规则的 input 和 output 中进行通配符填补(即上述的
output
段新增了根据 mode 进行的输出文件调整。shell
段中,首先通过fasterq-dump
进行提取,并使用pigz
进行压缩。此后,如果使用单端模式,则将后缀统一为[SRR id].fastq.gz
;如果使用双端模式,则检查是否两端数据都存在。
到此即产生我们所需要的原始数据,将这些 rule 整合进 Snakefile
中(声明 smk
文件):
1 | # Snakefile |
此后 trim_galore
的流程构建也遵循相同的思路(设定参数 —— 编写规则 —— 整合):
1 | touch 02_trimgalore.smk |
1 | # config.yaml |
1 | # 02_trimgalore.smk |
1 | # Snakefile |
这里将监测先前 extract
规则输出的文件,如果 extract
规则运行完成则启动,并根据单端和双端模式自动调节输入的 fastq 文件和过滤后的 fastq 文件。可以设置的参数有两个,一个是每个 trim_galore
使用的线程数,另一个是 trim_galore
的额外参数。通过设置一个可以灵活调节的参数项可以让使用者依据自身的情况进行参数设定而不至于受 Pipeline 限制,例如自行设置最短长度和接头序列等。如果不需要则留空即可。
salmon
的构造流程大致相同,但由于这里使用的通配符将变为样本名,因此这里需要进行一定变动:
1 | touch 03_salmon.smk |
1 | # config.yaml |
1 | # 03_salmon.smk |
1 | # Snakefile |
可以看到这里使用的通配符变为了 {sample}
,并且使用了两个自行构造的函数 get_fastq_list
及 get_fastq_path
,分别用于获取每个样本中所有 fastq 文件的列表和路径。这两个函数的具体实现在 common.smk
中进行:
1 | import csv |
这两个函数根据接收到的模式及 group.txt
内容,返回特定 sample 对应的 fastq 文件列表和路径,其中列表传输给 input 使其监视上游依赖 rule 的运行情况,而路径传输给特定参数以在 salmon 定量中对同一样本的测序数据进行合并。
这里涉及到的其他一些概念:
- wildcards 是一个命名空间,在函数和 lambda 表达式中用于访问通配符。
wildcards.sample
只有在规则的输出、输入或其他字段中定义了{sample}
占位符时才会存在。结合 lambda 和 wildcards 并通过自定义的处理函数可以获取需要动态规划的输入和输出。注意,直接在这些函数中给定{sample}
是不可行的。 - common.smk 同样需要在 Snakefile 中声明,此后该文件中定义的所有函数都可以被其他规则所识别。
1 | # Snakefile |
到这里,Pipeline 的整体搭建就已完成。
3、Pipeline 运行
使用 snakemake 命令行运行 Pipeline:
1 | cd $tutorial_path |
这里 --core
指定 snakemake 总共可以管理的 CPU 核数量,snakemake 将根据每个 rule 占用的 threads 自动分配任务,例如:
如果一个 extract 要占用 8 个 threads,一个 salmon 要占用 4 个 threads。那么 snakemake 可能同时进行一个 extract 任务和两个 salmon 任务,确保提供的核被最大化利用(前提是这些任务的前置依赖都已经满足)。
--resources
指定 snakemake 的其他分配资源,这些都是用户规定的。以上述命令为例,Snakemake 共有 4 个 download_slots 资源,而在需要 download_slots 资源运行的 rule 中,同时运行所需的资源总和将被限定在这个值以内。在该 Pipeline 中即最高只可同时并行四个下载任务。
运行完成后,留意以下文件夹:
log
文件夹中包含各个环节运行时的输出日志文件。rawfastq
文件夹中为原始测序数据。trimgalore_result
文件夹中为过滤后的测序数据。results
文件夹中为各样本的定量结果。
实测以上 Pipeline 能够正确运行,如果存在问题请前往 github repository 页面查看完整 Pipeline 并确定出错点:
https://github.com/JuseTiZ/Blog-Snakemake-pipeline/tree/main/Tutorial-Pipeline
拓展 Pipeline
以下内容不再详细解释 Pipeline 中如何具体实现,但原理较简单且具有一定实用性,因此这里做一些简单介绍。
版本控制
每个 rule 在运行时都可以指定一个 conda 环境,例如:
1 | rule download: |
这里的相对路径是以 rule 所在 smk 文件开始寻找的,你可以在 ../envs/sratool.yaml
中填写以下内容:
1 | name: sra |
在运行该规则时,Snakemake 会寻找名为 sra 的环境,如果不存在则新建该环境并安装指定依赖(sra-tools)。
请注意,Snakemake 所有环境操作都是默认使用 mamba
进行的,如果你想使用 conda
进行需要在命令行中指定 --conda-frontend conda
(不推荐)。
另外你也可以在 snakefile 中指定运行这套 pipeline 所需的最低版本,例如:
1 | from snakemake.utils import min_version |
性能监测
每个 rule 在运行时都可以指定一个 benchmark 文件,例如:
1 | rule download: |
Snakemake 会自动测量并记录该 rule 的执行时间、CPU 使用率、内存使用情况等信息并储存到 benchmark 文件中。这对于需要优化工作流和分析性能瓶颈的情况非常有用。
再次改进的可能
不难看出,上述 Pipeline 依然存在很多可以改进的地方以增强 Pipeline 的可读性和可拓展性,但要做到这些可能会在一定程度上提升理解难度,因此如果你认为上述内容不算困难且想要进一步检验自己当前的能力,可以考虑对该 Pipeline 做以下改进:
- 将
group.txt
换为sample.yaml
,在Snakefile
中提取到config
变量里,据此进行 rule all input 的指定(可通过动态函数进行),并在 salmon 规则中的 input 和 output 里使用更具效率的函数进行动态规划,sample.yaml
示例:
1 | samples: |
- 将涉及单端双端判断的 rule 拆分成两个 rule(一个用于 single-end,一个用于 paired-end),并将
mode
参数从config.yaml
中移除,使 Pipeline 能够同时处理单端和双端的数据。例如可以通过在上面提到的sample.yaml
中添加每个样本的数据类型来进行单双端数据的混合处理:
1 | samples: |
- 添加下游分析部分,例如新建
scripts
文件夹存放自定义的用于提取表达矩阵的脚本,并在 Pipeline 中添加新的规则以对其进行实现(别忘了 rule all 的 input 部分也要进行对应更新)。
前两者的具体实现可参考 repository 下的 ChIP Pipeline。