Coordinator - better-wfp-00005 queue - Afghanistan area¶
This coordinator feeds the better-wfp-00005 data pipeline queue for the Sentinel-1 Sigma-0 backscatter (all polarization) processing over Afghanistan area
- First do the imports of the Python libraries required
In [2]:
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
import requests
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [3]:
%store -r
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][1]['source']) in globals(), locals()
app = dict([('artifact_id', app_artifact_id),
('version', app_version),
('repository', repository),
('community', community)])
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['version'].replace('.', '_'))
trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
('version', trigger_queue_version),
('repository', repository),
('folder', folder),
('community', community)])
trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['version'].replace('.', '_'))
print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
trigger_queue_process_id)
This notebook will create a coordinator for a queue to invoke the application ec_better_ewf_wfp_01_02_03_ewf_wfp_01_02_03_0_5 with the trigger ec_better_tg_wfp_01_02_03_queue_tg_wfp_01_02_03_queue_0_6
- Set the data transformation parameters
In [4]:
filter_size_x = '5'
filter_size_y = '5'
polarisation = 'VV'
geom = 'MULTIPOLYGON (((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), ((32.0572 12.4549, 33.9087 12.4549, 33.9087 10.7344, 32.0572 10.7344, 32.0572 12.4549)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, 12.9415 12.0093, 12.9415 13.7579)))'
wkt = loads(geom)[3].wkt
Data selection parameters¶
In [5]:
series = 'https://catalog.terradue.com/sentinel1/description'
In [ ]:
product_type = 'GRD'
Coordinator parameters¶
In [ ]:
coordinator_name = 'co_%s_queue_Afghanistan' % data_pipeline
In [ ]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -2, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
In [ ]:
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id
co_trigger_queue_process_id
In [ ]:
coordinator_date_start = '2018-10-01T00:00Z'
coordinator_date_stop = '2018-10-31T00:00Z'
coordinator_period = '0 0 * * *'
Common Parameters¶
In [ ]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline
Visual check on the AOI¶
In [ ]:
lat = (loads(wkt).bounds[3]+loads(wkt).bounds[1])/2
lon = (loads(wkt).bounds[2]+loads(wkt).bounds[0])/2
zoom_start = 8
m = folium.Map(location=[lat, lon], zoom_start=zoom_start)
radius = 4
folium.CircleMarker(
location=[lat, lon],
radius=radius,
color='#FF0000',
stroke=False,
fill=True,
fill_opacity=0.6,
opacity=1,
popup='{} pixels'.format(radius),
tooltip='I am in pixels',
).add_to(m)
locations = []
locations.append([t[::-1] for t in list(loads(wkt).exterior.coords)])
folium.PolyLine(
locations=locations,
color='#FF0000',
weight=2,
tooltip='',
).add_to(m)
m.save(os.path.join('maps', '%s_search.html' % data_pipeline))
m
Check data transformation application¶
In [ ]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)
found_process = False
message = "The process %s is not deployed" % app_process_id
for index, elem in enumerate(wps.processes):
if elem.identifier == app_process_id:
message = "The process %s is deployed" % app_process_id
found_process = True
print message
if not found_process:
raise Exception(message)
Check trigger coordinator¶
In [ ]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)
found_process = False
message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id
for index, elem in enumerate(wps.processes):
if elem.identifier == co_trigger_queue_process_id:
message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
found_process = True
print message
if not found_process:
raise Exception(message)
Feed the queue¶
In [ ]:
process = wps.describeprocess(co_trigger_queue_process_id)
print process.title
print process.abstract
In [ ]:
for data_input in process.dataInputs:
print data_input.identifier
Define the input parameters¶
In [ ]:
mode = 'Queue'
inputs = [('series', series),
('data_pipeline', data_pipeline),
('wps_url', wps_url_apps),
('process_id', app_process_id),
('update', '%s/%s' % (start_queue, end_queue)),
('geom', wkt.replace(' ', '%20').replace(',', '%2C')),
('product_type', product_type),
('tg_quotation', tg_quotation),
('api_key', datapipeline_api_key),
('filterSizeX', filter_size_x),
('filterSizeY', filter_size_y),
('polarisation', polarisation),
('wkt', wkt),
('t2_coordinator_date_start', coordinator_date_start),
('t2_coordinator_date_stop', coordinator_date_stop),
('t2_coordinator_period', coordinator_period),
('t2_coordinator_name', coordinator_name),
('quotation', tg_quotation),
('_T2Username', data_pipeline)]
Submit the coordinator request¶
In [ ]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)
execution_request = execution.buildRequest(co_trigger_queue_process_id,
inputs,
output=[('coordinatorIds', False)])
execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))
execution.parseResponse(execution_response)
execution.statusLocation
monitorExecution(execution)
if not execution.isSucceded():
raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)
In [ ]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
In [ ]:
coordinator_id
** DANGER ZONE **
In [ ]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)
if answer == 'YES I DO':
r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
if r.status_code:
print 'Coordinator %s killed' % coordinator_id