Test the data transformation application

This Jupyter Notebook to query the catalog for a set of Sentinel-1 SLC products, creates a Web Processing Service (WPS) request invoking the data transformation application that was deployed in the Deploy step, monitors the WPS request execution and finally retrieves the data transformation execution results

  • First do the imports of the Python libraries required
In [1]:
app = dict([('artifact_id', 'ewf-ethz-01-03-01'),
            ('version', '0.2'),
            ('repository', 'Gitlab Groups'),
            ('community', 'ec-better')])

process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['version'].replace('.', '_'))
In [2]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy
import numpy as np
from shapely.wkt import loads
from shapely.geometry import box

import getpass

ciop = cioppy.Cioppy()

from datetime import datetime, timedelta
import dateutil.parser
import folium

import py_earthquakes

import geopandas as gp

Search for an earthquake

In [5]:
bbox=[21.944860, 36.940880,23.944860, 38.940880]

min_mag = 4.1

start_date = '2016-10-01'

stop_date = '2016-11-30'

eq_search = py_earthquakes.EarthQuakes(start_date,
                                       stop_date,
                                       min_mag = min_mag,
                                       bbox=bbox)

In [6]:
eq_search.earthquakes[0].title
Out[6]:
'M 4.2 - 1km SSW of Makrakomi, Greece'
In [8]:
eq = eq_search.earthquakes[0]
In [9]:
geo_buffer = 0.1

bbox = '%s,%s,%s,%s' % (loads(eq.wkt).buffer(geo_buffer).bounds[0],
                        loads(eq.wkt).buffer(geo_buffer).bounds[1],
                        loads(eq.wkt).buffer(geo_buffer).bounds[2],
                        loads(eq.wkt).buffer(geo_buffer).bounds[3])


aoi_wkt = box(*loads(eq.wkt).buffer(0.5).bounds).wkt

Search parameters

Set the catalogue endpoint to Sentinel-1:

In [10]:
series = 'https://catalog.terradue.com/sentinel1/search'

Define the end of the time of interest:

In [11]:
slave_search_stop_date = (dateutil.parser.parse(eq_search.earthquakes[0].date) + timedelta(days=6)).isoformat()

Search for the pre-event masters

In [20]:
master_search_start_date = (dateutil.parser.parse(slave['startdate']) + timedelta(days=-24)).isoformat()
In [21]:
master_search_stop_date = (dateutil.parser.parse(slave['startdate']) + timedelta(days=-1)).isoformat()
In [22]:
master_search_params = dict([('geom', slave['wkt']),
                           ('track', slave['track']),
                            ('pt',slave['productType']),
                            ('start', master_search_start_date),
                            ('stop', master_search_stop_date)])
In [23]:
try:
    master_search = ciop.search(end_point=series,
                            params=master_search_params,
                            output_fields='identifier,enclosure,self,startdate,wkt',
                            model='EOP')
except IndexError:
    print('no masters')
In [24]:
result = []

for index, elem in enumerate(master_search):
    print(index, elem['wkt'])

    master_wkt = loads(elem['wkt'])

    result.append({'self' : elem['self'],
                   'identifier' : elem['identifier'],
                   'enclosure' : elem['enclosure'],
                   'wkt': loads(elem['wkt']),
                   'aoi_intersec' : (master_wkt.intersection(aoi).area/aoi.area) * 100,
                   'slave_intersec' : (master_wkt.intersection(loads(slave['wkt']))).area / loads(slave['wkt']).area * 100,
                   'contains': master_wkt.contains(aoi),
                   'days': (dateutil.parser.parse(slave['startdate']) - dateutil.parser.parse(elem['startdate'])).days
                  })

masters = gp.GeoDataFrame(result)
(0, 'POLYGON((19.520258 40.118217,22.542038 40.522858,22.889389 38.845127,19.942297 38.439388,19.520258 40.118217))')
(1, 'POLYGON((19.906658 38.567837,22.860817 38.973766,23.19211 37.353638,20.304918 36.946354,19.906658 38.567837))')
(2, 'POLYGON((19.368031 40.835575,22.396507 41.235806,22.733843 39.616779,19.779461 39.21563,19.368031 40.835575))')
(3, 'POLYGON((19.736567 39.342892,22.70108 39.744785,23.034298 38.125225,20.13876 37.722118,19.736567 39.342892))')
(4, 'POLYGON((20.105968 37.850845,23.007288 38.253937,23.336889 36.633373,20.499729 36.22879,20.105968 37.850845))')
(5, 'POLYGON((19.367752 40.834766,22.396078 41.234989,22.733395 39.616081,19.77915 39.214939,19.367752 40.834766))')
(6, 'POLYGON((19.736254 39.342201,22.700668 39.744091,23.033894 38.124527,20.138449 37.72142,19.736254 39.342201))')
(7, 'POLYGON((20.105749 37.849777,23.00696 38.252869,23.336514 36.632545,20.499449 36.227962,20.105749 37.849777))')
In [25]:
masters
Out[25]:
aoi_intersec contains days enclosure identifier self slave_intersec wkt
0 68.636837 False 6 https://store.terradue.com/download/sentinel1/... S1B_IW_SLC__1SDV_20161024T163111_20161024T1631... https://catalog.terradue.com/sentinel1/search?... 55.755995 POLYGON ((19.520258 40.118217, 22.542038 40.52...
1 44.640286 False 6 https://store.terradue.com/download/sentinel1/... S1B_IW_SLC__1SDV_20161024T163046_20161024T1631... https://catalog.terradue.com/sentinel1/search?... 52.199313 POLYGON ((19.906658 38.567837, 22.860817 38.97...
2 0.000000 False 11 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161018T163206_20161018T1632... https://catalog.terradue.com/sentinel1/search?... 7.893120 POLYGON ((19.368031 40.835575, 22.396507 41.23...
3 100.000000 True 12 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161018T163141_20161018T1632... https://catalog.terradue.com/sentinel1/search?... 99.881697 POLYGON ((19.736567 39.342892, 22.70108 39.744...
4 0.000000 False 12 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161018T163116_20161018T1631... https://catalog.terradue.com/sentinel1/search?... 7.974487 POLYGON ((20.105968 37.850845, 23.007288 38.25...
5 0.000000 False 23 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161006T163206_20161006T1632... https://catalog.terradue.com/sentinel1/search?... 7.933108 POLYGON ((19.367752 40.834766, 22.396078 41.23...
6 100.000000 True 24 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161006T163141_20161006T1632... https://catalog.terradue.com/sentinel1/search?... 99.917743 POLYGON ((19.736254 39.342201, 22.700668 39.74...
7 0.000000 False 24 https://store.terradue.com/download/sentinel1/... S1A_IW_SLC__1SDV_20161006T163116_20161006T1631... https://catalog.terradue.com/sentinel1/search?... 7.912009 POLYGON ((20.105749 37.849777, 23.00696 38.252...
In [26]:
master_1 = master_search[masters.sort_values(['aoi_intersec', 'days'], ascending=[False, False]).iloc[1].name]
master_2 = master_search[masters.sort_values(['aoi_intersec', 'days'], ascending=[False, False]).iloc[0].name]
In [27]:
s1_identifiers = []
s1_references = []
locations = []

for product in [slave, master_1, master_2]:

    locations.append([t[::-1] for t in list(loads(product['wkt']).exterior.coords)])

    s1_identifiers.append(product['identifier'])
    s1_references.append(product['self'])

Plot the Sentinel-1 products (slave, master 1 and master 2), the earthquake point and its area of interest

In [28]:
lat = loads(eq_search.earthquakes[0].wkt).y
lon = loads(eq_search.earthquakes[0].wkt).x

zoom_start = 7

m = folium.Map(location=[lat, lon], zoom_start=zoom_start)

radius = 4
folium.CircleMarker(
    location=[lat, lon],
    radius=radius,
    color='#FF0000',
    stroke=False,
    fill=True,
    fill_opacity=0.6,
    opacity=1,
    popup='{} pixels'.format(radius),
    tooltip='I am in pixels',
).add_to(m)


folium.PolyLine(
    locations=np.asarray([t[::-1] for t in list(loads(aoi_wkt).exterior.coords)]).tolist(),
    color='#FF0000',
    weight=2,
    tooltip='Japan flooding',
).add_to(m)

folium.PolyLine(
    locations=locations,
    color='orange',
    weight=1,
    opacity=1,
    smooth_factor=0,
).add_to(m)

m.save(os.path.join('results', '%s_final.html' % eq_search.earthquakes[0].id))

m
Out[28]:

Prepare the variables assignment for the Jupyter Notebook streaming executable

In [29]:
print 'input_identifiers = %s' % s1_identifiers
input_identifiers = ['S1A_IW_SLC__1SDV_20161030T163141_20161030T163208_013722_01603F_4094', 'S1A_IW_SLC__1SDV_20161018T163141_20161018T163208_013547_015AEB_5994', 'S1A_IW_SLC__1SDV_20161006T163141_20161006T163208_013372_01554F_F08D']
In [30]:
print 'input_references = %s' % s1_references
input_references = ['https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1A_IW_SLC__1SDV_20161030T163141_20161030T163208_013722_01603F_4094', 'https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1A_IW_SLC__1SDV_20161018T163141_20161018T163208_013547_015AEB_5994', 'https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1A_IW_SLC__1SDV_20161006T163141_20161006T163208_013372_01554F_F08D']
In [46]:
wps_url = 'https://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi'

wps = WebProcessingService(wps_url,
                           verbose=False,
                           skip_caps=True)
  • Do a GetCapabilities WPS request and list the process:
In [47]:
wps.getcapabilities()
In [48]:
for index, elem in enumerate(wps.processes):
    print(index, elem.identifier)
(0, 'ec_better_ewf_wfp_01_02_02_ewf_wfp_01_02_02_0_1')
(1, 'ec_better_ewf_wfp_01_01_02_wfp_01_01_02_0_1')
(2, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_5')
(3, 'ec_better_ewf_sen2cor_sen2cor_0_3')
(4, 'ec_better_ewf_ethz_01_01_01_ewf_ethz_01_01_01_0_2')
(5, 'ec_better_ewf_wfp_01_01_01_ewf_wfp_01_01_01_1_9')
(6, 'ec_better_wfp_01_01_01_wfp_01_01_01_0_4')
(7, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_7')
(8, 'TerradueUnDeployProcess')
(9, 'ec_better_ewf_sen2cor_sen2cor_0_4')
(10, 'ec_better_ewf_ethz_01_01_01_ewf_ethz_01_01_01_0_1')
(11, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_1')
(12, 'ec_better_ewf_sen2cor_sen2cor_0_1')
(13, 'ec_better_wfp_01_01_01_wfp_01_01_01_0_1')
(14, 'ec_better_ewf_ethz_01_03_01_ewf_ethz_01_03_01_0_2')
(15, 'ec_better_ewf_wfp_01_01_02_ewf_wfp_01_01_02_0_2')
(16, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_6')
(17, 'GetStatus')
(18, 'ec_better_ewf_sen2cor_ewf_sen2cor_0_6')
(19, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_2')
(20, 'TerradueDeployProcess')
(21, 'ec_better_ewf_sen2cor_sen2cor_0_2')
(22, 'ec_better_wfp_01_01_01_a_wfp_01_01_01_0_1')
(23, 'ec_better_wfp_01_01_01_wfp_01_01_01_1_0')
  • Select the process and print the title and abstract after having submited a WPS DescribeProcess request
In [49]:
process = wps.describeprocess(process_id)
In [50]:
process.title
Out[50]:
'Sentinel-1 pre and coseismic deformation maps'
In [51]:
process.abstract
Out[51]:
'Sentinel-1 pre and coseismic deformation maps'
  • List the WPS process inputs:
In [52]:
for data_input in process.dataInputs:
    print data_input.identifier
source
format
quotation
_T2Username
In [53]:
source = [slave['self'],
          master_1['self']]


  • Create a Python dictionary with the inputs:
In [54]:
inputs = [('source', ','.join(source)),
          ('format', 'GeoTIFF-BigTIFF'),
          ('quotation', 'No'),
          ('_T2Username', 'fbrito')]
  • Submit the Execute WPS request:
In [55]:
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(process_id,
                                           inputs,
                                           output=[('result_osd', False)])

execution_response = execution.submitRequest(etree.tostring(execution_request))

execution.parseResponse(execution_response)
  • Monitor the request:
In [56]:
execution.statusLocation
Out[56]:
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=6ec93e58-a218-11e8-a6e0-0242ac110007&RawDataOutput=Result'
In [57]:
monitorExecution(execution)
 owslib.wps.WPSException : {'locator': None, 'code': 'InternalError', 'text': 'Unable to run the Service. The message returned back by the Service was the following: GetStatus was unable to find any cache file for Service ID 6ec93e58-a218-11e8-a6e0-0242ac110007.'}
 owslib.wps.WPSException : {'locator': None, 'code': 'InternalError', 'text': 'Unable to run the Service. The message returned back by the Service was the following: GetStatus was unable to find any cache file for Service ID 6ec93e58-a218-11e8-a6e0-0242ac110007.'}
 owslib.wps.WPSException : {'locator': None, 'code': 'InternalError', 'text': 'Unable to run the Service. The message returned back by the Service was the following: GetStatus was unable to find any cache file for Service ID 6ec93e58-a218-11e8-a6e0-0242ac110007.'}

KeyboardInterruptTraceback (most recent call last)
<ipython-input-57-1cd684eda421> in <module>()
----> 1 monitorExecution(execution)

/opt/anaconda/lib/python2.7/site-packages/owslib/wps.pyc in monitorExecution(execution, sleepSecs, download, filepath)
   1629
   1630     while execution.isComplete() == False:
-> 1631         execution.checkStatus(sleepSecs=sleepSecs)
   1632         log.info('Execution status: %s' % execution.status)
   1633

/opt/anaconda/lib/python2.7/site-packages/owslib/wps.pyc in checkStatus(self, url, response, sleepSecs)
    650                      self.statusLocation)
    651             response = reader.readFromUrl(
--> 652                 self.statusLocation, username=self.username, password=self.password)
    653         else:
    654             response = reader.readFromString(response)

/opt/anaconda/lib/python2.7/site-packages/owslib/wps.pyc in readFromUrl(self, url, data, method, username, password)
    480         """
    481
--> 482         return self._readFromUrl(url, data, method, username=username, password=password)
    483
    484

/opt/anaconda/lib/python2.7/site-packages/owslib/wps.pyc in _readFromUrl(self, url, data, method, username, password)
    396             spliturl = request_url.split('?')
    397             u = openURL(spliturl[0], spliturl[
--> 398                         1], method='Get', username=username, password=password)
    399             return etree.fromstring(u.read())
    400

/opt/anaconda/lib/python2.7/site-packages/owslib/util.pyc in openURL(url_base, data, method, cookies, username, password, timeout, headers)
    177                            url_base,
    178                            headers=headers,
--> 179                            **rkwargs)
    180
    181     if req.status_code in [400, 401]:

/opt/anaconda/lib/python2.7/site-packages/requests/api.pyc in request(method, url, **kwargs)
     54     # cases, and look like a memory leak in others.
     55     with sessions.Session() as session:
---> 56         return session.request(method=method, url=url, **kwargs)
     57
     58

/opt/anaconda/lib/python2.7/site-packages/requests/sessions.pyc in request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    472             hooks = hooks,
    473         )
--> 474         prep = self.prepare_request(req)
    475
    476         proxies = proxies or {}

/opt/anaconda/lib/python2.7/site-packages/requests/sessions.pyc in prepare_request(self, request)
    401             data=request.data,
    402             json=request.json,
--> 403             headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
    404             params=merge_setting(request.params, self.params),
    405             auth=merge_setting(auth, self.auth),

/opt/anaconda/lib/python2.7/site-packages/requests/sessions.pyc in merge_setting(request_setting, session_setting, dict_class)
     59         return request_setting
     60
---> 61     merged_setting = dict_class(to_key_val_list(session_setting))
     62     merged_setting.update(to_key_val_list(request_setting))
     63

/opt/anaconda/lib/python2.7/site-packages/requests/structures.pyc in __init__(self, data, **kwargs)
     44         if data is None:
     45             data = {}
---> 46         self.update(data, **kwargs)
     47
     48     def __setitem__(self, key, value):

/opt/anaconda/lib/python2.7/_abcoll.pyc in update(*args, **kwds)
    570             else:
    571                 for key, value in other:
--> 572                     self[key] = value
    573         for key, value in kwds.items():
    574             self[key] = value

/opt/anaconda/lib/python2.7/site-packages/requests/structures.pyc in __setitem__(self, key, value)
     49         # Use the lowercased key for lookups, but store the actual
     50         # key alongside the value.
---> 51         self._store[key.lower()] = (key, value)
     52
     53     def __getitem__(self, key):

/opt/anaconda/lib/python2.7/site-packages/requests/packages/urllib3/packages/ordered_dict.pyc in __setitem__(self, key, value, dict_setitem)
     50             last = root[0]
     51             last[1] = root[0] = self.__map[key] = [last, root, key]
---> 52         dict_setitem(self, key, value)
     53
     54     def __delitem__(self, key, dict_delitem=dict.__delitem__):

KeyboardInterrupt:
  • Check the outcome of the processing request
In [27]:
execution.isSucceded()
Out[27]:
True
  • Search for the results produced
In [32]:
for output in execution.processOutputs:
    print(output.identifier)
result_osd
In [33]:
results_osd = execution.processOutputs[0].reference
In [34]:
results_osd
Out[34]:
'https://recast.terradue.com/t2api/describe/fbrito/_results/workflows/ec_better_ewf_ethz_01_01_01_ewf_ethz_01_01_01_0_2/run/3711c858-9f8a-11e8-9f35-0242ac110007/0004884-180330140554685-oozie-oozi-W'
In [35]:
api_key = getpass.getpass('The %s Ellip API key:' % data_pipeline)

NameErrorTraceback (most recent call last)
<ipython-input-35-4248bb9db3ef> in <module>()
----> 1 api_key = getpass.getpass('The %s Ellip API key:' % data_pipeline)

NameError: name 'data_pipeline' is not defined
In [ ]:
data_pipeline = 'better-ethz-00001'

recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

wps = WebProcessingService(recast_wps_url,
                           verbose=False,
                           skip_caps=False)

recast_inputs = [('items', results_osd),
                 ('index', data_pipeline),
                 ('_T2ApiKey', api_key),
                 ('_T2Username', data_pipeline)]

recast_execution = wps.execute(recast_process_id,
                               recast_inputs,
                               output = [('result_osd', True)])


monitorExecution(recast_execution, sleepSecs=60)

etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]