Pipeline Chaining on AWS
There are several ways to connect pipelines in ICAv2. One of them is to use Amazon Simple Notification Service (SNS) and a Lambda function deployed on AWS. Once the initial pipeline completes, SNS triggers the Lambda function. The Lambda function extracts the information it needs from the event parameter and builds an API call that starts the subsequent pipeline.
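As a minimal sketch of the extraction step, the snippet below pulls the project and analysis identifiers out of an SNS-triggered Lambda event. The field names projectId and payload.id are the ones used by the handler further down this page; the sample identifiers are made up for illustration, and the helper name extract_analysis_ref is hypothetical.

```python
import json


def extract_analysis_ref(event):
    """Return (project_id, analysis_id) from an SNS-triggered Lambda event."""
    # SNS delivers the notification body as a JSON string in Sns.Message
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    return message["projectId"], message["payload"]["id"]


# Hypothetical event with made-up identifiers, shaped like an SNS delivery
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {"projectId": "project-123", "payload": {"id": "analysis-456"}}
                )
            }
        }
    ]
}

project_id, analysis_id = extract_analysis_ref(sample_event)
print(project_id, analysis_id)
```

Keeping this step in its own function makes it possible to test the parsing locally without deploying the Lambda function.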

Architecture
Notifications are used to subscribe to events in the platform and trigger the delivery of a message to an external delivery target. Read more here. Important: To allow the platform to deliver events to Amazon SQS or SNS delivery targets, a cross-account policy must be added to the target Amazon service:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::079623148045:root"
            },
            "Action": "SNS:Publish",
            "Resource": "*arn*"
        }
    ]
}
Here, arn is the Amazon Resource Name (ARN) of the target SNS topic. Once the SNS topic is created in AWS, create a new ICA subscription under Notifications in the corresponding project. The following screenshot displays the settings of a subscription for 'Analysis success' of a pipeline whose name starts with Hello.

Screenshot
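Returning to the cross-account policy shown earlier, the following sketch builds the same policy document for a given topic ARN. The principal is copied from the policy above; the function name build_publish_policy and the example ARN are assumptions made for illustration only.

```python
import json

# Principal taken from the cross-account policy shown above
ICA_PRINCIPAL = "arn:aws:iam::079623148045:root"


def build_publish_policy(topic_arn):
    """Return the cross-account policy as a dict for the given SNS topic ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": ICA_PRINCIPAL},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
            }
        ],
    }


# Hypothetical topic ARN; replace it with the ARN of your own topic
example_arn = "arn:aws:sns:us-east-1:123456789012:ica-events"
policy_json = json.dumps(build_publish_policy(example_arn), indent=2)
print(policy_json)
```

The resulting JSON string can be pasted into the access policy of the topic in the AWS console. It could presumably also be applied with the boto3 SNS client call set_topic_attributes using AttributeName='Policy', keeping in mind that this replaces the existing topic policy rather than extending it.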
To start a Nextflow pipeline using the API, use the endpoint /api/projects/{projectId}/analysis:nextflow. Provide the projectId and a request body in JSON format containing userReference, pipelineId, analysisInput, etc. Two parameters, activationCodeDetailId and analysisStorageId, have to be retrieved using the API endpoint /api/activationCodes:findBestMatchingForNextflow from the Entitlement Detail section in Swagger. Here is an example:

ScreenshotActivation
Here is an example output of the API call:
{
    "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
    "allowedSlots": -1,
    "usedSlots": 0,
    "movedSlots": 0,
    "originalSlots": -1,
    "pipelineBundle": {
        "id": "b4f2840c-4f79-44db-9e1c-5e7339a1b507",
        "name": "ICA_Ent-DE_Pipeline_Entitlement",
        "maxNumberOfAllowedSlots": -1,
        "activePipelines": [],
        "canceledPipelines": [],
        "retiredPipelines": [],
        "regions": [
            {
            }
        ],
        "analysisStorages": [
            {},
            {},
            {
                "id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0",
                "timeCreated": "2021-11-05T10:28:20Z",
                "timeModified": "2021-11-05T10:28:20Z",
                "ownerId": "8ec463f6-1acb-341b-b321-043c39d8716a",
                "tenantId": "f91bb1a0-c55f-4bce-8014-b2e60c0ec7d3",
                "tenantName": "ica-cp-admin",
                "name": "Small",
                "description": "1.2 TB"
            }
        ]
    },
    "usages": []
}
In this particular case the activationCodeDetailId is "6375eb43-e865-4d7c-a9e2-2c153c998a5c" and analysisStorageId is "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0" (for resource type "Small").
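The lookup described above can be sketched as a small helper that reads both identifiers from the parsed response. The field names come from the example output; the helper name pick_launch_ids is hypothetical, and the sample dictionary is a trimmed copy of that output.

```python
def pick_launch_ids(entitlement, storage_name="Small"):
    """Return (activationCodeDetailId, analysisStorageId) for a storage size."""
    # The top-level "id" of the response is the activationCodeDetailId
    activation_code_detail_id = entitlement["id"]
    for storage in entitlement["pipelineBundle"]["analysisStorages"]:
        # Entries without a matching "name" (including empty ones) are skipped
        if storage.get("name") == storage_name:
            return activation_code_detail_id, storage["id"]
    raise LookupError("no analysis storage named " + storage_name)


# Trimmed copy of the example response shown above
sample_response = {
    "id": "6375eb43-e865-4d7c-a9e2-2c153c998a5c",
    "pipelineBundle": {
        "analysisStorages": [
            {},
            {},
            {
                "id": "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0",
                "name": "Small",
                "description": "1.2 TB",
            },
        ]
    },
}

activation_id, storage_id = pick_launch_ids(sample_response)
print(activation_id, storage_id)
```

Raising an error when the requested storage size is missing avoids sending a launch request with an empty analysisStorageId.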
Once all these parameters are available, the pipeline can be started using the API.
Next, create a new Lambda function in the AWS Management Console. Choose "Author from scratch" and select "Python 3.7" (includes the requests library) as the runtime. In the "Function code" section, write the code for the Lambda function, which uses Python modules to make calls to the ICA REST API. Add the SNS topic created above as a trigger.
Here is an example of Python code that checks whether a file named 'test.txt' exists in the output of the successful pipeline. If the file exists, a new API call is made to invoke the second pipeline with this 'test.txt' as input.
import json
import random
import string

import requests


def lambda_handler(event, context):
    # The SNS message body is a JSON string describing the finished analysis
    message = json.loads(event['Records'][0]['Sns']['Message'])
    project_id = message['projectId']
    analysis_id = message['payload']['id']

    # List the outputs of the finished analysis
    url = 'https://ica.illumina.com/ica/rest/api/projects/' + \
        project_id + '/analyses/' + analysis_id + '/outputs'
    headers = {
        'X-API-Key': '${API-KEY}',
        'accept': 'application/vnd.illumina.v3+json'
    }
    response = requests.get(url, headers=headers)
    json_data_slice = response.json()['items'][0]['data'][0]['children']

    # Look for 'test.txt' among the output files
    file_id = None
    for json_obj in json_data_slice:
        if json_obj.get('name') == 'test.txt':
            file_id = json_obj['dataId']

    if file_id is None:
        # Nothing to chain: the expected file was not produced
        return {
            'statusCode': 404,
            'body': 'Error: test.txt not found in analysis outputs'
        }

    # some variables
    activation_code_detail_id = '6375eb43-e865-4d7c-a9e2-2c153c998a5c'
    analysis_storage_id = "6e1b6c8f-f913-48b2-9bd0-7fc13eda0fd0"
    pipeline_id = "fd540bf8-67f1-4506-99e9-c89cc9a98fdd"
    # Random suffix so that each launch gets a distinct user reference
    user_reference = 'CallLambda' + ''.join(
        random.choices(string.ascii_uppercase + string.digits, k=6))

    payload = {
        "userReference": user_reference,
        "pipelineId": pipeline_id,
        "tags": {
            "technicalTags": [],
            "userTags": []
        },
        "activationCodeDetailId": activation_code_detail_id,
        "analysisStorageId": analysis_storage_id,
        "analysisInput": {
            "inputs": [
                {
                    "parameterCode": "file",
                    "dataIds": [
                        file_id
                    ]
                }
            ]
        }
    }

    # Start the second pipeline with 'test.txt' as its input
    url_pipeline2 = 'https://ica.illumina.com/ica/rest/api/projects/' + \
        project_id + '/analysis:nextflow'
    headers_pipeline2 = {
        'X-API-Key': '${API-KEY}',
        'accept': 'application/vnd.illumina.v3+json',
        'Content-Type': 'application/vnd.illumina.v3+json'
    }
    response_start = requests.post(
        url_pipeline2, data=json.dumps(payload), headers=headers_pipeline2)

    # Check the response status code
    if response_start.status_code == 201:
        # POST request successful
        response_data = response_start.json()
        return {
            'statusCode': 201,
            'body': response_data
        }
    else:
        # POST request failed
        return {
            'statusCode': response_start.status_code,
            'body': 'Error: Failed'
        }
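The file lookup in the handler above can also be factored into a standalone helper, which makes it easier to test without calling the API. This is only a sketch: the nesting items, data, children and the keys name and dataId are taken from the handler code, the helper name find_output_data_id is hypothetical, and the sample response with its identifiers is made up. Unlike the handler, which only inspects the first entry of items and data, this version walks all of them.

```python
def find_output_data_id(outputs_response, file_name):
    """Return the dataId of the first output child named file_name, or None."""
    for item in outputs_response.get("items", []):
        for data in item.get("data", []):
            for child in data.get("children", []):
                if child.get("name") == file_name:
                    return child["dataId"]
    return None


# Made-up response fragment following the shape used by the handler
sample_outputs = {
    "items": [
        {
            "data": [
                {
                    "children": [
                        {"name": "report.html", "dataId": "fil.0001"},
                        {"name": "test.txt", "dataId": "fil.0002"},
                    ]
                }
            ]
        }
    ]
}

print(find_output_data_id(sample_outputs, "test.txt"))
print(find_output_data_id(sample_outputs, "missing.txt"))
```

Returning None for a missing file lets the caller decide whether to skip the second pipeline or report an error.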