Test the trigger in pipe mode

This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution, and finally retrieves the data transformation execution results.

  • First, import the required Python libraries:
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
This notebook will process the queue of ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 with the trigger ec_better_tg_satcen_01_01_01_pipe_tg_satcen_01_01_01_pipe_0_4
  • Set the data pipeline catalogue series endpoints:
In [3]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
series_in = 'https://catalog.terradue.com/%s/series/source-in/description' % data_pipeline
series_out = 'https://catalog.terradue.com/%s/series/source-out/description' % data_pipeline

series_endpoint = 'https://catalog.terradue.com/%s/description' % data_pipeline
  • Set the update time range of the queue entries:
In [4]:
update = '2019-02-21T12:56:04.067077Z/2019-02-21T13:00:16.212351Z'

In [5]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_pipe_process_id)

print process.title
Trigger for the SATCEN-01-01-01 Pipe
  • List the WPS process inputs:
In [6]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
update
api_key
geom
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [7]:
series = series_queue
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
recovery = 'No'
tg_quotation = 'No'
quotation = "False"
In [8]:
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', wps_url),
          ('process_id', app_process_id),
          ('update', update),
          ('geom',''),
          ('recovery', recovery),
          ('tg_quotation', tg_quotation),
          ('api_key', datapipeline_api_key),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]
  • Submit the Execute WPS and monitor the request:
In [9]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=23d1c1b0-3145-11e9-9b9b-0242ac110012&RawDataOutput=Result
  • Check the outcome of the processing request
In [10]:
ciop = cioppy.Cioppy()

if execution.isNotComplete():

    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100)]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100)]),
                            output_fields='self',
                            model='GeoTime')

    print (str(len(search_in)) + ' ' + str(len(search_queue)))
1 4
In [11]:
import time
import folium

geom = 'MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)), ((19.6875 44.82172222222222, 19.6875 45.97733333333333, 18.70788888888889 45.97733333333333, 18.70788888888889 44.82172222222222, 19.6875 44.82172222222222)))'

lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI
locations = []

for index, entry in enumerate(loads(geom)):

    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():

    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            search = ciop.search(end_point = series_endpoint,
                     params = dict([('count', 100), ('cat', series_type)]),
                     output_fields='identifier,wkt',
                     model='GeoTime')

            for index, elem in enumerate(search):

                 search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        except IndexError:
            continue

    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[11]: