# -*- coding: utf-8 -*-
""" A set of classes to handle SRH-2D data
SRH-2D data include information in SRHHydro, SRHGeom, SRHMat, and simulation results.
Author: Xiaofeng Liu, PhD, PE
Penn State University
"""
import sys
import os
import copy
import numpy as np
import glob
import h5py
from os import path
import shlex
import vtk
from vtk.util import numpy_support as VN
import re
import meshio
import json
from pyHMT2D.Hydraulic_Models_Data import HydraulicData
from pyHMT2D.__common__ import *
from pyHMT2D.Misc.vtk_utilities import vtkCellTypeMap
from pyHMT2D.Misc import vtkHandler
[docs]
class SRH_2D_SIF:
"""Class to parse and manage SRH-2D SIF (Simulation Input File) data"""
VALID_MODULES = ['RIVER', 'WATERSHED', 'COAST']
VALID_SOLVERS = ['FLOW', 'MOBILE', 'DIFF', 'SED_DIFF']
VALID_TIME_TYPES = ['STEADY', 'UNSTEADY']
VALID_MESH_UNITS = ['FOOT', 'METER', 'INCH', 'MM', 'MILE', 'KM', 'GSCALE']
VALID_INIT_CONDITIONS = ['DRY', 'RST', 'AUTO', 'ZONAL', 'VARY_WSE', 'VARY_WD']
VALID_MANNING_OPTIONS = ['SPATIAL', 'SPATIAL VEG', 'SPATIAL GRAIN']
VALID_OUTPUT_FORMATS = ['SRHC', 'TEC', 'SRHN', 'XMDF', 'XMDFC', 'VTK']
VALID_OUTPUT_UNITS = ['SI', 'EN']
def __init__(self, srhsif_filename):
"""Initialize SIF parser with srhsif_filename"""
self.srhsif_filename = srhsif_filename
self.srhsif_content = {
'ManningN': {},
'BC': {},
'IQParams': {},
'EWSParamsC': {},
'MONITORING': {},
'wall_roughness': {},
'pressurized_zones': {},
'flow_obstructions': {},
'output_max_dat': False,
'intermediate_output': {}
}
self.comments = {}
try:
self._parse_file()
except Exception as e:
raise ValueError(f"Error parsing SIF file: {str(e)}")
def _parse_file(self):
"""Parse the SIF file and store data in self.data dictionary"""
try:
with open(self.srhsif_filename, 'r') as f:
lines = f.readlines()
except FileNotFoundError:
raise FileNotFoundError(f"SIF file not found: {self.srhsif_filename}")
#get the case name from the filename: assume the filename is like "Muncie_SIF.dat"
case_name = self.srhsif_filename.split("_SIF.dat")[0] #strip off "_SIF.dat"
self.srhsif_content["Case"] = case_name
i = 0
while i < len(lines):
line = lines[i].strip()
if line.startswith('//'):
comment = line[2:].strip()
self.comments[i] = comment
i += 1
if i >= len(lines):
break
value = lines[i].strip()
try:
if "Simulation Description" in comment:
self.srhsif_content['simulation_description'] = value
elif "Module Selected" in comment:
self._validate_module(value.upper())
self.srhsif_content['module'] = value
elif "Solver Selection" in comment:
self._validate_solver(value.upper())
self.srhsif_content['solver'] = value
elif "Monitor-Point-Info" in comment:
self.srhsif_content['monitor_point_npoints'] = int(value) #number of monitor points
if self.srhsif_content['monitor_point_npoints'] > 0:
self._parse_monitoring_points(lines, i)
elif "Steady-or-Unsteady" in comment:
self._validate_time_type(value.upper())
self.srhsif_content['time_type'] = value
elif "Time Parameters" in comment:
self.srhsif_content['time_params'] = self._parse_time_parameters(value)
elif "Turbulence-Model" in comment:
self.srhsif_content['turbulence_model'] = value
elif "A_TURB for the PARA Model" in comment:
self.srhsif_content['a_turb'] = float(value)
elif "Mesh-Unit" in comment:
self.srhsif_content['mesh_unit'] = value
elif "Mesh FILE_NAME and FORMAT" in comment:
self.srhsif_content['mesh_file_name'] = value.split()[0]
self.srhsif_content['mesh_format'] = value.split()[1]
elif "Initial Flow Condition Setup Option" in comment:
self._validate_init_condition(value.upper())
self.srhsif_content['initial_flow_condition'] = value
#if the initial flow condition is ZONAL, then the next lines are for the number of initial condition zones and the zone data
if value.upper() == 'ZONAL':
#dict for zonal IC data
res_IC_zonal = {}
i += 1 #comment line
#check the current comment line contains "Constant Setup for Initial Condition"
if "Constant Setup for Initial Condition" in lines[i]:
#read the next line
i += 1
value = lines[i].strip()
self.srhsif_content['n_initial_condition_zones'] = int(value)
#read the next comment line and make sure it contains "Constant-Value Initial Condition"
i += 1
if "Constant-Value Initial Condition" in lines[i]:
#read the next n_initial_condition_zones lines (one line per zone)
for j in range(self.srhsif_content['n_initial_condition_zones']):
i += 1
values = lines[i].strip().split() #values is a list of strings; the first three are floats, the rest are strings
values_float = [float(x) for x in values[:3]]
values_str = values[3:]
res_IC_zonal[j] = values_float + values_str
else:
raise ValueError("The next line after the initial flow condition is not 'Constant-Value Initial Condition'.")
else:
raise ValueError("The next line after the initial flow condition is not 'Constant Setup for Initial Condition'.")
self.srhsif_content['IC_zonal'] = res_IC_zonal
elif "Manning Coefficient" in comment:
i = self._parse_manning_coefficients(lines, i, value)
elif "Any-Special-Modeling-Options" in comment:
self.srhsif_content['special_modeling_options'] = int(value)
elif "Boundary Type" in comment:
i = self._parse_boundary_conditions(lines, i)
elif "Wall-Roughess-Height-Specification" in comment:
i = self._parse_wall_roughness(lines, i)
elif "Pressurized Zone exists?" in comment:
i = self._parse_pressurized_zones(lines, i)
elif "Any In-Stream Flow Obstructions?" in comment:
i = self._parse_flow_obstructions(lines, i)
elif "Results-Output-Format" in comment:
self._parse_output_format(value)
elif "Output File _MAX.dat" in comment:
self._parse_max_output(value)
elif "Intermediate Result Output Control" in comment:
i = self._parse_intermediate_output(lines, i)
except ValueError as e:
raise ValueError(f"Error parsing line {i+1}: {str(e)}")
i += 1
def _parse_boundary_conditions(self, lines, start_index):
"""Parse all boundary conditions from the file
Args:
lines: List of all file lines
start_index: Current line index where first boundary is found
Returns:
next_index: Index of the next line after all boundaries
"""
res_BC = {}
res_IQParams = {}
res_EWSParamsC = {}
i = start_index - 1 # Start from the line before the first boundary for the while loop below
index_BC = 0
while i < len(lines):
line = lines[i].strip()
# Check if we've hit the next section (indicated by '//' without 'Boundary')
if line.startswith('//') and 'Boundary' not in line:
break
# Parse boundary type
if line.startswith('// Boundary Type'):
index_BC += 1
i += 1
boundary_type = lines[i].strip().lower()
res_BC[index_BC] = boundary_type
if boundary_type == 'monitoring' or boundary_type == 'monitor' or boundary_type == 'symmetry' or boundary_type == 'wall':
#do nothing (there is no boundary values for monitoring, symmetry, or symmetry)
pass
else:
# Get boundary values from next two lines
i += 1 # Skip to values comment line
i += 1 # Go to actual values line
boundary_values = lines[i].strip().split()
if boundary_type == 'inlet-q':
res_IQParams[index_BC] = boundary_values
elif boundary_type == 'exit-h':
res_EWSParamsC[index_BC] = boundary_values
else:
raise ValueError(f"Boundary type: {boundary_type} is not supported yet.")
i += 1
self.srhsif_content['BC'] = res_BC
self.srhsif_content['IQParams'] = res_IQParams
self.srhsif_content['EWSParamsC'] = res_EWSParamsC
#print("res_BC = ", res_BC)
#print("res_IQParams = ", res_IQParams)
#print("res_EWSParamsC = ", res_EWSParamsC)
return i # Return the index of the next section
def _parse_wall_roughness(self, lines, i):
"""Parse wall roughness section"""
value = lines[i].strip()
if value and value not in ['0', '']:
self.srhsif_content['wall_roughness'] = value.split()
return i
def _parse_pressurized_zones(self, lines, i):
"""Parse pressurized zones section"""
value = lines[i].strip()
if value and value not in ['0', '']:
self.srhsif_content['pressurized_zones'] = value.split()
return i
def _parse_flow_obstructions(self, lines, i):
"""Parse flow obstructions section"""
value = lines[i].strip()
if value and value not in ['0', '']:
self.srhsif_content['flow_obstructions'] = value.split()
return i
def _parse_output_format(self, value):
"""Parse output format and units"""
parts = value.split()
format_parts = parts[0].split('/')
output_format = format_parts[0].upper()
if output_format not in self.VALID_OUTPUT_FORMATS:
raise ValueError(f"Invalid output format: {output_format}")
output_unit = parts[1].upper() if len(parts) > 1 else 'EN'
if output_unit not in self.VALID_OUTPUT_UNITS:
raise ValueError(f"Invalid output unit: {output_unit}")
self.srhsif_content['OutputFormat'] = output_format
self.srhsif_content['OutputUnit'] = output_unit
# Check for optional STL FACE
if len(parts) > 2 and parts[2:] == ['STL', 'FACE']:
self.srhsif_content['output_stl_face'] = True
def _parse_max_output(self, value):
"""Parse _MAX.dat output option"""
self.srhsif_content['output_max_dat'] = bool(value.strip())
def _parse_intermediate_output(self, lines, i):
"""Parse intermediate output control"""
value = lines[i].strip()
if value.lower() != 'empty':
try:
# Try parsing as a single interval
interval = float(value)
self.srhsif_content['intermediate_output'] = {'type': 'interval', 'value': interval}
except ValueError:
# Parse as list of times
times = [float(t) for t in value.split()]
self.srhsif_content['intermediate_output'] = {'type': 'list', 'value': times}
return i
[docs]
def save_as(self, srhsif_filename=None):
"""Save the current data in self.srhsif_content back to a SIF file
So to modify a SIF file, one needs to change the data in self.srhsif_content, and then call this function to save the data back to the SIF file.
Parameters
----------
srhsif_filename : str, optional
The filename to save the SIF file to. If not provided, the filename will be the same as the original filename.
Returns
-------
"""
if srhsif_filename is None:
srhsif_filename = self.srhsif_filename
try:
with open(srhsif_filename, 'w') as f:
# Simulation Description
f.write("// Simulation Description (not used by SRH):\n")
f.write(f"{self.srhsif_content.get('simulation_description', '')}\n")
# Module
f.write("// Module Selected (RIVER WATERSHED COAST)\n")
f.write(f"{self.srhsif_content.get('module', '')}\n")
# Solver
f.write("// Solver Selection (FLOW MOBile DIFF SED_DIFF ...)\n")
f.write(f"{self.srhsif_content.get('solver', '')}\n")
# Monitor Points
f.write("// Monitor-Point-Npoints: NPOINT\n")
f.write(f"{self.srhsif_content.get('monitor_point_npoints', '0')}\n")
# Monitoring Points
if self.srhsif_content.get('monitor_point_npoints') > 0:
f.write("// Monitor Point Coordinates: x1 y1 x2 y2 ...\n")
#loop over the monitor points and write them to the file
for pointI in range(self.srhsif_content['monitor_point_npoints']):
point_x = self.srhsif_content['monitor_points'][pointI*2]
point_y = self.srhsif_content['monitor_points'][pointI*2+1]
f.write(f"{point_x} {point_y}\n")
# Time Type
f.write("// Steady-or-Unsteady (STEADY/UNS)\n")
f.write(f"{self.srhsif_content.get('time_type', '')}\n")
# Time Parameters
f.write("// Time Parameters: T_Start(hr) T_end(hr) Dt(s) [Dt_max(s) CFL]\n")
f.write(" ".join(map(str, self.srhsif_content['time_params'])) + "\n")
# Turbulence Model
f.write("// Turbulence-Model\n")
f.write(f"{self.srhsif_content.get('turbulence_model', '')}\n")
# A_TURB Parameter
f.write("// A_TURB for the PARA Model (0.05 to 1.0)\n")
f.write(f"{self.srhsif_content.get('a_turb', '')}\n")
# Mesh Unit
f.write("// Mesh-Unit (FOOT METER INCH MM MILE KM GSCALE)\n")
f.write(f"{self.srhsif_content.get('mesh_unit', '')}\n")
# Mesh File
f.write("// Mesh FILE_NAME and FORMAT(SMS...)\n")
f.write(f"{self.srhsif_content.get('mesh_file_name', '')} {str(self.srhsif_content.get('mesh_format', '')).strip()}\n")
# Initial Flow Condition
f.write("// Initial Flow Condition Setup Option (DRY RST AUTO ZONAL Vary_WSE/Vary_WD)\n")
f.write(f"{self.srhsif_content.get('initial_flow_condition', '')}\n")
# If the initial flow condition is ZONAL, then the next lines are for the number of initial condition zones and the zone data
if self.srhsif_content.get('initial_flow_condition', '').upper() == 'ZONAL':
f.write("// Constant Setup for Initial Condition: n_zone [MESH or 2DM_filename]\n")
f.write(f"{self.srhsif_content.get('n_initial_condition_zones', '0')}\n")
f.write("// Constant-Value Initial Condition for Mesh Zone: U V WSE [TK] [ED] [T]\n")
for j in range(self.srhsif_content['n_initial_condition_zones']):
#f.write(f"{self.srhsif_content['IC_zonal'][j]}\n")
f.write(f"{' '.join(map(str, self.srhsif_content['IC_zonal'][j]))}\n")
# Manning Coefficients
f.write("// Manning Coefficient n Input Options: SPATIAL or SPATIAL VEG GRAIN for 2D Model; SPATIAL for 3D model\n")
f.write(f"{self.srhsif_content.get('manning_option', '')}\n")
f.write("// Number of Material Types in 2D Mesh File\n")
f.write(f"{str(self.srhsif_content.get('n_manning_zones', '')).strip()}\n")
f.write("// Manning Coefficient in each mesh zone: a real value or a WD~n file name or Landuse\n")
if 'ManningN' in self.srhsif_content:
# Sort the ManningN values by their keys before writing
sorted_values = sorted(self.srhsif_content['ManningN'].items(), key=lambda x: x[0])
for key, value in sorted_values:
if key != 0: #Don't need the default Manning's n value
f.write(f"{value}\n")
else:
#something is wrong
raise ValueError("ManningN is not found in the SIF file.")
# Special Modeling Options
f.write("// Any-Special-Modeling-Options? (0/1=no/yes)\n")
f.write(f"{self.srhsif_content.get('special_modeling_options', '0')}\n")
# Boundary Conditions
bcDict = self.srhsif_content['BC']
IQParams = self.srhsif_content['IQParams']
EWSParamsC = self.srhsif_content['EWSParamsC']
# loop over all the boundaries
for index_BC, boundary_type in bcDict.items():
f.write("// Boundary Type (INLET-Q EXIT-H etc)\n")
f.write(f"{boundary_type}\n")
if boundary_type == 'inlet-q':
f.write("// Boundary Values (Q W QS TEM H_rough etc)\n")
f.write(" ".join(str(x) for x in IQParams[index_BC]) + "\n")
elif boundary_type == 'exit-h':
f.write("// Boundary Values (Q W QS TEM H_rough etc)\n")
f.write(" ".join(str(x) for x in EWSParamsC[index_BC]) + "\n")
elif boundary_type == 'wall':
#do nothing
pass
elif boundary_type == 'monitoring' or boundary_type == 'monitor' or boundary_type == 'symmetry':
#do nothing
pass
else:
raise ValueError(f"Boundary type: {boundary_type} is not supported yet.")
# Wall Roughness
f.write("// Wall-Roughess-Height-Specification (empty-line=DONE)\n")
if self.srhsif_content.get('wall_roughness'):
f.write(" ".join(map(str, self.srhsif_content['wall_roughness'])) + "\n")
else:
f.write(" \n")
# Pressurized Zones
f.write("// Pressurized Zone exists? (empty-line or 0 == NO)\n")
if self.srhsif_content.get('pressurized_zones'):
f.write(" ".join(map(str, self.srhsif_content['pressurized_zones'])) + "\n")
else:
f.write(" \n")
# Flow Obstructions
f.write("// Any In-Stream Flow Obstructions? (empty-line or 0 = NO)\n")
if self.srhsif_content.get('flow_obstructions'):
f.write(" ".join(map(str, self.srhsif_content['flow_obstructions'])) + "\n")
else:
f.write(" \n")
# Output Format
f.write("// Results-Output-Format-and-Unit(SRHC/TEC/SRHN/XMDF/XMDFC/VTK;SI/EN) + Optional STL FACE\n")
output_str = f"{self.srhsif_content.get('OutputFormat', '')} {self.srhsif_content.get('OutputUnit', '')}"
if self.srhsif_content.get('output_stl_face'):
output_str += " STL FACE"
f.write(output_str + "\n")
# MAX.dat Output
f.write("// Output File _MAX.dat is requested? (empty means NO)\n")
f.write("1\n" if self.srhsif_content.get('output_max_dat') else " \n")
# Intermediate Output
f.write("// Intermediate Result Output Control: INTERVAL(hour) OR List of T1 T2 ... EMPTY means the end\n")
if 'intermediate_output' in self.srhsif_content:
output_data = self.srhsif_content['intermediate_output']
if output_data['type'] == 'interval':
f.write(f"{output_data['value']}\n")
else:
f.write(" ".join(map(str, output_data['value'])) + "\n")
else:
f.write("EMPTY\n")
except Exception as e:
raise IOError(f"Error saving SIF file: {str(e)}")
# Additional getter methods
[docs]
def get_simulation_start_end_time(self):
"""Return simulation start and end time"""
return self.srhsif_content['time_params'][0], self.srhsif_content['time_params'][1]
[docs]
def get_simulation_time_step_size(self):
"""Return simulation time step size"""
return self.srhsif_content['time_params'][2]
[docs]
def get_ManningN_dict(self):
"""Return Manning's n values"""
return self.srhsif_content.get('ManningN', [])
[docs]
def get_wall_roughness(self):
"""Return wall roughness specifications"""
return self.data.get('wall_roughness', [])
[docs]
def get_pressurized_zones(self):
"""Return pressurized zones specifications"""
return self.data.get('pressurized_zones', [])
[docs]
def get_flow_obstructions(self):
"""Return flow obstructions specifications"""
return self.data.get('flow_obstructions', [])
# Additional setter methods
[docs]
def set_wall_roughness(self, values):
"""Set wall roughness specifications"""
self.data['wall_roughness'] = values
def _validate_module(self, value):
"""Validate module selection"""
if value.upper() not in self.VALID_MODULES:
raise ValueError(f"Invalid module: {value}. Must be one of {self.VALID_MODULES}")
def _validate_solver(self, value):
"""Validate solver selection"""
if value.upper() not in self.VALID_SOLVERS:
raise ValueError(f"Invalid solver: {value}. Must be one of {self.VALID_SOLVERS}")
def _validate_time_type(self, value):
"""Validate time type selection"""
if value.upper() not in self.VALID_TIME_TYPES:
raise ValueError(f"Invalid time type: {value}. Must be one of {self.VALID_TIME_TYPES}")
def _validate_mesh_unit(self, value):
"""Validate mesh unit selection"""
if value.upper() not in self.VALID_MESH_UNITS:
raise ValueError(f"Invalid mesh unit: {value}. Must be one of {self.VALID_MESH_UNITS}")
def _validate_init_condition(self, value):
"""Validate initial condition selection"""
if value.upper() not in self.VALID_INIT_CONDITIONS:
raise ValueError(f"Invalid initial condition: {value}. Must be one of {self.VALID_INIT_CONDITIONS}")
def _validate_manning_option(self, value):
"""Validate Manning option selection"""
if value.upper() not in self.VALID_MANNING_OPTIONS:
raise ValueError(f"Invalid Manning option: {value}. Must be one of {self.VALID_MANNING_OPTIONS}")
def _parse_time_parameters(self, value):
"""Parse time parameters line
Format: T_Start(hr) T_end(hr) Dt(s) [Dt_max(s) CFL]
Last two parameters are optional
Args:
value: String containing space-separated time parameters
Returns:
List of float values for time parameters
Raises:
ValueError: If parameters are invalid or missing required values
"""
try:
params = [float(x) for x in value.split()]
if len(params) < 3:
raise ValueError("Time parameters must include at least t_start, t_end, and dt")
return params
except ValueError as e:
raise ValueError(f"Invalid time parameters format: {str(e)}")
def _parse_monitoring_points(self, lines, i):
"""Parse monitoring points section
Args:
lines: List of all file lines
i: Current line index (where NPOINT is specified)
Returns:
next_index: Index of the next line after monitoring points section
"""
try:
# Get number of monitoring points
n_points = int(lines[i].strip())
# Skip comment line
i += 1
if not lines[i].strip().startswith('//'):
raise ValueError("Expected comment line for monitoring point coordinates")
# Get coordinates (x, y) and one point per line
coords = [] #list of x1, y1, x2, y2, ...
for j in range(n_points):
i += 1
#print("lines[i].strip().split(): ", lines[i].strip().split())
#print("type(lines[i].strip().split()): ", type(lines[i].strip().split()))
for item in lines[i].strip().split():
coords.append(float(item))
#print(f"coords: {coords}")
# Convert coordinates to float and pair them
if len(coords) != 2 * n_points:
raise ValueError(f"Expected {2*n_points} coordinates for {n_points} points: length of coords is {len(coords)}")
#coords = [float(x) for x in coords]
# Store in data dictionary
self.srhsif_content['monitor_points'] = coords
return i
except ValueError as e:
raise ValueError(f"Error parsing monitoring points: {str(e)}")
def _parse_manning_coefficients(self, lines, i, value):
"""Parse Manning coefficients section
Args:
lines: List of all file lines
i: Current line index where "VARY" is found
value: String containing "VARY"
Returns:
next_index: Index of the next line after Manning coefficients
Raises:
ValueError: If Manning coefficient values are invalid
"""
try:
# First line should be the option, e.g., "VARY"
manning_option = value.upper()
self.srhsif_content['manning_option'] = manning_option
# Next line should be the comment about number of materials
i += 1
if i >= len(lines) or not lines[i].strip().startswith('//'):
raise ValueError("Expected comment line for number of materials")
# Next line contains the actual number
i += 1
if i >= len(lines):
raise ValueError("Unexpected end of file while reading number of materials")
try:
n_materials = int(lines[i].strip())
except ValueError:
raise ValueError(f"Invalid number of materials: {lines[i].strip()}")
self.srhsif_content['n_manning_zones'] = n_materials
# Read the comment line for the Manning values
i += 1
if i >= len(lines) or not lines[i].strip().startswith('//'):
raise ValueError("Expected comment line for Manning values")
# Read the Manning values
res_ManningsN = {} # dict for ManningsN (there cuould be multiple entries)
#add a default value for the ManningN (not sure why SIF file does not have this)
res_ManningsN[0] = 0.03
for material_id in range(n_materials):
i += 1
if i >= len(lines):
raise ValueError("Unexpected end of file while reading Manning coefficients")
value = lines[i].strip()
# Handle both numeric values and file names
try:
res_ManningsN[material_id+1] = float(value)
except ValueError:
# If not a number, store as string (could be a file name)
res_ManningsN[material_id+1] = value
self.srhsif_content['ManningN'] = res_ManningsN
return i
except ValueError as e:
raise ValueError(f"Invalid Manning coefficient values: {str(e)}")
[docs]
def modify_one_manning_n_in_srhsif_content(self, material_id, new_manning_n):
"""Modify one Manning's n value for a specific material ID in self.srhsif_content"""
#check material_id is an integer and is within the range of the Manning's n list
if not isinstance(material_id, int) or material_id < 0 or material_id >= len(self.srhsif_content['ManningN']):
raise ValueError(f"Invalid material ID: {material_id}. Must be an integer within the range of the Manning's n list.")
#check new_manning_n is a float
if not isinstance(new_manning_n, float):
raise ValueError(f"Invalid Manning's n value: {new_manning_n}. Must be a float.")
#modify the Manning's n value
self.srhsif_content['ManningN'][material_id] = new_manning_n
if gVerbose:
print(f"Manning's n value for material ID {material_id} has been modified to {new_manning_n}")
[docs]
def modify_ManningsN(self, materialIDs, newManningsNValues, ManningN_MaterialNames):
"""Modify materialID's Manning's n values to new values for a given list of material IDs
Parameters
----------
materialIDs : list
material ID list
newManningsNValues : list
new Manning's n value list
ManningN_MaterialNames : list
name of materials list
Returns
-------
"""
if gVerbose: print("Modify Manning's n values ...")
if not isinstance(materialIDs[0], int):
raise Exception("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]))
if not isinstance(newManningsNValues[0], float):
raise Exception("Manning's n has to be a float. The type of newManningsNValue passed in is ", type(newManningsNValues[0]))
#get the Manning's n dictionary
nDict = self.get_ManningN_dict()
#loop over all the material IDs and modify the Manning's n values
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in nDict:
if gVerbose: print(" Old Manning's n value =", nDict[materialID], "for material ID = ", materialID)
nDict[materialID] = newManningsNValues[i]
if gVerbose: print(" New Manning's n value =", nDict[materialID], "for material ID = ", materialID)
#also modify the srhsif_content
self.modify_one_manning_n_in_srhsif_content(materialID, newManningsNValues[i])
else:
print("The specified materialID", materialID, "is not in the Manning's n list. Please check.")
[docs]
def modify_InletQ(self, bcIDs, newInletQValues):
"""Modify the inlet flow rate for the specified boundary IDs
Parameters
----------
bcIDs : list
list of boundary IDs
newInletQValues : list
list of new inlet flow rate values
Returns
-------
"""
if gVerbose: print("Modifying inlet flow rate for the specified boundary IDs ...")
if not isinstance(bcIDs[0], int):
raise Exception("Boundary ID has to be an integer. The type of bcID passed in is ", type(bcIDs[0]))
if not isinstance(newInletQValues[0], float):
raise Exception("Inlet flow rate has to be a float. The type of newInletQValues passed in is ", type(newInletQValues[0]))
#get the inlet flow rate dictionary
bcDict = self.srhsif_content['BC']
IQParamsDict = self.srhsif_content['IQParams']
#loop over all the boundary IDs and modify the inlet flow rate values
for i in range(len(bcIDs)):
bcID = bcIDs[i]
if bcID in bcDict:
if gVerbose: print(" Old inlet flow rate =", IQParamsDict[bcID][0], "for boundary ID = ", bcID)
IQParamsDict[bcID][0] = newInletQValues[i]
if gVerbose: print(" New inlet flow rate =", IQParamsDict[bcID][0], "for boundary ID = ", bcID)
else:
print("The specified boundary ID", bcID, "is not in the inlet flow rate list. Please check.")
[docs]
def modify_ExitH(self, bcIDs, newExitHValues):
"""Modify the exit water surface elevation for the specified boundary IDs
Parameters
----------
bcIDs : list
list of boundary IDs
newExitHValues : list
list of new exit water surface elevation values
Returns
-------
"""
if gVerbose: print("Modifying exit water surface elevation for the specified boundary IDs ...")
if not isinstance(bcIDs[0], int):
raise Exception("Boundary ID has to be an integer. The type of bcID passed in is ",
type(bcIDs[0]))
if not isinstance(newExitHValues[0], float):
raise Exception("Exit water surface elevation has to be a float. The type of newExitHValues passed in is ",
type(newExitHValues[0]))
#get the exit water surface elevation dictionary
bcDict = self.srhsif_content['BC']
EWSParamsC_Dict = self.srhsif_content['EWSParamsC']
#loop over all the boundary IDs and modify the exit water surface elevation values
for i in range(len(bcIDs)):
bcID = bcIDs[i]
if bcID in bcDict:
if gVerbose: print(" Old exit water surface elevation =", EWSParamsC_Dict[bcID][0], "for boundary ID = ", bcID)
EWSParamsC_Dict[bcID][0] = newExitHValues[i]
if gVerbose: print(" New exit water surface elevation =", EWSParamsC_Dict[bcID][0], "for boundary ID = ", bcID)
else:
print("The specified boundary ID", bcID, "is not in the exit water surface elevation list. Please check.")
[docs]
class SRH_2D_SRHHydro:
"""A class to handle srhhydro file for SRH-2D
Attributes
----------
srhhydro_filename : str
name for the srhhydro file
srhhydro_content : dictionary
a dictionary to hold all information in the srhhydro file
Methods
"""
def __init__(self, srhhydro_filename):
""" Constructor for SRH_2D_SRHHydro
The SRH_2D_SRHHydro file contains all information for a SRH-2D run.
Parameters
----------
srhhydro_filename: str
The name of the SRHHydro file
"""
self.srhhydro_filename = srhhydro_filename #srhhydro file name
#dict to hold the content of the srhhydro file
self.srhhydro_content = {}
#parse the srhhydro file and build srhhydro_content
self.parse_srhhydro_file()
[docs]
def parse_srhhydro_file(self):
"""Parse the SRHHydro file"""
res_all = {}
res_ManningsN = {}
res_InitZoneParams = {}
res_BC = {}
res_MONITORING = {}
res_PRESSURE = {}
res_INLET_Q = {}
res_EXIT_H = {}
res_WALL = {}
res_IQParams = {}
res_ISupCrParams = {}
res_EWSParamsC = {}
res_EWSParamsRC = {}
res_EQParams = {}
res_NDParams = {}
res_PressureNodestrings = {}
res_PressureParams = {}
res_PressOvertop = {} # Add dictionary for pressure overtop parameters
res_PressWeirParams = {} # Add dictionary for pressure weir parameters
try:
with open(self.srhhydro_filename, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
parts = line.split()
# Handle different line types
if line.startswith('SRHHYDRO'):
res_all['Version'] = parts[1]
elif line.startswith('Case'):
res_all['Case'] = parts[1].strip('"')
elif line.startswith('Description'):
res_all['Description'] = ' '.join(parts[1:]).strip('"')
elif line.startswith('RunType'):
res_all['RunType'] = parts[1]
elif line.startswith('ModelTemp'):
res_all['ModelTemp'] = parts[1]
elif line.startswith('UnsteadyOutput'):
res_all['UnsteadyOutput'] = parts[1]
elif line.startswith('SimTime'):
res_all['SimTime'] = [float(x) for x in parts[1:]]
elif line.startswith('TurbulenceModel'):
res_all['TurbulenceModel'] = parts[1]
elif line.startswith('ParabolicTurbulence'):
res_all['ParabolicTurbulence'] = float(parts[1])
elif line.startswith('InitCondOption'):
res_all['InitCondOption'] = parts[1]
elif line.startswith('NumInitZones'):
res_all['NumInitZones'] = int(parts[1])
elif line.startswith('InitZoneParams'):
zone_id = int(parts[1])
p1_value = int(parts[2])
p2_value = int(parts[3])
p3_value = float(parts[4])
unit = parts[5]
res_InitZoneParams[zone_id] = {'p1': p1_value, 'p2': p2_value, 'p3': p3_value, 'unit': unit}
elif line.startswith('Grid'):
res_all['Grid'] = parts[1].strip('"')
elif line.startswith('HydroMat'):
res_all['HydroMat'] = parts[1].strip('"')
elif line.startswith('MonitorPtFile'):
res_all['MonitorPtFile'] = parts[1].strip('"')
elif line.startswith('OutputFormat'):
res_all['OutputFormat'] = parts[1]
res_all['OutputUnit'] = parts[2]
elif line.startswith('OutputOption'):
res_all['OutputOption'] = int(parts[1])
elif line.startswith('OutputInterval'):
res_all['OutputInterval'] = float(parts[1])
elif line.startswith('ManningsN'):
zone_id = int(parts[1])
n_value = float(parts[2])
res_ManningsN[zone_id] = n_value
elif line.startswith('BC'):
bc_id = int(parts[1])
bc_type = parts[2]
res_BC[bc_id] = bc_type
# Track monitoring and pressure BCs
if bc_type == 'MONITORING':
res_MONITORING[int(parts[1])] = parts[2]
elif bc_type == 'PRESSURE':
res_PRESSURE[int(parts[1])] = parts[2]
elif bc_type == 'INLET-Q':
res_INLET_Q[int(parts[1])] = parts[2]
elif bc_type == 'EXIT-H':
res_EXIT_H[int(parts[1])] = parts[2]
elif bc_type == 'WALL':
res_WALL[int(parts[1])] = parts[2]
elif line.startswith('IQParams'):
bc_id = int(parts[1])
res_IQParams[bc_id] = {
'discharge': float(parts[2]),
'unit': parts[3],
'type': parts[4]
}
elif line.startswith('EWSParamsC'):
bc_id = int(parts[1])
res_EWSParamsC[bc_id] = {
'wse': float(parts[2]),
'unit': parts[3],
'type': parts[4]
}
elif line.startswith('EWSParamsRC'):
bc_id = int(parts[1])
res_EWSParamsRC[bc_id] = {
'curve_file': parts[2].strip('"'),
'unit': parts[3],
'type': parts[4]
}
elif line.startswith('PressureNodestrings'):
group_id = int(parts[1])
nodestring_ids = [int(x) for x in parts[2:]]
res_PressureNodestrings[group_id] = nodestring_ids
elif line.startswith('PressureParams'):
nodestring_id = int(parts[1])
res_PressureParams[nodestring_id] = {
'type': parts[2],
'var1': float(parts[3]), #not sure the meaning of var1, var2, var3
'var2': float(parts[4]),
'var3': float(parts[5]),
'unit': parts[6]
}
#If the current line is PressureParams, the next line should be PressOvertop
next_line = lines[i + 1].strip()
if next_line.startswith('PressOvertop'):
nodestring_id_overtop = int(parts[1])
if nodestring_id_overtop != nodestring_id:
raise ValueError(f"Mismatched nodestring IDs between PressureParams and PressOvertop: {nodestring_id} != {nodestring_id_overtop}")
overtop_flag = int(parts[2])
res_PressureParams[nodestring_id]['overtop_flag'] = overtop_flag
# If overtop_flag is 1, next line should be PressWeirParams2
if overtop_flag == 1:
next_line = lines[i + 1].strip()
if next_line.startswith('PressWeirParams2'):
weir_parts = next_line.split()
weir_id = int(weir_parts[1])
if weir_id != nodestring_id:
raise ValueError(f"Mismatched nodestring IDs between PressOvertop and PressWeirParams2: {nodestring_id} != {weir_id}")
res_PressWeirParams[nodestring_id] = {
'crest_elevation': float(weir_parts[2]),
'weir_coefficient': float(weir_parts[3]),
'unit': weir_parts[4],
'type': weir_parts[5]
}
else:
raise ValueError(f"PressWeirParams2 not found for nodestring ID {nodestring_id}")
else:
raise ValueError(f"PressOvertop not found for nodestring ID {nodestring_id}")
# Store all results in srhhydro_content
self.srhhydro_content = {
**res_all,
'ManningsN': res_ManningsN,
'InitZoneParams': res_InitZoneParams,
'BC': res_BC,
'MONITORING': res_MONITORING,
'PRESSURE': res_PRESSURE,
'INLET-Q': res_INLET_Q,
'EXIT-H': res_EXIT_H,
'WALL': res_WALL,
'IQParams': res_IQParams,
'ISupCrParams': res_ISupCrParams,
'EWSParamsC': res_EWSParamsC,
'EWSParamsRC': res_EWSParamsRC,
'EQParams': res_EQParams,
'NDParams': res_NDParams,
'PressOvertop': res_PressOvertop,
'PressWeirParams': res_PressWeirParams,
'PressureNodestrings': res_PressureNodestrings,
'PressureParams': res_PressureParams
}
except Exception as e:
print(f"Error parsing SRHHYDRO file: {str(e)}")
raise
[docs]
def get_ManningN_dict(self):
"""
Get the dictionary for Manning's n
Returns
-------
"""
return self.srhhydro_content["ManningsN"]
[docs]
def modify_ManningsN(self, materialIDs, newManningsNValues, ManningN_MaterialNames):
"""Modify materialID's Manning's n value to new value
Parameters
----------
materialIDs : list
material ID list
newManningsNValues : list
new Manning's n value list
ManningN_MaterialNames : list
name of materials list
Returns
-------
"""
if gVerbose: print("Modify Manning's n value ...")
if not isinstance(materialIDs[0], int):
raise Exception("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]))
if not isinstance(newManningsNValues[0], float):
raise Exception("Manning's n has to be a float. The type of newManningsNValue passed in is ", type(newManningsNValues[0]))
nDict = self.srhhydro_content["ManningsN"]
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in nDict:
if gVerbose: print(" Old Manning's n value =", nDict[materialID], "for material ID = ", materialID)
nDict[materialID] = newManningsNValues[i]
if gVerbose: print(" New Manning's n value =", nDict[materialID], "for material ID = ", materialID)
else:
print("The specified materialID", materialID, "is not in the Manning's n list. Please check.")
[docs]
def get_BC(self):
"""
Get boundary condition dictionary.
Returns
-------
"""
BC_Dict = self.srhhydro_content["BC"]
return BC_Dict
[docs]
def get_InletQ(self):
"""
Get InletQ dictionary:
Returns
-------
"""
if 'IQParams' not in self.srhhydro_content:
IQParams_Dict = {} #empty
else:
IQParams_Dict = self.srhhydro_content['IQParams']
return IQParams_Dict
[docs]
def modify_InletQ(self, bcIDs, newInletQValues):
"""Modify bcID's InletQ value to new value.
Currently only constant inlet flow is supported.
Parameters
----------
bcIDs : list
bc ID list
newInletQValues : list
new InletQ value list
Returns
-------
"""
if gVerbose: print("Modify InletQ value ...")
if not isinstance(bcIDs[0], int):
raise Exception("Boundary condition ID has to be an integer. The type of bcID passed in is ",
type(bcIDs[0]))
if not isinstance(newInletQValues[0], float):
raise Exception("IneltQ value has to be a float. The type of newInletQValues passed in is ",
type(newInletQValues[0]))
BC_Dict = self.srhhydro_content["BC"]
if 'IQParams' not in self.srhhydro_content:
raise Exception("There is no INLET-Q boundary in the SRHHydro file.")
IQParams_Dict = self.srhhydro_content['IQParams'] #key in IQParams is the boundary ID (1-based)
for i in range(len(bcIDs)):
bcID = bcIDs[i]
if bcID in BC_Dict:
# Make sure this bcID's corresponding BC type is InletQ
if BC_Dict[bcID] != "INLET-Q":
raise Exception("The boundary condition for the specified bcID ", bcID, "is not INLET-Q.")
# Also make sure bcID is in the "IQParams" dictionary
if bcID not in IQParams_Dict:
raise Exception("The specified bcID ", bcID, "is not in the IQParams dictionary.")
#print("bcID = ", bcID)
#print("IQParams_Dict[bcID] = ", IQParams_Dict[bcID])
#print("newInletQValues[i] = ", newInletQValues[i])
if gVerbose: print(" Old InletQ value =", IQParams_Dict[bcID]['discharge'], "for boundary ID = ", bcID)
IQParams_Dict[bcID]['discharge'] = newInletQValues[i]
if gVerbose: print(" New InletQ value =", IQParams_Dict[bcID]['discharge'], "for boundary ID = ", bcID)
else:
print("The specified bcID", bcID, "is not in the boundary list. Please check.")
[docs]
def get_ExitH(self):
"""
Get ExitH dictionary:
Returns
-------
"""
if 'EWSParamsC' not in self.srhhydro_content:
EWSParamsC_Dict = {} #empty
else:
EWSParamsC_Dict = self.srhhydro_content['EWSParamsC']
return EWSParamsC_Dict
[docs]
def modify_ExitH(self, bcIDs, newExitHValues):
"""Modify bcID's ExitH value to new value.
Currently only constant WSE is supported.
Parameters
----------
bcIDs : list
bc ID list
newExitHValues : list
new ExitH value list
Returns
-------
"""
if gVerbose: print("Modify Exit-H value ...")
if not isinstance(bcIDs[0], int):
raise Exception("Boundary condition ID has to be an integer. The type of bcID passed in is ",
type(bcIDs[0]))
if not isinstance(newExitHValues[0], float):
raise Exception("Exit-H value has to be a float. The type of newExitHValues passed in is ",
type(newExitHValues[0]))
BC_Dict = self.srhhydro_content["BC"]
if 'EWSParamsC' not in self.srhhydro_content:
raise Exception("There is no constant stage exit boundary in the SRHHydro file.")
EWSParamsC_Dict = self.srhhydro_content['EWSParamsC']
for i in range(len(bcIDs)):
bcID = bcIDs[i]
if bcID in BC_Dict:
# Make sure this bcID's corresponding BC type is InletQ
if BC_Dict[bcID] != "EXIT-H":
raise Exception("The boundary condition for the specified bcID ", bcID, "is not EXIT-H.")
# Also make sure bcID is in the "EWSParamsC" dictionary
if bcID not in EWSParamsC_Dict:
raise Exception("The specified bcID ", bcID, "is not in the EWSParamsC dictionary.")
print("bcID = ", bcID)
print("EWSParamsC_Dict[bcID] = ", EWSParamsC_Dict[bcID])
print("newExitHValues[i] = ", newExitHValues[i])
if gVerbose: print(" Old EXIT-H value =", EWSParamsC_Dict[bcID]['discharge'], "for boundary ID = ", bcID)
EWSParamsC_Dict[bcID]['discharge'] = newExitHValues[i]
if gVerbose: print(" New EXIT-H value =", EWSParamsC_Dict[bcID][0], "for boundary ID = ", bcID)
else:
print("The specified bcID", bcID, "is not in the boundary list. Please check.")
[docs]
def modify_Case_Name(self, newCaseName):
"""Modify grid file name
Parameters
----------
newCaseName : str
new case name
Returns
-------
"""
if gVerbose: print("Modify case name ...")
self.srhhydro_content['Case'] = newCaseName
def get_Case_Name(self):
return self.srhhydro_content['Case']
[docs]
def modify_Grid_FileName(self, newGridFileName):
"""Modify grid file name
Parameters
----------
newGridFileName : str
new grid file name
Returns
-------
"""
if gVerbose: print("Modify grid file name ...")
self.srhhydro_content['Grid'] = newGridFileName
def get_Grid_FileName(self):
return self.srhhydro_content['Grid']
[docs]
def modify_HydroMat_FileName(self, newHydroMatFileName):
"""Modify grid file name
Parameters
----------
newHydroMatFileName : str
new HydroMat file name
Returns
-------
"""
if gVerbose: print("Modify HydroMat file name ...")
self.srhhydro_content['HydroMat'] = newHydroMatFileName
def get_HydroMat_FileName(self):
return self.srhhydro_content['HydroMat']
[docs]
def save_as(self, new_srhhydro_file_name=None):
"""Save as a new SRHHydro file (useful for modification of the SRHHydro file)
Parameters
-------
new_srhhydro_file_name : str
name of the new srhhydro file. If not provided, the original filename will be used.
"""
if new_srhhydro_file_name is None:
new_srhhydro_file_name = self.srhhydro_filename
if gVerbose: print("Writing the SRHHYDRO file %s \n" % new_srhhydro_file_name)
try:
fid = open(new_srhhydro_file_name, 'w')
except IOError:
print('srhhydro file open error')
sys.exit()
# Write header and basic parameters in order
fid.write(f"SRHHYDRO {self.srhhydro_content.get('Version', '')}\n")
# Write quoted string parameters
for param in ['Case', 'Description']:
if param in self.srhhydro_content:
fid.write(f'{param} "{self.srhhydro_content[param]}"\n')
else:
raise ValueError(f"Parameter {param} not found in the SRHHydro content.")
# Write basic simulation parameters
for param in ['RunType', 'ModelTemp', 'UnsteadyOutput']:
if param in self.srhhydro_content:
fid.write(f'{param} {self.srhhydro_content[param]}\n')
else:
raise ValueError(f"Parameter {param} not found in the SRHHydro content.")
# Write SimTime
if 'SimTime' in self.srhhydro_content:
value = self.srhhydro_content['SimTime']
fid.write(f'SimTime {value[0]} {value[1]} {value[2]}\n')
else:
raise ValueError("Parameter SimTime not found in the SRHHydro content.")
# Write turbulence and initial condition parameters
for param in ['TurbulenceModel', 'ParabolicTurbulence', 'InitCondOption']:
if param in self.srhhydro_content:
fid.write(f'{param} {self.srhhydro_content[param]}\n')
#if the initial condition option is WSE, need to write the InitZoneParams
if param == 'InitCondOption' and self.srhhydro_content['InitCondOption'] == 'WSE':
fid.write(f'NumInitZones {self.srhhydro_content["NumInitZones"]}\n')
if 'InitZoneParams' in self.srhhydro_content:
for zone_id, params in sorted(self.srhhydro_content['InitZoneParams'].items()):
fid.write(f'InitZoneParams {zone_id} {params["p1"]} {params["p2"]} {params["p3"]} {params["unit"]}\n')
else:
raise ValueError("Parameter InitZoneParams not found in the SRHHydro content.")
else:
raise ValueError(f"Parameter {param} not found in the SRHHydro content.")
# Write grid and material files
for param in ['Grid', 'HydroMat']:
if param in self.srhhydro_content:
fid.write(f'{param} "{self.srhhydro_content[param]}"\n')
else:
raise ValueError(f"Parameter {param} not found in the SRHHydro content.")
# Write monitor point file (monitoring points are optional)
for param in ['MonitorPtFile']:
if param in self.srhhydro_content:
fid.write(f'{param} "{self.srhhydro_content[param]}"\n')
else:
print(f"Parameter {param} not found in the SRHHydro content.")
# Write output parameters
if 'OutputFormat' in self.srhhydro_content:
fid.write(f'OutputFormat {self.srhhydro_content["OutputFormat"]} {self.srhhydro_content.get("OutputUnit", "")}\n')
else:
raise ValueError("Parameter OutputFormat not found in the SRHHydro content.")
for param in ['OutputOption', 'OutputInterval']:
if param in self.srhhydro_content:
fid.write(f'{param} {self.srhhydro_content[param]}\n')
else:
raise ValueError(f"Parameter {param} not found in the SRHHydro content.")
# Write Manning's n values
if 'ManningsN' in self.srhhydro_content:
for material_id, value in sorted(self.srhhydro_content['ManningsN'].items()):
fid.write(f'ManningsN {material_id} {value}\n')
else:
raise ValueError("Parameter ManningsN not found in the SRHHydro content.")
# Write boundary conditions
if 'BC' in self.srhhydro_content:
for bc_id, bc_type in sorted(self.srhhydro_content['BC'].items()):
fid.write(f'BC {bc_id} {bc_type}\n')
else:
raise ValueError("Parameter BC not found in the SRHHydro content.")
# Write inlet and exit parameters
if 'EWSParamsC' in self.srhhydro_content:
#print("EWSParamsC = ", self.srhhydro_content['EWSParamsC'])
for bc_id, params in sorted(self.srhhydro_content['EWSParamsC'].items()):
fid.write(f'EWSParamsC {bc_id} {params["wse"]} {params["unit"]} {params["type"]}\n')
if 'EWSParamsRC' in self.srhhydro_content:
#print("EWSParamsRC = ", self.srhhydro_content['EWSParamsRC'])
for bc_id, params in sorted(self.srhhydro_content['EWSParamsRC'].items()):
print("EWSParamsRC = ", bc_id, params)
#fid.write(f'EWSParamsRC {bc_id} "{params[0]}" {params[1]} {params[2]}\n')
fid.write(f'EWSParamsRC {bc_id} "{params["curve_file"]}" {params["unit"]} {params["type"]}\n')
if 'IQParams' in self.srhhydro_content:
#print("IQParams = ", self.srhhydro_content['IQParams'])
for bc_id, params in sorted(self.srhhydro_content['IQParams'].items()):
fid.write(f'IQParams {bc_id} {params["discharge"]} {params["unit"]} {params["type"]}\n')
# Write pressure nodestrings
if 'PressureNodestrings' in self.srhhydro_content:
for group_id, nodestrings in sorted(self.srhhydro_content['PressureNodestrings'].items()):
fid.write(f'PressureNodestrings {group_id} {" ".join(map(str, nodestrings))}\n')
# Write pressure parameters and overtop settings
if 'PressureParams' in self.srhhydro_content:
for group_id, params in sorted(self.srhhydro_content['PressureParams'].items()):
fid.write(f'PressureParams {group_id} {" ".join(map(str, params))}\n')
# Write PressOvertop immediately after its corresponding PressureParams
if 'PressOvertop' in self.srhhydro_content and group_id in self.srhhydro_content['PressOvertop']:
overtop_value = self.srhhydro_content['PressOvertop'][group_id]
fid.write(f'PressOvertop {group_id} {overtop_value}\n')
# If overtop is 1, write corresponding weir parameters
if overtop_value == 1 and 'PressWeirParams' in self.srhhydro_content and group_id in self.srhhydro_content['PressWeirParams']:
weir_params = self.srhhydro_content['PressWeirParams'][group_id]
fid.write(f'PressWeirParams2 {group_id} {" ".join(map(str, weir_params))}\n')
fid.close()
[docs]
def get_simulation_start_end_time(self):
"""Get the start and end time of simulation (in hours, from the "SimTime" entry)
Returns
-------
startTime : float
start time in hours
endTime : float
end time in hours
"""
value = self.srhhydro_content["SimTime"]
return value[0],value[2]
[docs]
def get_simulation_time_step_size(self):
"""Get the time step size of simulation (in seconds, from the "SimTime" entry)
Returns
-------
deltaT : float
time step size in seconds
"""
value = self.srhhydro_content["SimTime"]
return value[1]
[docs]
def get_grid_file_name(self):
"""Get grid file name (should be like "xxx.srhgeom")
Returns
-------
value : str
Name of the grid file
"""
value = self.srhhydro_content["Grid"]
return value
[docs]
def get_mat_file_name(self):
"""Get material file name (should be like "xxx.srhmat")
Returns
-------
value : str
Name of the material file
"""
value = self.srhhydro_content["HydroMat"]
return value
# Add getter methods
[docs]
def get_pressure_overtop(self, nodestring_id=None):
"""Get pressure overtop parameters
Args:
nodestring_id: Optional nodestring ID. If None, returns all parameters
Returns:
dict: Pressure overtop parameters
"""
if nodestring_id is None:
return self.srhhydro_content.get('PressOvertop', {})
return self.srhhydro_content.get('PressOvertop', {}).get(nodestring_id)
[docs]
def get_pressure_weir_params(self, nodestring_id=None):
"""Get pressure weir parameters
Args:
nodestring_id: Optional nodestring ID. If None, returns all parameters
Returns:
dict: Pressure weir parameters
"""
if nodestring_id is None:
return self.srhhydro_content.get('PressWeirParams', {})
return self.srhhydro_content.get('PressWeirParams', {}).get(nodestring_id)
[docs]
class SRH_2D_SRHGeom:
"""A class to handle srhgeom file for SRH-2D
Notes:
1. In SMS (SRH-2D), monitoring lines (ML) and monitoring points (MP) are treated separately and differently.
ML is created in the same way as boundary lines. Nodes closest to the line are recorded as a NodeString
in SRHGEOM file. MP is created as a feature point and its coordinates are recorded in the "srhmpoint" file.
Interpolation is done to probe the results at MPs.
This potentially have some drawbacks:
a. The MLs and MPs have to be defined before the simulation is run, not afterwards. One can use plot over a
line or point, but it is not convenient and it is limited. For example, to calculate flow over a line,
one has to cut a line and then integrate.
b. The way MLs are defined if slightly limited by the mesh (node locations). To be exactly at a ML, one has
to interpolate the simulated solution to the points on MLs.
2. Because BC and ML are mixed together, we need to separate them. However, this can only be done with the BC
information in the srhhydro file. That is why we need to pass in bcDict in the constructor.
Attributes
----------
Methods
----------
"""
def __init__(self, srhgeom_filename, bcDict):
"""Constructor for SRH_2D_SRHGeom
Parameters
----------
srhgeom_filename : str
Name of the SRHGeom file
bcDict : dict
Boundary condtion dictionary
"""
self.srhgeom_filename = srhgeom_filename
self.Name = ''
self.GridUnit = ''
#boundary condition dictionary. In srhgeom file, not all nodeStrings are boundary conditions. Some of them
#could be monitoring lines. This bcDict is passed in to distinguish the two (srhgeom does not have this
#information; only srhhydro has). An SRH_2D_SRHHydro object has to be previously created, and then
#bcDict <- srhhydro_obj.srhhydro_content["BC"].
self.bcDict = bcDict
#mesh information:
# number of elements
self.numOfElements = -1
# number of nodes
self.numOfNodes = -1
# number of NodeString
self.numOfNodeStrings = -1
# First read of the SRHGEOM file to get number of elements and nodes
self.getNumOfElementsNodes()
# list of nodes for all elements
self.elementNodesList = np.zeros([self.numOfElements, gMax_Nodes_per_Element], dtype=int)
# number of nodes for each element (3,4,...,gMax_Nodes_per_Element)
self.elementNodesCount = np.zeros(self.numOfElements, dtype=int)
# list of edges for all elements (For each element, the number of nodes and the number of edges are the same.
# Thus, there is no need for elementEdgesCount.
self.elementEdgesList = np.zeros([self.numOfElements, gMax_Nodes_per_Element], dtype=int)
# each element's vtk cell type
self.vtkCellTypeCode = np.zeros(self.numOfElements, dtype=int)
# each node's 3D coordinates
self.nodeCoordinates = np.zeros([self.numOfNodes, 3], dtype=np.float64)
# twoD mesh's boundingBox: [xmin, ymin, zmin, xmax, ymax, zmax]
self.twoDMeshBoundingbox = []
# each element (cell)'s bed elevation (z)
self.elementBedElevation = np.zeros(self.numOfElements, dtype=np.float64)
# center of each element
self.elementCenters = np.zeros([self.numOfElements, 3], dtype=np.float64)
# bed slope of each element at cell center
self.elementBedSlope = np.zeros([self.numOfElements, 2], dtype=np.float64)
# bed slope of each node
self.nodeBedSlope = np.zeros([self.numOfNodes, 2], dtype=np.float64)
# each NodeString's list of nodes (stored in a dictionary)
self.nodeStringsDict = {}
# get the mesh information (elementNodesList, elementNodesCount, vtkCellTypeCode, and nodeCoordinates)
# from reading the SRHGEOM file again
self.readSRHGEOMFile()
# list of elements for all nodes
self.nodeElementsList = np.zeros([self.numOfNodes, gMax_Elements_per_Node], dtype=int)
# number of elements for each node (1,2,...,gMax_Elements_per_Node)
self.nodeElementsCount = np.zeros(self.numOfNodes, dtype=int)
# build the element list for each node
self.buildNodeElements()
#edges (connecting two nodes). Not using "faces" because they are lines. When 2D mesh is extruded to 3D,
#these lines will become faces.
self.edges = {} #dictionary: {[node1, node2]: edgeID}
self.edges_r = {} #dictionary: {edgeID: [node1, node2]} #revise of self.edges
self.edgeElements = {} #dictionary: {edgeID: [element list]}. Here since one edge can be shared by two
#elements at the most, the element list's length is either 1 or 2
#edges of each boundary. Only contains real boundaries, not monitoring lines.
#boundaryID = nodeString ID read in from srhgeom file
self.boundaryEdges = {} #dictionary: {boundaryID: [list of edge IDs]}
self.boundaryEdges_normal = {} #dictionary: {boundaryID: [list of edge normal vectors]}. The normal vector is pointing outwards.
#list of all boundary edge IDs (all lumped to one list)
self.allBoundaryEdgeIDs = []
#build "lines" and "boundaryLines" dictionaries. SRH-2D has no such information in SRHGEOM. We need to build it.
self.buildEdgesAndBoundaryEdges()
#calculate the bed slope of each element and then interpolate the bed slope to each node
self.calculate_bed_slope()
[docs]
def getNumOfElementsNodes(self):
""" Get the number of elements and nodes in srhgeom mesh file
Returns
"""
if gVerbose: print("Getting numbers of elements and nodes from the SRHGEOM file ...")
# read the "srhgeom" mesh file
try:
#print("Opening the SRHGEOM file %s" % self.srhgeom_filename)
srhgeomfile = open(self.srhgeom_filename, 'r')
except:
raise Exception('Failed openning the SRHGEOM file %s' % self.srhgeom_filename)
count = 0
elemCount = 0
nodeCount = 0
nodeStringCount = 0
while True:
count += 1
# Get next line from file
line = srhgeomfile.readline()
# if line is empty
# end of file is reached
if not line:
break
# print("Line{}: {}".format(count, line.strip()))
search = line.split()
# print(search)
if len(search) != 0:
if search[0] == "Elem":
elemCount += 1
# print("Elem # %d: %s" % (elemCount, line))
elif search[0] == "Node":
nodeCount += 1
# print("Node # %d: %s" % (nodeCount, line))
elif search[0] == "NodeString":
nodeStringCount += 1
srhgeomfile.close()
self.numOfElements = elemCount
self.numOfNodes = nodeCount
self.numOfNodeStrings = nodeStringCount
if gVerbose: print("There are %d elements, %d nodes, and %d node strings in the mesh." % (self.numOfElements,
self.numOfNodes, self.numOfNodeStrings))
[docs]
def readSRHGEOMFile(self):
""" Get mesh information by reading the srhgeom file
Parameters
----------
Returns
-------
"""
if gVerbose: print("Reading the SRHGEOM file ...")
# read the "srhgeom" mesh file
try:
srhgeomfile = open(self.srhgeom_filename, 'r')
except:
raise Exception('Failed openning srhgeom file %s' % self.srhgeom_filename)
count = 0
elemCount = 0
nodeCount = 0
nodeStringCount = 0
# nodeString could be long enough to have more than one line. Use this to record the current nodeString during reading.
currentNodeStringID = -1
while True:
count += 1
# Get next line from file
line = srhgeomfile.readline()
# if line is empty
# end of file is reached
if not line:
break
# print("Line{}: {}".format(count, line.strip()))
search = line.split()
# print(search)
if len(search) != 0:
if search[0] == "Elem":
elemCount += 1
# print("Elem # %d: %s" % (elemCount, line))
self.elementNodesList[elemCount - 1][0:len(search[2:])] = [int(i) for i in search[2:]]
self.elementNodesCount[elemCount - 1] = len(search[2:])
if len(search[2:]) < 1 or len(search[2:]) > gMax_Nodes_per_Element:
sys.exit("Number of nodes for element %d is less than 1 or larger than the max of %d." % (elemCount,gMax_Nodes_per_Element))
self.vtkCellTypeCode[elemCount - 1] = vtkCellTypeMap[len(search[2:])]
#print("elem ID = ", elemCount-1)
#print("elementNodesList = ", self.elementNodesList[elemCount - 1])
elif search[0] == "Node":
nodeCount += 1
# print("Node # %d: %s" % (nodeCount, line))
self.nodeCoordinates[nodeCount - 1] = [float(i) for i in search[2:]]
elif search[0] == "NodeString":
nodeStringCount += 1
self.nodeStringsDict[int(search[1])] = [int(i) for i in search[2:]]
currentNodeStringID = int(search[1])
elif search[0].lower() == "Name".lower():
self.Name = search[1]
elif search[0].lower() == "GridUnit".lower():
self.GridUnit = search[1]
elif ("SRHGEOM".lower() not in search[0].lower()) : #assume this is still nodeString
node_list = self.nodeStringsDict[currentNodeStringID]
for i in search:
node_list.append(int(i))
self.nodeStringsDict[currentNodeStringID] = node_list
srhgeomfile.close()
#calculate cell center and the bed elevation at cell center
for cellI in range(self.numOfElements):
cell_center = np.array([0.0, 0.0, 0.0])
elev_temp = 0.0
#loop over all nodes of current element
for nodeI in range(self.elementNodesCount[cellI]):
cell_center += self.nodeCoordinates[self.elementNodesList[cellI][nodeI]-1]
elev_temp += self.nodeCoordinates[self.elementNodesList[cellI][nodeI]-1,2] #z elevation; nodeI-1 because node number is 1-based for SRH-2D
if self.elementNodesCount[cellI] > 0:
self.elementBedElevation[cellI] = elev_temp / self.elementNodesCount[cellI]
self.elementCenters[cellI] = cell_center / self.elementNodesCount[cellI]
else:
print("element ", cellI, " has no nodes. Exiting...")
sys.exit()
#calculate the 2D mesh's bounding box
xmin = np.min(self.nodeCoordinates[:,0])
ymin = np.min(self.nodeCoordinates[:,1])
zmin = np.min(self.nodeCoordinates[:,2])
xmax = np.max(self.nodeCoordinates[:,0])
ymax = np.max(self.nodeCoordinates[:,1])
zmax = np.max(self.nodeCoordinates[:,2])
self.twoDMeshBoundingbox = [xmin, ymin, zmin, xmax, ymax, zmax]
if gVerbose: print("2D mesh's bounding box = ", self.twoDMeshBoundingbox)
if False:
print("elementNodesList = ", self.elementNodesList)
print("elementNodesCount = ", self.elementNodesCount)
print("vtkCellTypeCode = ", self.vtkCellTypeCode)
print("nodeCoordinates = ", self.nodeCoordinates)
print("elementBedElevation = ", self.elementBedElevation)
print("nodeStrings = ", self.nodeStringsDict)
[docs]
def calculate_bed_slope(self):
r"""
Calculate the bed slope of each element and then interpolate the bed slope to each node. The slope of each element (e.g., triangle and quadrilateral) is calculated
by using the Gauss theorem. For each element, the bed slope is calculated by:
1. compute the outward normal vector of each edge of the element.
2. interpolate the bed elevation at the center of each edge of the element from the the two points of the edge.
3. use the Gauss theorem to calculate the bed slope of the element: S = -(1/A) * \int_{\partial A} z * n * ds, where A is the area of the element,
z is the bed elevation, n is the outward normal vector, and ds is the length of the edge.
4. interpolate the bed slope of the element to each node of the element.
Returns
-------
"""
import numpy as np
import sys
# Initialize array to store bed slopes [Sx, Sy] for each element
num_elements = self.numOfElements
element_bed_slopes = np.zeros((num_elements, 2))
#print("self.elementNodesCount = ", self.elementNodesCount)
for i in range(num_elements):
# Get nodes for this element
element_nodes = self.elementNodesList[i]
num_nodes = self.elementNodesCount[i]
# Get coordinates of element nodes
node_coords = self.nodeCoordinates[element_nodes-1] #-1 because node number is 1-based for SRH-2D
# Calculate element area and determine node ordering
if num_nodes == 3: # Triangle
# Area of triangle using cross product
v1 = node_coords[1] - node_coords[0]
v2 = node_coords[2] - node_coords[0]
area = 0.5 * np.cross(v1[:2], v2[:2]) # Remove abs() to preserve sign
elif num_nodes == 4: # Quadrilateral
# Area of quadrilateral using shoelace formula
x = node_coords[:num_nodes, 0]
y = node_coords[:num_nodes, 1]
area = 0.5 * np.sum(x * np.roll(y, -1) - np.roll(x, -1) * y) # Remove abs() to preserve sign
# Check for degenerate quads
#if np.any(np.diff(x) == 0) and np.any(np.diff(y) == 0):
# print(f"Warning: Element {i} may be degenerate")
else: # General polygon (pentagon, hexagon, etc.)
# Area of general polygon using shoelace formula
x = node_coords[:num_nodes, 0]
y = node_coords[:num_nodes, 1]
area = 0.5 * np.sum(x * np.roll(y, -1) - np.roll(x, -1) * y) # Remove abs() to preserve sign
#else:
# print("Element %d is not a triangle or quadrilateral. Other shapes are not supported yet. Exiting..." % i)
# sys.exit()
# Take absolute value of area for size, but keep sign for orientation
area_abs = np.abs(area)
is_clockwise = area < 0
# Check for degenerate elements (zero or very small area)
# if area_abs < 1e-12: # Very small threshold for numerical precision
# print(f"ERROR: Element {i} has zero area ({area})")
# print(f"Element nodes: {element_nodes}")
# print(f"Node coordinates:")
# for j, coord in enumerate(node_coords):
# print(f" Node {j}: {coord}")
# print(f"Number of nodes: {num_nodes}")
# # For now, assign a very small area to avoid division by zero
# # This is a temporary fix - the real issue should be investigated
# area_abs = 1e-12
# area = 1e-12 if area >= 0 else -1e-12
# print(f"Assigned temporary area: {area}")
#debug
#print("element id = ", i)
#print("element nodes = ", element_nodes)
#print("num_nodes = ", num_nodes)
#print("area = ", area)
#print("is_clockwise = ", is_clockwise)
# Initialize slope components
Sx = 0.0
Sy = 0.0
# Process each edge of the element (number of edges = number of nodes)
for j in range(num_nodes):
# Get nodes of current edge
node1 = element_nodes[j]
node2 = element_nodes[(j + 1) % num_nodes]
# Get coordinates of edge nodes
p1 = self.nodeCoordinates[node1-1] #-1 because node number is 1-based for SRH-2D
p2 = self.nodeCoordinates[node2-1] #-1 because node number is 1-based for SRH-2D
# Calculate edge length
edge_length = np.sqrt(np.sum((p2[:2] - p1[:2])**2))
# Calculate outward normal vector (normalized)
dx = p2[0] - p1[0]
dy = p2[1] - p1[1]
# For counterclockwise elements, normal = [dy, -dx]
# For clockwise elements, normal = [-dy, dx]
if is_clockwise:
normal = np.array([-dy, dx]) / edge_length
else:
normal = np.array([dy, -dx]) / edge_length
#debug
#print("edge id = ", j)
#print("normal = ", normal)
#print("edge center = ", (p1 + p2) / 2)
# Interpolate bed elevation at edge center
z_center = 0.5 * (p1[2] + p2[2])
# Add contribution to slope components using Gauss theorem
Sx += z_center * normal[0] * edge_length
Sy += z_center * normal[1] * edge_length
# Final slope calculation (note the negative sign from the Gauss theorem)
element_bed_slopes[i] = [-Sx / area_abs, -Sy / area_abs]
self.elementBedSlope = element_bed_slopes
# Interpolate bed slope to each node in the mesh
for i in range(self.numOfNodes):
# Get elements for this node
node_elements = self.nodeElementsList[i]
num_elements = self.nodeElementsCount[i]
# Get slopes of the elements that share this node
node_slopes = element_bed_slopes[node_elements]
# Interpolate the slope components separately
node_slope_x = np.sum(node_slopes[:, 0]) / num_elements
node_slope_y = np.sum(node_slopes[:, 1]) / num_elements
# Store the slope vector
self.nodeBedSlope[i] = [node_slope_x, node_slope_y]
[docs]
def save_as(self, newSrhgeomFileName, dir=''):
""" Save as a srhgeom file.
It can be used to save to a new srhgeom file after things have been modified, e.g., bathymetry
Parameters
----------
Returns
-------
"""
if gVerbose: print("Wring the SRHGEOM file %s \n" % newSrhgeomFileName)
if len(dir) != 0:
newSrhgeomFileName = dir + "/" + newSrhgeomFileName
try:
fid = open(newSrhgeomFileName, 'w')
except IOError:
print('srhgeom file open error')
sys.exit()
fid.write('SRHGEOM 30\n')
fid.write('Name \"Saved from pyHMT2D \"\n')
fid.write('\n')
fid.write('GridUnit %s \n' % self.GridUnit)
#write elements
for elemI in range(1, self.numOfElements + 1):
fid.write('Elem %d' % elemI)
for nodeI in range(1, self.elementNodesCount[elemI-1] + 1):
fid.write(' %d' % self.elementNodesList[elemI-1, nodeI-1])
fid.write('\n')
#write nodes
for nodeI in range(1, self.numOfNodes + 1):
fid.write('Node %d %f %f %f\n' % (nodeI,
self.nodeCoordinates[nodeI-1, 0],
self.nodeCoordinates[nodeI-1, 1],
self.nodeCoordinates[nodeI-1, 2]))
#write nodeStrings
for nodeStringID, nodeStringNodeList in self.nodeStringsDict.items():
fid.write('NodeString %d' % nodeStringID)
# line break counter (start a new line every 10 nodes)
line_break_counter = 0
for nodeI in nodeStringNodeList:
fid.write(' %d' % nodeI)
line_break_counter += 1
# 10 numbers per line
if ((line_break_counter % 10) == 0):
fid.write("\n")
line_break_counter = 0
fid.write('\n')
fid.close()
[docs]
def buildNodeElements(self):
""" Build node's element list for all nodes
Returns
-------
"""
if gVerbose: print("Building mesh's node, elements, and topology ...")
#create an empty list of lists for all nodes
self.nodeElementsList = [[] for _ in range(self.numOfNodes)]
#loop over all cells
for cellI in range(self.numOfElements):
#loop over all nodes of current cell
for i in range(self.elementNodesCount[cellI]):
#get the node ID of current node
nodeID = self.elementNodesList[cellI][i]
self.nodeElementsList[nodeID-1].append(cellI) #nodeID-1 because SRH-2D is 1-based
#count beans in each node's element list
for nodeI in range(self.numOfNodes):
self.nodeElementsCount[nodeI] = len(self.nodeElementsList[nodeI])
#print("nodeElementsCount = ", self.nodeElementsCount)
#print("nodeElementsList = ", self.nodeElementsList)
[docs]
def buildEdgesAndBoundaryEdges(self):
""" Build edges and boundaryEdges dictionaries
Returns
-------
"""
current_edgeID = 1 #note: SRH-2D is 1-based.
#loop over all elements
for cellI in range(self.numOfElements):
# loop over all edges of current element
for i in range(self.elementNodesCount[cellI]):
# get the node ID of current and next nodes
# connecting current edge
nodeID_1 = 0
nodeID_2 = 0
if i != (self.elementNodesCount[cellI]-1): #if not the last edge
nodeID_1 = self.elementNodesList[cellI][i]
nodeID_2 = self.elementNodesList[cellI][i+1]
else: #if the last edge
nodeID_1 = self.elementNodesList[cellI][i]
nodeID_2 = self.elementNodesList[cellI][0]
curr_edge_node_IDs = tuple(sorted([nodeID_1, nodeID_2]))
#check whether the current edge is in the edges dictionary or not
if curr_edge_node_IDs not in self.edges: #if no, added a new edge
self.edges[curr_edge_node_IDs] = current_edgeID
self.edges_r[current_edgeID] = curr_edge_node_IDs
#add the new edge to edgeElements dictionary too
self.edgeElements[current_edgeID] = [cellI+1] #+1 because SRH-2D is 1-based
current_edgeID += 1
else: # if yes, neighbor element already has that edge and no need to do anything, but need to add
# the current element to edgeElements list
#get the existing edge's ID
existingEdgeID = self.edges[curr_edge_node_IDs]
#in this case, there should be one and only one element in the list
assert len(self.edgeElements[existingEdgeID]) == 1
self.edgeElements[existingEdgeID].append(cellI+1) #+1 because SRH-2D is 1-based
#build the allBoundaryEdgeIDs list: boundary edges are those with only one element
#loop over all edges in the mesh
for edge in self.edgeElements:
if len(self.edgeElements[edge]) == 1:
self.allBoundaryEdgeIDs.append(edge)
#list to flag whether a boundary edge in self.allBoundaryEdgeIDs is used or not
#if not, it is a default boundary (wall)
allBoundaryEdgeUsageFlag = {}
for boundaryEdgeID in self.allBoundaryEdgeIDs:
allBoundaryEdgeUsageFlag[boundaryEdgeID] = False
#now all edges are built, we check the boundary edges
#loop through all boundaries.
for nodeString in self.nodeStringsDict:
#not all NodeStrings are boundaries. Could be monitoring lines. Need to exclude them.
if nodeString not in self.bcDict.keys():
continue
#also exclude internal BC nodeStrings such as weir and pressure
if 'WEIR' in self.bcDict[nodeString] or 'PRESSURE' in self.bcDict[nodeString]:
continue
#list of nodes in current nodeString
nodeString_nodeList = self.nodeStringsDict[nodeString]
current_boundary_edge_list = []
#loop through all edges in current boundary
for i in range(len(nodeString_nodeList)-1):
nodeID_1 = nodeString_nodeList[i]
nodeID_2 = nodeString_nodeList[i + 1]
#curr_edge_node_IDs = tuple(sorted([nodeID_1, nodeID_2]))
curr_edge_node_IDs = tuple([nodeID_1, nodeID_2])
curr_edge_node_IDs_reverse = tuple([nodeID_2, nodeID_1])
#check whether the edge is in the edge list. If not, we got a problem.
if curr_edge_node_IDs in self.edges:
current_boundary_edge_list.append(self.edges[curr_edge_node_IDs])
allBoundaryEdgeUsageFlag[self.edges[curr_edge_node_IDs]] = True
elif curr_edge_node_IDs_reverse in self.edges:
current_boundary_edge_list.append(-self.edges[curr_edge_node_IDs_reverse])
allBoundaryEdgeUsageFlag[self.edges[curr_edge_node_IDs_reverse]] = True
else:
print("Boundary edge ", curr_edge_node_IDs, "in NodeString", nodeString, "can not be found in edge list. Mesh is wrong. Exiting...")
sys.exit()
self.boundaryEdges[nodeString] = current_boundary_edge_list
#build default boundary as wall (in SMS, the default boundary is not exported in SRHGEOM)
#boundary edges are those who is used only by one element.
unusedBoundaryEdgeList = []
for boundaryEdgeID in self.allBoundaryEdgeIDs:
if not allBoundaryEdgeUsageFlag[boundaryEdgeID]: #if not used
unusedBoundaryEdgeList.append(boundaryEdgeID)
if len(unusedBoundaryEdgeList) > 0: #if there are unused boundary edges
defaultWallBoundaryID = len(self.boundaryEdges)+1 #boundary ID for default boundary
self.boundaryEdges[defaultWallBoundaryID] = unusedBoundaryEdgeList
#compute the outward normal vectors of all boundary edges
for boundaryID in self.boundaryEdges:
self.boundaryEdges_normal[boundaryID] = []
for edgeID in self.boundaryEdges[boundaryID]:
#get the nodes of the current edge
edge_nodes = self.edges_r[abs(edgeID)] #edgeID is negative for the reverse edge
node1 = edge_nodes[0]
node2 = edge_nodes[1]
p1 = self.nodeCoordinates[node1-1]
p2 = self.nodeCoordinates[node2-1]
edge_center = (p1 + p2) / 2
edge_vector = p2 - p1
edge_vector_magnitude = np.linalg.norm(edge_vector)
edge_normal_vector = edge_vector / edge_vector_magnitude
#make sure the normal vector is pointing outwards: get the cell/element that the edge belongs to,
#get the cell center,
element_ID = self.edgeElements[abs(edgeID)][0]
cell_center = self.elementCenters[element_ID-1]
#compute the vector from the cell center to the edge node
cell_center_vector = edge_center - cell_center
#compute the dot product of the cell center vector and the edge normal vector
dot_product = np.dot(cell_center_vector, edge_normal_vector)
#if the dot product is positive, the normal vector is pointing outwards
if dot_product < 0:
edge_normal_vector = -edge_normal_vector
self.boundaryEdges_normal[boundaryID].append(edge_normal_vector)
#build elementEdgesList
# loop over all elements
for cellI in range(self.numOfElements):
# loop over all edges of current element
for i in range(self.elementNodesCount[cellI]): #i is the current edge index for this element
# get the node ID of current and next nodes
# connecting current edge
nodeID_1 = 0
nodeID_2 = 0
if i != (self.elementNodesCount[cellI] - 1): # if not the last edge
nodeID_1 = self.elementNodesList[cellI][i]
nodeID_2 = self.elementNodesList[cellI][i + 1]
else: # if the last edge
nodeID_1 = self.elementNodesList[cellI][i]
nodeID_2 = self.elementNodesList[cellI][0]
curr_edge_node_IDs = tuple([nodeID_1, nodeID_2]) #the original order of the edge nodes from mesh file
curr_edge_node_IDs_reverse = tuple([nodeID_2, nodeID_1]) #the order of the edge nodes reversed
# check whether the current edge is in the edges dictionary or not
if curr_edge_node_IDs in self.edges:
self.elementEdgesList[cellI, i] = self.edges[curr_edge_node_IDs]
elif curr_edge_node_IDs_reverse in self.edges:
self.elementEdgesList[cellI, i] = -self.edges[curr_edge_node_IDs_reverse] #negative means the order is opposite.
else: # something is wrong if the edge is not found in the edge list.
print("Cell: ", cellI, " , edge: ", i, " edge node IDs: ", curr_edge_node_IDs, "can not be found in edge list. Mesh is wrong. Exiting...")
sys.exit()
#debug
if False:
print("edges", self.edges)
print("edgeElements",self.edgeElements)
print("elementEdgesList", self.elementEdgesList)
print("allBoundaryEdgeIDs",self.allBoundaryEdgeIDs)
print("boundaryEdges", self.boundaryEdges)
[docs]
def extrude_to_3D(self, layerHeights, mshFileName, bTerrainFollowing=False, dir=''):
""" Extrude the 2D mesh to 3D with layers
When extruded: node -> line, edge -> face, 2D element-> volume, boundary line -> boundary face
The extruded mesh is written to a GMSH MSH file. Tried MSH version 4. But it seems
too complicated (entities etc.). They are not necessary for our purpose.
Therefore, we will stick with version 2 and OpenFOAM takes this version well.
Ref: https://gmsh.info/doc/texinfo/gmsh.html#MSH-file-format-version-2-_0028Legacy_0029
Parameters
----------
layerHeights : list
a list with strictly increasing heights for the layers. If bTerrainFollowing
is True, only the last value is used as the top elevation.
mshFileName : str
name of the GMSH MSH file to be written
bTerrainFollowing : bool
whether to follow the terrain. If False, the bottom will be at z = 0, and the layer heights
are specified in layerHeights. If True, the bottom will key the terrain elevation, the top
will be at layerHeights[-1] and interpolation inbetween.
Returns
-------
"""
#GMSH element type code
MSHTRI = 2 # 3-node triangle
MSHQUAD = 3 # 4-node quadrilateral
MSHHEX = 5 # 8-node hexahedron
MSHPRISM = 6 # 6-node prism
allNodes = {} #dict: {nodeID: [x,y,z]}
allCells = {} #dict: {cellID: [cell_type, [list of nodes]]}
number_of_prism = 0 #number of prism in the extruded 3D mesh
number_of_hex = 0 #number of hex in the extruded 3D mesh
#some sanity check nlayers and layerHeights
nlayers = len(layerHeights)
if nlayers == 0:
raise Exception("layerHeight is empty. Please check. Exiting ...")
#check the values in the list layerHeights are strictly increasing
bHeighIncreasing = all(i < j for i, j in zip(layerHeights, layerHeights[1:]))
if not bHeighIncreasing:
raise Exception("Values in layerHeight are not strictly increasing. Please check. Exiting ...")
#a list for all side boundaries (not including top and bottom)
allSideBoundaryFaces = {} #dict: {boundaryID: [list of faces]}. Here face is a list of nodes.
#create all nodes
#1. add the original nodes in 2D mesh
for nodeI in range(self.nodeCoordinates.shape[0]):
if bTerrainFollowing:
allNodes[nodeI] = self.nodeCoordinates[nodeI,:]
else:
allNodes[nodeI] = self.nodeCoordinates[nodeI, :]
allNodes[nodeI][2] = 0.0 # make the 2D mesh flat
#2. add the extruded nodes
for layerI in range (1, nlayers+1):
for nodeI in range(self.nodeCoordinates.shape[0]):
if bTerrainFollowing: #interpolation between bottom elevation and the top elevation in layerHeights[-1]
elev = allNodes[nodeI][2] + (layerHeights[-1] - allNodes[nodeI][2])/nlayers*layerI
allNodes[nodeI + layerI * self.numOfNodes] = np.array([self.nodeCoordinates[nodeI, 0],
self.nodeCoordinates[nodeI, 1],
elev])
else:
# make the original 2D mesh flat; only add the layer height
allNodes[nodeI + layerI*self.numOfNodes] = np.array([self.nodeCoordinates[nodeI,0],
self.nodeCoordinates[nodeI,1],
layerHeights[layerI-1]])
#create all cells
#bottom and top boundary faces list
bottomBoundaryFaces = [None] * self.numOfElements
topBoundaryFaces = [None] * self.numOfElements
cellID = 0
#layer by layer
for layerI in range(1, nlayers+1):
#loop through each element in 2D mesh
for elementI in range(self.numOfElements):
if self.elementNodesCount[elementI] == 3: #triangle -> prism
cell_type = MSHPRISM
number_of_prism += 1
elif self.elementNodesCount[elementI] == 4: #quad -> hex
cell_type = MSHHEX
number_of_hex += 1
cellID += 1
cell_list = []
top_face_node_list = []
bottom_face_node_list = []
#loop over all nodes of the current elment in 2D mesh (at current layer's bottom)
for nodeI in range(self.elementNodesCount[elementI]):
cell_list.append(self.elementNodesList[elementI, nodeI] + (layerI-1)*self.numOfNodes)
#if this is the first layer, record the bottom face to bottomBoundaryFaces list
if layerI == 1:
bottom_face_node_list.append(self.elementNodesList[elementI, nodeI])
#now add the nodes at current layer's top.
for nodeI in range(self.elementNodesCount[elementI]):
cell_list.append(self.elementNodesList[elementI, nodeI] + layerI*self.numOfNodes)
#if this is the last layer, record the top face to topBoundaryFaces list
if layerI == nlayers:
top_face_node_list.append(self.elementNodesList[elementI, nodeI] + layerI*self.numOfNodes)
allCells[cellID] = [cell_type, cell_list]
bottomBoundaryFaces[elementI] = bottom_face_node_list
topBoundaryFaces[elementI] = top_face_node_list
#create all boundaries
#allSideBoundaryFaces = {} #dict: {boundaryID: [list of faces]}. Here face is a list of nodes.
#loop through all boundary lines (nodeStrings) in 2D mesh. self.boundaryEdges only
#contains real boundaries, not monitoring lines.
for boundaryID in self.boundaryEdges:
#list of all faces for current boundary
#each face is a list four nodes (quad because we extrude a line in vertical direction)
faceList = []
#loop through all edges of current boundary
for edgeID in self.boundaryEdges[boundaryID]:
nodeID_1 = self.edges_r[edgeID][0]
nodeID_2 = self.edges_r[edgeID][1]
#loop through all layers
for layerI in range(1, nlayers + 1):
#add the four nodes to the current face (ordered)
curFace = [nodeID_1+(layerI-1)*self.numOfNodes, nodeID_1+layerI*self.numOfNodes,
nodeID_2+ layerI*self.numOfNodes, nodeID_2+(layerI-1)*self.numOfNodes]
#add the current face to the faceList for current boundary
faceList.append(curFace)
#now we have all faces in the current boundary
allSideBoundaryFaces[boundaryID] = faceList
#write to GMSH MSH file
if dir!='':
mshFileName_base = dir + '/' + mshFileName
else:
mshFileName_base = mshFileName
if gVerbose: print("Write to GMESH MSH file with name = ", mshFileName_base)
fid = open(mshFileName_base, 'w')
#write MeshFormat
fid.write("$MeshFormat\n")
fid.write("2.1 0 8\n")
fid.write("$EndMeshFormat\n")
#write PhysicalNames:
fid.write("$PhysicalNames\n")
fid.write("%d\n" % (len(allSideBoundaryFaces)+2+1)) #need to output all side boundaries,
#top + bottom (2), and the volume (1)
# maximum side boundary ID. This is the starting point for IDs of top, bottom, and volume
maxSideBoundaryID = -999
#loop through all side boundaries
for boundaryID in allSideBoundaryFaces:
if boundaryID in self.bcDict.keys(): #if the current BC is defined in srhhydro
boundary_name = "boundary_"+str(boundaryID)+"_"+self.bcDict[boundaryID] #name = boundary_ID_BCType
else: #else, this should be default wall boundary
boundary_name = "boundary_"+str(boundaryID)+"_"+"wall" #name = boundary_ID_wall
fid.write("2 %d \"%s\"\n" % (boundaryID, boundary_name)) #dimension, physicalTag, name
maxSideBoundaryID = max(maxSideBoundaryID, boundaryID)
#output the top and bottom boundaries
boundary_name = "top"
fid.write("2 %d \"%s\"\n" % (maxSideBoundaryID+1, boundary_name)) # dimension, physicalTag, name
boundary_name = "bottom"
fid.write("2 %d \"%s\"\n" % (maxSideBoundaryID+2, boundary_name)) # dimension, physicalTag, name
#output the volume
volume_name = "channel" #just a generic name for the volume
fid.write("3 %d \"%s\"\n" % (maxSideBoundaryID+2+1, volume_name))
fid.write("$EndPhysicalNames\n")
#write Nodes
fid.write("$Nodes\n")
#total_num_nodes = (nlayers+1)*self.numOfNodes
total_num_nodes = len(allNodes)
fid.write("%d\n" % total_num_nodes)
#output node coordinates
for nodeI in allNodes:
#GMSH is 1-based
fid.write("%d %f %f %f\n" %(nodeI+1, allNodes[nodeI][0], allNodes[nodeI][1], allNodes[nodeI][2]))
fid.write("$EndNodes\n")
#write Elements (boundary conditions and volume)
# numElements = number of all faces in all boundaries + number of cells in all volumes
elementTag_counter = 0 #count the number of elements
fid.write("$Elements\n")
if (number_of_hex == 0) and (number_of_prism == 0):
print("There is on hex or prism, the only supported cell types, in the extruded 3D mesh. Check the mesh. Exiting ...")
sys.exit()
#output total number of elements
# num. of side boundary faces + num. of faces on top and bottom + num. of hex + num. of prism
fid.write("%d \n" % (len(self.allBoundaryEdgeIDs)*nlayers + self.numOfElements*2 + number_of_hex + number_of_prism))
#loop through all side boundaries
for boundaryID in allSideBoundaryFaces:
#output each quad face's
for faceID in range(len(allSideBoundaryFaces[boundaryID])):
elementTag_counter += 1
fid.write("%d 3 1 %d %d %d %d %d\n" % (elementTag_counter, boundaryID, allSideBoundaryFaces[boundaryID][faceID][0],
allSideBoundaryFaces[boundaryID][faceID][1], allSideBoundaryFaces[boundaryID][faceID][2],
allSideBoundaryFaces[boundaryID][faceID][3]))
#output top and bottom boundaries
for faceI in topBoundaryFaces:
elementTag_counter += 1
if len(faceI) == 3: #triangle
fid.write("%d 2 1 %d %d %d %d \n" % (elementTag_counter, maxSideBoundaryID+1,
faceI[0],faceI[1],faceI[2]))
elif len(faceI) == 4: #quad
fid.write("%d 3 1 %d %d %d %d %d \n" % (elementTag_counter, maxSideBoundaryID+1,
faceI[0], faceI[1], faceI[2], faceI[3]))
for faceI in bottomBoundaryFaces:
elementTag_counter += 1
if len(faceI) == 3: # triangle
fid.write("%d 2 1 %d %d %d %d \n" % (elementTag_counter, maxSideBoundaryID + 2,
faceI[0], faceI[1], faceI[2]))
elif len(faceI) == 4: # quad
fid.write("%d 3 1 %d %d %d %d %d \n" % (elementTag_counter, maxSideBoundaryID + 2,
faceI[0], faceI[1], faceI[2], faceI[3]))
#output the volume's tag and node list. Hex and prism need to be seperate
#1.Hex
if number_of_hex > 0:
#loop through all cells in the volume
for cellID in allCells:
if allCells[cellID][0] == MSHHEX:
elementTag_counter += 1
fid.write("%d 5 1 %d %d %d %d %d %d %d %d %d\n" % (elementTag_counter, maxSideBoundaryID+2+1,
allCells[cellID][1][0], allCells[cellID][1][1],
allCells[cellID][1][2], allCells[cellID][1][3], allCells[cellID][1][4],
allCells[cellID][1][5], allCells[cellID][1][6], allCells[cellID][1][7]))
#2. prism
if number_of_prism > 0:
#loop through all cells in the volume
for cellID in allCells:
if allCells[cellID][0] == MSHPRISM:
elementTag_counter += 1
fid.write("%d 6 1 %d %d %d %d %d %d %d\n" % (elementTag_counter, maxSideBoundaryID+2+1,
allCells[cellID][1][0], allCells[cellID][1][1],
allCells[cellID][1][2], allCells[cellID][1][3],
allCells[cellID][1][4], allCells[cellID][1][5]))
fid.write("$EndElements")
fid.close()
[docs]
def output_2d_mesh_to_vtk(self,meshVTKFileName, bFlat=False, dir=''):
"""output the 2D mesh to vtk
Parameters
--------
meshVTKFileName : str
VTK file name for the mesh
bFlat : bool, optional
whether to make a flat mesh (z=0)
dir : str, optional
directory to write the VTK file to
Returns
-------
"""
vtkFileName = ''
if len(dir) == 0:
vtkFileName = meshVTKFileName
else:
vtkFileName = dir + "/" + meshVTKFileName
# build VTK object:
# points
pointsVTK = vtk.vtkPoints()
flatCoordinates = np.copy(self.nodeCoordinates)
if bFlat:
flatCoordinates[:,2] = 0.0
pointsVTK.SetData(VN.numpy_to_vtk(flatCoordinates))
# cell topology information list: [num. of nodes, node0, node1, .., num. of nodes, nodexxx]
# the list start with the number of nodes for a cell and then the list of node indexes
connectivity_list = []
# type of cells (contains the number of face points
cellFPCounts = np.zeros(self.elementNodesList.shape[0], dtype=np.int64)
# loop over all elements
for k in range(self.elementNodesList.shape[0]):
connectivity_list.append(self.elementNodesCount[k])
for nodeI in range(self.elementNodesCount[k]):
connectivity_list.append(
self.elementNodesList[k][nodeI] - 1) # -1 becasue SRH-2D is 1-based.
cellFPCounts[k] = self.elementNodesCount[k]
connectivity = np.array(connectivity_list, dtype=np.int64)
# convert cell's number of face points to VTK cell type
vtkHandler_obj = vtkHandler()
cell_types = vtkHandler_obj.number_of_nodes_to_vtk_celltypes(cellFPCounts)
cellsVTK = vtk.vtkCellArray()
cellsVTK.SetCells(self.elementNodesList.shape[0], VN.numpy_to_vtkIdTypeArray(connectivity))
uGrid = vtk.vtkUnstructuredGrid()
uGrid.SetPoints(pointsVTK)
uGrid.SetCells(cell_types.tolist(), cellsVTK)
# write to vtk file
unstr_writer = vtk.vtkUnstructuredGridWriter()
unstr_writer.SetFileName(vtkFileName)
unstr_writer.SetInputData(uGrid)
unstr_writer.Write()
[docs]
def output_nodeString_line_coordinates(self, nodeStringID, nodeStringFileName, dir=''):
""" Output the nodeString line coordinates into a text file
Parameters
----------
nodeStringID : int
the ID of the nodeString to be written out
nodeStringFileName : str
the name of the file to be written to
dir : str, optional
directory to write the output file to
Returns
-------
"""
nodeStringFileName_final = ''
if len(dir) == 0:
nodeStringFileName_final = nodeStringFileName
else:
nodeStringFileName_final = dir + "/" + nodeStringFileName
#check whether the given nodeStringID is in the nodeStringDict
if nodeStringID not in self.nodeStringsDict.keys():
print("The given nodeStringID", nodeStringID, "is not valid. Valid nodeString IDs are: ",
self.nodeStringsDict.keys())
try:
fid = open(nodeStringFileName_final, 'w')
except IOError:
print('NodeString file open error')
sys.exit()
#write the header
fid.write("station, x, y, z\n")
# list of nodes in current nodeString
nodeString_nodeList = self.nodeStringsDict[nodeStringID]
station = [0] * len(nodeString_nodeList)
x = [0] * len(nodeString_nodeList)
y = [0] * len(nodeString_nodeList)
z = [0] * len(nodeString_nodeList)
#loop over all nodes in the list
for nodeI in range(len(nodeString_nodeList)):
x[nodeI] = self.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 0]
y[nodeI] = self.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 1]
z[nodeI] = self.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 2]
if nodeI!=0: #if not the first node in the list, calculate the station
station[nodeI] = station[nodeI-1] + np.sqrt( (x[nodeI]-x[nodeI-1])**2 + (y[nodeI]-y[nodeI-1])**2 )
#write the data
for nodeI in range(len(nodeString_nodeList)):
fid.write("%f, %f, %f, %f\n" % (station[nodeI], x[nodeI], y[nodeI], z[nodeI]))
fid.close()
[docs]
def output_as_gmsh(self, gmshFileName, dir=''):
""" Output SRH-2D mesh in GMESH format
Not implemented yet.
Parameters
----------
gmshFileName : str
GMSH MSH file name to write to
dir : str, optional
directory name to write to
Returns
---------
"""
return NotImplemented
if dir!='':
gmshFileName_base = dir + '/' + gmshFileName
else:
gmshFileName_base = gmshFileName
if gVerbose: print("GMESH file name = ", gmshFileName_base)
[docs]
class SRH_2D_SRHMat:
"""A class to handle srhmat file for SRH-2D
Attributes
----------
srhmat_filename : str
name of the srhmat file
numOfMaterials : int
number of materials defined in the srhmat file
matZoneCells : dict
each material zone's cell list (in a dictionary: material zone ID and cell list)
Methods
"""
def __init__(self, srhmat_filename):
"""SRH_2D_SRHMat constructor from a srhmat file
Parameters
----------
srhmat_filename : str
name of the srhmat file
"""
self.srhmat_filename = srhmat_filename
#number of materials (Manning's n zones)
self.numOfMaterials = -1
#list of Material names (in a dictionary: ID and name)
self.matNameList = {}
#each material zone's cell list (in a dictionary: material zone ID and cell list)
self.matZoneCells = {}
#build material zones data
self.buildMaterialZonesData()
[docs]
def buildMaterialZonesData(self):
"""Build the data for material zones from reading the srhmat file
Returns
-------
"""
if gVerbose: print("Reading the SRHMAT file ...")
# read the "srhmat" material file
try:
srhmatfile = open(self.srhmat_filename, 'r')
except:
print('Failed openning SRHMAT file', self.srhmat_filename)
sys.exit()
res_MatNames = {} #dict to hold "MatName" entries for material names list: ID and name
res_Materials = {} #dict to hold "Material" information: ID and list of cells
#add default material name
res_MatNames["-1"] = "Default"
current_MaterialID = -1
current_MaterialCellList = []
while True:
# Get a line from file
line = srhmatfile.readline()
# if line is empty
# end of file is reached
if not line:
break
# print("Line{}: {}".format(count, line.strip()))
search = line.split()
# print(search)
# print("search[0] == Material", (search[0] == "Material"))
if len(search) != 0: #if it is not just an empty line
if search[0] == "SRHMAT":
continue
elif search[0] == "NMaterials":
self.numOfMaterials = int(search[1])
continue
elif search[0] == "MatName":
res_MatNames[search[1]] = search[2]
continue
elif search[0] == "Material": # a new material zone starts
#if this is not the first Material zone; save the previous Material zone
if ((current_MaterialID != -1) and (len(current_MaterialCellList)!=0)):
res_Materials[current_MaterialID] = copy.deepcopy(current_MaterialCellList)
#clear up current_MaterialID and current_MaterialCellList
current_MaterialID = -1
current_MaterialCellList.clear()
current_MaterialID = int(search[1])
current_MaterialCellList.extend(search[2:])
else: #still in a material zone (assume there is no other things other than those listed above in the
#srhmat file)
current_MaterialCellList.extend(search)
#add the last material zone
res_Materials[current_MaterialID] = copy.deepcopy(current_MaterialCellList)
srhmatfile.close()
#within SRHMat file, we have no idea whether there are any cells in the default material zone. Add a default
#wiht an empty cell list. The user of SRH_2D_SRHMat should take care of that.
res_Materials[-1] = []
#convert material ID from str to int
for zoneID, cellList in res_Materials.items():
#print("cellList ", cellList)
temp_list = [int(i) for i in cellList]
res_Materials[zoneID] = temp_list
self.matNameList = res_MatNames
self.matZoneCells = res_Materials
#print("matNameList = ", self.matNameList)
#print("matZoneCells = ", self.matZoneCells)
[docs]
def find_cell_material_ID(self, cellID):
"""Given a cell ID, find its material (Manning's n) ID
Parameters
----------
cellID : int
cell ID to find material ID for
Returns
-------
"""
if not self.matZoneCells:
self.buildMaterialZonesData()
#whether the cell is found in the list
bFound = False
for matID, cellList in self.matZoneCells.items():
if (cellID+1) in cellList: #cellID+1 because SRH-2D is 1-based
bFound = True
return matID
#if cellID is not found, report problem.
#This could be due to several reasons. Most likely, the material coverage in SMS does not
#cover certain area of the mesh. In this case, SMS will not report the material for the missed
#cells in srhmat (no warning given either). Here, if we can't find the cell in material list,
#simply use the default Manning's n ID.
if not bFound:
print("pyHMT2D: In find_cell_material_ID(cellID), cellID =", cellID, "is not found. Check mesh and material coverage. Default is used.")
return 0 #return the default (?)
#sys.exit()
[docs]
def save_as(self, new_srhmat_file_name):
"""Save as a new SRHMat file (useful for modification of the SRHMAT file)
Parameters
-------
new_srhmat_file_name : str
name of the new srhmat file
"""
if gVerbose: print("Wring the SRHMAT file %s \n" % new_srhmat_file_name)
try:
fid = open(new_srhmat_file_name, 'w')
except IOError:
print('srhmat file open error')
sys.exit()
fid.write('SRHMAT 30\n')
fid.write('NMaterials %d\n' % (self.numOfMaterials))
for matID, matName in self.matNameList.items():
fid.write('MatName %s %s\n' % (matID, matName))
element_counter = 0
for matID, cellList in self.matZoneCells.items():
#don't write out the default
if matID==-1:
continue
fid.write('Material %d' % (matID))
for cellI in cellList:
element_counter += 1
fid.write(' %d' % cellI)
if element_counter==10:
element_counter = 0
fid.write('\n')
fid.close()
[docs]
class SRH_2D_Data(HydraulicData):
"""
A class for SRH-2D data I/O, manipulation, and format conversion
This class is designed to read SRH-2D results in format other than VTK. It can
save SRH-2D results into VTK format for visualization in Paraview, parse
SRH-2D mesh information and convert/save to other formats, query SRH-2D
results (e.g., for post-processing or analysis), etc.
Between srhdryo and srhsif, only one of them is needed depending on the
type of the input file.
Attributes
----------
hdf_filename : str
The name for the HDF result file generated by HEC-RAS
srhhydro_obj : SRH_2D_SRHHydro
An object to hold information in the SRHHYDRO file
srhsif_obj : SRH_2D_SIF
An object to hold information in the SIF file
srhgeom_obj : SRH_2D_SRHGeom
An object to hold information in the SRHGEOM file
srhmat_obj : SRH_2D_SRHMat
An object to hold information in the SRHMAT file
Methods
-------
"""
def __init__(self, srhcontrol_filename):
"""Constructor for SRH_2D_Data
srhgeom_filename and srhmat_filename are contained in the SRHHydro or SRH-2D SIF file.
Parameters
----------
srhcontrol_filename : str
Name of the SRH-2D control file: either SRHHydro or SIF file
"""
HydraulicData.__init__(self, "SRH-2D")
#make sure the control file is either SRHHydro or SIF
if not srhcontrol_filename.endswith(".srhhydro") and \
not ("sif" in srhcontrol_filename.lower()): #contains "sif" or "SIF"
raise ValueError("SRH-2D control file must be either SRHHydro or SIF file")
#check if the control file is a SRHHydro or SIF file
self.control_type = "SRHHydro" if srhcontrol_filename.endswith(".srhhydro") else "SIF"
self.srhcontrol_filename = srhcontrol_filename
#extract path in srhcontrol_filename. We assume the SRHGeom and SRHMat files
#are located in the same directory as the srhcontrol_filename file, which can be
#either a srhhydro or sif file. In the control file,
#the file names for SRHGeom and SRHMat do not contain the directory.
file_path, _, = os.path.split(srhcontrol_filename)
#read SRH_2D_SRHHydro file and build SRH_2D_SRHHydro object
self.srhhydro_obj = None
self.srhsif_obj = None
if self.control_type == "SRHHydro":
self.srhhydro_obj = SRH_2D_SRHHydro(self.srhcontrol_filename)
else:
self.srhsif_obj = SRH_2D_SIF(self.srhcontrol_filename)
#get the srhgeom_filename and srhmat_filename from srhhydro_obj
self.srhgeom_filename = None
self.srhmat_filename = None #Note: it seems SRH-2D preprocess assumes the file name for srhmat is the same as srhgeom. For example, if srhgeom is "Muncie.srhgeom", then srhmat is "Muncie.srhmat".
if self.control_type == "SRHHydro":
self.srhgeom_filename = self.srhhydro_obj.get_grid_file_name() if len(file_path) == 0 \
else file_path+"/"+self.srhhydro_obj.get_grid_file_name()
self.srhmat_filename = self.srhhydro_obj.get_mat_file_name() if len(file_path) == 0 \
else file_path+"/"+self.srhhydro_obj.get_mat_file_name()
else:
self.srhgeom_filename = self.srhsif_obj.srhsif_content["mesh_file_name"]
self.srhmat_filename = self.srhsif_obj.srhsif_content["mesh_file_name"].replace(".srhgeom", ".srhmat")
if gVerbose:
print("self.srhgeom_filename = ", self.srhgeom_filename)
print("self.srhmat_filename = ", self.srhmat_filename)
#read and build SRH_2D_Geom, and SRH_2D_Mat objects
self.srhgeom_obj = None
self.srhmat_obj = None
if self.control_type == "SRHHydro":
self.srhgeom_obj = SRH_2D_SRHGeom(self.srhgeom_filename, self.srhhydro_obj.srhhydro_content["BC"])
self.srhmat_obj = SRH_2D_SRHMat(self.srhmat_filename)
else:
self.srhgeom_obj = SRH_2D_SRHGeom(self.srhgeom_filename, self.srhsif_obj.srhsif_content["BC"])
self.srhmat_obj = SRH_2D_SRHMat(self.srhmat_filename)
#Manning's n value at cell centers and nodes
self.ManningN_cell = np.zeros(self.srhgeom_obj.numOfElements, dtype=float)
self.ManningN_node = np.zeros(self.srhgeom_obj.numOfNodes, dtype=float)
#build Manning's n value at cell centers and nodes
self.buildManningN_cells_nodes()
#XMDF data: if desired, the result data in a XMDF file
#can be read in and stored in the following arrays.
#Depending on whether the XMDF is nodal or at cell centers, use different arrays.
self.xmdfTimeArray_Nodal = None #numpy array to store all time values
self.xmdfAllData_Nodal = {} #dict to store {varName: varValues (numpy array)}
self.xmdfTimeArray_Cell = None #numpy array to store all time values
self.xmdfAllData_Cell = {} #dict to store {varName: varValues (numpy array)}
[docs]
def get_case_name(self):
"""Get the "Case" name for this current case ("Case" in srhhyro file)
Returns
-------
case_name : str
Case name
"""
if self.control_type == "SRHHydro":
return self.srhhydro_obj.srhhydro_content["Case"]
else:
return self.srhsif_obj.srhsif_content["Case"]
[docs]
def buildManningN_cells_nodes(self):
""" Build Manning's n values at cell centers and nodes
This calculation is based on the srhhydro and srhgeom files, not interpolation from a GeoTiff file. This is the
Manning's n value used by SRH-2D.
Returns
-------
"""
if gVerbose: print("Building Manning's n values for cells and nodes in SRH-2D mesh ...")
if self.control_type == "SRHHydro":
#Manning's n dictionary in srhhydro
nDict = self.srhhydro_obj.srhhydro_content["ManningsN"]
elif self.control_type == "SIF":
nDict = self.srhsif_obj.srhsif_content["ManningN"]
if gVerbose: print("nDict = ", nDict)
#loop over all cells in the mesh
for cellI in range(self.srhgeom_obj.numOfElements):
#get the material ID of current cell
matID = self.srhmat_obj.find_cell_material_ID(cellI)
#print("matID = ", matID)
#print("nDict[matID] = ", nDict[matID])
#get the Manning's n value for current cell
self.ManningN_cell[cellI] = nDict[matID]
#interpolate Manning's n from cell centers to nodes
#loop over all nodes in the mesh
for nodeI in range(self.srhgeom_obj.numOfNodes):
n_temp = 0.0
#loop over all cells that share the current node
for i in range(self.srhgeom_obj.nodeElementsCount[nodeI]):
cellI = self.srhgeom_obj.nodeElementsList[nodeI][i]
n_temp += self.ManningN_cell[cellI]
#take the average
self.ManningN_node[nodeI] = n_temp/self.srhgeom_obj.nodeElementsCount[nodeI]
[docs]
def output_boundary_manning_n_profile(self, nodeStringID, nodeStringFileName, dir=''):
""" Output Manning's n profile for a specified boundary
Parameters
----------
nodeStringID : int
nodeString ID of specified boundary
nodeStringFileName : str
name of output file
dir : str, optional
directory for the output file
Returns
-------
"""
nodeStringFileName_final = ''
if len(dir) == 0:
nodeStringFileName_final = nodeStringFileName
else:
nodeStringFileName_final = dir + "/" + nodeStringFileName
#check whether the given nodeStringID is in the nodeStringDict
if nodeStringID not in self.srhgeom_obj.nodeStringsDict.keys():
print("The given nodeStringID", nodeStringID, "is not valid. Valid nodeString IDs are: ",
self.srhgeom_obj.nodeStringsDict.keys())
sys.exit()
try:
fid = open(nodeStringFileName_final, 'w')
except IOError:
print('Boundary\'s Manning n file open error')
sys.exit()
#write the header
fid.write("station, n\n")
# list of node ID (1-based) in current nodeString
nodeString_nodeList = self.srhgeom_obj.nodeStringsDict[nodeStringID]
station = [0] * len(nodeString_nodeList)
x = [0] * len(nodeString_nodeList)
y = [0] * len(nodeString_nodeList)
z = [0] * len(nodeString_nodeList)
#loop over all nodes in the list
for nodeI in range(len(nodeString_nodeList)):
x[nodeI] = self.srhgeom_obj.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 0]
y[nodeI] = self.srhgeom_obj.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 1]
z[nodeI] = self.srhgeom_obj.nodeCoordinates[nodeString_nodeList[nodeI] - 1, 2]
if nodeI!=0: #if not the first node in the list, calculate the station
station[nodeI] = station[nodeI-1] + np.sqrt( (x[nodeI]-x[nodeI-1])**2 + (y[nodeI]-y[nodeI-1])**2 )
#write the data
for nodeI in range(len(nodeString_nodeList)):
fid.write("%f, %f\n" % (station[nodeI], self.ManningN_node[nodeString_nodeList[nodeI] - 1])) #-1 because SRH-2D is 1-based
fid.close()
[docs]
def get_ManningN_dict(self):
""" Get the Manning's n dictionary from the SRH-2D control file
Returns
-------
"""
if self.control_type == "SRHHydro":
return self.srhhydro_obj.srhhydro_content["ManningsN"]
else:
return self.srhsif_obj.srhsif_content["ManningN"]
[docs]
def modify_ManningsNs(self, materialIDs, newManningsNValues, materialNames):
""" Modify the Manning's n value for the specified material IDs
Parameters
----------
materialIDs : list
list of material IDs
newManningsNValues : list
list of new Manning's n values
materialNames : list
list of material names
Returns
-------
"""
if gVerbose: print("Modifying Manning's n values for the specified material IDs ...")
if self.control_type == "SRHHydro":
self.srhhydro_obj.modify_ManningsN(materialIDs, newManningsNValues, materialNames)
else:
self.srhsif_obj.modify_ManningsN(materialIDs, newManningsNValues, materialNames)
#update the ManningN at cells and nodes after the modification
self.buildManningN_cells_nodes()
[docs]
def modify_InletQ(self, inlet_q_bc_IDs, new_inlet_q_values):
""" Modify the inlet flow rate for the specified boundary IDs
Parameters
----------
inlet_q_bc_IDs : list
list of inlet flow rate boundary IDs
new_inlet_q_values : list
list of new inlet flow rate values
Returns
-------
"""
if gVerbose: print("Modifying inlet flow rate for the specified boundary IDs ...")
if self.control_type == "SRHHydro":
self.srhhydro_obj.modify_InletQ(inlet_q_bc_IDs, new_inlet_q_values)
else:
self.srhsif_obj.modify_InletQ(inlet_q_bc_IDs, new_inlet_q_values)
[docs]
def modify_ExitH(self, exit_h_bc_IDs, new_exit_h_values):
""" Modify the exit water surface elevation for the specified boundary IDs
Parameters
----------
exit_h_bc_IDs : list
list of exit water surface elevation boundary IDs
new_exit_h_values : list
list of new exit water surface elevation values
Returns
-------
"""
if gVerbose: print("Modifying exit water surface elevation for the specified boundary IDs ...")
if self.control_type == "SRHHydro":
self.srhhydro_obj.modify_ExitH(exit_h_bc_IDs, new_exit_h_values)
else:
self.srhsif_obj.modify_ExitH(exit_h_bc_IDs, new_exit_h_values)
[docs]
def readSRHXMDFFile(self, xmdfFileName, bNodal):
""" Read SRH-2D result file in XMDF format. The XMDF file can be either nodal or cell center. It also contains results from multiple time steps.
Parameters
----------
xmdfFileName : str
file name for the XMDF data
bNodal : bool
whether it is nodal (True) or at cell center (False)
Returns
-------
"""
#for debug
#dumpXMDFFileItems(xmdfFileName)
if bNodal:
if "XMDFC" in xmdfFileName:
if gVerbose: print("Warning: bNodal indices the result is for nodal values. However, the XMDF file name contains "
"\"XMDFC\", which indicates it is for cell center values. Please double check.")
else:
if "XMDFC" not in xmdfFileName:
if gVerbose: print("Warning: bNodal indices the result is for cell centers. However, the XMDF file name "
"does not contain \"XMDFC\", which indicates it is for nodal values. Please double check.")
if gVerbose: print("Reading the XMDF file ...\n")
xmdfFile = h5py.File(xmdfFileName, "r")
#build the list of solution variables
varNameList = []
#copy of the original velocity vector variable name
varNameVelocity = ''
for ds in xmdfFile.keys():
#print(ds)
if ds != "File Type" and ds != "File Version":
if "Velocity" in ds:
varNameVelocity = '%s' % ds
vel_x = ds.replace("Velocity","Vel_X",1)
vel_y = ds.replace("Velocity","Vel_Y",1)
varNameList.append(vel_x)
varNameList.append(vel_y)
else:
varNameList.append(ds)
if not varNameList: #empty list
print("There is no solution varialbes in the XMDF file. Exiting ...")
sys.exit()
if gVerbose: print("Variables in the XMDF file: ", varNameList)
#build the 1d array of time for the solution
#It seems all solution variables share the same time (although
#each has their own recrod of "Time"). So only get one is sufficient.
if bNodal:
self.xmdfTimeArray_Nodal = np.array(xmdfFile[varNameList[0]]['Times'])
if gVerbose: print("Time values for the solutions: ", self.xmdfTimeArray_Nodal)
else:
self.xmdfTimeArray_Cell = np.array(xmdfFile[varNameList[0]]['Times'])
if gVerbose: print("Time values for the solutions: ", self.xmdfTimeArray_Cell)
#result for all varialbes and at all times in a dictionary (varName : numpy array)
for varName in varNameList:
if gVerbose: print("varName =", varName)
varName_temp = varName
if ("Vel_X" in varName) or ("Vel_Y" in varName): # if it is velocity components
varName = varNameVelocity # use the original velocity variable name
if bNodal: #for nodal data
if "Velocity" in varName:
if "Vel_X" in varName_temp: #vel-x
#print("varName_temp = ", varName_temp)
self.xmdfAllData_Nodal[varName_temp] = np.array(xmdfFile[varName]['Values'][:,:,0])
else:
#print("varName_temp = ", varName_temp)
self.xmdfAllData_Nodal[varName_temp] = np.array(xmdfFile[varName]['Values'][:,:,1])
else:
self.xmdfAllData_Nodal[varName] = np.array(xmdfFile[varName]['Values'])
#fixe the water elevation = -999 in SRH-2D
if varName == "Water_Elev_ft" or varName == "Water_Elev_m":
for nodeI in range(len(self.xmdfAllData_Nodal[varName])):
#loop over time steps
for timeI in range(self.xmdfAllData_Nodal[varName].shape[0]):
if self.xmdfAllData_Nodal[varName][timeI, nodeI] == -999:
self.xmdfAllData_Nodal[varName][timeI, nodeI] = self.srhgeom_obj.nodeCoordinates[nodeI, 2] #use node elevation
if np.array(xmdfFile[varName]['Values']).shape[1] != self.srhgeom_obj.numOfNodes:
print("The number of nodes in the XMDF file (%d) is different from that in the mesh (%d). "
"Abort readSRHXMDFFile(...) function."
% (np.array(xmdfFile[varName]['Values']).shape[1], self.srhgeom_obj.numOfNodes))
return
#print(self.xmdfAllData_Nodal)
else: #for cell center data
if "Velocity" in varName:
if "Vel_X" in varName_temp: #vel-x
#print("varName_temp, varName = ", varName_temp, varName)
self.xmdfAllData_Cell[varName_temp] = np.array(xmdfFile[varName]['Values'])[:,:,0]
else: #vel-y
#print("varName_temp = ", varName_temp)
self.xmdfAllData_Cell[varName_temp] = np.array(xmdfFile[varName]['Values'])[:,:,1]
else:
self.xmdfAllData_Cell[varName] = np.array(xmdfFile[varName]['Values'])
#fixe the water elevation = -999 in SRH-2D
if varName == "Water_Elev_ft" or varName == "Water_Elev_m":
#loop through time
for timeI in range(self.xmdfAllData_Cell[varName].shape[0]):
for cellI in range(self.xmdfAllData_Cell[varName].shape[1]):
if self.xmdfAllData_Cell[varName][timeI, cellI] == -999:
self.xmdfAllData_Cell[varName][timeI, cellI] = self.srhgeom_obj.elementBedElevation[cellI]
if np.array(xmdfFile[varName]['Values']).shape[1] != self.srhgeom_obj.numOfElements:
print("The number of elements in the XMDF file (%d) is different from that in the mesh (%d). "
"Abort readSRHXMDFFile(...) function."
% (np.array(xmdfFile[varName]['Values']).shape[1], self.srhgeom_obj.numOfElements))
#print(self.xmdfAllData_Cell)
xmdfFile.close()
[docs]
def outputXMDFDataToVTK(self, bNodal, timeStep=-1,lastTimeStep=False, dir=''):
"""Output XMDF result data to VTK
This has to be called after the XMDF data have been loaded by calling readSRHXMDFFile(...).
Parameters
----------
bNodal : bool
whether export nodal data or cell center data. Currently, it can't output both.
timeStep : int
optionly only the specified time step will be saved
lastTimeStep : bool
optionally specify only the last time step
dir : str, optional
directory to write to
Returns
-------
vtkFileNameList : list
a list of vtkFileName (for each time step)
"""
if gVerbose: print("Output all data in the XMDF file to VTK ...")
if (bNodal and (not self.xmdfAllData_Nodal)) or ((not bNodal) and (not self.xmdfAllData_Cell)):
print("Empty XMDF data arrays. Call readSRHXMDFFile() function first. Exiting ...")
sys.exit()
#check the sanity of timeStep
if (timeStep != -1):
if bNodal:
if (not timeStep in range(len(self.xmdfTimeArray_Nodal))):
message = "Specified timeStep = %d not in range (0 to %d)." % (timeStep, len(self.xmdfTimeArray_Nodal))
sys.exit(message)
else:
if (not timeStep in range(len(self.xmdfTimeArray_Cell))):
message = "Specified timeStep = %d not in range (0 to %d)." % (timeStep, len(self.xmdfTimeArray_Cell))
sys.exit(message)
vtkFileName_base = ''
#print("self.srhhydro_obj.srhhydro_content[\"Case\"] = ", self.srhhydro_obj.srhhydro_content["Case"])
if self.control_type == "SRHHydro":
if dir!='':
vtkFileName_base = dir + '/' + 'SRH2D_' + self.srhhydro_obj.srhhydro_content["Case"] #use the case name of the
# SRH-2D case
else:
vtkFileName_base = 'SRH2D_' + self.srhhydro_obj.srhhydro_content["Case"] # use the case name of the SRH-2D case
else:
if dir!='':
vtkFileName_base = dir + '/' + 'SRH2D_' + self.srhsif_obj.srhsif_content["Case"] #use the case name of the
# SRH-2D case
else:
vtkFileName_base = 'SRH2D_' + self.srhsif_obj.srhsif_content["Case"] # use the case name of the SRH-2D case
if gVerbose: print("vtkFileName_base = ", vtkFileName_base)
#build result variable names
resultVarNames = list(self.xmdfAllData_Nodal.keys()) if bNodal else list(self.xmdfAllData_Cell.keys())
#add Manning's n (at cell center regardless bNodal)
ManningNVarName = "ManningN"
if bNodal:
if gVerbose: print("All nodal solution variable names: ", resultVarNames[:-1], "and cell center ManningN")
else:
if gVerbose: print("All cell center solution variable names: ", resultVarNames)
#add bed elevation at nodes regardless whether bNodal is True or False. Nodal elevation is more accurate representation
#of the terrain because cell center elevation is averaged from nodal elevations.
#get the units of the results
if self.control_type == "SRHHydro":
units = self.srhhydro_obj.srhhydro_content['OutputUnit']
elif self.control_type == "SIF":
units = self.srhsif_obj.srhsif_content['OutputUnit']
else:
raise NotImplementedError("TODO: implement the outputXMDFDataToVTK for SIF file")
if gVerbose: print("SRH-2D result units:", units)
bedElevationVarName = ''
velocityVarName = ''
if units == "SI":
bedElevationVarName = "Bed_Elev_m"
velocityVarName = "Velocity_m_p_s"
else:
bedElevationVarName = "Bed_Elev_ft"
velocityVarName = "Velocity_ft_p_s"
if gVerbose: print("Nodal bed elevation name: ", bedElevationVarName)
#list of vtkFileName (to be returned to caller)
vtkFileNameList = []
#loop through each time step
timeArray = self.xmdfTimeArray_Nodal if bNodal else self.xmdfTimeArray_Cell
for timeI in range(timeArray.shape[0]):
if lastTimeStep:
if timeI < (timeArray.shape[0] - 1):
continue
if (timeStep != -1) and (timeI != timeStep):
continue
if gVerbose: print("timeI and time = ", timeI, timeArray[timeI])
#numpy array for all solution variables at one time
resultData = np.zeros((self.srhgeom_obj.numOfNodes, len(resultVarNames)), dtype="float32") if bNodal \
else np.zeros((self.srhgeom_obj.numOfElements, len(resultVarNames)), dtype="float32")
str_extra = "_N_" if bNodal else "_C_"
vtkFileName = vtkFileName_base + str_extra + str(timeI+1).zfill(4) + ".vtk"
if gVerbose: print("vtkFileName = ", vtkFileName)
#loop through each solution variable (except Bed_Elev and ManningN, which will be added seperately)
for varName, varI in zip(resultVarNames, range(len(resultVarNames))):
if gVerbose: print("varName = ", varName)
#get the values of current solution varialbe at current time
resultData[:,varI] = self.xmdfAllData_Nodal[varName][timeI,:] if bNodal \
else self.xmdfAllData_Cell[varName][timeI,:]
#add Mannng's n to cell center
#self.ManningN_cell
#add nodal bed elevation to VTK's point data
nodalElevation = self.srhgeom_obj.nodeCoordinates[:,2]
#build VTK object:
# points
pointsVTK = vtk.vtkPoints()
pointsVTK.SetData(VN.numpy_to_vtk(self.srhgeom_obj.nodeCoordinates))
# cell topology information list: [num. of nodes, node0, node1, .., num. of nodes, nodexxx]
# the list start with the number of nodes for a cell and then the list of node indexes
connectivity_list = []
# type of cells (contains the number of face points
cellFPCounts = np.zeros(self.srhgeom_obj.elementNodesList.shape[0], dtype=np.int64)
#loop over all elements
for k in range(self.srhgeom_obj.elementNodesList.shape[0]):
connectivity_list.append(self.srhgeom_obj.elementNodesCount[k])
for nodeI in range(self.srhgeom_obj.elementNodesCount[k]):
connectivity_list.append(self.srhgeom_obj.elementNodesList[k][nodeI]-1) #-1 becasue SRH-2D is 1-based.
cellFPCounts[k] = self.srhgeom_obj.elementNodesCount[k]
connectivity = np.array(connectivity_list, dtype=np.int64)
# convert cell's number of face points to VTK cell type
vtkHandler_obj = vtkHandler()
cell_types = vtkHandler_obj.number_of_nodes_to_vtk_celltypes(cellFPCounts)
cellsVTK = vtk.vtkCellArray()
cellsVTK.SetCells(self.srhgeom_obj.elementNodesList.shape[0], VN.numpy_to_vtkIdTypeArray(connectivity))
uGrid = vtk.vtkUnstructuredGrid()
uGrid.SetPoints(pointsVTK)
uGrid.SetCells(cell_types.tolist(), cellsVTK)
cell_data = uGrid.GetCellData() # This holds cell data
point_data = uGrid.GetPointData() # This holds point data
# add solutions
# column numbers for Vel_X and Vel_Y for vector assemble
nColVel_X = -1
nColVel_Y = -1
# First output all solution variables as scalars
if gVerbose: print('The following solution variables are processed and saved to VTK file: \n')
for k in range(len(resultVarNames)):
if gVerbose: print(' %s\n' % resultVarNames[k])
#if it is a veloctiy component, only record its location
#not output to VTK by itself, will be assembed to vector.
if resultVarNames[k].find('Vel_X') != -1:
nColVel_X = k
continue
elif resultVarNames[k].find('Vel_Y') != -1:
nColVel_Y = k
continue
if bNodal:
temp_point_data_array = VN.numpy_to_vtk(resultData[:,k])
temp_point_data_array.SetName(resultVarNames[k])
point_data.AddArray(temp_point_data_array)
else:
temp_cell_data_array = VN.numpy_to_vtk(resultData[:,k])
temp_cell_data_array.SetName(resultVarNames[k])
cell_data.AddArray(temp_cell_data_array)
#add veloctiy by combining components
currentTimePointV = np.zeros((self.srhgeom_obj.numOfNodes, 3)) if bNodal else \
np.zeros((self.srhgeom_obj.numOfElements,3))
currentTimePointV[:, 0] = resultData[:,nColVel_X]
currentTimePointV[:, 1] = resultData[:,nColVel_Y]
currentTimePointV[:, 2] = 0.0
if bNodal:
temp_point_data_array = VN.numpy_to_vtk(currentTimePointV)
temp_point_data_array.SetName(velocityVarName)
point_data.AddArray(temp_point_data_array)
else:
temp_cell_data_array = VN.numpy_to_vtk(currentTimePointV)
temp_cell_data_array.SetName(velocityVarName)
cell_data.AddArray(temp_cell_data_array)
#add Manning's n
if bNodal:
temp_point_data_array = VN.numpy_to_vtk(self.ManningN_node)
temp_point_data_array.SetName(ManningNVarName)
point_data.AddArray(temp_point_data_array)
else:
temp_cell_data_array = VN.numpy_to_vtk(self.ManningN_cell)
temp_cell_data_array.SetName(ManningNVarName)
cell_data.AddArray(temp_cell_data_array)
#add nodal bed elevation
currentBedElev = np.zeros(self.srhgeom_obj.numOfNodes) if bNodal else np.zeros(self.srhgeom_obj.numOfElements)
currentBedElev = self.srhgeom_obj.nodeCoordinates[:,2]
temp_point_data_array = VN.numpy_to_vtk(currentBedElev)
temp_point_data_array.SetName(bedElevationVarName)
point_data.AddArray(temp_point_data_array)
# write to vtk file
unstr_writer = vtk.vtkUnstructuredGridWriter() #ASCII
#unstr_writer = vtk.vtkXMLUnstructuredGridWriter() #Binary
unstr_writer.SetFileName(vtkFileName)
unstr_writer.SetInputData(uGrid)
unstr_writer.Write()
#add the vtkFileName to vtkFileNameList
vtkFileNameList.append(vtkFileName)
#vtkFileNameList
return vtkFileNameList
[docs]
def outputXMDFDataToPINNData(self, bNodal, PINN_normalization_specs, bBoundary=False, dir=''):
"""Output XMDF result data to PINN data files
- data_points.npy: rows of (x, y) coordinates of the points for steady state data (currently only support one time step)
- data_values.npy: rows of (h, u, v) values of the solution variables at the points in data_points.npy
- data_flags.npy: rows of flags for the points in data_points.npy (h_flag, u_flag, v_flag)
- all_data_points_stats.pny: statistics of the data points (min, max, mean, std, etc.)
This has to be called after the XMDF data have been loaded by calling readSRHXMDFFile(...).
Parameters
----------
bNodal : bool
whether export nodal data or cell center data. Currently, it can't output both.
PINN_normalization_specs (dict): The normalization specifications for the PINN data
bBoundary : bool
whether export boundary data
dir : str, optional
directory to write to
Returns
-------
None
"""
if gVerbose: print("Output all data in the XMDF file to PINN data ...")
if (bNodal and (not self.xmdfAllData_Nodal)) or ((not bNodal) and (not self.xmdfAllData_Cell)):
print("Empty XMDF data arrays. Call readSRHXMDFFile() function first. Exiting ...")
sys.exit()
#build result variable names
resultVarNames = list(self.xmdfAllData_Nodal.keys()) if bNodal else list(self.xmdfAllData_Cell.keys())
if bNodal:
if gVerbose: print("All nodal solution variable names: ", resultVarNames[:-1])
else:
if gVerbose: print("All cell center solution variable names: ", resultVarNames)
#add bed elevation at nodes regardless whether bNodal is True or False. Nodal elevation is more accurate representation
#of the terrain because cell center elevation is averaged from nodal elevations.
#get the units of the results
if self.control_type == "SRHHydro":
units = self.srhhydro_obj.srhhydro_content['OutputUnit']
elif self.control_type == "SIF":
units = self.srhsif_obj.srhsif_content['OutputUnit']
else:
raise NotImplementedError("TODO: implement the outputXMDFDataToVTK for SIF file")
if gVerbose: print("SRH-2D result units:", units)
velocityVarName = ''
if units == "SI":
velocityVarName = "Velocity_m_p_s"
else:
velocityVarName = "Velocity_ft_p_s"
if gVerbose: print("Nodal velocity name: ", velocityVarName)
#loop through each time step
timeArray = self.xmdfTimeArray_Nodal if bNodal else self.xmdfTimeArray_Cell
#get the last time step
timeI = timeArray.shape[0] - 1
#get the last time step data
resultData = np.zeros((self.srhgeom_obj.numOfNodes, len(resultVarNames)), dtype="float32") if bNodal \
else np.zeros((self.srhgeom_obj.numOfElements, len(resultVarNames)), dtype="float32")
if gVerbose: print("timeI and time = ", timeI, timeArray[timeI])
#numpy array for all solution variables at one time
resultData = np.zeros((self.srhgeom_obj.numOfNodes, len(resultVarNames)), dtype="float32") if bNodal \
else np.zeros((self.srhgeom_obj.numOfElements, len(resultVarNames)), dtype="float32")
#loop through each solution variable
for varName, varI in zip(resultVarNames, range(len(resultVarNames))):
if gVerbose: print("varName = ", varName)
#get the values of current solution varialbe at current time
resultData[:,varI] = self.xmdfAllData_Nodal[varName][timeI,:] if bNodal \
else self.xmdfAllData_Cell[varName][timeI,:]
# Get coordinates (x,y) for all points
if bNodal:
data_points = self.srhgeom_obj.nodeCoordinates[:, :2] # Only x,y coordinates
else:
data_points = self.srhgeom_obj.elementCenters[:, :2] # Only x,y coordinates
# Get solution variables (h, u, v)
# Find indices for water elevation and velocity components
water_depth_idx = -1
vel_x_idx = -1
vel_y_idx = -1
for i, var_name in enumerate(resultVarNames):
if "Water_Depth" in var_name:
water_depth_idx = i
elif "Vel_X" in var_name:
vel_x_idx = i
elif "Vel_Y" in var_name:
vel_y_idx = i
if water_depth_idx == -1 or vel_x_idx == -1 or vel_y_idx == -1:
print("Error: Could not find all required variables (water depth and velocity components)")
sys.exit()
# Extract h, u, v values
h_values = resultData[:, water_depth_idx]
u_values = resultData[:, vel_x_idx]
v_values = resultData[:, vel_y_idx]
#number of points so far (before we add wall boundary points)
num_points_excluding_walls = len(data_points)
# if bBoundary is True, we include points on the boundary (currenlty only the wall boundaries are supported)
if bBoundary:
#for wall boundary points
wall_node_IDs = []
x_values_wall = []
y_values_wall = []
h_values_wall = []
u_values_wall = []
v_values_wall = []
#loop through all boundaries.
for nodeString in self.srhgeom_obj.nodeStringsDict:
#not all NodeStrings are boundaries. Could be monitoring lines. Need to exclude them.
if nodeString not in self.srhgeom_obj.bcDict.keys():
continue
#we only include wall boundaries (for now)
if 'wall' in self.srhgeom_obj.bcDict[nodeString].lower():
#list of nodes in current nodeString
nodeString_nodeList = self.srhgeom_obj.nodeStringsDict[nodeString]
#loop through all edges in current boundary
for i in range(len(nodeString_nodeList)-1):
nodeID_1 = nodeString_nodeList[i]
nodeID_2 = nodeString_nodeList[i + 1]
# Get coordinates of the two nodes
node1_coords = self.srhgeom_obj.nodeCoordinates[nodeID_1-1] #SRH-2D node IDs are 1-based
node2_coords = self.srhgeom_obj.nodeCoordinates[nodeID_2-1] #SRH-2D node IDs are 1-based
if nodeID_1 not in wall_node_IDs:
wall_node_IDs.append(nodeID_1)
x_values_wall.append(node1_coords[0])
y_values_wall.append(node1_coords[1])
h_values_wall.append(0.0) #h does not matter for wall boundary because it is not used in training
u_values_wall.append(0.0) #no-slip condition
v_values_wall.append(0.0) #no-slip condition
if nodeID_2 not in wall_node_IDs:
wall_node_IDs.append(nodeID_2)
x_values_wall.append(node2_coords[0])
y_values_wall.append(node2_coords[1])
h_values_wall.append(0.0) #h does not matter for wall boundary because it is not used in training
u_values_wall.append(0.0) #no-slip condition
v_values_wall.append(0.0) #no-slip condition
print("Found %d wall boundary points" % len(wall_node_IDs))
#number of wall points
num_wall_points = len(wall_node_IDs)
#append the wall boundary points to the data_points
if len(x_values_wall) > 0:
# First stack x and y values into a 2D array
wall_points = np.column_stack((np.array(x_values_wall), np.array(y_values_wall)))
# Then concatenate with the original data_points
data_points = np.concatenate((data_points, wall_points))
h_values = np.concatenate((h_values, np.array(h_values_wall)))
u_values = np.concatenate((u_values, np.array(u_values_wall)))
v_values = np.concatenate((v_values, np.array(v_values_wall)))
# Compute U magnitude
Umag = np.sqrt(u_values**2 + v_values**2)
# Combine into data_values array
data_values = np.column_stack((h_values, u_values, v_values))
# Create flags array (1 for valid data, 0 for invalid)
# For now, we'll mark all points as valid (1)
data_flags = np.ones_like(data_values)
# the data_flags for h on walls should be set to 0 (we only need the no-slip condition on the walls)
if bBoundary:
data_flags[num_points_excluding_walls:num_points_excluding_walls + num_wall_points, 0] = 0
#compute the statistics of the data points
#min, max, mean, std, median, etc.
x_min = np.min(data_points[:, 0]) #stats of x, y, and t are computed here, but not used in training. The normalization should use the statistics of the pde/boundary points from mesh_points.json.
x_max = np.max(data_points[:, 0])
x_mean = np.mean(data_points[:, 0])
x_std = np.std(data_points[:, 0])
y_min = np.min(data_points[:, 1])
y_max = np.max(data_points[:, 1])
y_mean = np.mean(data_points[:, 1])
y_std = np.std(data_points[:, 1])
t_min = 0.0
t_max = 1.0
t_mean = 0.5
t_std = 1.0
h_min = np.min(data_values[:, 0])
h_max = np.max(data_values[:, 0])
h_mean = np.mean(data_values[:, 0])
h_std = np.std(data_values[:, 0])
u_min = np.min(data_values[:, 1])
u_max = np.max(data_values[:, 1])
u_mean = np.mean(data_values[:, 1])
u_std = np.std(data_values[:, 1])
v_min = np.min(data_values[:, 2])
v_max = np.max(data_values[:, 2])
v_mean = np.mean(data_values[:, 2])
v_std = np.std(data_values[:, 2])
Umag_min = np.min(Umag)
Umag_max = np.max(Umag)
Umag_mean = np.mean(Umag)
Umag_std = np.std(Umag)
all_PINN_stats = np.array([x_min, x_max, x_mean, x_std, y_min, y_max, y_mean, y_std, t_min, t_max, t_mean, t_std, h_min, h_max, h_mean, h_std, u_min, u_max, u_mean, u_std, v_min, v_max, v_mean, v_std, Umag_min, Umag_max, Umag_mean, Umag_std])
all_PINN_points_stats_dict = {
'x_min': x_min,
'x_max': x_max,
'x_mean': x_mean,
'x_std': x_std,
'y_min': y_min,
'y_max': y_max,
'y_mean': y_mean,
'y_std': y_std,
't_min': t_min,
't_max': t_max,
't_mean': t_mean,
't_std': t_std
}
all_PINN_data_stats_dict = {
#'x_min': x_min,
#'x_max': x_max,
#'x_mean': x_mean,
#'x_std': x_std,
#'y_min': y_min,
#'y_max': y_max,
#'y_mean': y_mean,
#'y_std': y_std,
#'t_min': t_min,
#'t_max': t_max,
#'t_mean': t_mean,
#'t_std': t_std,
'h_min': h_min,
'h_max': h_max,
'h_mean': h_mean,
'h_std': h_std,
'u_min': u_min,
'u_max': u_max,
'u_mean': u_mean,
'u_std': u_std,
'v_min': v_min,
'v_max': v_max,
'v_mean': v_mean,
'v_std': v_std,
'Umag_min': Umag_min,
'Umag_max': Umag_max,
'Umag_mean': Umag_mean,
'Umag_std': Umag_std
}
all_PINN_data_stats_dict_serializable = {
k: float(v) if isinstance(v, (np.floating, np.integer)) else v
for k, v in all_PINN_data_stats_dict.items()
}
print(f"x_min: {x_min}, x_max: {x_max}, x_mean: {x_mean}, x_std: {x_std}")
print(f"y_min: {y_min}, y_max: {y_max}, y_mean: {y_mean}, y_std: {y_std}")
print(f"t_min: {t_min}, t_max: {t_max}, t_mean: {t_mean}, t_std: {t_std}")
print(f"h_min: {h_min}, h_max: {h_max}, h_mean: {h_mean}, h_std: {h_std}")
print(f"u_min: {u_min}, u_max: {u_max}, u_mean: {u_mean}, u_std: {u_std}")
print(f"v_min: {v_min}, v_max: {v_max}, v_mean: {v_mean}, v_std: {v_std}")
print(f"Umag_min: {Umag_min}, Umag_max: {Umag_max}, Umag_mean: {Umag_mean}, Umag_std: {Umag_std}")
#normalize data_points and data_values using the PINN_normalization_specs
#currently only supports min-max normalization for x, y, and t and z-score normalization for h, u, v, and Umag
#check the normalization method to be min-max for x, y, and t and z-score for h, u, v, and Umag; if not report error and exit
if PINN_normalization_specs['x'] != 'min-max' or PINN_normalization_specs['y'] != 'min-max' or PINN_normalization_specs['t'] != 'min-max' or PINN_normalization_specs['h'] != 'z-score' or PINN_normalization_specs['u'] != 'z-score' or PINN_normalization_specs['v'] != 'z-score' or PINN_normalization_specs['Umag'] != 'z-score':
print("Error: Currently only supports min-max normalization for x, y, and t and z-score normalization for h, u, v, and Umag")
sys.exit()
#normalize data_points and data_values using the PINN_normalization_specs
#make a copy of data_points and data_values
data_points_normalized = data_points.copy()
data_values_normalized = data_values.copy()
#normalize x, y, and t (min-max normalization)
data_points_normalized[:, 0] = (data_points[:, 0] - x_min) / (x_max - x_min)
data_points_normalized[:, 1] = (data_points[:, 1] - y_min) / (y_max - y_min)
#currenlty only steady state is supported, so t is not used in training
#data_points_normalized[:, 2] = (data_points[:, 2] - t_min) / (t_max - t_min)
#normalize h, u, and v
data_values_normalized[:, 0] = (data_values[:, 0] - h_mean) / h_std
data_values_normalized[:, 1] = (data_values[:, 1] - u_mean) / u_std
data_values_normalized[:, 2] = (data_values[:, 2] - v_mean) / v_std
# Save files
if dir:
os.makedirs(dir, exist_ok=True)
prefix = os.path.join(dir, '/')
else:
os.makedirs('data/PINN', exist_ok=True)
prefix = 'data/PINN/'
#save the normalized data_points and data_values
np.save(f'{prefix}data_points.npy', data_points_normalized)
np.save(f'{prefix}data_values.npy', data_values_normalized)
np.save(f'{prefix}data_flags.npy', data_flags)
np.save(f'{prefix}all_PINN_stats.npy', all_PINN_stats)
#with open(f'{prefix}all_PINN_data_stats.json', 'w') as f:
# json.dump(all_PINN_data_stats_dict_serializable, f, indent=4)
print(f"Saved data points shape: {data_points.shape}")
print(f"Saved data values shape: {data_values.shape}")
print(f"Saved data flags shape: {data_flags.shape}")
if dir:
print(f"Files saved in: {dir}")
else:
print(f"Files saved in: data/PINN")
#print the last 10 data points
print(f"Last 10 data points: {data_points[-10:]}")
print(f"Last 10 data values: {data_values[-10:]}")
print(f"Last 10 data flags: {data_flags[-10:]}")
#save the data_points, data_values, and data_flags to a vtk file (for visualization)
vtkFileName = os.path.join(dir, 'data_points.vtk')
# Create points array with z=0 for 2D points
points = np.column_stack((data_points, np.zeros(len(data_points))))
# Create point data dictionary
point_data = {
'h': data_values[:, 0],
'u': data_values[:, 1],
'v': data_values[:, 2],
'h_flag': data_flags[:, 0],
'u_flag': data_flags[:, 1],
'v_flag': data_flags[:, 2]
}
# Create meshio mesh object with point cloud
mesh = meshio.Mesh(
points=points,
cells=[("vertex", np.arange(len(points)).reshape(-1, 1))], # Create vertex cells for each point
point_data=point_data
)
# Write to VTK file
mesh.write(vtkFileName)
if gVerbose:
print(f"Saved visualization data to: {vtkFileName}")
return all_PINN_points_stats_dict, all_PINN_data_stats_dict_serializable
[docs]
def readSRHCFiles(self, case_name):
"""Read SRH-2D results from SRHC files (cell-centered data)
Args:
case_name (str): Base filename without _SIF.dat extension
"""
# Get directory and base name
directory = os.path.dirname(self.srhsif_obj.srhsif_filename)
# Find all SRHC files
pattern = os.path.join(directory, f"{case_name}_SRHC*.dat")
srhc_files = sorted(glob.glob(pattern), key=self.natural_sort_key)
#print("srhc_files = ", srhc_files)
if gVerbose:
print(f"Found {len(srhc_files)} SRHC files matching pattern: {pattern}")
print(f"SRHC files: {srhc_files}")
if not srhc_files:
print(f"No SRHC files found matching pattern: {pattern}")
return
# Initialize data structures for cell-centered data
self.xmdfTimeArray_Cell = []
self.xmdfAllData_Cell = {}
# Read first file to get variable names
data = np.genfromtxt(srhc_files[0], delimiter=',', names=True, dtype=None, encoding='utf-8')
# Remove the empty field name caused by trailing comma
var_names = list(data.dtype.names)[:-1] # Skip the last empty field
# Initialize data arrays for each variable
for var_name in var_names[3:]: # 3: is for skipping Point_ID, X, and Y
self.xmdfAllData_Cell[var_name] = []
# Read each SRHC file (each represents a timestep)
for timestep, srhc_file in enumerate(srhc_files):
# Use genfromtxt to read the data
data = np.genfromtxt(srhc_file, delimiter=',', skip_header=1, filling_values=0.0)
# Remove the last column (empty due to trailing comma)
data = data[:, :-1]
# Store time (we'll use timestep number since actual time isn't in SRHC files)
self.xmdfTimeArray_Cell.append(float(timestep))
# Store data for each variable
for i, var_name in enumerate(var_names[3:], 3): # Skip Point_ID, X, and Y
values = data[:, i]
#debug
if gVerbose:
print("var_name = ", var_name)
print("values = ", values[0:10])
# Fix water elevation -999 values
if var_name == "Water_Elev_ft" or var_name == "Water_Elev_m":
values = np.where(values == -999,
self.srhgeom_obj.elementBedElevation,
values)
self.xmdfAllData_Cell[var_name].append(values)
# Convert lists to numpy arrays
self.xmdfTimeArray_Cell = np.array(self.xmdfTimeArray_Cell)
for var_name in self.xmdfAllData_Cell:
self.xmdfAllData_Cell[var_name] = np.array(self.xmdfAllData_Cell[var_name])
[docs]
def outputSRHCDataToVTK(self, timeStep=-1, lastTimeStep=False, dir=''):
"""Output SRHC data to VTK format
Args:
timeStep (int): Specific timestep to output (-1 for all timesteps)
lastTimeStep (bool): Only output last timestep if True
dir (str): Output directory
"""
# SRHC data is always at cell center
bNodal = False
# Use existing outputXMDFDataToVTK function since data structure is same
return self.outputXMDFDataToVTK(bNodal, timeStep, lastTimeStep, dir)
[docs]
def outputVTK(self, vtkFileName, resultVarNames, resultData, bNodal):
""" Output result to VTK file
The supplied resultVarNames and resultData should be compatible with the mesh. If resultVarNames is empty,
it only outputs the mesh with no data.
Parameters
----------
vtkFileName : str
name for the output vtk file
resultVarNames : str
result variable names
resultData : `numpy.ndarray`
2D array containing result data
bNodal : bool
whether the data is nodal (True) or at cell center (False)
Returns
-------
"""
if gVerbose: print("Output to VTK ...")
try:
fid = open(vtkFileName, 'w')
except IOError:
print('vtk file open error')
sys.exit()
#get the units of the results
if self.control_type == "SRHHydro":
units = self.srhhydro_obj.srhhydro_content['OutputUnit']
elif self.control_type == "SIF":
units = self.srhsif_obj.srhsif_content['OutputUnit']
else:
raise NotImplementedError("TODO: implement the outputVTK for SIF file")
if gVerbose: print("SRH-2D result units:", units)
fid.write('# vtk DataFile Version 3.0\n')
fid.write('Results from SRH-2D Modeling Run\n')
fid.write('ASCII\n')
fid.write('DATASET UNSTRUCTURED_GRID\n')
fid.write('\n')
# output points
fid.write('POINTS %d double\n' % self.srhgeom_obj.nodeCoordinates.shape[0])
point_id = 0 # point ID counter
for k in range(self.srhgeom_obj.nodeCoordinates.shape[0]):
point_id += 1
fid.write(" ".join(map(str, self.srhgeom_obj.nodeCoordinates[k])))
fid.write("\n")
# output elements
fid.write('CELLS %d %d \n' % (self.srhgeom_obj.elementNodesList.shape[0],
self.srhgeom_obj.elementNodesList.shape[0] + np.sum(self.srhgeom_obj.elementNodesCount)))
cell_id = 0 # cell ID counter
for k in range(self.srhgeom_obj.elementNodesList.shape[0]):
cell_id += 1
fid.write('%d ' % self.srhgeom_obj.elementNodesCount[k])
fid.write(" ".join(map(str, self.srhgeom_obj.elementNodesList[k][:self.srhgeom_obj.elementNodesCount[k]] - 1)))
fid.write("\n")
# output element types
fid.write('CELL_TYPES %d \n' % self.srhgeom_obj.elementNodesList.shape[0])
for k in range(self.srhgeom_obj.elementNodesList.shape[0]):
fid.write('%d ' % self.srhgeom_obj.vtkCellTypeCode[k])
if (((k + 1) % 20) == 0):
fid.write('\n')
fid.write('\n')
# How many solution data entries to output depending on whether it is nodal or not
entryCount = -1
if bNodal:
entryCount = self.srhgeom_obj.nodeCoordinates.shape[0]
else:
entryCount = self.srhgeom_obj.elementNodesList.shape[0]
# output solution variables: only if there is solution variable
if len(resultVarNames) != 0:
if not bNodal:
if gVerbose: print('Output variables are at cell centers. \n')
fid.write('CELL_DATA %d\n' % self.srhgeom_obj.elementNodesList.shape[0])
else:
if gVerbose: print('Output variables are at vertices. \n')
fid.write('POINT_DATA %d\n' % self.srhgeom_obj.nodeCoordinates.shape[0])
# column numbers for Vel_X and Vel_Y for vector assemble
nColVel_X = -1
nColVel_Y = -1
# First output all solution variables as scalars
#print('The following solution variables are processed: \n')
for k in range(len(resultVarNames)):
#print(' %s\n' % resultVarNames[k])
if resultVarNames[k].find('Vel_X') != -1:
nColVel_X = k
elif resultVarNames[k].find('Vel_Y') != -1:
nColVel_Y = k
fid.write('SCALARS %s double 1 \n' % resultVarNames[k])
fid.write('LOOKUP_TABLE default\n')
for entryI in range(entryCount):
fid.write('%f ' % resultData[entryI][k])
if (((entryI + 1) % 20) == 0):
fid.write('\n')
fid.write('\n \n')
# Then output Vel_X and Vel_Y as velocity vector (Vel_Z = 0.0)
# print('nColVel_X, nColVel_Y = %d, %d' % (nColVel_X, nColVel_Y))
if (nColVel_X != -1) and (nColVel_Y != -1):
if units == "SI":
fid.write('VECTORS Velocity_m_p_s double \n')
else:
fid.write('VECTORS Velocity_ft_p_s double \n')
for entryI in range(entryCount):
fid.write('%f %f 0.0 ' % (resultData[entryI][nColVel_X],
resultData[entryI][nColVel_Y]))
if (((entryI + 1) % 20) == 0):
fid.write('\n')
fid.write('\n \n')
fid.close()
[docs]
def output_2d_mesh_to_vtk(self, meshVTKFileName, bFlat=False, dir=''):
""" output the flat mesh to vtk
Parameters
----------
meshVTKFileName : str
file name for the mesh
bFlat : bool
whether to make the 2d mesh flat (node's z coordinate -> 0)
dir : str, optional
directory to write to
Returns
-------
"""
#just call srhgeom_obj's function
self.srhgeom_obj.output_2d_mesh_to_vtk(meshVTKFileName, bFlat, dir)
[docs]
def readOneSRHFile(self, srhFileName):
""" Read one single SRH-2D result file in SRHC (cell center) or SRH (point) format.
Note: SRH-2D outputs an extra "," to each line. As a result, Numpy's
genfromtext(...) function adds a column of "nan" to the end. Need to take care of this.
Parameters
----------
srhFileName : str
file name for the SRH result file
Returns
-------
"""
if gVerbose: print("Reading the SRH/SRHC result file ...")
data = np.genfromtxt(srhFileName, delimiter=',', names=True)
return data.dtype.names[:-1], data
[docs]
def readTECFile(self, tecFileName):
"""Read SRH-2D results in Tecplot format
Parameters
----------
tecFileName : str
Tecplot file name
Returns
-------
"""
readerTEC = vtk.vtkTecplotReader()
readerTEC.SetFileName(tecFileName)
# 'update' the reader i.e. read the Tecplot data file
readerTEC.Update()
polydata = readerTEC.GetOutput()
# print(polydata) #this is a multibock dataset
# print(polydata.GetBlock(0)) #we only need the first block
# If there are no points in 'vtkPolyData' something went wrong
if polydata.GetBlock(0).GetNumberOfPoints() == 0:
raise ValueError(
"No point data could be loaded from '" + tecFileName)
return None
#return of the first block (there should be only one block)
return polydata.GetBlock(0)
[docs]
def cell_center_to_vertex(self, cell_data, vertex_data):
"""Interpolate result from cell center to vertex
Not implemented yet.
Returns
-------
"""
pass
[docs]
def vertex_to_cell_center(self, vertex_data, cell_data):
"""Interpolate result from vertex to cell center
Not implemented yet.
Returns
-------
"""
pass
def natural_sort_key(self, s):
return [int(text) if text.isdigit() else text.lower()
for text in re.split(r'(\d+)', s)]