Coordinator - better-satcen-00004 queue - Afghanistan¶
The coordinator periodically launches queue jobs, which fill the pipeline’s queue of incoming data items (source-queue series), according to the parameters defined below.
- First, import the required Python libraries
In [ ]:
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
import requests
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [ ]:
%store -r
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][1]['source']) in globals(), locals()
app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))
trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                                                trigger_queue_process_id)
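As a sanity check, the naming scheme above can be exercised with stand-in values (the real ones come from configuration.ipynb; the identifiers below are hypothetical). Note that the artifact identifier appears twice in the template, exactly as in the cell above:

```python
# Hypothetical configuration values, for illustration only.
community = 'ec-better'
artifact_id = 'better-satcen-00004-queue'
version = '1.0'

# Same construction as above: hyphens and dots become underscores.
process_id = '%s_%s_%s_%s' % (community.replace('-', '_'),
                              artifact_id.replace('-', '_'),
                              artifact_id.replace('-', '_'),
                              version.replace('.', '_'))
print(process_id)
# ec_better_better_satcen_00004_queue_better_satcen_00004_queue_1_0
```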
- Set the data transformation parameters
In [ ]:
resolution = '10'
percentage_threshold = '20.0'
flag_expr = '( saturated_l1a_B4 or scl_water )'
geom = 'MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)))'
# Select the Afghanistan bounding box (polygon index 2) from the multipolygon above
wkt = loads(geom)[2].wkt
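The multipolygon holds three bounding boxes (Colombia, Albania, Afghanistan); index 2 is the Afghanistan one, roughly 63–65° E, 30–33° N. A shapely-free sketch of the same selection, splitting the WKT string by hand (coordinates rounded for brevity):

```python
# Rounded copy of the MULTIPOLYGON above; index 2 is the Afghanistan box.
geom = ('MULTIPOLYGON (((-77.245 0.942, -77.245 2.754, -79.025 2.754, '
        '-79.025 0.942, -77.245 0.942)), '
        '((21.296 39.586, 21.296 41.032, 19.898 41.032, '
        '19.898 39.586, 21.296 39.586)), '
        '((65.021 30.439, 65.021 33.396, 63.942 33.396, '
        '63.942 30.439, 65.021 30.439)))')

# Strip the 'MULTIPOLYGON (((' prefix and ')))' suffix, then split the members.
inner = geom[len('MULTIPOLYGON ((('):-3]
polygons = ['POLYGON ((%s))' % p for p in inner.split(')), ((')]
wkt = polygons[2]
print(wkt)
```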
Data selection parameters¶
The queue process selects data from the pipeline’s input series (this can be any kind of data, e.g. the results series of another pipeline), based on the insertion date.
In [ ]:
series = 'https://catalog.terradue.com/better-common-00001/description'
product_type = 'S2MSI2Ap'
geom = wkt
Coordinator parameters¶
The coordinator has a start date, an end date, a repetition interval (period), and expressions for the start and end dates used to select from the queue. With the cron expression '0 0 * * *', the coordinator is triggered every day at midnight; that moment is the nominal time of the coordinator execution. If the execution start date lies in the past, the coordinator spawns jobs more frequently until the nominal time catches up with the current time. The start_queue and end_queue dates are expressions relative to the nominal execution time; they are used to formulate the search date range within the pipeline's original input series. Under normal circumstances, when the coordinator is processing current data, these expressions have to define a range in the past (relative to the nominal execution time).
In [ ]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -0, \'DAY\'), "yyyy-MM-dd")}T12:00:00Z'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -0, \'DAY\'), "yyyy-MM-dd")}T15:00:00Z'
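A rough Python sketch of how Oozie would resolve these expressions for a given nominal time (this mimics coord:dateOffset with a 0-day offset and coord:formatTime with the "yyyy-MM-dd" pattern; it is an illustration, not Oozie itself):

```python
from datetime import datetime, timedelta

# Hypothetical nominal time of one coordinator materialization.
nominal = datetime(2018, 11, 15, 0, 0)

# coord:dateOffset(nominalTime, -0, 'DAY') leaves the instant unchanged;
# coord:formatTime(..., "yyyy-MM-dd") keeps only the date part.
day = (nominal + timedelta(days=-0)).strftime('%Y-%m-%d')
start_queue = day + 'T12:00:00Z'
end_queue = day + 'T15:00:00Z'
print(start_queue, end_queue)
# 2018-11-15T12:00:00Z 2018-11-15T15:00:00Z
```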
In [ ]:
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id
In [ ]:
coordinator_date_start = '2018-11-15T00:00Z'
coordinator_date_stop = '2018-11-15T23:59Z'
coordinator_period = '0 0 * * *'
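With this configuration the window spans a single day, so '0 0 * * *' yields exactly one materialization, at 2018-11-15T00:00Z. A sketch that enumerates the nominal times for a daily-midnight cron (assuming the window start coincides with midnight; this is not a general cron parser):

```python
from datetime import datetime, timedelta

start = datetime(2018, 11, 15, 0, 0)
stop = datetime(2018, 11, 15, 23, 59)

# '0 0 * * *' fires at midnight; walk the window one day at a time.
nominal_times = []
t = start
while t <= stop:
    nominal_times.append(t)
    t += timedelta(days=1)
print(nominal_times)  # one nominal time: 2018-11-15 00:00
```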
Common Parameters¶
In [ ]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline
Check data transformation application¶
This step verifies that the WPS process offering for the underlying application is available.
In [ ]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)
found_process = False
message = "The process %s is not deployed" % app_process_id
for index, elem in enumerate(wps.processes):
    if elem.identifier == app_process_id:
        message = "The process %s is deployed" % app_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
Check trigger coordinator¶
This step verifies that the WPS process offering for the queue trigger coordinator is available.
In [ ]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)
found_process = False
message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id
for index, elem in enumerate(wps.processes):
    if elem.identifier == co_trigger_queue_process_id:
        message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
        found_process = True

print message

if not found_process:
    raise Exception(message)
Queue trigger setup¶
- Display the trigger information and parameters (including the parameters related to the data-transforming application) for a manual check.
In [ ]:
process = wps.describeprocess(co_trigger_queue_process_id)
print process.title
print process.abstract
In [ ]:
for data_input in process.dataInputs:
    print data_input.identifier
- Set the parameters for the coordinator process.
In [ ]:
mode = 'Queue'
coordinator_name = 'Better-satcen-00004-queue-Afghanistan'
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('update', '%s/%s' % (start_queue, end_queue)),
          ('geom', wkt), #.replace(' ', '%20').replace(',', '%2C')),
          ('pt', product_type),
          ('tg_quotation', tg_quotation),
          ('api_key', datapipeline_api_key),
          ('resolution', resolution),
          ('percentage_threshold', percentage_threshold),
          ('flag_expr', flag_expr),
          ('wkt', wkt),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', tg_quotation),
          ('_T2Username', data_pipeline)]
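The update input packs the queue start and end into a single slash-separated interval. For already-resolved dates it would look like this (illustrative values; the coordinator actually receives the unresolved EL expressions):

```python
# Hypothetical resolved dates, for illustration only.
start_queue = '2018-11-15T12:00:00Z'
end_queue = '2018-11-15T15:00:00Z'

update = '%s/%s' % (start_queue, end_queue)
print(update)
# 2018-11-15T12:00:00Z/2018-11-15T15:00:00Z
```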
- Submit the coordinator request and obtain the Oozie process ID
In [ ]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)
execution_request = execution.buildRequest(co_trigger_queue_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])
execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))
execution.parseResponse(execution_response)
execution.statusLocation
monitorExecution(execution)
if not execution.isSucceded():
raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)
In [ ]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
print(coordinator_id)
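The cell above assumes the trigger service returns a JSON document with a coordinatorsId list of objects carrying an oozieId. A sketch with a made-up payload (the oozieId value is hypothetical) illustrates the parsing:

```python
import json

# Made-up response payload mirroring the structure parsed above.
payload = '{"coordinatorsId": [{"oozieId": "0000123-181115000000000-oozie-oozi-C"}]}'

coordinator_id = str(json.loads(payload)['coordinatorsId'][0]['oozieId'])
print(coordinator_id)
# 0000123-181115000000000-oozie-oozi-C
```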
- **DANGER ZONE**
This section is only to be used to kill the coordinator process.
Alternatively, you can kill it with the commands shown in the output of the next cell:
In [ ]:
print("JOBID=\"%s\"" % coordinator_id)
print("curl -XPUT \"%s:11000/oozie/v1/job/${JOBID}?user.name=oozie&action=kill\"" % (production_centre))
In [ ]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I AM to confirm)?' % coordinator_id)

if answer == 'YES I AM':
    r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
    if r.ok:
        print 'Coordinator %s killed' % coordinator_id