Coordinator - better-satcen-00004 pipe

This coordinator processes the queue of the better-satcen-00004 data pipeline, which produces Sentinel-2 DVI, composites and BOA reflectances.

The coordinator periodically launches pipe jobs, which in turn launch jobs of the underlying data-transforming application, according to the parameters defined below.

Preparatory work

  • First, import the required Python libraries:
In [ ]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree
import json
import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

import requests
  • Read the data pipeline configuration information:
In [ ]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

# run the configuration cell to load the pipeline settings into this namespace
exec nb['cells'][1]['source'] in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
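
The configuration cell executed above is expected to define the variables used throughout this notebook. For reference, a sketch of the names it must provide (the values below are placeholders, not the real configuration):

In [ ]:
# For reference only -- these names are defined by ../configuration.ipynb.
# All values shown here are placeholders:
#
# app_artifact_id          = 'better-satcen-00004'          # application artifact
# app_version              = '1.0'                          # application version
# trigger_pipe_artifact_id = '<pipe trigger artifact>'      # pipe trigger artifact
# trigger_pipe_version     = '1.0'                          # pipe trigger version
# repository               = '<maven repository>'           # artifact repository
# folder                   = '<repository folder>'
# community                = '<community name>'
# data_pipeline            = '<pipeline user>'              # pipeline user name
# datapipeline_api_key     = '<pipeline catalogue API key>'
# apps_deployer            = '<applications WPS server>'
# trigger_deployer         = '<triggers WPS server>'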

Pipe selection parameters

The pipe process selects data from the pipeline’s queue series (source-queue), based on the insertion date.

In [ ]:
series = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
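
As a quick sanity check, you can query this series directly. A minimal sketch, assuming the cioppy search interface with an update (insertion date) range filter; the date range is illustrative, and a private queue series may additionally require the pipeline API key:

In [ ]:
# sketch: list what sits in the queue for a given insertion date range
ciop = cioppy.Cioppy()

search_params = dict([('update', '2019-02-01T00:00:00Z/2019-02-01T23:59:59Z'),
                      ('count', '10')])

queue_entries = ciop.search(end_point=series,
                            params=search_params,
                            output_fields='identifier',
                            model='GeoTime')

for entry in queue_entries:
    print(entry['identifier'])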

Coordinator parameters

The coordinator has a start and an end date, a repetition interval (period) and expressions for the start and end date used to select entries from the queue. The period is a cron expression: with '0 0 * * *', for example, the coordinator is triggered every midnight, while the value set below ('0 * * * *') triggers it every hour. The moment a trigger fires is the nominal time of that coordinator execution. If the coordinator start date is in the past, the coordinator spawns jobs more frequently until the nominal time catches up with the current time. The start_pipe and end_pipe dates are Oozie EL expressions relative to the nominal execution time; they formulate the search date range within the pipeline's queue series. Under normal circumstances, when the coordinator is processing current data, these expressions must define a range in the past (relative to the nominal execution time).

In [ ]:
coordinator_date_start = '2019-02-01T00:00Z'
coordinator_date_stop = '2019-03-01T23:59Z'
coordinator_period = '0 * * * *'
In [ ]:
start_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd")}T00:00:00.00Z'
end_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd")}T23:59:59.99Z'
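
For illustration, here is a Python equivalent of what these Oozie EL expressions evaluate to for a given nominal time (a sketch of the date arithmetic, not how Oozie itself resolves them):

In [ ]:
# equivalent of the EL expressions above: go back one day from the
# nominal time and keep only the date part
nominal_time = datetime(2019, 2, 15, 13, 0)   # example nominal time

day_before = (nominal_time - timedelta(days=1)).strftime('%Y-%m-%d')

print('%sT00:00:00.00Z' % day_before)   # start_pipe -> 2019-02-14T00:00:00.00Z
print('%sT23:59:59.99Z' % day_before)   # end_pipe   -> 2019-02-14T23:59:59.99Z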
In [ ]:
co_trigger_pipe_process_id = 'coordinator_%s' % trigger_pipe_process_id

Common Parameters

In [ ]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline
geometry = ''

Check data transformation application

This step verifies that the WPS process offering for the underlying application is available.

In [ ]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

found_process = False

message = "The process %s is not deployed" % app_process_id

for process in wps.processes:

    if process.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True
        break

print message

if not found_process:
    raise Exception(message)

Check trigger coordinator

This step verifies that the WPS process offering for the pipe trigger coordinator is available.

In [ ]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The pipe coordinator process %s is not deployed" % co_trigger_pipe_process_id

for process in wps.processes:

    if process.identifier == co_trigger_pipe_process_id:
        message = "The pipe coordinator process %s is deployed" % co_trigger_pipe_process_id
        found_process = True
        break

print message

if not found_process:
    raise Exception(message)

Pipe trigger setup

  • Display the trigger information and parameters for a manual check (with pipe triggers/coordinators there are no parameters related to the data-transforming application).
In [ ]:
process = wps.describeprocess(co_trigger_pipe_process_id)

print process.title

print process.abstract
In [ ]:
for data_input in process.dataInputs:
    print data_input.identifier
  • Set the parameters for the coordinator process.
In [ ]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('update', '%s/%s' % (start_pipe, end_pipe)),
          ('api_key', datapipeline_api_key),
          ('recovery', recovery),
          ('tg_quotation', tg_quotation),
          ('geom', geometry),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', 'better-satcen-00004-pipe'),
          ('quotation', tg_quotation),
          ('_T2Username', _T2Username)]
  • Submit the coordinator request and obtain the Oozie process ID.
In [ ]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_pipe_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

print execution.statusLocation

monitorExecution(execution)

if not execution.isSucceded():

    raise Exception('Coordinator %s creation failed' % co_trigger_pipe_process_id)
In [ ]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
print(coordinator_id)
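
Once the coordinator is created, you can poll its status through the Oozie REST API. A minimal sketch, assuming the Oozie endpoint is exposed on port 11000 of the production centre (the same production_centre value used in the Danger Zone below):

In [ ]:
# sketch: poll the coordinator status via the Oozie REST API
production_centre = 'http://pc-platform-c01-s01.terradue.com'

r = requests.get('%s:11000/oozie/v1/job/%s?show=info' % (production_centre,
                                                         coordinator_id))

if r.status_code == requests.codes.ok:
    print(r.json()['status'])   # e.g. RUNNING, SUSPENDED, KILLED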

**DANGER ZONE**

This is only to be used to stop the coordinator process. Note that the requests below use the Oozie suspend action, which pauses the coordinator (it can be resumed later) rather than killing it outright.

Alternatively, you can stop it by running the curl command printed by the next cell from a terminal:

In [ ]:
production_centre = 'http://pc-platform-c01-s01.terradue.com'

# uncomment to override the coordinator id obtained above with another one
#coordinator_id = '0022466-180330140554685-oozie-oozi-C'

print("JOBID=\"%s\"" % coordinator_id)
print("curl -XPUT \"%s:11000/oozie/v1/job/${JOBID}?user.name=oozie&action=suspend\"" % production_centre)
In [ ]:
answer = raw_input('Are you sure you want to stop the coordinator %s (YES I AM to confirm)? ' % coordinator_id)

if answer == 'YES I AM':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=suspend' % (production_centre, coordinator_id))
    if r.status_code == requests.codes.ok:
        print 'Coordinator %s suspended' % coordinator_id
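
A suspended coordinator can be brought back later. A sketch using the corresponding Oozie resume action (using action=kill instead would terminate the coordinator permanently):

In [ ]:
# sketch: resume a previously suspended coordinator
# (use action=kill instead to terminate it for good)
r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=resume' % (production_centre, coordinator_id))

if r.status_code == requests.codes.ok:
    print('Coordinator %s resumed' % coordinator_id)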