Pipeline Chaining on AWS

There are several ways to connect pipelines in ICAv2. One of them is to use Amazon Simple Notification Service (SNS) and a Lambda function deployed on AWS. Once the initial pipeline has completed, SNS triggers the Lambda function. The Lambda function extracts information from the event parameter and uses it to build an API call that starts the subsequent pipeline.

SNS

Notifications are used to subscribe to events in the platform and trigger the delivery of a message to an external delivery target. You can read more here. Important: In order to allow the platform to deliver events to Amazon SQS or SNS delivery targets, a cross-account policy needs to be added to the target Amazon service.

        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": "arn:aws:iam::079623148045:root"
                    },
                    "Action": "SNS:Publish",
                    "Resource": "*arn*"
                }
            ]
        }

Here, arn is the Amazon Resource Name (ARN) of the target SNS topic. Once the SNS topic has been created in AWS, you can create a new ICA subscription under Projects > your_project > Project Settings > Notifications > New ICA Subscription. The following screenshot displays the settings of a subscription for Analysis success of a pipeline whose name starts with Hello.
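As a minimal sketch, the policy above can also be generated from Python so that the topic ARN is filled in programmatically. The principal is the one shown in the policy above; the function name `build_sns_access_policy` and the topic ARN are hypothetical and used for illustration only.

```python
import json

# Principal taken from the policy shown above.
ICA_PRINCIPAL = "arn:aws:iam::079623148045:root"


def build_sns_access_policy(topic_arn):
    """Return the cross-account policy as a dict for the given SNS topic ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": ICA_PRINCIPAL},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
            }
        ],
    }


# Hypothetical topic ARN; replace it with the ARN of your own SNS topic.
policy = build_sns_access_policy(
    "arn:aws:sns:us-east-1:123456789012:ica-analysis-events"
)
policy_json = json.dumps(policy, indent=4)
print(policy_json)
```

The printed JSON can then be used as the access policy of the target topic.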

ICA API endpoints

On this site there is a list of all available API endpoints for ICA. To use them, obtain an API key from the Illumina ICA portal.

Starting a Nextflow pipeline using the API

To start a Nextflow pipeline using the API, use the endpoint /api/projects/{projectId}/analysis:nextflow. Provide the projectId and a request body in JSON format containing userReference, pipelineId, analysisInput, etc. Two parameters, activationCodeDetailId and analysisStorageId, have to be retrieved using the API endpoint /api/activationCodes:findBestMatchingForNextflow from the Entitlement Detail section in Swagger. For example:

Output of the API call:

        {
            "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
            "allowedSlots": -1,
            "usedSlots": 0,
            "movedSlots": 0,
            "originalSlots": -1,
            "pipelineBundle": {
                "id": "b4f2840c-4f79-44db-9e1c-5e7339a1b507",
                "name": "ICA_Ent-DE_Pipeline_Entitlement",
                "maxNumberOfAllowedSlots": -1,
                "activePipelines": [],
                "canceledPipelines": [],
                "retiredPipelines": [],
                "regions": [
                    {
                    }
                ],
                "analysisStorages": [
                    {},
                    {},
                    {
                        "id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0",
                        "timeCreated": "2021-11-05T10:28:20Z",
                        "timeModified": "2021-11-05T10:28:20Z",
                        "ownerId": "8ec463f6-1acb-341b-b321-043c39d8716a",
                        "tenantId": "f91bb1a0-c55f-4bce-8014-b2e60c0ec7d3",
                        "tenantName": "ica-cp-admin",
                        "name": "Small",
                        "description": "1.2 TB"
                    }
                ]
            },
            "usages": []
        }

In this particular case, the activationCodeDetailId is the top-level "id" ("6375eb43-e865-4d7c-a9e2-2c153c998a5c") and the analysisStorageId is the "id" of the analysisStorages entry named "Small" ("6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0").

Once you have all these parameters, you can start the pipeline using the API.
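The lookup described above can be sketched in Python as follows. The helper name `extract_ids` is hypothetical, and the `response` dict is a trimmed copy of the output shown above rather than a live API call.

```python
def extract_ids(entitlement, storage_name="Small"):
    """Return (activationCodeDetailId, analysisStorageId) from the response dict."""
    activation_code_detail_id = entitlement["id"]
    storages = entitlement["pipelineBundle"]["analysisStorages"]
    for storage in storages:
        # Some entries may be empty objects, so use .get() for the name.
        if storage.get("name") == storage_name:
            return activation_code_detail_id, storage["id"]
    raise LookupError(f"no analysis storage named {storage_name!r}")


# Trimmed copy of the API output shown above.
response = {
    "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
    "pipelineBundle": {
        "analysisStorages": [
            {},
            {},
            {
                "id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0",
                "name": "Small",
                "description": "1.2 TB",
            },
        ]
    },
}

activation_id, storage_id = extract_ids(response)
print(activation_id, storage_id)
```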

Setup of Lambda function

Next, create a new Lambda function in the AWS Management Console. Choose Author from scratch and select Python 3.7 (includes the requests library) as the runtime. In the Function code section, write the code for the Lambda function, which uses Python modules to execute API calls against ICA. Add the SNS topic created above as a trigger.

Example

Here is an example of Python code that checks whether a file named 'test.txt' exists in the output of the successful pipeline. If the file exists, a new API call is made to start the second pipeline with this 'test.txt' as input.

        import json
        import random
        import string

        import requests

        def lambda_handler(event, context):

            # The SNS message body is a JSON string describing the finished analysis
            message = json.loads(event['Records'][0]['Sns']['Message'])

            project_id = message['projectId']
            analysis_id = message['payload']['id']

            # List the outputs of the finished analysis
            url = ('https://ica.illumina.com/ica/rest/api/projects/' +
                   project_id + '/analyses/' + analysis_id + '/outputs')
            headers = {
                'X-API-Key': '${API-KEY}',
                'accept': 'application/vnd.illumina.v3+json'
            }

            response = requests.get(url, headers=headers)
            json_data_slice = response.json()['items'][0]['data'][0]['children']

            # Look for 'test.txt' among the output files
            file_id = None
            for json_obj in json_data_slice:
                if json_obj.get('name') == 'test.txt':
                    file_id = json_obj['dataId']
                    break

            # Without the input file there is nothing to start
            if file_id is None:
                return {
                    'statusCode': 404,
                    'body': 'Error: test.txt not found in the analysis outputs'
                }

            # Parameters of the second pipeline

            activation_code_detail_id = '6375eb43-e865-4d7c-a9e2-2c153c998a5c'
            analysis_storage_id = '6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0'
            pipeline_id = 'fd540bf8-67f1-4506-99e9-c89cc9a98fdd'
            # Random suffix keeps the user reference unique between runs
            user_reference = 'CallLambda' + ''.join(
                random.choices(string.ascii_uppercase + string.digits, k=6))

            payload = {
                "userReference": user_reference,
                "pipelineId": pipeline_id,
                "tags": {
                    "technicalTags": [],
                    "userTags": []
                },
                "activationCodeDetailId": activation_code_detail_id,
                "analysisStorageId": analysis_storage_id,
                "analysisInput": {
                    "inputs": [
                        {
                            "parameterCode": "file",
                            "dataIds": [
                                file_id
                            ]
                        }
                    ]
                }
            }

            url_pipeline2 = ('https://ica.illumina.com/ica/rest/api/projects/' +
                             project_id + '/analysis:nextflow')
            headers_pipeline2 = {
                'X-API-Key': '${API-KEY}',
                'accept': 'application/vnd.illumina.v3+json',
                'Content-Type': 'application/vnd.illumina.v3+json'
            }

            response_start = requests.post(url_pipeline2, data=json.dumps(payload), headers=headers_pipeline2)

            # Check the response status code
            if response_start.status_code == 201:
                # POST request successful
                response_data = response_start.json()
                return {
                    'statusCode': 201,
                    'body': response_data
                }
            else:
                # POST request failed
                return {
                    'statusCode': response_start.status_code,
                    'body': 'Error: Failed'
                }
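
The two parsing steps of the handler above (reading the SNS message and locating the file among the outputs) can be tried locally without calling AWS or ICA. The following is only a sketch: the helper names `parse_sns_event` and `find_data_id`, as well as all IDs in the sample data, are hypothetical, and the sample event contains only the fields the handler reads.

```python
import json


def parse_sns_event(event):
    """Return (project_id, analysis_id) from an SNS-triggered Lambda event."""
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    return message["projectId"], message["payload"]["id"]


def find_data_id(children, file_name):
    """Return the dataId of the first child named file_name, or None."""
    for child in children:
        if child.get("name") == file_name:
            return child["dataId"]
    return None


# Synthetic event with made-up IDs; the SNS message body is a JSON string.
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {"projectId": "project-123", "payload": {"id": "analysis-456"}}
                )
            }
        }
    ]
}

# Made-up output listing in the shape the handler expects for 'children'.
sample_children = [
    {"name": "log.txt", "dataId": "data-aaa"},
    {"name": "test.txt", "dataId": "data-bbb"},
]

project_id, analysis_id = parse_sns_event(sample_event)
file_id = find_data_id(sample_children, "test.txt")
print(project_id, analysis_id, file_id)
```

Keeping these steps in small functions makes it easier to handle the case where 'test.txt' is missing, because `find_data_id` then returns None instead of leaving a variable undefined.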
