Test the trigger in pipe mode
This Jupyter Notebook queries the catalog for Landsat-8 data items in the related source-queue series, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the execution of the WPS request, and finally retrieves the results of the data transformation execution.
- First, import the required Python libraries:
In [13]:
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import pytz
import lxml.etree as etree
import cioppy
ciop = cioppy.Cioppy()
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [14]:
%store -r
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][0]['source']) in globals(), locals()
#exec(nb['cells'][1]['source']) in globals(), locals()
print app_artifact_id
app = dict([('artifact_id', app_artifact_id),
('version', app_version),
('repository', repository),
('community', community)])
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['artifact_id'].replace('-', '_'),
app['version'].replace('.', '_'))
trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
('version', trigger_pipe_version),
('repository', repository),
('folder', folder),
('community', community)])
trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
trigger_pipe['artifact_id'].replace('-', '_'),
trigger_pipe['artifact_id'].replace('-', '_'),
trigger_pipe['version'].replace('.', '_'))
print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
trigger_pipe_process_id)
ewf-wfp-01-02-02
This notebook will process the queue of ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_11 with the trigger ec_better_tg_wfp_01_02_02_pipe_tg_wfp_01_02_02_pipe_0_7
- Set the data pipeline catalogue series endpoints:
In [15]:
# All description URLs of this data pipeline share the same catalogue root.
catalogue_root = 'https://catalog.terradue.com/%s' % data_pipeline

# One description URL per series (queue, in, out) ...
series_queue = catalogue_root + '/series/source-queue/description'
series_in = catalogue_root + '/series/source-in/description'
series_out = catalogue_root + '/series/source-out/description'
# ... and one for the data pipeline index as a whole.
series_endpoint = catalogue_root + '/description'
- Set the update range of the queue entries:
In [16]:
# Update window of the queue entries to process, written as "<from>/<to>".
update = '2019-02-03T11:48:44.945679Z/2019-02-03T11:54:12.623734Z'
# Lower bound of the window: everything before the first '/'.
start = update.partition('/')[0]
In [17]:
# WPS endpoint served by the trigger deployer.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

# The client is created with skip_caps=True and the capabilities document is
# then requested explicitly.
wps = WebProcessingService(wps_url, skip_caps=True, verbose=False)
wps.getcapabilities()

# Describe the pipe trigger process and show its title and abstract.
process = wps.describeprocess(trigger_pipe_process_id)
print(process.title)
print(process.abstract)
Trigger for the WFP-01-02-02 Pipe
Trigger for the WFP-01-02-02 Landsat-8 reflectances and vegetation indices data pipeline - Pipe
- List the WPS process inputs:
In [18]:
# Print the identifier of every input declared by the WPS process.
for data_input in process.dataInputs:
    print(data_input.identifier)
series
data_pipeline
update
geom
api_key
recovery
quotation
_T2Username
- Create a Python dictionary with the inputs:
In [19]:
# Values that will be sent as WPS inputs.
series = series_queue    # the trigger works on the source-queue series
geom = ''                # an empty string is sent as the 'geom' input
recovery = 'No'
quotation = 'False'
# NOTE(review): tg_quotation is not referenced by any cell visible here.
tg_quotation = 'No'
In [20]:
# (identifier, value) pairs handed to the WPS Execute request. The
# identifiers are the ones reported by process.dataInputs.
inputs = [
    ('series', series),
    ('data_pipeline', data_pipeline),
    ('update', update),
    ('geom', geom),
    ('api_key', datapipeline_api_key),
    ('recovery', recovery),
    ('quotation', quotation),
    # The data pipeline name is also sent as the _T2Username input.
    ('_T2Username', data_pipeline),
]
- Submit the Execute WPS and monitor the request:
In [21]:
# Execution handle bound to the WPS endpoint.
execution = owslib.wps.WPSExecution(url=wps.url)

# Build the Execute request for the pipe trigger, asking for the
# 'result_osd' output.
execution_request = execution.buildRequest(
    trigger_pipe_process_id,
    inputs,
    output=[('result_osd', False)])

# UTC timestamp (whole seconds) taken right before the request is submitted.
start_pipe = datetime.now(pytz.utc).replace(microsecond=0)

# Submit the serialised request and parse the server's answer.
execution_response = execution.submitRequest(
    etree.tostring(execution_request))
execution.parseResponse(execution_response)

# URL under which the status of the request can be followed.
print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=24719f6c-27b0-11e9-8ee1-0242ac110012&RawDataOutput=Result
In [22]:
# Report the state of the WPS execution.
# NOTE(review): nothing in this cell polls the server (the checkStatus call
# was disabled), so these values presumably reflect the last response that
# was parsed - confirm before relying on them.
print('percentCompleted : %s ' % execution.percentCompleted)
print('execution is completed: %s' % execution.isComplete())
print('execution is not completed: %s' % execution.isNotComplete())
# The status message used to be a bare expression in the middle of the cell,
# which is evaluated and thrown away; print it so it is actually shown.
print('status message: %s' % execution.statusMessage)
# 'isSucceded' (sic) is the method name as called in the original cell.
print('execution is succeeded: %s' % execution.isSucceded())
percentCompleted : 0
execution is completed: False
execution is not completed: True
execution is succeeded: False
- Check the outcome of the processing request
In [11]:
# Area of interest as WKT: a multipolygon made of four rectangles.
geom = (
    'MULTIPOLYGON ('
    '((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), '
    '((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), '
    '((-10.3668 15.3471, -9.351800000000001 15.3471, -9.351800000000001 14.3406, -10.3668 14.3406, -10.3668 15.3471)), '
    '((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228))'
    ')'
)

# Current UTC time (whole seconds): upper bound of the update window.
now = datetime.now(pytz.utc).replace(microsecond=0)

if execution.isNotComplete():
    # Still running: count the entries updated since 'start' inside the
    # area of interest, in the queue series and in the in series.
    search_queue = ciop.search(
        end_point=series_queue,
        params={'count': 100,
                'geom': geom,
                'update': '%s/%s' % (start, now)},
        output_fields='self',
        model='GeoTime')
    search_in = ciop.search(
        end_point=series_in,
        params={'count': 100,
                'geom': geom,
                'update': '%s/%s' % (start, now)},
        output_fields='self',
        model='GeoTime')
    # Printed as "<number of in entries> <number of queue entries>".
    print('%d %d' % (len(search_in), len(search_queue)))
else:
    # Record when the pipe was first seen as no longer running.
    stop_pipe = datetime.now(pytz.utc).replace(microsecond=0)
    print('PIPE finished @ %s' % stop_pipe)
1 24
In [12]:
import time
import folium

# Parse the area of interest once and reuse the geometry everywhere below.
geoms = loads(geom)

# Centre the map on the middle of the AOI bounding box.
min_lon, min_lat, max_lon, max_lat = geoms.bounds
lat = (max_lat + min_lat) / 2
lon = (max_lon + min_lon) / 2

zoom_start = 4
m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Mark the centre of the bounding box with a small red dot.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# Add the AOI: one ring per polygon. WKT coordinates are (x, y) pairs and
# each pair is reversed before being handed to folium.
locations = []
print(geoms.geom_type)
if geoms.geom_type == 'Polygon':
    locations.append([t[::-1] for t in geoms.exterior.coords])
elif geoms.geom_type == 'MultiPolygon':
    for entry in geoms.geoms:
        locations.append([t[::-1] for t in entry.exterior.coords])

# Current UTC time (whole seconds): upper bound of the update window.
now = datetime.now(pytz.timezone('utc')).replace(microsecond=0)

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# One colour per catalogue category, in the order of the loop below.
colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():
    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):
        search_locations = []
        try:
            # The AOI is passed as the WKT string, as in the other
            # catalogue searches of this notebook (not as a shapely object).
            search = ciop.search(
                end_point=series_endpoint,
                params={'count': 100,
                        'cat': series_type,
                        'geom': geom,
                        'update': '%s/%s' % (start, now)},
                output_fields='identifier,wkt',
                model='GeoTime')

            # Footprint outline of every entry found in this category.
            for elem in search:
                footprint = loads(elem['wkt'])
                search_locations.append(
                    [t[::-1] for t in footprint.exterior.coords])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)
        except IndexError as error:
            # Best effort: a category that cannot be drawn is skipped, but
            # the skip is reported instead of being silently ignored.
            # NOTE(review): presumably raised when a category has no
            # entries - confirm.
            print('skipping %s: %s' % (series_type, error))
            continue

# Make sure the output folder exists before writing the HTML map.
results_dir = 'results'
if not os.path.isdir(results_dir):
    os.makedirs(results_dir)
m.save(os.path.join(results_dir, 'discovery_search.html'))

# Last expression of the cell: displays the map in the notebook.
m
MultiPolygon
Out[12]: