WFP-01-03-02 Data transformation application - CHIRPS Rainfall Estimates (RFE) - Aggregations and Anomalies

This Jupyter Notebook queries the catalog for CHIRPS products, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in the Deploy step, monitors the WPS request execution and finally retrieves the data transformation execution results.

  • First do the imports of the Python libraries required.
In [1]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy

from shapely.wkt import loads

import getpass

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
  • Read the data pipeline configuration information:
In [2]:
%store -r

# Load the shared pipeline configuration notebook (read as nbformat version 4).
nb_config = os.path.join('../operations', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)

# Execute the source of the configuration notebook's second cell in this
# namespace. The names used below (app_artifact_id, app_version, repository,
# community) are not assigned anywhere in this notebook, so presumably they
# are defined by that cell (or restored by `%store -r`) - TODO confirm.
# SECURITY: exec() runs whatever that cell contains; only point nb_config at
# a configuration notebook you trust.
# The tuple form exec(code, globals, locals) is accepted by both Python 2 and
# Python 3, unlike the Python-2-only statement form `exec code in g, l`.
exec(nb['cells'][1]['source'], globals(), locals())

# Description of the deployed application; the 'ewf-' prefix is dropped from
# the artifact id.
app = {
    'artifact_id': app_artifact_id.replace('ewf-', ''),
    'version': app_version,
    'repository': repository,
    'community': community,
}

# WPS process identifier: <community>_<artifact>_<artifact>_<version>, with
# '-' and '.' replaced by '_'. The artifact id appears twice on purpose: the
# identifier printed by the next cell shows the same doubled pattern.
artifact_slug = app['artifact_id'].replace('-', '_')
app_process_id = '%s_%s_%s_%s' % (
    app['community'].replace('-', '_'),
    artifact_slug,
    artifact_slug,
    app['version'].replace('.', '_'),
)
In [3]:
# Display the WPS process identifier derived from the configuration.
print(app_process_id)
ec_better_wfp_01_03_02_wfp_01_03_02_1_16
  • Define the search parameters: the catalog series OpenSearch endpoint, the time of interest and the area of interest
In [4]:
# OpenSearch description document of the CHIRPS catalogue series.
series = 'https://catalog.terradue.com/chirps/description'

# Time of interest (ISO 8601, UTC 'Z' suffix) and the value later sent as the
# 'count' search parameter.
start_date, stop_date = '2017-09-01T00:00:00Z', '2017-09-30T23:59:59Z'
count = 366

# Areas of interest as a WKT MULTIPOLYGON made of two rectangles. The literal
# is split across lines for readability only; adjacent string literals are
# concatenated, so the value is unchanged.
geom = (
    'MULTIPOLYGON ('
    '((19.564118379527372 -14.910538401469323,'
    '27.210602754527372 -14.910538401469323,'
    '27.210602754527372 -21.09243333786736,'
    '19.564118379527372 -21.09243333786736,'
    '19.564118379527372 -14.910538401469323)), '
    '((12.9415 13.7579, 14.6731 13.7579, 14.6731 12.0093, '
    '12.9415 12.0093, 12.9415 13.7579)))'
)


Limit the test to the first polygon of the area of interest:

In [5]:
# geom is a MULTIPOLYGON; keep only its first polygon, as WKT.
# Go through `.geoms` rather than indexing the geometry directly: treating a
# multi-part geometry as a sequence was removed in Shapely 2.0, while
# `.geoms` works on both old and new releases.
wkt = loads(geom).geoms[0].wkt

# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(wkt)
POLYGON ((19.56411837952737 -14.91053840146932, 27.21060275452737 -14.91053840146932, 27.21060275452737 -21.09243333786736, 19.56411837952737 -21.09243333786736, 19.56411837952737 -14.91053840146932))
  • Search for CHIRPS products
In [6]:
# OpenSearch query parameters: time window plus the result count.
search_params = {
    'start': start_date,
    'stop': stop_date,
    'count': count,
}

In [7]:
# Catalogue client; the same instance is reused further down to search the
# processing results.
ciop = cioppy.Cioppy()

# Query the CHIRPS series. Each returned entry is later indexed by
# 'identifier' and 'self', two of the requested output fields.
search = ciop.search(
    end_point=series,
    params=search_params,
    model='GeoTime',
    output_fields='self,enclosure,identifier')

  • List the CHIRPS products found
In [8]:
# Print the position and identifier of every catalogue entry returned by the
# search.
for index, elem in enumerate(search):
    print(index, elem['identifier'])
(0, 'chirps-v2.0.2017.09.30')
(1, 'chirps-v2.0.2017.09.29')
(2, 'chirps-v2.0.2017.09.28')
(3, 'chirps-v2.0.2017.09.27')
(4, 'chirps-v2.0.2017.09.26')
(5, 'chirps-v2.0.2017.09.25')
(6, 'chirps-v2.0.2017.09.24')
(7, 'chirps-v2.0.2017.09.23')
(8, 'chirps-v2.0.2017.09.22')
(9, 'chirps-v2.0.2017.09.21')
(10, 'chirps-v2.0.2017.09.20')
(11, 'chirps-v2.0.2017.09.19')
(12, 'chirps-v2.0.2017.09.18')
(13, 'chirps-v2.0.2017.09.17')
(14, 'chirps-v2.0.2017.09.16')
(15, 'chirps-v2.0.2017.09.15')
(16, 'chirps-v2.0.2017.09.14')
(17, 'chirps-v2.0.2017.09.13')
(18, 'chirps-v2.0.2017.09.12')
(19, 'chirps-v2.0.2017.09.11')
(20, 'chirps-v2.0.2017.09.10')
(21, 'chirps-v2.0.2017.09.09')
(22, 'chirps-v2.0.2017.09.08')
(23, 'chirps-v2.0.2017.09.07')
(24, 'chirps-v2.0.2017.09.06')
(25, 'chirps-v2.0.2017.09.05')
(26, 'chirps-v2.0.2017.09.04')
(27, 'chirps-v2.0.2017.09.03')
(28, 'chirps-v2.0.2017.09.02')
(29, 'chirps-v2.0.2017.09.01')
  • Connect to the WPS server, send a GetCapabilities request and list the inputs of the deployed process:
In [9]:
# Endpoint of the WPS server hosting the deployed application.
wps_url = 'https://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi'

# skip_caps=True builds the client without fetching capabilities; the
# GetCapabilities request is then issued explicitly.
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)
wps.getcapabilities()

# DescribeProcess for the application, then list its input identifiers.
process = wps.describeprocess(app_process_id)

# The loop variable is deliberately not called `input`, which would shadow
# the builtin of the same name for the rest of the session.
for data_input in process.dataInputs:
    print(data_input.identifier)
source
N_10
N_30
N_60
N_90
N_120
N_150
N_180
N_270
N_365
regionOfInterest
nameOfRegion
_T2Username
In [10]:
# Values for the N_* process inputs, kept as the strings 'True' / 'False'
# because that is how they are sent in the WPS request.
# Enabled for this run: N_10 and N_30.
N_10 = N_30 = 'True'
# Disabled for this run: every other N_* input.
N_60 = N_90 = N_120 = N_150 = N_180 = N_270 = N_365 = 'False'
In [11]:
# One 'source' input per catalogue entry found by the search, referenced by
# the entry's 'self' link.
inputs = [('source', entry['self']) for entry in search]

# Remaining process inputs: the N_* flags, the area of interest (WKT of the
# first polygon), a label for the region and the user name.
# NOTE(review): the region is labelled 'Sudan', but the second coordinate of
# every vertex in the selected polygon is negative (between about -14.9 and
# -21.1) - confirm that the label matches the area of interest.
inputs_2 = [
    ('N_10', N_10),
    ('N_30', N_30),
    ('N_60', N_60),
    ('N_90', N_90),
    ('N_120', N_120),
    ('N_150', N_150),
    ('N_180', N_180),
    ('N_270', N_270),
    ('N_365', N_365),
    ('regionOfInterest', wkt),
    ('nameOfRegion', 'Sudan'),
    ('_T2Username', 'rirr'),
]
inputs += inputs_2
print(inputs)
[('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.30'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.29'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.28'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.27'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.26'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.25'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.24'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.23'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.22'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.21'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.20'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.19'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.18'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.17'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.16'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.15'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.14'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.13'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.12'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.11'), ('source', 
'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.10'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.09'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.08'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.07'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.06'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.05'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.04'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.03'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.02'), ('source', 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.09.01'), ('N_10', 'True'), ('N_30', 'True'), ('N_60', 'False'), ('N_90', 'False'), ('N_120', 'False'), ('N_150', 'False'), ('N_180', 'False'), ('N_270', 'False'), ('N_365', 'False'), ('regionOfInterest', 'POLYGON ((19.56411837952737 -14.91053840146932, 27.21060275452737 -14.91053840146932, 27.21060275452737 -21.09243333786736, 19.56411837952737 -21.09243333786736, 19.56411837952737 -14.91053840146932))'), ('nameOfRegion', 'Sudan'), ('_T2Username', 'rirr')]
In [12]:
# The application counts as deployed when one of the processes advertised by
# the WPS server carries the expected identifier.
app_deployed = any(proc.identifier == app_process_id for proc in wps.processes)

# Fail early: nothing below can work without the process.
if not app_deployed:
    raise Exception('Process %s not deployed' % app_process_id)

# Single-argument print(...) behaves the same on Python 2 and Python 3.
print('Process %s deployed' % app_process_id)
Process ec_better_wfp_01_03_02_wfp_01_03_02_1_16 deployed
  • Select the process and print the title and abstract after having submitted a WPS DescribeProcess request
In [13]:
# DescribeProcess request for the deployed application.
process = wps.describeprocess(app_process_id)

# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(process.title)

print(process.abstract)
CHIRPS Rainfall Estimates (RFE) - Aggregations
TBD
  • List the WPS process inputs:
In [14]:
# Print the identifier of every input declared by the process.
# Single-argument print(...) behaves the same on Python 2 and Python 3.
for data_input in process.dataInputs:
    print(data_input.identifier)
source
N_10
N_30
N_60
N_90
N_120
N_150
N_180
N_270
N_365
regionOfInterest
nameOfRegion
_T2Username
  • Submit the Execute WPS request:
In [15]:
# Execution handle bound to the same URL as the WPS client.
execution = owslib.wps.WPSExecution(url=wps.url)

# Build the Execute request document for the process, asking for the
# 'result_osd' output.
execution_request = execution.buildRequest(
    app_process_id,
    inputs,
    output=[('result_osd', False)])

# Serialise the request, submit it and parse the server's response into the
# execution object.
request_xml = etree.tostring(execution_request)
execution_response = execution.submitRequest(request_xml)
execution.parseResponse(execution_response)

# URL at which the status of this execution is published.
print(execution.statusLocation)
http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=e5cb374c-3c08-11e9-b124-0242ac11000f&RawDataOutput=Result
  • Monitor the request:
In [16]:
# Bare expression: the notebook displays the status URL as the cell result.
execution.statusLocation
Out[16]:
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=e5cb374c-3c08-11e9-b124-0242ac11000f&RawDataOutput=Result'
In [17]:
# Hand the execution to OWSLib's monitorExecution helper.
# NOTE(review): presumably this blocks, polling the status URL, until the
# process completes - confirm against the OWSLib documentation.
monitorExecution(execution)
  • Check the outcome of the processing request
In [18]:
# Stop here if the WPS execution did not succeed.
# NOTE(review): `isSucceded` (single "e") appears to be the method name as
# spelled by OWSLib - confirm before "correcting" it.
if not execution.isSucceded():
    raise Exception('Processing failed')
  • Search for the results produced
In [19]:
# Reference (URL) carried by the first output of the execution; it is used
# below as the end point for the results search.
results_osd = execution.processOutputs[0].reference

# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(results_osd)
https://recast.terradue.com/t2api/describe/rirr/_results/workflows/ec_better_wfp_01_03_02_wfp_01_03_02_1_16/run/e5cb374c-3c08-11e9-b124-0242ac11000f/0022575-181221095105003-oozie-oozi-W
In [20]:
from geopandas import GeoDataFrame

# Query the results end point with no extra search parameters and load the
# title / enclosure of each entry into a GeoDataFrame.
search_results = GeoDataFrame(
    ciop.search(
        end_point=results_osd,
        params=[],
        output_fields='title,enclosure',
        model='GeoTime',
        timeout=50000))

# Bare expression: the notebook renders the frame as the cell result.
search_results
Out[20]:
enclosure title
0 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N10_daystotal_2017-09-01...
1 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N10_daystotal_2017-09-11...
2 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N10_daystotal_2017-09-21...
3 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N30_countaboveone_2017-0...
4 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N30_daystotal_2017-09-01...
5 https://store.terradue.com/rirr/_results/workf... Output CHIRPSv2_Sudan_N30_dryspell_2017-09-01_...
6 https://store.terradue.com/rirr/_results/workf... result.ipynb
In [21]:
# Bare expression: display the title of the first result.
search_results['title'][0]
Out[21]:
'Output CHIRPSv2_Sudan_N10_daystotal_2017-09-01_2017-09-10.tif'
In [ ]:
# Publication step: invoke the 'dataPublication' process of the recast
# service on the results produced above.
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

# NOTE: this rebinds `wps`, which until here referred to the application
# deployer service. Re-run the deployer connection cell before re-running any
# earlier cell that uses `wps`.
wps = WebProcessingService(recast_wps_url,
                           verbose=False,
                           skip_caps=False)

# data_pipeline and data_pipeline_api_key are not assigned in this notebook;
# presumably they come from the configuration cell or `%store -r` - TODO
# confirm.
recast_inputs = [('items', results_osd),
                 ('index', data_pipeline),
                 ('_T2ApiKey', data_pipeline_api_key),
                 ('_T2Username', data_pipeline)]

recast_execution = wps.execute(recast_process_id,
                               recast_inputs,
                               output=[('result_osd', True)])

# Monitor the publication request with a 60 second sleep interval.
monitorExecution(recast_execution, sleepSecs=60)

# Same guard as for the processing request: do not read outputs of an
# execution that did not succeed.
if not recast_execution.isSucceded():
    raise Exception('Data publication failed')

# Parse the first data item of the first output as XML and take the `href`
# attribute of its root element; as the last expression of the cell it is
# displayed as the cell result.
etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]