Coordinator - better-SATCEN-00001 queue

This coordinator feeds the better-SATCEN-00001 data pipeline queue with Sentinel-2 or BETTER-common products.

  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json

import lxml.etree as etree

import cioppy

from shapely.wkt import loads

from datetime import datetime, timedelta
import dateutil.parser

import requests

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                                                trigger_queue_process_id)
This notebook will create a coordinator for a queue to invoke the application ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 with the trigger ec_better_tg_satcen_01_01_01_queue_tg_satcen_01_01_01_queue_0_8
  • Set the data transformation parameters
In [3]:
#The multipolygon of all areas covered by SEN2COR

geom = 'MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)), ((19.6875 44.82172222222222, 19.6875 45.97733333333333, 18.70788888888889 45.97733333333333, 18.70788888888889 44.82172222222222, 19.6875 44.82172222222222)))'

wkt = loads(geom)[1].wkt

In [4]:
wkt
Out[4]:
'POLYGON ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889))'

Data selection parameters

In [5]:
series = 'https://catalog.terradue.com/sentinel2/description'
In [6]:
product_type = 'S2MSI2A'

Coordinator parameters

In [7]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -5, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -4, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'

In [8]:
coordinator_name = 'co_%s_queue_TEST' % data_pipeline
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id
print (co_trigger_queue_process_id)
print (coordinator_name)
coordinator_ec_better_tg_satcen_01_01_01_queue_tg_satcen_01_01_01_queue_0_8
co_better-satcen-00001_queue_TEST
In [9]:
coordinator_date_start = '2018-11-21T00:00Z'
coordinator_date_stop = '2018-11-21T23:59Z'
coordinator_period = '15 15 * * *'

Common Parameters

In [10]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline

Check data transformation application

In [11]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)


found_process = False

message = "The process %s is not deployed" % app_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
The process ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 is deployed

Check trigger coordinator

In [12]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_queue_process_id:
        message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
The queue coordinator process coordinator_ec_better_tg_satcen_01_01_01_queue_tg_satcen_01_01_01_queue_0_8 is deployed

Feed the queue

In [13]:
process = wps.describeprocess(co_trigger_queue_process_id)

print process.title

print process.abstract
Trigger for the SATCEN-01-01-01 Queue Coordinator
Coordinator: Trigger for the SATCEN-01-01-01 data pipeline - Queue
In [14]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
tg_quotation
update
geom
product_type
flag_expr
percentage_threshold
aoi_wkt
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
t2_coordinator_name
quotation
_T2Username

Define the input parameters

In [16]:

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('update', '%s/%s' % (start_queue, end_queue)),
          ('geom',  wkt.replace(' ', '%20').replace(',', '%2C')),
          ('product_type', product_type),
          ('flag_expr', 'saturated_l1a_B4 or scl_water'),
          ('percentage_threshold', '20'),
          ('aoi_wkt', wkt),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name',coordinator_name),
          ('quotation', tg_quotation),
          ('_T2Username', data_pipeline)]

Submit the coordinator request

In [18]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_queue_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

execution.statusLocation

monitorExecution(execution)

if not execution.isSucceded():

    raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)
In [19]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
In [20]:
coordinator_id
Out[20]:
'0024664-180330140554685-oozie-oozi-C'

** DANGER ZONE **

In [21]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)

if answer == 'YES I DO':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
    if r.status_code:
        print 'Coordinator %s killed' % coordinator_id