Coordinator - better-wfp-00005 pipe¶
This coordinator processes the better-wfp-00005 data pipeline queue for the Sentinel-1 Sigma-0 backscatter product (all polarizations).
- First, import the required Python libraries:
In [ ]:
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import json
import cioppy
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [ ]:
%store -r
# Load the shared pipeline settings: the source of the second cell (index 1)
# of ../configuration.ipynb is executed in this notebook's namespace. That
# cell is expected to define every name read below (app_artifact_id,
# app_version, repository, community, trigger_pipe_artifact_id,
# trigger_pipe_version, folder).
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][1]['source'], globals(), locals())


def _wps_process_id(descriptor):
    """Build the WPS process identifier of a deployed component.

    The identifier is community, artifact id, artifact id (it appears
    twice) and version joined by underscores, with '-' (and '.' in the
    version) replaced by '_'.
    """
    artifact = descriptor['artifact_id'].replace('-', '_')
    return '%s_%s_%s_%s' % (descriptor['community'].replace('-', '_'),
                            artifact,
                            artifact,
                            descriptor['version'].replace('.', '_'))


# Descriptor of the data transformation application.
app = {
    'artifact_id': app_artifact_id,
    'version': app_version,
    'repository': repository,
    'community': community,
}
app_process_id = _wps_process_id(app)

# Descriptor of the trigger that feeds the application from the queue.
trigger_pipe = {
    'artifact_id': trigger_pipe_artifact_id,
    'version': trigger_pipe_version,
    'repository': repository,
    'folder': folder,
    'community': community,
}
trigger_pipe_process_id = _wps_process_id(trigger_pipe)

print('This notebook will process the queue of %s with the trigger %s' % (
    app_process_id, trigger_pipe_process_id))
Queue selection parameters¶
In [ ]:
# Catalogue description URL of the "source-queue" series of this data
# pipeline; it is sent as the 'series' input of the coordinator request.
# NOTE(review): data_pipeline is not assigned in the visible cells --
# presumably restored by `%store -r` or defined by the configuration
# notebook; confirm.
series = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
Coordinator parameters¶
In [ ]:
# Scheduling parameters, forwarded as the t2_coordinator_date_start,
# t2_coordinator_date_stop and t2_coordinator_period inputs of the request.
coordinator_date_start = '2018-07-20T00:00Z'
coordinator_date_stop = '2018-12-31T00:00Z'
# NOTE(review): looks like a cron expression (daily at 00:00) -- confirm the
# syntax expected by the coordinator process.
coordinator_period = '0 0 * * *'
In [ ]:
# Expression templates for the bounds of the 'update' input
# ("<start_pipe>/<end_pipe>"). They are sent as literal strings, not evaluated
# here: the start is the coordinator nominal time offset by -1 DAY, the end is
# the nominal time itself (offset -0), both formatted as
# yyyy-MM-dd'T'HH:mm:ss'Z'.
# NOTE(review): presumably resolved by the coordinator engine at each
# scheduled run -- confirm.
start_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_pipe = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -0, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
In [ ]:
# Identifier of the coordinator flavour of the trigger: the trigger process id
# prefixed with 'coordinator_'. This is the process looked up and executed on
# the trigger WPS endpoint further down.
co_trigger_pipe_process_id = 'coordinator_%s' % trigger_pipe_process_id
Common Parameters¶
In [ ]:
# Sent twice in the coordinator request, as both the 'tg_quotation' and the
# 'quotation' inputs.
tg_quotation = 'No'
# NOTE(review): 'recovery' is not referenced by any other visible cell.
recovery = 'No'
# NOTE(review): the request passes data_pipeline directly under the
# '_T2Username' key; this variable itself is not read in the visible cells.
_T2Username = data_pipeline
Check data transformation application¶
In [ ]:
# Check that the data transformation application is exposed by the WPS
# endpoint of the application deployer. Prints the outcome and raises an
# Exception carrying the same message when the process is missing.
wps_url_apps = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

# wps.processes holds the processes advertised by the endpoint; the
# application is deployed when one of them carries the expected identifier.
found_process = any(elem.identifier == app_process_id
                    for elem in wps.processes)

if found_process:
    message = "The process %s is deployed" % app_process_id
else:
    message = "The process %s is not deployed" % app_process_id

print(message)

if not found_process:
    # Include the diagnostic in the exception (it used to be raised empty),
    # consistent with the trigger coordinator check.
    raise Exception(message)
Check trigger coordinator¶
In [ ]:
# Check that the coordinator process of the trigger is exposed by the WPS
# endpoint of the trigger deployer. `wps` is rebound to that endpoint and is
# reused by the cells below.
wps_url_triggers = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

deployed_identifiers = [deployed.identifier for deployed in wps.processes]
found_process = co_trigger_pipe_process_id in deployed_identifiers

if found_process:
    message = "The pipe coordinator process %s is deployed" % co_trigger_pipe_process_id
else:
    message = "The pipe coordinator process %s is not deployed" % co_trigger_pipe_process_id

print(message)

# Stop the notebook here when the coordinator process is missing.
if not found_process:
    raise Exception(message)
Process the queue¶
In [ ]:
# Fetch the description of the coordinator process from the trigger endpoint
# and show its title and abstract.
process = wps.describeprocess(co_trigger_pipe_process_id)

print(process.title)
print(process.abstract)
In [ ]:
# List the identifier of every input declared by the coordinator process.
for data_input in process.dataInputs:
    print(data_input.identifier)
In [ ]:
# Assemble the (identifier, value) pairs sent to the coordinator process.
# NOTE(review): 'mode' is not referenced by any other visible cell.
mode = 'Via'

# Fixed: a stray trailing quote after data_pipeline made this line a
# SyntaxError, so the cell could not run at all.
coordinator_name = 'co_%s_pipe' % data_pipeline

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url_apps),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          # Update window as "<start>/<end>", built from the two templates.
          ('update', '%s/%s' % (start_pipe, end_pipe)),
          ('geom', ''),
          ('t2_coordinator_date_start', coordinator_date_start),
          ('t2_coordinator_date_stop', coordinator_date_stop),
          ('t2_coordinator_period', coordinator_period),
          ('t2_coordinator_name', coordinator_name),
          ('quotation', tg_quotation),
          ('_T2Username', data_pipeline)]
Submit the coordinator request¶
In [ ]:
# Build and submit the Execute request for the coordinator process on the
# trigger endpoint, then wait for it to finish.
execution = owslib.wps.WPSExecution(url=wps_url_triggers)

execution_request = execution.buildRequest(co_trigger_pipe_process_id,
                                           inputs,
                                           output=[('coordinatorIds', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

execution.parseResponse(execution_response)

# Bare expression kept from the original cell; because it is not the last
# statement of the cell, the notebook does not display it.
execution.statusLocation

monitorExecution(execution)

# 'isSucceded' is the method name as spelled by owslib.
if not execution.isSucceded():
    # Fixed: the message referenced the undefined name
    # co_trigger_queue_process_id, so a failed execution surfaced as a
    # NameError instead of this exception.
    raise Exception('Coordinator %s creation failed' % co_trigger_pipe_process_id)
In [ ]:
# Extract the identifier of the created coordinator: the first data item of
# the first process output is parsed as JSON, and the 'oozieId' of the first
# entry under 'coordinatorsId' is kept as a plain str.
raw_coordinator_output = execution.processOutputs[0].data[0]
created_coordinators = json.loads(raw_coordinator_output)['coordinatorsId']
coordinator_id = str(created_coordinators[0]['oozieId'])
In [ ]:
# Show the identifier of the created coordinator as the cell output.
coordinator_id