Nextflow: Scatter-gather Method

Nextflow offers support for Scatter-gather pattern natively. The initial example uses this pattern by splitting the FASTA file into chunks to channel records in the task splitSequences, then by processing these chunks in the task reverse.

In this tutorial, we will create a pipeline which will split a TSV file into chunks, sort them, and merge them together.

Creating the pipeline

Select Projects > your_project > Flow > Pipelines. From the Pipelines view, click the +Create pipeline > Nextflow button to start creating a Nextflow pipeline.

In the Information tab, add values for the required Code (unique pipeline name) and Description fields. Nextflow Version and Storage size defaults to preassigned values.

First, we present the individual processes. Select +Nextflow files > + Create file and label the file split.nf. Copy and paste the following definition.

process split {
    container 'public.ecr.aws/lts/ubuntu:22.04'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'
    
    input:
    path x
    
    output:
    path("split.*.tsv")
    
    """
    split -a10 -d -l3 --numeric-suffixes=1 --additional-suffix .tsv ${x} split.
    """
    }

Next, select +Create file and name the file sort.nf. Copy and paste the following definition.

process sort {
    container 'public.ecr.aws/lts/ubuntu:22.04'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'
    
    input:
    path x
    
    output:
    path '*.sorted.tsv'
    
    """
    sort -gk1,1 $x > ${x.baseName}.sorted.tsv
    """
}

Select +Create file again and label the file merge.nf. Copy and paste the following definition.

process merge {
    container 'public.ecr.aws/lts/ubuntu:22.04'
    pod annotation: 'scheduler.illumina.com/presetSize', value: 'standard-small'
    cpus 1
    memory '512 MB'

    publishDir 'out', mode: 'symlink'
    
    input:
    path x
    
    output:
    path 'merged.tsv'
    
    """
    cat $x > merged.tsv
    """
}

Add the corresponding main.nf file by navigating to the Nextflow files > main.nf tab and copying and pasting the following definition.

nextflow.enable.dsl=2
 
include { sort } from './sort.nf'
include { split } from './split.nf'
include { merge } from './merge.nf'
 
 
params.myinput = "test.test"
 
workflow {
    input_ch = Channel.fromPath(params.myinput)
    split(input_ch)
    sort(split.out.flatten())
    merge(sort.out.collect())
}

Here, the operators flatten and collect are used to transform the emitting channels. The Flatten operator transforms a channel in such a way that every item of type Collection or Array is flattened so that each single entry is emitted separately by the resulting channel. The collect operator collects all the items emitted by a channel to a List and return the resulting object as a sole emission.

Finally, copy and paste the following XML configuration into the XML Configuration tab.

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<pd:pipeline xmlns:pd="xsd://www.illumina.com/ica/cp/pipelinedefinition" code="" version="1.0">
    <pd:dataInputs>
        <pd:dataInput code="myinput" format="TSV" type="FILE" required="true" multiValue="false">
            <pd:label>myinput</pd:label>
            <pd:description></pd:description>
        </pd:dataInput>
    </pd:dataInputs>
    <pd:steps/>
</pd:pipeline>

Click the Generate button (at the bottom of the text editor) to preview the launch form fields.

Click the Save button to save the changes.

Running the pipeline

Go to the Pipelines page from the left navigation pane. Select the pipeline you just created and click Start New Analysis.

Fill in the required fields indicated by red "*" sign and click on Start Analysis button. You can monitor the run from the Analyses page. Once the Status changes to Succeeded, you can click on the run to access the results page.

Select Projects > your_project > Flow > Analyses, and open the Logs tab. From the log files, it is clear that in the first step, the input file is split into multiple chunks, then these chunks are sorted and merged.

Last updated