Test feeding the queue

This Jupyter Notebook queries the catalog for Sentinel-2 L2A products and plots the results on a map. The same set of products is then put in the data pipeline queue using the trigger WPS service.

  • First do the imports of the Python libraries required
In [19]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [20]:
# Restore variables persisted by other notebooks (IPython storemagic),
# e.g. data_pipeline, apps_deployer, trigger_deployer, datapipeline_api_key.
%store -r

# Load the shared configuration notebook and execute its second cell, which
# is expected to define app_artifact_id, app_version, repository, community,
# trigger_queue_artifact_id, trigger_queue_version and folder
# — TODO confirm against ../configuration.ipynb.
nb_config = os.path.join('..', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

# Python-2 exec statement form: runs the cell source in this namespace.
exec(nb['cells'][1]['source']) in globals(), locals()

# Descriptor of the application that will process each queued product.
app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

# Deployed WPS process id: '-' and '.' are mapped to '_'.
# NOTE(review): the artifact id appears twice on purpose — the resulting id
# matches the deployed process shown in the output below; confirm the
# platform's naming convention before "fixing" this.
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

# Descriptor of the trigger that feeds the data pipeline queue.
trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

# Same naming scheme as app_process_id (artifact id duplicated — see above).
trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                              trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_ewf_wfp_01_02_01_ewf_wfp_01_02_01_0_2 with the trigger ec_better_tg_wfp_01_02_01_queue_tg_wfp_01_02_01_queue_0_3
  • Set the data discovery parameters
In [22]:
# Data discovery parameters.

# OpenSearch endpoint of the catalog series to query.
series = 'https://catalog.terradue.com/better-common-00001/series/results/search'

# Sentinel-2 Level-2A product type identifier.
s2_prd_type = 'S2MSI2Ap'

# Time of interest (ISO-8601, UTC).
start_date = '2018-12-11T14:00:00Z'
stop_date = '2018-12-11T17:00:00Z'

# Four rectangular areas of interest as a WKT multipolygon (lon/lat order).
geom = 'MULTIPOLYGON (((6.4788 14.5973, 7.5577 14.5973, 7.5577 13.6328, 6.4788 13.6328, 6.4788 14.5973)), ((67.7116 37.9032, 68.791 37.9032, 68.791 36.9211, 67.7116 36.9211, 67.7116 37.9032)), ((-10.3668 15.3471, -9.3518 15.3471, -9.3518 14.3406, -10.3668 14.3406, -10.3668 15.3471)), ((67.62430000000001 36.7228, 68.116 36.7228, 68.116 35.6923, 67.62430000000001 35.6923, 67.62430000000001 36.7228)))'

In [23]:
# Restrict discovery to a single AOI: the fourth polygon (index 3) of the
# multipolygon above, converted back to a WKT string.
wkt = loads(geom)[3].wkt
In [24]:
# OpenSearch query: products intersecting the AOI, updated inside the time
# window, of the requested product type; capped at 50 results.
search_params = dict([('geom', wkt),
                      ('update','%s/%s' %(start_date,stop_date)),
                      ('pt', s2_prd_type),
                      ('count', 50)])


ciop = cioppy.Cioppy()

# Query the catalog; each hit is a dict carrying the listed output fields.
search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,wkt',
                     model='GeoTime')
In [25]:
# List what was discovered (Python-2 prints the tuple, hence the
# parenthesised output below).
for position, product in enumerate(search):
    print(position, product['identifier'])
(0, '03ccbae032cc3f0b24eb086b1901ae38bd3cd272')
(1, '50d0e03621eb992274cc2d9125a451853f7a2eab')
(2, '5aadf8e2fc100e810ede515fae3f481211bd86df')
(3, '6cbef04c0299d7cbcc1440d33b8eca0750a3496c')
(4, '29cad51949319a71681d26286e298d6556148558')
(5, '7ae125a9e35787e49afe61824b56c0678e16dc77')
(6, '9af4be871442d3125378591a66be0597693c779c')
(7, 'd83a2b23775056c92afe5af69f9743e7137d4707')
(8, '623b80dd655338979cd2b6ff061a4e57606a80b0')
(9, 'da95f7db836074457f52102d6f316df3886a73f7')
(10, 'dcaccadfd9e714898b36763b82700ceecf2ab0b4')
(11, '3245bfb30e48029239b6939c0bde8db3b7079cf8')
(12, 'b2108152ead9fd10c4eb79c5e2289516e73977fb')
(13, 'da24aef8ab6d5f9520db1b17d3486cf3fcd81ae0')
(14, 'f198d9c7a8a73b071bb45b455ec9adab5e027f0c')
(15, 'e4ee7b2457057b181e0feed6c630bb0d7499df3b')
(16, 'f796f255035885a1708201c0c4aee150dc1e704c')
(17, 'f8139b24f45148df744d00cb541a87af7042d4f1')
(18, '0a55ebae5a481b7ad56268c2bf1a68d0d5707eec')
(19, '1b9847bc6ff6c3c6c813250204b0ed7375940301')
(20, '830c9586e71e3fc9ad181d44bd53a0b6864a6f68')
(21, '9ffa3d93e1c718f5d31c1bb96087a0258f6ff28f')
(22, '548e101489a4004ab0805ae1db5d509c1675f391')
(23, '7c9b366bcc347972af05bf564f9a30fd7ce367fc')
(24, '9acde607f1cd2018a624e4503d92995668d66fc2')
(25, 'c4004a36d31fc3aa32e638ea9cdcb3197a0eee4b')
(26, '24f87f03685f698f74bb1b6570980fa78e9041a1')
(27, '343358e3a23a139df6281ca0434ffca176e7162e')
(28, '63c50e361ff7047268242caebc2b82491e19bf3f')
(29, '934f7d27c505e1aaabb225f5916f2d20be3d2950')
(30, '5eaef53a475c9bafefabe823fe2fc9e5a3885c7e')
(31, '0da4127910555ac196531b7cb3446d0e1c4c3352')
(32, '3eb5546631759fcca76211962addf31250362bb6')
(33, '868132d9621718d24780d2595d05e9a15921e2bb')
(34, 'dbbabf173c32fc53c5d11c4c216dbd74a27b79c0')
(35, '3a4ac14be5b1591ea36d9b7023233a47a6ceec76')
(36, '4b40b75a60237e2976b967765faba3c3bfa193a1')
(37, '5edb4a936898c7e94eb59e70147cfb46a43c328c')
(38, '63a86dfae973815ae4182d6c8d8760f6e5f9568a')
(39, 'a33a12f48d58cd208afef44021785c78673586d8')
(40, '8e28aee459e1125dddf020e0fb4db03cfecdbeeb')
  • Plot the areas of interest and the Sentinel-2 data discovered:
In [27]:
# Plot the four AOIs (red) and the footprints of the discovered Sentinel-2
# products (orange) on an interactive folium map.

# Parse the AOI multipolygon once instead of re-parsing the WKT repeatedly.
aoi = loads(geom)

# Centre of the AOI bounding box; bounds = (min lon, min lat, max lon, max lat).
lat = (aoi.bounds[3] + aoi.bounds[1]) / 2
lon = (aoi.bounds[2] + aoi.bounds[0]) / 2

zoom_start = 5

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Mark the map centre.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# AOI outlines: folium wants [lat, lon] pairs while WKT coordinates are
# (lon, lat), hence the t[::-1] reversal. 'locations' is reused by the
# queue-map cell further below.
locations = []

for entry in aoi:
    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# Footprints of the discovered products.
discovery_locations = []

for elem in search:
    discovery_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

folium.PolyLine(
    locations=discovery_locations,
    color='orange',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[27]:
In [28]:
# Show which triggers deployer endpoint was restored by %store -r.
print(trigger_deployer)
https://ec-better-triggers-deployer.terradue.com
  • Establish the connection with the WPS server hosting the triggers:
In [29]:
# WPS endpoint of the triggers deployer (ZOO-Project loader CGI).
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer

# skip_caps=True defers the capabilities request to the explicit call below.
wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

# Retrieve the trigger process description (title, inputs, outputs).
process = wps.describeprocess(trigger_queue_process_id)

print process.title
Trigger for the WFP-01-02-01 Queue
In [30]:
# NOTE(review): duplicate of the previous cell's print — consider removing.
print process.title
Trigger for the WFP-01-02-01 Queue
  • List the WPS trigger process data inputs:
In [31]:
# List the identifiers of the trigger process's WPS data inputs.
for wps_input in process.dataInputs:
    print(wps_input.identifier)
series
data_pipeline
wps_url
process_id
api_key
tg_quotation
update
geom
pt
resolution
wkt
flag_expr
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [32]:
update = '%s/%s' % (start_date, stop_date)

product_type = 'S2MSI2Ap'
tg_quotation = 'No'
quotation = 'No'

inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', app_process_id),
          ('api_key', datapipeline_api_key),
          ('tg_quotation', tg_quotation),
          ('update', update),
          ('geom', wkt.replace(' ', '%20').replace(',', '%2C')),
          ('pt',s2_prd_type),
          ('resolution', '10'),
          ('wkt', wkt),
          ('flag_expr', 'saturated_l1a_B4 or scl_water'),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]

  • Submit the WPS Execute request and monitor it:
In [33]:
# Record when the queue feeding starts (used later to search the queue).
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

# Build the Execute request document; the second element of the output tuple
# is owslib's as-reference flag (False here) — see owslib.wps docs.
execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

# Submit the raw XML request and parse the status response.
execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

# Block, polling the status location, until the asynchronous job completes.
monitorExecution(execution)

# Record when the queue feeding finished.
stop_date_queue = datetime.utcnow().isoformat()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=2296ece0-fd69-11e8-af0d-0242ac110012&RawDataOutput=Result
  • Check the outcome of the processing request
In [36]:
# Stop the notebook if the trigger execution did not succeed.
# (isSucceded — sic — is the actual owslib method name.)
if not execution.isSucceded():

    raise Exception('Processing failed')
In [37]:
# Time window bracketing the queue-feeding run; the trailing 'Z' (UTC) is
# appended by hand because utcnow().isoformat() omits it.
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)

print update_queue
2018-12-11T17:21:02.712323Z/2018-12-11T17:26:14.494820Z
  • Search the queue content
In [18]:
# OpenSearch description document of the data pipeline's source queue series.
series_queue = 'https://catalog.terradue.com/%s/series/source-queue/description' % data_pipeline
In [19]:
# Search the queue for entries created during the run's time window.
search_params_queue = dict([('update', update_queue),
                            ('count', 30)])

# Same catalog client and output fields as the discovery search above.
search_queue = ciop.search(end_point=series_queue,
                           params=search_params_queue,
                           output_fields='self,enclosure,identifier,wkt',
                           model='GeoTime')
In [20]:
# Footprint rings of the queued products, as [lat, lon] pairs for folium
# (WKT stores (lon, lat); t[::-1] swaps the order).
locations_queue = []

for elem in search_queue:
    locations_queue.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])
In [21]:
# Plot the AOIs (red) and the footprints of the queued products (green).
# This largely mirrors the discovery-map cell above — a shared plotting
# helper would remove the duplication.

# Parse the AOI multipolygon once instead of re-parsing the WKT four times.
aoi = loads(geom)

# Centre of the AOI bounding box; bounds = (min lon, min lat, max lon, max lat).
lat = (aoi.bounds[3] + aoi.bounds[1]) / 2
lon = (aoi.bounds[2] + aoi.bounds[0]) / 2

zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Mark the map centre.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# AOI outlines ('locations' was built in the discovery-map cell above).
folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# Footprints of the queue entries.
folium.PolyLine(
    locations=locations_queue,
    color='green',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

# NOTE(review): same output filename as the discovery map — this overwrites
# results/discovery_search.html; consider a distinct name.
m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[21]: