Test the trigger in pipe mode
This Jupyter Notebook queries the catalog for a Sentinel-2 L2A product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results
- First, import the required Python libraries:
In [ ]:
# Imports for the pipe-trigger notebook: WPS client (owslib), catalog
# client (cioppy), geometry handling (shapely), mapping (folium) and
# notebook utilities (nbformat/nbconvert).
import sys
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import pytz
import lxml.etree as etree
import cioppy
# Client for the Terradue catalog search API, used further down to query
# the pipeline's source-queue / source-in series.
ciop = cioppy.Cioppy()
from shapely.wkt import loads
import getpass
import folium
from datetime import datetime, timedelta
import dateutil.parser
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [ ]:
# Restore variables previously saved with %store in earlier steps
# (e.g. app_artifact_id, trigger_pipe_artifact_id, data_pipeline,
# datapipeline_api_key).
%store -r
# Load the shared configuration notebook and execute its second cell so
# its variable definitions land in this notebook's namespace.
nb_config = os.path.join('..', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
# NOTE(review): exec of notebook source — only acceptable because the
# configuration notebook is a local, trusted file.
exec(nb['cells'][1]['source']) in globals(), locals()
print app_artifact_id
# Coordinates of the deployed data-transformation application.
app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])
# WPS process identifier: community_artifact_artifact_version, with '-'
# and '.' mapped to '_'.
# NOTE(review): artifact_id fills two of the four slots — presumably the
# deployer's naming scheme; confirm it is not a copy-paste slip.
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))
# Coordinates of the pipe trigger deployed in Step 8.
trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                     ('version', trigger_pipe_version),
                     ('repository', repository),
                     ('folder', folder),
                     ('community', community)])
# Same identifier scheme as app_process_id, for the trigger process.
trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))
print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
- Set the data pipeline catalogue series endpoints:
In [ ]:
# OpenSearch description endpoints of the data-pipeline catalog series:
# the processing queue, the in-progress set, the completed set, and the
# pipeline catalog itself.
series_queue = 'https://catalog.terradue.com/{0}/series/source-queue/description'.format(data_pipeline)
series_in = 'https://catalog.terradue.com/{0}/series/source-in/description'.format(data_pipeline)
series_out = 'https://catalog.terradue.com/{0}/series/source-out/description'.format(data_pipeline)
series_endpoint = 'https://catalog.terradue.com/{0}/description'.format(data_pipeline)
In [ ]:
# Base URLs of the two deployer services for the community: one hosting
# the data-transformation applications, one hosting the triggers.
apps_deployer = 'https://{0}-apps-deployer.terradue.com'.format(app['community'])
trigger_deployer = 'https://{0}-triggers-deployer.terradue.com'.format(trigger_pipe['community'])
In [ ]:
apps_deployer  # display the applications deployer base URL
In [ ]:
trigger_deployer  # display the triggers deployer base URL
- Set the update range of the queue entries:
In [7]:
# Update-time window (ISO-8601 interval) used to select queue entries.
update = '2019-01-02T10:47:18.028123Z/2019-01-02T10:52:26.754808Z'
# Lower bound of the window, reused later when polling the series.
start = '2019-01-02T10:47:18.028123Z'
In [8]:
# Connect to the triggers deployer WPS endpoint and fetch the process
# description of the pipe trigger (network I/O).
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)
wps.getcapabilities()
process = wps.describeprocess(trigger_pipe_process_id)
print process.title
Trigger for the WFP-01-02-01 Pipe
- List the WPS process inputs:
In [9]:
print process.abstract  # human-readable description of the trigger process
Trigger for the WFP-01-02-01 Sentinel-2 reflectances and vegetation indices data pipeline - Pipe
In [10]:
# List the identifiers of the inputs expected by the WPS process.
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
update
geom
api_key
quotation
_T2Username
- Create a Python dictionary with the inputs:
In [11]:
# Variables feeding the Execute request: process the source-queue series
# through the apps deployer WPS endpoint.
series = series_queue
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
# NOTE(review): 'recovery' and 'tg_quotation' are assigned here but never
# referenced in the inputs list below — confirm whether still needed.
recovery = 'No'
tg_quotation = 'No'
# WPS inputs are strings, hence "False" rather than the boolean False.
quotation = "False"
In [12]:
series_queue  # display the source-queue series endpoint
Out[12]:
'https://catalog.terradue.com/better-wfp-00003/series/source-queue/description'
In [19]:
series  # confirm it points at the source-queue series
Out[19]:
'https://catalog.terradue.com/better-wfp-00003/series/source-queue/description'
In [39]:
# Area of interest: a WKT multipolygon made of four rectangles.
geom = 'MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.3518 15.3471, -9.3518 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'
# Keep only the fourth rectangle as the query geometry for this run.
aoi_polygons = loads(geom)
wkt = aoi_polygons[3].wkt
In [22]:
# Ordered input list for the WPS Execute request. The geometry is
# minimally URL-encoded (spaces and commas only) so it survives the
# DataInputs key-value encoding.
encoded_geom = wkt.replace(' ', '%20').replace(',', '%2C')
inputs = [
    ('series', series),
    ('data_pipeline', data_pipeline),
    ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
    ('process_id', app_process_id),
    ('update', update),
    ('geom', encoded_geom),
    ('api_key', datapipeline_api_key),
    ('quotation', quotation),
    ('_T2Username', data_pipeline),
]
- Submit the Execute WPS and monitor the request:
In [23]:
# Build and submit the Execute request against the triggers deployer,
# asking for the 'result_osd' output by value (network I/O).
execution = owslib.wps.WPSExecution(url=wps.url)
execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])
# Timestamp taken just before submission; used later to time the run.
start_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)
execution_response = execution.submitRequest(etree.tostring(execution_request))
execution.parseResponse(execution_response)
print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=35c054e2-fdea-11e8-9ab2-0242ac110012&RawDataOutput=Result
- Check the outcome of the processing request:
In [24]:
# Poll the execution status document (network I/O against statusLocation).
print 'percentCompleted : %s '%execution.percentCompleted
print 'status : %s' %execution.checkStatus()
print execution.isComplete()
execution.isNotComplete()
execution.statusMessage
# NOTE: 'isSucceded' is owslib's own (misspelled) method name.
execution.isSucceded()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=35c054e2-fdea-11e8-9ab2-0242ac110012&RawDataOutput=Result
percentCompleted : 0
status : None
False
Out[24]:
False
In [42]:
# While the trigger is still running, count how many catalog entries sit
# in the source-in vs source-queue series for the AOI within the elapsed
# window; once the run completes, record the end time instead.
now = datetime.now(pytz.timezone('utc')).replace(microsecond=0)
if execution.isNotComplete():
    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100),('geom',wkt),('update','%s/%s' %(start,now))]),
                               output_fields='self',
                               model='GeoTime')
    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100),('geom',wkt),('update','%s/%s' %(start,now))]),
                            output_fields='self',
                            model='GeoTime')
    #print search_queue
    #print search_in
    print ('source-in items: %s - source-queue items: %s ' %(len(search_in),len(search_queue)))
else:
    stop_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)
    print 'PIPE finished @ %s' %stop_pipe
source-in items: 8 - source-queue items: 31
In [43]:
# Visualise the pipeline state on a folium map: the AOI rectangles in
# red, plus the footprints of catalog entries in each series
# (queue/in/out/err) in a distinct colour, then save the map to HTML.
# NOTE(review): 'time' is imported but unused in this cell; 'folium' was
# already imported at the top of the notebook.
import time
import folium
# Centre the map on the AOI bounding box midpoint.
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2
zoom_start = 4
m = folium.Map(location=[lat, lon], zoom_start=zoom_start)
radius = 4
# Marker at the map centre.
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)
# add the AOI
# Shapely coordinates are (lon, lat); folium wants (lat, lon), hence the
# t[::-1] reversal of each coordinate pair.
locations = []
for index, entry in enumerate(loads(geom)):
    locations.append([t[::-1] for t in list(entry.exterior.coords)])
folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)
# One colour per series type, in the same order as the loop below.
colors = ['blue', 'orange', 'green', 'black']
if execution.isNotComplete():
    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):
        search_locations = []
        try:
            search = ciop.search(end_point = series_endpoint,
                                 params = dict([('count', 100), ('cat', series_type)]),
                                 output_fields='identifier,wkt',
                                 model='GeoTime')
            for index, elem in enumerate(search):
                search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])
            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)
        # NOTE(review): IndexError presumably signals an empty search
        # result from ciop.search — confirm; other errors still propagate.
        except IndexError:
            continue
m.save(os.path.join('results', '%s_search.html' % 'discovery'))
m
Out[43]: