import logging
from pathlib import Path
import folium
import geopandas as gpd
import pandas as pd
import rioxarray
import xarray
from pydantic import BaseModel, ConfigDict
from ..utils.preprocess import preprocess_hydroobjecten
from ..utils.create_graph import create_graph_from_edges
from ..utils.network_functions import (
calculate_angles_of_edges_at_nodes,
define_list_upstream_downstream_edges_ids,
find_node_edge_ids_in_directed_graph,
calculate_discharges_of_edges_at_nodes,
select_downstream_upstream_edges_angle,
select_downstream_upstream_edges_discharge,
)
class GeneratorBasis(BaseModel):
    """Base class for all generators.

    Reads the base datasets of a case from the sub directory ``dir_basisdata``
    and, optionally, earlier results from ``dir_results``. A dataset file is
    loaded into the attribute whose name equals the file stem, so subclasses
    select the datasets they use by declaring attributes with those names.

    NOTE(review): ``create_graph_from_network`` and
    ``select_downstream_upstream_edges`` use ``self.nodes``, ``self.edges``
    and ``self.graph``, and ``use_processed_hydroobjecten`` looks for an
    optional ``self.snapping_distance``. None of these are declared here;
    presumably subclasses declare them - confirm.

    Parameters
    ----------
    path : pathlib.Path
        Path to the case (analysis) directory. When given, the directory
        structure is checked and the data is read on construction.
    name : str
        Name of the case; set to the name of the case directory.
    base_dir : pathlib.Path
        Declared for subclasses; not referenced in this class.
    dir_basisdata : str | pathlib.Path
        Sub directory with the base data. A string is resolved relative
        to ``path``.
    dir_results : str | pathlib.Path | None
        Sub directory with the results. A string is resolved relative to
        ``path``; the directory is created when it does not exist.
    read_results : bool
        Whether the datasets in ``dir_results`` are read as well.
    write_results : bool
        Whether intermediate results are written to ``dir_results``.
    required_results : list[str]
        Names of datasets that are required as input and are looked up in
        ``dir_results``.
    folium_map : folium.Map
        Folium map.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path | None = None
    name: str | None = None
    base_dir: Path | None = None
    dir_basisdata: str | Path = "0_basisdata"
    dir_results: str | Path | None = "1_resultaat"

    read_results: bool = False
    write_results: bool = False

    required_results: list[str] = []

    folium_map: folium.Map | None = None

    def __init__(self, **kwargs):
        """Create the generator and read the case data when ``path`` is given."""
        super().__init__(**kwargs)
        if self.path is not None:
            self.check_case_path_directory(path=self.path)
            self.read_data_from_case()
            self.read_required_data_from_case()

    def _read_dataset_into_attribute(self, file_path: Path) -> bool:
        """Load a ``.gpkg`` or ``.nc`` file into the attribute named after its stem.

        Parameters
        ----------
        file_path : Path
            File to read. The stem is used as attribute name and, for
            geopackages, as layer name.

        Returns
        -------
        bool
            True when the file was read, False when the suffix is not
            supported (nothing is changed in that case).
        """
        suffix = file_path.suffix.lower()
        if suffix not in (".gpkg", ".nc"):
            return False

        logging.info(f" - get dataset {file_path.stem.upper()}")
        if suffix == ".gpkg":
            dataset = gpd.read_file(file_path, layer=file_path.stem)
        else:
            # load the raster into memory and close the file again, so the
            # file can be overwritten or removed when results are exported
            with rioxarray.open_rasterio(file_path) as raster:
                dataset = raster.load()
        setattr(self, file_path.stem, dataset)
        return True

    def check_case_path_directory(self, path: Path):
        """Check the existence and structure of the case directory.

        Sets ``self.path`` and ``self.name`` and resolves ``dir_basisdata``
        and ``dir_results`` relative to the case directory when they are
        given as strings. The results directory is created when missing.

        Parameters
        ----------
        path : Path
            Path to the case directory. The directory name is used as
            case name.

        Raises
        ------
        ValueError
            If the case directory does not exist or is not a directory, or
            if the base data directory is not a path or does not exist.
        """
        path = Path(path)
        # is_dir() is False for a missing path as well as for a regular file
        if not path.is_dir():
            raise ValueError(
                f"provided path [{path}] does not exist or is not a directory"
            )
        self.path = path
        self.name = self.path.name
        logging.info(f' ### Case "{self.name.capitalize()}" ###')

        # check if directories 0_basisdata and 1_resultaat exist
        if isinstance(self.dir_basisdata, str):
            self.dir_basisdata = Path(self.path, self.dir_basisdata)
        if not isinstance(self.dir_basisdata, Path) or not self.dir_basisdata.exists():
            raise ValueError(
                f"provided [{self.dir_basisdata}] is not a path or does not exist"
            )

        if self.dir_results is not None:
            if isinstance(self.dir_results, str):
                self.dir_results = Path(self.path, self.dir_results)
            if isinstance(self.dir_results, Path) and not self.dir_results.exists():
                self.dir_results.mkdir(parents=True, exist_ok=True)

        logging.info(f" - dir basisdata = {self.dir_basisdata}")
        logging.info(f" - dir results = {self.dir_results}")

    def read_data_from_case(self, path: Path = None, read_results: bool = None):
        """Read the data of a case: base data and, optionally, results.

        Every ``.gpkg`` or ``.nc`` file whose stem equals the name of an
        existing attribute is loaded into that attribute.

        Parameters
        ----------
        path : Path, optional
            Path to the case directory including the directories with base
            data and results. When given (and existing) the case directory
            is checked first, by default None
        read_results : bool, optional
            Whether the results directory is read as well. When None the
            setting ``self.read_results`` is used, by default None
        """
        if path is not None and path.exists():
            self.check_case_path_directory(path=path)

        def read_attributes_from_folder(path_dir: Path):
            for f in path_dir.glob("**/*"):
                if hasattr(self, f.stem):
                    self._read_dataset_into_attribute(f)

        logging.info(" x read basisdata")
        if self.dir_basisdata is not None and self.dir_basisdata.exists():
            read_attributes_from_folder(self.dir_basisdata)

        # an explicit argument overrules the setting stored on the instance
        do_read_results = self.read_results if read_results is None else read_results
        if do_read_results:
            logging.info(" x read results")
            if self.dir_results is not None and self.dir_results.exists():
                read_attributes_from_folder(self.dir_results)

    def read_required_data_from_case(self):
        """Read the required results of previous analyses when not yet loaded.

        For every name in ``required_results`` the results directory is
        searched for a file with that stem. The file is only read when the
        attribute exists and is still None. A dataset that is still not
        available afterwards is reported in the log; no exception is raised.
        """
        if self.dir_results is not None and Path(self.dir_results).exists():
            # list the directory once instead of once per required dataset
            result_files = list(Path(self.dir_results).glob("**/*"))
        else:
            result_files = []

        for required_dataset in self.required_results:
            for f in result_files:
                if f.stem != required_dataset:
                    continue
                if hasattr(self, f.stem) and getattr(self, f.stem) is None:
                    self._read_dataset_into_attribute(f)

            if getattr(self, required_dataset, None) is None:
                logging.info(f" * dataset {required_dataset} is missing - check if absolutely required")

    def use_processed_hydroobjecten(self, processed_file="processed", force_preprocess=False, snapping_distance=0.05):
        """Actualize hydroobjecten and overige_watergangen.

        Replaces the attributes ``hydroobjecten`` and ``overige_watergangen``
        with the processed version from file when such a file exists and is
        listed in ``required_results``; otherwise the dataset is preprocessed.

        Parameters
        ----------
        processed_file : str, optional
            Suffix of processed files, by default "processed"
        force_preprocess : bool, optional
            Preprocess the datasets even when a processed file is available,
            by default False
        snapping_distance : float, optional
            Snapping distance used when preprocessing ``hydroobjecten``. It is
            overruled by the attribute ``snapping_distance`` when the instance
            has one that is not None, by default 0.05
        """
        # the attribute is optional: it is not declared on this base class
        instance_snapping_distance = getattr(self, "snapping_distance", None)
        if instance_snapping_distance is not None:
            snapping_distance = instance_snapping_distance

        # the directory content does not depend on the dataset: list it once
        files_in_dirs = [
            file
            for directory in (self.dir_basisdata, self.dir_results)
            if directory is not None
            for file in directory.glob("**/*")
        ]

        for watergang in ["hydroobjecten", "overige_watergangen"]:
            if getattr(self, watergang, None) is None:
                logging.info(f" - attribute {watergang} does not exist")
                continue

            # when several files match, the last one found is used
            watergang_processed_file_name = None
            for file in files_in_dirs:
                if (
                    f"{watergang}_{processed_file}" in file.stem
                    and "nodes" not in file.stem
                    and file.stem in self.required_results
                ):
                    watergang_processed_file_name = file

            if force_preprocess or watergang_processed_file_name is None:
                logging.info(f" - preprocessing dataset {watergang}")
                waterline = self.generate_or_use_preprocessed_hydroobjecten(
                    waterline=watergang,
                    # only the hydroobjecten are snapped
                    snapping_distance=snapping_distance if watergang == "hydroobjecten" else None,
                )
                setattr(self, watergang, waterline)
            else:
                logging.info(
                    f" - use processed dataset {watergang}: {watergang_processed_file_name.name}"
                )
                setattr(self, watergang, gpd.read_file(watergang_processed_file_name))

    def generate_or_use_preprocessed_hydroobjecten(
        self, waterline, preprocessed_file="preprocessed", snapping_distance=0.05
    ):
        """Return the preprocessed waterlines, from file when available.

        When ``<waterline>_<preprocessed_file>.gpkg`` exists in the results
        directory it is read and returned as is. Otherwise the attribute
        ``waterline`` is preprocessed (only when ``snapping_distance`` is not
        None) and closed and duplicate geometries are removed.

        Parameters
        ----------
        waterline : str
            Name of the attribute with the waterlines.
        preprocessed_file : str, optional
            Suffix of the preprocessed file, by default "preprocessed"
        snapping_distance : float | None, optional
            Snapping distance passed to ``preprocess_hydroobjecten``. When
            None the preprocessing is skipped, by default 0.05

        Returns
        -------
        gpd.GeoDataFrame
            The preprocessed waterlines.
        """
        preprocessed_name = f"{waterline}_{preprocessed_file}.gpkg"
        waterline_preprocessed_file = None
        if self.dir_results is not None:
            waterline_preprocessed_file = Path(self.dir_results, preprocessed_name)

        if waterline_preprocessed_file is not None and waterline_preprocessed_file.exists():
            logging.info(f" - get dataset preprocessed {waterline}")
            return gpd.read_file(waterline_preprocessed_file)

        logging.info(f" - no {preprocessed_name}, preprocessing {waterline}")
        gdf_waterline = getattr(self, waterline)
        # counted before preprocessing: the number logged below is the
        # difference with the original dataset
        len_gdf_waterline = len(gdf_waterline)

        if snapping_distance is not None:
            gdf_waterline, gdf_waterline_snapped = preprocess_hydroobjecten(
                gdf_waterline, snapping_distance=snapping_distance
            )
            if self.write_results and waterline_preprocessed_file is not None:
                # NOTE(review): the file is written before the closed and
                # duplicate geometries are removed below, so a later read of
                # this file returns the unfiltered dataset - confirm intent
                gdf_waterline_snapped.to_file(Path(self.dir_results, f"{waterline}_snapped.gpkg"))
                gdf_waterline.to_file(waterline_preprocessed_file)
            logging.info(f" - preprocessing done: {waterline}")
        else:
            logging.info(f" - no preprocessing: {waterline}")

        # check for invalid or duplicate geometries (linestrings forming a ring)
        gdf_waterline = gdf_waterline[~gdf_waterline["geometry"].apply(lambda geom: geom.is_closed)]
        gdf_waterline = gdf_waterline.loc[~gdf_waterline["geometry"].duplicated(keep="first")]
        logging.info(f" - removed {len_gdf_waterline-len(gdf_waterline)} waterlines [{waterline}]")
        return gdf_waterline

    def create_graph_from_network(self, water_lines=None, processed="processed"):
        """Turn linestring layers containing waterlines into a graph of edges and nodes.

        For every waterline attribute the latest available processed version
        ``<water_line>_<processed>_<i>`` (i = 0..9, stopping at the first one
        that is missing or None) replaces the original dataset.

        Parameters
        ----------
        water_lines : list, optional
            Names of the attributes with waterlines (geodataframes containing
            linestrings) used to create the graph. When None
            ["hydroobjecten"] is used, by default None
        processed : str, optional
            Infix of the attribute names with processed waterlines,
            by default "processed"

        Returns
        -------
        nodes : gpd.GeoDataFrame
            Geodataframe containing nodes between waterlines
        edges : gpd.GeoDataFrame
            Geodataframe containing edges (waterlines)
        graph : nx.DiGraph
            Networkx graph containing the edges and nodes
        """
        if water_lines is None:
            water_lines = ["hydroobjecten"]

        edge_parts = []
        for water_line in water_lines:
            gdf_water_line = getattr(self, water_line)
            for i in range(10):
                gdf_water_line_processed = getattr(self, f"{water_line}_{processed}_{i}", None)
                if gdf_water_line_processed is None:
                    break
                gdf_water_line = gdf_water_line_processed.copy()
            if gdf_water_line is None:
                continue
            edge_parts.append(gdf_water_line.explode())

        if not edge_parts:
            edges = None
        elif len(edge_parts) == 1:
            edges = edge_parts[0]
        else:
            edges = pd.concat(edge_parts)

        self.nodes, self.edges, self.graph = create_graph_from_edges(edges)
        logging.info(
            f" x create network graph ({len(self.edges)} edges, {len(self.nodes)} nodes)"
        )
        return self.nodes, self.edges, self.graph

    def select_downstream_upstream_edges(
        self,
        min_difference_angle=10.0,
        min_difference_discharge_factor=2.0
    ):
        """Select the downstream and upstream edges at the nodes.

        When ``self.nodes`` has no column "specific_discharge" the selection
        is delegated to ``select_downstream_upstream_edges_angle``, otherwise
        to ``select_downstream_upstream_edges_discharge``.

        Parameters
        ----------
        min_difference_angle : float, optional
            Passed on to the angle based selection, by default 10.0
        min_difference_discharge_factor : float, optional
            Passed on to the discharge based selection, by default 2.0

        Returns
        -------
        gpd.GeoDataFrame
            The updated nodes (also stored in ``self.nodes``).
        """
        logging.info(" x find downstream upstream edges")
        if "specific_discharge" not in self.nodes:
            logging.info(f" - use angle using min_difference_angle [{min_difference_angle}deg]")
            self.nodes = select_downstream_upstream_edges_angle(
                self.nodes, min_difference_angle=min_difference_angle
            )
        else:
            logging.info(f" - use discharge distribution [factor{min_difference_discharge_factor:.3f}]")
            self.nodes = select_downstream_upstream_edges_discharge(
                self.nodes, min_difference_discharge_factor=min_difference_discharge_factor
            )
        return self.nodes

    def export_results_to_gpkg_or_nc(self, list_layers: list[str] = None, dir_output: str | Path = None):
        """Export results to geopackage or netcdf files.

        Geodataframes are written to ``<layer>.gpkg``; xarray data arrays and
        datasets are written to a compressed ``<layer>.nc``.

        Parameters
        ----------
        list_layers : list[str], optional
            Names of the attributes to export. Nothing is exported when None,
            by default None
        dir_output : str | Path, optional
            Output directory. When None ``dir_results`` is used,
            by default None

        Raises
        ------
        ValueError
            If no output directory is available, or if an attribute has a
            type that can not be exported.
        """
        if dir_output is None:
            dir_output = self.dir_results
        logging.info(" x export results")
        if list_layers is None:
            return
        if dir_output is None:
            raise ValueError("no output directory: provide dir_output or set dir_results")

        dir_output = Path(dir_output)
        dir_output.mkdir(parents=True, exist_ok=True)

        for layer in list_layers:
            # an attribute that does not exist is reported like an empty one
            result = getattr(self, layer, None)
            if result is None:
                logging.info(f" - {layer} not available")
            elif isinstance(result, gpd.GeoDataFrame):
                logging.info(f" - {layer} ({len(result)})")
                result.to_file(Path(dir_output, f"{layer}.gpkg"))
            elif isinstance(result, (xarray.DataArray, xarray.Dataset)):
                logging.info(f" - {layer} (netcdf)")
                netcdf_file_path = Path(dir_output, f"{layer}.nc")
                if netcdf_file_path.exists():
                    netcdf_file_path.unlink()

                # a data array is stored as variable <layer>, so the encoding
                # key always matches a variable in the written dataset
                if isinstance(result, xarray.DataArray):
                    dataset = result.to_dataset(name=layer)
                else:
                    dataset = result
                # a dataset has no dtype of its own: encode per variable
                encoding = {
                    variable_name: {
                        "dtype": str(variable.dtype),
                        "zlib": True,
                        "complevel": 9,
                    }
                    for variable_name, variable in dataset.data_vars.items()
                }
                dataset.to_netcdf(netcdf_file_path, mode="w", encoding=encoding)
            else:
                raise ValueError("type not exportable")