Source code for promis.geo.collection

"""This module contains a class for handling a collection of spatially referenced data."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH, Felix Divo, and contributors
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
from abc import ABC, abstractmethod
from collections.abc import Callable
from copy import deepcopy
from pickle import dump, load
from typing import Any

# Third Party
import smopy
from matplotlib import pyplot as plt
from numpy import (
    argsort,
    array,
    atleast_2d,
    concatenate,
    histogram,
    isnan,
    ndarray,
    repeat,
    sort,
    unique,
    zeros,
)
from numpy.linalg import norm
from numpy.typing import NDArray
from pandas import DataFrame, concat
from scipy.interpolate import CloughTocher2DInterpolator, LinearNDInterpolator, NearestNDInterpolator
from scipy.spatial import distance_matrix
from scipy.stats import entropy as shannon_entropy
from scipy.stats.qmc import LatinHypercube, scale
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import NearestNeighbors

# ProMis
from promis.geo.location import CartesianLocation, PolarLocation
from promis.models import GaussianProcess


[docs] class Collection(ABC): """An abstract base class for a collection of spatially referenced data points. This class provides a common interface for managing data points that have associated spatial coordinates and values. It uses a pandas DataFrame for internal storage. Subclasses implement specific coordinate systems, such as Cartesian or Polar. Args: columns: The names of the columns for the internal DataFrame. The first two columns are expected to be coordinates, followed by value columns. origin: The polar coordinates of this collection's reference frame's center. number_of_values: The number of value columns to associate with each location. """ def __init__(self, columns: list[str], origin: PolarLocation, number_of_values: int = 1) -> None: self.number_of_values = number_of_values self.origin = origin self.basemap = None # Initialize the data frame self.data = DataFrame(columns=columns)
[docs] @staticmethod def load(path) -> "Collection": """Load a collection from a pickle file. Args: path: The path to the file. Returns: The loaded Collection instance. """ with open(path, "rb") as file: return load(file)
[docs] def save(self, path: str): """Save the collection to a pickle file. Args: path: The path to the file, including its name and file extension. """ with open(path, "wb") as file: dump(self, file)
[docs] def clear(self): """Empties out the kept data.""" self.data = self.data.iloc[0:0]
@property @abstractmethod def dimensions(self) -> tuple[float, float]: """Get the dimensions of this Collection in meters. Returns: The dimensions of this Collection in meters as ``(width, height)``. """ raise NotImplementedError
[docs] def extent(self) -> tuple[float, float, float, float]: """Get the extent of this collection, i.e., the min and max coordinates. Returns: The minimum and maximum coordinates in order ``west, east, south, north``. """ # TODO this might fail near the international date line for polar coordinates west = min(self.data[self.data.columns[0]]) east = max(self.data[self.data.columns[0]]) south = min(self.data[self.data.columns[1]]) north = max(self.data[self.data.columns[1]]) return west, east, south, north
[docs] def values(self) -> NDArray[Any]: """Unpack the location values as numpy array. Returns: The values of this Collection as numpy array. """ value_columns = self.data.columns[2:] return self.data[value_columns].to_numpy()
[docs] def coordinates(self) -> NDArray[Any]: """Unpack the location coordinates as numpy array. Returns: The values of this Collection as numpy array. """ location_columns = self.data.columns[:2] return self.data[location_columns].to_numpy()
def __getitem__(self, position: tuple[float, float]) -> NDArray[Any]: """Get the value(s) at a specific coordinate. Args: position: A tuple `(x, y)` representing the coordinate to look up. Returns: A numpy array of value(s) at the specified coordinate. """ x, y = position return ( self.data.loc[(self.data[self.data.columns[0]] == x) & (self.data[self.data.columns[1]] == y)] .to_numpy()[:, 2:] # Get all columns except the first two .squeeze(0) # Makes sure we get a 1D array )
[docs] def to_csv(self, path: str, mode: str = "w"): """Saves the collection as comma-separated values file. Args: path: The path with filename to write to. mode: The writing mode, one of {w, x, a}. """ self.data.to_csv(path, mode=mode, index=False, float_format="%f")
[docs] def append( self, coordinates: NDArray[Any] | list[PolarLocation | CartesianLocation], values: NDArray[Any], ): """Append location and associated value vectors to collection. Args: coordinates: A list of locations to append or matrix of coordinates. values: The associated values as 2D matrix, each row belongs to a single location. """ assert len(coordinates) == values.shape[0], ( f"Number of coordinates and values mismatch, got {len(coordinates)} and {values.shape[0]}" ) if isinstance(coordinates, ndarray): new_entries = concatenate([coordinates, values], axis=1) else: new_entries = concatenate( [array([[location.x, location.y] for location in coordinates]), values], axis=1 ) if self.data.empty: self.data = DataFrame(new_entries, columns=self.data.columns) else: self.data = concat( [self.data, DataFrame(new_entries, columns=self.data.columns)], ignore_index=True ) # Reset basemap since new data is added self.basemap = None
[docs] def append_with_default( self, coordinates: NDArray[Any] | list[PolarLocation | CartesianLocation], value: NDArray[Any], ): """Append location with a default value. Args: coordinates: A list of locations or a matrix of coordinates to append. value: The default value to assign to all new locations. """ self.append(atleast_2d(coordinates), repeat(atleast_2d(value), len(coordinates), axis=0))
[docs] def get_basemap(self, zoom=16): """Obtain the OSM basemap image of the collection's area. Args: zoom: The zoom level requested from OSM. Returns: The basemap image. """ # Would cause circular import if done at module scope from promis.loaders import OsmLoader # Get OpenStreetMap and crop to relevant area south, west, north, east = OsmLoader.compute_polar_bounding_box(self.origin, self.dimensions) map = smopy.Map((south, west, north, east), z=zoom) left, bottom = map.to_pixels(south, west) right, top = map.to_pixels(north, east) region = map.img.crop((left, top, right, bottom)) return region
[docs] def get_nearest_coordinate(self, point: NDArray) -> NDArray: """Get the closest coordinate in this collection relative to a given point. Args: point: The point to find the nearest coordinate to. Returns: The coordinate that is closest to the given point. """ nearest = min(self.coordinates(), key=lambda node: norm(point - array(node))) return nearest
[docs] def get_distance_to(self, other: "Collection") -> NDArray: """Computes the minimum distance from each point in another collection to this collection. For each point in the `other` collection, this method finds the distance to the closest point in the current (`self`) collection. Args: other: The other collection. Returns: A numpy array where the i-th element is the minimum distance from the i-th point in `other` to any point in this collection. """ own_coordinates = self.coordinates() other_coordinates = other.coordinates() return distance_matrix(other_coordinates, own_coordinates).min(axis=1)
[docs] def improve( self, candidate_sampler: Callable, value_function: Callable, number_of_iterations: int, number_of_improvement_points: int, scaler: float, value_index: int = 0, acquisition_method: str = "entropy", ) -> None: """Improves the collection by adding new, informative points. This method uses an active learning approach to iteratively add points to the collection. At each iteration, it samples candidate points, scores them based on an acquisition function, and adds the highest-scoring points to the collection. Args: candidate_sampler: A function that, when called, returns a `Collection` of candidate points. value_function: A function that computes the value(s) for a given set of new point coordinates. number_of_iterations: The number of improvement iterations to run. number_of_improvement_points: The number of new points to add at each iteration. scaler: A factor to scale the contribution of entropy or standard deviation in the acquisition score, relative to the distance. value_index: The index of the value column to use for acquisition methods like 'entropy' or 'gaussian_process'. acquisition_method: The scoring method to use for selecting new points. Supported methods are "entropy" and "gaussian_process". """ if acquisition_method == "gaussian_process": gp = GaussianProcess() gp.fit(self.coordinates(), self.values()[:, value_index, None], 50) # Repeat improvement for specified number of iterations for _ in range(number_of_iterations): candidate_collection = candidate_sampler() candidate_coordinates = candidate_collection.coordinates() distances = self.get_distance_to(candidate_collection) nn = NearestNeighbors(n_neighbors=1).fit(self.coordinates()) _, indices = nn.kneighbors(candidate_coordinates) # We decide score based on chosen method if acquisition_method == "entropy": entropy = self.get_entropy(value_index=value_index) distances_norm = (distances - distances.min()) / (distances.max() - distances.min()) entropies_norm = (entropy - entropy.min()) / (entropy.max() - entropy.min() + 1e-12) score = distances_norm * (1 + scaler * entropies_norm[indices.flatten()]) elif acquisition_method == "gaussian_process": gp.fit(self.coordinates(), self.values()[:, value_index, None], 10) std = gp.predict(candidate_coordinates, return_std=True)[1][:, 0] distances_norm = (distances - distances.min()) / (distances.max() - distances.min()) std_norm = (std - std.min()) / (std.max() - std.min()) score = distances_norm * (1 + scaler * std_norm) else: raise NotImplementedError( f'Requested unknown acquisition method "{acquisition_method}" from Collection' ) # Decide next points to sample at top_candidate_indices = argsort(score)[-number_of_improvement_points:][::-1] next_points = atleast_2d(candidate_coordinates[top_candidate_indices]) # Compute new samples and append to collection next_values = value_function(next_points) self.append(next_points, next_values)
[docs] def get_entropy( self, number_of_neighbours: int = 4, number_of_bins: int = 10, value_index: int = 0 ) -> NDArray: """Compute the local entropy in the collection. Args: number_of_neighbours: The number of neighbours of a point to take into account. number_of_bins: The number of bins to be used for the histogram. value_index: Decides which value of the collection the entropy is computed from. Returns: The local entropy for each point. """ coordinates = self.coordinates() values = self.values()[:, value_index] nn = NearestNeighbors(n_neighbors=number_of_neighbours + 1).fit(coordinates) _, indices = nn.kneighbors(coordinates) entropies = zeros(coordinates.shape[0]) for i, neighbors in enumerate(indices): neighbor_values = values[neighbors[1:]] hist, _ = histogram( neighbor_values, bins=number_of_bins, range=(min(values), max(values)), density=True ) hist += 1e-12 hist /= sum(hist) entropies[i] = shannon_entropy(hist) return entropies
[docs] def scatter(self, value_index: int = 0, plot_basemap=True, ax=None, zoom=16, **kwargs): """Create a scatterplot of this Collection. Args: value_index: Which value of the collection to plot. plot_basemap: Whether an OpenStreetMap tile shall be rendered below. ax: The axis to plot to, default pyplot context if None. zoom: The zoom level of the OSM basemap, default 16. **kwargs: Args passed to the matplotlib scatter function. """ # Either render with given axis or default context if ax is None: ax = plt.gca() # Render base map if plot_basemap: if self.basemap is None: self.basemap = self.get_basemap(zoom) ax.imshow(self.basemap, extent=self.extent()) # Scatter collection data coordinates = self.coordinates() colors = self.values()[:, value_index].ravel() return ax.scatter(coordinates[:, 0], coordinates[:, 1], c=colors, **kwargs)
[docs] class CartesianCollection(Collection): def __init__(self, origin: PolarLocation, number_of_values: int = 1): """Initializes a CartesianCollection. Args: origin: The polar coordinates of this collection's reference frame's center. number_of_values: The number of value columns to associate with each location. """ super().__init__(CartesianCollection._columns(number_of_values), origin, number_of_values) @staticmethod def _columns(number_of_values: int) -> list[str]: return ["east", "north"] + [f"v{i}" for i in range(number_of_values)]
[docs] @classmethod def make_latin_hypercube( cls, origin: PolarLocation, width: float, height: float, number_of_samples: int, number_of_values: int = 1, include_corners: bool = False, ) -> "CartesianCollection": """Creates a collection by sampling points from a Latin Hypercube design.""" samples = LatinHypercube(d=2).random(n=number_of_samples) samples = scale(samples, [-width / 2, -height / 2], [width / 2, height / 2]) collection = cls(origin, number_of_values) collection.append_with_default(samples, 0.0) if include_corners: collection.append_with_default( array( [ [-width / 2, -height / 2], [-width / 2, height / 2], [width / 2, -height / 2], [width / 2, height / 2], ] ), 0.0, ) return collection
@property def dimensions(self) -> tuple[float, float]: """Get the dimensions of this Collection in meters. Returns: The dimensions of this Collection in meters as ``(width, height)``. """ west, east, south, north = self.extent() return east - west, north - south
[docs] def to_cartesian_locations(self) -> list[CartesianLocation]: """Converts the collection's coordinates to a list of CartesianLocation objects. Returns: A list of `CartesianLocation` objects. """ coordinates = self.coordinates() locations = [] for i in range(coordinates.shape[0]): locations.append(CartesianLocation(east=coordinates[i, 0], north=coordinates[i, 1])) return locations
[docs] def to_polar(self) -> "PolarCollection": """Converts this CartesianCollection to a PolarCollection. The coordinates are projected from the local Cartesian plane back to polar (WGS84) coordinates using the collection's origin. Returns: A new `PolarCollection` with the data from this collection. """ # Apply the inverse projection of the origin location longitudes, latitudes = self.origin.projection( self.data["east"].to_numpy(), self.data["north"].to_numpy(), inverse=True ) # Create the new collection in polar coordinates polar_collection = PolarCollection(self.origin, len(self.data.columns) - 2) polar_collection.data["longitude"] = longitudes polar_collection.data["latitude"] = latitudes # Copy over the values for i in range(len(self.data.columns) - 2): polar_collection.data[f"v{i}"] = self.data[f"v{i}"] return polar_collection
[docs] def into( self, other: "Collection", interpolation_method: str = "linear", in_place: bool = True ) -> "Collection": """Interpolates the values of this collection onto the coordinates of another collection. This method takes the spatial data from the current collection and uses an interpolation strategy to estimate the values at the coordinate locations of the `other` collection. The `other` collection's values are then updated with these interpolated values. Args: other: The target collection whose coordinates will be used for interpolation and whose values will be updated. interpolation_method: The interpolation method to use. Supported methods are "linear", "nearest", "hybrid", "gaussian_process", and "clough-tocher". in_place: If True, the `other` collection is modified directly. If False, a deep copy of `other` is created and modified instead. Returns: The modified collection (either `other` itself or a copy) with interpolated values. """ if in_place: target = other else: target = deepcopy(other) # Expand or reduce target to take up the same number of value columns while len(target.data.columns) < len(self.data.columns): target.data[f"v{len(target.data.columns) - 2}"] = 0.0 while len(target.data.columns) > len(self.data.columns): del target.data[f"v{len(target.data.columns) - 3}"] interpolated = self.get_interpolator(interpolation_method)(target.coordinates()) target.data.iloc[:, 2:] = atleast_2d(interpolated) return target
[docs] def get_interpolator(self, interpolation_method: str = "linear") -> Any: """Get an interpolator for the data. Args: interpolation_method: The interpolation method to use, one of {linear, nearest, hybrid, gaussian_process, clough-tocher}. Returns: A callable interpolator function. """ # Create interpolator match interpolation_method: case "linear": return LinearNDInterpolator(self.coordinates(), self.values()) case "nearest": return NearestNDInterpolator(self.coordinates(), self.values()) case "hybrid": return HybridInterpolator(self.coordinates(), self.values()) case "gaussian_process": gp = GaussianProcess() gp.fit(self.coordinates(), self.values()) return gp case "clough-tocher": return CloughTocher2DInterpolator(self.coordinates(), self.values()) case _: raise NotImplementedError( f'Interpolation method "{interpolation_method}" is not implemented for Collection' )
[docs] def prune( self, threshold: float, ): """Reduces the number of points in the collection by clustering nearby points. This method uses Agglomerative Clustering to group points. For each cluster, only one representative point (the first one encountered in the cluster) is kept. This is useful for simplifying dense point clouds. Args: threshold: The maximum distance between points to be considered in the same cluster. """ coordinates = self.coordinates() clusters = AgglomerativeClustering(n_clusters=None, distance_threshold=threshold).fit(coordinates) pruning_index = sort(unique(clusters.labels_, return_index=True)[1]) pruned_coordinates = coordinates[pruning_index] pruned_values = self.values()[pruning_index] self.clear() self.append(pruned_coordinates, pruned_values)
[docs] class PolarCollection(Collection): def __init__(self, origin: PolarLocation, number_of_values: int = 1): """Initializes a PolarCollection. Args: origin: The polar coordinates of this collection's reference frame's center. number_of_values: The number of value columns to associate with each location. """ super().__init__(PolarCollection._columns(number_of_values), origin, number_of_values) @staticmethod def _columns(number_of_values: int) -> list[str]: return ["longitude", "latitude"] + [f"v{i}" for i in range(number_of_values)] @property def dimensions(self) -> tuple[float, float]: """Get the dimensions of this Collection in meters. Returns: The dimensions of this Collection in meters as ``(width, height)``. """ if self.data.empty: return 0.0, 0.0 # Project polar coordinates to Cartesian to get dimensions in meters easts, norths = self.origin.projection( self.data["longitude"].to_numpy(), self.data["latitude"].to_numpy() ) width = easts.max() - easts.min() height = norths.max() - norths.min() return width, height
[docs] def to_polar_locations(self) -> list[PolarLocation]: """Converts the collection's coordinates to a list of PolarLocation objects. Returns: A list of `PolarLocation` objects. """ coordinates = self.coordinates() locations = [] for i in range(coordinates.shape[0]): locations.append(PolarLocation(longitude=coordinates[i, 0], latitude=coordinates[i, 1])) return locations
[docs] def to_cartesian(self) -> CartesianCollection: """Converts this PolarCollection to a CartesianCollection. The polar (WGS84) coordinates are projected onto a local Cartesian plane centered at the collection's origin. Returns: A new `CartesianCollection` with the data from this collection. """ # Apply the inverse projection of the origin location easts, norths = self.origin.projection( self.data["longitude"].to_numpy(), self.data["latitude"].to_numpy() ) # Create the new collection in polar coordinates cartesian_collection = CartesianCollection(self.origin, len(self.data.columns) - 2) cartesian_collection.data["east"] = easts cartesian_collection.data["north"] = norths # Copy over the values for i in range(len(self.data.columns) - 2): cartesian_collection.data[f"v{i}"] = self.data[f"v{i}"] return cartesian_collection
class HybridInterpolator: """An interpolator that combines linear and nearest-neighbor interpolation. This interpolator first attempts to use linear interpolation. For any points where linear interpolation results in NaN (which occurs for points outside the convex hull of the input data), it falls back to nearest-neighbor interpolation. This provides a robust way to interpolate over a whole grid, even at the edges. Args: coordinates: The coordinates of the data points. values: The values at the data points. """ def __init__(self, coordinates: NDArray, values: NDArray): self.linear = LinearNDInterpolator(coordinates, values) self.nearest = NearestNDInterpolator(coordinates, values) def __call__(self, coordinates: NDArray) -> NDArray: result = self.linear(coordinates) nan_values = isnan(result).reshape(len(result)) result[nan_values] = self.nearest(coordinates[nan_values]) return result