Test feeding the queue

This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product and plots these on a map. The same set of products is then put in the data pipeline queue using the trigger WPS service.

  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser
  • Set the information about the data pipeline and its associated Ellip API key:
In [2]:
# Ellip data pipeline identifier (also passed as '_T2Username' in the WPS inputs below).
data_pipeline = 'better-satcen-00003'
In [3]:
# Prompt interactively so the API key is never hard-coded in the notebook.
api_key = getpass.getpass('What is the Ellip platform API key for user "%s"?' % data_pipeline)
  • Set the data transformation application that will be triggered:
In [4]:
# Data transformation application that the trigger will invoke.
app = {'artifact_id': 'ewf-ethz-01-03-01',
       'version': '0.1',
       'repository': 'Gitlab Groups',
       'community': 'ec-better'}

# WPS process identifier convention: <community>_<artifact>_<artifact>_<version>,
# with '-' and '.' mapped to '_' (the artifact id appears twice by design —
# the same doubling is used for the trigger id below).
data_transformation_id = ('%s_%s_%s_%s' % (app['community'],
                                           app['artifact_id'],
                                           app['artifact_id'],
                                           app['version'])).replace('-', '_').replace('.', '_')

print('This notebook will create a queue to process data with the application %s' % data_transformation_id)
This notebook will create a queue to process data with the application ec_better_wfp_01_01_01_wfp_01_01_01_1_0
  • Set the trigger information and mode:
In [7]:
trigger = dict([('artifact_id', 'tg-ethz-01-03-01-queue'),
                ('version', '0.1'),
                ('community', 'ec-better')])

trigger_id = ('%s_%s_%s_%s' % (trigger['community'],
                                     trigger['artifact_id'],
                                     trigger['artifact_id'],
                                      trigger['version'])).replace('-', '_').replace('.', '_')

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (data_transformation_id,
                                                                                              trigger_id)

This notebook will create a queue to invoke the application ec_better_wfp_01_01_01_wfp_01_01_01_1_0 with the trigger ec_better_tg_better_wfp_00001_queue_tg_better_wfp_00001_queue_0_3
  • Set the deployers to use for the data transformation application and for the trigger:
In [8]:
# Deployer endpoints, derived from the community each component belongs to.
apps_deployer = 'https://{0}-apps-deployer.terradue.com'.format(app['community'])
trigger_deployer = 'https://{0}-triggers-deployer.terradue.com'.format(trigger['community'])
  • Set the data discovery parameters
In [9]:
# Sentinel-1 catalog endpoint queried for GRD products.
series = 'https://catalog.terradue.com/sentinel1/search'

# Time-of-interest bounds (ISO-8601, no timezone suffix).
start_date = '2017-09-01T00:00:00'
stop_date = '2017-09-10T23:59:59'

# Four rectangular areas of interest as a lon/lat WKT MULTIPOLYGON.
geom = 'MULTIPOLYGON (((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), ((32.0572 12.4549, 33.9087 12.4549, 33.9087 10.7344, 32.0572 10.7344, 32.0572 12.4549)), ((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), ((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, 12.9415 12.0093, 12.9415 13.7579)))'
In [10]:
# Discovery parameters: GRD products over the AOIs within the time window.
search_params = {'geom': geom,
                 'start': start_date,
                 'stop': stop_date,
                 'pt': 'GRD',   # product type
                 'count': 30}   # maximum number of results

ciop = cioppy.Cioppy()

# Query the catalog; each result is a dict with the fields in output_fields.
search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt',
                     model='GeoTime')
  • Plot the areas of interest and the Sentinel-1 data discovered:
In [11]:
# Centre the map on the bounding box of the multi-polygon AOI.
aoi = loads(geom)
lat = (aoi.bounds[3] + aoi.bounds[1]) / 2
lon = (aoi.bounds[2] + aoi.bounds[0]) / 2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# AOI outlines: reverse each (lon, lat) vertex to the (lat, lon) order folium
# expects. Iterate via .geoms — required by shapely >= 2.0 and also available
# in 1.x, where iterating the MultiPolygon directly used to work.
locations = [[vertex[::-1] for vertex in polygon.exterior.coords]
             for polygon in aoi.geoms]

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# Footprints of the Sentinel-1 products discovered above.
discovery_locations = [[vertex[::-1] for vertex in loads(entry['wkt']).exterior.coords]
                       for entry in search]

folium.PolyLine(
    locations=discovery_locations,
    color='orange',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[11]:
  • Establish the connection with the WPS server hosting the triggers:
In [18]:
# WPS endpoint of the triggers deployer.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

# Fetch the capabilities, then the description of the trigger process.
wps.getcapabilities()

process = wps.describeprocess(trigger_id)

print(process.title)
WFP-01-01-01 Trigger - Queue
  • List the WPS trigger process data inputs:
In [19]:
# List the identifiers of the WPS trigger process inputs.
for data_input in process.dataInputs:
    print(data_input.identifier)
series
data_pipeline
wps_url
process_id
update
geom
product_type
tg_quotation
api_key
filterSizeX
filterSizeY
polarisation
wkt
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [37]:
# Time-of-interest expressed as an ISO-8601 'start/stop' interval.
update = '%s/%s' % (start_date, stop_date)

print('update = "%s/%s"' % (start_date, stop_date))
update = "2017-09-01T00:00:00/2017-09-10T23:59:59"
In [25]:
# Time-of-interest for the trigger, as an ISO-8601 'start/stop' interval.
update = '%s/%s' % (start_date, stop_date)

product_type = 'GRD'
tg_quotation = 'No'

# Speckle-filter window size and polarisation for the data transformation.
filterSizeX = '5'
filterSizeY = '5'
polarisation = 'VV'
quotation = "False"

# Execute inputs as (identifier, value) pairs — the structure owslib's
# buildRequest expects. geom is URL-encoded by hand (space and comma only).
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', data_transformation_id),
          ('update', update),
          ('geom', geom.replace(' ', '%20').replace(',', '%2C')),
          ('product_type', product_type),
          ('tg_quotation', tg_quotation),
          ('api_key', api_key),
          ('filterSizeX', filterSizeX),
          ('filterSizeY', filterSizeY),
          ('polarisation', polarisation),
          # First polygon of the AOI only. .geoms[0] works on shapely 1.x and
          # 2.x; plain indexing of a MultiPolygon was removed in 2.0.
          ('wkt', loads(geom).geoms[0].wkt),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]
  • Submit the Execute WPS and monitor the request:
In [ ]:
# Timestamp the start of the submission window; used later to search the
# queue by ingestion date.
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

# Build the Execute request for the trigger process with the inputs above.
# NOTE(review): the boolean paired with 'result_osd' is owslib's output
# as-reference flag — confirm against the owslib WPS docs.
execution_request = execution.buildRequest(trigger_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

# Status URL to poll for the asynchronous WPS job.
print(execution.statusLocation)

# Block until the job reaches a terminal state.
monitorExecution(execution)

# Timestamp the end of the submission window.
stop_date_queue = datetime.utcnow().isoformat()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=3a637dc2-8b57-11e8-b6c5-0242ac11000f&RawDataOutput=Result
  • Check the outcome of the processing request
In [27]:
# True when the WPS job succeeded. NOTE: 'isSucceded' (sic) is the actual
# owslib method name, not a typo in this notebook.
execution.isSucceded()
Out[27]:
True
In [35]:
# Show the queue-feeding window as a Zulu-suffixed ISO-8601 interval.
print('%sZ/%sZ' % (start_date_queue, stop_date_queue))
2018-07-19T13:25:39.167851Z/2018-07-19T13:35:27.122272Z
  • Search the queue content
In [28]:
# OpenSearch description of the data pipeline's source queue series.
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
In [32]:
# Search the queue for entries ingested during the submission window.
search_params_queue = {'update': '%sZ/%sZ' % (start_date_queue, stop_date_queue),
                       'count': 30}

search_queue = ciop.search(end_point=series_queue,
                           params=search_params_queue,
                           output_fields='self,enclosure,identifier,wkt',
                           model='GeoTime')
2018-07-19T13:35:27.122272
In [33]:
# Footprints of the queued products, as (lat, lon) vertex lists for folium.
locations_queue = [[vertex[::-1] for vertex in loads(entry['wkt']).exterior.coords]
                   for entry in search_queue]
In [34]:
# Overlay the queue content (green) on top of the AOI outlines (red),
# centred on the bounding box of the multi-polygon AOI.
bounds = loads(geom).bounds
centre_lat = (bounds[1] + bounds[3]) / 2
centre_lon = (bounds[0] + bounds[2]) / 2

m = folium.Map(location=[centre_lat, centre_lon], zoom_start=4)

marker_radius = 4
folium.CircleMarker(
    location=[centre_lat, centre_lon],
    radius=marker_radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(marker_radius),
    tooltip='I am in pixels',
).add_to(m)

# Areas of interest (same vertex lists built for the discovery map).
folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# Entries found in the data pipeline queue.
folium.PolyLine(
    locations=locations_queue,
    color='green',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[34]: