Test feeding the queue

This Jupyter Notebook queries the catalog for CHIRPS products and then feeds the resulting set of products to the data pipeline queue using the trigger WPS service.

  • First, import the required Python libraries:
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

#import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
datapipeline_api_key = getpass.getpass('API key:')
In [2]:
from geopandas import GeoDataFrame
import ellip_triggers
import time
ciop = cioppy.Cioppy()
  • Read the data pipeline configuration information:
In [3]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

# read the configuration notebook and execute its second cell,
# which defines the deployment variables used throughout this notebook
nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))
app_process_id = app_process_id.replace('ewf_','')

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                              trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_wfp_01_03_02_wfp_01_03_02_1_17 with the trigger ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7
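  • The cell above relies on the second cell of configuration.ipynb to define the deployment variables used throughout this notebook. As an illustration only, that cell could look like the sketch below; the values marked as placeholders are hypothetical, the others are inferred from the outputs shown in this notebook:

app_artifact_id = 'ewf-wfp-01-03-02'                  # inferred from the application process id printed above
app_version = '1.17'                                  # inferred from the application process id printed above
trigger_queue_artifact_id = 'tg-wfp-01-03-02-queue'   # inferred from the trigger process id printed above
trigger_queue_version = '0.7'                         # inferred from the trigger process id printed above
community = 'ec-better'                               # inferred from the process ids printed above
repository = 'Gitlab Group'                           # hypothetical placeholder
folder = 'wfp-01-03-02'                               # hypothetical placeholder
data_pipeline = 'better-wfp-00002'                    # hypothetical placeholder
trigger_deployer = 'https://ec-better-triggers-deployer.terradue.com'   # hypothetical placeholder
apps_deployer = 'https://ec-better-apps-deployer.terradue.com'          # printed by a later cell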
  • Set the data discovery parameters
In [4]:
series = 'https://catalog.terradue.com/chirps/description'

start_date = '2017-01-01T00:00:00.00Z'
stop_date = '2017-01-31T23:59:00Z'


geom = 'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
In [5]:
wkt = loads(geom).wkt
In [6]:
wkt
Out[6]:
'POLYGON ((11.50307555189977 -11.11416337069092, 41.03432555189977 -11.11416337069092, 41.03432555189977 -34.97636566938584, 11.50307555189977 -34.97636566938584, 11.50307555189977 -11.11416337069092))'
In [7]:
search_params = dict([('start', start_date),
                      ('stop', stop_date),
                      ('count', 366)])

ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt',
                     model='GeoTime')
In [8]:
for index, elem in enumerate(search):
    print(index, elem['self'])
(0, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.31')
(1, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.30')
(2, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.29')
(3, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.28')
(4, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.27')
(5, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.26')
(6, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.25')
(7, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.24')
(8, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.23')
(9, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.22')
(10, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.21')
(11, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.20')
(12, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.19')
(13, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.18')
(14, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.17')
(15, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.16')
(16, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.15')
(17, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.14')
(18, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.13')
(19, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.12')
(20, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.11')
(21, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.10')
(22, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.09')
(23, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.08')
(24, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.07')
(25, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.06')
(26, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.05')
(27, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.04')
(28, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.03')
(29, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.02')
(30, 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.01.01')
In [9]:
series
Out[9]:
'https://catalog.terradue.com/chirps/description'
In [10]:
trigger_queue_process_id
Out[10]:
'ec_better_tg_wfp_01_03_02_queue_tg_wfp_01_03_02_queue_0_7'
  • Establish the connection with the WPS server hosting the triggers:
In [11]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_queue_process_id)

print process.title
RFE Aggregations Trigger - Queue
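  • If the trigger process identifier were unknown, the processes advertised by the server could be listed with owslib (wps.processes is populated by the getcapabilities() call above); a minimal, optional sketch:

In [ ]:
# list every process the trigger WPS server exposes
for wps_process in wps.processes:
    print '%s - %s' % (wps_process.identifier, wps_process.title)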
  • List the WPS trigger process data inputs:
In [12]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
start_time
end_time
N_10
N_30
N_60
N_90
N_120
N_150
N_180
N_270
N_365
regionOfInterest
nameOfRegion
quotation
_T2Username
In [13]:
print '%s/zoo/' % apps_deployer
https://ec-better-apps-deployer.terradue.com/zoo/
  • Create the list of (identifier, value) tuples holding the WPS execution inputs:
In [14]:
#update = '%s/%s' % (start_date, stop_date)
region_name = 'SouthernAfrica'
quotation = 'No'

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo/' % apps_deployer),
          ('process_id', app_process_id),
          ('start_time', start_date),
          ('end_time', stop_date),
          ('regionOfInterest', wkt),
          ('nameOfRegion', region_name),
          ('api_key', datapipeline_api_key),
          ('quotation', quotation),
          ('_T2Username', data_pipeline),
          ('N_10', 'No'),
          ('N_30', 'Yes'),
          ('N_60', 'No'),
          ('N_90', 'No'),
          ('N_120', 'No'),
          ('N_150', 'No'),
          ('N_180', 'No'),
          ('N_270', 'No'),
          ('N_365', 'No')]
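  • As an optional sanity check, the provided inputs can be compared against the data inputs declared by the process (any input left out falls back to a server-side default, if one is defined); a minimal sketch:

In [ ]:
declared = set(d.identifier for d in process.dataInputs)
provided = set(identifier for identifier, value in inputs)

# data inputs declared by the process but not provided here
print declared - provided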

  • Submit the Execute WPS and monitor the request:
In [ ]:
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

monitorExecution(execution)

stop_date_queue = datetime.utcnow().isoformat()
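  • monitorExecution blocks until the asynchronous execution completes. For finer-grained feedback, the status document can also be polled by hand with owslib; an equivalent, optional sketch:

In [ ]:
# poll the status document every 30 seconds until the execution completes
while not execution.isComplete():
    execution.checkStatus(sleepSecs=30)
    print execution.statusMessage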
  • Check the outcome of the processing request
In [ ]:
if not execution.isSucceded():

    raise Exception('Processing failed')
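  • When the request fails, the exception report parsed by owslib can be inspected before investigating further; a small sketch:

In [ ]:
# each entry of execution.errors is a WPSException parsed from the WPS response
for error in execution.errors:
    print error.code, error.locator, error.text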
In [ ]:
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)

print update_queue
  • Search the queue content
In [ ]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
In [ ]:
series_queue
In [ ]:
search_params_queue = dict()

search_params_queue['count'] = '100'

search_queue = GeoDataFrame(ciop.search(end_point=series_queue,
                                        params=search_params_queue,
                                        output_fields='self,identifier,wkt,startdate',
                                        model='GeoTime',
                                        timeout=50000))

# turn the WKT strings into shapely geometries
search_queue['wkt'] = search_queue['wkt'].apply(loads)
In [ ]:
search_queue
  • Re-submit a single queued data item: fetch its Atom entry and extract the embedded WPS Execute request
In [ ]:
import requests
In [ ]:
data_item = search_queue.iloc[-1]['self']
print data_item

# fetch the Atom entry of the queued data item and parse its XML
root = etree.fromstring(requests.get(data_item).content)
print root
In [ ]:
ns = {'a':'http://www.w3.org/2005/Atom',
      'b':'http://www.opengis.net/owc/1.0',
      'c':'http://www.opengis.net/wps/1.0.0',
      'd':'http://www.opengis.net/ows/1.1'}
In [ ]:
wps_url = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]',
                     namespaces=ns)[0].attrib['href']

wps_url
In [ ]:
process_id = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/d:Identifier',
                        namespaces=ns)[0].text

process_id
In [ ]:
identifiers = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/d:Identifier',
                         namespaces=ns)

values = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute/c:DataInputs/c:Input/c:Data/c:LiteralData',
                    namespaces=ns)

params = dict()

for index, elem in enumerate(identifiers):

    params[elem.text] = values[index].text


params
In [ ]:
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = root.xpath('/a:feed/a:entry/b:offering/b:operation[@code="Execute"]/b:request/c:Execute',
                               namespaces=ns)[0]


execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print execution.statusLocation
In [ ]:
monitorExecution(execution)
In [ ]:
if not execution.isSucceded():
    raise Exception('Single data item submission failed')
In [ ]:
for output in execution.processOutputs:
    print(output.identifier)

results_osd = execution.processOutputs[0].reference

print results_osd
In [ ]:
search_results = GeoDataFrame(ciop.search(end_point=results_osd,
                                          params=[],
                                          output_fields='title,enclosure',
                                          model='GeoTime',
                                          timeout=50000))

search_results
In [ ]:
search_results.iloc[0]['enclosure']
  • Instantiate the ellip_triggers client used to pipe each queued data item:
In [ ]:
trigger_pipe = ellip_triggers.Trigger(data_pipeline,
                                      data_pipeline,
                                      datapipeline_api_key,
                                      '',
                                      app_process_id,
                                      'via',
                                      False)
In [ ]:
search_params = dict()
# select only the data items still to be processed ('in' or 'queue' categories)
search_params['cat'] = '{in,queue}'
search_params['count'] = '100'

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

In [ ]:
end_point
  • Loop until the queue is empty; ciop.search raises an IndexError when no results are left, which ends the loop
In [ ]:
done = False

while not done:
    try:
        queue_search = ciop.search(end_point=end_point,
                                   params=search_params,
                                   output_fields='self,title',
                                   model='GeoTime',
                                   timeout=50000)

        # pipe each queued data item through the trigger
        for index, elem in enumerate(queue_search):

            data_input_queue_ref = trigger_pipe.create_data_item_from_single_reference(elem['self'])

            trigger_pipe.pipe(data_input_queue_ref)

        print '------------- sleeping ------------------'

        time.sleep(60)

    except IndexError:

        # an empty search raises IndexError: the queue has been drained
        done = True
  • Search the processed content:
In [ ]:
search_params = dict()
# select the data items already processed ('out') or failed ('err')
search_params['cat'] = '{out,err}'
search_params['count'] = '100'

end_point = "https://catalog.terradue.com/{0}/search".format(data_pipeline)

In [ ]:
search_piped = GeoDataFrame(ciop.search(end_point=end_point,
                                        params=search_params,
                                        output_fields='self,wkt,identifier',
                                        model='GeoTime',
                                        timeout=50000))

search_piped['wkt'] = search_piped['wkt'].apply(loads)
In [ ]:
search_piped.head()