import geopandas as gpd
import pandas as pd
from shapely.geometry import (
Point,
LineString,
MultiLineString,
Polygon,
MultiPolygon,
base,
)
from shapely.ops import split, linemerge, nearest_points
import numpy as np
from typing import TypeVar
import time
import logging
import warnings
import shapely
[documentatie]def shorten_line_two_vertices(line, offset):
# Ensure the geometry is a LineString
if isinstance(line, LineString):
# Calculate the length of the line
length = line.length
# Create new points for the shortened line
start_point = line.interpolate(offset)
end_point = line.interpolate(length - offset)
# Return the new shortened line
return LineString([start_point, end_point])
else:
return line # Return the original line if not a LineString
[documentatie]def line_to_vertices(gdf, distance=10.0, keep_columns=["code", "WaterLineType"]):
all_points = []
for _, row in gdf.iterrows():
geometry = row["geometry"]
# Check if the geometry is a LineString or MultiLineString
if isinstance(geometry, LineString):
lines = [geometry]
elif isinstance(geometry, MultiLineString):
lines = list(geometry.geoms)
else:
continue # Skip if not a recognized geometry typ
for line in lines:
# Add start and end points to the list with 'dangling' label
start_point_x = {
"geometry": Point(line.coords[0][0], line.coords[0][1]),
"line_type": "dangling_start",
"distance_from_start": 0,
}
end_point_x = {
"geometry": Point(line.coords[-1][0], line.coords[-1][1]),
"line_type": "dangling_end",
"distance_from_start": line.length,
}
for point in [start_point_x, end_point_x]:
for col in keep_columns:
point[col] = row[col]
all_points.append(point)
# Generate intermediate points at specified intervals
num_points = int(line.length / distance)
for i in range(1, num_points):
new_distance = line.length / num_points
# Calculate the point at a distance along the line
point = line.interpolate(i * new_distance)
point_2d = Point(point.x, point.y)
# Calculate the cumulative distance from the starting point
cumulative_distance = i * new_distance
point = {
"geometry": point_2d,
"line_type": "other",
"distance_from_start": cumulative_distance,
}
for col in keep_columns:
point[col] = row[col]
all_points.append(point)
# Create a new GeoDataFrame for the vertices
points_gdf = gpd.GeoDataFrame(all_points, crs=gdf.crs)
return points_gdf
[documentatie]def split_waterways_by_endpoints(hydroobjects, relevant_culverts):
end_points = relevant_culverts["geometry"].apply(
lambda line: Point(line.coords[-1])
)
end_points_gdf = relevant_culverts.copy()
end_points_gdf.set_geometry(end_points, inplace=True)
end_points_gdf = remove_z_dims(end_points_gdf)
# Step 1: Convert MultiLineStrings to LineStrings
hydroobjects = hydroobjects.explode("geometry").reset_index(drop=True)
hydroobjects = remove_z_dims(hydroobjects)
# hydroobjects = gpd.GeoDataFrame(hydroobjects, geometry="geometry", crs=28992)
# Create 'NAAM' and 'status' if not present
if "status" not in hydroobjects.columns:
hydroobjects["status"] = "unchanged"
if "NAAM" not in hydroobjects.columns:
hydroobjects["NAAM"] = "unknown"
# Create lists to store start and end points
start_points = []
end_points = []
# Iterate through the lines and extract start and end points
for line in hydroobjects.geometry:
start_points.append(Point(line.coords[0])) # Start point
end_points.append(Point(line.coords[-1]))
# If you want to keep them as separate rows instead of columns, you can do this:
start_points_gdf = gpd.GeoDataFrame(geometry=start_points, crs="EPSG:28992")
end_points_gdf_ho = gpd.GeoDataFrame(geometry=end_points, crs="EPSG:28992")
# Combine them into a single GeoDataFrame if desired
combined_points_gdf = gpd.GeoDataFrame(
pd.concat([start_points_gdf, end_points_gdf_ho], ignore_index=True),
crs="EPSG:28992",
)
combined_end_points_gdf = gpd.GeoDataFrame(
pd.concat([combined_points_gdf, end_points_gdf], ignore_index=True),
crs="EPSG:28992",
)
combined_end_points_gdf = combined_end_points_gdf.drop_duplicates(subset="geometry")
def split_lines_at_points(gdf_lines, gdf_points):
# Create list of coordinates of linestring in gdf_lines
gdf_lines["coords"] = gdf_lines.apply(lambda x: list(x.geometry.coords), axis=1)
# Create segments with coords
gdf_lines["segments"] = gdf_lines.apply(
lambda x: [
LineString(x["coords"][i - 1 : i + 1])
for i in range(1, len(x["coords"]))
],
axis=1,
)
# Create gdf from segments of linestring
gdf_segments = gpd.GeoDataFrame(
gdf_lines[["code", "segments", "NAAM", "status"]]
.explode("segments")
.rename(columns={"segments": "geometry"}),
geometry="geometry",
crs=gdf_lines.crs,
).reset_index(drop=True)
# Determine which segment point of points_gdf are located on
gdf_segments = gdf_segments.merge(
gdf_points[["geometry"]]
.sjoin_nearest(gdf_segments.drop(columns="code"))
.groupby("index_right")
.agg(
{
"geometry": lambda x: list(x.apply(lambda geom: (geom.x, geom.y))),
}
)
.reset_index(),
how="outer",
left_index=True,
right_on="index_right",
suffixes=("", "_2"),
).reset_index(drop=True)
def split_linestring(linestring, coordinates):
# Convert coordinates to Points
points = [Point(coord) for coord in coordinates]
# Sort points by their position on the LineString
points = sorted(points, key=lambda point: linestring.project(point))
segments = []
prev_point = Point(linestring.coords[0])
for point in points:
segment = LineString([prev_point, point])
segments.append(segment)
prev_point = point
# Add the last segment
segments.append(LineString([prev_point, Point(linestring.coords[-1])]))
return segments
def split_and_explode(row):
if not row["geometry_2"] or isinstance(row["geometry_2"], float):
return [row["geometry"]]
else:
return split_linestring(row["geometry"], row["geometry_2"])
gdf_segments["geometry"] = gdf_segments.apply(split_and_explode, axis=1)
# Explode the list of geometries into separate rows
gdf_segments = gdf_segments.explode("geometry").reset_index(drop=True)
# Determine which segments are located between the same points of points_gdf
gdf_segments["index_right_diff"] = gdf_segments["index_right"].diff(1)
gdf_segments["index_right_cumsum"] = gdf_segments["index_right_diff"].map(
{0.0: 1, 1.0: 0}
)
gdf_segments.loc[0, "index_right_cumsum"] = 0.0
gdf_segments["index_right_cumsum"] = (
gdf_segments["index_right_cumsum"].cumsum().astype(int)
)
# Group segments that are between points, creating a list of geometries in geometry
gdf_segments = (
gdf_segments.groupby(["index_right_cumsum", "code"])
.agg(
{
"code": "first",
"geometry": list,
"NAAM": "first",
"status": "first",
}
)
.reset_index(drop=True)
)
# Remove segments with a length of 0 (created points that are already between 2 linestrings of gdf_lines)
gdf_segments["geometry_len"] = gdf_segments.apply(
lambda x: sum([y.length for y in x["geometry"]]), axis=1
)
gdf_segments = gdf_segments[gdf_segments["geometry_len"] > 0.0]
# Merge the geometries of the segments that are listed in geometry
gdf_segments["geometry"] = gdf_segments.apply(
lambda x: linemerge(x["geometry"]), axis=1
)
# Create gdf from
gdf_segments = gpd.GeoDataFrame(
gdf_segments.reset_index(drop=True), geometry="geometry", crs=gdf_lines.crs
)
# Function to add suffixes to duplicate codes
def add_suffixes(df, column):
counts = df[column].value_counts()
suffixes = {code: 0 for code in counts.index if counts[code] > 1}
def suffix_code(code):
if code in suffixes:
suffix = suffixes[code]
suffixes[code] += 1
df.loc[df[column] == code, "status"] = "split"
return f"{code}-{suffix}"
return code
df[column] = df[column].apply(suffix_code)
return df
# Apply the function to your GeoDataFrame
gdf_segments = add_suffixes(gdf_segments, "code")
return gdf_segments
result = split_lines_at_points(hydroobjects, combined_end_points_gdf)
return result
# Function to check if line endpoints need to be flipped
[documentatie]def check_and_flip(line, start_points, end_points):
start_point = line.coords[0]
end_point = line.coords[-1]
# Check if start and end points are in the correct positions
if start_point in end_points and end_point in start_points:
return line, False
else:
return LineString(line.coords[::-1]), True
[documentatie]def _remove_holes(geom, min_area):
def p(p: Polygon, min_area) -> Polygon:
holes = [i for i in p.interiors if not Polygon(i).area > min_area]
return Polygon(shell=p.exterior, holes=holes)
def mp(mp: MultiPolygon, min_area) -> MultiPolygon:
return MultiPolygon([p(i, min_area) for i in mp.geoms])
if isinstance(geom, Polygon):
return p(geom, min_area)
elif isinstance(geom, MultiPolygon):
return mp(geom, min_area)
else:
return geom
_Geom = TypeVar("_Geom", Polygon, MultiPolygon, gpd.GeoSeries, gpd.GeoDataFrame)
[documentatie]def remove_holes_from_polygons(geom: _Geom, min_area: float) -> _Geom:
"""Remove all holes from a geometry that satisfy the filter function."""
if isinstance(geom, gpd.GeoSeries):
return geom.apply(_remove_holes, min_area=min_area)
elif isinstance(geom, gpd.GeoDataFrame):
geom = geom.copy()
geom["geometry"] = remove_holes_from_polygons(
geom["geometry"], min_area=min_area
)
return geom
return _remove_holes(geom, min_area=min_area)
[documentatie]def remove_holes_from_basin_areas(basin_areas: gpd.GeoDataFrame, min_area: float):
print(f" - remove holes within basin areas with less than {min_area/10000.0:.2f}ha")
return remove_holes_from_polygons(geom=basin_areas, min_area=min_area)
[documentatie]def calculate_angle(line, direction):
if direction == "upstream":
coords = list(line.coords)
p1, p2 = coords[0], coords[1] # First segment
elif direction == "downstream":
coords = list(line.coords)
p1, p2 = coords[-2], coords[-1] # Last segment
else:
raise ValueError("Direction must be 'upstream' or 'downstream'")
angle = np.arctan2(p2[0] - p1[0], p2[1] - p1[1]) # Angle in radians
angle_degrees = np.degrees(angle)
return angle_degrees
[documentatie]def calculate_angle_reverse(line):
coords = list(line.coords)
p1, p2 = coords[1], coords[0]
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) # Angle in radians
angle_degrees = np.degrees(angle)
angle_degrees = angle_degrees % 360
return angle_degrees
[documentatie]def calculate_angle_start(line):
coords = list(line.coords)
p1, p2 = coords[0], coords[1]
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) # Angle in radians
angle_degrees = np.degrees(angle)
angle_degrees = angle_degrees % 360
return angle_degrees
[documentatie]def calculate_angle_end(line):
coords = list(line.coords)
p1, p2 = coords[-2], coords[-1]
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) # Angle in radians
angle_degrees = np.degrees(angle)
angle_degrees = angle_degrees % 360
return angle_degrees
[documentatie]def calculate_angle_difference(angle1, angle2):
diff = abs(angle1 % 360 - angle2 % 360)
if diff > 180:
diff = 360 - diff
return diff
[documentatie]def find_edge_smallest_angle_difference(reference_angle, angles, edge_codes):
if reference_angle is None:
return [None for a in angles], None
reference_angle = float(reference_angle)
angle_differences = np.array(
[calculate_angle_difference(angle, reference_angle) for angle in angles]
)
min_index = np.argmin(angle_differences)
return angle_differences, edge_codes[min_index]
[documentatie]def remove_z_dims(_gdf: gpd.GeoDataFrame):
_gdf.geometry = [
(Point(g.coords[0][:2]) if len(g.coords[0]) > 2 else Point(g.coords[0]))
if isinstance(g, Point)
else (
(LineString([c[:2] if len(c) > 2 else c for c in g.coords]))
if isinstance(g, LineString)
else Polygon([c[:2] if len(c) > 2 else c for c in g.exterior.coords])
)
for g in _gdf.geometry.values
]
return _gdf
[documentatie]def get_endpoints_from_lines(lines: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Extract all unique endpoints of line features from vector data
Args:
lines (gpd.GeoDataFrame): GeoDataFrame containing line features
Returns:
gpd.GeoDataFrame: GeoDataFrame containing all unique endpoints from
line features
"""
lines[["startpoint", "endpoint"]] = lines["geometry"].apply(
lambda x: pd.Series([x.coords[0], x.coords[-1]])
)
endpoints = pd.unique(lines[["startpoint", "endpoint"]].values.ravel("K"))
endpoints = gpd.GeoDataFrame({"coordinates": endpoints})
endpoints["starting_lines"] = endpoints["coordinates"].apply(
lambda x: lines["code"][lines["startpoint"] == x].values
)
endpoints["ending_lines"] = endpoints["coordinates"].apply(
lambda x: lines["code"][lines["endpoint"] == x].values
)
endpoints["starting_line_count"] = endpoints.apply(
lambda x: len(list(x["starting_lines"])), axis=1
)
endpoints["ending_line_count"] = endpoints.apply(
lambda x: len(list(x["ending_lines"])), axis=1
)
endpoints["connected_line_count"] = endpoints.apply(
lambda x: x["starting_line_count"] + x["ending_line_count"], axis=1
)
endpoints_geometry = endpoints.coordinates.apply(lambda x: Point(x))
endpoints = endpoints.set_geometry(endpoints_geometry)
return endpoints
[documentatie]def snap_unconnected_endpoints_to_endpoint_or_line(
hydroobjecten, snapping_distance=0.05
):
hydroobjecten = hydroobjecten.explode()
endpoints = get_endpoints_from_lines(hydroobjecten)
endpoints["ID"] = endpoints.index
endpoints = endpoints[
(endpoints["starting_line_count"] == 0) | (endpoints["ending_line_count"] == 0)
]
endpoints = endpoints.rename(columns={"coordinates": "geometry"})
endpoints = gpd.GeoDataFrame(endpoints, geometry="geometry", crs=28992)
# Create buffer and copy of original_geometry
original_geometry = endpoints[["ID", "geometry"]].copy()
endpoints["geometry"] = endpoints.buffer(snapping_distance)
endpoints = endpoints.to_crs(28992)
original_geometry = original_geometry.to_crs(28992)
# Perform a spatial join to find endpoints within the buffer
joined_points = gpd.sjoin(
original_geometry, endpoints, how="inner", predicate="intersects"
)
# Filter out the endpoints with the same ID
unconnected_endpoints_points = joined_points[
joined_points["ID_left"] != joined_points["ID_right"]
]
merged_df = unconnected_endpoints_points.merge(
unconnected_endpoints_points,
left_on="ID_left",
right_on="ID_right",
suffixes=("_left", "_right"),
)
# Explode the DataFrame to handle list elements individually
exploded_df = merged_df.explode("ending_lines_left")
# Filtering out rows where 'ending_lines_left' is nan
exploded_df = exploded_df[exploded_df["ending_lines_left"].notna()]
point_df = (
exploded_df[["ending_lines_left", "geometry_left"]]
.dropna()
.reset_index(drop=True)
)
point_df.columns = ["code", "geometry_left"]
def replace_last_coordinate(row, point_df):
if row["code"] in point_df["code"].values:
new_point = point_df.loc[
point_df["code"] == row["code"], "geometry_left"
].values[0]
line = row["geometry"]
new_coords = list(line.coords[:-1]) + [new_point.coords[0]]
return LineString(new_coords), "snapped"
return row["geometry"], "unchanged"
hydroobjecten[["geometry", "status"]] = hydroobjecten.apply(
lambda row: replace_last_coordinate(row, point_df), axis=1, result_type="expand"
)
joined_lines = gpd.sjoin(
endpoints, hydroobjecten, how="left", predicate="intersects"
)
joined_lines = joined_lines[
joined_lines.apply(
lambda x: x["code"] not in x["starting_lines"]
and x["code"] not in x["ending_lines"],
axis=1,
)
]
# Merge the DataFrames on the 'ID' column to bring the correct geometry back
joined_lines = joined_lines.merge(
original_geometry[["ID", "geometry"]], on="ID", suffixes=("", "_original")
)
# Replace the geometry column in joined_lines with the one from original_geometry
joined_lines["geometry"] = joined_lines["geometry_original"]
# Drop the now redundant 'geometry_original' column
joined_lines.drop(columns=["geometry_original"], inplace=True)
joined_lines["line_key"] = joined_lines.apply(
lambda row: str(
sorted(
row["starting_lines"] if isinstance(row["starting_lines"], list) else []
)
)
+ "-"
+ str(
sorted(row["ending_lines"] if isinstance(row["ending_lines"], list) else [])
),
axis=1,
)
unconnected_endpoints_points["point_key"] = unconnected_endpoints_points.apply(
lambda row: str(
sorted(
row["starting_lines"] if isinstance(row["starting_lines"], list) else []
)
)
+ "-"
+ str(
sorted(row["ending_lines"] if isinstance(row["ending_lines"], list) else [])
),
axis=1,
)
filtered_joined_lines = joined_lines[
~joined_lines["line_key"].isin(unconnected_endpoints_points["point_key"])
]
def project_point_onto_line(point, line):
if point is None or line is None:
return None # Handle None values gracefully
# Use shapely's nearest_points to find the closest point on the line
projected_point = nearest_points(point, line)[1]
return projected_point
# Create a new column for the projected geometries
filtered_joined_lines["projected_geometry"] = filtered_joined_lines.apply(
lambda row: project_point_onto_line(
row["geometry"]
if isinstance(row["geometry"], (shapely.geometry.base.BaseGeometry, list))
else None,
hydroobjecten.loc[hydroobjecten["code"] == row["code"], "geometry"].values[
0
]
if not pd.isna(row["code"])
and not hydroobjecten.loc[
hydroobjecten["code"] == row["code"], "geometry"
].empty
else None,
),
axis=1,
)
# Explode the lists in starting_lines and ending_lines
filtered_joined_lines_exploded = filtered_joined_lines.explode(
"starting_lines"
).explode("ending_lines")
# Create a new DataFrame from filtered_joined_lines_exploded
point_df_line = filtered_joined_lines_exploded[
["starting_lines", "ending_lines", "projected_geometry"]
]
def replace_coordinate_with_projection(row, point_df):
if (
row["code"] in point_df["starting_lines"].values
or row["code"] in point_df["ending_lines"].values
):
new_point = point_df.loc[
(point_df["starting_lines"] == row["code"])
| (point_df["ending_lines"] == row["code"]),
"projected_geometry",
].values[0]
line = row["geometry"]
if row["code"] in point_df["starting_lines"].values:
# Replace the first coordinate
new_coords = [new_point.coords[0]] + list(line.coords[1:])
elif row["code"] in point_df["ending_lines"].values:
# Replace the last coordinate
new_coords = list(line.coords[:-1]) + [new_point.coords[0]]
return LineString(new_coords), "snapped"
return row["geometry"], row["status"]
hydroobjecten[["geometry", "status"]] = hydroobjecten.apply(
lambda row: replace_coordinate_with_projection(row, point_df_line),
axis=1,
result_type="expand",
)
return hydroobjecten
[documentatie]def check_duplicate_codes(gdf, column):
temp_column = gdf[column].copy()
duplicates = gdf[column][gdf[column].duplicated(keep=False)]
suffix_dict = {}
for idx, value in duplicates.items():
if value not in suffix_dict:
suffix_dict[value] = 0
else:
suffix_dict[value] += 1
temp_column[idx] = f"{value}-{suffix_dict[value]}"
gdf[column] = temp_column
return gdf