Coordinator queue¶
This coordinator feeds the data pipeline queue
- First do the imports of the Python libraries required
In [1]:
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import json
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
import requests
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
datapipeline_api_key = getpass.getpass('API key:')
- Read the data pipeline configuration information:
In [2]:
%store -r
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][1]['source']) in globals(), locals()
app = dict([('artifact_id', app_artifact_id),
('version', app_version),
('repository', repository),
('community', community)])
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['version'].replace('.', '_'))
app_process_id = app_process_id.replace('ewf_','')
trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
('version', trigger_queue_version),
('repository', repository),
('folder', folder),
('community', community)])
trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['version'].replace('.', '_'))
print 'This notebook will create a coordinator for a queue to invoke the application %s with the trigger %s' % (app_process_id,
trigger_queue_process_id)
This notebook will create a coordinator for a queue to invoke the application ec_better_wfp_01_03_02_wfp_01_03_02_1_17 with the trigger ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7
Application parameters¶
In [3]:
bbox = 'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
min_mag = 5
buffer_size = '0.9'
region_name = 'SouthernAfrica'
Coordinator parameters¶
In [4]:
coordinator_name = 'co_%s_validation_queue' % data_pipeline
In [5]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'MONTH\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
In [6]:
co_trigger_queue_process_id = 'coordinator_%s' % trigger_queue_process_id
In [7]:
coordinator_date_start = '2019-04-01T00:00Z'
coordinator_date_stop = '2020-01-01T00:00Z'
coordinator_period = '0 0 1 * *'
Common Parameters¶
In [8]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline
Check data transformation application¶
In [9]:
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)
found_process = False
message = "The process %s is not deployed" % app_process_id
for index, elem in enumerate(wps.processes):
if elem.identifier == app_process_id:
message = "The process %s is deployed" % app_process_id
found_process = True
print message
if not found_process:
raise Exception(message)
The process ec_better_wfp_01_03_02_wfp_01_03_02_1_17 is deployed
Check trigger coordinator¶
In [10]:
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)
found_process = False
message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id
for index, elem in enumerate(wps.processes):
if elem.identifier == co_trigger_queue_process_id:
message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id
found_process = True
print message
if not found_process:
raise Exception(message)
The queue coordinator process coordinator_ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7 is deployed
Feed the queue¶
In [11]:
process = wps.describeprocess(co_trigger_queue_process_id)
print process.title
print process.abstract
RFE Aggregations Trigger - Queue Coordinator
Coordinator: Trigger for the CHIRPS Rainfall Estimate Aggregations Pipeline - Queue
In [12]:
for data_input in process.dataInputs:
print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
start_time
end_time
N_10
N_30
N_60
N_90
N_120
N_150
N_180
N_270
N_365
regionOfInterest
nameOfRegion
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
t2_coordinator_name
quotation
_T2Username
Define the input parameters¶
In [15]:
mode = 'Queue'
inputs = [('data_pipeline', data_pipeline),
('series', 'https://catalog.terradue.com/chirps/description'),
('wps_url', wps_url_apps),
('process_id', app_process_id),
('api_key', datapipeline_api_key),
('username', data_pipeline),
('tg_quotation', tg_quotation),
('start_time', start_queue),
('end_time', end_queue),
('regionOfInterest', bbox),
('nameOfRegion', region_name),
('buffer_size', buffer_size),
('t2_coordinator_date_start', coordinator_date_start),
('t2_coordinator_date_stop', coordinator_date_stop),
('t2_coordinator_period', coordinator_period),
('t2_coordinator_name', coordinator_name),
('quotation', tg_quotation),
('_T2Username', data_pipeline),
('N_10', 'No'),
('N_30', 'Yes'),
('N_60', 'No'),
('N_90', 'No'),
('N_120', 'No'),
('N_150', 'No'),
('N_180', 'No'),
('N_270', 'No'),
('N_365', 'No')]
Submit the coordinator request¶
In [16]:
execution = owslib.wps.WPSExecution(url=wps_url_triggers)
execution_request = execution.buildRequest(co_trigger_queue_process_id,
inputs,
output=[('coordinatorIds', False)])
execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))
execution.parseResponse(execution_response)
execution.statusLocation
monitorExecution(execution)
if not execution.isSucceded():
raise Exception('Coordinator %s creation failed' % co_trigger_queue_process_id)
In [17]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])
In [18]:
coordinator_id
Out[18]:
'0036730-181221095105003-oozie-oozi-C'
** DANGER ZONE **
Suspend the coordinator
In [ ]:
answer = raw_input('Are you sure you want to suspend the coordinator %s (YES I DO to confirm)?' % coordinator_id)
if answer == 'YES I DO':
url = '%s:11000/oozie/v1/job/%s?user.name=oozie&action=%s' % (production_centre,
coordinator_id,
'suspend')
r = requests.put(url)
print r.status_code
if r.status_code:
print 'Coordinator %s suspended' % coordinator_id
Kill the coordinator
In [ ]:
answer = raw_input('Are you sure you want to kill the coordinator %s (YES I DO to confirm)?' % coordinator_id)
if answer == 'YES I DO':
r = requests.put('%s:11000/oozie/v1/job/%s?user.name=oozie&action=kill' % (production_centre, coordinator_id))
if r.status_code:
print 'Coordinator %s killed' % coordinator_id