Test the data transformation application
This Jupyter Notebook queries the catalog for a Sentinel-1 GRD product, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in the Deploy step, monitors the execution of the WPS request and finally retrieves the results of the data transformation execution.
- First do the imports of the Python libraries required
In [1]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
- Read the data pipeline configuration information:
In [2]:
%store -r
# Load the data pipeline configuration by executing the source of the cell at
# index 1 of the shared configuration notebook in the current namespace.
nb_config = os.path.join('../operations', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)  # second argument: notebook format version to read as

# NOTE(review): exec() runs whatever code the configuration notebook contains;
# only point this at a configuration notebook you trust.
# The call form exec(code, globals, locals) behaves the same on Python 2 and
# Python 3, unlike the Python-2-only `exec ... in globals(), locals()` statement.
exec(nb['cells'][1]['source'], globals(), locals())

# Description of the application under test. The four values are not defined
# in this cell - presumably they come from the configuration cell executed
# above or from the `%store -r` restore; confirm.
app = {
    'artifact_id': app_artifact_id,
    'version': app_version,
    'repository': repository,
    'community': community,
}

# Identifier of the WPS process: community, artifact id (twice) and version,
# with '-' and '.' replaced by '_'. The artifact id appears twice on purpose:
# this is the identifier later compared against the deployed processes.
artifact_slug = app['artifact_id'].replace('-', '_')
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'),
                                  artifact_slug,
                                  artifact_slug,
                                  app['version'].replace('.', '_'))
In [3]:
# Display the WPS process identifier built in the previous cell.
app_process_id
Out[3]:
'ec_better_ewf_wfp_01_01_01_ewf_wfp_01_01_01_1_20'
- Define the search parameters: the catalog series OpenSearch endpoint, the time of interest and the area of interest
In [4]:
# Catalog series OpenSearch endpoint queried for products.
series = 'https://catalog.terradue.com/sentinel1/search'

# Time of interest (start and stop timestamps).
start_date = '2017-12-09T00:00:00'
stop_date = '2017-12-11T23:59:59'

# Product type used as the 'pt' search parameter.
s1_prd_type = 'GRD'

# Area of interest as a WKT multipolygon, one polygon per line. Adjacent
# string literals are concatenated, so the value is a single WKT string.
geom = (
    'MULTIPOLYGON ('
    '((26.832 9.5136, 28.6843 9.5136, 28.6843 7.8009, 26.832 7.8009, 26.832 9.5136)), '
    '((43.721626 4.577164,45.956501 5.041664,46.261715 3.533043,44.031803 3.064234,43.721626 4.577164)), '
    '((-5.5 17.26, -1.08 17.26, -1.08 13.5, -5.5 13.5, -5.5 17.26)), '
    '((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, 12.9415 12.0093, 12.9415 13.7579))'
    ')'
)
Limit the test to one of the AOIs:
In [5]:
# Restrict the test to one area of interest: take the polygon at index 1 of
# the multipolygon and keep its WKT representation.
# `.geoms` is used because indexing a multi-part geometry directly is
# deprecated in Shapely 1.8 and removed in Shapely 2.0.
wkt = loads(geom).geoms[1].wkt
print(wkt)
POLYGON ((43.721626 4.577164, 45.956501 5.041664, 46.261715 3.533043, 44.031803 3.064234, 43.721626 4.577164))
- Search for Sentinel-1 GRD products in one of the polygons of the area of interest
In [6]:
# Catalog search parameters: area of interest, time of interest and
# product type.
search_params = {
    'geom': wkt,
    'start': start_date,
    'stop': stop_date,
    'pt': s1_prd_type,
}
In [7]:
# Query the catalog series with the search parameters, asking for the listed
# fields of every matching entry.
ciop = cioppy.Cioppy()
search = ciop.search(
    end_point=series,
    params=search_params,
    output_fields='self,enclosure,identifier,updated,wkt',
    model='GeoTime'
)
- List the Sentinel-1 GRD products found
In [8]:
# Print the position, update timestamp and identifier of each product found.
for position, product in enumerate(search):
    print(position, product['updated'], product['identifier'])
(0, '2017-12-09T19:22:14.0937910+00:00', 'S1B_IW_GRDH_1SDV_20171209T150740_20171209T150805_008644_00F5AB_87B2')
(1, '2017-12-09T19:12:30.7262480+00:00', 'S1B_IW_GRDH_1SDV_20171209T150711_20171209T150740_008644_00F5AB_744A')
In [9]:
# Second set of search parameters: same area and product type, but selecting
# on the 'update' parameter instead of the start/stop time of interest.
# The interval spans the 'updated' timestamps of the products listed above.
search_params2 = {
    'geom': wkt,
    'update': '2017-12-09T19:12:30Z/2017-12-09T19:22:15Z',
    'pt': s1_prd_type,
}
In [10]:
# Run the catalog query again with the update-based search parameters.
ciop = cioppy.Cioppy()
search2 = ciop.search(
    end_point=series,
    params=search_params2,
    output_fields='self,enclosure,identifier,updated',
    model='GeoTime'
)
In [11]:
# Print the position, update timestamp and identifier of each product
# returned by the second search.
for position, product in enumerate(search2):
    print(position, product['updated'], product['identifier'])
(0, '2017-12-09T19:22:14.0937910+00:00', 'S1B_IW_GRDH_1SDV_20171209T150740_20171209T150805_008644_00F5AB_87B2')
(1, '2017-12-09T19:12:30.7262480+00:00', 'S1B_IW_GRDH_1SDV_20171209T150711_20171209T150740_008644_00F5AB_744A')
print search2[0]['self']
product1 = ciop.search(end_point=search2[0]['self'], params=[], output_fields='')
print (product1)
- Select the Sentinel-1 GRD product to process
In [12]:
# Position in search2 of the product to process (0 selects the first result).
s1_index = 0
- Connect to the WPS server
In [13]:
# WPS endpoint, built from the apps deployer base URL.
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

# Create the WPS client. With skip_caps=True the capabilities are presumably
# not fetched here; GetCapabilities is requested explicitly in a later cell.
wps = WebProcessingService(
    wps_url,
    verbose=False,
    skip_caps=True
)
- Do a GetCapabilities WPS request and check that the process is deployed:
In [23]:
# Send the GetCapabilities request (the client was created with
# skip_caps=True); presumably this populates wps.processes - confirm.
wps.getcapabilities()
In [24]:
# The application is deployed when one of the processes advertised by the WPS
# server carries the expected process identifier.
app_deployed = any(proc.identifier == app_process_id
                   for proc in wps.processes)

# Stop the notebook here if the process is missing: every later cell needs it.
if not app_deployed:
    raise Exception('Process %s not deployed' % app_process_id)

print('Process %s deployed' % app_process_id)
Process ec_better_ewf_wfp_01_01_01_ewf_wfp_01_01_01_1_20 deployed
- Select the process and print the title and abstract after having submitted a WPS DescribeProcess request
In [16]:
# Request the description of the application's process and show its title
# and abstract. print(...) with a single argument produces the same output
# on Python 2 and Python 3.
process = wps.describeprocess(app_process_id)
print(process.title)
print(process.abstract)
WFP-01-01-01 Sentinel-1 backscatter timeseries
WFP-01-01-01 Data transformation application - Sentinel-1 backscatter timeseries
- List the WPS process inputs:
In [18]:
# Print the identifier of every input the process declares.
for data_input in process.dataInputs:
    print(data_input.identifier)
source
filterSizeX
filterSizeY
wkt
_T2Username
- Create a Python list of (identifier, value) tuples with the inputs:
In [19]:
# 'self' field of the selected catalog entry; used as the value of the
# 'source' input. NOTE: `self` is an ordinary variable name in this notebook.
self = search2[s1_index]['self']

# Values for the filterSizeX / filterSizeY process inputs, given as strings.
filterSizeX = '5'
filterSizeY = '5'
In [20]:
# WPS inputs as (identifier, value) pairs; the identifiers are the ones the
# process declares in its description.
inputs = [
    ('source', self),
    ('filterSizeX', filterSizeX),
    ('filterSizeY', filterSizeY),
    ('wkt', wkt),
    ('_T2Username', data_pipeline),
]
- Submit the Execute WPS request:
In [21]:
# Build the Execute request for the application's process, asking for the
# 'result_osd' output.
execution = owslib.wps.WPSExecution(url=wps.url)
execution_request = execution.buildRequest(
    app_process_id,
    inputs,
    output=[('result_osd', False)]
)

# Serialise the request document, submit it and parse the server's response
# into the execution object.
request_document = etree.tostring(execution_request)
execution_response = execution.submitRequest(request_document)
execution.parseResponse(execution_response)
- Monitor the request:
In [22]:
# Display the status location URL of the submitted execution.
execution.statusLocation
Out[22]:
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=21fd03d6-343d-11e9-9c20-0242ac11000f&RawDataOutput=Result'
In [14]:
# Monitor the execution until it completes - presumably by polling its
# status location; confirm against the OWSLib documentation.
monitorExecution(execution)
- Check the outcome of the processing request
In [15]:
# Abort the notebook when the WPS execution did not succeed.
# NOTE: `isSucceded` (sic) is the method name as called here; keep the spelling.
succeeded = execution.isSucceded()
if not succeeded:
    raise Exception('Processing failed')
- Search for the results produced
In [ ]:
# Reference (URL) of the first process output, i.e. where the produced
# results can be searched.
results_osd = execution.processOutputs[0].reference
print(results_osd)
In [ ]:
# Submit the results to the 'dataPublication' process of the recast WPS
# service. Note that `wps` is rebound to the recast service from here on.
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

wps = WebProcessingService(
    recast_wps_url,
    verbose=False,
    skip_caps=False
)

# Inputs as (identifier, value) pairs: the results reference, the target
# index and the credentials of the data pipeline.
recast_inputs = [
    ('items', results_osd),
    ('index', data_pipeline),
    ('_T2ApiKey', data_pipeline_api_key),
    ('_T2Username', data_pipeline),
]

recast_execution = wps.execute(
    recast_process_id,
    recast_inputs,
    output=[('result_osd', True)]
)

# Monitor the publication request, with a 60 second sleep interval.
monitorExecution(recast_execution, sleepSecs=60)

# Parse the first data item of the first output as XML and display the first
# href attribute of its root element.
recast_result_xml = recast_execution.processOutputs[0].data[0]
etree.fromstring(recast_result_xml).xpath('./@href')[0]