import geopandas as gpd
import pandas as pd
import numpy as np
import networkx as nx
import logging
from ..utils.general_functions import calculate_angle_difference, calculate_angle
[documentatie]def find_predecessors_graph(
from_node_ids: np.array,
to_node_ids: np.array,
edge_ids: np.array,
edge_id: int,
border_node_ids: np.array = None,
pred_nodes: list = [],
pred_edges: list = [],
):
"""
Find predecessors within graph for specified node_id.
Note: recursive function!
"""
node_id = to_node_ids[np.where(edge_ids == edge_id)]
from_node = from_node_ids[np.where(edge_ids == edge_id)]
pred_node = from_node_ids[np.where(to_node_ids == from_node)]
pred_edge = edge_ids[np.where(to_node_ids == from_node)]
for i in range(pred_node.shape[0]):
p = pred_node[i]
e = pred_edge[i]
if e not in pred_edges:
pred_edges = pred_edges + [e]
if p not in pred_nodes:
pred_nodes = pred_nodes + [int(p)]
if border_node_ids is None or p not in border_node_ids:
pred_nodes, pred_edges = find_predecessors_graph(
from_node_ids,
to_node_ids,
edge_ids,
e,
border_node_ids,
pred_nodes,
pred_edges,
)
return pred_nodes, pred_edges
[documentatie]def find_predecessors_graph_with_splits(
from_node_ids,
to_node_ids,
edge_ids,
edge_id,
border_node_ids=None,
split_node_edge_ids=None,
split_node_edge_ids2=None,
pred_nodes=[],
pred_edges=[],
new_outflow_edges=[],
):
"""
Find predecessors within graph for specified node_id.
Note: recursive function!
"""
node_id = to_node_ids[np.where(edge_ids == edge_id)][0]
from_node = from_node_ids[np.where(edge_ids == edge_id)][0]
pred_node = from_node_ids[np.where(to_node_ids == from_node)]
pred_edge = edge_ids[np.where(to_node_ids == from_node)]
for i in range(pred_node.shape[0]):
p = pred_node[i]
e = pred_edge[i]
# if edge_id in ["WL_88", "WL_86", "WL_284"]: #"WL_269", "WL_106618-0", "WL_271-8"]:
# logging.debug("xxxxxxxxxxxx")
# logging.debug(edge_id)
# logging.debug(from_node)
# logging.debug(pred_node)
# logging.debug(pred_edge)
# logging.debug(split_node_edge_ids2)
# logging.debug("xxxxxxxxxxxx")
if (
split_node_edge_ids2 is not None
and from_node in split_node_edge_ids2
and split_node_edge_ids2[from_node] != e
):
# logging.debug(e)
new_outflow_edges = new_outflow_edges + [e]
continue
if e in pred_edges:
continue
pred_edges = pred_edges + [e]
pred_nodes = pred_nodes + [int(p)]
if border_node_ids is not None and p in border_node_ids:
continue
if (
split_node_edge_ids is None
or p not in split_node_edge_ids
or split_node_edge_ids[p] == e
):
pred_nodes, pred_edges, new_outflow_edges = (
find_predecessors_graph_with_splits(
from_node_ids,
to_node_ids,
edge_ids,
e,
border_node_ids,
split_node_edge_ids,
split_node_edge_ids2,
pred_nodes,
pred_edges,
new_outflow_edges,
)
)
return pred_nodes, pred_edges, new_outflow_edges
[documentatie]def accumulate_values_graph(
from_node_ids,
to_node_ids,
node_ids,
edge_ids,
values_node_ids,
values_nodes,
values_edge_ids,
values_edges,
border_node_ids=None,
itself=False,
direction="upstream",
decimals=None,
):
"""Calculate for all node_ids the accumulated values of all predecessors with values."""
len_node_ids = np.shape(node_ids)[0]
results_nodes = np.zeros(np.shape(node_ids))
results_edges = np.zeros(np.shape(node_ids))
logging.info(f"accumulate values using graph for {len(node_ids)} node(s)")
for i in range(node_ids.shape[0]):
print(f" * {i+1}/{len_node_ids} ({(i+1)/len(node_ids):.2%})", end="\r")
node_id = node_ids[i]
if direction == "upstream":
pred_nodes, pred_edges = find_predecessors_graph(
from_node_ids,
to_node_ids,
edge_ids,
node_id,
border_node_ids,
np.array([]),
)
else:
pred_nodes, pred_edges = find_predecessors_graph(
to_node_ids,
from_node_ids,
edge_ids,
node_id,
border_node_ids,
np.array([]),
)
if itself:
pred_nodes = np.append(pred_nodes, node_id)
pred_nodes_sum = np.sum(
values_nodes[np.searchsorted(values_node_ids, pred_nodes)]
)
pred_edges_sum = np.sum(
values_edges[np.searchsorted(values_edge_ids, pred_edges)]
)
if decimals is None:
results_nodes[i] = pred_nodes_sum
results_edges[i] = pred_edges_sum
else:
results_nodes[i] = np.round(pred_nodes_sum, decimals=decimals)
results_edges[i] = np.round(pred_edges_sum, decimals=decimals)
return results_nodes, results_edges
[documentatie]def find_node_edge_ids_in_directed_graph(
from_node_ids,
to_node_ids,
edge_ids,
outflow_edge_ids,
search_node_ids,
search_edge_ids,
border_node_ids=None,
direction="upstream",
split_points=None,
set_logging=False,
order_first=False,
new_order_at_splits=False,
):
results_nodes = [
[
int(to_node_ids[np.where(edge_ids == e)][0]),
int(from_node_ids[np.where(edge_ids == e)][0]),
]
for e in outflow_edge_ids
]
results_edges = [[e] if e in search_edge_ids else [] for e in outflow_edge_ids]
if set_logging:
logging.debug(
f" - find {direction} nodes/edges for {len(outflow_edge_ids)}/{len(search_edge_ids)} nodes"
)
search_direction = "upstream" if direction == "downstream" else "downstream"
opposite_direction = "downstream" if direction == "downstream" else "upstream"
if isinstance(outflow_edge_ids, list):
outflow_edge_ids = np.array(outflow_edge_ids)
new_outflow_edges = []
if split_points is None:
for i in range(outflow_edge_ids.shape[0]):
# print(f" * {i+1}/{len_outflow_edge_ids} ({(i+1)/len(outflow_edge_ids):.2%})")
edge_id = outflow_edge_ids[i]
if direction == "upstream":
pred_nodes, pred_edges = find_predecessors_graph(
from_node_ids, to_node_ids, edge_ids, edge_id, border_node_ids, []
)
else:
pred_nodes, pred_edges = find_predecessors_graph(
to_node_ids, from_node_ids, edge_ids, edge_id, border_node_ids, []
)
results_nodes[i] = results_nodes[i] + [
p for p in pred_nodes if p in search_node_ids
]
results_edges[i] = results_edges[i] + [
p for p in pred_edges if p in search_edge_ids
]
else:
split_node_edge_ids = split_points.set_index("nodeID")[
f"selected_{search_direction}_edge"
].to_dict()
split_node_edge_ids2 = split_points.set_index("nodeID")[
f"selected_{opposite_direction}_edge"
].to_dict()
if not new_order_at_splits:
split_node_edge_ids = {
k: v for k, v in split_node_edge_ids.items() if v not in [None, ""]
}
split_node_edge_ids2 = {
k: v for k, v in split_node_edge_ids2.items() if v not in [None, ""]
}
for i in range(outflow_edge_ids.shape[0]):
# print(f" * {i+1}/{len_outflow_node_ids} ({(i+1)/len_outflow_node_ids:.2%})", end="\r")
edge_id = outflow_edge_ids[i]
if direction == "upstream":
pred_nodes, pred_edges, new_outflow_edges = (
find_predecessors_graph_with_splits(
from_node_ids,
to_node_ids,
edge_ids,
edge_id,
border_node_ids,
split_node_edge_ids if not order_first else None,
split_node_edge_ids2,
[],
[],
)
)
else:
pred_nodes, pred_edges, new_outflow_edges = (
find_predecessors_graph_with_splits(
to_node_ids,
from_node_ids,
edge_ids,
edge_id,
border_node_ids,
split_node_edge_ids2,
split_node_edge_ids if not order_first else None,
[],
[],
)
)
results_nodes[i] = results_nodes[i] + [
p for p in pred_nodes if p in search_node_ids
]
results_edges[i] = results_edges[i] + [
p for p in pred_edges if p in search_edge_ids
]
return results_nodes, results_edges, new_outflow_edges
[documentatie]def find_nodes_edges_for_direction(
nodes: gpd.GeoDataFrame,
edges: gpd.GeoDataFrame,
outflow_edge_ids: list,
border_node_ids: list = None,
direction: str = "upstream",
split_points: gpd.GeoDataFrame = None,
order_first: bool = False,
):
nodes_direction, edges_direction, new_outflow_edges = (
find_node_edge_ids_in_directed_graph(
from_node_ids=edges.node_start.to_numpy(),
to_node_ids=edges.node_end.to_numpy(),
edge_ids=edges.code.to_numpy(),
outflow_edge_ids=outflow_edge_ids,
search_node_ids=nodes.nodeID.to_numpy(),
search_edge_ids=edges.code.to_numpy(),
border_node_ids=border_node_ids,
direction=direction,
split_points=split_points,
order_first=order_first,
)
)
for edge_id, node_direction, edge_direction in zip(
outflow_edge_ids, nodes_direction, edges_direction
):
nodes[f"{direction}_edge_{edge_id}"] = False
nodes.loc[
nodes["nodeID"].isin(node_direction), f"{direction}_edge_{edge_id}"
] = True
edges[f"{direction}_edge_{edge_id}"] = False
edges.loc[edges["code"].isin(edge_direction), f"{direction}_edge_{edge_id}"] = (
True
)
return nodes, edges, new_outflow_edges
[documentatie]def calculate_angles_of_edges_at_nodes(
nodes: gpd.GeoDataFrame,
edges: gpd.GeoDataFrame,
nodes_id_column: str = "nodeID",
):
"""Calculates the angles of the upstream and downstream edges for each node.
Returns
-------
self.nodes: gpd.GeoDataFrame
Geodataframe containing nodes between waterlines, including upstream and downstream edges and their angles
"""
edges["upstream_angle"] = edges["geometry"].apply(
lambda x: calculate_angle(x, "upstream").round(2)
)
edges["downstream_angle"] = edges["geometry"].apply(
lambda x: calculate_angle(x, "downstream").round(2)
)
def group_angles(x):
if isinstance(x, float):
if np.isnan(x):
return ""
else:
return str(x)
elif (isinstance(x[0], float) and not np.isnan(x[0])):
return ",".join([str(a) for a in x])
else:
return ""
for direction, opp_direction in zip(
["upstream", "downstream"], ["downstream", "upstream"]
):
node_end = "node_end" if direction == "upstream" else "node_start"
temp = nodes.merge(
edges[[node_end, f"{opp_direction}_angle"]].rename(
columns={node_end: nodes_id_column}
),
how="left",
on=nodes_id_column,
)
temp = temp.groupby(nodes_id_column)
temp = temp.agg({f"{opp_direction}_angle": list})
nodes[f"{direction}_angles"] = temp
nodes[f"{direction}_angles"] = nodes[f"{direction}_angles"].apply(
lambda x: group_angles(x)
)
return nodes, edges
[documentatie]def calculate_discharges_of_edges_at_nodes(
nodes: gpd.GeoDataFrame,
edges: gpd.GeoDataFrame,
nodes_id_column: str = "nodeID",
):
"""Calculates the angles of the upstream and downstream edges for each node.
Returns
-------
self.nodes: gpd.GeoDataFrame
Geodataframe containing nodes between waterlines, including upstream and downstream edges and their angles
"""
if "total_specific_discharge" not in edges.columns:
return nodes, edges
for direction, opp_direction in zip(
["upstream", "downstream"], ["downstream", "upstream"]
):
node_end = "node_end" if direction == "upstream" else "node_start"
temp = nodes.merge(
edges[[node_end, "total_specific_discharge"]].rename(
columns={node_end: nodes_id_column, "total_specific_discharge": f"{direction}_discharge"}
),
how="left",
on=nodes_id_column,
)
temp = temp.groupby(nodes_id_column).agg({f"{direction}_discharge": list})
temp[f"{direction}_discharge"] = temp[f"{direction}_discharge"].apply(
lambda x: ",".join([f"{a:.3f}" for a in x if ~np.isnan(a)])
)
nodes[f"{direction}_discharges"] = temp[f"{direction}_discharge"]
return nodes, edges
[documentatie]def select_downstream_upstream_edges_angle(nodes, min_difference_angle: float = 20.0):
"""select the upstream or downstream edge that represents the main channel, based on the smallest angle. When the angle of both edges is too large, no edge is selected.
Parameters
----------
min_difference_angle : str, optional
minimum , by default 20.0
Returns
-------
gpd.GeoDataFrame: self.nodes
Geodataframe containing nodes between waterlines, including the selected upstream and downstream angles
"""
def select_downstream_upstream_edges_angle_per_node(x, min_difference_angle: float = 20.0):
upstream_edges = [
a for a in x["upstream_edges"].split(",") if a != ""
]
downstream_edges = [
a for a in x["downstream_edges"].split(",") if a != ""
]
upstream_angles = [
float(a) for a in x["upstream_angles"].split(",") if a != ""
]
downstream_angles = [
float(a) for a in x["downstream_angles"].split(",") if a != ""
]
angle_differences = [
[round(abs(au - ad), 2) for ad in downstream_angles]
for au in upstream_angles
]
smallest_angle1 = None
smallest_angle2 = None
selected_upstream_edge = None
selected_downstream_edge = None
iteration = 0
for upstream_edge, upstream_angle_differences in zip(
upstream_edges, angle_differences
):
for downstream_edge, angle_difference in zip(
downstream_edges, upstream_angle_differences
):
iteration = iteration + 1
if smallest_angle1 is None or angle_difference < smallest_angle1:
smallest_angle2 = smallest_angle1
smallest_angle1 = angle_difference
selected_upstream_edge = upstream_edge
selected_downstream_edge = downstream_edge
elif smallest_angle2 is None or angle_difference < smallest_angle2:
smallest_angle2 = angle_difference
x["selected_upstream_edge"] = None
x["selected_downstream_edge"] = None
if (
smallest_angle2 is None
or smallest_angle1 < smallest_angle2 - min_difference_angle
):
x["selected_upstream_edge"] = selected_upstream_edge
x["selected_downstream_edge"] = selected_downstream_edge
if x["no_downstream_edges"] == 1:
x["selected_downstream_edge"] = downstream_edges[0]
if x["no_upstream_edges"] == 1:
x["selected_upstream_edge"] = upstream_edges[0]
return x
nodes = nodes.apply(
lambda x: select_downstream_upstream_edges_angle_per_node(x, min_difference_angle),
axis=1,
)
return nodes
[documentatie]def select_downstream_upstream_edges_discharge(nodes, min_difference_discharge_factor: float = 2.0):
"""select the upstream or downstream edge that represents the main channel, based on the smallest angle. When the angle of both edges is too large, no edge is selected.
Parameters
----------
min_difference_angle : str, optional
minimum , by default 20.0
Returns
-------
gpd.GeoDataFrame: self.nodes
Geodataframe containing nodes between waterlines, including the selected upstream and downstream angles
"""
def select_downstream_upstream_edges_discharge_per_node(x, min_difference_discharge_factor: float = 2.0):
upstream_edges = [
a for a in x["upstream_edges"].split(",") if a != ""
]
downstream_edges = [
a for a in x["downstream_edges"].split(",") if a != ""
]
upstream_discharges = [
float(a) for a in x["upstream_discharges"].split(",") if a != ""
]
downstream_discharges = [
float(a) for a in x["downstream_discharges"].split(",") if a != ""
]
x["selected_upstream_edge"] = None
x["selected_downstream_edge"] = None
if len(upstream_discharges) == 1:
x["selected_upstream_edge"] = upstream_edges[0]
elif len(upstream_discharges) > 1:
max_upstream_discharge = max(upstream_discharges)
upstream_discharges_sort = sorted(upstream_discharges, reverse=True)
if upstream_discharges_sort[0] >= upstream_discharges_sort[1] * min_difference_discharge_factor:
index_max = upstream_discharges.index(max_upstream_discharge)
x["selected_upstream_edge"] = upstream_edges[index_max]
if len(downstream_discharges) == 1:
x["selected_downstream_edge"] = downstream_edges[0]
elif len(downstream_discharges) > 1:
max_downstream_discharge = max(downstream_discharges)
downstream_discharges_sort = sorted(downstream_discharges, reverse=True)
if downstream_discharges_sort[0] >= downstream_discharges_sort[1] * min_difference_discharge_factor:
index_max = downstream_discharges.index(max_downstream_discharge)
x["selected_downstream_edge"] = downstream_edges[index_max]
return x
nodes = nodes.apply(
lambda x: select_downstream_upstream_edges_discharge_per_node(x, min_difference_discharge_factor),
axis=1,
)
return nodes
[documentatie]def define_list_upstream_downstream_edges_ids(
node_ids: np.array,
nodes: gpd.GeoDataFrame,
edges: gpd.GeoDataFrame,
nodes_id_column: str = "nodeID",
edges_id_column: str = "code",
):
"""Get the upstream and downstream edges for each node.
Returns
-------
self.nodes: gpd.GeoDataFrame
Geodataframe containing nodes between waterlines, including upstream and downstream edges
"""
logging.info(" x find connected edges for nodes")
nodes_sel = nodes[nodes.nodeID.isin(node_ids)].copy()
nodes_sel.index = nodes_sel[nodes_id_column].values
if any(edges[edges_id_column].apply(lambda x: x is None)):
none_code_edges = edges[edges[edges_id_column].isna()]
logging.info(f" - edges found without code: {len(none_code_edges)}")
edges = edges[edges[edges_id_column].notna()]
for direction in ["upstream", "downstream"]:
node_end = "node_end" if direction == "upstream" else "node_start"
direction_edges = nodes_sel.merge(
edges[[node_end, "code"]],
how="left",
left_on=nodes_id_column,
right_on=node_end,
)
nodes_sel[f"{direction}_edges"] = direction_edges.groupby(nodes_id_column).agg(
{edges_id_column: list}
)
def len_edges(x):
if ~(isinstance(x[0], float) and np.isnan(x[0])):
return len(x)
else:
return 0
nodes_sel[f"no_{direction}_edges"] = nodes_sel[f"{direction}_edges"].apply(
lambda x: len_edges(x)
)
def list_to_str(x):
if (~(isinstance(x[0], float) and np.isnan(x[0])) and x[0] is not None):
return ",".join(x)
else:
return ",".join([])
nodes_sel[f"{direction}_edges"] = nodes_sel[f"{direction}_edges"].apply(
lambda x: list_to_str(x)
)
nodes_sel = nodes_sel.reset_index(drop=True)
return nodes_sel
[documentatie]def get_start_edges_nodes(edges, nodes, direction="downstream"):
""" Function to get the starting edges of the graph.
This function identifies nodes that do not have"""
if direction == "downstream":
node_end = "node_end"
node_start = "node_start"
else:
node_end = "node_start"
node_start = "node_end"
# any incoming edges (i.e., nodes that are not the end of any edge)
nodes_end_ids = edges[node_end].unique()
start_nodes = nodes[~nodes.index.isin(nodes_end_ids)]
other_nodes = nodes[nodes.index.isin(nodes_end_ids)]
# Identify edges that start from the identified start nodes
nodes_start_ids = start_nodes.index
start_edges = edges[edges[node_start].isin(nodes_start_ids)]
other_edges = edges[~edges[node_start].isin(nodes_start_ids)]
return start_nodes, other_nodes, start_edges, other_edges
[documentatie]def sum_edge_node_values_through_network(
edges,
nodes,
edges_nodes="edges",
edges_id_column="code",
nodes_id_column="nodeID",
direction="downstream",
column_to_sum="specific_discharge",
sum_column="total_specific_discharge"
):
nodes = nodes.set_index(nodes_id_column)
edges = edges.set_index(edges_id_column)
edges[sum_column] = 0.0
if sum_column not in nodes.columns:
nodes[sum_column] = 0.0
other_edges = edges.copy()
other_nodes = nodes.copy()
start_edges = edges.copy()
logging.info(f" x sum of '{column_to_sum}' '{direction}' through the network (total {len(other_edges)} edges)...")
iteration = 0
while not other_edges.empty and not start_edges.empty:
iteration += 1
# find all start edges and nodes
start_nodes, other_nodes, start_edges, other_edges = get_start_edges_nodes(
other_edges, other_nodes,
direction=direction,
)
logging.info(f" - Found {len(start_edges)} start edges ({len(other_edges)} left)")
# add the total values of start nodes to start edges
start_edges = start_edges.drop(
columns=[sum_column],
).merge(
start_nodes[sum_column],
left_on="node_start",
right_index=True,
how="left"
)
# fillna and multiply with downstream distribution factor
start_edges[sum_column] = start_edges[sum_column].fillna(0.0)
start_edges = start_edges[~start_edges.index.duplicated()]
if "downstream_splits_dist" in start_edges.columns:
start_edges[sum_column] = (
start_edges[sum_column] * start_edges["downstream_splits_dist"]
)
start_edges[column_to_sum] += start_edges[sum_column]
edges = edges[~edges.index.duplicated(keep='first')]
edges.loc[start_edges.index, sum_column] += start_edges[column_to_sum].values
specific_discharge_start_nodes = start_edges[["node_end", column_to_sum]].groupby("node_end").sum()
nodes.loc[specific_discharge_start_nodes.index, sum_column] += specific_discharge_start_nodes[column_to_sum].values
other_nodes = nodes.loc[other_nodes.index]
edges[sum_column] = edges[sum_column].astype(float)
nodes[sum_column] = nodes[sum_column].astype(float)
nodes = nodes.reset_index()
edges = edges.reset_index()
return edges, nodes