```python
# package imports
from requests import HTTPError
import os
from pprint import pp
from dotenv import load_dotenv
from pathlib import Path
import pyeodh

# import API token information
dotenv_path = Path('reqts/.env')
load_dotenv(dotenv_path=dotenv_path)

username = os.getenv("ADES_USER")
token = os.getenv("ADES_TOKEN")
```
# Running Workflows

Description & purpose: A set of code snippets and guidance to help users submit and execute workflows using the pyeodh API client.
Author(s): Alastair Graham, Dusan Figala
Date created: 2024-09-10
Date last modified: 2024-12-10
Licence: This notebook is licensed under Creative Commons Attribution-ShareAlike 4.0 International. The code is released using the BSD-2-Clause license.
Copyright (c) , All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
## Introduction

This notebook demonstrates how to use the EODH API through `pyeodh` to interact with the Workflow Runner (WR).

The EODH platform provides access to a component called the Application Deployment & Execution Service (ADES), otherwise known as the WR. Users can deploy CWL workflows to it and execute parametrised processing jobs. `pyeodh` provides an interface that simplifies interaction with ADES from Python scripts.
## How-to

Note: This API requires authentication credentials to be provided by the user (in this case read from environment variables). This is subject to change as the Hub implements a full IAM solution. For information about obtaining API credentials, please see the page introducing the API client.

Note: Once you have the necessary API token, create a `.env` file and point `dotenv_path` (see below) at that file. The file should contain at least two lines in the following format:

```
ADES_USER=username
ADES_TOKEN=api_token
```

where `username` and `api_token` are the values you use on the system.
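If either variable is missing, later API calls will fail with a less obvious error. As a minimal sketch (reusing the `ADES_USER`/`ADES_TOKEN` names above; the helper function name is our own), you can fail fast at startup:

```python
# Fail fast if the .env file did not supply both values.
# Assumes the ADES_USER / ADES_TOKEN variable names used above.
import os

def read_ades_credentials() -> tuple[str, str]:
    username = os.getenv("ADES_USER")
    token = os.getenv("ADES_TOKEN")
    missing = [name for name, value in
               [("ADES_USER", username), ("ADES_TOKEN", token)] if not value]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return username, token
```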
First we need to instantiate the `pyeodh` client and create an ADES entrypoint.

```python
# Create the link to ADES
client = pyeodh.Client(username=username, token=token)
ades = client.get_ades()
```
Workflows (represented as executable processes) are predefined applications which can be parameterised and run by users. To list the processes currently available in our user workspace, we call the `ades.get_processes()` method:

```python
for p in ades.get_processes():
    print(p.id)
```

```
echo
convert-url
```
Now that we can see which workflows are in the workspace, we can fetch information about a specific workflow (assuming we know its name). The process object also contains metadata giving us more information about the process and how to execute it. For example, we can interrogate the schemas of its inputs and outputs; from these it is possible to parameterise the process correctly.

```python
convert_url_proc = ades.get_process("convert-url")

print('Input Schema')
pp(convert_url_proc.inputs_schema)
print('-'*40)
print('Output Schema')
pp(convert_url_proc.outputs_schema)
```
```
Input Schema
{'fn': {'title': 'the operation to perform',
        'description': 'the operation to perform',
        'schema': {'type': 'string'}},
 'size': {'title': 'the percentage for a resize operation',
          'description': 'the percentage for a resize operation',
          'schema': {'type': 'string'}},
 'url': {'title': 'the image to convert',
         'description': 'the image to convert',
         'schema': {'type': 'string'}}}
----------------------------------------
Output Schema
{'converted_image': {'title': 'converted_image',
                     'description': 'None',
                     'extended-schema': {'oneOf': [{'allOf': [{'$ref': 'http://zoo-project.org/dl/link.json'},
                                                              {'type': 'object',
                                                               'properties': {'type': {'enum': ['application/json']}}}]},
                                                   {'type': 'object',
                                                    'required': ['value'],
                                                    'properties': {'value': {'oneOf': [{'type': 'object'}]}}}]},
                     'schema': {'oneOf': [{'type': 'object'}]}}}
```
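Each top-level key in `inputs_schema` names an expected parameter, so a quick client-side check can catch missing inputs before a job is submitted. A minimal sketch (the helper name is our own, and this is not a full JSON Schema validation):

```python
# Report schema keys that are absent from a proposed inputs dict.
# The schema layout mirrors the pp() output above: one entry per parameter.
def check_inputs(inputs: dict, inputs_schema: dict) -> list[str]:
    return [key for key in inputs_schema if key not in inputs]
```

For example, checking `{"fn": "resize", "url": "..."}` against the schema above would report that `size` is missing.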
Further information can be found about each job object, including its id, its process_id (the name of the process it ran) and its status.

```python
for j in ades.get_jobs():
    print(j.id, j.process_id, j.status)
```

```
672f0742-b6f5-11ef-a8b0-6a040e2afd6f convert-url successful
```
Only one process with a given ID can exist at a time. To demonstrate deploying a process further down in this notebook, we first need to undeploy `convert-url`. Note that attempting to delete a non-existent process results in an error, so we catch it.

```python
try:
    ades.get_process("convert-url").delete()
except HTTPError:
    print("Process not found, no need to undeploy.")
```
Let’s deploy the `convert-url` process again.

There are two ways to provide the CWL file: by referencing it by URL, or by passing the CWL file content directly. Note that the `ades.deploy_process()` command will fail if we try to create a process with an ID that already exists. To update an existing process, use the `process.update()` method instead.

Both methods accept either a URL or CWL YAML content. In the example below, a process referenced by URL is deployed and then updated by passing new CWL YAML content directly.

Note: When updating a workflow you need to provide the entire workflow; the API does not support partial updates (i.e. to change the description we need to provide the entire workflow again).
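Because only whole-workflow updates are supported, a common pattern is to modify the CWL text locally and re-send all of it via `process.update()`. A sketch under that assumption (the helper is hypothetical; it rewrites the `s:softwareVersion` line of a CWL YAML string while preserving indentation):

```python
# Rewrite the s:softwareVersion line in a CWL YAML string, keeping the
# original indentation, so the full document can be re-sent in one update.
def bump_version(cwl_text: str, new_version: str) -> str:
    lines = []
    for line in cwl_text.splitlines():
        if line.strip().startswith("s:softwareVersion:"):
            indent = line[: len(line) - len(line.lstrip())]
            lines.append(f"{indent}s:softwareVersion: {new_version}")
        else:
            lines.append(line)
    return "\n".join(lines)
```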
```python
convert_url_proc = ades.deploy_process(
    cwl_url="https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/convert-url-app.cwl"
)
print(convert_url_proc.id, convert_url_proc.description)
```

```
convert-url Convert URL
```
= """cwlVersion: v1.0
cwl_yaml $namespaces:
s: https://schema.org/
s:softwareVersion: 0.1.2
schemas:
- http://schema.org/version/9.0/schemaorg-current-http.rdf
$graph:
# Workflow entrypoint
- class: Workflow
id: convert-url
label: convert url app
doc: Convert URL YAML
requirements:
ResourceRequirement:
coresMax: 1
ramMax: 1024
inputs:
fn:
label: the operation to perform
doc: the operation to perform
type: string
url:
label: the image to convert
doc: the image to convert
type: string
size:
label: the percentage for a resize operation
doc: the percentage for a resize operation
type: string
outputs:
- id: converted_image
type: Directory
outputSource:
- convert/results
steps:
convert:
run: "#convert"
in:
fn: fn
url: url
size: size
out:
- results
# convert.sh - takes input args `--url`
- class: CommandLineTool
id: convert
requirements:
ResourceRequirement:
coresMax: 1
ramMax: 512
hints:
DockerRequirement:
dockerPull: eoepca/convert:latest
baseCommand: convert.sh
inputs:
fn:
type: string
inputBinding:
position: 1
url:
type: string
inputBinding:
position: 2
prefix: --url
size:
type: string
inputBinding:
position: 3
outputs:
results:
type: Directory
outputBinding:
glob: .
"""
=cwl_yaml)
convert_url_proc.update(cwl_yamlprint(convert_url_proc.id, convert_url_proc.description)
convert-url Convert URL YAML
The process needs to be parameterised before it is run, but how does a user know what inputs this particular workflow expects? That is where the `process.inputs_schema` property is useful (see above).

Let's execute the deployed process. The inputs are best supplied as a dictionary.
```python
convert_url_job = convert_url_proc.execute(
    {
        "fn": "resize",
        "url": "https://eoepca.org/media_portal/images/logo6_med.original.png",
        "size": "50%",
    }
)

print(convert_url_job.id, convert_url_job.status, convert_url_job.message)
```

```
007b69d0-b70b-11ef-aba4-6a040e2afd6f running ZOO-Kernel accepted to run your service!
```
The job should now be running. To get the most up-to-date status of the job, call the `job.refresh()` method and then interrogate the `job.status` and `job.message` properties.

Note: these properties only hold the latest response from the API and don't keep any historical records.

```python
convert_url_job.refresh()
print(convert_url_job.id, convert_url_job.status, convert_url_job.message)
```

```
007b69d0-b70b-11ef-aba4-6a040e2afd6f running upload required files
```
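Since `status` and `message` only reflect the latest API response, you can keep your own history while polling. A minimal sketch (the helper and its calling convention are our own):

```python
# Append a (status, message) pair to a local history list, skipping
# consecutive duplicates; returns True when something new was recorded.
def record_update(history: list, status: str, message: str) -> bool:
    entry = (status, message)
    if not history or history[-1] != entry:
        history.append(entry)
        return True
    return False
```

Call `record_update(history, convert_url_job.status, convert_url_job.message)` after each `refresh()` to build up a timeline of the job's progress.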
We can continually poll the job using a simple loop, printing status and message updates as they appear:

```python
from pyeodh.ades import AdesJobStatus
import time

old_status = ""
old_message = ""
while convert_url_job.status == AdesJobStatus.RUNNING.value:
    time.sleep(2)
    convert_url_job.refresh()
    if convert_url_job.status != old_status:
        print("\n")
        print(f"Status: {convert_url_job.status}")
    if convert_url_job.message != old_message:
        print(f"Message: {convert_url_job.message}")
    old_status = convert_url_job.status
    old_message = convert_url_job.message
```

```
Status: running
Message: execution submitted
Message: delivering outputs, logs and usage report

Status: successful
Message: ZOO-Kernel successfully run your service!
```
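The loop above runs until the status changes, which can block indefinitely if a job hangs. A bounded variant, sketched with our own naming (`poll` is any callable that refreshes the job and returns the current status string; "running" matches the status values printed above):

```python
import time

# Poll until the status leaves `running_value` or the deadline passes.
def wait_for_completion(poll, running_value="running", timeout_s=600, interval_s=2):
    deadline = time.monotonic() + timeout_s
    status = poll()
    while status == running_value:
        if time.monotonic() > deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s}s")
        time.sleep(interval_s)
        status = poll()
    return status
```

Assuming `refresh()` returns `None`, it could be driven with `wait_for_completion(lambda: (convert_url_job.refresh() or convert_url_job.status))`.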
After the job has finished successfully, we can view the results as links to where the data files are stored.

Note: the output of a workflow is a directory containing a STAC catalogue, in which the individual assets are represented in the metadata.

```python
results = convert_url_job.get_result_items()
for res in results:
    print(res.id, res.assets)
```

```
logo6_med.original-resize-1733844363.263300377 {'logo6_med.original-resize': <Asset href=https://ajgwords.workspaces.test.eodhp.eco-ke-staging.com/files/workspaces-eodhp-test/processing-results/cat_007b69d0-b70b-11ef-aba4-6a040e2afd6f/col_007b69d0-b70b-11ef-aba4-6a040e2afd6f/logo6_med.original-resize.png>}
```