Test the trigger WFP-01-01-02

This Notebook tests:

  • the process for feeding the data pipeline queue with data items
  • the execution of a single data item
  • the piping of all the queued data items
  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService

import ellip_triggers
import pandas as pd

import lxml.etree as etree
import requests

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

import pandas as pd
from geopandas import GeoDataFrame

import time
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))




print 'This notebook will create data items and insert them into the queue with the trigger %s to invoke the application %s with the local trigger pipe' % (trigger_queue_process_id,
                                                                                                                          app_process_id)
This notebook will create data items and insert them into the queue with the trigger ec_better_tg_wfp_01_01_02_queue_tg_wfp_01_01_02_queue_0_8 to invoke the application ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_20 with the local trigger pipe
  • Establish the connection with the WPS server hosting the triggers:
In [3]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_queue_process_id)

print process.title
print process.abstract
Trigger for the WFP-01-01-02 Queue
Trigger for the WFP-01-01-02 Sentinel-1 coherence timeseries - Queue
In [4]:
trigger_queue_process_id
Out[4]:
'ec_better_tg_wfp_01_01_02_queue_tg_wfp_01_01_02_queue_0_8'
  • List the WPS trigger queue process data inputs:
In [5]:
for data_input in process.dataInputs:

    print data_input.identifier
Sources
data_pipeline
wps_url
process_id
api_key
tg_quotation
polarisation
swaths
cohWinRg
cohWinAz
wkt
update
start
stop
product_type
swath
geom
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [6]:
geom = 'MULTIPOLYGON (((30.360611 8.155859,28.120043 8.608053,28.444235 10.23355,30.695391 9.785728,30.360611 8.155859)), ((30.24551 7.595311,28.008101 8.04909,28.367687 9.847935,30.616158 9.399037,30.24551 7.595311)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)))'

In [7]:
wkt = loads(geom)[1]
AoI=wkt.wkt
In [8]:
AoI
Out[8]:
'POLYGON ((30.24551 7.595311, 28.008101 8.04909, 28.367687 9.847935, 30.616158 9.399037, 30.24551 7.595311))'
In [9]:
tg_quotation = 'No'
quotation = 'False'
series = 'https://catalog.terradue.com/sentinel1/description'
update = ''
start = '2018-08-28T03:50:50Z'
stop = '2018-08-28T03:51:18Z'

inputs = [('sources', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', app_process_id),
          ('api_key', 'AKCp5bBXii3cnz78Xu1X5CvoTVRNS33ug7PcNng7bKdWpBwgDmDUmkcn5zpugg2hCsSRFPrhz'),
          ('tg_quotation', tg_quotation),
          ('polarisation','VV'),
          ('swaths','IW3'),
          ('cohWinRg','20'),
          ('cohWinAz','5'),
          ('wkt', AoI),
          ('update', update),
          ('start', start),
          ('stop', stop),
          ('product_type','SLC'),
          ('swath','IW1 IW2 IW3'),
          ('geom',AoI),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]

'2017-01-11T11:34:09.376962Z/2017-01-11T11:34:09.376965Z'
'2017-01-17T08:35:40.265835Z/2017-01-17T08:35:40.265838Z'
  • Submit the Execute WPS and monitor the request:
In [10]:
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print('Queue process status location:')
print(execution.statusLocation)
Queue process status location:
http://ec-better-triggers-deployer-creodias-c1.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=9f45db12-4a52-11e9-881a-0242ac110005&RawDataOutput=Result
In [11]:
monitorExecution(execution)
  • Check the outcome of the processing request
In [12]:
if not execution.isSucceded():

    raise Exception('Processing failed')
In [13]:
stop_date_queue = datetime.utcnow().isoformat()
print 'queue update range: %s/%s' %(start_date_queue,stop_date_queue)
queue update range: 2019-03-19T14:23:52.950231/2019-03-19T14:41:01.100788
  • Search the queue content
In [ ]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline

print series_queue
In [ ]:
print start_date_queue
print stop_date_queue

In [4]:
ciop = cioppy.Cioppy()

search_params_queue = dict()

search_params_queue['count'] = '200'
start_date_queue='2019-03-18T09:49:00'
stop_date_queue = '2019-03-18T09:50:00'
search_params_queue['update'] = '%sZ/%sZ' %(start_date_queue,stop_date_queue)

print search_params_queue

search_queue = ciop.search(end_point=series_queue,
                            params=search_params_queue,
                            output_fields='self,identifier,wkt,startdate',
                            model='GeoTime',
                            timeout=50000)


{'count': '200', 'update': '2019-03-18T09:49:00Z/2019-03-18T09:50:00Z'}
In [8]:
for i,item in enumerate(search_queue):
    print i,item['identifier'],item['startdate'],item['wkt']
0 6f5f525bd0123e9dae16ae775c8df7de 2018-08-22T03:49:59.0310000Z POLYGON((30.051582 6.653241,27.819071 7.109609,28.145733 8.735811,30.386953 8.283947,30.051582 6.653241))
  • plot the queued items
In [9]:
m = folium.Map(
    location=[45, 90],
    zoom_start=2,
    tiles='Stamen Terrain'
)

for row in search_queue:
    row['geometry'] = loads(row.pop('wkt'))
    gdf = GeoDataFrame([row])
    gdf.crs = {'init': 'epsg:4326', 'no_defs': True}
    geojson = folium.GeoJson(gdf)
    popup = folium.Popup('<i>%s</i>' % row['identifier'])
    popup.add_to(geojson)
    geojson.add_to(m)

m
Out[9]:

Test single queued date item submission

Get the first queue data item and submit the WPS Execution

In [11]:
search_queue = GeoDataFrame(search_queue)
data_item = search_queue.iloc[0]['self']
print data_item
root = etree.fromstring(requests.get(data_item).content)
print root
https://catalog.terradue.com//better-wfp-00002/series/source-queue/search?format=atom&uid=6f5f525bd0123e9dae16ae775c8df7de
<Element {http://www.w3.org/2005/Atom}feed at 0x7f5d7356ed88>
In [12]:
ns = {'a':'http://www.w3.org/2005/Atom',
      'b':'http://www.opengis.net/owc/1.0',
      'c':'http://www.opengis.net/wps/1.0.0',
      'd':'http://www.opengis.net/ows/1.1'}
In [13]:
wps_url = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]',
                                 namespaces=ns)[0].attrib['href']

