Coordinator - better-wfp-00004 pipe

This coordinator processes the queue of the better-wfp-00004 data pipeline, which produces the Landsat 8 reflectances and vegetation indices.

  • First, import the required Python libraries:
In [31]:
import sys
import os
import requests

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree
import json
import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [32]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][0]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
This notebook will process the queue of ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_6 with the trigger ec_better_tg_wfp_01_02_02_pipe_tg_wfp_01_02_02_pipe_0_7

Queue selection parameters

In [33]:
# Catalogue URL of the 'source-queue' series description for this data pipeline.
series = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline

Coordinator parameters

In [34]:
# Start, stop and period of the coordinator; these values are sent as the
# t2_coordinator_date_start, t2_coordinator_date_stop and t2_coordinator_period
# inputs of the coordinator request.
coordinator_date_start = '2018-11-16T00:00Z'
coordinator_date_stop = '2020-02-10T03:01Z'
# NOTE(review): looks like a cron expression (every day at 03:00) - confirm the
# syntax and the time zone expected by the coordinator.
coordinator_period = '0 3 * * *'
In [35]:
# Lower and upper bound of the 'update' time range sent with the request
# (joined later as '<start_pipe>/<end_pipe>').
# The ${coord:...} placeholders are kept as literal text here: they format the
# coordinator nominal time shifted by -1 DAY as yyyy-MM-dd, so the range runs
# from 00:00:00Z to 23:59:59.999Z of that day.
# NOTE(review): presumably resolved by Oozie for each coordinator action - confirm.
start_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd")}T00:00:00Z'
end_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd")}T23:59:59.999Z'
In [36]:
# Identifier of the coordinator process of the trigger: the trigger process id
# prefixed with 'coordinator_'.
co_trigger_pipe_process_id = 'coordinator_%s' % trigger_pipe_process_id

Common Parameters

In [37]:
# Parameters shared by the requests built in this notebook.
# NOTE(review): tg_quotation is not referenced by any other visible cell -
# confirm it is still needed.
tg_quotation = 'No'
quotation = 'No'
recovery = 'No'
# _T2Username takes the data pipeline name.
_T2Username = data_pipeline

Check data transformation application

In [38]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

found_process = False

message = "The process %s is not deployed" % app_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception()
The process ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_6 is deployed

Check trigger coordinator

In [39]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The pipe coordinator process %s is not deployed" % co_trigger_pipe_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_pipe_process_id:
        message = "The pipe coordinator process %s is deployed" % co_trigger_pipe_process_id
        found_process = True

        print message

if not found_process:
    raise Exception(message)
The pipe coordinator process coordinator_ec_better_tg_wfp_01_02_02_pipe_tg_wfp_01_02_02_pipe_0_7 is deployed

Process the queue

In [40]:
process = wps.describeprocess(co_trigger_pipe_process_id)

print process.title

print process.abstract
Trigger for the WFP-01-02-02 Pipe Coordinator
Coordinator: Trigger for the WFP-01-02-02 Landsat-8 reflectances and vegetation indices data pipeline - Pipe
In [41]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
update
geom
api_key
recovery
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
t2_coordinator_name
quotation
_T2Username
In [42]:
# Name under which the coordinator is submitted.
coordinator_name = 'Coordinator_better_wfp_00004_pipe'
# NOTE(review): an empty geometry presumably means "no spatial filter" - confirm.
geom = ''

# (identifier, value) pairs of the coordinator request. The shared values come
# from the parameter cells above (recovery, quotation, _T2Username) rather than
# from repeated literals, so a change there is reflected in the request.
# NOTE(review): 'wps_url' and 'process_id' are not among the identifiers printed
# from process.dataInputs in the saved output - confirm the service accepts them.
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('update', '%s/%s' % (start_pipe, end_pipe)),
          ('geom', geom),
          ('api_key', datapipeline_api_key),
          ('recovery', recovery),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', quotation),
          ('_T2Username', _T2Username)]

Submit the coordinator request

In [43]:
# Build the Execute request for the coordinator process, asking for the
# 'coordinatorIds' output, and submit it to the triggers WPS endpoint.
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_pipe_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

# Follow the execution until owslib reports it as finished.
monitorExecution(execution)

# 'isSucceded' is the spelling used by the owslib API.
if not execution.isSucceded():
    # Report the identifier of the process that was actually submitted.
    raise Exception('Coordinator %s creation failed' % co_trigger_pipe_process_id)
In [46]:
# The first data item of the first process output is a JSON document; the id of
# the created coordinator is the 'oozieId' of its first 'coordinatorsId' entry.
coordinators_json = execution.processOutputs[0].data[0]
coordinator_id = str(json.loads(coordinators_json)['coordinatorsId'][0]['oozieId'])
In [47]:
# Show the coordinator id as the cell output.
coordinator_id
Out[47]:
'0024145-180330140554685-oozie-oozi-C'

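**DANGER ZONE**

The cells below kill the coordinator on the production centre; run them only when that is really intended.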

In [29]:
# Base URL of the production centre; the kill request below targets the Oozie
# REST API on port 11000 of this host.
production_centre = 'http://pc-platform-c01-s01.terradue.com'
In [30]:

answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)? ' % coordinator_id)

if answer == 'YES I DO':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
    if r.status_code:
        print 'Coordinator %s killed' % coordinator_id
Coordinator 0023660-180330140554685-oozie-oozi-C killed