Source code for eodms_rapi.geo

##############################################################################
# MIT License
# 
# Copyright (c) His Majesty the King in Right of Canada, as
# represented by the Minister of Natural Resources, 2023
# 
# Permission is hereby granted, free of charge, to any person obtaining a 
# copy of this software and associated documentation files (the "Software"), 
# to deal in the Software without restriction, including without limitation 
# the rights to use, copy, modify, merge, publish, distribute, sublicense, 
# and/or sell copies of the Software, and to permit persons to whom the 
# Software is furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in 
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
# DEALINGS IN THE SOFTWARE.
# 
##############################################################################

import os
# import sys
import re
from xml.etree import ElementTree
import json
import logging
# import traceback
from geomet import wkt
from warnings import warn

# import decimal

try:
    import osgeo.ogr as ogr
    import osgeo.osr as osr

    GDAL_INSTALLED = True
except ImportError:
    try:
        import ogr
        import osr

        GDAL_INSTALLED = True
    except ImportError:
        GDAL_INSTALLED = False


# try:
# import ogr
# import osr
# GDAL_INSTALLED = True
# except ImportError:
# GDAL_INSTALLED = False

# try:
# import geojson
# GEOJSON_INSTALLED = True
# except ImportError:
# GEOJSON_INSTALLED = False

[docs]class EODMSGeo: """ The Geo class contains all the methods and functions used to perform geographic processes mainly using OGR. """ def __init__(self, eodmsrapi=None): """ Initializer for the Geo object. :param eodmsrapi: The parent EODMSRAPI object. :type eodmsrapi: eodms.EODMSRAPI """ self.aoi = None self.logger = logging.getLogger('EODMSRAPI') self.wkt_types = ['point', 'linestring', 'polygon', 'multipoint', 'multilinestring', 'multipolygon'] self.eodmsrapi = eodmsrapi self.logger = eodmsrapi.logger self.feats = None ############################################################### # Backwards compatibility methods ###############################################################
[docs] def convert_imageGeom(self, coords, output='array'): warn("Method 'convert_imageGeom' is deprecated. Please use " "'convert_image_geom'.", DeprecationWarning, stacklevel=2) return self.convert_image_geom(coords, output)
[docs] def convert_toWKT(self, in_feat, in_type): warn("Method 'convert_toWKT' is deprecated. Please use " "'convert_to_wkt'.", DeprecationWarning, stacklevel=2) return self.convert_to_wkt(in_feat, in_type)
[docs] def convert_toGeoJSON(self, results, output='FeatureCollection'): warn("Method 'convert_toGeoJSON' is deprecated. Please use " "'convert_to_geojson'.", DeprecationWarning, stacklevel=2) return self.convert_to_geojson(results, output)
############################################################### def _check_ogr(self): """ There is another ogr Python package. This will check if it was imported instead of the proper ogr. """ if ogr.__doc__ is not None and \ ogr.__doc__.find("Module providing one api for multiple git " "services") > -1: if self.eodmsrapi is not None: msg = "Another package named 'ogr' is installed." self.eodmsrapi.log_msg(msg, 'warning') return False return True def _convert_list(self, in_feat, out='wkt'): """ Converts a list to a specified output. :param in_feat: The input feature(s). :type in_feat: list :param out: The type of output, either 'wkt' or 'json'. :type out: str :return: The converted feature(s) to the specified output. :rtype: json or str """ pnts = [list(p) for p in in_feat] if len(pnts) == 1: geojson = {"type": "Point", "coordinates": pnts[0]} else: geojson = {"type": "Polygon", "coordinates": [pnts]} if out == 'json': return geojson else: out_wkt = wkt.dumps(geojson) return out_wkt def _is_wkt(self, in_feat, show_error=False, return_wkt=False): """ Verifies if a string is WKT. :param in_feat: Input string containing a WKT. :type in_feat: str :param show_error: Determines whether to display the error. :type show_error: boolean :param return_wkt: Determines whether to return the converted WKT if True. :type return_wkt: boolean :return: If the input is a valid WKT, return WKT if return_wkt is True or return just True; False if not valid. :rtype: str or boolean """ try: wkt_val = wkt.loads(in_feat.upper()) except (ValueError, TypeError) as e: if show_error: if self.eodmsrapi is not None: self.eodmsrapi.log_msg(str(e), 'warning') return False return wkt_val if return_wkt else True def _remove_zero_trail(self, in_wkt): """ Removes the trailing zeros in coordinates after the decimal in a WKT. :param in_wkt: The input WKT. :type in_wkt: str :return: The WKT without trailing zeros after the decimal. :rtype: str """ numbers = re.findall("\d+\.\d+", in_wkt) out_wkt = in_wkt for num in numbers: flt_num = float(num) out_wkt = out_wkt.replace(num, str(flt_num)) return out_wkt def _split_multi(self, feats, in_type='json', out='wkt'): """ Splits multi-geometry into several valid geometry for the RAPI. :param feats: The input feature(s). :type feats: json, str or list :param in_type: Helps determine the type of input (either 'wkt', 'json' or 'list'). :type in_type: str :param out: Determines the type of output (either 'wkt' or 'json'). :type out: str :return: The geometry (or geometries) in the specified output type. :rtype: json, str """ if in_type == 'wkt': # Convert feats to json for easier manipulation json_geom = self._is_wkt(feats, True, True) elif in_type == 'list': json_geom = self._convert_list(feats, 'json') else: json_geom = feats geom_type = json_geom.get('type').lower() if geom_type.find('multi') > -1: feat_coords = json_geom.get('coordinates') out_geom = [] if geom_type == 'multipoint': for pnt in feat_coords: geom = {'type': 'Point', 'coordinates': pnt} out_geom.append(geom) elif geom_type == 'multilinestring': for line in feat_coords: geom = {'type': 'LineString', 'coordinates': line} out_geom.append(geom) elif geom_type == 'multipolygon': for poly in feat_coords: geom = {'type': 'Polygon', 'coordinates': poly} out_geom.append(geom) else: out_geom = json_geom if out == 'wkt': out_feats = [] if isinstance(out_geom, list): for g in out_geom: wkt_feat = self.convert_to_wkt(g, 'json') out_feats.append(wkt_feat) else: out_feats = self.convert_to_wkt(out_geom, 'json') else: out_feats = out_geom return out_feats
[docs] def add_geom(self, in_src): """ Processes the source and converts it for use in the RAPI. :param in_src: The in_src can either be: - a filename (ESRI Shapefile, KML, GML or GeoJSON) of multiple features - a WKT format string of a single feature - the 'geometry' entry from a GeoJSON Feature - a list of coordinates (ex: ``[(x1, y1), (x2, y2), ...]``) :type in_src: str :return: A string of the WKT of the feature. :rtype: str """ if in_src is None: return None # If the source is in JSON format if self.eodmsrapi is not None: if self.eodmsrapi.is_json(in_src): in_src = json.loads(in_src) if isinstance(in_src, dict): # self.feats = self.convert_toWKT(in_src, 'json') self.feats = self._split_multi(in_src, 'json') return self.feats if isinstance(in_src, list): # self.feats = self.convert_toWKT(in_src, 'list') self.feats = self._split_multi(in_src, 'list') return self.feats # If the source is a file if os.path.isfile(in_src): self.feats = self.get_features(in_src) return self.feats if os.path.isdir(in_src): return None # print("in_src: %s" % in_src) if in_src.find('(') > -1 and in_src.find(')') > -1: if self._is_wkt(in_src, True): # Can only be a single WKT object self.feats = self._split_multi(in_src, 'wkt') return self.feats # If the source is a list of coordinates print(f"in_src: {in_src}") if not isinstance(in_src, list): try: eval(in_src) except SyntaxError as err: self.logger.warning(f"{err}") return err
[docs] def convert_coords(self, coord_lst, geom_type): """ Converts a list of points to GeoJSON format. :param coord_lst: A list of points. :type coord_lst: list :param geom_type: The type of geometry, either 'Point', 'LineString' or 'Polygon'. :type geom_type: str :return: A dictionary in the GeoJSON format. :rtype: dict """ pnts_array = [] pnts = None for c in coord_lst: pnts = [ p.strip('\n').strip('\t').split(',') for p in c.split(' ') if p.strip('\n').strip('\t') != '' ] pnts_array += pnts if pnts is None: return None if geom_type == 'LineString': return { 'type': 'LineString', 'coordinates': [[float(p[0]), float(p[1])] for p in pnts], } elif geom_type == 'Point': return { 'type': 'Point', 'coordinates': [float(pnts[0][0]), float(pnts[0][1])], } else: return { 'type': 'Polygon', 'coordinates': [[[float(p[0]), float(p[1])] for p in pnts]], }
[docs] def convert_image_geom(self, coords, output='array'): """ Converts a list of coordinates from the RAPI to a polygon geometry, array of points or as WKT. :param coords: A list of coordinates from the RAPI results. :type coords: list :param output: The type of return, can be 'array', 'wkt' or 'geom'. :type output: str :return: Either a polygon geometry, WKT or array of points. :rtype: multiple types """ if isinstance(coords, dict): if 'coordinates' in coords.keys(): val = coords['coordinates'] level = 0 while isinstance(val, list): val = val[0] level += 1 lst_level = level - 2 if lst_level > -1: pnt_array = eval("coords['coordinates']" + '[0]' * lst_level) else: pnt_array = coords['coordinates'] else: self.logger.warning("No coordinates provided.") return None else: pnt_array = coords[0] if GDAL_INSTALLED: if not self._check_ogr(): if self.eodmsrapi is not None: self.eodmsrapi.log_msg("Cannot convert geometry.", 'warning') return None # Create ring ring = ogr.Geometry(ogr.wkbLinearRing) # Get the points from the coordinates list pnt1 = pnt_array[0] ring.AddPoint(pnt1[0], pnt1[1]) pnt2 = pnt_array[1] ring.AddPoint(pnt2[0], pnt2[1]) pnt3 = pnt_array[2] ring.AddPoint(pnt3[0], pnt3[1]) pnt4 = pnt_array[3] ring.AddPoint(pnt4[0], pnt4[1]) ring.AddPoint(pnt1[0], pnt1[1]) # Create polygon poly = ogr.Geometry(ogr.wkbPolygon) poly.AddGeometry(ring) # Send specified output if output == 'wkt': return poly.ExportToWkt() elif output == 'geom': return poly else: return pnt_array elif output == 'wkt': # Convert values in point array to strings pnt_array = [[str(p[0]), str(p[1])] for p in pnt_array] return "POLYGON ((%s))" % ', '.join([' '.join(pnt) for pnt in pnt_array]) else: return pnt_array
# def convert_fromWKT(self, in_feat): # """ # Converts a WKT to a GDAL geometry. # :param in_feat: The WKT of the feature. # :type in_feat: str # :return: The polygon geometry of the input WKT. # :rtype: ogr.Geometry # """ # if GDAL_INSTALLED: # out_poly = ogr.CreateGeometryFromWkt(in_feat) # return out_poly
[docs] def convert_to_wkt(self, in_feat, in_type): """ Converts a feature into WKT format. :param in_feat: The input feature, either as a GeoJSON dictionary or list of points. :type in_feat: dict or list :param in_type: The type of the input, whether it's 'json', 'list' or 'file'. :type in_type: str :return: The input feature converted to WKT. :rtype: str """ out_wkt = None if in_type == 'json': out_wkt = wkt.dumps(in_feat) elif in_type == 'list': out_wkt = self._convert_list(in_feat) elif in_type == 'file': return self.get_features(in_feat) out_wkt = self._remove_zero_trail(out_wkt) return out_wkt
[docs] def convert_to_geojson(self, results, output='FeatureCollection'): """ Converts RAPI results to GeoJSON geometries. :param results: A list of results from the RAPI. :type results: list :param output: The output of the results (either 'FeatureCollection' or 'list' for a list of features in geojson) :type output: str :return: A dictionary of a GeoJSON FeatureCollection. :rtype: dict """ if isinstance(results, dict): results = [results] features = [] for rec in results: if self.eodmsrapi is None: return None geom = rec.get(self.eodmsrapi.get_conv('geometry')) props = self.eodmsrapi.parse_metadata(rec) feat = {"type": "Feature", "geometry": geom, "properties": props} features.append(feat) if output == 'list': return features return {"type": "FeatureCollection", "features": features}
[docs] def process_polygon(self, geom, t_crs): # Convert the geometry to WGS84 s_crs = geom.GetSpatialReference() if s_crs is None: s_crs = osr.SpatialReference() s_crs.ImportFromEPSG(4326) # Get the EPSG codes from the spatial references epsg_scrs = s_crs.GetAttrValue("AUTHORITY", 1) epsg_tcrs = t_crs.GetAttrValue("AUTHORITY", 1) if str(epsg_scrs) != '4326': if epsg_tcrs is None: print("\nCannot reproject AOI.") return None if not s_crs.IsSame(t_crs) and epsg_scrs != epsg_tcrs: # Create the CoordinateTransformation print("\nReprojecting input AOI...") coord_trans = osr.CoordinateTransformation(s_crs, t_crs) geom.Transform(coord_trans) # Reverse x and y of transformed geometry ring = geom.GetGeometryRef(0) for i in range(ring.GetPointCount()): ring.SetPoint(i, ring.GetY(i), ring.GetX(i)) # Convert multipolygon to polygon (if applicable) if geom.GetGeometryType() == 6: geom = geom.UnionCascaded() # Convert to WKT return geom.ExportToWkt()
[docs] def get_features(self, in_src): """ Extracts the features from an AOI file. :param in_src: The input filename of the AOI file. Can either be a GML, KML, GeoJSON, or Shapefile. :type in_src: str :return: The AOI in WKT format. :rtype: str """ # print() out_feats = [] if GDAL_INSTALLED: # There is another ogr Python package that might have been imported # Check if its the wrong ogr if not self._check_ogr(): msg = "Cannot import feature using OGR." if self.eodmsrapi is not None: self.eodmsrapi.log_msg(msg, 'warning') return None # Determine the OGR driver of the input AOI if in_src.find('.gml') > -1: ogr_driver = 'GML' elif in_src.find('.kml') > -1: ogr_driver = 'KML' elif in_src.find('.json') > -1 or in_src.find('.geojson') > -1: ogr_driver = 'GeoJSON' elif in_src.find('.shp') > -1: ogr_driver = 'ESRI Shapefile' else: self.logger.error("The AOI file type could not be determined.") return None # Open AOI file and extract AOI driver = ogr.GetDriverByName(ogr_driver) ds = driver.Open(in_src, 0) # Get the layer from the file lyr = ds.GetLayer() # Set the target spatial reference to WGS84 t_crs = osr.SpatialReference() t_crs.ImportFromEPSG(4326) for feat in lyr: # Create the geometry geom = feat.GetGeometryRef() if geom.GetGeometryName() == 'MULTIPOLYGON': out_feats.extend(self.process_polygon(geom_part, t_crs) for geom_part in geom) else: out_feats.append(self.process_polygon(geom, t_crs)) elif in_src.find('.gml') > -1 or in_src.find('.kml') > -1: with open(in_src, 'rt') as f: tree = ElementTree.parse(f) root = tree.getroot() geom_choices = ['Point', 'LineString', 'Polygon'] if in_src.find('.gml') > -1: for feat in root.findall('.//{http://www.opengis.net/' 'gml}featureMember'): # Get geometry type geom_type = 'Polygon' for elem in feat.findall('*'): tag = elem.tag.replace('{http://ogr.maptools.' 'org/}', '') if tag in geom_choices: geom_type = tag coord_lst = [ coords.text for coords in root.findall( './/{http://www.opengis' '.net/gml}coordinates' ) ] json_geom = self.convert_coords(coord_lst, geom_type) wkt_feat = self._split_multi(json_geom) if isinstance(wkt_feat, list): out_feats += wkt_feat else: out_feats.append(wkt_feat) else: for plcmark in root.findall('.//{http://www.opengis.net/' 'kml/2.2}Placemark'): # Get geometry type geom_type = 'Polygon' for elem in plcmark.findall('*'): tag = elem.tag.replace('{http://www.opengis.net/' 'kml/2.2}', '') if tag in geom_choices: geom_type = tag coord_lst = [ coords.text for coords in plcmark.findall( './/{http://www.opengis.net/kml/2.2}' 'coordinates' ) ] json_geom = self.convert_coords(coord_lst, geom_type) wkt_feat = self._split_multi(json_geom) if isinstance(wkt_feat, list): out_feats += wkt_feat else: out_feats.append(wkt_feat) elif in_src.find('.json') > -1 or in_src.find('.geojson') > -1: with open(in_src) as f: data = json.load(f) feats = data['features'] for f in feats: wkt_feat = self._split_multi(f['geometry']) if isinstance(wkt_feat, list): out_feats += wkt_feat else: out_feats.append(wkt_feat) elif in_src.find('.shp') > -1: msg = "Could not open shapefile. The GDAL Python Package " \ "must be installed to use shapefiles." self.logger.warning(msg) return None return out_feats