wps_url
Out[13]:
'https://ec-better-apps-deployer.terradue.com/zoo/'
In [14]:
process_id = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/d:Identifier',
                                 namespaces=ns)[0].text

process_id
Out[14]:
'ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_18'
In [15]:
identifiers = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/d:Identifier',
                                 namespaces=ns)

values = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/c:Data/c:LiteralData',
                                 namespaces=ns)

params = dict()

for index, elem in enumerate(identifiers):

    params[elem.text] = values[index].text


params
Out[15]:
{'_T2Username': 'better-wfp-00002',
 'cohWinAz': '5',
 'cohWinRg': '20',
 'polarisation': 'VV',
 'quotation': 'No',
 'source': 'https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1B_IW_SLC__1SDV_20180822T034959_20180822T035025_012370_016CE1_E4E1,https://catalog.terradue.com:443/sentinel1/search?format=atomeop&uid=S1A_IW_SLC__1SDV_20180828T035050_20180828T035118_023441_028D29_EE64',
 'swaths': 'IW1,IW2,IW3'}
  • Submit the request
In [16]:
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute',
                                 namespaces=ns)[0]


execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print execution.statusLocation
http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=05b9dd68-4967-11e9-8333-0242ac11000f&RawDataOutput=Result
In [ ]:
monitorExecution(execution)
In [ ]:
if not execution.isSucceded():
    raise Exception('Single data item submission failed')
In [ ]:
for output in execution.processOutputs:
    print(output.identifier)

results_osd = execution.processOutputs[0].reference

print results_osd
In [ ]:
search_results = GeoDataFrame(ciop.search(end_point=results_osd,
                         params=[],
                         output_fields='title,enclosure',
                         model='GeoTime',
                         timeout=50000))

search_results

Pipe the queue

Run the pipe execution with the local trigger framework and process all data items queued previously

In [5]:
#trigger_pipe = ellip_triggers.Trigger(data_pipeline,
#                                      data_pipeline,
#                                      datapipeline_api_key,
#                                      '',
#                                      app_process_id)

wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
print data_pipeline, datapipeline_api_key

trigger_pipe = ellip_triggers.Trigger(data_pipeline,
                                      data_pipeline,
                                      datapipeline_api_key,
                                      wps_url,
                                      app_process_id)

better-wfp-00002 AKCp5bBXii3cnz78Xu1X5CvoTVRNS33ug7PcNng7bKdWpBwgDmDUmkcn5zpugg2hCsSRFPrhz
reporter:status:2019-03-18T11:28:17.298159 [WARNING] [user process] No data pipeline configuration found (expected string or buffer), using default config
2019-03-18T11:28:17.298159 [WARNING] [user process] No data pipeline configuration found (expected string or buffer), using default config
reporter:status:2019-03-18T11:28:17.680400 [INFO   ] [user process] {
  "max_err_recovery_in": 2,
  "total_queue": 1,
  "max_err_recovery_retry": 2,
  "total_in": 0,
  "max_in": 10,
  "total_err": 0
}
2019-03-18T11:28:17.680400 [INFO   ] [user process] {
  "max_err_recovery_in": 2,
  "total_queue": 1,
  "max_err_recovery_retry": 2,
  "total_in": 0,
  "max_in": 10,
  "total_err": 0
}
In [6]:
start_date_queue = '2019-03-18T00:00:00.00Z'
stop_date_queue = '2019-03-18T23:59:59.99Z'

search_params = dict()
search_params['cat'] = '{in,queue}'
search_params['count'] = '10'
#search_params_queue['update'] = '%s/%s' %(start_date_queue,stop_date_queue)
search_params['update'] = '%s/%s' %(start_date_queue,stop_date_queue)


end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

  • loop as many times as required until queue is empty
In [7]:
exit = False

while not exit:
    try:
        queue_search = ciop.search(end_point=end_point,
                                   params=search_params,
                                   output_fields='self,title',
                                   model='GeoTime',
                                   timeout=50000)


        for index, elem in enumerate(queue_search):

            data_input_queue_ref = trigger_pipe.create_data_item_from_single_reference(elem['self'])

            trigger_pipe.pipe(data_input_queue_ref)

        print '------------- sleeping ------------------'
        print '------------- sleeping ------------------'
        print '------------- sleeping ------------------'
        print '------------- sleeping ------------------'
        print '------------- sleeping ------------------'

        time.sleep(180)

    except IndexError:

        exit = True
reporter:status:2019-03-18T11:28:57.175024 [INFO   ] [user process] Processing source-queue data item : https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
2019-03-18T11:28:57.175024 [INFO   ] [user process] Processing source-queue data item : https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
reporter:status:2019-03-18T11:28:57.175510 [INFO   ] [user process] Job Submission...
2019-03-18T11:28:57.175510 [INFO   ] [user process] Job Submission...
reporter:status:2019-03-18T11:28:57.340686 [INFO   ] [user process] WPS submission OK. status location: http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a3193684-4968-11e9-9994-0242ac11000f&RawDataOutput=Result
2019-03-18T11:28:57.340686 [INFO   ] [user process] WPS submission OK. status location: http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a3193684-4968-11e9-9994-0242ac11000f&RawDataOutput=Result
reporter:status:2019-03-18T11:28:57.470330 [INFO   ] [user process] Data item updated https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (200)
2019-03-18T11:28:57.470330 [INFO   ] [user process] Data item updated https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (200)
reporter:status:2019-03-18T11:28:57.470766 [DEBUG  ] [user process] {"added":0,"updated":1,"deleted":0,"errors":0,"items":[{"id":"6f5f525bd0123e9dae16ae775c8df7de","type":"gtfeature","operation":"Update"}]}
2019-03-18T11:28:57.470766 [DEBUG  ] [user process] {"added":0,"updated":1,"deleted":0,"errors":0,"items":[{"id":"6f5f525bd0123e9dae16ae775c8df7de","type":"gtfeature","operation":"Update"}]}
reporter:status:2019-03-18T11:28:57.470956 [INFO   ] [user process] Metrics
2019-03-18T11:28:57.470956 [INFO   ] [user process] Metrics
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
reporter:status:2019-03-18T11:32:01.142721 [INFO   ] [user process] Processing source-in data item : https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
2019-03-18T11:32:01.142721 [INFO   ] [user process] Processing source-in data item : https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
reporter:status:2019-03-18T11:32:01.143146 [INFO   ] [user process] Checking running processing for DI https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a3193684-4968-11e9-9994-0242ac11000f&RawDataOutput=Result)
2019-03-18T11:32:01.143146 [INFO   ] [user process] Checking running processing for DI https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a3193684-4968-11e9-9994-0242ac11000f&RawDataOutput=Result)
reporter:status:2019-03-18T11:32:01.143304 [INFO   ] [user process] Metrics
2019-03-18T11:32:01.143304 [INFO   ] [user process] Metrics
 owslib.wps.WPSException : {'locator': 'https://recast.terradue.com/t2api/searchfile/better-wfp-00002/_results/workflows/ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_18/run/a3193684-4968-11e9-9994-0242ac11000f/0035474-181221095105003-oozie-oozi-W/.t2error.json', 'code': 'NoApplicableCode', 'text': 'Application has been Killed!\n'}
 owslib.wps.WPSException : {'locator': 'https://recast.terradue.com/t2api/searchfile/better-wfp-00002/_results/workflows/ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_18/run/a3193684-4968-11e9-9994-0242ac11000f/0035474-181221095105003-oozie-oozi-W/.t2error.json', 'code': 'NoApplicableCode', 'text': 'Application has been Killed!\n'}
 owslib.wps.WPSException : {'locator': 'https://recast.terradue.com/t2api/searchfile/better-wfp-00002/_results/workflows/ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_18/run/a3193684-4968-11e9-9994-0242ac11000f/0035474-181221095105003-oozie-oozi-W/.t2error.json', 'code': 'NoApplicableCode', 'text': 'Application has been Killed!\n'}
