Test feeding the queue
This Jupyter Notebook queries the catalog for CHIRPS products and plots them on a map. The same set of products is then put into the data pipeline queue using the trigger WPS service.
- First do the imports of the Python libraries required
In [1]:
# Standard library, OWS/WPS client, XML, catalogue and notebook tooling.
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
#import folium
from datetime import datetime, timedelta
import dateutil.parser
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

# Prompt interactively for the data pipeline API key so it is never
# stored in the notebook source.
datapipeline_api_key = getpass.getpass('API key:')
In [2]:
from geopandas import GeoDataFrame
import ellip_triggers
import time

# Catalogue client used for all OpenSearch queries below.
ciop = cioppy.Cioppy()
- Read the data pipeline configuration information:
In [3]:
# Restore variables previously saved with %store (presumably
# trigger_deployer, apps_deployer, data_pipeline -- TODO confirm where
# they are stored).
%store -r

# Load the shared configuration notebook and execute its second cell,
# which is expected to define app_artifact_id, app_version, repository,
# community, trigger_queue_artifact_id, trigger_queue_version and folder.
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
# Python 2 exec form: run the cell source in the current namespace.
exec(nb['cells'][1]['source']) in globals(), locals()

# Coordinates of the application (processing service) to be invoked.
app = dict([('artifact_id', app_artifact_id),
('version', app_version),
('repository', repository),
('community', community)])

# WPS process identifier of the application. The artifact id appears
# twice on purpose -- the printed identifier below confirms the
# deployed naming scheme repeats it.
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['version'].replace('.', '_'))
app_process_id = app_process_id.replace('ewf_','')

# Coordinates of the trigger (queue) and its WPS process identifier,
# built with the same repeated-artifact scheme.
trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
('version', trigger_queue_version),
('repository', repository),
('folder', folder),
('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['artifact_id'].replace('-', '_'),
trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_wfp_01_03_02_wfp_01_03_02_1_17 with the trigger ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7
- Set the data discovery parameters
In [4]:
# Data discovery parameters: CHIRPS series endpoint, the January 2017
# time window, and the Southern Africa area of interest as a WKT polygon.
series = 'https://catalog.terradue.com/chirps/description'

start_date = '2017-01-01T00:00:00.00Z'
stop_date = '2017-01-31T23:59:00Z'

geom = ('POLYGON ((11.50307555189977 -11.11416337069092, '
        '41.03432555189977 -11.11416337069092, '
        '41.03432555189977 -34.97636566938584, '
        '11.50307555189977 -34.97636566938584, '
        '11.50307555189977 -11.11416337069092))')
In [5]:
# Round-trip the AOI through shapely to normalise the WKT string.
aoi_geometry = loads(geom)
wkt = aoi_geometry.wkt
In [6]:
wkt
Out[6]:
'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
In [7]:
# Query the CHIRPS catalogue for all products in the time of interest
# (count 366 covers a full year of daily products).
search_params = {'start': start_date,
                 'stop': stop_date,
                 'count': 366}

ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt',
                     model='GeoTime')
In [8]:
# List every discovered product with its catalogue self link.
for position, product in enumerate(search):
    print(position, product['self'])
