samples获取
在samples.json文件中写入样本信息。
{
"A" :
{
"R1" : "/local_data1/project/A_Q20L50_1.fastq.gz",
"R2": "/local_data1/project/A_Q20L50_2.fastq.gz"
},
"B" :
{
"R1" : " /local_data1/project//B_Q20L50_1.fastq.gz",
"R2" : " /local_data1/project/B_Q20L50_2.fastq.gz"
}
}
读入json文件。
FILES = json.load(open("test.json"))
FILES是一个字典
{'A': {'R1': '/local_data1/project/A_Q20L50_1.fastq.gz',
'R2': '/local_data1/project/A_Q20L50_2.fastq.gz'},
'B': {'R1': ' /local_data1/project//B_Q20L50_1.fastq.gz',
'R2': ' /local_data1/project/B_Q20L50_2.fastq.gz'}}
ALL_SAMPLES得到列表 ['A','B']
ALL_FASTQ的expand为snakemake中的用法。这里相当于得到
shell脚本和python结合
rule kallisto_quant:
input:
r1 = lambda wildcards: FILES[wildcards.sample]['R1'],
r2 = lambda wildcards: FILES[wildcards.sample]['R2'],
index = rules.kallisto_index.output.index
output:
join(OUT_DIR, '{sample}', 'abundance.tsv'),
join(OUT_DIR, '{sample}', 'run_info.json')
version:
KALLISTO_VERSION
threads:
4
resources:
mem = 4000
run:
fastqs = ' '.join(chain.from_iterable(zip(input.r1, input.r2)))
shell('kallisto quant'
' --threads={threads}'
' --index={input.index}'
' --output-dir=' + join(OUT_DIR, '{wildcards.sample}') +
' ' + fastqs)
cluster wrapper
当用--cluster参数时,snakemake处理顺序
直接提交
snakemake --snakefile res-snake.py --cluster "qsub -l p=2 -q res -o o.logs -e e.logs -cwd" --jobs 8
qsub提交脚本
snakemake -j 2 --cluster-config cluster.json --cluster './bsub_cluster.py {dependencies}'
dependencies
如果在这里'./bsub_cluster.py -p 2'
,脚本增加如下
bsub ... -w 'done(-p) && done(2)'
.sh
http://www.glue.umd.edu/lsf-docs/man/bsub.html
bsub的w参数
specifies the dependency condition of a batch job. Only when depend_cond is satisfied (TRUE), will the job be considered for dispatch.
似乎是设置job之间的联系。仅用于bsub。
snakemake根据snakefile的rule和sample拆分job,每一个job生成一个jobscript,放在临时目录中。比如
01snaketest/.snakemake/tmp.o81t2076/snakejob.fastqc_clean.1.sh
。
每一个jobscript通过qsub提交,生成qsub命令时候,读取cluster引号内容,jobscirpt作为最后一个参数。
bsub -n 1 -W 00:15 -u XXXX -q res -J trimming-17 -o bsub_log/trimming-17.out -e bsub_log/trimming-17.err -M 16384 -R rusage[mem=16384] projects/01snaketest/.snakemake/tmp.5qjmx_9i/snakejob.trimming.17.sh | tail -1 | cut -f 2 -d \< | cut -f 1 -d \>
这是一个jobscriptsnakejob.trimming.17.sh
的内容。
其中cluster的配置由cluster.json设置,--cluster-config cluster.json
读入。
#!/bin/sh
# properties = {"type": "single", "rule": "trimming", "local": false, "input": ["rawData/reads/WR180002S_R1.fastq.gz", "rawData/reads/WR180002S_R2.fastq.gz"], "output": ["outData/trimmed/WR180002S_clean_R1.fq.gz", "outData/trimmed/WR180002S_clean_R2.fq.gz", "outData/trimmed/unpaired_WR180002S_R1.fq.gz", "outData/trimmed/unpaired_WR180002S_R2.fq.gz"], "wildcards": {"sample": "WR180002S"}, "params": {"trimmomatic": "/local_data1/software/Trimmomatic/Trimmomatic-0.38/trimmomatic-0.38.jar"}, "log": [], "threads": 10, "resources": {}, "jobid": 17, "cluster": {"time": "00:15", "cpu": 1, "email": "XXX", "EmailNotice": "N", "MaxMem": 16384, "queue": "res"}}
cd projects/01snaketest && \
software/anaconda3/bin/python3.6 \
-m snakemake outData/trimmed/WR180002S_clean_R1.fq.gz --snakefile /projects/01snaketest/Snakefile \
--force -j --keep-target-files --keep-remote \
--wait-for-files /projects/01snaketest/.snakemake/tmp.t8ndnw4x rawData/reads/WR180002S_R1.fastq.gz rawData/reads/WR180002S_R2.fastq.gz --latency-wait 5 \
--attempt 1 --force-use-threads \
--wrapper-prefix https://bitbucket.org/snakemake/snakemake-wrappers/raw/ \
--allowed-rules trimming --nocolor --notemp --no-hooks --nolock \
--mode 2 && touch "/projects/01snaketest/.snakemake/tmp.t8ndnw4x/17.jobfinished" || (touch "/projects/01snaketest/.snakemake/tmp.t8ndnw4x/17.jobfailed"; exit 1)
简单的wrapper函数
read_job_properties
是snakemake自带函数,可以读取jobscript中的properties,得到一个字典。
#!/usr/bin/env python3
import os
import sys
from snakemake.utils import read_job_properties
jobscript = sys.argv[1]
job_properties = read_job_properties(jobscript)
# do something useful with the threads
threads = job_properties[threads]
# access property defined in the cluster configuration file (Snakemake ≥3.6.0)
job_properties["cluster"]["time"]
print("qsub -t {threads} {script}".format(threads=threads, script=jobscript))