Coordinator - better-wfp-00004-queue-ss for WFP-01-02-02 over Tajikistan

This coordinator feeds the better-wfp-00004 data pipeline queue for the Landsat 8 reflectances and vegetation indices over the Tajikistan area defined in

It responds to user story WFP-01-02-02-US-04 (Data pipeline submission):

As a data pipeline operator, I want to define a data stream for the trigger deployed in WFP-01-02-01-US-03, using the corresponding deployer and trigger identifier, with the following parameters:

* Data stream unique identifier
* Area of interest (AOI)
* Time range (start date / end date)
* Data polling period

The data pipeline is submitted as a coordinator job that periodically triggers new instances of the pipeline, which run as streams at the specified frequency and over the specified time range.
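One way to see how the four user-story parameters map onto this notebook is to gather them into the coordinator inputs submitted further down. The sketch below is illustrative only: data_stream_inputs is a hypothetical helper, and pairing the "data stream unique identifier" with t2_coordinator_name is an assumption based on how the notebook names its coordinator.

```python
# Hypothetical helper (not part of the original pipeline): collects the four
# data stream parameters from the user story as (identifier, value) pairs.
def data_stream_inputs(stream_id, aoi_wkt, start, stop, polling_cron):
    return [
        ('t2_coordinator_name', stream_id),       # data stream unique identifier (assumed mapping)
        ('geom', aoi_wkt),                        # area of interest (AOI)
        ('t2_coordinator_date_start', start),     # time range: start date
        ('t2_coordinator_date_stop', stop),       # time range: end date
        ('t2_coordinator_period', polling_cron),  # data polling period (cron syntax)
    ]


# Example with the values used later in this notebook
stream = dict(data_stream_inputs(
    'Coordinator_better_wfp_00004_queue_ss_Tajikistan',
    'POLYGON ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032))',
    '2017-01-02T00:00Z',
    '2017-01-11T02:01Z',
    '0 2 * * *'))
print(stream['t2_coordinator_period'])
```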

Data selection is based on the start and stop acquisition dates of the Landsat 8 products.
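The notebook does not show the catalogue query itself. As a sketch, assuming a product is selected when its acquisition interval (start to stop acquisition date) overlaps the queue window, the filter could look like this. The product dictionaries and their startdate/enddate keys are hypothetical, and the real service may use containment rather than overlap.

```python
from datetime import datetime

ISO_UTC = '%Y-%m-%dT%H:%M:%SZ'


def overlaps_window(product, start, stop):
    """True if the product's acquisition interval overlaps [start, stop].

    Assumption: each product is a dict with hypothetical 'startdate' and
    'enddate' keys holding ISO 8601 UTC timestamps.
    """
    acq_start = datetime.strptime(product['startdate'], ISO_UTC)
    acq_stop = datetime.strptime(product['enddate'], ISO_UTC)
    return (acq_start <= datetime.strptime(stop, ISO_UTC) and
            acq_stop >= datetime.strptime(start, ISO_UTC))


def select_products(products, start, stop):
    """Keep only the products acquired within the queue window."""
    return [p for p in products if overlaps_window(p, start, stop)]


# Illustrative records, not real catalogue entries
products = [
    {'id': 'a', 'startdate': '2017-01-02T05:40:10Z', 'enddate': '2017-01-02T05:40:42Z'},
    {'id': 'b', 'startdate': '2017-01-04T05:40:10Z', 'enddate': '2017-01-04T05:40:42Z'},
]
selected = select_products(products, '2017-01-02T00:00:00Z', '2017-01-03T00:00:00Z')
print([p['id'] for p in selected])
```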

  • First, import the required Python libraries
In [51]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json

import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

import requests

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [52]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

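# Load the shared configuration notebook (as nbformat version 4) and run its
# first cell to define the pipeline settings used below
nb = nbf.read(nb_config, 4)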

exec(nb['cells'][0]['source']) in globals(), locals()
#exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_ss_artifact_id),
                      ('version', trigger_queue_ss_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                                              trigger_queue_process_id)
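The two identifiers built above follow the same pattern: community, artifact id (repeated), and version, joined by underscores with '-' and '.' replaced by '_'. A small helper, hypothetical and not part of the pipeline, makes that convention explicit; the example values are made up.

```python
def wps_process_id(community, artifact_id, version):
    """Build a WPS process identifier using the pattern of the cell above:
    community, artifact id (twice) and version joined by underscores."""
    artifact = artifact_id.replace('-', '_')
    return '%s_%s_%s_%s' % (community.replace('-', '_'),
                            artifact,
                            artifact,
                            version.replace('.', '_'))


# Hypothetical values, for illustration only
print(wps_process_id('ec-better', 'wfp-01-02-02', '1.0'))
```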
  • Set the data transformation parameters
In [53]:
geom = 'POLYGON ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032))'
wkt = loads(geom).wkt

Data selection parameters

In [54]:
series = ''
In [55]:
product_type = 'L1TP'

Coordinator parameters

In [56]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -26, \'HOUR\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -2, \'HOUR\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
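The two Oozie EL expressions make each coordinator action queue a 24-hour window that ends two hours before the action's nominal time. The Python sketch below reproduces that arithmetic for a given nominal time, assuming nominal times are expressed in UTC; queue_window is a hypothetical name.

```python
from datetime import datetime, timedelta

# Python equivalent of the Oozie pattern yyyy-MM-dd'T'HH:mm:ss'Z'
OOZIE_OUTPUT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'


def queue_window(nominal_time):
    """Return (start, stop) strings for one coordinator action, mirroring
    dateOffset(nominalTime(), -26, 'HOUR') and dateOffset(nominalTime(), -2, 'HOUR')."""
    start = nominal_time - timedelta(hours=26)
    stop = nominal_time - timedelta(hours=2)
    return start.strftime(OOZIE_OUTPUT_FORMAT), stop.strftime(OOZIE_OUTPUT_FORMAT)


print(queue_window(datetime(2017, 1, 3, 2, 0)))
# ('2017-01-02T00:00:00Z', '2017-01-03T00:00:00Z')
```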
In [57]:
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id
In [64]:
coordinator_date_start = '2017-01-02T00:00Z'
coordinator_date_stop = '2017-01-11T02:01Z'
coordinator_period = '0 2 * * *' #every day at 2 A.M.
coordinator_name = ''
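With a start of 2017-01-02T00:00Z, a stop of 2017-01-11T02:01Z and the cron expression '0 2 * * *', the coordinator should materialize one action per day at 02:00. The sketch below enumerates those nominal times, assuming that Oozie treats the end time as exclusive (which would explain the stop at 02:01 rather than 02:00) and that times are in UTC. It handles only this fixed daily schedule, not general cron expressions.

```python
from datetime import datetime, timedelta


def parse_coord_time(value):
    """Parse the coordinator date format used above, e.g. '2017-01-02T00:00Z'."""
    return datetime.strptime(value, '%Y-%m-%dT%H:%MZ')


def daily_nominal_times(start, stop, hour=2, minute=0):
    """List the nominal times of a daily schedule such as cron '0 2 * * *'
    in [start, stop); the end time is treated as exclusive."""
    first = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if first < start:
        first += timedelta(days=1)
    times = []
    current = first
    while current < stop:
        times.append(current)
        current += timedelta(days=1)
    return times


nominal_times = daily_nominal_times(parse_coord_time('2017-01-02T00:00Z'),
                                    parse_coord_time('2017-01-11T02:01Z'))
print(len(nominal_times))  # 10 runs, from 2017-01-02 02:00 to 2017-01-11 02:00
```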

Common Parameters

In [65]:
tg_quotation = 'No'
quotation = 'False'
_T2Username = data_pipeline

Check data transformation application

In [66]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

found_process = False

message = "The process %s is not deployed" % app_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
Check trigger coordinator

In [67]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_queue_process_id:
        message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
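The two deployment checks above repeat the same loop over wps.processes. A possible refactoring into one helper is sketched below; it only relies on each item exposing an identifier attribute, as the owslib process summaries do in the cells above. The namedtuple stand-in and its identifiers are hypothetical, used so the example runs offline.

```python
from collections import namedtuple


def is_deployed(processes, process_id):
    """True if any process summary exposes the given identifier."""
    return any(p.identifier == process_id for p in processes)


def require_deployed(processes, process_id, label='process'):
    """Return a status message, or raise if the process is missing."""
    if not is_deployed(processes, process_id):
        raise Exception('The %s %s is not deployed' % (label, process_id))
    return 'The %s %s is deployed' % (label, process_id)


# Offline stand-in for owslib process summaries (hypothetical identifiers)
Process = namedtuple('Process', ['identifier'])
catalogue = [Process('app_a'), Process('coordinator_trigger_b')]
print(require_deployed(catalogue, 'coordinator_trigger_b', 'queue coordinator process'))
```

In the notebook it would be called as require_deployed(wps.processes, app_process_id) or require_deployed(wps.processes, co_trigger_queue_process_id, 'queue coordinator process').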
Feed the queue

In [68]:
process = wps.describeprocess(co_trigger_queue_process_id)

print process.title

print process.abstract
Trigger for the WFP-01-02-02 Queue start/stop Coordinator
Coordinator: Trigger for the WFP-01-02-02 Landsat-8 reflectances and vegetation indices data pipeline - Queue using start and stop acquisition dates
In [69]:
for data_input in process.dataInputs:
    print data_input.identifier

Define the input parameters

In [73]:
mode = 'Queue'
coordinator_name = 'Coordinator_better_wfp_00004_queue_ss_Tajikistan'

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('start', start_queue),
          ('stop', end_queue),
          ('geom', wkt.replace(' ', '%20').replace(',', '%2C')),
          ('product_type', product_type),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]
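The geom input is percent-encoded with two replace() calls. The standard library quote function gives the same result for this polygon when parentheses are declared safe; this is a swapped-in alternative, not what the pipeline itself uses.

```python
try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2


def encode_wkt(wkt):
    """Percent-encode a WKT geometry for the 'geom' input.

    Spaces become %20 and commas %2C; parentheses are declared safe so they
    stay literal, matching the replace() chain used in the notebook.
    """
    return quote(wkt, safe='()')


geom = 'POLYGON ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032))'
print(encode_wkt(geom))
```

For arbitrary strings, quote() is stricter than the replace() chain, since it also escapes other reserved characters.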

Submit the coordinator request

In [74]:
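execution = owslib.wps.WPSExecution(url=wps_url_triggers)

# Build the Execute request with the input parameters defined above
execution_request = execution.buildRequest(co_trigger_queue_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

# Parse the initial response, then poll the status location until the job completes
execution.parseResponse(execution_response)

monitorExecution(execution)

if not execution.isSucceded():

    raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)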
In [75]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
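The coordinator identifier is read from the first WPS output as JSON. The sketch below wraps that lookup with an explicit error when the list is empty. The payload shape is inferred from the indexing in the cell above, and the sample reuses the coordinator id printed at the end of this notebook; extract_oozie_id is a hypothetical name.

```python
import json


def extract_oozie_id(output_data):
    """Return the first Oozie coordinator id from the WPS output payload.

    Assumes the JSON shape implied by the notebook:
    {"coordinatorsId": [{"oozieId": "..."}]}
    """
    payload = json.loads(output_data)
    ids = payload.get('coordinatorsId') or []
    if not ids:
        raise ValueError('No coordinator id in the WPS output')
    return str(ids[0]['oozieId'])


# Illustrative payload built from the id shown at the end of this notebook
sample = '{"coordinatorsId": [{"oozieId": "0021262-180330140554685-oozie-oozi-C"}]}'
print(extract_oozie_id(sample))
```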
In [77]:
production_centre = ''
In [79]:
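answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)

if answer == 'YES I DO':
    # Oozie Web Services API: PUT /oozie/v1/job/<job-id>?action=kill
    r = requests.put('%s:11000/oozie/v1/job/%s?action=kill' % (production_centre, coordinator_id))
    if r.status_code == 200:
        print 'Coordinator %s killed' % coordinator_id
    else:
        print 'Failed to kill coordinator %s (HTTP %s)' % (coordinator_id, r.status_code)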
Coordinator 0021262-180330140554685-oozie-oozi-C killed
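The Oozie Web Services API kills a job with a PUT request on /oozie/v1/job/<job-id>?action=kill, and the cell above targets port 11000, which is Oozie's default. The helpers below are hypothetical; the host name in the example is a placeholder because production_centre is left empty in this notebook.

```python
def oozie_kill_url(production_centre, coordinator_id, port=11000):
    """Build the Oozie Web Services URL that kills a job."""
    return '%s:%d/oozie/v1/job/%s?action=kill' % (production_centre, port, coordinator_id)


def kill_coordinator(production_centre, coordinator_id):
    """Send the kill request and return True on HTTP 200."""
    import requests  # imported here so oozie_kill_url works without requests installed
    r = requests.put(oozie_kill_url(production_centre, coordinator_id))
    return r.status_code == 200


# 'http://oozie.example.org' is a placeholder host, not the real production centre
print(oozie_kill_url('http://oozie.example.org', '0021262-180330140554685-oozie-oozi-C'))
```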