Coordinator - better-SATCEN-00001 pipe

This coordinator processes the better-SATCEN-00001 data pipeline queue for Sentinel-2 or BETTER-common products.

  • First, import the required Python libraries:
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree
import json
import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

import requests
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

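# Load the variables defined in the configuration notebook into this session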
exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
This notebook will process the queue of ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 with the trigger ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4
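
The exec call above pulls a set of variables out of the configuration notebook. The ones this coordinator notebook relies on further down can be checked explicitly (an optional sanity-check sketch, not part of the original flow):

In [ ]:
# Optional sanity check: verify that the configuration notebook defined every
# variable this notebook uses further down.
expected = ['app_artifact_id', 'app_version', 'repository', 'folder', 'community',
            'trigger_pipe_artifact_id', 'trigger_pipe_version',
            'data_pipeline', 'datapipeline_api_key',
            'apps_deployer', 'trigger_deployer', 'production_centre']

missing = [name for name in expected if name not in globals()]

if missing:
    raise Exception('Missing configuration variables: %s' % ', '.join(missing))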
In [3]:
trigger_pipe_process_id
Out[3]:
'ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4'

Queue selection parameters

In [4]:
series = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
product_type = 'S2MSI2Ap'
In [5]:


geom = 'MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)), ((19.6875 44.82172222222222, 19.6875 45.97733333333333, 18.70788888888889 45.97733333333333, 18.70788888888889 44.82172222222222, 19.6875 44.82172222222222)))'

wkt = loads(geom)[1].wkt
print(wkt)
POLYGON ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889))
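
The multipolygon above contains four areas of interest and the second one is selected for this queue. folium is imported at the top of the notebook but otherwise unused here; as an optional sanity check, the selected polygon can be drawn on an interactive map. A minimal sketch, assuming a folium version that accepts plain GeoJSON dictionaries (the zoom level is an arbitrary display choice):

In [ ]:
# Optional: display the selected AOI on a folium map, centred on its centroid.
aoi = loads(wkt)
lon, lat = aoi.centroid.x, aoi.centroid.y

aoi_map = folium.Map(location=[lat, lon], zoom_start=7)
folium.GeoJson(aoi.__geo_interface__, name='AOI').add_to(aoi_map)

aoi_map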

Coordinator parameters

The coordinator has a start date, an end date, a repetition interval (period) and two expressions that define the date range used to select products from the queue. With a period of ‘0 0 * * *’, the coordinator is triggered every day at midnight; that moment is the nominal time of the coordinator execution. If the execution start date lies in the past, the coordinator spawns jobs more frequently until the nominal time has caught up with the current time. The start_pipe and end_pipe dates are expressions relative to the nominal execution time and are used to build the search date range within the pipeline’s queue series. Under normal circumstances, when the coordinator is processing current data, this range has to lie in the past relative to the nominal execution time (a sketch of how these expressions resolve follows the two cells below).

In [6]:
coordinator_date_start = '2019-01-01T00:00Z'
coordinator_date_stop = '2020-01-01T00:01Z'
coordinator_period = '0 0 * * *'
In [7]:
start_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'HOUR\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -0, \'HOUR\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
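
For illustration, the two Oozie EL expressions above resolve to a one-hour window ending at the nominal time; this window is later passed as the update input in the form start/end. A minimal Python sketch of the equivalent computation, assuming a hypothetical nominal time of 2019-01-02T00:00Z (this is not run by the coordinator itself):

In [ ]:
# Illustration only: approximate locally what start_pipe and end_pipe evaluate to
# for a hypothetical nominal time.
nominal_time = datetime(2019, 1, 2, 0, 0)

resolved_start_pipe = (nominal_time - timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%SZ')
resolved_end_pipe = nominal_time.strftime('%Y-%m-%dT%H:%M:%SZ')

print 'update = %s/%s' % (resolved_start_pipe, resolved_end_pipe)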
In [8]:
coordinator_name = 'co_%s_pipe_TEST' % data_pipeline
co_trigger_pipe_process_id = 'coordinator_%s' % trigger_pipe_process_id
print (co_trigger_pipe_process_id)
coordinator_ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4

Common parameters

In [9]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline

Check data transformation application

In [10]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

found_process = False

message = "The process %s is not deployed" % app_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
The process ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 is deployed

Check trigger coordinator

In [11]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The pipe coordinator process %s is not deployed" % co_trigger_pipe_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_pipe_process_id:
        message = "The pipe coordinator process %s is deployed" % co_trigger_pipe_process_id
        found_process = True

        print message

if not found_process:
    raise Exception(message)
The pipe coordinator process coordinator_ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4 is deployed

Process the queue

In [12]:
process = wps.describeprocess(co_trigger_pipe_process_id)

print process.title

print process.abstract
Trigger for the SATCEN-01-01-01 Pipe Coordinator
Coordinator: Trigger for the SATCEN-01-01-01 SATCEN-01-01-01 Sentinel-2 Vegetation and Water Thematic Index - Pipe
In [13]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
update
api_key
geom
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
t2_coordinator_name
quotation
_T2Username
In [14]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('update', '%s/%s' % (start_pipe, end_pipe)),
          ('api_key', datapipeline_api_key),
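          # URL-encode spaces and commas so the WKT geometry can be passed as a WPS literal input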
          ('geom',  wkt.replace(' ', '%20').replace(',', '%2C')),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', tg_quotation),
          ('_T2Username', data_pipeline)]

Submit the coordinator request

In [15]:
co_trigger_pipe_process_id
Out[15]:
'coordinator_ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4'
In [16]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_pipe_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

execution.statusLocation

monitorExecution(execution)

if not execution.isSucceded():

    raise Exception('Coordinator %s creation failed' % co_trigger_pipe_process_id)
In [17]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
In [18]:
coordinator_id
Out[18]:
'0024665-180330140554685-oozie-oozi-C'
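
Before resorting to the danger zone below, the live status of the coordinator can be queried through the Oozie REST API on the production centre, using the same endpoint family as the kill request. A sketch; the exact response fields may vary with the Oozie version:

In [ ]:
# Optional: query the coordinator status via the Oozie REST API.
r = requests.get('%s:11000/oozie/v1/job/%s?show=info' % (production_centre, coordinator_id))

if r.ok:
    print 'Coordinator %s status: %s' % (coordinator_id, r.json().get('status'))
else:
    print 'Could not retrieve coordinator %s status (HTTP %s)' % (coordinator_id, r.status_code)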

** DANGER ZONE **

In [ ]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)

if answer == 'YES I DO':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
    if r.ok:
        print 'Coordinator %s killed' % coordinator_id
    else:
        print 'Kill request for coordinator %s failed (HTTP %s)' % (coordinator_id, r.status_code)