Test feeding the queueΒΆ

This Jupyter Notebook queries the catalog for Landsat 8 L1TP products and plots these on a map. The same set of products is then put in the data pipeline queue using the trigger WPS service.

  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][0]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                              trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_5 with the trigger ec_better_tg_wfp_01_02_02_queue_tg_wfp_01_02_02_queue_0_3
  • Set the data discovery parameters
In [11]:
series = 'https://catalog.terradue.com/landsat8/search'

update = '2018-10-05T00:00:00Z/2018-10-05T23:59:59Z'

geom = 'POLYGON ((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973))'
#geom = 'MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.351800000000001 15.3471, -9.351800000000001 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'
In [12]:
search_params = dict([('geom', geom),
                      ('update', update),
                      ('pt', 'L1TP')])

ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt,updated',
                     model='GeoTime')
for item in search:
    print item['identifier'],item['updated']
LC08_L1TP_189051_20180929_20180929_01_RT 2018-10-05T16:12:35.8757120+00:00
LC08_L1TP_189050_20180929_20180929_01_RT 2018-10-05T16:12:35.8757090+00:00
  • Plot the areas of interest and the Sentinel-1 data discovered:
In [13]:
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

locations = []

geoms = loads(geom)
print geoms.geometryType()

if geoms.geometryType() == 'Polygon':

    locations.append([t[::-1] for t in list(geoms.exterior.coords)])

elif geoms.geometryType() == 'MultiPolygon':

    for index, entry in enumerate(geoms):

        locations.append([t[::-1] for t in list(entry.exterior.coords)])


folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

discovery_locations = []

for index, elem in enumerate(search):

    discovery_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

folium.PolyLine(
    locations=discovery_locations,
    color='orange',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Polygon
Out[13]:
  • Establish the connection with the WPS server hosting the triggers:
In [14]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

process = wps.describeprocess(trigger_queue_process_id)

print process.title
Trigger for the WFP-01-02-02 Queue
  • List the WPS trigger process data inputs:
In [15]:
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
tg_quotation
update
geom
product_type
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [16]:

print "update = %s" % update
update = 2018-10-05T00:00:00Z/2018-10-05T23:59:59Z
In [17]:
product_type = 'L1TP'
tg_quotation = 'No'
quotation = 'False'

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', app_process_id),
          ('update', update),
          ('geom', geom.replace(' ', '%20').replace(',', '%2C')),
          ('product_type', product_type),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]

for item in inputs:
    print '%s : %s' %(item[0],item[1])
series : https://catalog.terradue.com/landsat8/search
data_pipeline : better-wfp-00004
wps_url : https://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi
process_id : ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_5
update : 2018-10-05T00:00:00Z/2018-10-05T23:59:59Z
geom : POLYGON%20((6.4788%2014.5973%2C%207.5577%2014.5973%2C%207.5577%2013.6328%2C%206.4788%2013.6328%2C%206.4788%2014.5973))
product_type : L1TP
api_key : AKCp5bBXii3wmgBKbG63zzajjDq948bkB926xxDZjt7Jxpxd1YEosvqoRhaxVkT7F5nh6p7oj
tg_quotation : No
quotation : False
_T2Username : better-wfp-00004
  • Submit the Execute WPS and monitor the request:
In [18]:
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

monitorExecution(execution)

stop_date_queue = datetime.utcnow().isoformat()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a8ce8c32-d375-11e8-a117-0242ac11000d&RawDataOutput=Result
  • Check the outcome of the processing request
In [19]:
if not execution.isSucceded():

    raise Exception('Processing failed')
In [20]:
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)

print update_queue
2018-10-19T08:04:53.027292Z/2018-10-19T08:11:46.433251Z
  • Search the queue content
In [21]:
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
In [22]:
search_params_queue = dict([('update', update_queue),
                            ('count', 30)])

search_queue = ciop.search(end_point=series_queue,
                           params=search_params_queue,
                           output_fields='self,enclosure,identifier,wkt',
                           model='GeoTime')
In [23]:
locations_queue = []

for index, elem in enumerate(search_queue):

    locations_queue.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])
In [24]:
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2


zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)


folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

folium.PolyLine(
    locations=locations_queue,
    color='green',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[24]: