Running Workflows

Description & purpose: A set of code snippets and guidance to help users submit and execute workflows using the pyeodh API client.

Author(s): Alastair Graham, Dusan Figala

Date created: 2024-09-10

Date last modified: 2024-12-10

Licence: This notebook is licensed under Creative Commons Attribution-ShareAlike 4.0 International. The code is released using the BSD-2-Clause license.

Copyright (c) , All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Introduction

This notebook demonstrates how to use the EODH API through pyeodh to interact with the Workflow Runner (WR).

The EODH platform provides access to a component called the Application Deployment & Execution Service (ADES), otherwise known as the WR. Users can deploy CWL workflows to the ADES and execute parameterised processing jobs. pyeodh provides an interface that simplifies interaction with the ADES from Python scripts.

How-to

Note: This API requires authentication credentials to be provided by the user (in this case read from environment variables). This is subject to change as the hub implements a proper IAM solution. For information about obtaining API credentials, please see the page introducing the API client.

Note: Once you have the necessary API token, create a .env file and point dotenv_path (see below) at the file. The file should contain at least two lines in the following format:

ADES_USER=username
ADES_TOKEN=api_token

where username and api_token are the values you use on the system.

First we need to instantiate the pyeodh client and create an ADES entrypoint.

# package imports
import os
from pathlib import Path
from pprint import pp

from dotenv import load_dotenv
from requests import HTTPError

import pyeodh

# load API token information from the .env file
dotenv_path = Path('reqts/.env')
load_dotenv(dotenv_path=dotenv_path)

username = os.getenv("ADES_USER")
token = os.getenv("ADES_TOKEN")
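
# Hypothetical sanity check (not part of pyeodh): fail fast if either
# credential is missing, rather than hitting a confusing API error later.
if username is None or token is None:
    raise RuntimeError("ADES_USER and/or ADES_TOKEN not set - check your .env file")
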
# Create the link to ADES
client = pyeodh.Client(username=username, token=token)
ades = client.get_ades()

Workflows (represented as executable processes) are predefined applications which can be parameterised and run by users. To get a list of the processes currently available in our user workspace we call the ades.get_processes() method:


for p in ades.get_processes():
    print(p.id)
echo
convert-url

Now that we can see which workflows are in the workspace, it is possible to fetch information about a specific workflow (assuming we know its name). The process object also contains metadata giving us more information about the process and how to execute it.

For example, we can interrogate the schema of the inputs, which tells us how to parameterise the process, and likewise the schema of the outputs.

convert_url_proc = ades.get_process("convert-url")

print('Input Schema')
pp(convert_url_proc.inputs_schema)

print('-'*40)

print('Output Schema')
pp(convert_url_proc.outputs_schema)
Input Schema
{'fn': {'title': 'the operation to perform',
        'description': 'the operation to perform',
        'schema': {'type': 'string'}},
 'size': {'title': 'the percentage for a resize operation',
          'description': 'the percentage for a resize operation',
          'schema': {'type': 'string'}},
 'url': {'title': 'the image to convert',
         'description': 'the image to convert',
         'schema': {'type': 'string'}}}
----------------------------------------
Output Schema
{'converted_image': {'title': 'converted_image',
                     'description': 'None',
                     'extended-schema': {'oneOf': [{'allOf': [{'$ref': 'http://zoo-project.org/dl/link.json'},
                                                              {'type': 'object',
                                                               'properties': {'type': {'enum': ['application/json']}}}]},
                                                   {'type': 'object',
                                                    'required': ['value'],
                                                    'properties': {'value': {'oneOf': [{'type': 'object'}]}}}]},
                     'schema': {'oneOf': [{'type': 'object'}]}}}

Further information can be found about the jobs submitted to the ADES. Each job object includes its id, its process_id (the name of the process it ran) and its status.

for j in ades.get_jobs():
    print(j.id, j.process_id, j.status)
672f0742-b6f5-11ef-a8b0-6a040e2afd6f convert-url successful

Only one process with a given ID can exist. To demonstrate deploying a process further down in this notebook, we first need to undeploy convert-url. Note that attempting to delete a non-existent process will result in an error, so we catch it here.

try:
    ades.get_process("convert-url").delete()
except HTTPError:
    print("Process not found, no need to undeploy.")

Let’s deploy the convert-url process again.

There are two ways we can provide the CWL file - either referencing the file by URL or by passing the CWL file content directly. Note that the ades.deploy_process() command will fail if we try to create a process with an ID that already exists. If we want to update an existing process, we should use the process.update() method instead.

Both methods can handle URL or CWL YAML inputs. In the example below a process referenced by URL is deployed and then updated by passing the new CWL YAML content directly.

Note: When updating a workflow you need to provide the entire workflow; the API does not support partial updates (i.e. to change the description we need to provide the entire workflow again).

convert_url_proc = ades.deploy_process(
    cwl_url="https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/convert-url-app.cwl"
)
print(convert_url_proc.id, convert_url_proc.description)
convert-url Convert URL
cwl_yaml = """cwlVersion: v1.0
$namespaces:
  s: https://schema.org/
s:softwareVersion: 0.1.2
schemas:
  - http://schema.org/version/9.0/schemaorg-current-http.rdf
$graph:
  # Workflow entrypoint
  - class: Workflow
    id: convert-url
    label: convert url app
    doc: Convert URL YAML
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 1024
    inputs:
      fn:
        label: the operation to perform
        doc: the operation to perform
        type: string
      url:
        label: the image to convert
        doc: the image to convert
        type: string
      size:
        label: the percentage for a resize operation
        doc: the percentage for a resize operation
        type: string
    outputs:
      - id: converted_image
        type: Directory
        outputSource:
          - convert/results
    steps:
      convert:
        run: "#convert"
        in:
          fn: fn
          url: url
          size: size
        out:
          - results
  # convert.sh - takes input args `--url`
  - class: CommandLineTool
    id: convert
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 512
    hints:
      DockerRequirement:
        dockerPull: eoepca/convert:latest
    baseCommand: convert.sh
    inputs:
      fn:
        type: string
        inputBinding:
          position: 1
      url:
        type: string
        inputBinding:
          position: 2
          prefix: --url
      size:
        type: string
        inputBinding:
          position: 3
    outputs:
      results:
        type: Directory
        outputBinding:
          glob: .
"""

convert_url_proc.update(cwl_yaml=cwl_yaml)
print(convert_url_proc.id, convert_url_proc.description)
convert-url Convert URL YAML

The process needs to be parameterised before it is run, but how does a user know what inputs a particular workflow expects? That is where the process.inputs_schema property is useful (see above).
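
Since inputs_schema lists every expected parameter name, a quick sanity check can catch typos in an inputs dictionary before submission. Below is a minimal sketch (check_inputs is a hypothetical helper, not part of pyeodh):

def check_inputs(process, inputs):
    # Compare the supplied keys against the parameter names in the input schema
    expected = set(process.inputs_schema)
    missing = expected - set(inputs)
    unexpected = set(inputs) - expected
    if missing or unexpected:
        raise ValueError(f"missing inputs: {missing}, unexpected inputs: {unexpected}")

check_inputs(
    convert_url_proc,
    {
        "fn": "resize",
        "url": "https://eoepca.org/media_portal/images/logo6_med.original.png",
        "size": "50%",
    },
)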

Let’s execute the deployed process. The inputs are best supplied as a dictionary.

convert_url_job = convert_url_proc.execute(
    {
        "fn": "resize",
        "url": "https://eoepca.org/media_portal/images/logo6_med.original.png",
        "size": "50%",
    }
)

print(convert_url_job.id, convert_url_job.status, convert_url_job.message)
007b69d0-b70b-11ef-aba4-6a040e2afd6f running ZOO-Kernel accepted to run your service!

The job should now be running.

To get the most up-to-date status of the job the user can call the job.refresh() method and then interrogate the job.status and job.message properties.

Note: these properties only hold the latest response from the API, and don’t keep any historical records.

convert_url_job.refresh()
print(convert_url_job.id, convert_url_job.status, convert_url_job.message)
007b69d0-b70b-11ef-aba4-6a040e2afd6f running upload required files
# We can continually poll the job using a simple loop and print status and message updates like so:

from pyeodh.ades import AdesJobStatus
import time


old_status = ""
old_message = ""
while convert_url_job.status == AdesJobStatus.RUNNING.value:
    time.sleep(2)
    convert_url_job.refresh()
    if convert_url_job.status != old_status:
        print("\n")
        print(f"Status: {convert_url_job.status}")
    if convert_url_job.message != old_message:
        print(f"Message: {convert_url_job.message}")

    old_status = convert_url_job.status
    old_message = convert_url_job.message


Status: running
Message: execution submitted
Message: delivering outputs, logs and usage report


Status: successful
Message: ZOO-Kernel successfully run your service!

After the job has finished successfully, we can list the results as links to where the data files are stored.

Note: the output of a workflow is a directory containing a STAC catalogue, where the individual assets are represented in the metadata.

results = convert_url_job.get_result_items()
for res in results:
    print(res.id, res.assets)
logo6_med.original-resize-1733844363.263300377 {'logo6_med.original-resize': <Asset href=https://ajgwords.workspaces.test.eodhp.eco-ke-staging.com/files/workspaces-eodhp-test/processing-results/cat_007b69d0-b70b-11ef-aba4-6a040e2afd6f/col_007b69d0-b70b-11ef-aba4-6a040e2afd6f/logo6_med.original-resize.png>}
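
The asset hrefs point at the files in your workspace storage. As a final step, the sketch below downloads each asset over HTTPS (an illustration only; depending on your workspace configuration the links may require authentication):

import requests

for res in convert_url_job.get_result_items():
    for name, asset in res.assets.items():
        # Derive a local filename from the last component of the href
        filename = asset.href.rsplit("/", 1)[-1]
        r = requests.get(asset.href, timeout=60)
        r.raise_for_status()
        with open(filename, "wb") as f:
            f.write(r.content)
        print(f"Downloaded {name} -> {filename}")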