Trigger Queue example for Sentinel-2 L2A BOA ARD¶
This example shows an implementation of a Trigger Queue input.ipynb file, as part of a Trigger Queue application.
Prerequisites¶
In order to run this example you will need to edit the value for the key ‘value’ for the following dictionaries under the Parameters section:
- data_pipeline
- api_key
For the data_pipeline value, contact support@terradue.com. The api_key value can be obtained from your Profile page.
Modules¶
[ ]:
# Standard library
import json
import os
import re
import sys

# Third-party
import dateutil.parser
import geopandas as gp
import numpy as np
import pandas as pd
import requests
import owslib
from owslib.etree import etree
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
from shapely.geometry import box
from shapely.wkt import loads

# Ellip platform libraries
import cioppy
import ellip_triggers
Service definition¶
[ ]:
# Service metadata (title/abstract/identifier) for this trigger queue.
service = {
    'title': 'Trigger queue for Sentinel-2 L2A BOA ARD',
    'abstract': 'Trigger queue for Sentinel-2 L2A BOA ARD',
    'id': 'tg-common-s2-l2a-boa-ard',
}
Parameters¶
[ ]:
# Ellip data pipeline parameter; replace the '<set value here>' placeholder
# before running (see the Prerequisites section).
data_pipeline = {
    'id': 'data_pipeline',
    'title': 'Ellip data pipeline',
    'abstract': 'Ellip data pipeline',
    'value': '<set value here>',
}
[ ]:
# WPS end point of the deployed processing application.
wps_url = {
    'id': 'wps_url',
    'title': 'Application WPS end point URL',
    'abstract': 'Application WPS end point URL',
    'value': 'https://ec-better-apps-deployer.terradue.com/zoo/?',
}
[ ]:
# Identifier of the WPS process to invoke for each queued data item.
process_id = {
    'id': 'process_id',
    'title': 'Application process id',
    'abstract': 'Application process id',
    'value': 'ec_better_ewf_s2_l2a_boa_ard_ewf_s2_l2a_boa_ard_1_2',
}
[ ]:
# Ellip API key; replace the '<set value here>' placeholder with the key
# from your Profile page (see the Prerequisites section).
api_key = {
    'id': 'api_key',
    'title': 'Ellip API key for data pipeline',
    'abstract': 'Ellip API key for data pipeline',
    'value': '<set value here>',
}
[ ]:
# Area of interest: either a 'minx,miny,maxx,maxy' bounding box (as here)
# or a WKT geometry string; both are handled later in the notebook.
aoi = {
    'id': 'aoi',
    'title': 'Area of interest (bbox or WKT)',
    'abstract': 'Area of interest (bbox or WKT)',
    'value': '36.115823746000046,34.34882545500005,37.91779518100003,35.104433060000076',
}
[ ]:
# Start of the time of interest (ISO8601, UTC).
start_date = {
    'id': 'start_date',
    'title': 'Start date in ISO8601',
    'abstract': 'Start date in ISO8601',
    'value': '2019-04-10T00:00:00Z',
}
[ ]:
# End of the time of interest (ISO8601, UTC).
# Fix: the abstract previously read 'Start date in ISO8601', copied from
# the start_date cell; it now describes the end date.
end_date = {
    'id': 'end_date',
    'title': 'End date in ISO8601',
    'abstract': 'End date in ISO8601',
    'value': '2019-04-25T23:59:59Z',
}
[ ]:
# Catalogue update-time filter. When left as the sentinel 'void', the search
# below uses the start/stop dates; otherwise its value is passed to the
# catalogue as the 'update' search parameter.
# Fix: title/abstract previously read 'End date in ISO8601' / 'Start date in
# ISO8601', copied from the date cells; they now describe this parameter.
update = {
    'id': 'update',
    'title': 'Update date in ISO8601',
    'abstract': 'Update date in ISO8601 (void to search by start/stop dates)',
    'value': 'void',
}
Runtime parameter definition¶
Input references
These are the Sentinel-2 catalogue references (not used in this example; a dummy value is set below)
[ ]:
# Placeholder input reference for the trigger queue runtime. Note the
# original parentheses were redundant: ('dummy') is the string 'dummy',
# not a tuple.
input_references = 'dummy'
Data pipeline information¶
[ ]:
# Gather the runtime data-pipeline settings in a single dictionary.
# By convention here, the username equals the data pipeline name.
data_pipeline_parameters = {
    'data_pipeline': data_pipeline['value'],
    'username': data_pipeline['value'],
    'wps_url': wps_url['value'],
    'process_id': process_id['value'],
    'api_key': api_key['value'],
}
Get information about the processing service and its inputs¶
[ ]:
# Connect to the WPS end point and fetch its capabilities document.
wps = WebProcessingService(data_pipeline_parameters['wps_url'], verbose=False, skip_caps=True)
wps.getcapabilities()

# Verify the target process is deployed. The original used a Python 2
# print statement ("print '...'"); switched to print() for consistency
# with the rest of the notebook, and any() replaces the manual flag loop.
deployed = any(data_pipeline_parameters['process_id'] in process.identifier
               for process in wps.processes)

if deployed:
    print('Process {0} is deployed'.format(data_pipeline_parameters['process_id']))
else:
    raise ValueError('Process not deployed')
Describe process information
[ ]:
# Fetch the DescribeProcess document for the deployed process.
process = wps.describeprocess(data_pipeline_parameters['process_id'])
[ ]:
# Show the process title and abstract from the DescribeProcess response.
print('Title: {0}'.format(process.title))
print('Abstract: {0}'.format(process.abstract))
[ ]:
# List every advertised process input with its title and abstract.
for parameter in process.dataInputs:
    print('Parameter identifier: {0}\n'.format(parameter.identifier))
    print('Title/abstract: {0} - {1}\n'.format(parameter.title, parameter.abstract))
Area and time of interest analysis¶
[ ]:
# Container for the normalised area and time of interest.
interest = {}
[ ]:
# Normalise the AOI to WKT: first try to read it as a comma-separated
# bounding box; if the tokens are not all numbers (ValueError), treat the
# value as a WKT geometry instead.
try:
    coords = [float(token) for token in aoi['value'].split(',')]
    interest['aoi'] = box(*coords).wkt
except ValueError:
    interest['aoi'] = loads(aoi['value']).wkt
[ ]:
# Time of interest, taken from the notebook parameters.
interest.update({'start': start_date['value'],
                 'stop': end_date['value']})
Search for Sentinel-2 acquisitions over the AOI¶
[ ]:
# Catalogue search parameters for Sentinel-2 Level-2A products.
search_params = {
    'geom': interest['aoi'],
    'pt': 'S2MSI2A',
    'count': '150',
    # NOTE(review): the trailing ']' looks like catalogue interval notation
    # (cloud cover <= 100) — confirm against the Terradue catalogue docs.
    'cc': '100]',
}

# With the 'void' sentinel, filter by acquisition start/stop dates;
# otherwise filter by catalogue update time.
if update['value'] == 'void':
    search_params['start'] = interest['start']
    search_params['stop'] = interest['stop']
else:
    search_params['update'] = update['value']
[ ]:
# Instantiate the cioppy client used below to query the Terradue catalogue.
ciop = cioppy.Cioppy()
[ ]:
# Query the Terradue Sentinel-2 catalogue with the AOI/TOI parameters and
# load the results into a GeoDataFrame; output_fields selects the metadata
# columns returned for each entry ('wkt' carries the footprint geometry).
end_point = 'https://catalog.terradue.com/sentinel2/description'
search = gp.GeoDataFrame(ciop.search(end_point=end_point,
                                     params=search_params,
                                     output_fields='self,track,enclosure,identifier,' \
                                     'wkt,startdate,enddate,updated,platform,cc',
                                     model='EOP'))
[ ]:
def analyse_search(row):
    """Extract the MGRS tile components from a Sentinel-2 product identifier.

    Parameters
    ----------
    row : mapping or pandas row with an 'identifier' field following the
        Sentinel-2 naming convention, e.g.
        S2B_MSIL2A_20190412T082559_N0211_R021_T36SYC_20190412T121345.

    Returns
    -------
    pandas.Series with keys 'utm_zone', 'latitude_band', 'grid_square'.
    """
    identifier = row['identifier']
    # Locate the tile token (e.g. '_T36SYC') by pattern instead of the fixed
    # character offsets of the original, which silently return garbage for
    # identifiers that deviate from the standard layout.
    match = re.search(r'_T(\d{2})([A-Z])([A-Z]{2})', identifier)
    if match:
        utm_zone, latitude_band, grid_square = match.groups()
    else:
        # Fall back to the historical fixed offsets for identifiers that do
        # not match the expected pattern (keeps backward compatibility).
        utm_zone = identifier[39:41]
        latitude_band = identifier[41]
        grid_square = identifier[42:44]
    return pd.Series({'utm_zone': utm_zone,
                      'latitude_band': latitude_band,
                      'grid_square': grid_square})
[ ]:
# Append the tile columns derived from each identifier to the search table
# (the lambda wrapper of the original was unnecessary).
search = search.merge(search.apply(analyse_search, axis=1),
                      left_index=True,
                      right_index=True)
[ ]:
# Convert the catalogue date strings to pandas datetimes. pd.to_datetime
# parses ISO8601 strings directly (using dateutil underneath), so the extra
# per-row .apply(dateutil.parser.parse) pass of the original was redundant.
search['startdate'] = pd.to_datetime(search['startdate'])
search['enddate'] = pd.to_datetime(search['enddate'])
# Rename the WKT column to the conventional 'geometry' name and promote its
# values to shapely geometries.
search = search.rename(index=str, columns={'wkt': 'geometry'})
search['geometry'] = search['geometry'].apply(loads)
[ ]:
# Re-wrap as a GeoDataFrame now that 'geometry' holds shapely objects,
# and declare the footprints as WGS84 lon/lat.
# NOTE(review): {'init': 'epsg:4326'} is the legacy pyproj CRS syntax;
# newer geopandas prefers crs='EPSG:4326' — kept as-is for this old stack.
search = gp.GeoDataFrame(search)
search.crs = {'init':'epsg:4326'}
[ ]:
# Preview the first five catalogue results.
search.head(5)
Trigger¶
Instantiate a trigger:
[ ]:
# Instantiate the trigger against the target data pipeline. The two trailing
# empty strings are positional arguments of ellip_triggers.Trigger whose
# semantics are not visible here — confirm against the ellip_triggers API.
trigger = ellip_triggers.Trigger(data_pipeline_parameters['data_pipeline'],
data_pipeline_parameters['username'],
data_pipeline_parameters['api_key'], '', '')
# Point the trigger at the WPS end point and the process to invoke.
trigger.wps_url = data_pipeline_parameters['wps_url']
trigger.process_id = data_pipeline_parameters['process_id']
Create a function to create and queue data items:
[ ]:
def queue(row, trigger, username):
    """Create a data item for one search row, queue it on the data pipeline
    and return its catalogue self link.

    Parameters
    ----------
    row : search result row exposing .self (catalogue reference),
        .startdate and .enddate (datetime-like).
    trigger : ellip_triggers.Trigger used to create and queue the data item.
    username : Ellip username owning the target data pipeline catalogue.

    Returns
    -------
    pandas.Series with key 'data_item_self' pointing at the queued item.
    """
    series = dict()
    data_item = trigger.create_data_item_from_single_reference(row.self)
    # Fix: the original format string used %m (month) where minutes were
    # intended; %M is the minutes directive.
    title = 'Sentinel-2 BOA ARD ({0}-{1})'.format(row.startdate.strftime('%Y-%m-%dT%H:%M:%S'),
                                                  row.enddate.strftime('%Y-%m-%dT%H:%M:%S'))
    data_item.set_title(title)
    data_item.set_description(title)
    data_item.set_category('validation', 'Validation data item')
    data_item.processing_parameters.append(('source', row.self))
    data_item.processing_parameters.append(('_T2Username', username))
    trigger.queue(data_item)
    # Use the username argument instead of the module-level
    # data_pipeline_parameters dict (same value at the call site, but this
    # keeps the function self-contained).
    series['data_item_self'] = 'https://catalog.terradue.com/{0}/search?uid={1}'.format(username,
                                                                                       data_item.get_identifier())
    return pd.Series(series)
[ ]:
# Queue one data item per search row and record each item's catalogue link
# as a new column (args= passes the extra arguments, so no lambda needed).
search = search.merge(search.apply(queue,
                                   axis=1,
                                   args=(trigger, data_pipeline_parameters['username'])),
                      left_index=True,
                      right_index=True)
[ ]:
# Preview the first five rows including the queued data item links.
search.head(5)