import pandas as pd
import geopandas as gpd
import networkx as nx
import momepy
from shapely.geometry import Point
import logging
import logging
def create_graph_from_edges(
    edges: gpd.GeoDataFrame, directed=True, integer_labels=True
):
    """
    Build a graph from line features with momepy and return it with its GeoDataFrames.

    Parameters
    ----------
    edges : gpd.GeoDataFrame
        Line feature dataset containing edges
    directed : bool
        Forwarded to ``momepy.gdf_to_nx`` as ``directed``
    integer_labels : bool
        Forwarded to ``momepy.gdf_to_nx`` as ``integer_labels``

    Returns
    -------
    Tuple ``(nodes, edges, G)``: the node and edge GeoDataFrames obtained from
    ``momepy.nx_to_gdf`` (rows with a duplicate ``geometry`` removed) and the
    graph itself.

    Raises
    ------
    ValueError
        If ``edges`` contains no rows.
    """
    if edges.empty:
        raise ValueError("GeoDataFrame 'edges' is empty.")

    # Options for the conversion; "geometry_len" is the attribute name handed
    # to momepy for the segment length (presumably stored on the graph edges
    # by momepy - confirm against the momepy documentation).
    conversion_options = {
        "approach": "primal",
        "directed": directed,
        "integer_labels": integer_labels,
        "length": "geometry_len",
    }
    graph = momepy.gdf_to_nx(edges, **conversion_options)

    # Convert back to GeoDataFrames and keep one row per distinct geometry.
    node_gdf, edge_gdf = momepy.nx_to_gdf(graph)
    return (
        node_gdf.drop_duplicates("geometry"),
        edge_gdf.drop_duplicates("geometry"),
        graph,
    )
def generate_nodes_from_edges(
    edges: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Generate start/end nodes from edges and update node information in edges GeoDataFrame.

    Every distinct first/last coordinate of an edge geometry becomes one node.
    Nodes are numbered from 0 in order of first appearance (all start points
    first, then all end points).

    Note that ``edges`` is modified in place: it receives an ``edge_no`` column
    (0..n-1, also used as the new index) and ``from_node`` / ``to_node``
    columns holding node numbers. The same object is returned.

    Parameters
    ----------
    edges : gpd.GeoDataFrame
        Line feature dataset containing edges

    Returns
    -------
    Tuple containing GeoDataFrame with edges and GeoDataFrame with nodes
    """
    edges["edge_no"] = range(len(edges))
    edges.index = edges["edge_no"].values

    # Collect the endpoints of every edge in a single pass over the geometries.
    start_coords = []
    end_coords = []
    for geom in edges.geometry:
        coords = geom.coords
        start_coords.append(coords[0])
        end_coords.append(coords[-1])

    # dict.fromkeys de-duplicates while keeping first-seen order. This avoids
    # calling pd.unique on a plain list, which newer pandas versions deprecate.
    node_no_by_coord = {
        coord: node_no
        for node_no, coord in enumerate(dict.fromkeys(start_coords + end_coords))
    }

    node_numbers = list(node_no_by_coord.values())
    nodes = gpd.GeoDataFrame(
        data={"node_no": node_numbers},
        index=node_numbers,
        geometry=[Point(coord) for coord in node_no_by_coord],
        crs=edges.crs,
    )

    # Store node numbers directly instead of first writing coordinate tuples
    # into the frame and mapping them afterwards.
    edges["from_node"] = [node_no_by_coord[coord] for coord in start_coords]
    edges["to_node"] = [node_no_by_coord[coord] for coord in end_coords]
    return edges, nodes
def create_graph_based_on_nodes_edges(
    nodes: gpd.GeoDataFrame,
    edges: gpd.GeoDataFrame,
    directional_graph: bool = True,
    add_edge_length_as_weight: bool = False,
    print_logmessage: bool = True,
) -> nx.Graph | nx.DiGraph:
    """
    Create a networkx graph based on geographic nodes and edges.

    Parameters
    ----------
    nodes : gpd.GeoDataFrame
        Point features with a ``node_no`` column and a ``geometry`` column.
        Each node is added with a ``pos`` attribute ``(x, y)``. May be None,
        in which case no nodes are added explicitly.
    edges : gpd.GeoDataFrame
        Line features with ``from_node`` and ``to_node`` columns (and a
        ``geometry`` column when weights are requested). May be None, in
        which case no edges are added.
    directional_graph : bool
        Create a ``nx.DiGraph`` when True (default), otherwise a ``nx.Graph``.
    add_edge_length_as_weight : bool
        When True, the geometry length of each edge is stored as ``weight``.
    print_logmessage : bool
        When True, log the number of nodes and edges at INFO level.

    Returns
    -------
    The constructed graph.
    """
    graph = nx.DiGraph() if directional_graph else nx.Graph()

    if nodes is not None:
        # Iterate the columns directly; building a Series per row with
        # iterrows() is not needed for two fields.
        for node_no, point in zip(nodes["node_no"], nodes["geometry"]):
            graph.add_node(node_no, pos=(point.x, point.y))

    if edges is not None:
        if add_edge_length_as_weight:
            for from_node, to_node, line in zip(
                edges["from_node"], edges["to_node"], edges["geometry"]
            ):
                graph.add_edge(from_node, to_node, weight=line.length)
        else:
            graph.add_edges_from(zip(edges["from_node"], edges["to_node"]))

    if print_logmessage:
        # nodes/edges may be None (see the guards above); report 0 instead of
        # failing on len(None).
        n_nodes = 0 if nodes is None else len(nodes)
        n_edges = 0 if edges is None else len(edges)
        logging.info(
            f" - create network graph from nodes ({n_nodes}x) and edges ({n_edges}x)"
        )
    return graph
def add_basin_code_from_network_to_nodes_and_edges(
    graph: nx.Graph | nx.DiGraph,
    nodes: gpd.GeoDataFrame,
    edges: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]:
    """
    Add basin (subgraph) code to nodes and edges.

    Each connected component of ``graph`` (weakly connected for directed
    graphs) gets a basin number starting at 1. The number is written to a
    ``basin`` column: for nodes via ``node_no``, for edges when both
    ``from_node`` and ``to_node`` belong to the same component. Rows that
    cannot be assigned keep the value -1.

    ``nodes`` and ``edges`` are modified in place and also returned.

    Parameters
    ----------
    graph : nx.Graph | nx.DiGraph
        Network whose connected components define the basins
    nodes : gpd.GeoDataFrame
        Nodes with a ``node_no`` column
    edges : gpd.GeoDataFrame
        Edges with ``from_node`` and ``to_node`` columns

    Returns
    -------
    Tuple ``(nodes, edges)`` with the added ``basin`` column, or
    ``(None, None)`` when either ``nodes`` or ``edges`` is None.
    """
    # Nothing to annotate: return before doing any graph work.
    if nodes is None or edges is None:
        return None, None

    # weakly_connected_components is only defined for directed graphs; use
    # the undirected equivalent otherwise.
    if graph.is_directed():
        components = list(nx.weakly_connected_components(graph))
    else:
        components = list(nx.connected_components(graph))

    # Single lookup table node id -> basin number, so every row is resolved
    # once instead of scanning the full frames once per component.
    basin_by_node = {
        node_id: basin_no
        for basin_no, component in enumerate(components, start=1)
        for node_id in component
    }

    node_basins = [basin_by_node.get(node_no, -1) for node_no in nodes["node_no"]]
    # An edge belongs to a basin only when both of its end nodes do.
    edge_basins = [
        basin_by_node.get(from_node, -1)
        if basin_by_node.get(from_node) == basin_by_node.get(to_node)
        else -1
        for from_node, to_node in zip(edges["from_node"], edges["to_node"])
    ]

    # Explicit index and dtype keep the column integer-typed even when empty.
    nodes["basin"] = pd.Series(node_basins, index=nodes.index, dtype="int64")
    edges["basin"] = pd.Series(edge_basins, index=edges.index, dtype="int64")

    logging.debug(
        f" - define numbers Ribasim-Basins ({len(components)}x) and join edges/nodes"
    )
    return nodes, edges