Pipeline Chaining on AWS

There are several ways to connect pipelines in ICAv2. One of them is to use Amazon Simple Notification Service (SNS) together with a Lambda function deployed on AWS. When the initial pipeline completes, SNS triggers the Lambda function. The function extracts the required information from the event parameter and makes an API call to start the subsequent pipeline.
Architecture

SNS

Notifications are used to subscribe to events in the platform and to trigger the delivery of a message to an external delivery target. Important: to allow the platform to deliver events to Amazon SQS or SNS delivery targets, a cross-account policy must be added to the target Amazon service:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::079623148045:root"
            },
            "Action": "SNS:Publish",
            "Resource": "*arn*"
        }
    ]
}
Here, arn stands for the Amazon Resource Name (ARN) of the target SNS topic. Once the SNS topic has been created in AWS, create a new ICA subscription under Notifications in the corresponding project. The following screenshot shows the settings of a subscription for 'Analysis success' of a pipeline whose name starts with Hello.
Screenshot
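
The cross-account policy above can also be generated programmatically, which avoids hand-editing the topic ARN. The following is a minimal sketch: the function name build_sns_publish_policy and the example topic ARN are hypothetical, while the principal and action are copied from the policy above.

```python
import json

# Principal of the ICA platform account, copied from the policy above.
ICA_PRINCIPAL = "arn:aws:iam::079623148045:root"


def build_sns_publish_policy(topic_arn):
    """Return the cross-account policy as a dict for the given SNS topic ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": ICA_PRINCIPAL},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
            }
        ],
    }


# Hypothetical topic ARN, used only for illustration.
example_arn = "arn:aws:sns:us-east-1:123456789012:ica-analysis-success"
policy_json = json.dumps(build_sns_publish_policy(example_arn), indent=2)
print(policy_json)
```

The printed JSON has the same structure as the policy shown above, with the placeholder replaced by the given topic ARN.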

ICA API endpoints

The ICA API reference (Swagger) lists all available API endpoints for ICA. To use them, obtain an API key from the Illumina ICA portal.

Starting a Nextflow pipeline using the API

To start a Nextflow pipeline using the API, use the endpoint /api/projects/{projectId}/analysis:nextflow. The request needs the projectId and a request body in JSON format containing userReference, pipelineId, analysisInput, etc. Two parameters, activationCodeDetailId and analysisStorageId, have to be retrieved using the API endpoint /api/activationCodes:findBestMatchingForNextflow from the Entitlement Detail section in Swagger. Here is an example:
ScreenshotActivation
Here is the output of the API call:
{
    "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
    "allowedSlots": -1,
    "usedSlots": 0,
    "movedSlots": 0,
    "originalSlots": -1,
    "pipelineBundle": {
        "id": "b4f2840c-4f79-44db-9e1c-5e7339a1b507",
        "name": "ICA_Ent-DE_Pipeline_Entitlement",
        "maxNumberOfAllowedSlots": -1,
        "activePipelines": [],
        "canceledPipelines": [],
        "retiredPipelines": [],
        "regions": [
            {}
        ],
        "analysisStorages": [
            {},
            {},
            {
                "id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0",
                "timeCreated": "2021-11-05T10:28:20Z",
                "timeModified": "2021-11-05T10:28:20Z",
                "ownerId": "8ec463f6-1acb-341b-b321-043c39d8716a",
                "tenantId": "f91bb1a0-c55f-4bce-8014-b2e60c0ec7d3",
                "tenantName": "ica-cp-admin",
                "name": "Small",
                "description": "1.2 TB"
            }
        ]
    },
    "usages": []
}
In this particular case, the activationCodeDetailId is "6375eb43-e865-4d7c-a9e2-2c153c998a5c" and the analysisStorageId is "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0" (for the resource type "Small").
With all these parameters available, the pipeline can be started using the API.
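
Picking the two identifiers out of the response by hand is error-prone, so the lookup can be scripted. The following is a minimal sketch, assuming the response has the shape shown above. The helper name extract_ids and the trimmed sample dict are illustrative and not part of the ICA API.

```python
def extract_ids(entitlement, storage_name="Small"):
    """Return (activationCodeDetailId, analysisStorageId) from the response.

    `entitlement` is the parsed JSON returned by
    activationCodes:findBestMatchingForNextflow, assumed to look like the
    output shown above.
    """
    activation_code_detail_id = entitlement["id"]
    storages = entitlement["pipelineBundle"]["analysisStorages"]
    for storage in storages:
        # Entries may be empty or incomplete, so use .get() for the name.
        if storage.get("name") == storage_name:
            return activation_code_detail_id, storage["id"]
    raise ValueError("no analysis storage named " + repr(storage_name))


# Trimmed copy of the output above, used as sample input.
sample = {
    "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
    "pipelineBundle": {
        "analysisStorages": [
            {},
            {},
            {"id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0", "name": "Small"},
        ]
    },
}

activation_id, storage_id = extract_ids(sample)
print(activation_id, storage_id)
```

Raising an error when the requested storage name is missing makes a mismatch visible immediately instead of sending an incomplete request later.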

Setup of Lambda function

Next, create a new Lambda function in the AWS Management Console. Choose "Author from scratch" and select "Python 3.7" (includes the requests library) as the runtime. In the "Function code" section, write the code for the Lambda function, which uses several Python modules and makes API calls to the existing online application (here, ICA). Add the SNS topic created above as a trigger.
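
Rather than pasting the API key into the function code, the key can be read from a Lambda environment variable. The following is a minimal sketch: the variable name ICA_API_KEY and the helper build_headers are assumptions made for this illustration, and the header names follow the example code on this page.

```python
import os


def build_headers(with_body=False):
    """Build ICA request headers, reading the API key from the environment."""
    # ICA_API_KEY is a hypothetical variable name; it would be set in the
    # Lambda configuration as an environment variable.
    api_key = os.environ["ICA_API_KEY"]
    headers = {
        "X-API-Key": api_key,
        "accept": "application/vnd.illumina.v3+json",
    }
    if with_body:
        # Requests that send a JSON body also declare the content type.
        headers["Content-Type"] = "application/vnd.illumina.v3+json"
    return headers
```

Calling build_headers() would suit the GET request that lists the outputs, and build_headers(with_body=True) the POST request that starts the second pipeline.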

Example

Here is an example of Python code that checks whether a file named 'test.txt' exists in the output of the successful pipeline. If the file exists, a new API call is made to start the second pipeline with this 'test.txt' as input.
import json
import random
import string

import requests


def lambda_handler(event, context):
    # The SNS message body is a JSON string describing the finished analysis
    message = json.loads(event['Records'][0]['Sns']['Message'])
    project_id = message['projectId']
    analysis_id = message['payload']['id']

    # List the outputs of the finished analysis
    url = 'https://ica.illumina.com/ica/rest/api/projects/' + \
        project_id + '/analyses/' + analysis_id + '/outputs'
    headers = {
        'X-API-Key': '${API-KEY}',
        'accept': 'application/vnd.illumina.v3+json'
    }
    response = requests.get(url, headers=headers)
    json_data_slice = response.json()['items'][0]['data'][0]['children']

    # Look for 'test.txt' among the output files
    file_id = None
    for json_obj in json_data_slice:
        if json_obj.get('name') == 'test.txt':
            file_id = json_obj['dataId']
            break
    if file_id is None:
        # The expected file is missing, so the second pipeline is not started
        return {
            'statusCode': 404,
            'body': 'Error: test.txt not found in the analysis output'
        }

    # some variables
    activation_code_detail_id = '6375eb43-e865-4d7c-a9e2-2c153c998a5c'
    analysis_storage_id = "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0"
    pipeline_id = "fd540bf8-67f1-4506-99e9-c89cc9a98fdd"
    # Random suffix keeps the user reference unique per invocation
    user_reference = 'CallLambda' + ''.join(
        random.choices(string.ascii_uppercase + string.digits, k=6))

    payload = {
        "userReference": user_reference,
        "pipelineId": pipeline_id,
        "tags": {
            "technicalTags": [],
            "userTags": []
        },
        "activationCodeDetailId": activation_code_detail_id,
        "analysisStorageId": analysis_storage_id,
        "analysisInput": {
            "inputs": [
                {
                    "parameterCode": "file",
                    "dataIds": [
                        file_id
                    ]
                }
            ]
        }
    }

    # Start the second pipeline with 'test.txt' as input
    url_pipeline2 = 'https://ica.illumina.com/ica/rest/api/projects/' + \
        project_id + '/analysis:nextflow'
    headers_pipeline2 = {
        'X-API-Key': '${API-KEY}',
        'accept': 'application/vnd.illumina.v3+json',
        'Content-Type': 'application/vnd.illumina.v3+json'
    }
    response_start = requests.post(
        url_pipeline2, data=json.dumps(payload), headers=headers_pipeline2)

    # Check the response status code
    if response_start.status_code == 201:
        # POST request successful
        response_data = response_start.json()
        return {
            'statusCode': 201,
            'body': response_data
        }
    else:
        # POST request failed
        return {
            'statusCode': response_start.status_code,
            'body': 'Error: Failed'
        }
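
Before deploying, it can help to check the event parsing locally with a hand-made event. The sketch below builds a dict with the same nesting the handler reads (Records[0].Sns.Message, where Message is a JSON string) and extracts the two IDs. The helper names and the placeholder IDs are made up for this illustration, and a real SNS event carries additional fields that are omitted here.

```python
import json


def make_sns_event(message):
    """Wrap a message dict the way the handler expects to receive it."""
    # SNS delivers the message body as a JSON-encoded string.
    return {"Records": [{"Sns": {"Message": json.dumps(message)}}]}


def parse_ica_message(event):
    """Return (project_id, analysis_id) from an SNS-triggered Lambda event."""
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    return message["projectId"], message["payload"]["id"]


# Placeholder IDs, not real ICA identifiers.
fake_event = make_sns_event(
    {"projectId": "project-123", "payload": {"id": "analysis-456"}}
)
project_id, analysis_id = parse_ica_message(fake_event)
print(project_id, analysis_id)
```

Keeping the parsing in a small function of its own makes it testable without network access or a deployed Lambda.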