"""Allows to find and read nautical charts. Currently, this only supports IHO S-57 charts.
Ideas:
- Maybe use `Fiona <https://pypi.org/project/Fiona/>`__ as an alternative?
Resources:
- Documentation on the S-57 file format and the relevant parts of GDAL:
- https://gdal.org/python/osgeo.ogr-module.html
- https://gdal.org/drivers/vector/s57.html
- https://www.teledynecaris.com/s-57/frames/S57catalog.htm (the entire object catalogue!)
- https://gdal.org/api/python_gotchas.html (!)
- Examples and Cookbooks:
- https://pcjericks.github.io/py-gdalogr-cookbook/vector_layers.html
- and more general: https://pcjericks.github.io/py-gdalogr-cookbook/index.html
- https://lists.osgeo.org/pipermail/gdal-dev/2008-April/016767.html
- Helpers:
- The program QGIS is very helpful because it can open S-57 files visually.
"""
# Python standard
import os
import os.path
import sys
from collections.abc import Generator, Mapping
from functools import partial
from hashlib import sha1
from multiprocessing import Pool
from pathlib import Path
from warnings import catch_warnings, simplefilter, warn
# Third Party
from numpy import array
from shapely import intersection
from shapely.geometry import LineString, MultiLineString, MultiPolygon, Point, Polygon
# ProMis
from promis.geo import (
CartesianGeometry,
CartesianLocation,
CartesianPolygon,
CartesianPolyLine,
Geospatial,
PolarGeometry,
PolarLocation,
PolarPolygon,
PolarPolyLine,
)
from promis.loaders.spatial_loader import SpatialLoader
# Allow osgeo to be missing
# Set to True if the osgeo is available, or False if not
_OSGEO_PRESENT: bool
try:
# This emits warnings (at least on Python 3.8)
with catch_warnings():
simplefilter("ignore", DeprecationWarning, lineno=8)
from osgeo import gdal, ogr
except ImportError as _error: # pragma: no cover
_OSGEO_PRESENT = False
del _error
else:
_OSGEO_PRESENT = True
ogr.UseExceptions()
class S57ChartHandler:
"""Reads IHO S-57 chart files. The returned geometries are *not* checked for validity.
These chart objects are extracted from the source files:
- Landmasses (from S-57 object type ``LNAM``)
- Depth values (from S-57 object type ``DEPARE``, via attribute ``DRVAL2``, assumed to be in meters)
- Buoys (from S-57 object type ``BOY*``, e.g. ``BOYCAR``)
- Possibly more in the future
The identifiers of the created objects are created deterministically from the chart name and the already
contained identifiers. They are supposed to be unique across all charts. They are created by first
assembling a string that is guaranteed to be a globally unique identifier from the chart file name and the
``LNAM`` field. Then, the string is hashed and truncated to form a 63-bit identifier.
The names of the objects are created like this:
``{chart file name}#{chart-unique alphanumeric identifier} ({human-readable type}): "{common name}"``.
Raises:
ImportError: If the :mod:`osgeo` package is missing
"""
def __init__(self):
if not _OSGEO_PRESENT: # pragma: no cover
raise ImportError(
"Could not import package osgeo. "
"If you woud like to load nautical charts, please install it as described in the README."
)
#: This maps layer names to the corresponding parameters for S57ChartHandler._create_obstacle(...)
#: These are not all possible objects but merely the ones which are trivial to read out.
_SIMPLE_MAPPINGS: Mapping[str, tuple[str, str]] = {
"LNDARE": ("land", "Landmass"),
#
"BOYCAR": ("obstruction", "Buoy (BOYCAR)"),
"BOYINB": ("obstruction", "Buoy (BOYINB)"),
"BOYISD": ("obstruction", "Buoy (BOYISD)"),
"BOYLAT": ("obstruction", "Buoy (BOYLAT)"),
"BOYSAW": ("obstruction", "Buoy (BOYSAW)"),
"BOYSPP": ("obstruction", "Buoy (BOYSPP)"),
#
"OBSTRN": ("obstruction", "Obstruction"),
"OFSPLF": ("obstruction", "Platform"),
"OSPARE": ("obstruction", "Production Area/Wind farm"),
"PILPNT": ("obstruction", "Post"),
# "MIPARE": ("obstruction", "Military Exercise Area"),
# "DMPGRD": ("obstruction", "Dumping Ground"),
"DOCARE": ("obstruction", "Dock Area"),
"DRYDOC": ("obstruction", "Dry Dock"),
"FLODOC": ("obstruction", "Floating Dock"),
"DYKCON": ("obstruction", "Dyke/Levee"),
#
"DWRTCL": ("waterway", "Deep Water Centerline"),
"DWRTPT": ("waterway", "Deep Water Way"),
"FAIRWY": ("waterway", "Fairway"),
#
"HRBARE": ("harbor", "Harbor area (administrative)"),
#
# "TSELNE": ("TSELNE", "TSELNE"),
# "TSEZNE": ("TSEZNE", "TSEZNE"),
# "TSSBND": ("TSSBND", "TSSBND"),
# "TSSLPT": ("TSSLPT", "TSSLPT"),
"RECTRC": ("waterway", "RECTRC"),
"RCTLPT": ("waterway", "RCTLPT"),
"RCRTCL": ("waterway", "RCRTCL"),
#
"ACHARE": ("anchorage", "Anchorage area"),
"ACHBRT": ("anchorage", "Anchor berth (single)"),
}
@staticmethod
def find_chart_files(search_path: str | os.PathLike[str]) -> Generator[Path, None, None]:
for root, _, files in os.walk(search_path, followlinks=True):
for file in files:
if file.endswith(".000"):
# assume it is an IHO S-57 file
yield Path(root) / file
# else: ignore the file
def read_chart_file(self, path: str | os.PathLike[str]) -> Generator[PolarGeometry, None, None]:
"""Reads a chart file and converts the relevant layers/features into ChartObstacles.
Args:
path: The path to the S-57 chart file (e.g., ``something.000``)
Returns:
All relevant obstacles with globally unique and deterministic names
Raises:
FileNotFoundError: If the database file(s) is/are missing
OSError: If the database file(s) cannot be opened for another reason
"""
file_path = str(path)
if not os.path.exists(file_path):
raise FileNotFoundError(f"cannot open dataset: {file_path}")
# open database
dataset = ogr.Open(file_path, gdal.GA_ReadOnly)
if not dataset:
raise OSError(f"cannot open dataset (invalid file): {file_path}")
file_name = os.path.splitext(os.path.basename(file_path))[0]
file_name_bytes = file_name.encode()
# read contents
for i in range(int(dataset.GetLayerCount())):
layer = dataset.GetLayerByIndex(i)
for geometry, feature_id in S57ChartHandler._convert_layer_to_obstacles(layer):
# prepend the name of the file to make it unique and ease lookup of objects in the source
# this is also required because the LNAM field is not guaranteed to be unique across files
geometry.name = f"{file_name}#{geometry.name}"
# hash a combination of file name and feature identifier as that together is globally unique
hashed_id = sha1(file_name_bytes + feature_id.encode()).digest()
# truncate to 64 bit and create an int from it
identifier = int.from_bytes(hashed_id[-8:], sys.byteorder, signed=True)
# cut off the most-significant bit to arrive at 63 bits
geometry.identifier = identifier & 0x7F_FF_FF_FF_FF_FF_FF_FF
yield geometry
@staticmethod
def _convert_layer_to_obstacles(
layer: "ogr.Layer",
) -> Generator[tuple[PolarGeometry, str], None, None]:
"""Converts the relevant obstacles of a layer into polar geometries.
Args:
layer: The layer to search in
Returns:
For each relevant feature in the layer: a polygon and a feature ID (32 bit)
"""
layer_name = layer.GetName()
# we first do the more complicated stuff and then convert using S57ChartHandler.SIMPLE_MAPPINGS
if layer_name == "DEPARE": # "depth area"
for feature in layer:
# Warning: we assume these depths are given in meters, which could be wrong in some cases but
# worked in our tests
depth_max = feature["DRVAL2"]
yield from S57ChartHandler._create_obstacle(feature, f"Depth={depth_max}m", "water")
else:
if layer_name in S57ChartHandler._SIMPLE_MAPPINGS:
location_type, human_readable_type = S57ChartHandler._SIMPLE_MAPPINGS[layer_name]
for feature in layer:
yield from S57ChartHandler._create_obstacle(
feature, human_readable_type, location_type
)
@staticmethod
def _create_obstacle(
feature: "ogr.Feature",
human_readable_type: str,
location_type: str,
) -> Generator[tuple[PolarGeometry, str], None, None]:
"""Creates a point or area obstacle from a given feature.
Args:
feature: The feature to transform
human_readable_type: A human-readable string describing what this is, like ``"landmass"``
location_type: The location type to be used
Returns:
(1) A location or polygon that represents an obstacle
(2) A (not necessarily unique) feature ID (32 bit) for that obstacle; but unique per chart file
"""
# This ID is guaranteed to be unique within the chart file and composed of AGEN, FIDN, and FIDS
# It is not nessesarily unique even within one chart file since we support multi-part geometries.
feature_id: str = feature["LNAM"]
assert feature_id is not None, "the LNAM field is mandatory for all objects"
# Remark: feature.IsFieldSetAndNotNull("OBJNAM") seems to work but logs tons of errors to syserr
# It is not mandatory for all types of chart objects
object_name: str | None
try:
object_name = feature["OBJNAM"] # might be None
except (ValueError, KeyError):
object_name = None
if object_name is None:
object_name = "---"
else:
# Replace broken unicode text (surrogates)
object_name = object_name.encode("utf-8", "replace").decode("utf-8")
# Construct the obstacle's name
name = f'{feature_id} ({human_readable_type}): "{object_name}"'
# Extract the geometries (as the feature may or may not contain a geometry collection)
geometry = feature.GetGeometryRef()
geometry_type = geometry.GetGeometryType()
match geometry_type:
case ogr.wkbPoint:
point = PolarLocation(
latitude=geometry.GetY(),
longitude=geometry.GetX(),
name=name,
location_type=location_type,
)
yield point, feature_id
case ogr.wkbLineString:
points = [
PolarLocation(latitude=lat, longitude=lon) for lon, lat in geometry.GetPoints()
]
yield PolarPolyLine(points, name=name, location_type=location_type), feature_id
case ogr.wkbMultiLineString:
for i in range(geometry.GetGeometryCount()):
points = [
PolarLocation(latitude=lat, longitude=lon)
for lon, lat in geometry.GetGeometryRef(i).GetPoints()
]
yield PolarPolyLine(points, name=name, location_type=location_type), feature_id
case ogr.wkbPolygon:
outer_ring = geometry.GetGeometryRef(0)
outer_points = [
PolarLocation(latitude=lat, longitude=lon)
for lon, lat in outer_ring.GetPoints()
]
inner_rings = [
[PolarLocation(latitude=lat, longitude=lon) for lon, lat in ring.GetPoints()]
for ring in (
geometry.GetGeometryRef(i) for i in range(1, geometry.GetGeometryCount())
)
]
yield (
PolarPolygon(
locations=outer_points,
holes=inner_rings,
name=name,
location_type=location_type,
),
feature_id,
)
case _:
# Apparently, no other geometries appear in charts
raise NotImplementedError(
f"Cannot handle geometry type {ogr.GeometryTypeToName(geometry_type)}"
)
[docs]
class NauticalLoader(SpatialLoader):
def __init__(
self, chart_root: Path, origin: PolarLocation, dimensions: tuple[float, float]
) -> None:
super().__init__(origin, dimensions)
self.chart_root = chart_root
if not chart_root.exists():
raise FileNotFoundError(f"chart_root must exist, but got {chart_root}")
if not chart_root.is_dir():
raise NotADirectoryError(f"chart_root must be a directory, but got {chart_root}")
[docs]
def load(self, feature_description=None, n_jobs: int | None = None) -> None:
handler = S57ChartHandler()
# Everything is in local coordinates, so we can just use a simple bounding box
width, height = self.dimensions
bounding_box = CartesianPolygon.make_centered_box(width, height, origin=self.origin)
with Pool(n_jobs) as pool:
for sublist in pool.imap_unordered(
partial(
process_file, handler=handler, origin=self.origin, bounding_box=bounding_box
),
handler.find_chart_files(self.chart_root),
):
self.features.extend(sublist)
def process_file(
file: Path, handler: S57ChartHandler, origin: PolarLocation, bounding_box: CartesianPolygon
) -> list[PolarGeometry]:
results = []
for geo_object in handler.read_chart_file(file):
geometry = geo_object.to_cartesian(origin=origin)
# Make sure to only return the objects that are within the relevant bounding box
cropped = intersection(bounding_box.geometry, geometry.geometry)
if not cropped.is_empty:
results.extend(
it.to_polar(origin=origin)
for it in _from_shapely(cropped, copy_metadata_from=geometry)
)
return results
def _from_shapely(
shapely_geometry, copy_metadata_from: Geospatial | None = None
) -> Generator[CartesianGeometry, None, None]:
"""Constructs an appropriate geometry from a Shapely geometry object.
Args:
shapely_geometry: The geometry to convert. Supported types are Point, LineString,
Polygon, and MultiPolygon.
copy_metadata_from: The geometry to copy metadata from, or None to not copy metadata.
Returns:
The constructed geometry.
"""
reference = copy_metadata_from or CartesianLocation(0, 0) # for defaults
match shapely_geometry:
case Point(x=x, y=y):
yield CartesianLocation(
east=x,
north=y,
name=reference.name,
location_type=reference.location_type,
identifier=reference.identifier,
)
case LineString(coords=coords):
yield CartesianPolyLine.from_numpy(
array(coords),
name=reference.name,
location_type=reference.location_type,
identifier=reference.identifier,
)
case MultiLineString(geoms=geoms):
for line in geoms:
yield from _from_shapely(line, reference)
case Polygon(exterior=exterior, interiors=interiors):
yield CartesianPolygon.from_numpy(
array(exterior.coords).T,
holes=[array(ring.coords).T for ring in interiors],
name=reference.name,
location_type=reference.location_type,
identifier=reference.identifier,
)
case MultiPolygon(geoms=geoms):
for polygon in geoms:
yield from _from_shapely(polygon, reference)
case _:
raise NotImplementedError(f"Cannot handle geometry type {shapely_geometry.geom_type}")