Test the trigger in pipe mode

This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results.

  • First, import the required Python libraries
In [107]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser
  • Set the information about the data pipeline and its associated Ellip API key:
In [108]:
# Name of the Ellip data pipeline; also used below as the catalog username
data_pipeline = 'better-ethz-00003'
In [124]:
# Prompt interactively for the Ellip API key so it is never stored in the notebook
api_key = getpass.getpass('What is the Ellip platform API key for user "%s"?' % data_pipeline)
  • Set the data pipeline catalogue series endpoints:
In [111]:
# OpenSearch description documents of the pipeline's catalogue series:
# 'source-queue' (to be processed), 'source-in' (being processed),
# 'source-out' (processed), plus the pipeline's top-level endpoint.
catalog_base = 'https://catalog.terradue.com/{0}'.format(data_pipeline)

series_queue = '{0}/series/source-queue/description'.format(catalog_base)
series_in = '{0}/series/source-in/description'.format(catalog_base)
series_out = '{0}/series/source-out/description'.format(catalog_base)

series_endpoint = '{0}/description'.format(catalog_base)
  • Set the data transformation application that will be triggered:
In [120]:
# Coordinates of the data transformation application that will be triggered.
app = {'artifact_id': 'ewf-ethz-01-03-01',
       'version': '1.0',
       'repository': 'Gitlab Groups',
       'community': 'ec-better'}

# The deployed WPS process identifier is derived from the application
# coordinates; dashes and dots are not valid there, so map them to underscores.
data_transformation_id = ('%s_%s_%s_%s' % (app['community'],
                                           app['artifact_id'],
                                           app['artifact_id'],
                                           app['version'])).replace('-', '_').replace('.', '_')

# print() as a function works under both Python 2 and Python 3
print('This notebook will process the %s queue' % data_transformation_id)
This notebook will process the ec_better_wfp_01_01_01_wfp_01_01_01_1_0 queue
  • Set the trigger information and mode:
In [112]:
# Coordinates of the trigger application ("pipe" mode) that drives the
# data transformation.
trigger = {'artifact_id': 'tg-ethz-01-03-01-pipe',
           'version': '0.4',
           'community': 'ec-better'}

# WPS process identifier of the trigger (same mangling rules as for the
# data transformation id: dashes and dots become underscores).
trigger_id = ('%s_%s_%s_%s' % (trigger['community'],
                               trigger['artifact_id'],
                               trigger['artifact_id'],
                               trigger['version'])).replace('-', '_').replace('.', '_')

# print() as a function works under both Python 2 and Python 3
print('This notebook will process the queue of %s with the trigger %s' % (data_transformation_id,
                                                                          trigger_id))

This notebook will process the queue of ec_better_wfp_01_01_01_wfp_01_01_01_1_0 with the trigger ec_better_tg_better_wfp_00001_pipe_tg_better_wfp_00001_pipe_0_4
  • Set the deployers to use for the data transformation application and for the trigger:
In [121]:
# Deployer endpoints: applications and triggers live on separate servers,
# both named after the community.
apps_deployer = 'https://{0}-apps-deployer.terradue.com'.format(app['community'])
trigger_deployer = 'https://{0}-triggers-deployer.terradue.com'.format(trigger['community'])
In [115]:
# Time of interest passed to the trigger, as an ISO 8601 start/stop interval
update = "2017-09-01T00:00:00/2017-09-10T23:59:59"
In [116]:
# Connect to the triggers deployer WPS endpoint and fetch its capabilities.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

# List every process the server offers, to verify the trigger is deployed.
for index, elem in enumerate(wps.processes):
    print(index, elem.identifier)

# Describe the trigger process to discover its inputs and outputs.
process = wps.describeprocess(trigger_id)

# print() as a function works under both Python 2 and Python 3
print(process.title)

