Test the trigger in pipe modeΒΆ

This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results

  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
This notebook will process the queue of ec_better_ewf_wfp_01_02_03_ewf_wfp_01_02_03_0_5 with the trigger ec_better_tg_wfp_01_02_03_pipe_tg_wfp_01_02_03_pipe_0_9
  • Set the data pipeline catalogue series endpoints:
In [3]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
series_in = 'https://catalog.terradue.com/%s/series/source-in/description' % data_pipeline
series_out = 'https://catalog.terradue.com/%s/series/source-out/description' % data_pipeline

series_endpoint = 'https://catalog.terradue.com/%s/description' % data_pipeline
  • set the update range of the queue entries:
In [4]:
update = '2019-01-16T18:32:00.135195Z/2019-01-16T18:42:46.207528Z'

In [5]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_pipe_process_id)

print process.title
Trigger for the WFP-01-02-03 Pipe
  • List the WPS process inputs:
In [6]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
tg_quotation
update
geom
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [10]:
series = series_queue
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
recovery = 'No'
tg_quotation = 'No'
quotation = "False"
In [11]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('update', update),
          ('geom',''),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]

  • Submit the Execute WPS and monitor the request:
In [12]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=d1ee0cf2-19be-11e9-a3f0-0242ac110012&RawDataOutput=Result
  • Check the outcome of the processing request
In [13]:
ciop = cioppy.Cioppy()

if execution.isNotComplete():

    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100)]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100)]),
                            output_fields='self',
                            model='GeoTime')

    print (str(len(search_in)) + ' ' + str(len(search_queue)))
1 39
In [14]:
import time
import folium

geom = 'MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.351800000000001 15.3471, -9.351800000000001 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI
locations = []

for index, entry in enumerate(loads(geom)):

    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():

    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            search = ciop.search(end_point = series_endpoint,
                     params = dict([('count', 100), ('cat', series_type)]),
                     output_fields='identifier,wkt',
                     model='GeoTime')

            for index, elem in enumerate(search):

                 search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        except IndexError:
            continue

    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[14]: