Test the data transformation applicationΒΆ
This Jupyter Notebook:
- queries the catalog for input product(s)
- creates a Web Processing Service (WPS) request invoking the application that was deployed in the Deploy step
- monitors the WPS request execution and finally retrieves the data transformation execution results
- First do the imports of the Python libraries required
In [122]:
import os
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import lxml.etree as etree
import cioppy
from shapely.wkt import loads
import getpass
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError
import nbformat as nbf
from shapely.wkt import loads
from shapely.geometry import Polygon
from shapely.ops import cascaded_union
from shapely.geometry import box
import py_earthquakes
import pandas as pd
from geopandas import GeoDataFrame
import geopandas
import folium
import requests
import json
- Read the data pipeline configuration information:
In [3]:
%store -r
nb_config = os.path.join('../operations', 'configuration.ipynb')
nb = nbf.read(nb_config, 4)
exec(nb['cells'][2]['source']) in globals(), locals()
app = dict([('artifact_id', app_artifact_id),
('version', app_version),
('repository', repository),
('community', community)])
app_process_id = '%s_%s_%s_%s' % (app['community'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['artifact_id'].replace('-', '_'), app['version'].replace('.', '_'))
In [4]:
with open('coverage.txt') as f:
list_wkt = f.read().splitlines()
In [5]:
polygons = []
for index, wkt in enumerate(list_wkt):
polygons.append(loads(wkt))
union = cascaded_union(polygons)
store the coverage in the catalogue
In [116]:
coverage = dict()
coverage['geometry'] = union.wkt
coverage['identifier'] = 'service-coverage'
c = GeoDataFrame(coverage, index=['0'])
In [117]:
c
Out[117]:
geometry | identifier | |
---|---|---|
0 | MULTIPOLYGON (((114.5 -8.800000000000001, 110.... | service-coverage |
In [118]:
c['geometry'] = c['geometry'].apply(loads)
In [121]:
try:
os.remove('coverage.geojson')
except OSError:
pass
c.to_file('coverage.geojson', driver='GeoJSON')
In [108]:
with open('coverage.geojson') as json_data:
data = json.load(json_data,)
In [126]:
endpoint = 'https://catalog.terradue.com/%s' % data_pipeline
headers = {"Content-Type": "application/json"}
request_track = requests.post(endpoint,
data=json.dumps(data),
headers=headers,
auth=(data_pipeline, datapipeline_api_key))
print request_track.status_code
200
** search for earthquakes **
In [110]:
eq_search = py_earthquakes.EarthQuakes()
# Get search params
bbox = (-180, -90, 180, 90)
min_mag = 5
start_date = '2018-01-09'
end_date= '2018-01-20'
# Make the search
eq_search.search(start_date, end_date, min_mag=min_mag, bbox=bbox)
In [8]:
eqs = pd.DataFrame(eq_search.earthquakes)
In [9]:
eq_search.earthquakes[0].wkt
Out[9]:
'POINT(45.7137 33.7156)'
In [10]:
def expand_eq(row, coverage):
series = dict()
series['identifier'] = row[0].id
series['date'] = row[0].date
series['geometry'] = row[0].wkt
# is point in polygon
series['covered'] = union.contains(loads(row[0].wkt))
return pd.Series(series)
In [11]:
eqs = eqs.merge(eqs.apply(lambda row: expand_eq(row, union), axis=1),
left_index=True,
right_index=True)
In [12]:
eqs['geometry'] = eqs['geometry'].apply(loads)
In [13]:
eqs.head()
Out[13]:
0 | covered | date | geometry | identifier | |
---|---|---|---|---|---|
0 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-19T23:17:55.880000Z | POINT (45.7137 33.7156) | us2000clji |
1 | <py_earthquakes.EarthQuake instance at 0x7f935... | False | 2018-01-19T17:17:44.720000Z | POINT (-111.0785 26.6862) | us2000cl83 |
2 | <py_earthquakes.EarthQuake instance at 0x7f935... | False | 2018-01-19T02:11:39.030000Z | POINT (-74.578 -13.229) | us2000cl06 |
3 | <py_earthquakes.EarthQuake instance at 0x7f935... | False | 2018-01-18T18:48:40.550000Z | POINT (132.8126 -6.3958) | usd0008ke6 |
4 | <py_earthquakes.EarthQuake instance at 0x7f935... | False | 2018-01-18T15:58:42.040000Z | POINT (-93.1769 14.5745) | us2000cksw |
In [14]:
selected = eqs[eqs['covered'] == True]
In [15]:
selected
Out[15]:
0 | covered | date | geometry | identifier | |
---|---|---|---|---|---|
0 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-19T23:17:55.880000Z | POINT (45.7137 33.7156) | us2000clji |
22 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T19:42:59.510000Z | POINT (96.1686 18.4505) | us2000cifw |
23 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T19:28:38.430000Z | POINT (95.9558 18.4229) | us2000cpbb |
24 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T19:26:24.240000Z | POINT (96.07170000000001 18.3719) | us2000cifa |
26 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T18:03:18.470000Z | POINT (-84.9196 10.6577) | us2000cidd |
28 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T09:00:39.860000Z | POINT (45.7027 33.7205) | us2000ci4h |
29 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T08:55:00.430000Z | POINT (45.7897 33.711) | us2000ci4f |
30 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T08:21:43.600000Z | POINT (46.42 33.2164) | us2000ci3z |
31 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T08:14:15.990000Z | POINT (45.7299 33.8017) | us2000ci3y |
32 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T08:00:52.390000Z | POINT (45.7935 33.8578) | us2000ci3x |
33 | <py_earthquakes.EarthQuake instance at 0x7f935... | True | 2018-01-11T07:59:30.470000Z | POINT (45.7239 33.7131) | us2000ci3s |
In [16]:
m = folium.Map(
location=[45, 90],
zoom_start=2,
tiles='Stamen Terrain'
)
tooltip = 'Click me!'
for index, row in selected.iterrows():
folium.Marker([eqs.iloc[index]['geometry'].y, eqs.iloc[index]['geometry'].x], popup='<i>%s</i>' % eqs.iloc[index]['identifier']).add_to(m)
m
Out[16]:
- Connect to the WPS server
In [17]:
wps_url = '%s/zoo-bin/zoo_loader.cgi' % apps_deployer
wps = WebProcessingService(wps_url,
verbose=False,
skip_caps=True)
- Do a GetCapabilities WPS request and list the process:
In [18]:
wps.getcapabilities()
In [19]:
app_deployed = False
for index, elem in enumerate(wps.processes):
if elem.identifier == app_process_id:
app_deployed = True
if app_deployed:
print 'Process %s deployed' % app_process_id
else:
raise Exception('Process %s not deployed' % app_process_id)
Process ec_better_ewf_ethz_01_02_01_ewf_ethz_01_02_01_0_7 deployed
- Select the process and print the title and abstract after having submited a WPS DescribeProcess request
In [20]:
process = wps.describeprocess(app_process_id)
print process.title
print process.abstract
ETHZ-01-02-01 Filtered DInSAR interferograms
ETHZ-01-02-01 Filtered DInSAR interferograms
- List the WPS process inputs:
In [21]:
for data_input in process.dataInputs:
print data_input.identifier
source
buffer_size
_T2Username
- Create a Python dictionary with the inputs:
In [22]:
source = eqs_list = ','.join(selected['identifier'].values)
buffer_size = '0.9'
In [23]:
inputs = [('source', source),
('buffer_size', buffer_size),
('quotation', 'No'),
('_T2Username', data_pipeline)]
- Submit the Execute WPS request:
In [24]:
execution = owslib.wps.WPSExecution(url=wps.url)
execution_request = execution.buildRequest(app_process_id,
inputs,
output=[('result_osd', False)])
execution_response = execution.submitRequest(etree.tostring(execution_request))
execution.parseResponse(execution_response)
- Monitor the request:
In [ ]:
execution.statusLocation
'http://ec-better-apps-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=54975e74-fa48-11e8-a78d-0242ac11000f&RawDataOutput=Result'
In [ ]:
monitorExecution(execution)
- Check the outcome of the processing request
In [ ]:
if not execution.isSucceded():
raise Exception('Processing failed')
import os import owslib from owslib.wps import monitorExecution from owslib.wps import WebProcessingService import lxml.etree as etree import cioppy
from shapely.wkt import loads
import getpass
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError import nbformat as nbf
from shapely.wkt import loads from shapely.geometry import Polygon from shapely.ops import cascaded_union from shapely.geometry import box
import py_earthquakes
import pandas as pd from geopandas import geopandas
In [ ]:
results_osd = execution.processOutputs[0].reference
print results_osd
In [ ]:
recast_process_id = 'dataPublication'
recast_wps_url = 'https://recast.terradue.com/t2api/ows'
wps = WebProcessingService(recast_wps_url,
verbose=False,
skip_caps=False)
recast_inputs = [('items', results_osd),
('index', data_pipeline),
('_T2ApiKey', data_pipeline_api_key),
('_T2Username', data_pipeline)]
recast_execution = wps.execute(recast_process_id,
recast_inputs,
output = [('result_osd', True)])
monitorExecution(recast_execution, sleepSecs=60)
etree.fromstring(recast_execution.processOutputs[0].data[0]).xpath('./@href')[0]
In [131]:
url = 'https://catalog.terradue.com/better-ethz-00002/search?uid=service-coverage'
In [135]:
ciop = cioppy.Cioppy()
In [137]:
wkt = ciop.search(end_point=url,
params=[],
model='GeoTime',
output_fields='wkt')[0]['wkt']
In [141]:
loads(wkt)
Self-intersection at or near point 7.96 47.484000000000002
Out[141]: