Coordinator - better-WFP-00002 queue

This coordinator feeds the better-wfp-00002 data pipeline queue for the Sentinel-1 backscatter timeseries

  • First do the imports of the Python libraries required
In [4]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json

import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

import requests

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [5]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['artifact_id'].replace('-', '_'),
                                           trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                                                trigger_queue_process_id)
This notebook will create a coordinator for a queue to invoke the application ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_20 with the trigger ec_better_tg_wfp_01_01_02_queue_tg_wfp_01_01_02_queue_0_8

Area of Interest definition

The geometry defining the 4 sub areas of interest for WFP-01-01-02:

  • NW - South Sudan
  • Renk - Blue Nile
  • Niger Delta, Mali
  • NE - Nigeria
In [6]:

geom = 'MULTIPOLYGON (((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)))'

In [ ]:
wkt = loads(geom)[0]
AoI = wkt.wkt

Visual check on the AOI

In [13]:
lat = (loads(AoI).bounds[3]+loads(AoI).bounds[1])/2
lon = (loads(AoI).bounds[2]+loads(AoI).bounds[0])/2

zoom_start = 8

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

locations = []

locations.append([t[::-1] for t in list(loads(AoI).exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

m.save(os.path.join('maps', '%s_search.html' % data_pipeline))

m
Out[13]:
Check data transformation application
In [14]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

found_process = False

message = "The process %s is not deployed" % app_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
The process ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_20 is deployed
Check trigger coordinator
In [15]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

found_process = False

message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id

for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_queue_process_id:
        message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
The queue coordinator process coordinator_ec_better_tg_wfp_01_01_02_queue_tg_wfp_01_01_02_queue_0_8 is deployed
In [16]:
trigger_deployer
Out[16]:
'https://ec-better-triggers-deployer-creodias-c1.terradue.com'
Feed the queue
In [17]:
process = wps.describeprocess(co_trigger_queue_process_id)

print process.title

print process.abstract
Trigger for the WFP-01-01-02 Queue Coordinator
Coordinator: Trigger for the WFP-01-01-02 Sentinel-1 coherence timeseries - Queue

Get the needed input parameters list

In [18]:
for data_input in process.dataInputs:
    print data_input.identifier
Sources
data_pipeline
wps_url
process_id
api_key
tg_quotation
polarisation
swaths
cohWinRg
cohWinAz
wkt
update
start
stop
product_type
swath
geom
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
t2_coordinator_name
quotation
_T2Username

Set the input parameters

In [ ]:
series = 'https://catalog.terradue.com/sentinel1/search'

tg_quotation = 'No'
quotation = tg_quotation
_T2Username = data_pipeline
In [ ]:
polarisation = 'VV,VH'
subswaths = 'IW1,IW2,IW3'
cohWinRg = '20'
cohWinAz = '5'
wkt = AoI
In [9]:
start_queue = ''
stop_queue = ''

update_start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
update_stop_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -0, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
update_queue = '%s/%s' %(update_start_queue, update_stop_queue)

product_type = 'SLC'
swath = 'IW1 IW2 IW3'
geom = AoI.replace(' ', '%20').replace(',', '%2C')

In [8]:
coordinator_name = 'Coordinator_%s_queue' % data_pipeline
In [10]:
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id

print (co_trigger_queue_process_id)
coordinator_ec_better_tg_wfp_01_01_02_queue_tg_wfp_01_01_02_queue_0_8
In [11]:
coordinator_date_start = '2019-03-20T00:00Z'
coordinator_date_stop = '2021-02-05T00:01Z'
coordinator_period = '0 0 * * *'

In [18]:
inputs = [('Sources', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('polarisation', polarisation),
          ('swaths', subswaths),
          ('cohWinRg', cohWinRg),
          ('cohWinAz', cohWinAz),
          ('wkt', wkt),
          ('update', update_queue),
          ('start', start_queue),
          ('stop', stop_queue),
          ('product_type', product_type),
          ('swath', swath),
          ('geom',  geom),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', quotation),
          ('_T2Username', _T2username)]

Submit the coordinator request

In [19]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_queue_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

execution.statusLocation

monitorExecution(execution)

if not execution.isSucceded():

    raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)
In [20]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
In [21]:
coordinator_id
Out[21]:
'0002170-181221095105003-oozie-oozi-C'

** DANGER ZONE **

In [ ]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)

if answer == 'YES I DO':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
    if r.status_code:
        print 'Coordinator %s killed' % coordinator_id