Test the data transformation application

This Jupyter Notebook:

  • queries the catalog for input product(s)
  • creates a Web Processing Service (WPS) request invoking the application that was deployed in the Deploy step
  • monitors the WPS request execution and finally retrieves the data transformation execution results
  • First, import the required Python libraries:
In [122]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy

from shapely.wkt import loads

import getpass

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

from shapely.wkt import loads
from shapely.geometry import Polygon
from shapely.ops import cascaded_union
from shapely.geometry import box

import py_earthquakes

import pandas as pd
from geopandas import GeoDataFrame
import geopandas
import folium
import requests
import json
  • Read the data pipeline configuration information:
In [3]:
%store -r

nb_config = os.path.join('../operations', 'configuration.ipynb')

nb = nbf.read(nb_config, 4)

exec(nb['cells'][2]['source']) in globals(), locals()

app = dict([('artifact_id', app_artifact_id),
            ('version', app_version),
            ('repository', repository),
            ('community', community)])

app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['version'].replace('.', '_'))
In [4]:
# coverage.txt holds one WKT geometry per line
with open('coverage.txt') as coverage_file:
    list_wkt = coverage_file.read().splitlines()
In [5]:
# Parse every WKT line and merge the footprints into a single geometry
polygons = [loads(wkt) for wkt in list_wkt]

union = cascaded_union(polygons)

Store the coverage in the catalogue

In [116]:
# One-row frame describing the coverage feature; the geometry is still a WKT
# string here and is parsed into a shapely object in a later cell.
coverage = {'geometry': union.wkt,
            'identifier': 'service-coverage'}

c = GeoDataFrame(coverage, index=['0'])
In [117]:
# Show the coverage frame (geometry column still holds the WKT string)
c
Out[117]:
geometry identifier
0 MULTIPOLYGON (((114.5 -8.800000000000001, 110.... service-coverage
In [118]:
# Replace the WKT strings with shapely geometries before writing the file below.
# NOTE(review): not idempotent - re-running this cell passes geometries (not
# strings) to loads().
c['geometry'] = c['geometry'].apply(loads)
In [121]:
coverage_geojson = 'coverage.geojson'

# Start from a clean slate: drop any previous export, tolerating its absence.
try:
    os.remove(coverage_geojson)
except OSError:
    pass

c.to_file(coverage_geojson, driver='GeoJSON')
In [108]:
# Read the exported GeoJSON back as a plain dict for the catalogue POST
with open('coverage.geojson') as geojson_file:
    data = json.load(geojson_file)
In [126]:
# POST the coverage feature to the data pipeline's catalogue index,
# authenticating as the pipeline user.
endpoint = 'https://catalog.terradue.com/%s' % data_pipeline
headers = {"Content-Type": "application/json"}

request_track = requests.post(
    endpoint,
    data=json.dumps(data),
    headers=headers,
    auth=(data_pipeline, datapipeline_api_key),
)

# HTTP status of the catalogue response (200 on success)
print(request_track.status_code)
200

**Search for earthquakes**

In [110]:
eq_search = py_earthquakes.EarthQuakes()

# Search parameters, passed straight to EarthQuakes.search
bbox = (-180, -90, 180, 90)
min_mag = 5
start_date, end_date = '2018-01-09', '2018-01-20'

eq_search.search(start_date, end_date, min_mag=min_mag, bbox=bbox)
In [8]:
# One row per earthquake; column 0 holds the EarthQuake objects themselves
eqs = pd.DataFrame(data=eq_search.earthquakes)
In [9]:
# Peek at the WKT of the first search result
eq_search.earthquakes[0].wkt
Out[9]:
'POINT(45.7137 33.7156)'
In [10]:
def expand_eq(row, coverage):
    """Flatten one earthquake row and flag whether it lies inside `coverage`.

    Parameters
    ----------
    row : pandas.Series
        A row of the earthquakes frame; ``row[0]`` is the earthquake object,
        which must expose ``id``, ``date`` and ``wkt``.
    coverage : shapely geometry
        Area to test against (anything with a ``contains`` method); in this
        notebook, the union of the service coverage polygons.

    Returns
    -------
    pandas.Series
        Keys ``identifier``, ``date``, ``geometry`` (the WKT string) and
        ``covered`` (result of ``coverage.contains`` on the parsed point).
    """
    quake = row[0]

    series = dict()
    series['identifier'] = quake.id
    series['date'] = quake.date
    series['geometry'] = quake.wkt
    # Point-in-polygon test against the coverage passed in (previously the
    # parameter was ignored and the global `union` was used instead).
    series['covered'] = coverage.contains(loads(quake.wkt))

    return pd.Series(series)
In [11]:
# Attach the flattened columns (identifier, date, geometry, covered) to each
# earthquake row; both frames share the same index.
# NOTE(review): not idempotent - re-running adds suffixed duplicate columns.
eqs = pd.merge(eqs,
               eqs.apply(lambda quake_row: expand_eq(quake_row, union), axis=1),
               left_index=True,
               right_index=True)
In [12]:
# Parse the WKT strings into shapely points (their .x/.y feed the map markers).
# NOTE(review): not idempotent - run once per freshly built `eqs`.
eqs['geometry'] = eqs['geometry'].apply(loads)
In [13]:
# Quick look at the first rows of the enriched frame
eqs.head()
Out[13]:
0 covered date geometry identifier
0 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-19T23:17:55.880000Z POINT (45.7137 33.7156) us2000clji
1 <py_earthquakes.EarthQuake instance at 0x7f935... False 2018-01-19T17:17:44.720000Z POINT (-111.0785 26.6862) us2000cl83
2 <py_earthquakes.EarthQuake instance at 0x7f935... False 2018-01-19T02:11:39.030000Z POINT (-74.578 -13.229) us2000cl06
3 <py_earthquakes.EarthQuake instance at 0x7f935... False 2018-01-18T18:48:40.550000Z POINT (132.8126 -6.3958) usd0008ke6
4 <py_earthquakes.EarthQuake instance at 0x7f935... False 2018-01-18T15:58:42.040000Z POINT (-93.1769 14.5745) us2000cksw
In [14]:
# Keep only the earthquakes whose point location lies inside the coverage
selected = eqs.loc[eqs['covered'].eq(True)]
In [15]:
# Display the covered earthquakes
selected
Out[15]:
0 covered date geometry identifier
0 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-19T23:17:55.880000Z POINT (45.7137 33.7156) us2000clji
22 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T19:42:59.510000Z POINT (96.1686 18.4505) us2000cifw
23 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T19:28:38.430000Z POINT (95.9558 18.4229) us2000cpbb
24 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T19:26:24.240000Z POINT (96.07170000000001 18.3719) us2000cifa
26 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T18:03:18.470000Z POINT (-84.9196 10.6577) us2000cidd
28 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T09:00:39.860000Z POINT (45.7027 33.7205) us2000ci4h
29 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T08:55:00.430000Z POINT (45.7897 33.711) us2000ci4f
30 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T08:21:43.600000Z POINT (46.42 33.2164) us2000ci3z
31 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T08:14:15.990000Z POINT (45.7299 33.8017) us2000ci3y
32 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T08:00:52.390000Z POINT (45.7935 33.8578) us2000ci3x
33 <py_earthquakes.EarthQuake instance at 0x7f935... True 2018-01-11T07:59:30.470000Z POINT (45.7239 33.7131) us2000ci3s
In [16]:
# Interactive map with one marker per covered earthquake.
# NOTE(review): the 'Stamen Terrain' tileset may no longer be available in
# current folium / tile services; change `tiles` if the basemap is blank.
m = folium.Map(
    location=[45, 90],
    zoom_start=2,
    tiles='Stamen Terrain'
)

tooltip = 'Click me!'  # NOTE(review): defined but not passed to the markers

for _label, row in selected.iterrows():
    # Read straight from the row. The label from `selected` was previously used
    # as a *position* into `eqs` (eqs.iloc[index]), which only worked while
    # `eqs` kept a default 0..n-1 index.
    point = row['geometry']
    folium.Marker([point.y, point.x],
                  popup='<i>%s</i>' % row['identifier']).add_to(m)


m
Out[16]:
  • Connect to the WPS server
In [17]:
# zoo_loader.cgi endpoint on the applications deployer
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer

# Capabilities are fetched explicitly in the next cell, hence skip_caps=True
wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)
  • Send a GetCapabilities WPS request and check that the process is listed:
In [18]:
# Fetch the capabilities document; wps.processes is read in the next cell
wps.getcapabilities()
In [19]:
# Fail fast if the application's process is not among the offered processes.
# any() short-circuits and avoids the unused enumerate() index of a manual loop.
app_deployed = any(offering.identifier == app_process_id
                   for offering in wps.processes)

if app_deployed:
    print('Process %s deployed' % app_process_id)
else:
    raise Exception('Process %s not deployed' % app_process_id)
Process ec_better_ewf_ethz_01_02_01_ewf_ethz_01_02_01_0_7 deployed
  • Submit a WPS DescribeProcess request for the process and print its title and abstract:
In [20]:
# DescribeProcess returns the process metadata and its declared inputs
process = wps.describeprocess(app_process_id)

for text in (process.title, process.abstract):
    print(text)
ETHZ-01-02-01 Filtered DInSAR interferograms
ETHZ-01-02-01 Filtered DInSAR interferograms
  • List the WPS process inputs:
In [21]:
# List the identifiers of the inputs the process declares
for data_input in process.dataInputs:
    print data_input.identifier
source
buffer_size
_T2Username
  • Create a Python dictionary with the inputs:
In [22]:
# Comma-separated identifiers of the covered earthquakes (bound to both names)
eqs_list = ','.join(selected['identifier'].values)
source = eqs_list
buffer_size = '0.9'

In [23]:
# Execute inputs as (identifier, value) pairs.
# NOTE(review): 'quotation' is not among the inputs listed by DescribeProcess
# above (source, buffer_size, _T2Username) - confirm the server accepts it.
inputs = [
    ('source', source),
    ('buffer_size', buffer_size),
    ('quotation', 'No'),
    ('_T2Username', data_pipeline),
]
  • Submit the Execute WPS request:
In [24]:
# Build the Execute request XML explicitly (requesting the result_osd output),
# submit it, and parse the initial response into `execution`.
execution = owslib.wps.WPSExecution(url=wps.url)

execution_request = execution.buildRequest(
    app_process_id,
    inputs,
    output=[('result_osd', False)],
)
request_xml = etree.tostring(execution_request)
execution_response = execution.submitRequest(request_xml)

execution.parseResponse(execution_response)
  • Monitor the request:
In [ ]:
# Status document URL for this execution (polled by monitorExecution below)
execution.statusLocation
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=54975e74-fa48-11e8-a78d-0242ac11000f&RawDataOutput=Result'
In [ ]:
# Poll the execution status until the process finishes (owslib helper)
monitorExecution(execution)
  • Check the outcome of the processing request
In [ ]:
# Stop here if the WPS execution did not succeed.
# (isSucceded is owslib's own spelling of this method.)
if not execution.isSucceded():
    raise Exception('Processing failed')

import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy

from shapely.wkt import loads

import getpass

from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf

from shapely.wkt import loads
from shapely.geometry import Polygon
from shapely.ops import cascaded_union
from shapely.geometry import box

import py_earthquakes

import pandas as pd
from geopandas import GeoDataFrame
import geopandas

In [ ]:
# Reference URL of the first process output (presumably the results'
# OpenSearch description, given the name - TODO confirm)
results_osd = execution.processOutputs[0].reference
print(results_osd)
In [ ]:
# Publish the results through the Recast 'dataPublication' WPS process,
# indexing them under the data pipeline.
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'

# NOTE(review): this rebinds `wps` from the deployer to the Recast server;
# re-run the connection cells before re-using the earlier WPS cells.
wps = WebProcessingService(recast_wps_url,
                           verbose=False,
                           skip_caps=False)

# The API key uses the same variable as the catalogue POST cell
# (`datapipeline_api_key`), which ran successfully. This cell previously used
# `data_pipeline_api_key` - confirm against configuration.ipynb.
recast_inputs = [('items', results_osd),
                 ('index', data_pipeline),
                 ('_T2ApiKey', datapipeline_api_key),
                 ('_T2Username', data_pipeline)]

recast_execution = wps.execute(recast_process_id,
                               recast_inputs,
                               output=[('result_osd', True)])


monitorExecution(recast_execution, sleepSecs=60)

# The first output's data is an XML fragment; display its href attribute
etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]

In [131]:
# Catalogue search for the 'service-coverage' entry.
# NOTE(review): the index name is hard-coded; presumably it should equal
# data_pipeline (used for the POST endpoint above) - confirm.
url = 'https://catalog.terradue.com/better-ethz-00002/search?uid=service-coverage'
In [135]:
# cioppy client, used here for its catalogue search
ciop = cioppy.Cioppy()
In [137]:
# Fetch the WKT of the stored coverage from the first search result
wkt = ciop.search(
    end_point=url,
    params=[],
    model='GeoTime',
    output_fields='wkt',
)[0]['wkt']
In [141]:
# Render the coverage geometry read back from the catalogue.
# NOTE(review): the recorded output includes a "Self-intersection" message,
# which suggests the stored geometry is not valid as-is - worth checking.
loads(wkt)
Self-intersection at or near point 7.96 47.484000000000002
Out[141]:
../../../../_images/pipelines_ETH-Z_ethz-01-02-01_application_test_51_1.svg