print(process.abstract)
(0, 'ec_better_tg_better_wfp_00001_via_tg_better_wfp_00001_via_0_1')
(1, 'ec_better_tg_better_wfp_00001_pipe_tg_better_wfp_00001_pipe_0_4')
(2, 'ec_better_tg_ethz_01_03_01_queue_tg_ethz_01_03_01_queue_1_1')
(3, 'TerradueUnDeployProcess')
(4, 'ec_better_tg_better_wfp_00001_queue_tg_better_wfp_00001_queue_0_1')
(5, 'ec_better_tg_ethz_01_03_01_via_tg_tg_ethz_01_03_01_via_1_1')
(6, 'GetStatus')
(7, 'ec_better_tg_better_wfp_00001_queue_tg_better_wfp_00001_queue_0_3')
(8, 'TerradueDeployProcess')
(9, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_0')
WFP-01-01-01 Trigger - Pipe
Trigger for the WFP-01-01-01 Sentinel-1 backscatter timeseries data pipeline - Pipe
  • List the WPS process inputs:
In [117]:
# Enumerate the trigger's WPS input identifiers (from the DescribeProcess reply);
# print() as a function works under both Python 2 and Python 3.
for data_input in process.dataInputs:
    print(data_input.identifier)
series
data_pipeline
wps_url
process_id
update
api_key
recovery
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [125]:
# Values for the trigger inputs: poll the 'source-queue' series, run the
# transformation on the applications deployer, no recovery, no quotation.
# NOTE(review): tg_quotation uses 'No' while quotation uses "False" — the two
# inputs apparently follow different value conventions; confirm both are expected.
series = series_queue
wps_url = apps_deployer + '/zoo-bin/zoo_loader.cgi'
recovery = 'No'
tg_quotation = 'No'
quotation = "False"
In [126]:
# Pair each WPS input identifier with its value; the Execute request below
# consumes this list of (identifier, value) tuples.
_input_names = ('series', 'data_pipeline', 'wps_url', 'process_id', 'update',
                'recovery', 'tg_quotation', 'api_key', 'quotation', '_T2Username')
_input_values = (series, data_pipeline, wps_url, data_transformation_id, update,
                 recovery, tg_quotation, api_key, quotation, data_pipeline)

inputs = list(zip(_input_names, _input_values))
  • Submit the Execute WPS and monitor the request:
In [127]:
# Build and submit the Execute request against the triggers deployer WPS.
execution = owslib.wps.WPSExecution(url=wps.url)

# NOTE(review): the original passed `process_id`, a name never defined in the
# visible notebook (only the input *key* 'process_id' exists). The process
# executed on this triggers server must be the trigger's own identifier —
# confirm against the kernel state that produced the sample output.
execution_request = execution.buildRequest(trigger_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

# GetStatus URL to poll while the asynchronous execution runs
print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=7a3cb4f8-8c1b-11e8-a5ef-0242ac11000f&RawDataOutput=Result
  • Check the outcome of the processing request
In [128]:
# While the execution has not completed, peek at the pipeline catalogue to see
# how many products sit in the 'source-queue' and 'source-in' series.
# NOTE(review): `ciop` was never defined in this notebook (only `import cioppy`);
# presumably a Cioppy instance from earlier kernel state — instantiated here so
# the cell is self-sufficient. Confirm against the original environment.
ciop = cioppy.Cioppy()

if execution.isNotComplete():
    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100)]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100)]),
                            output_fields='self',
                            model='GeoTime')
    print(str(len(search_in)) + ' ' + str(len(search_queue)))
1 1
In [129]:
import time
import folium

# Area of interest: a multipolygon of four rectangular boxes (WKT, lon/lat order)
geom = 'MULTIPOLYGON (((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), ((32.0572 12.4549, 33.9087 12.4549, 33.9087 10.7344, 32.0572 10.7344, 32.0572 12.4549)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, 12.9415 12.0093, 12.9415 13.7579)))'

# Parse the WKT once and reuse it (the original re-parsed it four times)
aoi = loads(geom)
min_lon, min_lat, max_lon, max_lat = aoi.bounds

# Centre the map on the AOI bounding box
lat = (max_lat + min_lat) / 2
lon = (max_lon + min_lon) / 2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI outlines; folium wants (lat, lon) while WKT stores (lon, lat),
# hence the t[::-1] reversal of each coordinate pair
locations = []

# .geoms works on shapely 1.x and is mandatory on shapely 2.x (direct
# iteration over multi-part geometries was removed there)
for entry in aoi.geoms:
    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# One colour per catalogue series state (queue/in/out/err)
colors = ['blue', 'orange', 'green', 'black']

if execution.isNotComplete():

    # NOTE(review): `ciop` is not defined anywhere in the visible notebook;
    # presumably a cioppy.Cioppy() instance from earlier kernel state — confirm.
    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            search = ciop.search(end_point=series_endpoint,
                                 params=dict([('count', 100), ('cat', series_type)]),
                                 output_fields='identifier,wkt',
                                 model='GeoTime')

            # each result footprint is assumed to be a simple Polygon — TODO confirm
            for elem in search:
                search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        except IndexError:
            # an empty series yields no results; skip its overlay
            continue

    # make sure the output folder exists before saving (os.makedirs has no
    # exist_ok flag under Python 2, so test first)
    if not os.path.isdir('results'):
        os.makedirs('results')
    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[129]: