Test the data transformation applicationΒΆ

This Jupyter Notebook to query the catalog for a Sentinel-2 L2A products, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in the Deploy step, monitors the WPS request execution and finally retrieves the data transformation execution results

  • First do the imports of the Python libraries required
In [22]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy

from shapely.wkt import loads

import getpass

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [23]:
%store -r

nb_config = os.path.join('../operations', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][1]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['version'].replace('.', '_'))
  • Define the search parameters: the catalog series OpenSearch endpoint, the time of interest and the area of interest
In [24]:
series = 'https://catalog.terradue.com/sentinel2/search'

start_date = '2018-08-28T00:00:00'
stop_date = '2018-08-28T23:59:59'

s2_prd_type = 'S2MSI2A'

geom = 'MULTIPOLYGON (((14.9997721265064 38.0212298555682,15.0339421243074 38.1469295398089,15.076722011609 38.2943830685215,15.1196939590664 38.4418167678655,15.1628416993738 38.5892613166416,15.2061510925436 38.7366798078584,15.2388261265461 38.8477042183055,16.2649601148006 38.8421486728812,16.2478654087227 37.8528295032852,14.9997726359092 37.8594419593273,14.9997721265064 38.0212298555682))), ((0, 0, 0, 0, 0))'

Limit the test to one of the AOIs:

In [25]:
wkt = loads(geom).wkt
print wkt
MULTIPOLYGON (((14.9997721265064 38.0212298555682, 15.0339421243074 38.1469295398089, 15.076722011609 38.2943830685215, 15.1196939590664 38.4418167678655, 15.1628416993738 38.5892613166416, 15.2061510925436 38.7366798078584, 15.2388261265461 38.8477042183055, 16.2649601148006 38.8421486728812, 16.2478654087227 37.8528295032852, 14.9997726359092 37.8594419593273, 14.9997721265064 38.0212298555682)))
  • Search for Sentinel-2 products in one of the polygons of the area of interest
In [26]:
search_params = dict([('geom', wkt),
                      ('start', start_date),
                      ('stop', stop_date),
                      #('pt', s1_prd_type)
                      ('pt', s2_prd_type)])
In [27]:
ciop = cioppy.Cioppy()

search = ciop.search(end_point=series,
                     params=search_params,
                     output_fields='self,enclosure,identifier',
                     model='GeoTime')
  • List the Sentinel-2 products found
In [28]:
for index, elem in enumerate(search):
    print(index, elem['identifier'])
(0, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SXD_20180828T143211')
(1, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SXC_20180828T143211')
(2, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SXB_20180828T143211')
(3, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SWC_20180828T143211')
(4, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SWB_20180828T143211')
(5, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SWD_20180828T143211')
(6, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SVB_20180828T143211')
(7, 'S2A_MSIL2A_20180828T094031_N0208_R036_T33SVC_20180828T143211')
  • Select the Sentinel-2 S2MSI2A product to process
In [29]:
s2_index = 3
print s2_index
3
  • Connect to the WPS server
In [30]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)
  • Do a GetCapabilities WPS request and list the process:
In [31]:
wps.getcapabilities()
In [32]:
app_deployed = False

for index, elem in enumerate(wps.processes):
    if elem.identifier == app_process_id:
        app_deployed = True

if app_deployed:
    print 'Process %s deployed' % app_process_id
else:
    raise Exception('Process %s not deployed' % app_process_id)
Process ec_better_ewf_wfp_01_02_01_ewf_wfp_01_02_01_0_2 deployed
  • Select the process and print the title and abstract after having submited a WPS DescribeProcess request
In [33]:
process = wps.describeprocess(app_process_id)

print process.title

print process.abstract
wfp-01-02-01
WFP-01-02-01 Sentinel-2 reflectances and vegetation indices
  • List the WPS process inputs:
In [34]:
for data_input in process.dataInputs:
    print data_input.identifier
source
resolution
wkt
flag_expr
_T2Username
  • Create a Python dictionary with the inputs:
In [36]:
resolution = '10'
flag_expr = '( saturated_l1a_B4 or scl_water )'
wkt = 'POLYGON((14.9997721265064 38.0212298555682,15.0339421243074 38.1469295398089,15.076722011609 38.2943830685215,15.1196939590664 38.4418167678655,15.1628416993738 38.5892613166416,15.2061510925436 38.7366798078584,15.2388261265461 38.8477042183055,16.2649601148006 38.8421486728812,16.2478654087227 37.8528295032852,14.9997726359092 37.8594419593273,14.9997721265064 38.0212298555682))'
In [37]:
inputs = [('source', search[s2_index]['self']),
          ('resolution', resolution),
          ('flag_expr', flag_expr),
          ('wkt', wkt),
          ('_T2Username', data_pipeline)]
  • Submit the Execute WPS request:
In [38]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(app_process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)
  • Monitor the request:
In [39]:
execution.statusLocation
Out[39]:
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=46db4496-d8fe-11e8-ad73-0242ac11000f&RawDataOutput=Result'
In [40]:
monitorExecution(execution)
  • Check the outcome of the processing request
In [41]:
if not execution.isSucceded():
    raise Exception('Processing failed')
  • Search for the results produced
In [42]:
results_osd = execution.processOutputs[0].reference

print results_osd
https://recast.terradue.com/t2api/describe/better-wfp-00003/_results/workflows/ec_better_ewf_wfp_01_02_01_ewf_wfp_01_02_01_0_2/run/46db4496-d8fe-11e8-ad73-0242ac11000f/0022422-180330140554685-oozie-oozi-W
In [43]:
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

wps = WebProcessingService(recast_wps_url,
                           verbose=False,
                           skip_caps=False)

recast_inputs = [('items', results_osd),
                  ('index', data_pipeline),
                  ('_T2ApiKey', datapipeline_api_key),
                  ('_T2Username', data_pipeline)]

recast_execution = wps.execute(recast_process_id,
                               recast_inputs,
                               output = [('result_osd', True)])


monitorExecution(recast_execution, sleepSecs=60)

etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]

Out[43]:
'https://catalog.terradue.com:443/better-wfp-00003/description'