Test the trigger in pipe mode

This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product, creates a Web Processing Service (WPS) request that invokes the data transformation application deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results.

  • First, import the required Python libraries:
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
# restore the variables saved with %store in the previous steps
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

# read the shared configuration notebook and execute its parameters cell
# in the current namespace
nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
This notebook will process the queue of ec_better_ewf_wfp_01_01_01_ewf_wfp_01_01_01_1_18 with the trigger ec_better_tg_wfp_01_01_01_pipe_tg_wfp_01_01_01_pipe_0_11
  • Set the data pipeline catalogue series endpoints:
In [3]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
series_in = 'https://catalog.terradue.com/%s/series/source-in/description' % data_pipeline
series_out = 'https://catalog.terradue.com/%s/series/source-out/description' % data_pipeline

series_endpoint = 'https://catalog.terradue.com/%s/description' % data_pipeline
In [4]:
series_queue
Out[4]:
'https://catalog.terradue.com/better-wfp-00001/series/source-queue/description'
  • Set the update range of the queue entries (a rolling-window alternative is sketched after the cell):
In [4]:
update = '2019-02-19T00:00:00Z/2019-02-20T23:59:59Z'
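  • The update range above is hard-coded. A minimal sketch of an equivalent rolling window built with the datetime and timedelta imports at the top of the notebook (the 48-hour look-back is an arbitrary choice, not part of the original workflow):

# hypothetical alternative: derive the update range as a rolling 48-hour
# window ending at the current UTC time instead of hard-coding the dates
window_stop = datetime.utcnow()
window_start = window_stop - timedelta(hours=48)

update = '%s/%s' % (window_start.strftime('%Y-%m-%dT%H:%M:%SZ'),
                    window_stop.strftime('%Y-%m-%dT%H:%M:%SZ'))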

In [5]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_pipe_process_id)

print process.title
WFP-01-01-01 Trigger - Pipe
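  • The owslib Process object returned by describeprocess can be inspected further; a minimal sketch using standard owslib attributes (abstract and processOutputs):

# optional: inspect the process abstract and its declared outputs
print process.abstract

for process_output in process.processOutputs:
    print process_output.identifier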
  • List the WPS process inputs:
In [6]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
recovery
quotation
_T2Username
In [8]:
data_pipeline
Out[8]:
'better-wfp-00001'
  • Create the list of key/value pairs with the WPS inputs:
In [9]:
series = series_queue
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
recovery = 'No'
tg_quotation = 'No'
quotation = 'False'
In [10]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url),
          ('process_id', app_process_id),
          ('update', update),
          ('recovery', recovery),
          ('tg_quotation', tg_quotation),
          ('api_key', datapipeline_api_key),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]
  • Submit the WPS Execute request and monitor its execution:
In [11]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=680e93b2-34ec-11e9-b64c-0242ac110012&RawDataOutput=Result
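  • The Execute request is asynchronous and the cell above only prints the status location. The monitorExecution helper imported at the beginning of the notebook can be used to block until the execution reaches a final state; a minimal sketch (the polling interval is an arbitrary choice):

# poll the status location every 30 seconds until the execution
# reaches a final state, then print the final status
monitorExecution(execution, sleepSecs=30)

print execution.status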
  • Check the progress of the processing request by counting the catalogue entries in the queue and in series:
In [12]:
ciop = cioppy.Cioppy()

if execution.isNotComplete():

    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100)]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100)]),
                            output_fields='self',
                            model='GeoTime')

    print (str(len(search_in)) + ' ' + str(len(search_queue)))
1 4
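  • The same ciop.search call can be pointed at the out series to count the products already published by the data transformation. A minimal sketch, assuming the series_out endpoint defined above (an empty series may raise an exception, as handled in the map cell below):

# count the entries already published in the 'out' series
search_out = ciop.search(end_point=series_out,
                         params=dict([('count', 100)]),
                         output_fields='self',
                         model='GeoTime')

print len(search_out)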
In [16]:
import time
import folium

geom = 'MULTIPOLYGON (((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), ((32.0572 12.4549, 33.9087 12.4549, 33.9087 10.7344, 32.0572 10.7344, 32.0572 12.4549)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, 12.9415 12.0093, 12.9415 13.7579)))'

lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI
locations = []

for index, entry in enumerate(loads(geom)):

    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():

    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            search = ciop.search(end_point=series_endpoint,
                                 params=dict([('count', 100), ('cat', series_type)]),
                                 output_fields='identifier,wkt',
                                 model='GeoTime')

            for index, elem in enumerate(search):

                search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        except IndexError:
            continue

    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[16]:
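  • Once the execution has reached a final state (for instance after the monitorExecution sketch above), the result_osd output declared in the Execute request can be read back from the parsed status document. A minimal sketch using standard owslib attributes (whether the value appears in reference or data depends on how the output was requested):

# inspect the outputs parsed from the final status document
if execution.isSucceded():
    for wps_output in execution.processOutputs:
        print wps_output.identifier
        print wps_output.reference  # set when the output is returned by reference
        print wps_output.data       # set when the output is returned inline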