Test feeding the queue (on Production Centre)

This Jupyter Notebook queries the catalog for a Sentinel2 L2A product or a BETTER-common L2A-p product. The same set of products is then put in the data pipeline queue using the trigger WPS service.

  • First do the imports of the Python libraries required
In [1]:
import sys
import os

import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService


import lxml.etree as etree

import cioppy

from shapely.wkt import loads
import getpass

import folium

from datetime import datetime, timedelta
import dateutil.parser

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

# Path to the shared data-pipeline configuration notebook (one level up).
nb_config = os.path.join('..', 'configuration.ipynb')

# Read it as a notebook object (nbformat version 4).
nb = nbf.read(nb_config, 4)

# Python 2 exec statement: run the configuration cell's source in the
# current namespace so its variables (app_artifact_id, app_version,
# repository, community, trigger_queue_*, ...) become available here.
# NOTE(review): relies on the config cell being at index 1 in the
# configuration notebook — confirm before reordering that notebook.
exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

# WPS process identifier of the application, following the deployment
# naming convention <community>_<artifact_id>_<artifact_id>_<version>
# with '-' and '.' replaced by '_'. The artifact id appears twice on
# purpose, as shown by the printed identifier below.
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['artifact_id'].replace('-', '_'),
                                  app['version'].replace('.', '_'))

trigger_queue = dict([('artifact_id', trigger_queue_artifact_id),
                      ('version', trigger_queue_version),
                      ('repository', repository),
                      ('folder', folder),
                      ('community', community)])

# Same naming convention for the queue-trigger process identifier.
trigger_queue_process_id = '%s_%s_%s_%s' % (trigger_queue['community'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['artifact_id'].replace('-', '_'),
                                            trigger_queue['version'].replace('.', '_'))

print 'This notebook will create a queue to invoke the application %s with the trigger %s' % (app_process_id,
                                                                                              trigger_queue_process_id)
This notebook will create a queue to invoke the application ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 with the trigger ec_better_tg_satcen_01_01_01_queue_tg_satcen_01_01_01_queue_0_8
  • Set the data discovery parameters
In [3]:
# Sentinel-2 L2A products have been disseminated worldwide by ESA since
# mid-December 2018; their productType is 'S2MSI2A', while 'S2MSI2Ap'
# identifies L2A-p datasets produced with SEN2COR.

# Alternative series: the BETTER-common catalog for S2MSI2Ap products.
#series = 'https://catalog.terradue.com/better-common-00001/series/results/description'
series = 'https://catalog.terradue.com/sentinel2/description'

# Acquisition window for the discovery query (ISO 8601, UTC).
start_date = '2018-05-01T00:00:00Z'
stop_date = '2018-05-15T23:59:59.99Z'

#s2_prd_type = 'S2MSI2Ap'
s2_prd_type = 'S2MSI2A'

# Four rectangular areas of interest as a WKT MULTIPOLYGON.
geom ='MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)), ((19.6875 44.82172222222222, 19.6875 45.97733333333333, 18.70788888888889 45.97733333333333, 18.70788888888889 44.82172222222222, 19.6875 44.82172222222222)))'

In [4]:
# Keep only the second polygon of the multi-polygon (index 1) and use
# its WKT representation as the search footprint.
wkt = loads(geom)[1].wkt

print wkt

POLYGON ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889))
In [5]:
# Discovery parameters: footprint, acquisition window, product type,
# capped at 200 results.
search_params = {
    'geom': wkt,
    'start': start_date,
    'stop': stop_date,
    'pt': s2_prd_type,
    'count': 200,
}

ciop = cioppy.Cioppy()

# Query the Sentinel-2 catalog series for matching entries.
search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier,updated,wkt',
                     model='GeoTime')
In [6]:
# List the discovered products. Under the Python 2 kernel this prints
# each (index, identifier, updated) as a tuple, as shown in the output.
for index, elem in enumerate(search):
    print(index, elem['identifier'],elem['updated'])
(0, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34TEL_20180514T104614', '2018-05-25T01:00:42.8136150+00:00')
(1, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34TDL_20180514T104614', '2018-05-25T01:03:05.3095850+00:00')
(2, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34SEJ_20180514T104614', '2018-05-25T03:18:11.2012250+00:00')
(3, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34SDJ_20180514T104614', '2018-05-25T03:19:43.7902060+00:00')
(4, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34TEK_20180514T104614', '2018-05-24T17:07:39.9456170+00:00')
(5, 'S2A_MSIL2A_20180514T092031_N0207_R093_T34TDK_20180514T104614', '2018-05-24T17:39:07.0482090+00:00')
(6, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34SCJ_20180512T132632', '2018-05-20T15:54:35.7483940+00:00')
(7, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TCL_20180512T132632', '2018-05-20T16:28:21.9118190+00:00')
(8, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34SEJ_20180512T132632', '2018-05-20T15:07:34.3113260+00:00')
(9, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34SDJ_20180512T132632', '2018-05-20T17:24:06.9550720+00:00')
(10, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TCK_20180512T132632', '2018-05-20T16:00:59.9342370+00:00')
(11, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TEK_20180512T132632', '2018-05-20T17:22:25.0459840+00:00')
(12, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TEL_20180512T132632', '2018-05-20T14:21:22.2259670+00:00')
(13, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TDK_20180512T132632', '2018-05-20T13:46:56.6689750+00:00')
(14, 'S2B_MSIL2A_20180512T093029_N0207_R136_T34TDL_20180512T132632', '2018-05-20T14:11:17.9670210+00:00')
(15, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34TEL_20180509T105136', '2018-05-10T22:35:48.4913870+00:00')
(16, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34TDK_20180509T105136', '2018-05-10T19:39:01.1413840+00:00')
(17, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34TEK_20180509T105136', '2018-05-18T17:20:10.5687380+00:00')
(18, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34TDL_20180509T105136', '2018-05-10T18:32:43.1990710+00:00')
(19, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34SEJ_20180509T105136', '2018-05-10T19:54:53.4182900+00:00')
(20, 'S2B_MSIL2A_20180509T092029_N0207_R093_T34SDJ_20180509T105136', '2018-05-10T19:06:18.3468450+00:00')
(21, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TDK_20180507T114155', '2018-05-13T10:53:13.5086850+00:00')
(22, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34SCJ_20180507T114155', '2018-05-13T10:36:57.7154010+00:00')
(23, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TEL_20180507T114155', '2018-05-13T10:28:36.7698060+00:00')
(24, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TCK_20180507T114155', '2018-05-13T10:41:19.2254130+00:00')
(25, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34SDJ_20180507T114155', '2018-05-13T10:52:34.3093740+00:00')
(26, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TCL_20180507T114155', '2018-05-13T11:02:43.4735430+00:00')
(27, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TDL_20180507T114155', '2018-05-13T10:50:28.3469670+00:00')
(28, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34TEK_20180507T114155', '2018-05-13T10:29:34.1181440+00:00')
(29, 'S2A_MSIL2A_20180507T093041_N0207_R136_T34SEJ_20180507T114155', '2018-05-13T09:35:03.0925140+00:00')
(30, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TEK_20180504T112905', '2018-05-10T00:32:58.0003790+00:00')
(31, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TEL_20180504T112905', '2018-05-10T12:05:28.2802730+00:00')
(32, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TDK_20180504T112905', '2018-05-09T22:49:46.9237420+00:00')
(33, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34SEJ_20180504T112905', '2018-05-09T23:06:42.7585370+00:00')
(34, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TDL_20180504T112905', '2018-05-09T23:23:02.7832920+00:00')
(35, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34SDJ_20180504T112905', '2018-05-09T21:45:23.7255040+00:00')
(36, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34SDJ_20180502T113747', '2018-05-16T21:31:19.0144120+00:00')
(37, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TCK_20180502T113747', '2018-05-17T00:13:54.2432620+00:00')
(38, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TDL_20180502T113747', '2018-05-17T00:23:27.8880510+00:00')
(39, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TDK_20180502T113747', '2018-05-16T21:36:44.2618730+00:00')
(40, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34SEJ_20180502T113747', '2018-05-17T00:00:48.2898130+00:00')
(41, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TEK_20180502T113747', '2018-05-16T21:57:36.4389260+00:00')
(42, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34SCJ_20180502T113747', '2018-05-17T00:39:30.2305240+00:00')
(43, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TEL_20180502T113747', '2018-05-16T23:49:20.9379340+00:00')
(44, 'S2B_MSIL2A_20180502T093039_N0207_R136_T34TCL_20180502T113747', '2018-05-16T21:20:47.0409240+00:00')
In [7]:
# Re-run the discovery, this time filtering on catalog ingestion time
# (the 'update' interval) instead of the acquisition window.
update = '2018-05-09T21:45:00Z/2018-05-09T23:59:59Z'

search_params = {
    'geom': wkt,
    'update': update,
    'pt': s2_prd_type,
    'count': 200,
}

ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='identifier,updated,startdate,enddate,wkt',
                     model='GeoTime')
# Show identifier, sensing start and ingestion time of each hit.
for index, elem in enumerate(search):
    print(index, elem['identifier'], elem['startdate'], elem['updated'])
(0, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TDK_20180504T112905', '2018-05-04T09:20:31.0240000Z', '2018-05-09T22:49:46.9237420+00:00')
(1, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34SEJ_20180504T112905', '2018-05-04T09:20:31.0240000Z', '2018-05-09T23:06:42.7585370+00:00')
(2, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TDL_20180504T112905', '2018-05-04T09:20:31.0240000Z', '2018-05-09T23:23:02.7832920+00:00')
(3, 'S2A_MSIL2A_20180504T092031_N0207_R093_T34SDJ_20180504T112905', '2018-05-04T09:20:31.0240000Z', '2018-05-09T21:45:23.7255040+00:00')
In [8]:
# Inspect the full metadata dict of the first result (rich display).
search[0]
Out[8]:
{'enddate': '2018-05-04T09:20:31.0240000Z',
 'identifier': 'S2A_MSIL2A_20180504T092031_N0207_R093_T34TDK_20180504T112905',
 'startdate': '2018-05-04T09:20:31.0240000Z',
 'updated': '2018-05-09T22:49:46.9237420+00:00',
 'wkt': 'POLYGON((20.5056281296162 39.6587959885234,20.5489079313893 39.7970076359227,20.5925335873178 39.9442728844826,20.6361207385198 40.0915944199932,20.6822913718753 40.2382553213412,20.7272938865202 40.38536779668,20.7725996775118 40.5325506050779,20.8072485114097 40.6493739076368,21.1154430676197 40.6507988113653,21.1137799145263 39.6615512273907,20.5056281296162 39.6587959885234))'}
  • Plot the areas of interest and the Sentinel-2 data discovered:
In [9]:
# Centre of the overall area of interest: the midpoint of the
# multi-polygon's bounding box.
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2

zoom_start = 5

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Red marker at the map centre.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)

# Outline each area of interest. folium expects [lat, lon] pairs while
# WKT coordinates are (lon, lat), hence the t[::-1] swap.
# (The loop index was unused, so enumerate() was dropped.)
locations = []

for entry in loads(geom):

    locations.append([t[::-1] for t in list(entry.exterior.coords)])

folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

# Outline the footprint of every discovered product.
discovery_locations = []

for elem in search:

    discovery_locations.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])

folium.PolyLine(
    locations=discovery_locations,
    color='orange',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

# Make sure the output folder exists before saving the rendered map
# (m.save fails if 'results' is missing).
if not os.path.isdir('results'):
    os.makedirs('results')

m.save(os.path.join('results', '%s_search.html' % 'discovery'))

m
Out[9]:
In [10]:
# Base URL of the triggers deployer (set by the configuration notebook).
print trigger_deployer
https://ec-better-triggers-deployer.terradue.com
  • Establish the connection with the WPS server hosting the triggers:
In [11]:
# Build the WPS endpoint URL and connect (capabilities fetched lazily
# below rather than at construction time).
wps_url = '%s/zoo-bin/zoo_loader.cgi' % trigger_deployer
wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)

wps.getcapabilities()

# Describe the queue-trigger process to confirm it is deployed.
process = wps.describeprocess(trigger_queue_process_id)

print process.title
Trigger for the SATCEN-01-01-01 Queue
In [12]:
# NOTE(review): duplicates the print in the previous cell — kept as a
# sanity re-print of the trigger process title.
print process.title
Trigger for the SATCEN-01-01-01 Queue
  • List the WPS trigger process data inputs:
In [13]:
# List the identifiers of the trigger process's data inputs.
for data_input in process.dataInputs:
    print data_input.identifier
series
data_pipeline
wps_url
process_id
api_key
tg_quotation
update
geom
product_type
flag_expr
percentage_threshold
aoi_wkt
quotation
_T2Username
  • Create a Python dictionary with the inputs:
In [15]:
# NOTE(review): execution count jumps from In[13] to In[15] — a cell was
# re-run or removed; restart and run all cells to rule out hidden state.
print update
#product_type = 'S2MSI2Ap'
product_type = 'S2MSI2A'

# 'No' disables quotation mode on the trigger and on the application.
tg_quotation = 'No'


quotation = 'No'

# Key/value inputs for the Execute WPS request: the catalog series and
# update interval to re-discover, the application WPS endpoint and
# process to invoke per product, the cloud/flag screening expression
# with its percentage threshold, and the data-pipeline credentials.
inputs = [('series', series),
          ('data_pipeline', data_pipeline),
          ('wps_url', '%s/zoo-bin/zoo_loader.cgi' % apps_deployer),
          ('process_id', app_process_id),
          ('update', update),
          ('geom', wkt),
          ('flag_expr','saturated_l1a_B4 or scl_water' ),
          ('percentage_threshold', '20'),
          ('product_type', product_type),
          ('tg_quotation', tg_quotation),
          ('api_key', datapipeline_api_key),
          ('aoi_wkt', wkt),
          ('quotation', quotation),
          ('_T2Username', data_pipeline)]




2018-05-09T21:45:00Z/2018-05-09T23:59:59Z
  • Submit the Execute WPS and monitor the request:
In [16]:
# Record when queue creation starts (used later to search the queue by
# ingestion time).
start_date_queue = datetime.utcnow().isoformat()

execution = owslib.wps.WPSExecution(url=wps_url)

# Build the Execute request for the trigger process; 'result_osd' is the
# only requested output (False = returned by reference, not inline).
execution_request = execution.buildRequest(trigger_queue_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)

print(execution.statusLocation)

# Poll the status location until the asynchronous execution completes.
monitorExecution(execution)

stop_date_queue = datetime.utcnow().isoformat()
http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=0bfd9e0a-35d8-11e9-aff9-0242ac110012&RawDataOutput=Result
  • Check the outcome of the processing request
In [17]:
# Fail fast if the trigger execution did not succeed.
# (owslib spells the accessor 'isSucceded'.)
if not execution.isSucceded():

    raise Exception('Processing failed')
In [18]:
update_queue = '%sZ/%sZ' % (start_date_queue, stop_date_queue)

print update_queue
2019-02-21T12:56:04.067077Z/2019-02-21T13:00:16.212351Z
  • Search the queue content
In [19]:
# Catalog series holding the data pipeline's source-queue entries.
series_queue = 'https://catalog.terradue.com/{0}/series/source-queue/description'.format(data_pipeline)
In [20]:
# Look in the queue series for entries ingested during the trigger run.
search_params_queue = {
    'update': update_queue,
    'count': 60,
}

search_queue = ciop.search(end_point=series_queue,
                           params=search_params_queue,
                           output_fields='self,enclosure,identifier,wkt',
                           model='GeoTime')
In [21]:
# Footprints of the queued products as [lat, lon] rings — WKT is
# (lon, lat), hence the t[::-1] swap.
# (The loop index was unused, so enumerate() was dropped.)
locations_queue = []

for elem in search_queue:

    locations_queue.append([t[::-1] for t in list(loads(elem['wkt']).exterior.coords)])
In [22]:
# Centre the map on the bounding box of the areas of interest, as in
# the discovery map above.
lat = (loads(geom).bounds[3]+loads(geom).bounds[1])/2
lon = (loads(geom).bounds[2]+loads(geom).bounds[0])/2


zoom_start = 4

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

# Red marker at the map centre.
radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)


# Areas of interest in red, queued product footprints in green.
folium.PolyLine(
    locations=locations,
    color='#FF0000',
    weight=2,
    tooltip='',
).add_to(m)

folium.PolyLine(
    locations=locations_queue,
    color='green',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

# Fix: the original saved to 'discovery_search.html', silently
# overwriting the discovery map produced earlier; give the queue map
# its own file (and make sure the output folder exists).
if not os.path.isdir('results'):
    os.makedirs('results')

m.save(os.path.join('results', '%s_search.html' % 'queue'))

m
Out[22]: