Test the trigger in pipe mode

This Jupyter Notebook queries the catalog for Landsat-8 data items in the related source queue series, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in Step 8, monitors the WPS request execution and finally retrieves the data transformation execution results.

  • First, import the required Python libraries:
In [13]:
# Standard library.
import sys
import os

# OGC Web Processing Service client, used to invoke and monitor the trigger.
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import pytz

# XML serialization of the hand-built WPS Execute request.
import lxml.etree as etree

# Terradue catalogue search client.
import cioppy
ciop = cioppy.Cioppy()

from shapely.wkt import loads
import getpass

# Interactive map rendering of the areas of interest and search results.
import folium

from datetime import datetime, timedelta
import dateutil.parser

# Used to read the shared configuration notebook and execute its cells.
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [14]:
# Restore variables previously persisted with %store (IPython storemagic).
%store -r

# Shared notebook that defines the data pipeline configuration variables
# (app_artifact_id, app_version, trigger_pipe_*, repository, community, ...).
nb_config = os.path.join('..', 'configuration.ipynb')

# Read the configuration notebook as nbformat version 4.
nb = nbf.read(nb_config, 4)


# Execute the first configuration cell to bring its variable definitions
# into this notebook's namespace.
# NOTE(review): Python 2 parses this as the statement form
# "exec code in globals(), locals()".
exec(nb['cells'][0]['source']) in globals(), locals()
#exec(nb['cells'][1]['source']) in globals(), locals()

print app_artifact_id

# Coordinates of the data transformation application deployed in Step 8.
app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

# WPS process identifier convention: community_artifact_artifact_version with
# '-' and '.' mapped to '_'.  The artifact id appears twice on purpose -- see
# the identifier printed in the cell output below.
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

