# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
from collections import OrderedDict
import json
import logging
from elasticsearch import Elasticsearch, exceptions, helpers
from elasticsearch.client.indices import IndicesClient
from pygeoapi.provider.base import (BaseProvider, ProviderConnectionError,
ProviderQueryError,
ProviderItemNotFoundError)
LOGGER = logging.getLogger(__name__)
class ElasticsearchProvider(BaseProvider):
    """Elasticsearch Provider"""

    def __init__(self, provider_def):
        """
        Initialize object

        :param provider_def: provider definition

        :returns: pygeoapi.provider.elasticsearch_.ElasticsearchProvider

        :raises ProviderConnectionError: if the ES host is unreachable or
                                         the ES version is unsupported
        :raises ProviderQueryError: if the index cannot be introspected
        """

        BaseProvider.__init__(self, provider_def)

        # self.data is a URL of the form scheme://host[:port]/index
        url_tokens = self.data.split('/')

        LOGGER.debug('Setting Elasticsearch properties')
        self.es_host = url_tokens[2]
        self.index_name = url_tokens[-1]
        # flipped to True by get_fields() when the index mapping looks
        # GDAL-generated (attributes flat at top level, not under
        # 'properties'); mask_prop() keys off this flag
        self.is_gdal = False
        LOGGER.debug('host: {}'.format(self.es_host))
        LOGGER.debug('index: {}'.format(self.index_name))

        LOGGER.debug('Connecting to Elasticsearch')
        self.es = Elasticsearch(self.es_host)
        if not self.es.ping():
            msg = 'Cannot connect to Elasticsearch'
            LOGGER.error(msg)
            raise ProviderConnectionError(msg)

        LOGGER.debug('Determining ES version')
        # first 3 chars of e.g. '7.10.2' -> '7.1'; only the major version
        # matters for the comparison below
        v = self.es.info()['version']['number'][:3]
        if float(v) < 7:
            msg = 'only ES 7+ supported'
            LOGGER.error(msg)
            raise ProviderQueryError(msg) if False else None  # placeholder removed below
        LOGGER.debug('Grabbing field information')
        try:
            self.fields = self.get_fields()
        except exceptions.NotFoundError as err:
            LOGGER.error(err)
            raise ProviderQueryError(err)

    def get_fields(self):
        """
        Get provider field information (names, types)

        :returns: dict of field names to simple type names
        """

        fields_ = {}
        ic = IndicesClient(self.es)
        ii = ic.get(self.index_name)

        try:
            p = ii[self.index_name]['mappings']['properties']['properties']  # noqa
        except KeyError:
            # GDAL-generated indexes map feature attributes at the top
            # level of the mapping instead of nesting them under a GeoJSON
            # 'properties' object
            LOGGER.debug('ES index looks generated by GDAL')
            self.is_gdal = True
            p = ii[self.index_name]['mappings']

        for k, v in p['properties'].items():
            if 'type' in v:
                # normalize the ES 'text' type to the generic 'string'
                if v['type'] == 'text':
                    type_ = 'string'
                else:
                    type_ = v['type']
                fields_[k] = type_

        return fields_

    def query(self, startindex=0, limit=10, resulttype='results',
              bbox=[], datetime_=None, properties=[], sortby=[],
              select_properties=[], skip_geometry=False):
        """
        query Elasticsearch index

        :param startindex: starting record to return (default 0)
        :param limit: number of records to return (default 10)
        :param resulttype: return results or hit limit (default results)
        :param bbox: bounding box [minx,miny,maxx,maxy]
        :param datetime_: temporal (datestamp or extent)
        :param properties: list of tuples (name, value)
        :param sortby: list of dicts (property, order)
        :param select_properties: list of property names
        :param skip_geometry: bool of whether to skip geometry (default False)

        :returns: dict of 0..n GeoJSON features
        """

        query = {'track_total_hits': True, 'query': {'bool': {'filter': []}}}
        filter_ = []

        feature_collection = {
            'type': 'FeatureCollection',
            'features': []
        }

        if resulttype == 'hits':
            # only the match count is wanted; fetch no documents
            LOGGER.debug('hits only specified')
            limit = 0

        if bbox:
            LOGGER.debug('processing bbox parameter')
            minx, miny, maxx, maxy = bbox
            bbox_filter = {
                'geo_shape': {
                    'geometry': {
                        'shape': {
                            'type': 'envelope',
                            # ES envelope is [top-left, bottom-right]
                            'coordinates': [[minx, maxy], [maxx, miny]]
                        },
                        'relation': 'intersects'
                    }
                }
            }
            query['query']['bool']['filter'].append(bbox_filter)

        if datetime_ is not None:
            LOGGER.debug('processing datetime parameter')
            if self.time_field is None:
                LOGGER.error('time_field not enabled for collection')
                raise ProviderQueryError()

            time_field = self.mask_prop(self.time_field)

            if '/' in datetime_:  # envelope
                LOGGER.debug('detected time range')
                time_begin, time_end = datetime_.split('/')
                range_ = {
                    'range': {
                        time_field: {
                            'gte': time_begin,
                            'lte': time_end
                        }
                    }
                }
                # '..' denotes an open-ended interval boundary
                if time_begin == '..':
                    range_['range'][time_field].pop('gte')
                elif time_end == '..':
                    range_['range'][time_field].pop('lte')
                filter_.append(range_)
            else:  # time instant
                LOGGER.debug('detected time instant')
                filter_.append({'match': {time_field: datetime_}})

            LOGGER.debug(filter_)
            # extend, not append(*filter_): append(*list) raises TypeError
            # unless the list has exactly one element
            query['query']['bool']['filter'].extend(filter_)

        if properties:
            LOGGER.debug('processing properties')
            for prop in properties:
                pf = {
                    'match': {
                        self.mask_prop(prop[0]): prop[1]
                    }
                }
                query['query']['bool']['filter'].append(pf)

        if sortby:
            LOGGER.debug('processing sortby')
            query['sort'] = []
            for sort in sortby:
                LOGGER.debug('processing sort object: {}'.format(sort))

                sp = sort['property']

                if self.fields[sp] == 'string':
                    # text fields are not sortable; use the .raw keyword
                    # sub-field instead
                    LOGGER.debug('setting ES .raw on property')
                    sort_property = '{}.raw'.format(self.mask_prop(sp))
                else:
                    sort_property = self.mask_prop(sp)

                sort_order = 'asc'
                if sort['order'] == 'D':
                    sort_order = 'desc'

                sort_ = {
                    sort_property: {
                        'order': sort_order
                    }
                }
                query['sort'].append(sort_)

        if self.properties or select_properties:
            LOGGER.debug('including specified fields: {}'.format(
                self.properties))
            query['_source'] = {
                'includes': list(map(self.mask_prop,
                                     set(self.properties) | set(select_properties)))  # noqa
            }
            # always keep the fields needed to serialize a GeoJSON feature
            query['_source']['includes'].append(self.mask_prop(self.id_field))
            query['_source']['includes'].append('type')
            query['_source']['includes'].append('geometry')

        if skip_geometry:
            LOGGER.debug('limiting to specified fields: {}'.format(
                select_properties))
            try:
                query['_source']['excludes'] = ['geometry']
            except KeyError:
                query['_source'] = {'excludes': ['geometry']}

        try:
            LOGGER.debug('querying Elasticsearch')
            LOGGER.debug(json.dumps(query, indent=4))

            LOGGER.debug('Setting ES paging zero-based')
            if startindex > 0:
                startindex2 = startindex - 1
            else:
                startindex2 = startindex

            if startindex2 + limit > 10000:
                # beyond ES's default max_result_window (10000) from/size
                # paging fails; fall back to scrolling with helpers.scan
                # and skipping the first startindex2 documents
                gen = helpers.scan(client=self.es, query=query,
                                   preserve_order=True,
                                   index=self.index_name)
                results = {'hits': {'total': limit, 'hits': []}}
                for i in range(startindex2 + limit):
                    try:
                        if i >= startindex2:
                            results['hits']['hits'].append(next(gen))
                        else:
                            next(gen)
                    except StopIteration:
                        break
                results['hits']['total'] = \
                    len(results['hits']['hits']) + startindex2
            else:
                results = self.es.search(index=self.index_name,
                                         from_=startindex2, size=limit,
                                         body=query)
                # flatten ES 7's {'value': n, 'relation': ...} total
                results['hits']['total'] = results['hits']['total']['value']
        except exceptions.ConnectionError as err:
            LOGGER.error(err)
            raise ProviderConnectionError()
        except exceptions.RequestError as err:
            LOGGER.error(err)
            raise ProviderQueryError()
        except exceptions.NotFoundError as err:
            LOGGER.error(err)
            raise ProviderQueryError()

        feature_collection['numberMatched'] = results['hits']['total']

        if resulttype == 'hits':
            return feature_collection

        feature_collection['numberReturned'] = len(results['hits']['hits'])

        LOGGER.debug('serializing features')
        for feature in results['hits']['hits']:
            feature_ = self.esdoc2geojson(feature)
            feature_collection['features'].append(feature_)

        return feature_collection

    def get(self, identifier):
        """
        Get ES document by id

        :param identifier: feature id

        :returns: dict of single GeoJSON feature

        :raises ProviderItemNotFoundError: if no document matches the id
        """

        try:
            LOGGER.debug('Fetching identifier {}'.format(identifier))
            result = self.es.get(self.index_name, id=identifier)
            LOGGER.debug('Serializing feature')
            feature_ = self.esdoc2geojson(result)
        except exceptions.NotFoundError as err:
            # the document _id may differ from the configured id_field;
            # retry with a search on that field
            LOGGER.debug('Not found via ES id query: {}'.format(err))
            LOGGER.debug('Trying via a real query')

            query = {
                'query': {
                    'bool': {
                        'filter': [{
                            'match': {
                                self.id_field: identifier
                            }
                        }]
                    }
                }
            }
            result = self.es.search(index=self.index_name, body=query)
            if len(result['hits']['hits']) == 0:
                LOGGER.error(err)
                raise ProviderItemNotFoundError(err)
            LOGGER.debug('Serializing feature')
            feature_ = self.esdoc2geojson(result['hits']['hits'][0])
        except Exception as err:
            # NOTE(review): broad catch returns None instead of raising;
            # callers must handle a None result — confirm this is intended
            LOGGER.error(err)
            return None

        return feature_

    def esdoc2geojson(self, doc):
        """
        generate GeoJSON `dict` from ES document

        :param doc: `dict` of ES document

        :returns: GeoJSON `dict`
        """

        feature_ = {}
        feature_thinned = {}

        if 'properties' not in doc['_source']:
            # GDAL-generated documents are flat: attributes sit alongside
            # 'geometry' instead of inside a 'properties' object
            LOGGER.debug('Looks like a GDAL ES 7 document')
            id_ = doc['_source'][self.id_field]
            if 'type' not in doc['_source']:
                feature_['id'] = id_
                feature_['type'] = 'Feature'
                feature_['geometry'] = doc['_source'].get('geometry')
                feature_['properties'] = {}
                for key, value in doc['_source'].items():
                    if key == 'geometry':
                        continue
                    feature_['properties'][key] = value
        else:
            LOGGER.debug('Looks like true GeoJSON document')
            feature_ = doc['_source']
            id_ = doc['_source']['properties'][self.id_field]
            feature_['id'] = id_
            feature_['geometry'] = doc['_source'].get('geometry')

        if self.properties:
            # thin the feature down to the configured property list,
            # preserving the configured order
            feature_thinned = {
                'id': id_,
                'type': feature_['type'],
                'geometry': feature_.get('geometry'),
                'properties': OrderedDict()
            }
            for p in self.properties:
                try:
                    feature_thinned['properties'][p] = feature_['properties'][p]  # noqa
                except KeyError as err:
                    LOGGER.error(err)
                    raise ProviderQueryError()

        if feature_thinned:
            return feature_thinned
        else:
            return feature_

    def mask_prop(self, property_name):
        """
        generate property name based on ES backend setup

        :param property_name: property name

        :returns: masked property name
        """

        # GDAL indexes are flat; GeoJSON indexes nest attributes under
        # 'properties', so queries must use the dotted path
        if self.is_gdal:
            return property_name
        else:
            return 'properties.{}'.format(property_name)

    def __repr__(self):
        return '<ElasticsearchProvider> {}'.format(self.data)