reporter:status:2019-03-18T11:32:01.465010 [INFO   ] [user process] ProcessFailed for DI https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
2019-03-18T11:32:01.465010 [INFO   ] [user process] ProcessFailed for DI https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de
reporter:status:2019-03-18T11:32:01.465238 [INFO   ] [user process] Metrics
2019-03-18T11:32:01.465238 [INFO   ] [user process] Metrics
reporter:status:2019-03-18T11:32:01.701571 [INFO   ] [user process] Data item updated https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (200)
2019-03-18T11:32:01.701571 [INFO   ] [user process] Data item updated https://catalog.terradue.com/better-wfp-00002/search?uid=6f5f525bd0123e9dae16ae775c8df7de (200)
reporter:status:2019-03-18T11:32:01.702406 [DEBUG  ] [user process] {"added":0,"updated":1,"deleted":0,"errors":0,"items":[{"id":"6f5f525bd0123e9dae16ae775c8df7de","type":"gtfeature","operation":"Update"}]}
2019-03-18T11:32:01.702406 [DEBUG  ] [user process] {"added":0,"updated":1,"deleted":0,"errors":0,"items":[{"id":"6f5f525bd0123e9dae16ae775c8df7de","type":"gtfeature","operation":"Update"}]}
reporter:status:2019-03-18T11:32:01.702899 [INFO   ] [user process] Metrics
2019-03-18T11:32:01.702899 [INFO   ] [user process] Metrics
reporter:status:2019-03-18T11:32:01.703264 [ERROR  ] [user process] WPS process FAILED
2019-03-18T11:32:01.703264 [ERROR  ] [user process] WPS process FAILED
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
------------- sleeping ------------------
  • Check the outcome
In [26]:
search_params = dict()
search_params['cat'] = '{out,err}'
search_params['count'] = '100'
search_params['update'] = '2019-03-18T00:00:00Z/2019-03-18T23:59:59Z'

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

In [50]:
end_point

Out[50]:
'https://catalog.terradue.com/better-wfp-00002/search'
In [51]:
search_piped = ciop.search(end_point=end_point,
                           params=search_params,
                           output_fields='self,wkt,identifier',
                           model='GeoTime',
                         timeout=50000)
In [52]:
m = folium.Map(
    location=[45, 90],
    zoom_start=2,
    tiles='Stamen Terrain'
)

for row in search_piped:
    row['geometry'] = loads(row.pop('wkt'))
    gdf = GeoDataFrame([row])
    gdf.crs = {'init': 'epsg:4326', 'no_defs': True}
    geojson = folium.GeoJson(gdf)
    popup = folium.Popup('<i>%s</i>' % row['identifier'])
    popup.add_to(geojson)
    geojson.add_to(m)

m
Out[52]:
In [53]:
GeoDataFrame(search_piped).head()
Out[53]:
geometry identifier self
0 POLYGON ((30.051582 6.65375, 27.81901 7.110089... 8139306dcaa5cc0a51cfd0aaf1daedc6 https://catalog.terradue.com/better-wfp-00002/...
1 POLYGON ((30.348467 8.095181999999999, 28.1129... 959137fdcb57999737b387d0c7443608 https://catalog.terradue.com/better-wfp-00002/...