Source code for promis.logic.spatial.relation

"""This module implements a distributional predicate of distances to sets of map features."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH, Felix Divo, and contributors
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
from abc import ABC, abstractmethod
from pathlib import Path
from pickle import dump, load
from typing import TypeVar

# Third Party
from numpy import array, clip, mean, sqrt, var, vstack
from scipy.stats import norm
from shapely.strtree import STRtree

# ProMis
from promis.geo import CartesianCollection, CartesianLocation, CartesianMap, CartesianRasterBand

#: Type variable bound to ``Relation``; used to annotate base-class methods
#: (``load`` and ``from_r_trees``) that return an instance of a derived relation
DerivedRelation = TypeVar("DerivedRelation", bound="Relation")


class Relation(ABC):
    """An abstract base class for spatial relations.

    A spatial relation models a probabilistic relationship between points in space and typed
    map features. It is typically represented by a distribution (e.g., Gaussian) for each point,
    defined by a set of parameters like mean and variance.

    Args:
        parameters: A collection of points, where each point is associated with parameters
            that define the relation's distribution (e.g., mean and variance).
        location_type: A string identifier for the type of map feature this relation pertains
            to, such as "buildings" or "roads". Can be `None` if the relation is not specific
            to a feature type.
    """

    def __init__(self, parameters: CartesianCollection, location_type: str | None) -> None:
        # Setup attributes
        self.parameters = parameters
        self.location_type = location_type

    @staticmethod
    def load(path: str | Path) -> DerivedRelation:
        """Load the relation from a .pkl file.

        Warning:
            The file is read with :mod:`pickle`, which may execute arbitrary code while
            unpickling. Only load files that come from a trusted source.

        Args:
            path: The path to the file, including its name and file extension.

        Returns:
            The loaded Relation instance
        """

        # NOTE: `load` below resolves to the module-level pickle import, not this method
        with open(path, "rb") as file:
            return load(file)

    def save(self, path: str | Path) -> None:
        """Save the relation to a .pkl file.

        Args:
            path: The path to the file, including its name and file extension.
        """

        with open(path, "wb") as file:
            dump(self, file)

    def save_as_plp(self, path: str | Path) -> None:
        """Save the relation as a text file containing distributional clauses.

        Args:
            path: The path to the file, including its name and file extension.
        """

        # Fix the encoding so the written program does not depend on the platform locale
        with open(path, "w", encoding="utf-8") as plp_file:
            plp_file.write("".join(self.to_distributional_clauses()))

    def to_distributional_clauses(self) -> list[str]:
        """Express the Relation as distributional clause.

        A distributional clause is a string representation of a probabilistic fact,
        suitable for use in a probabilistic logic program.

        Returns:
            A list of distributional clauses, one for each point in the `parameters` collection.
        """

        return [
            self.index_to_distributional_clause(index)
            for index in range(len(self.parameters.data))
        ]

    @staticmethod
    @abstractmethod
    def empty_map_parameters() -> list[float]:
        """Create the default parameters for a relation computed on an empty map.

        These parameters are used as a fallback when no map features are present
        to compute the relation from.
        """

    @abstractmethod
    def index_to_distributional_clause(self, index: int) -> str:
        """Express a single index of this Relation as a distributional clause.

        Args:
            index: The index of the point within the `parameters` collection.

        Returns:
            A string representing the distributional clause for the specified entry.
        """

    @staticmethod
    @abstractmethod
    def compute_relation(
        location: CartesianLocation, r_tree: STRtree, original_geometries: CartesianMap
    ) -> float:
        """Compute the value of this Relation type for a specific location and map.

        Args:
            location: The location to evaluate in Cartesian coordinates.
            r_tree: The map represented as an R-tree for efficient spatial queries.
            original_geometries: The original geometries indexed by the STRtree.

        Returns:
            A scalar value representing the computed relation (e.g., distance, depth).
        """

    @staticmethod
    @abstractmethod
    def arity() -> int:
        """Return the arity of the relation."""

    @classmethod
    def compute_parameters(
        cls,
        location: CartesianLocation,
        r_trees: list[STRtree],
        original_geometries: list[CartesianMap],
    ) -> array:
        """Compute the parameters of this Relation type for a specific location and set of maps.

        The relation is evaluated once per (R-tree, geometries) pair and the mean and
        variance over these evaluations are returned.

        Args:
            location: The location to evaluate in Cartesian coordinates.
            r_trees: A list of generated maps, each represented as an R-tree.
            original_geometries: The original geometries indexed by the STRtrees,
                given in the same order and number as `r_trees`.

        Returns:
            A numpy array containing the computed parameters (e.g., mean and variance) of the
            relation's distribution for the given location.

        Raises:
            ValueError: If `r_trees` and `original_geometries` differ in length.
        """

        # strict=True: a length mismatch would otherwise silently drop samples and
        # skew the estimated moments
        relation_data = [
            cls.compute_relation(location, r_tree, geometries)
            for r_tree, geometries in zip(r_trees, original_geometries, strict=True)
        ]

        return array([mean(relation_data, axis=0), var(relation_data, axis=0)]).T

    @classmethod
    def from_r_trees(
        cls,
        support: CartesianCollection,
        r_trees: list[STRtree],
        location_type: str,
        original_geometries: list[CartesianMap],
    ) -> DerivedRelation:
        """Compute relation for a Cartesian collection of points and a set of R-trees.

        Args:
            support: The collection of Cartesian points to compute the relation for.
            r_trees: Random variations of the features of a map, each indexable by an STRtree.
            location_type: The type of features this relates to.
            original_geometries: The original geometries indexed by the STRtrees.

        Returns:
            A new instance of the Relation class, populated with the computed parameters.
        """

        # Compute relation over support points
        locations = support.to_cartesian_locations()
        statistical_moments = vstack(
            [
                cls.compute_parameters(location, r_trees, original_geometries)
                for location in locations
            ]
        )
        number_of_values = statistical_moments.shape[1]

        if isinstance(support, CartesianRasterBand):
            # Maintain the efficient raster representation
            parameters = CartesianRasterBand(
                support.origin,
                support.resolution,
                support.width,
                support.height,
                number_of_values=number_of_values,
            )

            # One value column ("v0", "v1", ...) per computed moment
            for i in range(number_of_values):
                parameters.data[f"v{i}"] = statistical_moments[:, i]
        else:
            # Setup parameter collection
            parameters = CartesianCollection(support.origin, number_of_values=number_of_values)
            parameters.append(locations, statistical_moments)

        return cls(parameters, location_type)
class ScalarRelation(Relation):
    """A relation representing a scalar value modeled by a Gaussian distribution.

    This class provides a concrete implementation for relations where the value at each point
    can be described by a mean and a variance. It also implements comparison operators
    (`<`, `>`) to facilitate probabilistic queries based on the Cumulative Distribution
    Function (CDF) of the Gaussian distribution.

    Args:
        parameters: A collection of points, where each has values for `[mean, variance]`.
        location_type: The name of the locations this distance relates to.
        problog_name: The name to be used for this relation in Problog clauses.
        enforced_min_variance: The minimum variance to enforce for the distribution.
            Values below this will be clipped. If `None`, the variances are left untouched.
    """

    def __init__(
        self,
        parameters: CartesianCollection,
        location_type: str | None,
        problog_name: str,
        enforced_min_variance: float | None = 0.001,
    ) -> None:
        super().__init__(parameters, location_type)

        self.problog_name = problog_name
        self.enforced_min_variance = enforced_min_variance

        # Only clip when a lower bound was requested; calling clip with both bounds
        # set to None is rejected by older NumPy releases
        if enforced_min_variance is not None:
            self.parameters.data["v1"] = clip(
                self.parameters.data["v1"], enforced_min_variance, None
            )

    def __lt__(self, value: float) -> CartesianCollection:
        """Compute the probability that the relation's value is less than a given value.

        This is equivalent to calculating the CDF of the Gaussian distribution
        at each point for the given value.

        Args:
            value: The value to compare against.

        Returns:
            A CartesianCollection where each point's value is the probability
            `P(relation < value)`. If the parameters are stored as a CartesianRasterBand,
            the result is a CartesianRasterBand as well.
        """

        means = self.parameters.data["v0"]
        variances = self.parameters.data["v1"]

        # The distribution is parameterized by its standard deviation, not its variance
        cdf = norm.cdf(value, loc=means, scale=sqrt(variances))

        if isinstance(self.parameters, CartesianRasterBand):
            # Maintain the efficient raster representation
            probabilities = CartesianRasterBand(
                self.parameters.origin,
                self.parameters.resolution,
                self.parameters.width,
                self.parameters.height,
            )
            probabilities.data["v0"] = cdf
        else:
            probabilities = CartesianCollection(self.parameters.origin)
            probabilities.append(self.parameters.coordinates(), vstack(cdf))

        return probabilities

    def __gt__(self, value: float) -> CartesianCollection:
        """Compute the probability that the relation's value is greater than a given value.

        This is equivalent to calculating the survival function (1 - CDF) of the
        Gaussian distribution at each point for the given value.

        Args:
            value: The value to compare against.

        Returns:
            A CartesianCollection where each point's value is the probability
            `P(relation > value)`.
        """

        # Uses the existing __lt__ method to compute the inverse
        probabilities = self < value
        probabilities.data["v0"] = 1.0 - probabilities.data["v0"]

        return probabilities

    def index_to_distributional_clause(self, index: int) -> str:
        """Express a single index of this Relation as a distributional clause.

        Formats the clause as `name(x_INDEX, location_type) ~ normal(MEAN, STD).` or
        `name(x_INDEX) ~ normal(MEAN, STD).` if `location_type` is None.

        Args:
            index: The index of the point within the `parameters` collection.

        Returns:
            A string representing the distributional clause for the specified entry.
        """

        if self.location_type is None:
            relation = f"{self.problog_name}(x_{index})"
        else:
            relation = f"{self.problog_name}(x_{index}, {self.location_type})"

        # Local names chosen so they do not shadow the module-level numpy `mean` import
        mean_value = self.parameters.data["v0"][index]
        std_value = sqrt(self.parameters.data["v1"][index])
        distribution = f"normal({mean_value}, {std_value})"

        return f"{relation} ~ {distribution}.\n"