The most NextFlow-like (DSL2) way to incorporate a former bash scheduler submission script to a NextFlow workflow
New to NextFlow
, here, and struggling with some basic concepts. I'm in the process of converting a set of bash
scripts from a previous publication into a NextFlow
workflow.
I'm converting a simple bash script (included below for convenience) that did some basic prep work and submitted a new job to the cluster scheduler for each iteration.
Ultimate question: What is the most NextFlow-like way to incorporate this script into a NextFlow workflow (preferably using the new DSL2 schema)?
Possible subquestion: Is it possible to emit a list of lists based on bash
variables? I've seen ways to pass lists from workflows into processes, but not out of process. I could print each set of parameters to a file and then emit that file, but that doesn't seem very NextFlow-like.
I would really appreciate any guidance on how to incorporate the following bash
script into a NextFlow workflow. I have added comments and indicate the four variables that I need to emit as a set of parameters.
Thanks!
# Input variables. I know how to take these in.
GVCF_DIR=$1
GATK_bed=$2
RESULT_DIR=$3
CAMO_MASK_REF_PREFIX=$4
GATK_JAR=$5
# For each directory
for dir in ${GVCF_DIR}/*
do
# Do some some basic prep work defining
# variables and setting up results directory
ploidy=$(basename $dir)
repeat=$((${ploidy##*_} / 2))
result_dir="${RESULT_DIR}/genotyped_by_region/${ploidy}" # Needs to be emitted
mkdir -p $result_dir
# Create a new file with a list of files. This file
# will be used as input to the downstream NextFlow process
gvcf_list="${ploidy}.gvcfs.list" # Needs to be emitted
find $dir -name "*.g.vcf" > $gvcf_list
REF="${CAMO_MASK_REF_PREFIX}.${ploidy}.fa" # Needs to be emitted
# For each line in the $GATK_bed file where
# column 5 == repeat (defined above), submit
# a new job to the scheduler with that region.
awk "\$5 == $repeat {print \$1\":\"\$2\"-\"\$3}" $GATK_bed | \
while read region # Needs to be emitted
do
qsub combine_and_genotype.ogs \
$gvcf_list \
$region \
$result_dir \
$REF \
$GATK_JAR
done
done
What is the most NextFlow-like way to incorporate this script into a NextFlow workflow
In some cases, it is possible to incorporate third-party scripts that do not need to be compiled "as-is" by making them executable and moving them into a folder called 'bin' in the root directory of your project repository. Nextflow automatically adds this folder to the $PATH in the execution environment.
However, some scripts do not lend themselves for inclusion in this manner. This is especially the case if the objective is to produce a portable and reproducible workflow, which is how I interpret "the most Nextflow-like way". The objective ultimately becomes how run each process step in isolation. Given your example, below is my take on this:
nextflow.enable.dsl=2
params.GVCF_DIRECTORY = './path/to/directories'
params.GATK_BED_FILE = './path/to/file.bed'
params.CAMO_MASK_REF_PREFIX = 'someprefix'
params.publish_dir = './results'
process combine_and_genotype {
publishDir "${params.publish_dir}/${dirname}"
container 'quay.io/biocontainers/gatk4:4.2.4.1--hdfd78af_0'
cpus 1
memory 40.GB
input:
tuple val(dirname), val(region_string), path(ref_fasta), path(gvcf_files)
output:
tuple val(dirname), val(region_string), path("full_cohort.combined.${region}.g.vcf")
script:
region = region_string.replaceAll(':', '_')
def avail_mem = task.memory ? task.memory.toGiga() : 0
def Xmx = avail_mem >= 8 ? "-Xmx${avail_mem - 1}G" : ''
def Xms = avail_mem >= 8 ? "-Xms${avail_mem.intdiv(2)}G" : ''
"""
cat << __EOF__ > "${dirname}.gvcf.list"
${gvcf_files.join('\n'+' '*4)}
__EOF__
gatk \\
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \\
CombineGVCFs \\
-R "${ref_fasta}" \\
-L "${region_string}" \\
-O "full_cohort.combined.${region}.g.vcf" \\
-V "${dirname}.gvcf.list"
gatk \\
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \\
GenotypeGVCFs \\
-R "${ref_fasta}" \\
-L "${region_string}" \\
-O "full_cohort.combined.${region}.vcf" \\
-V "full_cohort.combined.${region}.g.vcf" \\
-A GenotypeSummaries
"""
}
workflow {
GVCF_DIRECTORY = file( params.GVCF_DIRECTORY )
GATK_BED_FILE = file( params.GATK_BED_FILE )
Channel.fromPath( params.GATK_BED_FILE ) \
| splitCsv(sep: '\t') \
| map { row ->
tuple( row[4].toInteger(), "${row[0]}:${row[1]}-${row[2]}" )
} \
| set { regions }
Channel.fromPath( "${GVCF_DIRECTORY.toString()}/**/*.g.vcf" ) \
| map { tuple( GVCF_DIRECTORY.relativize(it).subpath(0,1).name, it ) } \
| groupTuple() \
| map { dirname, gvcf_files ->
def ploidy = dirname.replaceFirst(/^.*_/, "").toInteger()
def repeat = ploidy.intdiv(2)
def ref_fasta = file( "${params.CAMO_MASK_REF_PREFIX}.${dirname}.fa" )
tuple( repeat, dirname, ref_fasta, gvcf_files )
} \
| combine( regions, by: 0 ) \
| map { repeat, dirname, ref_fasta, gvcf_files, region ->
tuple( dirname, region, ref_fasta, gvcf_files )
} \
| combine_and_genotype
}
From the GATK docs, I couldn't actually see where the variant inputs could be a list of files. Maybe this feature was only available using an older GATK. The code above is untested.
Also, you will need to ensure your code is indented using four spaces. The above will throw some error if tab indentation is used, or if you were to indent using a different number of spaces.