Test feeding the queue

This Jupyter Notebook queries the catalog for Copernicus Global Land Service (LAI/FAPAR) products. The same set of products is then put in the data pipeline queue using the trigger WPS service.

  • First, import the required Python libraries:
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

#import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
datapipeline_api_key = getpass.getpass('API key:')
In [2]:
from geopandas import GeoDataFrame
import ellip_triggers
import time
ciop = cioppy.Cioppy()
  • Read the data pipeline configuration information:
In [3]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

# execute the configuration cell of configuration.ipynb in this namespace
exec nb['cells'][1]['source'] in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))
app_process_id = app_process_id.replace('ewf_','')

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                              trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_wfp_01_03_01_wfp_01_03_01_1_5 with the trigger ec_better_tg_wfp_01_03_01_queue_tg_wfp_01_03_01_queue_1_2
  • Set the data discovery parameters:
In [41]:
series = 'https://catalog.terradue.com/cgls/description'

start_date = '2017-01-01T00:00:00.00Z'
stop_date = '2017-01-31T23:59:00Z'


geom = 'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
In [42]:
wkt = loads(geom).wkt
In [43]:
wkt
Out[43]:
'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
In [44]:
search_params = dict([('start', start_date),
                      ('stop', stop_date),
                      ('count', 366)])

ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt',
                     model='GeoTime')
In [45]:
for index, elem in enumerate(search):
    print(index, elem['self'])
(0, 'https://catalog.terradue.com/cgls/search?format=atom&uid=fapar_v2_1km_FAPAR-RT6_201701310000_GLOBE_PROBAV_V2.0.2')
(1, 'https://catalog.terradue.com/cgls/search?format=atom&uid=lai_v2_1km_LAI-RT6_201701310000_GLOBE_PROBAV_V2.0.2')
(2, 'https://catalog.terradue.com/cgls/search?format=atom&uid=fapar_v2_1km_FAPAR-RT6_201701200000_GLOBE_PROBAV_V2.0.2')
(3, 'https://catalog.terradue.com/cgls/search?format=atom&uid=lai_v2_1km_LAI-RT6_201701200000_GLOBE_PROBAV_V2.0.2')
(4, 'https://catalog.terradue.com/cgls/search?format=atom&uid=fapar_v2_1km_FAPAR-RT6_201701100000_GLOBE_PROBAV_V2.0.2')
(5, 'https://catalog.terradue.com/cgls/search?format=atom&uid=lai_v2_1km_LAI-RT6_201701100000_GLOBE_PROBAV_V2.0.2')
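  • As a sanity check (a minimal sketch, not part of the original workflow), each result footprint can be tested against the area of interest with shapely:
In [ ]:
# assumption for illustration: every result carries the 'wkt' field requested above
aoi = loads(geom)

for elem in search:
    print elem['identifier'], loads(elem['wkt']).intersects(aoi)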
In [47]:
series
Out[47]:
'https://catalog.terradue.com/cgls/description'
In [48]:
trigger_queue_process_id
Out[48]:
'ec_better_tg_wfp_01_03_01_queue_tg_wfp_01_03_01_queue_1_2'
  • Establish the connection with the WPS server hosting the triggers:
In [49]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_queue_process_id)

print process.title
Vegetation Indicators Aggregations Trigger - Queue
  • List the WPS trigger process data inputs:
In [50]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
start_time
end_time
N_1
N_3
N_6
N_9
N_12
N_15
N_18
N_27
N_36
regionOfInterest
nameOfRegion
quotation
_T2Username
In [51]:
print '%s/zoo/' % apps_deployer
https://ec-better-apps-deployer.terradue.com/zoo/
  • Create the list of (identifier, value) pairs with the inputs:
In [52]:
#update = '%s/%s' % (start_date, stop_date)
region_name = 'SouthernAfrica'
quotation = 'No'

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo/' % apps_deployer),
          ('process_id', app_process_id),
          ('start_time', start_date),
          ('end_time', stop_date),
          ('regionOfInterest', wkt),
          ('nameOfRegion', region_name),
          ('api_key', datapipeline_api_key),
          ('quotation', quotation),
          ('_T2Username', data_pipeline),
          ('N_1', 'No'),
          ('N_3', 'Yes'),
          ('N_6', 'No'),
          ('N_9', 'No'),
          ('N_12', 'No'),
          ('N_15', 'No'),
          ('N_18', 'No'),
          ('N_27', 'No'),
          ('N_36', 'No')]
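  • Before submitting, the provided inputs can be compared against the identifiers the process declares (a hedged sketch based on the dataInputs listed earlier; the set comparison itself is an illustration, not part of the original workflow):
In [ ]:
# compare the input identifiers we provide with those declared by the WPS process
provided = set(identifier for identifier, value in inputs)
declared = set(data_input.identifier for data_input in process.dataInputs)

print 'missing inputs:', declared - provided
print 'unknown inputs:', provided - declared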

  • Submit the WPS Execute request and monitor its progress:
In [53]:
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

monitorExecution(execution)

stop_date_queue = datetime.utcnow().isoformat()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=962d8394-4107-11e9-85ec-0242ac110012&RawDataOutput=Result
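  • monitorExecution above polls the status location internally; an equivalent explicit loop (a minimal sketch using owslib's public checkStatus API) would be:
In [ ]:
# poll the status document every 30 seconds until the execution completes
while not execution.isComplete():
    execution.checkStatus(sleepSecs=30)

print execution.getStatus()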
  • Check the outcome of the processing request:
In [54]:
if not execution.isSucceded():
    raise Exception('Processing failed')
In [55]:
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)

print update_queue
2019-03-07T18:34:05.050573Z/2019-03-07T18:38:13.146852Z
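  • The value above is a plain ISO 8601 start/stop pair; as an illustration (a sketch reusing the dateutil import from the first cell), it can be parsed back into datetimes:
In [ ]:
# split the 'start/stop' interval and parse both timestamps
queue_start, queue_stop = [dateutil.parser.parse(t) for t in update_queue.split('/')]

print 'queueing took', queue_stop - queue_start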
  • Search the queue content:
In [56]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
In [57]:
series_queue
Out[57]:
'https://catalog.terradue.com/better-wfp-00006/series/source-queue/description'
In [58]:
search_params_queue = dict()

search_params_queue['count'] = '100'

search_queue = GeoDataFrame(ciop.search(end_point=series_queue,
                                        params=search_params_queue,
                                        output_fields='self,identifier,wkt,startdate',
                                        model='GeoTime',
                                        timeout=50000))

search_queue['wkt'] = search_queue['wkt'].apply(loads)
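  • The queued footprints can be inspected quickly (a minimal sketch; each 'wkt' entry is now a shapely geometry, so its bounds are directly available):
In [ ]:
# print the bounding box (minx, miny, maxx, maxy) of each queued item
for index, row in search_queue.iterrows():
    print row['identifier'], row['wkt'].bounds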
In [59]:
search_queue
Out[59]:
   identifier                        self                                                startdate                     wkt
0  38c7b079f8f773f716e80186a31e355c  https://catalog.terradue.com//better-wfp-00006...  2017-01-10T00:00:00.0000000Z  POLYGON ((180 -60, 180 80, -180 80, -180 -60, ...
1  9fae11455d1d92d371e52c8ec819d661  https://catalog.terradue.com//better-wfp-00006...  2016-01-10T00:00:00.0000000Z  POLYGON ((180 -60, 180 80, -180 80, -180 -60, ...
2  bb842b2929e627c97c697a4eb281c6f2  https://catalog.terradue.com//better-wfp-00006...  2015-01-10T00:00:00.0000000Z  POLYGON ((180 -60, 180 80, -180 80, -180 -60, ...
In [ ]:
import requests
In [ ]:
data_item = 'https://catalog.terradue.com/better-wfp-00006/search?uid=5804d1303d522787953a9aaf7eb5fef3'
#data_item = search_queue.iloc[-1]['self']
print data_item
root = etree.fromstring(requests.get(data_item).content)
print root
In [ ]:
ns = {'a':'http://www.w3.org/2005/Atom',
      'b':'http://www.opengis.net/owc/1.0',
      'c':'http://www.opengis.net/wps/1.0.0',
      'd':'http://www.opengis.net/ows/1.1'}
In [ ]:
wps_url = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]',
                     namespaces=ns)[0].attrib['href']

wps_url
In [ ]:
process_id = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/d:Identifier',
                        namespaces=ns)[0].text

process_id
In [ ]:
identifiers = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/d:Identifier',
                         namespaces=ns)

values = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/c:Data/c:LiteralData',
                    namespaces=ns)

params = dict()

for index, elem in enumerate(identifiers):

    params[elem.text] = values[index].text


params
In [ ]:
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute',
                                 namespaces=ns)[0]


execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print execution.statusLocation
In [ ]:
monitorExecution(execution)
In [ ]:
if not execution.isSucceded():
    raise Exception('Single data item submission failed')
In [ ]:
for output in execution.processOutputs:
    print(output.identifier)

results_osd = execution.processOutputs[0].reference

print results_osd
In [ ]:
search_results = GeoDataFrame(ciop.search(end_point=results_osd,
                                          params=[],
                                          output_fields='title,enclosure',
                                          model='GeoTime',
                                          timeout=50000))

search_results
In [ ]:
search_results.iloc[0]['enclosure']
In [60]:
trigger_pipe = ellip_triggers.Trigger(data_pipeline,
                                      data_pipeline,
                                      datapipeline_api_key,
                                      '',
                                      app_process_id,
                                      'via',
                                      False)
2019-03-07T19:39:49.539976 [WARNING] [user process] No data pipeline configuration found (expected string or buffer), using default config
2019-03-07T19:39:49.919381 [INFO   ] [user process] {
  "max_err_recovery_in": 2,
  "total_queue": 3,
  "max_err_recovery_retry": 2,
  "total_in": 0,
  "max_in": 10,
  "total_err": 69
}
In [61]:
search_params = dict()
search_params['cat'] = '{in,queue}'
search_params['count'] = '100'

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

In [62]:
end_point
Out[62]:
'https://catalog.terradue.com/better-wfp-00006/search'
  • Loop until the queue is empty (ciop.search raises an IndexError when no items remain in the 'in' or 'queue' categories, which ends the loop):
In [ ]:
done = False

while not done:
    try:
        queue_search = ciop.search(end_point=end_point,
                                   params=search_params,
                                   output_fields='self,title',
                                   model='GeoTime',
                                   timeout=50000)

        for index, elem in enumerate(queue_search):

            data_input_queue_ref = trigger_pipe.create_data_item_from_single_reference(elem['self'])

            trigger_pipe.pipe(data_input_queue_ref)

        print '------------- sleeping ------------------'

        time.sleep(60)

    except IndexError:

        # no items left in the 'in' or 'queue' categories: the queue is drained
        done = True
In [ ]:
search_params = dict()
search_params['cat'] = '{out,err}'
search_params['count'] = '100'

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

In [ ]:
search_piped = GeoDataFrame(ciop.search(end_point=end_point,
                                        params=search_params,
                                        output_fields='self,wkt,identifier',
                                        model='GeoTime',
                                        timeout=50000))

search_piped['wkt'] = search_piped['wkt'].apply(loads)
In [ ]:
'''m = folium.Map(
    location=[45, 90],
    zoom_start=2,
    tiles='Stamen Terrain'
)

for index, row in search_piped.iterrows():

    folium.Marker([row['wkt'].centroid.y,
                   row['wkt'].centroid.x],
                  popup='<i>%s</i>' % row['identifier']).add_to(m)

m
'''
In [ ]:
search_piped.head()