(0, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.31')
(1, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.30')
(2, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.29')
(3, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.28')
(4, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.27')
(5, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.26')
(6, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.25')
(7, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.24')
(8, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.23')
(9, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.22')
(10, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.21')
(11, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.20')
(12, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.19')
(13, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.18')
(14, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.17')
(15, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.16')
(16, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.15')
(17, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.14')
(18, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.13')
(19, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.12')
(20, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.11')
(21, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.10')
(22, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.09')
(23, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.08')
(24, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.07')
(25, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.06')
(26, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.05')
(27, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.04')
(28, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.03')
(29, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.02')
(30, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.01')
In [9]:
series
Out[9]:
'https://catalog.terradue.com/chirps/description'
In [10]:
trigger_queue_process_id
Out[10]:
'ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7'
- Establish the connection with the WPS server hosting the triggers:
In [11]:
# Build the WPS endpoint of the trigger deployer and fetch its
# capabilities, then describe the queue trigger process.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
verbose=False,
skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_queue_process_id)

print process.title
RFE Aggregations Trigger - Queue
- List the WPS trigger process data inputs:
In [12]:
for data_input in process.dataInputs:
print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
start_time
end_time
N_10
N_30
N_60
N_90
N_120
N_150
N_180
N_270
N_365
regionOfInterest
nameOfRegion
quotation
_T2Username
In [13]:
# Application WPS endpoint the trigger will invoke.
print '%s/zoo/' % apps_deployer
https://ec-better-apps-deployer.terradue.com/zoo/
- Create a Python dictionary with the inputs:
In [14]:
#update = '%s/%s' % (start_date, stop_date)
# Execute inputs for the trigger process, as (identifier, value) pairs
# matching the dataInputs listed above. Only the 30-day aggregation
# (N_30) is enabled; all other aggregation windows are 'No'.
region_name = 'SouthernAfrica'
quotation = 'No'

inputs = [('series', series),
('data_pipeline', data_pipeline),
('wps_url', '%s/zoo/' % apps_deployer),
('process_id', app_process_id),
('start_time', start_date),
('end_time', stop_date),
('regionOfInterest', wkt),
('nameOfRegion', region_name),
('api_key', datapipeline_api_key),
('quotation', quotation),
('_T2Username', data_pipeline),
('N_10', 'No'),
('N_30', 'Yes'),
('N_60', 'No'),
('N_90', 'No'),
('N_120', 'No'),
('N_150', 'No'),
('N_180', 'No'),
('N_270', 'No'),
('N_365', 'No')]
- Submit the Execute WPS and monitor the request:
In [ ]:
# Record the submission window start (UTC), then build, submit and
# monitor the Execute request for the trigger process.
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

# NOTE(review): the second element of the output tuple is presumably the
# asReference flag -- confirm against the owslib version in use.
execution_request = execution.buildRequest(trigger_queue_process_id,
inputs,
output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

# Block until the WPS execution reaches a terminal state.
monitorExecution(execution)

stop_date_queue = datetime.utcnow().isoformat()
- Check the outcome of the processing request
In [ ]:
# Stop the notebook here if the trigger execution failed.
succeeded = execution.isSucceded()
if not succeeded:
    raise Exception('Processing failed')
In [ ]:
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)
print update_queue
- Search the queue content
In [ ]:
# OpenSearch description document of the data pipeline source queue.
series_queue = 'https://catalog.terradue.com/{0}/series/source-queue/description'.format(data_pipeline)
In [ ]:
series_queue
In [ ]:
# Fetch up to 100 entries currently in the source queue as a GeoDataFrame.
search_params_queue = {'count': '100'}

queue_entries = ciop.search(end_point=series_queue,
                            params=search_params_queue,
                            output_fields='self,identifier,wkt,startdate',
                            model='GeoTime',
                            timeout=50000)
search_queue = GeoDataFrame(queue_entries)

# Parse the WKT strings into shapely geometries.
search_queue['wkt'] = search_queue['wkt'].apply(loads)
In [ ]:
search_queue
In [ ]:
import requests
In [ ]:
# Fetch the Atom entry of the last queue item and parse it into an
# lxml element tree for the XPath extractions below.
data_item = search_queue.iloc[-1]['self']
print data_item
root = etree.fromstring(requests.get(data_item).content)
print root
In [ ]:
# XML namespace prefixes used by the Atom / OWS-Context / WPS / OWS
# elements of the queue entries.
ns = {
    'a': 'http://www.w3.org/2005/Atom',
    'b': 'http://www.opengis.net/owc/1.0',
    'c': 'http://www.opengis.net/wps/1.0.0',
    'd': 'http://www.opengis.net/ows/1.1',
}
In [ ]:
# Locate the Execute offering in the entry and take its WPS endpoint.
execute_operations = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]',
                                namespaces=ns)
wps_url = execute_operations[0].attrib['href']
wps_url
In [ ]:
# Extract the process identifier embedded in the stored Execute request.
identifier_nodes = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/d:Identifier',
                              namespaces=ns)
process_id = identifier_nodes[0].text
process_id
In [ ]:
# Pair each input identifier with its literal value from the stored
# Execute request (positional correspondence between the two node lists).
identifiers = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/d:Identifier',
                         namespaces=ns)
values = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/c:Data/c:LiteralData',
                    namespaces=ns)

params = {}
for position, identifier_node in enumerate(identifiers):
    params[identifier_node.text] = values[position].text
params
In [ ]:
# Re-submit the stored Execute request exactly as found in the queue item.
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

execution = owslib.wps.WPSExecution(url=wps.url)

# The full c:Execute element embedded in the Atom entry is reused verbatim.
execution_request = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute',
namespaces=ns)[0]

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print execution.statusLocation
In [ ]:
monitorExecution(execution)
In [ ]:
# Fail loudly if the single-item resubmission did not succeed.
succeeded = execution.isSucceded()
if not succeeded:
    raise Exception('Single data item submission failed')
In [ ]:
for output in execution.processOutputs:
print(output.identifier)
results_osd = execution.processOutputs[0].reference
print results_osd
In [ ]:
# Resolve the results OpenSearch document into a GeoDataFrame.
result_entries = ciop.search(end_point=results_osd,
                             params=[],
                             output_fields='title,enclosure',
                             model='GeoTime',
                             timeout=50000)
search_results = GeoDataFrame(result_entries)
search_results
In [ ]:
search_results.iloc[0]['enclosure']
In [ ]:
# Create the trigger helper used to pipe individual data items into the
# data pipeline. Arguments are positional per ellip_triggers.Trigger --
# presumably (username, pipeline, api_key, <unused>, process_id, mode,
# flag); TODO confirm against the ellip_triggers API.
trigger_pipe = ellip_triggers.Trigger(data_pipeline,
data_pipeline,
datapipeline_api_key,
'',
app_process_id,
'via',
False)
In [ ]:
# Look for data items still flagged as 'in' or 'queue' in the pipeline index.
search_params = {'cat': '{in,queue}',
                 'count': '100'}

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)
In [ ]:
end_point
- Loop as many times as required until the queue is empty
In [ ]:
exit = False
while not exit:
try:
queue_search = ciop.search(end_point=end_point,
params=search_params,
output_fields='self,title',
model='GeoTime',
timeout=50000)
for index, elem in enumerate(queue_search):
data_input_queue_ref = trigger_pipe.create_data_item_from_single_reference(elem['self'])
trigger_pipe.pipe(data_input_queue_ref)
print '------------- sleeping ------------------'
print '------------- sleeping ------------------'
print '------------- sleeping ------------------'
print '------------- sleeping ------------------'
print '------------- sleeping ------------------'
time.sleep(60)
except IndexError:
exit = True
In [ ]:
# Now look at completed ('out') and failed ('err') items.
search_params = {'cat': '{out,err}',
                 'count': '100'}

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)
In [ ]:
# Collect the piped (out/err) items into a GeoDataFrame.
piped_entries = ciop.search(end_point=end_point,
                            params=search_params,
                            output_fields='self,wkt,identifier',
                            model='GeoTime',
                            timeout=50000)
search_piped = GeoDataFrame(piped_entries)

# Parse the WKT strings into shapely geometries.
search_piped['wkt'] = search_piped['wkt'].apply(loads)
In [ ]:
# Preview the first rows of the piped results.
search_piped.head()