# Coordinates of the pipe trigger that will process the source queue.
trigger_pipe = dict([('artifact_id', trigger_pipe_artifact_id),
                      ('version', trigger_pipe_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

# Same identifier convention as app_process_id, for the trigger process.
trigger_pipe_process_id = '%s_%s_%s_%s' % (trigger_pipe['community'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['artifact_id'].replace('-', '_'),
                                           trigger_pipe['version'].replace('.', '_'))

print 'This notebook will process the queue of %s with the trigger %s' % (app_process_id,
                                                                          trigger_pipe_process_id)
ewf-wfp-01-02-02
This notebook will process the queue of ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_11 with the trigger ec_better_tg_wfp_01_02_02_pipe_tg_wfp_01_02_02_pipe_0_7
  • Set the data pipeline catalogue series endpoints:
In [15]:
# OpenSearch description endpoints of the catalogue series owned by the data
# pipeline: source-queue, source-in and source-out.
catalog_base = 'https://catalog.terradue.com/{0}'.format(data_pipeline)

series_queue = catalog_base + '/series/source-queue/description'
series_in = catalog_base + '/series/source-in/description'
series_out = catalog_base + '/series/source-out/description'

# Top-level description endpoint of the pipeline catalogue itself.
series_endpoint = catalog_base + '/description'
  • Set the update range of the queue entries:
In [16]:
# Time-of-update window (ISO-8601 interval) selecting the queue entries
# touched by the trigger run under test.
update = '2019-02-03T11:48:44.945679Z/2019-02-03T11:54:12.623734Z'

# Lower bound of the window, reused later when polling the catalogue.
start = update.partition('/')[0]

In [17]:
# Zoo-kernel WPS endpoint exposed by the triggers deployer host.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

# skip_caps=True defers the capabilities request to the explicit
# getcapabilities() call below.
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

wps.getcapabilities()

# DescribeProcess for the pipe trigger identified in the configuration cell.
process = wps.describeprocess(trigger_pipe_process_id)

print process.title

print process.abstract
Trigger for the WFP-01-02-02 Pipe
Trigger for the WFP-01-02-02 Landsat-8 reflectances and vegetation indices data pipeline - Pipe
  • List the WPS process inputs:
In [18]:
# Show the identifiers of all inputs declared by the trigger process.
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
update
geom
api_key
recovery
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [19]:
# The trigger consumes the source queue series set up above.
series = series_queue

# Empty geometry: no spatial constraint on the queue entries.
geom = ''

# Behaviour flags passed to the trigger as string-valued inputs.
recovery = 'No'
tg_quotation = 'No'
quotation = "False"
In [20]:
# Ordered (identifier, value) pairs matching the WPS process inputs listed
# above; the _T2Username input reuses the data pipeline name.
inputs = [
    ('series', series),
    ('data_pipeline', data_pipeline),
    ('update', update),
    ('geom', geom),
    ('api_key', datapipeline_api_key),
    ('recovery', recovery),
    ('quotation', quotation),
    ('_T2Username', data_pipeline),
]
  • Submit the Execute WPS and monitor the request:
In [21]:
# Build the Execute request by hand so we keep an execution handle
# (statusLocation) for asynchronous monitoring.
execution = owslib.wps.WPSExecution(url=wps.url)

# NOTE(review): the second tuple element is owslib's 'as reference' flag;
# False presumably requests the 'result_osd' output inline -- confirm against
# the owslib WPS documentation.
execution_request = execution.buildRequest(trigger_pipe_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

# Wall-clock (UTC) start of the pipe run, used later to time the execution.
start_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)

# Serialize and submit the Execute request, then parse the WPS response.
execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

# URL to poll for the asynchronous execution status.
print(execution.statusLocation)
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=24719f6c-27b0-11e9-8ee1-0242ac110012&RawDataOutput=Result
In [22]:
# Report the current state of the asynchronous WPS execution.
print 'percentCompleted : %s '%execution.percentCompleted
#print 'status : %s' %execution.checkStatus()
print 'execution is completed: %s' %execution.isComplete()
print 'execution is not completed: %s' %execution.isNotComplete()
# NOTE(review): bare attribute access -- has no visible effect here since it
# is not the last expression of the cell.
execution.statusMessage
print 'execution is succeeded: %s' %execution.isSucceded()  # sic: owslib spells the method 'isSucceded'
percentCompleted : 0
execution is completed: False
execution is not completed: True
execution is succeeded: False
  • Check the outcome of the processing request:
In [11]:
# Areas of interest: four bounding boxes expressed as a WKT MULTIPOLYGON.
geom = 'MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.351800000000001 15.3471, -9.351800000000001 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'

# Upper bound of the catalogue update window: now, in UTC.
now = datetime.now(pytz.timezone('utc')).replace(microsecond=0)


# While the trigger run is still in progress, count the entries updated in
# the source queue series versus those already present in the 'in' series.
if execution.isNotComplete():

    search_queue = ciop.search(end_point=series_queue,
                               params=dict([('count', 100),('geom',geom),('update','%s/%s' %(start,now))]),
                               output_fields='self',
                               model='GeoTime')

    search_in = ciop.search(end_point=series_in,
                            params=dict([('count', 100),('geom',geom),('update','%s/%s' %(start,now))]),
                            output_fields='self',
                            model='GeoTime')

    #print search_queue
    #print search_in

    # Progress indicator: "<in-count> <queue-count>".
    print (str(len(search_in)) + ' ' + str(len(search_queue)))
else:
    # The trigger completed: record and report the end time of the pipe run.
    stop_pipe = datetime.now(pytz.timezone('utc')).replace(microsecond=0)
    print 'PIPE finished @ %s' %stop_pipe
1 24
In [12]:
import time
import folium


# Centre of the overall bounding box of all areas of interest
# (bounds = (minx, miny, maxx, maxy), i.e. lon/lat order).
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Red dot marking the map centre.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# add the AOI
# Collect the exterior rings as (lat, lon) sequences, one per polygon --
# shapely stores (lon, lat), folium wants latitude first, hence t[::-1].
locations = []

geoms = loads(geom)
print geoms.geometryType()

if geoms.geometryType() == 'Polygon':

    locations.append([t[::-1] for t in list(geoms.exterior.coords)])

elif geoms.geometryType() == 'MultiPolygon':

    for index, entry in enumerate(geoms):

        locations.append([t[::-1] for t in list(entry.exterior.coords)])

# Refresh the upper bound of the update window before searching.
now = datetime.now(pytz.timezone('utc')).replace(microsecond=0)

# Draw the areas of interest in red.
folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# One colour per pipeline stage, in the order queue, in, out, err.
colors = ['blue', 'orange', 'green', 'black']


if execution.isNotComplete():

    # Overlay the footprints found in each stage of the data pipeline.
    for series_index, series_type in enumerate(['queue', 'in', 'out', 'err']):

        search_locations = []

        try:
            # NOTE(review): the 'geom' parameter is given the shapely object
            # (geoms), not the WKT string 'geom' -- presumably its str() form
            # (WKT) is what the search receives; confirm against cioppy.
            search = ciop.search(end_point = series_endpoint,
                     params = dict([('count', 100), ('cat', series_type), ('geom',geoms),('update','%s/%s' %(start,now))]),
                     output_fields='identifier,wkt',
                     model='GeoTime')

            for index, elem in enumerate(search):

                 search_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

            folium.PolyLine(
                locations=search_locations,
                color=colors[series_index],
                weight=1,
                opacity=1,
                smooth_factor=0,
            ).add_to(m)

        # NOTE(review): IndexError presumably signals an empty result for the
        # stage -- skip it and move on to the next one; confirm.
        except IndexError:
            continue

    m.save(os.path.join('results', '%s_search.html' % 'discovery'))

# Last expression of the cell: render the map inline in the notebook.
m
MultiPolygon
Out[12]: