Source code for promis.geo.collection

"""This module contains a class for handling a collection of spatially referenced data."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
from abc import ABC
from collections.abc import Callable
from copy import deepcopy
from pickle import dump, load
from typing import Any

# Third Party
import smopy
from matplotlib import pyplot as plt
from numpy import (
    argsort,
    array,
    atleast_2d,
    concatenate,
    histogram,
    isnan,
    ndarray,
    repeat,
    sort,
    unique,
    zeros,
)
from numpy.linalg import norm
from numpy.typing import NDArray
from pandas import DataFrame, concat
from scipy.interpolate import CloughTocher2DInterpolator, LinearNDInterpolator, NearestNDInterpolator
from scipy.spatial import distance_matrix
from scipy.stats import entropy as shannon_entropy
from scipy.stats.qmc import LatinHypercube, scale
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import NearestNeighbors

# ProMis
from promis.geo.location import CartesianLocation, PolarLocation
from promis.models import GaussianProcess


[docs] class Collection(ABC): """A collection of values over a polar or Cartesian space. Locations are stored as Cartesian coordinates, but data can be unpacked into both polar and Cartesian frames. Args: origin: The polar coordinates of this collection's Cartesian frame's center data: A list of Cartesian location and value pairs """ def __init__( self, columns: list[str], origin: PolarLocation, number_of_values: int = 1 ) -> None: # Attributes setup self.number_of_values = number_of_values self.origin = origin self.basemap = None # Initialize the data frame self.data = DataFrame(columns=columns)
[docs] @classmethod def make_latin_hypercube( cls, origin: PolarLocation, width: float, height: float, number_of_samples: int, number_of_values: int = 1, include_corners: bool = False ) -> "Collection": samples = LatinHypercube(d=2).random(n=number_of_samples) samples = scale(samples, [-width / 2, -height / 2], [width / 2, height / 2]) collection = cls(origin, number_of_values) collection.append_with_default(samples, 0.0) if include_corners: collection.append_with_default( array([ [-width / 2, -height / 2], [-width / 2, height / 2], [width / 2, -height / 2], [width / 2, height / 2] ]), 0.0 ) return collection
[docs] @staticmethod def load(path) -> "Collection": with open(path, "rb") as file: return load(file)
[docs] def save(self, path: str): with open(path, "wb") as file: dump(self, file)
[docs] def clear(self): """Empties out the kept data.""" self.data = self.data.iloc[0:0]
[docs] def extent(self) -> tuple[float, float, float, float]: """Get the extent of this collection, i.e., the min and max coordinates. Returns: The minimum and maximum coordinates in order ``west, east, south, north`` """ # TODO this might fail near the international date line for polar coordinates west = min(self.data[self.data.columns[0]]) east = max(self.data[self.data.columns[0]]) south = min(self.data[self.data.columns[1]]) north = max(self.data[self.data.columns[1]]) return west, east, south, north
[docs] def values(self) -> NDArray[Any]: """Unpack the location values as numpy array. Returns: The values of this Collection as numpy array """ value_columns = self.data.columns[2:] return self.data[value_columns].to_numpy()
[docs] def coordinates(self) -> NDArray[Any]: """Unpack the location coordinates as numpy array. Returns: The values of this Collection as numpy array """ location_columns = self.data.columns[:2] return self.data[location_columns].to_numpy()
[docs] def to_csv(self, path: str, mode: str = "w"): """Saves the collection as comma-separated values file. Args: path: The path with filename to write to mode: The writing mode, one of {w, x, a} """ self.data.to_csv(path, mode=mode, index=False, float_format="%f")
[docs] def append( self, coordinates: NDArray[Any] | list[PolarLocation | CartesianLocation], values: NDArray[Any], ): """Append location and associated value vectors to collection. Args: coordinates: A list of locations to append or matrix of coordinates values: The associated values as 2D matrix, each row belongs to a single location """ assert len(coordinates) == values.shape[0], ( f"Number of coordinates and values mismatch, got {len(coordinates)} and {values.shape[0]}" ) if isinstance(coordinates, ndarray): new_entries = concatenate([coordinates, values], axis=1) else: new_entries = concatenate( [array([[location.x, location.y] for location in coordinates]), values], axis=1 ) if self.data.empty: self.data = DataFrame(new_entries, columns=self.data.columns) else: self.data = concat( [self.data, DataFrame(new_entries, columns=self.data.columns)], ignore_index=True ) # Reset basemap since new data is added self.basemap = None
[docs] def append_with_default( self, coordinates: NDArray[Any] | list[PolarLocation | CartesianLocation], value: NDArray[Any], ): """Append location with a default value. Args: coordinates: A list of locations to append or matrix of coordinates values: The default value to assign to all locations """ self.append( atleast_2d(coordinates), repeat(atleast_2d(value), len(coordinates), axis=0) )
[docs] def get_basemap(self, zoom=16): """Obtain the OSM basemap image of the collection's area. Args: zoom: The zoom level requested from OSM Returns: The basemap image """ # Would cause circular import if done at module scope from promis.loaders import OsmLoader # Get OpenStreetMap and crop to relevant area south, west, north, east = OsmLoader.compute_polar_bounding_box( self.origin, self.dimensions ) map = smopy.Map((south, west, north, east), z=zoom) left, bottom = map.to_pixels(south, west) right, top = map.to_pixels(north, east) region = map.img.crop((left, top, right, bottom)) return region
[docs] def get_nearest_coordinate(self, point: NDArray) -> NDArray: """Get the closest coordinate in this collection relative to a given point. Args: point: The point to find the nearest coordinate to Returns: The coordinate that is closest to the given point """ nearest = min(self.coordinates(), key=lambda node: norm(point - array(node))) return nearest
[docs] def get_distance_to(self, other: "Collection") -> NDArray: """Computes the distances from this collection to another. Args: other: The other collection to compute the distance to Returns: An array that contains the distance to the respectively closest point in the other collection """ own_coordinates = self.coordinates() other_coordinates = other.coordinates() return distance_matrix(other_coordinates, own_coordinates).min(axis=1)
[docs] def improve( self, candidate_sampler: Callable, value_function: Callable, number_of_iterations: int, number_of_improvement_points: int, scaler: float, value_index: int = 0, acquisition_method: str = "entropy", ) -> None: """Automatic improvement of the collection via informed sampling. Args: candidate_sampler: A sampler that provides a candidate Collection that may be used to improve this collection value_function: The function to get new values from for the chosen points number_of_iterations: The number of iterations to run the improvement for number_of_improvement_points: The number of samples to take at every iteration (will be multiplied by number of value columns) scaler: How much to scale the entropy or standard deviation within the score relative to the distance acquisition_method: The scoreing method to use, one of {entropy, gaussian_process} """ # Get minimum distances to candidates and get nearest neigbours index # candidate_coordinates = candidate_collection.coordinates() if acquisition_method == "gaussian_process": gp = GaussianProcess() gp.fit(self.coordinates(), self.values()[:, value_index, None], 50) # Repeat improvement for specified number of iterations for _ in range(number_of_iterations): candidate_collection = candidate_sampler() candidate_coordinates = candidate_collection.coordinates() distances = self.get_distance_to(candidate_collection) nn = NearestNeighbors(n_neighbors=1).fit(self.coordinates()) _, indices = nn.kneighbors(candidate_coordinates) # We decide score based on chosen method if acquisition_method == "entropy": entropy = self.get_entropy(value_index=value_index) distances_norm = (distances - distances.min()) / (distances.max() - distances.min()) entropies_norm = (entropy - entropy.min()) / (entropy.max() - entropy.min()) score = distances_norm * (1 + scaler * entropies_norm[indices.flatten()]) elif acquisition_method == "gaussian_process": gp.fit(self.coordinates(), self.values()[:, value_index, None], 10) std = gp.predict(candidate_coordinates, return_std=True)[1][:, 0] distances_norm = (distances - distances.min()) / (distances.max() - distances.min()) std_norm = (std - std.min()) / (std.max() - std.min()) score = distances_norm * (1 + scaler * std_norm) else: raise NotImplementedError( f'Requested unknown acquisition method "{acquisition_method}" from Collection' ) # Decide next points to sample at top_candidate_indices = argsort(score)[-number_of_improvement_points:][::-1] next_points = atleast_2d(candidate_coordinates[top_candidate_indices]) # Compute new samples and append to collection next_values = value_function(next_points) self.append(next_points, next_values)
[docs] def get_entropy( self, number_of_neighbours: int = 4, number_of_bins: int = 10, value_index: int = 0 ) -> NDArray: """Compute the local entropy in the collection. Args: number_of_neighbours: The number of neighbours of a point to take into account number_of_bins: The number of bins to be used for the histogram value_index: Decides which value of the collection the entropy is computed from Returns: The local entropy for each point """ coordinates = self.coordinates() values = self.values()[:, value_index] nn = NearestNeighbors(n_neighbors=number_of_neighbours + 1).fit(coordinates) _, indices = nn.kneighbors(coordinates) entropies = zeros(coordinates.shape[0]) for i, neighbors in enumerate(indices): neighbor_values = values[neighbors[1:]] hist, _ = histogram( neighbor_values, bins=number_of_bins, range=(min(values), max(values)), density=True ) hist += 1e-12 hist /= sum(hist) entropies[i] = shannon_entropy(hist) return entropies
[docs] def scatter( self, value_index: int = 0, plot_basemap=True, ax=None, zoom=16, **kwargs ): """Create a scatterplot of this Collection. Args: value_index: Which value of the plot_basemap: Whether an OpenStreetMap tile shall be rendered below ax: The axis to plot to, default pyplot context if None zoom: The zoom level of the OSM basemap, default 16 **kwargs: Args passed to the matplotlib scatter function """ # Would cause circular import if done at module scope # Either render with given axis or default context if ax is None: ax = plt.gca() # Render base map if plot_basemap: if self.basemap is None: self.basemap = self.get_basemap(zoom) ax.imshow(self.basemap, extent=self.extent()) # Scatter collection data coordinates = self.coordinates() colors = self.values()[:, value_index].ravel() return ax.scatter(coordinates[:, 0], coordinates[:, 1], c=colors, **kwargs)
[docs] class CartesianCollection(Collection): def __init__(self, origin: PolarLocation, number_of_values: int = 1): super().__init__(CartesianCollection._columns(number_of_values), origin, number_of_values) @staticmethod def _columns(number_of_values: int) -> list[str]: return ["east", "north"] + [f"v{i}" for i in range(number_of_values)] @property def dimensions(self) -> tuple[float, float]: """Get the dimensions of this Collection in meters. Returns: The dimensions of this Collection in meters as ``(width, height)``. """ west, east, south, north = self.extent() return east - west, north - south
[docs] def to_cartesian_locations(self) -> list[CartesianLocation]: coordinates = self.coordinates() locations = [] for i in range(coordinates.shape[0]): locations.append(CartesianLocation(east=coordinates[i, 0], north=coordinates[i, 1])) return locations
[docs] def to_polar(self) -> "PolarCollection": # Apply the inverse projection of the origin location longitudes, latitudes = self.origin.projection( self.data["east"].to_numpy(), self.data["north"].to_numpy(), inverse=True ) # Create the new collection in polar coordinates polar_collection = PolarCollection(self.origin, len(self.data.columns) - 2) polar_collection.data["longitude"] = longitudes polar_collection.data["latitude"] = latitudes # Copy over the values for i in range(len(self.data.columns) - 2): polar_collection.data[f"v{i}"] = self.data[f"v{i}"] return polar_collection
[docs] def into( self, other: "Collection", interpolation_method: str = "linear", in_place: bool = True ) -> "Collection": if in_place: target = other else: target = deepcopy(other) # Expand or reduce target to take up the same number of value columns while len(target.data.columns) < len(self.data.columns): target.data[f"v{len(target.data.columns) - 2}"] = 0.0 while len(target.data.columns) > len(self.data.columns): del target.data[f"v{len(target.data.columns) - 3}"] interpolated = self.get_interpolator(interpolation_method)(target.coordinates()) target.data.iloc[:, 2:] = atleast_2d(interpolated) return target
[docs] def get_interpolator(self, interpolation_method: str = "linear") -> Any: """Get an interpolator for the data. Args: interpolation_method: The interpolation method to use, one of {linear, nearest, hybrid, gaussian_process} Returns: A callable interpolator function """ # Create interpolator match interpolation_method: case "linear": return LinearNDInterpolator(self.coordinates(), self.values()) case "nearest": return NearestNDInterpolator(self.coordinates(), self.values()) case "hybrid": return HybridInterpolator(self.coordinates(), self.values()) case "gaussian_process": gp = GaussianProcess() gp.fit(self.coordinates(), self.values()) return gp case "clough-tocher": return CloughTocher2DInterpolator(self.coordinates(), self.values()) case _: raise NotImplementedError( f'Interpolation method "{interpolation_method}" is not implemented for Collection' )
[docs] def prune( self, threshold: float, ): coordinates = self.coordinates() clusters = AgglomerativeClustering(n_clusters=None, distance_threshold=threshold).fit( coordinates ) pruning_index = sort(unique(clusters.labels_, return_index=True)[1]) pruned_coordinates = coordinates[pruning_index] pruned_values = self.values()[pruning_index] self.clear() self.append(pruned_coordinates, pruned_values)
[docs] class PolarCollection(Collection): def __init__(self, origin: PolarLocation, number_of_values: int = 1): super().__init__(PolarCollection._columns(number_of_values), origin, number_of_values) @staticmethod def _columns(number_of_values: int) -> list[str]: return ["longitude", "latitude"] + [f"v{i}" for i in range(number_of_values)] @property def dimensions(self) -> tuple[float, float]: """Get the dimensions of this Collection in meters. Returns: The dimensions of this Collection in meters as ``(width, height)``. """ return self.to_cartesian().dimensions
[docs] def to_polar_locations(self) -> list[PolarLocation]: coordinates = self.coordinates() locations = [] for i in range(coordinates.shape[0]): locations.append(PolarLocation(longitude=coordinates[i, 0], latitude=coordinates[i, 1])) return locations
[docs] def to_cartesian(self) -> CartesianCollection: # Apply the inverse projection of the origin location easts, norths = self.origin.projection( self.data["longitude"].to_numpy(), self.data["latitude"].to_numpy() ) # Create the new collection in polar coordinates cartesian_collection = CartesianCollection(self.origin, len(self.data.columns) - 2) cartesian_collection.data["east"] = easts cartesian_collection.data["north"] = norths # Copy over the values for i in range(len(self.data.columns) - 2): cartesian_collection.data[f"v{i}"] = self.data[f"v{i}"] return cartesian_collection
class HybridInterpolator: def __init__(self, coordinates, values): self.linear = LinearNDInterpolator(coordinates, values) self.nearest = NearestNDInterpolator(coordinates, values) def __call__(self, coordinates): result = self.linear(coordinates) nan_values = isnan(result).reshape(len(result)) result[nan_values] = self.nearest(coordinates[nan_values]) return result