Source code for promis.star_map

"""This module contains a class for handling probabilistic, semantic and geospatial data."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH, Felix Divo, and contributors
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
from collections import defaultdict
from collections.abc import Callable, Iterable
from copy import deepcopy
from pickle import dump, load
from re import finditer
from traceback import format_exception
from warnings import warn

# Third Party
from numpy import array
from numpy.typing import NDArray

# ProMis
from promis.geo import CartesianCollection, CartesianMap
from promis.logic.spatial import Depth, Distance, Over, Relation


[docs] class StaRMap: """A Statistical Relational Map. This map holds all information about spatial relations between an agent's state space and features on a map. It can be used to compute parameters for these relations on a set of support points, and provides an interface to query these parameters for arbitrary locations. Args: uam: The uncertainty annotated map in Cartesian space. """ def __init__( self, uam: CartesianMap, ) -> None: self.uam = uam self.relations: dict[str, dict[str, Relation]] = {"over": {}, "distance": {}, "depth": {}}
[docs] def initialize(self, evaluation_points: CartesianCollection, number_of_random_maps: int, logic: str): """Setup the StaRMap for a given set of support points, number of samples and logic. Args: evaluation_points: The points to initialize the StaR Map on. number_of_random_maps: The number of samples to be used per support point. logic: The set of constraints deciding which relations are computed. """ self.sample( evaluation_points, number_of_random_maps, self._get_mentioned_relations(logic) )
[docs] @staticmethod def relation_name_to_class(relation: str) -> Relation: """Get the class for a given relation name. Args: relation: The name of the relation. Returns: The class corresponding to the relation name. Raises: NotImplementedError: If the relation name is unknown. """ match relation: case "over": return Over case "distance": return Distance case "depth": return Depth case _: raise NotImplementedError(f'Requested unknown relation "{relation}" from StaR Map')
@property def relation_types(self) -> set[str]: """Get the names of all relation types in the map.""" return set(self.relations.keys()) @property def relation_and_location_types(self) -> dict[str, set[str]]: """Get all relation and location type combinations in the map.""" return {name: set(info.keys()) for name, info in self.relations.items() if info} @property def location_types(self) -> set[str]: """Get all unique location types present in the map.""" return {location_type for info in self.relations.values() for location_type in info.keys()} @property def relation_arities(self) -> dict[str, int]: """Get the arity for each relation type.""" return {name: self.relation_name_to_class(name).arity() for name in self.relation_types}
[docs] @staticmethod def load(path: str) -> "StaRMap": """Load a StaRMap from a file. Args: path: The path to the file. Returns: The loaded StaRMap object. """ with open(path, "rb") as file: return load(file)
[docs] def save(self, path: str) -> None: """Save the StaRMap to a file. Args: path: The path to the file. """ with open(path, "wb") as file: dump(self, file)
[docs] def get(self, relation: str, location_type: str) -> Relation: """Get the computed data for a relation to a location type. Args: relation: The relation to return. location_type: The location type to relate to. Returns: A deepcopy of the relation object for the given relation and location type. """ return deepcopy(self.relations[relation][location_type])
[docs] def get_all(self, logic: str | None = None) -> dict[str, dict[str, Relation]]: """Get all or a subset of relations for each location type. If a logic program is provided, only the relations mentioned in it are returned. Otherwise, all computed relations are returned. Args: logic: An optional logic program to filter the relations. Returns: A nested dictionary of all requested relations, mapping relation type to location type to the `Relation` object. """ if logic is not None: return deepcopy( { relation_type: { location_type: self.relations[relation_type][location_type] for location_type in location_types } for relation_type, location_types in self._get_mentioned_relations(logic).items() } ) return deepcopy(self.relations)
def _get_mentioned_relations(self, logic: str) -> dict[str, set[str]]: """Get all relations mentioned in a logic program. Args: logic: The logic program. Returns: A dictionary mapping relation types to a set of location types mentioned in the program. """ relations: dict[str, set[str]] = defaultdict(set) for name, arity in self.relation_arities.items(): # Assume the ternary relation "between(X, anchorage, port)". # Then, this pattern matches ", anchorage, port", i.e., what X relates to. relates_to = ",".join([r"\s*((?:'\w*')|(?:\w+))\s*"] * (arity - 1)) if relates_to: # Prepend comma to first element if not empty relates_to = "," + relates_to # Matches something like "distance(X, land)" full_pattern = rf"({name})\(X{relates_to}\)" for match in finditer(full_pattern, logic): match arity: case 1: raise Exception( "Arity 1 is not supported because it always needs a location type" ) case 2: location_type = match.group(2) if location_type[0] in "'\"": # Remove quotes location_type = location_type[1:-1] relations[name].add(location_type) case _: raise Exception(f"Only arity 2 is supported, but got {arity}") return relations
[docs] def adaptive_sample( self, candidate_sampler: Callable[[int], NDArray], number_of_random_maps: int, number_of_iterations: int, number_of_improvement_points: int, what: dict[str, Iterable[str | None]] | None = None, scaler: float = 10.0, value_index: int = 0, acquisition_method: str = "entropy" ): """Automatically add support points at locations where the uncertainty is high. Args: candidate_sampler: The sampler that provides a candidate Collection that may be used for computing relation parameters number_of_random_maps: How often to sample from map data in order to compute statistics of spatial relations number_of_iterations: How many iterations of improvements to run number_of_improvement_points: How many points to add to improve the map at each iteration. what: The spatial relations to compute, as a mapping of relation names to location types scaler: How much to weigh the employed scoring method over the distance score value_index: Which value column of the relation's parameters to use for improvement acquisition_method: Which improvement method to use, one of {entropy, gaussian_process} """ what = self.relation_and_location_types if what is None else what all_location_types = [location_type for types in what.values() for location_type in types] # For each location_type we get one set of random maps and RTrees for location_type in all_location_types: r_trees, random_maps = self._make_r_trees(location_type, number_of_random_maps) # For each relation we decide new sample points based on distance and entropy scores for relation, types in what.items(): if location_type not in types: continue # Define value function and improve Collection def value_function(points): return self._compute_parameters(points, relation, location_type, r_trees, random_maps) self.relations[relation][location_type].parameters.improve( candidate_sampler=candidate_sampler, value_function=value_function, number_of_iterations=number_of_iterations, number_of_improvement_points=number_of_improvement_points, scaler=scaler, value_index=value_index, acquisition_method=acquisition_method )
def _make_r_trees(self, location_type: str, number_of_random_maps: int) -> tuple[list, list]: """Create R-trees for a given location type from random map samples. Args: location_type: The location type to filter features from the UAM. number_of_random_maps: The number of random maps to sample. Returns: A tuple containing a list of R-trees and a list of the corresponding randomly sampled maps. Returns (None, None) if no features for the given location type exist. """ # Filter relevant features, sample randomized variations of map and package into RTrees typed_map: CartesianMap = self.uam.filter(location_type) if typed_map.features: random_maps = typed_map.sample(number_of_random_maps) return [instance.to_rtree() for instance in random_maps], random_maps else: return None, None def _compute_parameters( self, coordinates: NDArray, relation: str, location_type: str, r_trees: list | None, random_maps: list[CartesianMap] | None, ) -> NDArray: """Compute the parameters for a given relation. Args: coordinates: The coordinates at which to compute the parameters. relation: The name of the relation to compute. location_type: The location type to relate to. r_trees: A list of R-trees for the location type, one for each random map. random_maps: A list of randomly sampled maps. Returns: An array of computed parameters for each coordinate. """ # Get the class of the spatial relation relation_class = self.relation_name_to_class(relation) # If the map had no relevant features, fill with default values if r_trees is None: return array([relation_class.empty_map_parameters()] * coordinates.shape[0]) try: collection = CartesianCollection(self.uam.origin) collection.append_with_default(coordinates, 0.0) return relation_class.from_r_trees( collection, r_trees, location_type, original_geometries=random_maps ).parameters.values() except Exception as e: warn( f"StaR Map encountered excpetion! " f"Relation {relation} for {location_type} will use default parameters. " f"Error was:\n{''.join(format_exception(e))}" ) return array([relation_class.empty_map_parameters()] * coordinates.shape[0])
[docs] def sample( self, evaluation_points: CartesianCollection, number_of_random_maps: int, what: dict[str, Iterable[str | None]] | None = None, ): """Compute and store spatial relation parameters. For a given set of evaluation points, this method computes the parameters of spatial relations by sampling the underlying uncertainty-annotated map. Args: evaluation_points: The collection of points for which the spatial relations will be computed. number_of_random_maps: How often to sample from map data in order to compute statistics of spatial relations. what: The spatial relations to compute, as a mapping of relation names to location types. If None, all relations with already present location types are computed. """ what = self.relation_and_location_types if what is None else what all_location_types = [location_type for types in what.values() for location_type in types] coordinates = evaluation_points.coordinates() for location_type in all_location_types: r_trees, random_maps = self._make_r_trees(location_type, number_of_random_maps) # This could be parallelized, as each relation and location type is independent from all others for relation, types in what.items(): if location_type not in types: continue if location_type not in self.relations[relation].keys(): self.relations[relation][location_type] = self.relation_name_to_class(relation)( CartesianCollection(self.uam.origin, 2), location_type ) # Update collection of sample points self.relations[relation][location_type].parameters.append( coordinates, self._compute_parameters(coordinates, relation, location_type, r_trees, random_maps) )