# Source code for promis.star_map

"""This module contains a class for handling probabilistic, semantic and geospatial data."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH, Felix Divo, and contributors
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
import time
from collections import defaultdict
from collections.abc import Callable, Iterable
from copy import deepcopy
from pickle import dump, load
from re import finditer
from traceback import format_exception
from warnings import warn

# Third Party
from numpy import array, empty, maximum, mean, sqrt, var
from numpy.typing import NDArray

# ProMis
from promis.geo import CartesianCollection, CartesianMap
from promis.logic.spatial import Approaches, Crosses, Depth, Distance, Enters, Exits, Faces, Intersects, Follows, Opposes, Over, Relation, ScalarRelation


[docs] class StaRMap: """A Statistical Relational Map. This map holds all information about spatial relations between an agent's state space and features on a map. It can be used to compute parameters for these relations on a set of support points, and provides an interface to query these parameters for arbitrary locations. Args: uam: The uncertainty annotated map in Cartesian space. """ def __init__( self, uam: CartesianMap, ) -> None: self.uam = uam self.relations: dict[str, dict[str, Relation]] = { "over": {}, "distance": {}, "depth": {}, "enters": {}, "exits": {}, "faces": {}, "crosses": {}, "intersects": {}, "follows": {}, "opposes": {} } self._promis = None
[docs] def initialize(self, evaluation_points: CartesianCollection, number_of_random_maps: int, logic: str): """Setup the StaRMap for a given set of support points, number of samples and logic. Args: evaluation_points: The points to initialize the StaR Map on. number_of_random_maps: The number of samples to be used per support point. logic: The set of constraints deciding which relations are computed. """ what: dict[str, set[str]] = defaultdict(set) for relation_type, location_type, _ in self._parse_sources(logic): if relation_type in self.relation_types: what[relation_type].add(location_type) self.sample(evaluation_points, number_of_random_maps, dict(what))
[docs] @staticmethod def relation_name_to_class(relation: str) -> Relation: """Get the class for a given relation name. Args: relation: The name of the relation. Returns: The class corresponding to the relation name. Raises: NotImplementedError: If the relation name is unknown. """ match relation: case "approaches": return Approaches case "crosses": return Crosses case "over": return Over case "distance": return Distance case "depth": return Depth case "enters": return Enters case "exits": return Exits case "faces": return Faces case "intersects": return Intersects case "follows": return Follows case "opposes": return Opposes case _: raise NotImplementedError(f'Requested unknown relation "{relation}" from StaR Map')
@property def relation_types(self) -> set[str]: """Get the names of all relation types in the map.""" return set(self.relations.keys()) @property def relation_and_location_types(self) -> dict[str, set[str]]: """Get all relation and location type combinations in the map.""" return {name: set(info.keys()) for name, info in self.relations.items() if info} @property def location_types(self) -> set[str]: """Get all unique location types present in the map.""" return {location_type for info in self.relations.values() for location_type in info.keys()} @property def relation_arities(self) -> dict[str, int]: """Get the arity for each relation type.""" return {name: self.relation_name_to_class(name).arity() for name in self.relation_types}
[docs] @staticmethod def load(path: str) -> "StaRMap": """Load a StaRMap from a file. Args: path: The path to the file. Returns: The loaded StaRMap object. """ with open(path, "rb") as file: return load(file)
[docs] def save(self, path: str) -> None: """Save the StaRMap to a file. Args: path: The path to the file. """ with open(path, "wb") as file: dump(self, file)
[docs] def update( self, sample_points: CartesianCollection, number_of_random_maps: int, what: dict[str, Iterable[str | None]] | None = None, interpolation_method: str = "hybrid", timestamp: float = None, ) -> None: """Recompute a spatial relation and write the result to the linked Resin circuit. This combines :meth:`sample`, parameter interpolation to the evaluation grid, and the Resin writer call into one step. Call :meth:`link` and :meth:`~promis.promis.ProMis.initialize` before using this method. Args: sample_points: Points at which to compute the spatial relation (typically perturbed positions of the dynamic features). number_of_random_maps: Number of random map samples used to estimate the relation parameters. what: The spatial relations to compute, as a mapping of relation names to location types. If None, all relations with already present location types are updated. interpolation_method: Interpolation method used to map the newly computed relation parameters to the evaluation grid. timestamp: The timestamp to write with. If None, the current time is used. Raises: RuntimeError: If :meth:`link` or :meth:`~promis.promis.ProMis.initialize` have not been called. """ if self._promis is None: raise RuntimeError( "StaRMap must be linked to a ProMis instance via link() before calling update()." ) if self._promis._evaluation_points is None: raise RuntimeError( "ProMis.initialize() must be called before StaRMap.update()." 
) # Default: Update all relations and location types in the StaR Map what = self.relation_and_location_types if what is None else what # Delete all existing, relevant relations for relation_type, location_types in what.items(): for location_type in location_types: self.relations[relation_type].pop(location_type) # Sample all the relevant relations on the given points self.sample(sample_points, number_of_random_maps, what=what) # Interpolate to the evaluation grid stored by ProMis and upload to Resin coords = self._promis._evaluation_points.coordinates() timestamp = timestamp if timestamp is not None else time.monotonic() for relation_type, location_types in what.items(): for location_type in location_types: # Get relation parameters interpolated onto ProMis evaluation points relation = self.get(relation_type, location_type) params = relation.parameters.get_interpolator(interpolation_method)(coords) # Write to the appropriate Resin channel writer = self._promis.get_star_map_writer(relation_type, location_type) relation_obj = self.relations[relation_type][location_type] if isinstance(relation_obj, ScalarRelation): means = params[:, 0].ravel() stds = sqrt(maximum(params[:, 1], 1e-6)).ravel() writer.write("normal", [means, stds], timestamp) else: writer.write(params[:, 0].ravel(), timestamp)
[docs] def get(self, relation: str, location_type: str) -> Relation: """Get the computed data for a relation to a location type. Args: relation: The relation to return. location_type: The location type to relate to. Returns: A deepcopy of the relation object for the given relation and location type. """ return deepcopy(self.relations[relation][location_type])
[docs] def get_all(self) -> dict[str, dict[str, Relation]]: """Get all computed relations. Returns: A nested dictionary of all computed relations, mapping relation type to location type to the `Relation` object. """ return deepcopy(self.relations)
@staticmethod def _parse_sources(logic: str) -> list[tuple[str, str, str]]: """Parse ``source(path, Type)`` declarations from a Resin program. Extracts relation type and location type from the path (last two segments) rather than from the left-hand side atom. For example:: source("/star_map/over/park", Probability) yields ``("over", "park", "Probability")``. Args: logic: The Resin program string. Returns: A list of ``(relation_type, location_type, source_type)`` triples. """ result = [] for match in finditer(r'source\("(/[^"]*)",\s*(Probability|Density)\s*\)', logic): parts = match.group(1).strip('/').split('/') source_type = match.group(2) if len(parts) >= 2: result.append((parts[-2], parts[-1], source_type)) return result
[docs] def adaptive_sample( self, candidate_sampler: Callable[[int], NDArray], number_of_random_maps: int, number_of_iterations: int, number_of_improvement_points: int, what: dict[str, Iterable[str | None]] | None = None, scaler: float = 10.0, value_index: int = 0, acquisition_method: str = "entropy" ): """Automatically add support points at locations where the uncertainty is high. Args: candidate_sampler: The sampler that provides a candidate Collection that may be used for computing relation parameters number_of_random_maps: How often to sample from map data in order to compute statistics of spatial relations number_of_iterations: How many iterations of improvements to run number_of_improvement_points: How many points to add to improve the map at each iteration. what: The spatial relations to compute, as a mapping of relation names to location types scaler: How much to weigh the employed scoring method over the distance score value_index: Which value column of the relation's parameters to use for improvement acquisition_method: Which improvement method to use, one of {entropy, gaussian_process} """ what = self.relation_and_location_types if what is None else what all_location_types = list(dict.fromkeys( location_type for types in what.values() for location_type in types )) # For each location_type we get one set of random maps and RTrees for location_type in all_location_types: r_trees, feature_samples = self._make_r_trees(location_type, number_of_random_maps) # For each relation we decide new sample points based on distance and entropy scores for relation, types in what.items(): if location_type not in types: continue # Define value function and improve Collection def value_function(collection): return self._compute_parameters(collection, relation, r_trees, feature_samples) self.relations[relation][location_type].parameters.improve( candidate_sampler=candidate_sampler, value_function=value_function, number_of_iterations=number_of_iterations, 
number_of_improvement_points=number_of_improvement_points, scaler=scaler, value_index=value_index, acquisition_method=acquisition_method )
def _make_r_trees(self, location_type: str, number_of_random_maps: int) -> tuple[list, list]: """Create R-trees for a given location type from random map samples. Args: location_type: The location type to filter features from the UAM. number_of_random_maps: The number of random maps to sample. Returns: A tuple containing a list of R-trees and a list of the corresponding randomly sampled maps. Returns (None, None) if no features for the given location type exist. """ type_features = self.uam.features.get(location_type, []) if type_features: typed_map = CartesianMap(self.uam.origin, {location_type: type_features}) random_maps = typed_map.sample(number_of_random_maps) r_trees = [instance.to_rtree() for instance in random_maps] feature_samples = [instance.features[location_type] for instance in random_maps] return r_trees, feature_samples else: return None, None def _compute_parameters( self, collection: CartesianCollection, relation: str, r_trees: list | None, feature_samples: list[list] | None, ) -> NDArray: """Compute the parameters for a given relation. Args: collection: The collection defining coordinates at which to compute the parameters. relation: The name of the relation to compute. r_trees: A list of R-trees for the location type, one for each random map. feature_samples: For each R-tree, the flat list of features it was built from. Returns: An array of computed parameters for each coordinate. """ # Get the class of the spatial relation relation_class = self.relation_name_to_class(relation) # If the map had no relevant features, fill with default values if r_trees is None: return array([relation_class.empty_map_parameters()] * len(collection.data)) try: return relation_class.compute_parameters(collection, r_trees, feature_samples) except Exception as e: warn( f"StaR Map encountered exception! " f"Relation {relation} will use default parameters. 
" f"Error was:\n{''.join(format_exception(e))}" ) return array([relation_class.empty_map_parameters()] * len(collection.data))
[docs] def sample( self, evaluation_points: CartesianCollection, number_of_random_maps: int, what: dict[str, Iterable[str | None]] | None = None, ): """Compute and store spatial relation parameters. For a given set of evaluation points, this method computes the parameters of spatial relations by sampling the underlying uncertainty-annotated map. Args: evaluation_points: The collection of points for which the spatial relations will be computed. number_of_random_maps: How often to sample from map data in order to compute statistics of spatial relations. what: The spatial relations to compute, as a mapping of relation names to location types. If None, all relations with already present location types are computed. """ what = self.relation_and_location_types if what is None else what all_location_types = list(dict.fromkeys( location_type for types in what.values() for location_type in types )) coordinates = evaluation_points.coordinates() transitions = evaluation_points.transitions() n_points = len(evaluation_points.data) for location_type in all_location_types: r_trees, feature_samples = self._make_r_trees(location_type, number_of_random_maps) relevant = [ (relation, self.relation_name_to_class(relation)) for relation, types in what.items() if location_type in types ] # No features for this type: fill all relations with defaults and move on if r_trees is None: for relation, relation_class in relevant: if location_type not in self.relations[relation]: self.relations[relation][location_type] = relation_class( CartesianCollection(self.uam.origin, 2), location_type ) self.relations[relation][location_type].parameters.append( coordinates, array([relation_class.empty_map_parameters()] * n_points), transitions, ) continue # Single pass over all R-trees, computing all relevant relations at once n_maps = len(r_trees) accumulator = {relation: empty((n_maps, n_points)) for relation, _ in relevant} failed: set[str] = set() for i, (r_tree, geometries) in enumerate(zip(r_trees, 
feature_samples)): for relation, relation_class in relevant: if relation in failed: continue try: accumulator[relation][i] = relation_class.compute_relation( evaluation_points, r_tree, geometries ) except Exception as e: warn( f"StaR Map encountered exception! " f"Relation {relation} for {location_type} will use default parameters. " f"Error was:\n{''.join(format_exception(e))}" ) failed.add(relation) for relation, relation_class in relevant: if location_type not in self.relations[relation]: self.relations[relation][location_type] = relation_class( CartesianCollection(self.uam.origin, 2), location_type ) if relation in failed: params = array([relation_class.empty_map_parameters()] * n_points) else: data = accumulator[relation] params = array([mean(data, axis=0), var(data, axis=0)]).T self.relations[relation][location_type].parameters.append( coordinates, params, transitions )