Source code for promis.promis

"""The ProMis engine for solving constrained navigation tasks using hybrid probabilistic logic."""

#
# Copyright (c) Simon Kohaut, Honda Research Institute Europe GmbH
#
# This file is part of ProMis and licensed under the BSD 3-Clause License.
# You should have received a copy of the BSD 3-Clause License along with ProMis.
# If not, see https://opensource.org/license/bsd-3-clause/.
#

# Standard Library
from multiprocessing import Pool

# Third Party
from numpy import array
from rich.progress import track

# ProMis
from promis.geo import CartesianCollection
from promis.logic import Solver
from promis.star_map import StaRMap


[docs] class ProMis: """The ProMis engine to create Probabilistic Mission Landscapes.""" def __init__(self, star_map: StaRMap) -> "ProMis": """Setup the ProMis engine. Args: star_map: The statistical relational map holding the parameters for ProMis """ self.star_map = star_map
[docs] def solve( self, evaluation_points: CartesianCollection, logic: str, n_jobs: int | None = None, batch_size: int = 10, show_progress: bool = False, print_first: bool = False, interpolation_method: str = "linear", ) -> None: """Solve the given ProMis problem. It searches the provided logic for the used relations and location types and only encodes the necessary information for the inference. It can further parallelize the inference process over multiple workers and batch into fewer solver invocations to speed up computations. Args: support: The points to compute exactly, with the output being interpolated to the same target as the employed StaRMap logic: The constraints of the landscape(X) predicate, including its definition n_jobs: How many workers to use in parallel batch_size: How many pixels to infer at once show_progress: Whether to show a progress bar print_first: Whether to print the first program to stdout interpolation_method: The interpolation method used to get from StaR Map known values to evluation points """ # Get all relevant relations from the StaRMap relations = self.star_map.get_all(logic) # For each point in the target CartesianCollection, we need to run a query number_of_queries = len(evaluation_points.data) queries = [f"query(landscape(x_{index})).\n" for index in range(number_of_queries)] # We batch up queries into separate programs solvers = [] for index in range(0, number_of_queries, batch_size): # Define the current batch of indices batch = range(index, index + batch_size) # Write the background knowledge, queries and parameters to the program program = logic + "\n" for batch_index in batch: if batch_index >= number_of_queries: break for relation_type in relations.keys(): for location_type in relations[relation_type].keys(): relation = relations[relation_type][location_type] relation.parameters = relation.parameters.into( evaluation_points, interpolation_method, in_place=False ) program += relation.index_to_distributional_clause(batch_index) program += queries[batch_index] # Add program to collection solvers.append(Solver(program)) if print_first and index == 0: print(program) if n_jobs is None: flattened_data = [] for solver in track( solvers, description="Inference", disable=not show_progress, ): batch = ProMis._run_inference(solver) flattened_data.extend(batch) else: # Solve in parallel with pool of workers flattened_data = [] with Pool(n_jobs) as pool: batched_results = pool.imap( ProMis._run_inference, solvers, chunksize=10 if len(solvers) > 1000 else 1, ) for batch in track( batched_results, total=len(solvers), description="Inference", disable=not show_progress, ): flattened_data.extend(batch) # Write results to the CartesianCollection where the tata is known evaluation_points.data["v0"] = array(flattened_data)
[docs] def adaptive_solve( self, initial_evaluation_points, candidate_sampler, logic, number_of_iterations: int, number_of_improvement_points: int, n_jobs: int | None = None, batch_size: int = 10, scaler: float = 10.0, interpolation_method: str = "linear", acquisition_method: str = "entropy" ): """Automatically add support points at locations where the uncertainty is high. Args: number_of_random_maps: How often to sample from map data in order to compute statistics of spatial relations number_of_improvement_points: How many points to add to improve the map relations: The spatial relations to compute location_types: The location types to compute interpolation_method: The interpolation method used to get from StaR Map known values to evluation points acquisition_method: The method to inform the adaptive solving process, one of {entropy, gaussian_process} """ self.solve(initial_evaluation_points, logic, n_jobs, batch_size) def value_function(points): evaluation_points = CartesianCollection(self.star_map.uam.origin) evaluation_points.append_with_default(points, 0.0) self.solve( evaluation_points, logic, n_jobs, batch_size, interpolation_method=interpolation_method ) return evaluation_points.data["v0"].to_numpy()[:, None] initial_evaluation_points.improve( candidate_sampler=candidate_sampler, value_function=value_function, number_of_iterations=number_of_iterations, number_of_improvement_points=number_of_improvement_points, scaler=scaler, acquisition_method=acquisition_method )
@staticmethod def _run_inference(solver: Solver) -> list[float]: return solver.inference()