Test the data transformation application on Production Centre

This Jupyter Notebook queries the catalog for a Sentinel-2 product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in the Deploy step, monitors the WPS request execution and finally retrieves the data transformation execution results.

  • First do the imports of the Python libraries required
In [1]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy

from shapely.wkt import loads

import getpass

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

nb_config = os.path.join('../operations', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['version'].replace('.', '_'))
In [3]:
# Display the WPS process identifier built from the application descriptor
app_process_id
Out[3]:
'ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14'
  • Define the search parameters: the catalog series OpenSearch endpoint, the time of interest and the area of interest
In [4]:
# The Sentinel-2 index can be used for datasets acquired outside Europe after 13th December 2013
# (ESA has been disseminating Sentinel-2 L2A data worldwide since the middle of December 2018).
# NOTE(review): "2013" is kept from the original comment but looks like a typo for 2018 - confirm.
# The corresponding productType is S2MSI2A, while it is S2MSI2Ap for datasets produced with SEN2COR.

# Catalog OpenSearch endpoint (the commented line is an alternative series)
#series = 'https://catalog.terradue.com/better-common-00001/series/results/description'
series = 'https://catalog.terradue.com/sentinel2/search'

# Time of interest
start_date = '2018-07-01T00:00:00'
stop_date = '2018-09-30T23:59:59'

# Product type used in the catalog query
#s2_prd_type = 'S2MSI2Ap'
s2_prd_type = 'S2MSI2A'

# Areas of interest: a WKT MULTIPOLYGON made of four polygons
geom = 'MULTIPOLYGON (((-77.24516666666666 0.9424444444444445, -77.24516666666666 2.753833333333333, -79.0253888888889 2.753833333333333, -79.0253888888889 0.9424444444444445, -77.24516666666666 0.9424444444444445)), ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889)), ((65.02055555555556 30.43894444444445, 65.02055555555556 33.39566666666666, 63.94222222222222 33.39566666666666, 63.94222222222222 30.43894444444445, 65.02055555555556 30.43894444444445)), ((19.6875 44.82172222222222, 19.6875 45.97733333333333, 18.70788888888889 45.97733333333333, 18.70788888888889 44.82172222222222, 19.6875 44.82172222222222)))'

Limit the test to one of the AOIs:

In [5]:
# Keep only the second polygon of the MULTIPOLYGON as the test area of interest.
# The parts are accessed through `.geoms`: indexing a multi-part geometry directly
# is deprecated in Shapely 1.8 and no longer supported in Shapely 2.0.
wkt = loads(geom).geoms[1].wkt

# Single-argument print calls behave the same on Python 2 and Python 3
print(wkt)
# Same polygon with spaces and commas percent-encoded
print(wkt.replace(' ', '%20').replace(',', '%2C'))
POLYGON ((21.29611111111111 39.58638888888889, 21.29611111111111 41.032, 19.89788888888889 41.032, 19.89788888888889 39.58638888888889, 21.29611111111111 39.58638888888889))
POLYGON%20((21.29611111111111%2039.58638888888889%2C%2021.29611111111111%2041.032%2C%2019.89788888888889%2041.032%2C%2019.89788888888889%2039.58638888888889%2C%2021.29611111111111%2039.58638888888889))
  • Search for Sentinel-2 products in one of the polygons of the area of interest
In [6]:
# Catalog query: area of interest, time span and product type
search_params = {
    'geom': wkt,
    'start': start_date,
    'stop': stop_date,
    'pt': s2_prd_type,
}
In [7]:
ciop = cioppy.Cioppy()

# Fields requested for every entry returned by the catalog
requested_fields = 'identifier,self,enclosure,wkt'

# Query the series endpoint with the parameters prepared above
search = ciop.search(
    end_point=series,
    params=search_params,
    output_fields=requested_fields,
    model='GeoTime',
)
  • List the Sentinel-2 products found
In [8]:
# Show each product identifier next to its position in the search results.
# The pair is printed as an explicit tuple so the output is the same
# "(position, 'identifier')" form on Python 2 and Python 3.
for position, product in enumerate(search):
    print((position, product['identifier']))
(0, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TEL_20180929T140703')
(1, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TEK_20180929T140703')
(2, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TDK_20180929T140703')
(3, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34SDJ_20180929T140703')
(4, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34SEJ_20180929T140703')
(5, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34SCJ_20180929T140703')
(6, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TDL_20180929T140703')
(7, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TCK_20180929T140703')
(8, 'S2B_MSIL2A_20180929T093029_N0208_R136_T34TCL_20180929T140703')
(9, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34TEK_20180926T143716')
(10, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34TEL_20180926T143716')
(11, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34SEJ_20180926T143716')
(12, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34SDJ_20180926T143716')
(13, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34TDL_20180926T143716')
(14, 'S2B_MSIL2A_20180926T092019_N0208_R093_T34TDK_20180926T143716')
(15, 'S2A_MSIL2A_20180924T093031_N0208_R136_T34TDL_20180924T122639')
(16, 'S2A_MSIL2A_20180924T093031_N0208_R136_T34TDK_20180924T122639')
(17, 'S2A_MSIL2A_20180924T093031_N0208_R136_T34TCL_20180924T122639')
(18, 'S2A_MSIL2A_20180924T093031_N0208_R136_T34TCK_20180924T122639')
(19, 'S2A_MSIL2A_20180924T093031_N0208_R136_T34SEJ_20180924T122639')
  • Select the Sentinel-2 product to process
In [9]:
# Position, in the search results listed above, of the product to process
s2_index = 18
In [10]:
# Display the 'self' field (catalog reference) of the selected product
search[s2_index]['self']
Out[10]:
'https://catalog.terradue.com/sentinel2/search?format=atom&uid=S2A_MSIL2A_20180924T093031_N0208_R136_T34TCK_20180924T122639'
In [11]:
# Display the 'enclosure' field (download location) of the selected product
search[s2_index]['enclosure']
Out[11]:
'https://store.terradue.com/download/sentinel2/files/v1/S2A_MSIL2A_20180924T093031_N0208_R136_T34TCK_20180924T122639'

Check that the selected product still intersects the area of interest (aoi_wkt)

In [12]:
# `loads` comes from shapely.wkt and is already imported in the imports cell at the
# top of the notebook, so it is not imported again here.

# Footprint of the selected product and the area of interest as shapely geometries
geom1 = loads(search[s2_index]['wkt'])
geom2 = loads(wkt)

# Fail early: there is no point in submitting a request for a product that
# does not touch the area of interest.
if not geom1.intersects(geom2):
    raise ValueError('Area of Interest has no intersection with selected product.')
  • Connect to the WPS server
In [13]:
# WPS endpoint exposed by the apps deployer
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

# Capabilities are not fetched at construction time (skip_caps=True)
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)
In [14]:
# Show the WPS endpoint in use (call form works on both Python 2 and Python 3)
print(wps_url)
https://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi
  • Do a GetCapabilities WPS request and list the process:
In [15]:
# Request the capabilities explicitly, since the client was created with skip_caps=True;
# the processes offered by the server are inspected in the next cell
wps.getcapabilities()
In [16]:
# The application is deployed when one of the processes listed in wps.processes
# carries the expected identifier
app_deployed = any(offering.identifier == app_process_id
                   for offering in wps.processes)

# Stop the notebook here if the process is missing: nothing below can work without it
if not app_deployed:
    raise Exception('Process %s not deployed' % app_process_id)

print('Process %s deployed' % app_process_id)
Process ec_better_ewf_satcen_01_01_01_ewf_satcen_01_01_01_0_14 deployed
  • Select the process and print the title and abstract after having submitted a WPS DescribeProcess request
In [17]:
# Retrieve the description of the deployed process
process = wps.describeprocess(app_process_id)

# Single-argument print calls behave the same on Python 2 and Python 3
print(process.title)

print(process.abstract)
NDVI-NDWI & Cloud Coverage Filtering
Sentinel-2 NDVI NDWI
  • List the WPS process inputs:
In [18]:
# Print the identifier of every input declared by the process
# (call form of print works on both Python 2 and Python 3)
for data_input in process.dataInputs:
    print(data_input.identifier)
source
aoi_wkt
flag_expr
percentage_threshold
_T2Username
  • Create a Python list of (identifier, value) tuples with the inputs:
In [19]:
# Catalog reference of the selected product; it is passed as the 'source' input below
search[s2_index]['self']
Out[19]:
'https://catalog.terradue.com/sentinel2/search?format=atom&uid=S2A_MSIL2A_20180924T093031_N0208_R136_T34TCK_20180924T122639'
In [20]:
# One (identifier, value) pair per WPS process input
inputs = [
    ('source', search[s2_index]['self']),    # catalog reference of the selected product
    ('aoi_wkt', wkt),                        # area of interest as WKT
    ('flag_expr', 'saturated_l1a_B4 or scl_water'),
    ('percentage_threshold', '20'),
    ('_T2Username', data_pipeline),
]
  • Submit the Execute WPS request:
In [21]:
# Execution handle bound to the WPS endpoint of the apps deployer
execution = owslib.wps.WPSExecution(url=wps.url)

# Build the Execute request document for the process, its inputs and the requested output
requested_outputs = [('result_osd', False)]
execution_request = execution.buildRequest(app_process_id, inputs, output=requested_outputs)

# Serialise the request, submit it, and load the server response into `execution`
request_payload = etree.tostring(execution_request)
execution_response = execution.submitRequest(request_payload)
execution.parseResponse(execution_response)
  • Monitor the request:
In [ ]:
# Display the status location URL reported for the submitted request
execution.statusLocation
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=a3e33572-35d8-11e9-bbfd-0242ac11000f&RawDataOutput=Result'
In [ ]:
# Monitor the submitted request with the owslib helper imported at the top of the notebook
monitorExecution(execution)
  • Check the outcome of the processing request
In [ ]:
# Abort when the execution did not succeed.
# NOTE(review): `isSucceded` (single "e") appears to be the method name as spelled
# by owslib - confirm against the installed version before "correcting" it.
succeeded = execution.isSucceded()
if not succeeded:
    raise Exception('Processing failed')
In [ ]:
# Reference of the first output of the finished execution
results_osd = execution.processOutputs[0].reference

# Call form of print works on both Python 2 and Python 3
print(results_osd)
In [ ]:
# Submit the processing results to the 'dataPublication' process of the recast WPS service
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

# A dedicated name is used for this client so that `wps` keeps pointing at the
# apps deployer service; re-running earlier cells then still targets the right server.
recast_wps = WebProcessingService(recast_wps_url,
                                  verbose=False,
                                  skip_caps=False)

recast_inputs = [('items', results_osd),
                 ('index', data_pipeline),
                 ('_T2ApiKey', datapipeline_api_key),
                 ('_T2Username', data_pipeline)]

recast_execution = recast_wps.execute(recast_process_id,
                                      recast_inputs,
                                      output=[('result_osd', True)])

# Poll the request status every 60 seconds
monitorExecution(recast_execution, sleepSecs=60)

# Same guard as for the processing request: do not read outputs of a failed execution
if not recast_execution.isSucceded():
    raise Exception('Data publication failed')

# Parse the XML payload of the first output and display the href attribute of its root element
etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]