Test the trigger in pipe modeΒΆ

This Jupyter Notebook queries the catalog for a Sentinel-2 L2A product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results

  • First do the imports of the Python libraries required
In [ ]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import pytz

import lxml.etree as etree

import cioppy
ciop = cioppy.Cioppy()

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [ ]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

print app_artifact_id

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
  • Set the data pipeline catalogue series endpoints:
In [ ]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
series_in = 'https://catalog.terradue.com/%s/series/source-in/description' % data_pipeline
series_out = 'https://catalog.terradue.com/%s/series/source-out/description' % data_pipeline

series_endpoint = 'https://catalog.terradue.com/%s/description' % data_pipeline
In [ ]:
apps_deployer = 'https://%s-apps-deployer.terradue.com' % app['community']
trigger_deployer = 'https://%s-triggers-deployer.terradue.com' % trigger_pipe['community']
In [ ]:
apps_deployer
In [ ]:
trigger_deployer
  • set the update range of the queue entries:
In [7]:
update = '2019-01-02T10:47:18.028123Z/2019-01-02T10:52:26.754808Z'
start = '2019-01-02T10:47:18.028123Z'
In [8]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_pipe_process_id)

print process.title
Trigger for the WFP-01-02-01 Pipe
  • List the WPS process inputs:
In [9]:
print process.abstract
Trigger for the WFP-01-02-01 Sentinel-2 reflectances and vegetation indices data pipeline - Pipe
In [10]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
update
geom
api_key
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [11]:
series = series_queue
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
recovery = 'No'
tg_quotation = 'No'
quotation = "False"
In [12]:
series_queue
Out[12]:
'https://catalog.terradue.com/better-wfp-00003/series/source-queue/description'
In [19]:
series
Out[19]:
'https://catalog.terradue.com/better-wfp-00003/series/source-queue/description'
In [39]:
geom='MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.3518 15.3471, -9.3518 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'

wkt = loads(geom)[3].wkt
In [22]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', app_process_id),
          ('update', update),
          ('geom', wkt.replace(' ', '%20').replace(',', '%2C')),
          ('api_key', datapipeline_api_key),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]
  • Submit the Execute WPS and monitor the request:
In [23]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])
start_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=35c054e2-fdea-11e8-9ab2-0242ac110012&RawDataOutput=Result
  • Check the outcome of the processing request
In [24]:
print 'percentCompleted : %s '%execution.percentCompleted
print 'status : %s' %execution.checkStatus()
print execution.isComplete()
execution.isNotComplete()
execution.statusMessage
execution.isSucceded()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=35c054e2-fdea-11e8-9ab2-0242ac110012&RawDataOutput=Result
percentCompleted : 0
status : None
False
Out[24]:
False
In [42]:
now = datetime.now(pytz.timezone('utc')).replace(microsecond=0)


if execution.isNotComplete():

    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100),('geom',wkt),('update','%s/%s' %(start,now))]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100),('geom',wkt),('update','%s/%s' %(start,now))]),
                            output_fields='self',
                            model='GeoTime')

    #print search_queue
    #print search_in

    print ('source-in items: %s  - source-queue items: %s ' %(len(search_in),len(search_queue)))
else:
    stop_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)
    print 'PIPE finished @ %s' %stop_pipe
source-in items: 8  - source-queue items: 31
In [43]:
import time
import folium

lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI
locations = []

for index, entry in enumerate(loads(geom)):

    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():

    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            search = ciop.search(end_point = series_endpoint,
                     params = dict([('count', 100), ('cat', series_type)]),
                     output_fields='identifier,wkt',
                     model='GeoTime')

            for index, elem in enumerate(search):

                 search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        except IndexError:
            continue

    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[43]: