Trigger Queue example for Sentinel-2 L2A BOA ARD

This example shows an implementation for a Trigger Queue input.ipynb file, as a part of a Trigger Queue Application.

Prerequisites

In order to run this example you will need to edit the value for the key ‘value’ for the following dictionaries under the Parameters section:

  • data_pipeline
  • api_key

For the data_pipeline value, contact support@terradue.com. The api_key value can be obtained from your Profile page.

Modules

[ ]:
from owslib.etree import etree
import json
import geopandas as gp
import requests
from shapely.geometry import box
from shapely.wkt import loads
import pandas as pd
import numpy as np
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import cioppy
import ellip_triggers
import sys
import os
import dateutil.parser

Service definition

[ ]:
service = dict([('title', 'Trigger queue for Sentinel-2 L2A BOA ARD'),
                ('abstract', 'Trigger queue for Sentinel-2 L2A BOA ARD'),
                ('id', 'tg-common-s2-l2a-boa-ard')])

Parameters

[ ]:
data_pipeline = dict([('id', 'data_pipeline', ),
              ('title', 'Ellip data pipeline'),
              ('abstract', 'Ellip data pipeline'),
              ('value', '<set value here>')])
[ ]:
wps_url = dict([('id', 'wps_url'),
              ('title', 'Application WPS end point URL'),
              ('abstract', 'Application WPS end point URL'),
              ('value', 'https://ec-better-apps-deployer.terradue.com/zoo/?')])
[ ]:
process_id = dict([('id', 'process_id'),
              ('title', 'Application process id'),
              ('abstract', 'Application process id'),
              ('value', 'ec_better_ewf_s2_l2a_boa_ard_ewf_s2_l2a_boa_ard_1_2')])
[ ]:
api_key = dict([('id', 'api_key'),
              ('title', 'Ellip API key for data pipeline'),
              ('abstract', 'Ellip API key for data pipeline'),
              ('value', '<set value here>')])
[ ]:
aoi = dict([('id', 'aoi'),
              ('title', 'Area of interest (bbox or WKT)'),
              ('abstract', 'Area of interest (bbox or WKT)'),
              ('value', '36.115823746000046,34.34882545500005,37.91779518100003,35.104433060000076')])
[ ]:
start_date = dict([('id', 'start_date'),
              ('title', 'Start date in ISO8601'),
              ('abstract', 'Start date in ISO8601'),
              ('value', '2019-04-10T00:00:00Z')])
[ ]:
end_date = dict([('id', 'end_date'),
              ('title', 'End date in ISO8601'),
              ('abstract', 'Start date in ISO8601'),
              ('value', '2019-04-25T23:59:59Z')])
[ ]:
update = dict([('id', 'update'),
              ('title', 'End date in ISO8601'),
              ('abstract', 'Start date in ISO8601'),
              ('value', 'void')])

Runtime parameter definition

Input references

This is the Sentinel-1 stack catalogue references

[ ]:
input_references = ('dummy')

Data pipeline information

[ ]:
data_pipeline_parameters=dict()

data_pipeline_parameters['data_pipeline'] = data_pipeline['value']
data_pipeline_parameters['username'] = data_pipeline_parameters['data_pipeline']
data_pipeline_parameters['wps_url'] = wps_url['value']
data_pipeline_parameters['process_id'] = process_id['value']
data_pipeline_parameters['api_key'] = api_key['value']

Get information about the processing service and its inputs

[ ]:
wps = WebProcessingService(data_pipeline_parameters['wps_url'], verbose=False, skip_caps=True)

wps.getcapabilities()

deployed = False

for index, elem in enumerate(wps.processes):

    if data_pipeline_parameters['process_id'] in elem.identifier:
        deployed = True
        print 'Process {0} is deployed'.format(data_pipeline_parameters['process_id'])

if not deployed:

    raise ValueError('Process not deployed')

Describe process information

[ ]:
process = wps.describeprocess(data_pipeline_parameters['process_id'])
[ ]:
print ('Title: {0}'.format(process.title))
print ('Abstract: {0}'.format(process.abstract))
[ ]:
for data_input in process.dataInputs:
    print ('Parameter identifier: {0}\n'.format(data_input.identifier))
    print ('Title/abstract: {0} - {1}\n'.format(data_input.title, data_input.abstract))

Area and time of interest analysis

[ ]:
interest = dict()
[ ]:
try:

    interest['aoi'] = box(*[float(i) for i in aoi['value'].split(',')]).wkt

except ValueError:

    interest['aoi'] = loads(aoi['value']).wkt
[ ]:
interest['start'] = start_date['value']
interest['stop'] = end_date['value']

Search for Sentinel-2 acquisitions over the AOI

[ ]:
search_params = dict()

search_params['geom'] = interest['aoi']
search_params['pt'] = 'S2MSI2A'
search_params['count'] = '150'
search_params['cc'] = '100]'

if update['value'] == 'void':

    search_params['start'] = interest['start']
    search_params['stop'] = interest['stop']

else:

    search_params['update'] = update['value']


[ ]:
ciop = cioppy.Cioppy()
[ ]:
end_point = 'https://catalog.terradue.com/sentinel2/description'



search = gp.GeoDataFrame(ciop.search(end_point=end_point,
                                      params=search_params,
                                       output_fields='self,track,enclosure,identifier,' \
                                      'wkt,startdate,enddate,updated,platform,cc',
                                       model='EOP'))
[ ]:
def analyse_search(row):

    series = dict()

    series['utm_zone'] = row['identifier'][39:41]
    series['latitude_band'] = row['identifier'][41]
    series['grid_square']  = row['identifier'][42:44]


    return pd.Series(series)
[ ]:
search = search.merge(search.apply(lambda row: analyse_search(row), axis=1),
              left_index=True,
              right_index=True)
[ ]:
search['startdate'] = pd.to_datetime(search['startdate'].apply(lambda x: dateutil.parser.parse(x)))
search['enddate'] = pd.to_datetime(search['enddate'].apply(lambda x: dateutil.parser.parse(x)))

search = search.rename(index=str, columns={'wkt': 'geometry'})
search['geometry'] = search['geometry'].apply(loads)
[ ]:
search = gp.GeoDataFrame(search)

search.crs = {'init':'epsg:4326'}
[ ]:
search.head(5)

Trigger

Instantiate a trigger:

[ ]:
trigger = ellip_triggers.Trigger(data_pipeline_parameters['data_pipeline'],
                                 data_pipeline_parameters['username'],
                                 data_pipeline_parameters['api_key'], '', '')

trigger.wps_url = data_pipeline_parameters['wps_url']
trigger.process_id = data_pipeline_parameters['process_id']

Create a function to create and queue data items:

[ ]:
def queue(row, trigger, username):

    series = dict()

    data_item = trigger.create_data_item_from_single_reference(row.self)

    title = 'Sentinel-2 BOA ARD ({0}-{1})'.format(row.startdate.strftime('%Y-%m-%dT%H:%m:%S'),
                                                                                     row.enddate.strftime('%Y-%m-%dT%H:%m:%S'))


    data_item.set_title(title)

    data_item.set_description(title)

    data_item.set_category('validation', 'Validation data item')

    data_item.processing_parameters.append(('source', row.self))
    data_item.processing_parameters.append(('_T2Username', username))

    trigger.queue(data_item)

    series['data_item_self'] = 'https://catalog.terradue.com/{0}/search?uid={1}'.format(data_pipeline_parameters['username'],
                                                                                      data_item.get_identifier())

    return pd.Series(series)
[ ]:
search = search.merge(search.apply(lambda row: queue(row, trigger, data_pipeline_parameters['username']),
                                   axis=1),
                      left_index=True,
                      right_index=True)
[ ]:
search.head(5)