Nextflow: Scatter-gather Method
Nextflow natively supports the scatter-gather pattern. The initial example uses this pattern: the task splitSequences splits the FASTA file into chunks and emits them as channel records, and the task reverse then processes those chunks.
In this tutorial we will create a pipeline that splits a TSV file into chunks, sorts each chunk, and merges the results back together. First we present the individual processes:
process split {
    container 'ubuntu:latest'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'

    input:
    path x

    output:
    path 'split.*.tsv'

    script:
    """
    split -a10 -d -l3 --numeric-suffixes=1 --additional-suffix .tsv ${x} split.
    """
}
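To see what the split command inside this process does, here is a small standalone sketch; the file demo.tsv and its contents are hypothetical stand-ins for the real input:

```shell
# Hypothetical 7-line input file standing in for the real TSV
printf '%s\n' 7 3 5 1 6 2 4 > demo.tsv

# Same command as in the process: 3 lines per chunk, numeric
# suffixes starting at 1, zero-padded to 10 digits, ".tsv" appended
split -a10 -d -l3 --numeric-suffixes=1 --additional-suffix .tsv demo.tsv split.

ls split.*.tsv
# split.0000000001.tsv  split.0000000002.tsv  split.0000000003.tsv
```

Seven input lines at three lines per chunk yield three chunk files, which is what makes the downstream scatter possible.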
process sort {
    container 'ubuntu:latest'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'

    input:
    path x

    output:
    path '*.sorted.tsv'

    script:
    """
    sort -gk1,1 $x > ${x.baseName}.sorted.tsv
    """
}
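The sort invocation can be tried directly on the shell; the chunk file and its contents below are hypothetical:

```shell
# Hypothetical unsorted chunk; the first column is numeric
printf '3\ta\n1\tb\n2\tc\n' > split.0000000001.tsv

# -g sorts by general numeric value, -k1,1 restricts the sort key
# to the first column only
sort -gk1,1 split.0000000001.tsv > split.0000000001.sorted.tsv

cat split.0000000001.sorted.tsv
# 1	b
# 2	c
# 3	a
```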
process merge {
    container 'ubuntu:latest'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'
    publishDir 'out', mode: 'move'

    input:
    path x

    output:
    path 'merged.tsv'

    script:
    """
    cat $x > merged.tsv
    """
}
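Note that cat only concatenates the chunks in the order they are listed; it does not interleave them into a globally sorted file. If a globally sorted result is wanted, sort -m can merge already-sorted inputs. A small sketch with hypothetical chunk files:

```shell
# Two hypothetical pre-sorted chunks
printf '1\n4\n' > a.sorted.tsv
printf '2\n3\n' > b.sorted.tsv

# cat, as used in the merge process, simply concatenates the chunks
cat a.sorted.tsv b.sorted.tsv > merged.tsv
# merged.tsv holds 1 4 2 3: chunk order, not globally sorted

# sort -m merges already-sorted inputs into one sorted stream
sort -m -g a.sorted.tsv b.sorted.tsv > merged_sorted.tsv
# merged_sorted.tsv holds 1 2 3 4
```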
Here is the corresponding main.nf file. The operators flatten and collect transform the emitted channels: flatten emits every item of a Collection or Array as a separate element, so each chunk file becomes its own channel item, while collect gathers all the items emitted by a channel into a List and emits that list as a single item.
nextflow.enable.dsl=2

include { sort } from './sort.nf'
include { split } from './split.nf'
include { merge } from './merge.nf'

params.myinput = "test.test"

workflow {
    input_ch = Channel.fromPath(params.myinput)
    split(input_ch)
    sort(split.out.flatten())
    merge(sort.out.collect())
}
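The whole scatter-gather flow that this workflow expresses can be simulated directly on the shell, which is a useful sanity check before running the pipeline. The input file, its contents, and the part. prefix below are hypothetical:

```shell
# Hypothetical 6-line numeric input
printf '%s\n' 5 3 9 1 7 2 > test.tsv

# Scatter: split into 3-line chunks (same flags as the split process)
split -a10 -d -l3 --numeric-suffixes=1 --additional-suffix .tsv test.tsv part.

# Process each chunk: per-chunk numeric sort (same as the sort process)
for f in part.*.tsv; do sort -gk1,1 "$f" > "${f%.tsv}.sorted.tsv"; done

# Gather: concatenate the sorted chunks (same as the merge process)
cat part.*.sorted.tsv > merged.tsv
# merged.tsv holds 3 5 9 1 2 7: each chunk sorted, chunks in order
```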
The corresponding XML configuration is as follows:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<pd:pipeline xmlns:pd="xsd://www.illumina.com/ica/cp/pipelinedefinition" code="" version="1.0">
    <pd:dataInputs>
        <pd:dataInput code="myinput" format="TSV" type="FILE" required="true" multiValue="false">
            <pd:label>myinput</pd:label>
            <pd:description></pd:description>
        </pd:dataInput>
    </pd:dataInputs>
    <pd:steps/>
</pd:pipeline>
The log files confirm the scatter-gather behavior: the input file is first split into multiple chunks, the chunks are then sorted in parallel, and the sorted chunks are finally merged into a single output file.
