# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 09:17:08 2021
Convert HEC-RAS 2D mesh and data to SRH-2D. The generated SRHGEOM and SRHMAT files can be loaded in SMS.
However, current version of SMS (v13.1) can only handle mesh cell shapes of triangles and quadrilaterals.
In HEC-RAS 2D, there could be polyhedrons with more than four sides. It seems SMS will discard extra
sides and read the mesh wrong. Future versions of SMS may handle general polygons correctly.
Regardless, HEC-RAS 2D meshes can still be used with SRH-2D because the model itself can handle
polygons. We simply have to prepare all the input files outside of SMS. And that is the purpose
of this code.
The developement of this code used/modified some of the functions in HaD-to-Py (Tomkovic 2016), e.g.,
the reading of HEC-RAS's 2D result files using the HDF module.
References:
L. A. Tomkovic (2016), HaD-to-Py, https://github.com/latomkovic/HaD-to-Py (accessed Feb. 2021)
Author: Xiaofeng Liu, PhD, PE
Penn State University
"""
import sys
import numpy as np
import h5py
import vtk
from vtk.util import numpy_support as VN
from scipy import interpolate
import affine
import os.path
import copy
from pyHMT2D.Hydraulic_Models_Data import HydraulicData
from pyHMT2D.Misc import printProgressBar, vtkHandler, dumpXMDFFileItems
from pyHMT2D.__common__ import *
from .helpers import *
[docs]
class RAS_2D_Data(HydraulicData):
"""A class for HEC-RAS 2D data I/O, manipulation, and format conversion
This class is designed to read HEC-RAS results in HDF format. It can
save RAS2D results into VTK format for visualization in Paraview, parse
RAS2D mesh information and convert/save to SRH-2D format, query RAS2D
results (e.g., for post-processing or analysis), etc.
In HEC-RAS, one plan corresponds to one HDF file. So essentially, this class is designed to read one HDF file (one plan results).
Attributes
----------
hdf_filename : str
the name for the HDF result file generated by HEC-RAS.
plan_filename : str
the name for the HEC-RAS run plan file
plan_name : str
the name of the HEC-RAS run plan
plan_shortID : str
the short name of the run, called "Short Identifier" in HEC-RAS
project_filename : str
the name of the HEC-RAS project file
geometry_file_name : str
the name of the HEC-RAS geometry file
Methods
-------
"""
def __init__(self, hdf_filename, terrain_tiff_filename=None):
"""RAS_2D_Data class constructor
Parameters
----------
hdf_filename : str
name of the HDF file that contains HEC-RAS results
terrain_tiff_filename : str, optional
name of the terrain TIFF file that contains the terrain data. If not provided, the terrain TIFF file name will be extracted from the HDF file.
Note: It is not very clear how HEC-RAS organizes the terrain data in the HDF file regarding the terrain TIFF file names.
"""
HydraulicData.__init__(self, "HEC-RAS")
#check the existence of the HDF file
if not os.path.isfile(hdf_filename):
print("The HDF file", hdf_filename, "does not exist. Make sure to run HEC-RAS simulation to generate the plan results HDF file before calling this function. Exiting ...")
sys.exit()
self.hdf_filename = hdf_filename
self.plan_filename = None
self.plan_name = None
self.plan_shortID = None
self.project_filename = None
#geometry file name
self.geometry_file_name = None
#extract the plan information from the HDF file: plan_filename, plan_name, plan_shortID, project_filename
self.extract_plan_information_from_hdf_file()
#extract the geometry file name from the HDF file: geometry_file_name
self.extract_geometry_file_name_from_hdf_file()
if gVerbose:
print("geometry_file_name = ", self.geometry_file_name)
if terrain_tiff_filename is not None:
if gVerbose:
print("Using the provided terrain TIFF file: ", terrain_tiff_filename)
self.terrain_hdf_filename = None #not used
self.terrain_tiff_filename = terrain_tiff_filename
#check the existence of the terrain TIFF file
if not os.path.isfile(self.terrain_tiff_filename):
print("The terrain TIFF file", self.terrain_tiff_filename, "does not exist. Exiting ...")
sys.exit()
else:
if gVerbose:
print("Extracting the terrain TIFF file name from the HDF file")
#extract the terrain TIFF file name from the HDF file
self.terrain_hdf_filename, self.terrain_tiff_filename = extract_terrain_file_names(self.geometry_file_name)
#check the existence of the terrain TIFF file
if not os.path.isfile(self.terrain_tiff_filename):
print("The terrain TIFF file", self.terrain_tiff_filename, "does not exist. Exiting ...")
sys.exit()
#number of points for each face's profile (subgrid terrain)
#HEC-RAS's HDF file does not export face profile, although in RAS Mapper
#the face profile can be plotted and tabulated. It is not clear how these face
#profiles are created. Here, we use a uniformed sampled terrain points on each face line.
self.nFaceProfilePoints = 20
self.comp_interval = ''
self.outp_interval = ''
self.map_interval = ''
#get intervals
self.get_intervals()
self.version ='' #version of HEC-RAS that produced this HDF result file
self.units = '' #units used
self.short_ID = '' #short_ID of the plan
#extract version
self.extract_version()
#extract units
self.extract_units()
#start and end times
self.start_time = ''
self.end_time = ''
self.Dpart = ''
#extract start and end times
self.extract_start_end_time()
#get solution time series
self.solution_time = self.get2DAreaSolutionTimes()
self.solution_time_date = self.get2DAreaSolutionTimeDates()
self.TwoDAreaNames = []
#get 2D Area names
self.get2DAreaNames()
self.TwoDAreaCellCounts = []
#get 2D area cell count
self.get2DAreaCellCounts()
#get 2D area cell points
self.TwoDAreaCellPoints = self.get2DAreaCellPoints()
#get 2D area boundary points
#the content of the "External Faces" table in HDF, which
#includes "BC Line ID", "Face Index", "FP Start Index", "FP End Index",
#"Station Start", and "Station End"
self.TwoDAreaBoundaryPoints = self.get2DAreaBoundaryPoints()
#get 2D area boundary names, types, and other information
#e.g., TwoDAreaBoundaryNamesTypes[0] is list with info on ["Name", "SA-2D", "Type", "Length"]
self.TwoDAreaBoundaryNamesTypes = self.get2DAreaBoundaryNamesTypes()
#build boundary information for 2D flow areas
#totalBoundaries: total number of boundaries
#boundaryIDList: a list boundary IDs (0, 1, 2, etc.)
#boundaryNameList: a list of boundary names ("inet", "outlet", ...). Its order is the same as in boundaryIDList.
#boundaryTypeList: a list of boundary types ("External", "Internal", etc). Its order is the same as in boundaryIDList.
#boundary2DFlowAreaNameList: a list of 2D flow areas corresponding to each boundary. Its order is the same as in boundaryIDList.
#boundaryTotalPoints: a list for the total number of points in each boundary.
#boundaryPointList: list of points in each boundary
self.totalBoundaries, self.boundaryIDList, self.boundaryNameList, self.boundary2DFlowAreaNameList,\
self.boundaryTypeList, self.boundaryTotalPoints, self.boundaryPointList = self.build2DAreaBoundaries()
#get 2D area cell face points indexes
#self.TwoDAreaCellFacePointsIndexes = self.get2DAreaCellFacePointsIndexes()
#interpolator for elevation (bathymetry)
#self.bilinterp = self.build2DInterpolatorFromGeoTiff(self.terrain_tiff_filename)
#2D area face area elevation info and values
self.TwoDAreaFaceAreaElevationInfo = []
self.TwoDAreaFaceAreaElevationValues = []
#list to store face hydraulic information: a list of lists
self.TwoDAreaFaceHydraulicInformation = self.build2DAreaFaceHydraulicInformation()
#list to store face points' coordinates list: a list of lists
#e.g., TwoDAreaFacePointCoordinatesList[0] is a list of face point coordinates in 2D flow area 0
self.TwoDAreaFacePointCoordinatesList = []
self.build2DAreaFacePointCoordinatesList()
#list to store cell face list: a list of lists
#e.g., TwoDAreaCellFaceList[0][1] is a list of faces for cell 1 in 2D flow area 0
self.TwoDAreaCellFaceList = []
#build TwoDAreaCellFaceList
self.build2DAreaCellFaceList()
#land cover (Manning n)
#This can be retrived from HDF's entries: [Geometry][Land Cover Filename] and [Geometry][Land Cover Layername]
self.landcover_filename = b''
self.landcover_layername = b''
#Manning n zone dictionary: {ID: [name, Manning's n]}
self.ManningNZones = {}
self.build2DManningNZones()
#cell IDs in each Manning's n zones (for srhmat output)
self.cellsInManningZones = []
#2D area Manning's n at cell center: a list for all 2D areas
#e.g., TwoDAreaCellManningN[0][1] is the Manning's n value for cell 1 in 2D flow area 0
self.TwoDAreaCellManningN = []
#Set Manning's n for each cell. There are two options.
#Option 1: interpolte Manning's n from face to cell center: TwoDAreaCellManningN
# This option is not accurate. Better use option 2.
#self.interpolateManningN_face_to_cell()
#Option 2: set Manning's n for cells from HEC-RAS's Manning's n GeoTiff and HDF files
self.buildCellManningNFromGeoTiffHDF()
#Porosity and Flow Drag
self.porosity_and_flow_drag_filename = b''
self.porosity_and_flow_drag_layername = b''
#list to store face's two FacePoints
#e.g., TwoDAreaFace_FacePoints[0][1] is a list two FacePoint IDs for face 1 in 2D flow area 0
self.TwoDAreaFace_FacePoints = []
#build TwoDAreaFace_FacePoints
self.buildFace_FacePoints()
#list to store face profile (subgrid terrain). On each face, number of points is self.nFaceProfilePoints.
#e.g., TwoDAreaFaceProfile[0][1][0:nFaceProfilePoints,3] stores the profile for face 1 in 2D flow area 0
self.TwoDAreaFaceProfile = []
#build face profile
#self.build2DAreaFaceProfile()
#By default, HEC-RAS 2D outputs the following time series solution data in HDF file
# - Depth (in each cell, including the ghost cells)
# - Node X Vel (node velocity in x-direction)
# - Node Y Vel (node velocity in y-direction)
# - Water Surface (water surface elevation in each cell, including the ghost cells)
# - other solution variables at face centers (don't know how to deal with them yet)
# load solutions for 2D areas
self.TwoDAreaCellDepth = []
self.TwoDAreaCellWSE = []
self.TwoDAreaCellVx = []
self.TwoDAreaCellVy = []
self.TwoDAreaCellVz = []
self.TwoDAreaPointVx = []
self.TwoDAreaPointVy = []
self.TwoDAreaPointVz = []
self.load2DAreaSolutions()
[docs]
def get_intervals(self):
""" Get the computation, output, and map intervals
"""
#print(os.getcwd())
#check whether the plan file exists
if not os.path.isfile(self.plan_filename):
print("The HEC-RAS plan file", self.plan_filename, "does not exists. Exiting ...")
sys.exit()
comp_indicator = "Computation Interval="
outp_indicator = "Output Interval="
map_indicator = "Mapping Interval="
i=0
with open(self.plan_filename,'r') as plan_file:
for line in plan_file:
if comp_indicator in line:
self.comp_interval = line[line.index(comp_indicator) +
len(comp_indicator):].rpartition("\n")[0]
i += 1
if i==3:
break
elif outp_indicator in line:
self.outp_interval = line[line.index(outp_indicator) +
len(outp_indicator):].rpartition("\n")[0]
i += 1
if i==3:
break
elif map_indicator in line:
self.map_interval = line[line.index(map_indicator) +
len(map_indicator):].rpartition("\n")[0]
i += 1
if i==3:
break
[docs]
def get_time_minutes(self, interval):
""" pass the interval to this function to get the minute value
Parameters
----------
interval : str
internal in string format
Returns
-------
"""
interval_to_minutes = {'1MIN': 1, '2MIN':2,'3MIN':3,'4MIN':4,'5MIN':5,'10MIN':10,'15MIN':15,
'20MIN':20,'30MIN':30,'1HOUR':60}
try:
minutes = interval_to_minutes[interval]
except:
message = "You might need to modify the get_time_minutes() function so that there is a interval corresponding to your %s interval" %interval
sys.exit(message)
raise
return minutes
[docs]
def get2DAreaNames(self):
""" Get the names of 2D Flow Areas in the HEC-RAS Plan's geometry
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
self.TwoDAreaNames = hf['Geometry']['2D Flow Areas']['Attributes']['Name']
#hdf2DFlow = hf['Geometry']['2D Flow Areas']
#for key in hdf2DFlow:
# if key in ['Attributes', 'Cell Info', 'Cell Points', 'Manning\'s n',\
# 'Names', 'Polygon Info', 'Polygon Parts', 'Polygon Points',\
# 'Tolerances']:
# continue
# else:
# self.TwoDAreaNames.append(key) # List of 2D Area names
hf.close()
[docs]
def get2DAreaCellCounts(self):
""" Get 2D Flow Areas cell counts
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
self.TwoDAreaCellCounts = hf['Geometry']['2D Flow Areas']['Attributes']['Cell Count']
hf.close()
[docs]
def get2DAreaCellPoints(self):
""" Get 2D Flow Area cell points
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaCellPoints = np.array(hf['Geometry']['2D Flow Areas']['Cell Points'])
#print(hdf2DAreaCellPoints)
hf.close()
return hdf2DAreaCellPoints
[docs]
def get2DAreaBoundaryPoints(self):
""" Get 2D Flow Area boundary points (all points on boundaries)
Returns
-------
hdf2DAreaBoundaryPoints : numpy.ndarray
the content of the "External Faces" table in HDF, which
include "BC Line ID", "Face Index", "FP Start Index", "FP End Index", "Station Start", and "Station End"
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaBoundaryPoints = np.array(hf['Geometry']['Boundary Condition Lines']['External Faces'])
#print(hdf2DAreaBoundaryPoints)
hf.close()
return hdf2DAreaBoundaryPoints
[docs]
def get2DAreaBoundaryNamesTypes(self):
""" Get 2D Flow Area boundary names, which 2D area it belongs to, type (External or Internal), and Length
Returns
-------
hdf2DAreaBoundaryNamesTypes : list
the content of the "Attributes" table in HDF, which includes "Name", "SA-2D", "Type", and "Length"
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaBoundaryNamesTypes = hf['Geometry']['Boundary Condition Lines']['Attributes']
#print(type(hdf2DAreaBoundaryNamesTypes))
#print(hdf2DAreaBoundaryNamesTypes)
#print(hdf2DAreaBoundaryNamesTypes[0])
#temp list of lists: each is for one boundary [Name, SA-2D, Type, Length]
temp_2DAreaBoundaryNamesTypes = []
for areaI in range(len(hdf2DAreaBoundaryNamesTypes)):
temp_2DAreaBoundaryNamesTypes.append([hdf2DAreaBoundaryNamesTypes["Name"][areaI],
hdf2DAreaBoundaryNamesTypes["SA-2D"][areaI],
hdf2DAreaBoundaryNamesTypes["Type"][areaI],
hdf2DAreaBoundaryNamesTypes["Length"][areaI]
])
hf.close()
return temp_2DAreaBoundaryNamesTypes
[docs]
def build2DAreaBoundaries(self):
""" Build boundaries with their point list
Returns
-------
"""
if gVerbose: print("Building HEC-RAS 2D area boundaries ...")
maxNumBC = 10 #maximum number of boundaries (adjust according to the mesh)
maxPointsPerBC = 1000 #maximum number of points per boundary
boundaryIDList = np.zeros(maxNumBC,dtype=int) #list of boundary IDs
boundaryPointList = np.zeros((maxNumBC,maxPointsPerBC),dtype=int) #list of point IDs on each boundary
boundaryTotalPoints = np.zeros(maxNumBC,dtype=int) #total number of points in each boundary
#read the first row of BoundaryPoints
totalBoundaries = 0 #total number of boundaries
currentBoundary = 0 #current BC counter
pointCounterInCurrentBoundary = 0 #point counter in current boundary
boundaryIDList[currentBoundary] = self.TwoDAreaBoundaryPoints[0][0] #first BC Line ID
boundaryPointList[currentBoundary,0] = self.TwoDAreaBoundaryPoints[0][2] #first point
boundaryPointList[currentBoundary,1] = self.TwoDAreaBoundaryPoints[0][3] #second point
pointCounterInCurrentBoundary = 2
boundaryTotalPoints[currentBoundary] = 2
totalBoundaries = 1
#loop through row 2 to end of BoundaryPoints
for k in range(self.TwoDAreaBoundaryPoints.shape[0]-1):
if currentBoundary == self.TwoDAreaBoundaryPoints[k+1][0]: #still the old BC line
boundaryPointList[currentBoundary,pointCounterInCurrentBoundary] = self.TwoDAreaBoundaryPoints[k+1][3]
pointCounterInCurrentBoundary += 1
boundaryTotalPoints[currentBoundary] += 1
else: #a new BC line
totalBoundaries += 1
currentBoundary += 1
boundaryIDList[currentBoundary] = self.TwoDAreaBoundaryPoints[k+1][0] #BC Line ID
boundaryPointList[currentBoundary,0] = self.TwoDAreaBoundaryPoints[k+1][2] #first point
boundaryPointList[currentBoundary,1] = self.TwoDAreaBoundaryPoints[k+1][3] #second point
pointCounterInCurrentBoundary = 2
boundaryTotalPoints[currentBoundary] = 2
boundaryNameList = []
boundary2DFlowAreaNameList = []
boundaryTypeList = []
for k in range(len(self.TwoDAreaBoundaryNamesTypes)):
boundaryNameList.append(self.TwoDAreaBoundaryNamesTypes[k][0])
boundary2DFlowAreaNameList.append(self.TwoDAreaBoundaryNamesTypes[k][1])
boundaryTypeList.append(self.TwoDAreaBoundaryNamesTypes[k][2])
#print("Total number of boundaries: ", totalBoundaries)
#print("Boundary ID list: ",boundaryIDList)
#print("boundaryTotalPoints: ", boundaryTotalPoints)
#print("boundaryPointList: ", boundaryPointList)
return totalBoundaries, boundaryIDList, boundaryNameList, boundary2DFlowAreaNameList,\
boundaryTypeList, boundaryTotalPoints, boundaryPointList
[docs]
def build2DManningNZones(self):
""" Build 2D flow area's Manning n zones from the land cover (Manning's n) HDF file
Assume that there is "Land Cover Filename" and "Land Cover Layername" in the result HDF file.
If they don't exist (mostly for the case that the whole domain uses the default, constant
Manning's n value in ["Geometry"]["2D Flow Areas"][0][1]), just use that constant.
Returns
-------
"""
hf = h5py.File(self.hdf_filename, 'r')
#check whether 'Land Cover Filename' and 'Land Cover Layername' are in the attributes
if 'Land Cover Filename' not in hf['Geometry'].attrs.keys() or \
'Land Cover Layername' not in hf['Geometry'].attrs.keys():
self.ManningNZones = {} # clear the dictionary just in case
# take the default Manning's n value in row 0 and column 1.
self.ManningNZones[0] = [b'default', hf['Geometry']['2D Flow Areas']['Attributes'][0][1]]
hf.close()
return
self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename']
self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername']
if gVerbose: print("Land Cover Filename = ", self.landcover_filename)
if gVerbose: print("Land Cover Layername = ", self.landcover_layername)
#Some time HEC-RAS does not save land cover filename and layername to HDF because the
#geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry
#computation has been done.
if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0:
print("Land Cover Filename or Land Cover Layername in result HDF is empty. "
"Will use the default Manning's n value.")
self.ManningNZones = {} # clear the dictionary just in case
#take the default Manning's n value in row 0 and column 1.
self.ManningNZones[0] = [b'default', hf['Geometry']['2D Flow Areas']['Attributes'][0][1]]
else:
# read the Manning n zones (land cover zones)
self.ManningNZones = {} # clear the dictionary just in case
# get the path of the HDF file.
if len(os.path.dirname(self.hdf_filename))==0:
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#The way the land cover and Manning's n information stored in v5 and v6 is different.
if self.version == '5.0.7':
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r')
#hfManningN = h5py.File(fileBase + self.landcover_filename, 'r')
dset = hfManningN['IDs']
with dset.astype(np.uint8):
IDs = dset[:]
ManningN = np.array(hfManningN['ManningsN'])
Names = hfManningN['Names']
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(IDs)):
self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]]
hfManningN.close()
elif self.version == '6.0.xxx': #not used any more; somehow new (>6) versions of HEC-RAS messed up
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r')
#hfManningN = h5py.File(fileBase + self.landcover_filename, 'r')
dset_raster_map = hfManningN['Raster Map']
dset_variables = hfManningN['Variables']
#with dset_raster_map['ID'].astype(np.uint8):
IDs = dset_raster_map['ID']
ManningN = np.array(dset_variables['ManningsN'])
Names = dset_variables['Name']
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(IDs)):
self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]]
hfManningN.close()
elif self.version == '6.1.0' or self.version == '6.0.0' or self.version == '6.6':
#Note: In v6 and above, it seems HEC-RAS can handle different formats of Land Cover (Manning's n) HDF
# file for backward compatability. For example, if a HEC-RAS case was originally created by v5.0.7, the
# Land Cover HDF file has "IDs, ManningsN, and Names". But for cases created by v6 and later, the
# Land Cover HDF fiel has "Raster Map, Variables".
#check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories.
if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'):
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r')
elif os.path.isfile(fileBase + self.landcover_filename):
hfManningN = h5py.File(fileBase + self.landcover_filename, 'r')
else:
raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.")
if (("Raster Map" in hfManningN.keys()) and ("Variables" in hfManningN.keys())): #for new version >= 6
dset_raster_map = hfManningN['Raster Map']
dset_variables = hfManningN['Variables']
# with dset_raster_map['ID'].astype(np.uint8):
IDs = dset_raster_map['ID']
ManningN = np.array(dset_variables['ManningsN'])
Names = dset_variables['Name']
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(IDs)):
self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]]
elif (("IDs" in hfManningN.keys()) and ("ManningsN" in hfManningN.keys()) and ("Names" in hfManningN.keys())):
dset = hfManningN['IDs']
# with dset.astype(np.uint8):
# IDs = dset[:]
IDs = dset.astype(np.uint8)[:]
ManningN = np.array(hfManningN['ManningsN'])
Names = hfManningN['Names']
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(IDs)):
self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]]
else:
hfManningN.close()
hf.close()
raise Exception("The format of the land cover (Manning's n) layer in the HDF file is not supported. "
"Please contact the developer of pyHMT2D or raise an issue on GitHub.")
hf.close()
else:
hf.close()
raise Exception("The version of HEC-RAS that produced this HDF result file is not supported.")
if gVerbose: print("self.ManningNZones = ", self.ManningNZones)
hf.close()
[docs]
def change_ManningsN(self, materialIDs, newManningsNValues, materialNames):
""" Change materialID's Mannings n values to new values and save to file (for HEC-RAS v5, v6.1.0, and v6.6)
Parameters
----------
materialIDs
newManningsNValues
materialNames
Returns
-------
"""
#get land cover (Manning's n) file name and layer name
hf = h5py.File(self.hdf_filename, 'r')
self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename']
self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername']
hf.close()
if gVerbose: print("Land Cover Filename = ", self.landcover_filename)
if gVerbose: print("Land Cover Layername = ", self.landcover_layername)
#Some time HEC-RAS does not save land cover filename and layername to HDF because the
#geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry
#computation has been done.
if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0:
print("Land Cover Filename or Land Cover Layername in result HDF is empty. "
"Will use the default Manning's n value.")
raise Exception("Modification of default constant Manning's n has not been implemented.")
else:
# read the Manning n zones (land cover zones)
if os.path.dirname(self.hdf_filename) == '':
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories.
if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'):
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+')
elif os.path.isfile(fileBase + self.landcover_filename):
hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+')
else:
raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.")
dset = hfManningN['IDs']
#with dset.astype(np.uint8):
# IDs = dset[:]
IDs = dset.astype(np.uint8)
IDs = IDs[:].tolist()
ManningN_dataset = hfManningN['ManningsN']
ManningN = np.array(ManningN_dataset)
Names = hfManningN['Names']
# make a copy of the original Manning's n values
ManningN_new = copy.deepcopy(ManningN)
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in IDs:
# also check whether the name is consistent
if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"):
if gVerbose: print(" Old Manning's n value =", ManningN[IDs.index(materialID)],
"for material ID = ", materialID)
ManningN_new[IDs.index(materialID)] = newManningsNValues[i]
if gVerbose: print(" New Manning's n value =", ManningN_new[IDs.index(materialID)],
"for material ID = ", materialID)
else:
raise Exception(
"The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case."
"You can check the content of the Manning's n HDF file with HDFViewer.")
else:
raise Exception(
"The specified materialID %d is not in the Manning's n list. Please check." % materialID)
ManningN_dataset[...] = ManningN_new # assign new values to data
# save and close the HDF file
hfManningN.close()
# need to re-build 2D Manning's n zones information after update
self.build2DManningNZones()
[docs]
def change_ManningsN_v60(self, materialIDs, newManningsNValues, materialNames):
""" Change materialID's Mannings n values to new values and save to file (for HEC-RAS v6.0.0)
Parameters
----------
materialIDs
newManningsNValues
materialNames
Returns
-------
"""
# get land cover (Manning's n) file name and layer name
hf = h5py.File(self.hdf_filename, 'r')
self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename']
self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername']
hf.close()
if gVerbose: print("Land Cover Filename = ", self.landcover_filename)
if gVerbose: print("Land Cover Layername = ", self.landcover_layername)
# Some time HEC-RAS does not save land cover filename and layername to HDF because the
# geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry
# computation has been done.
if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0:
print("Land Cover Filename or Land Cover Layername in result HDF is empty. "
"Will use the default Manning's n value.")
raise Exception("Modification of default constant Manning's n has not been implemented.")
else:
# read the Manning n zones (land cover zones)
if os.path.dirname(self.hdf_filename) == '':
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories.
if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'):
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+')
elif os.path.isfile(fileBase + self.landcover_filename):
hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+')
else:
raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.")
dset_raster_map = hfManningN['Raster Map']
dset_variables = hfManningN['Variables'] #This dataset needs to be modified because ManningsN is in it.
# with dset_raster_map['ID'].astype(np.uint8):
IDs = dset_raster_map['ID']
IDs = IDs.tolist()
ManningN = np.array(dset_variables['ManningsN'])
Names = dset_variables['Name']
# make a copy of the original Manning's n values
#ManningN_new = copy.deepcopy(ManningN)
dset_variables_new = copy.deepcopy(dset_variables)
# print("IDs =", IDs)
# print("ManningN =", ManningN)
# print("Names =", Names)
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in IDs:
# also check whether the name is consistent
if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"):
if gVerbose: print(" Old Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)],
"for material ID = ", materialID)
dset_raster_map['ManningsN'][IDs.index(materialID)] = newManningsNValues[i]
if gVerbose: print(" New Manning's n value =", dset_variables_new['ManningsN'][IDs.index(materialID)],
"for material ID = ", materialID)
else:
raise Exception(
"The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case."
"You can check the content of the Manning's n HDF file with HDFViewer.")
else:
raise Exception(
"The specified materialID %d is not in the Manning's n list. Please check." % materialID)
dset_variables[...] = dset_variables_new # assign new values to data
# save and close the HDF file
hfManningN.close()
# need to re-build 2D Manning's n zones information after update
self.build2DManningNZones()
[docs]
def change_ManningsN_v66(self, materialIDs, newManningsNValues, materialNames):
""" Change materialID's Mannings n values to new values and save to file (for HEC-RAS v6.6)
Parameters
----------
materialIDs
newManningsNValues
materialNames
Returns
-------
"""
# get land cover (Manning's n) file name and layer name
hf = h5py.File(self.hdf_filename, 'r')
self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename']
self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername']
hf.close()
if gVerbose: print("Land Cover Filename = ", self.landcover_filename)
if gVerbose: print("Land Cover Layername = ", self.landcover_layername)
# Some time HEC-RAS does not save land cover filename and layername to HDF because the
# geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry
# computation has been done.
if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0:
print("Land Cover Filename or Land Cover Layername in result HDF is empty. "
"Will use the default Manning's n value.")
raise Exception("Modification of default constant Manning's n has not been implemented.")
else:
# read the Manning n zones (land cover zones)
if os.path.dirname(self.hdf_filename) == '':
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories.
if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'):
hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+')
elif os.path.isfile(fileBase + self.landcover_filename):
hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+')
else:
raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.")
dset_raster_map = hfManningN['Raster Map']
dset_variables = hfManningN['Variables'] #This dataset needs to be modified because ManningsN is in it.
if gVerbose:
print("Dataset shape:", dset_variables['ManningsN'].shape)
print("Dataset dtype:", dset_variables['ManningsN'].dtype)
# with dset_raster_map['ID'].astype(np.uint8):
IDs = dset_raster_map['ID']
IDs = IDs.tolist()
ManningN = np.array(dset_variables['ManningsN'])
Names = dset_variables['Name']
# make a copy of the original Manning's n values
#ManningN_new = copy.deepcopy(ManningN)
#dset_variables_new = copy.deepcopy(dset_variables)
if gVerbose:
print("IDs =", IDs)
print("ManningN =", ManningN)
print("Names =", Names)
print("newManningsNValues =", newManningsNValues)
print("materialNames =", materialNames)
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in IDs:
# also check whether the name is consistent
if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"):
if gVerbose: print(" Old Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)],
"for material ID = ", materialID)
#print("IDs.index(materialID) = ", IDs.index(materialID))
#update the ManningsN value in the dset_variables dataset
#ManningN[IDs.index(materialID)] = newManningsNValues[i]
#dset_variables['ManningsN'][IDs.index(materialID)] = newManningsNValues[i]
#This is the correct way to update hdf5 data
row_index = IDs.index(materialID)
row = dset_variables[row_index] # Fetch full structured row
row["ManningsN"] = newManningsNValues[i] # Modify the field
dset_variables[row_index] = row # Write row back into dataset
if gVerbose: print(" New Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)],
"for material ID = ", materialID)
else:
print("The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case.")
print("materialID = ", materialID)
print("materialNames[i] = ", materialNames[i])
print("Names[IDs.index(materialID)] = ", Names[IDs.index(materialID)])
raise Exception(
"The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case."
"You can check the content of the Manning's n HDF file with HDFViewer.")
else:
raise Exception(
"The specified materialID %d is not in the Manning's n list. Please check." % materialID)
#dset_variables['ManningsN'][:] = ManningN # assign new values to data
# save and close the HDF file
hfManningN.close()
# need to re-build 2D Manning's n zones information after update
self.build2DManningNZones()
[docs]
def modify_ManningsN(self, materialIDs, newManningsNValues, materialNames):
"""Modify materialID's Manning's n values to new values
Assume that there is "Land Cover Filename" and "Land Cover Layername" in the result HDF file.
If they don't exist (mostly for the case that the whole domain uses the default, constant
Manning's n value in ["Geometry"]["2D Flow Areas"][0][1]), just use newManningsNValues[0].
Parameters
----------
materialIDs : list
material ID list (0-based)
newManningsNValues : list
new Manning's n values in a list
materialNames : list
names of the material in a list (0-based)
Returns
-------
"""
if gVerbose: print("Modify Manning's n value ...")
if not isinstance(materialIDs[0], int):
print("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]),
". Exit.\n")
if not isinstance(newManningsNValues[0], float):
print("Manning's n has to be a float. The type of newManningsNValue passed in is ", type(newManningsNValues[0]),
". Exit.\n")
if self.version == '5.0.7' or self.version == '6.1.0' or self.version == '6.6':
self.change_ManningsN(materialIDs, newManningsNValues, materialNames)
elif self.version == '6.0.0': #v6.0.0 is special
self.change_ManningsN_v60(materialIDs, newManningsNValues, materialNames)
#elif self.version == '6.6': #v6.6 is special
# self.change_ManningsN_v66(materialIDs, newManningsNValues, materialNames)
if gVerbose: print("Finished modifying Manning's n value ...")
[docs]
def modify_PorosityDrag_b_Coefficient(self, materialIDs, newPorosityDrag_b_Coefficients, materialNames):
"""Modify materialID's PorosityDrag_b_Coefficient values to new values
Assume that there is "Porosity and Flow Drag Filename" and "Porosity and Flow Drag Layername" in the result HDF file.
Parameters
----------
materialIDs : list
material ID list
newPorosityDrag_b_Coefficients : list
new PorosityDrag_b_Coefficient values in a list
materialNames : list
names of the material in a list
Returns
-------
"""
if gVerbose: print("Modify PorosityDrag_b_Coefficient value ...")
if not isinstance(materialIDs[0], int):
print("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]),
". Exit.\n")
if not isinstance(newPorosityDrag_b_Coefficients[0], float):
print("PorosityDrag_b_Coefficient has to be a float. The type of newPorosityDrag_b_CoefficientValue passed in is ", type(newPorosityDrag_b_Coefficients[0]),
". Exit.\n")
if self.version == '6.6':
self.change_PorosityDrag_b_Coefficient_v66(materialIDs, newPorosityDrag_b_Coefficients, materialNames)
else:
raise Exception("PorosityDrag_b_Coefficient modification is not supported for HEC-RAS version ", self.version)
if gVerbose: print("Finished modifying PorosityDrag_b_Coefficient value ...")
[docs]
def change_PorosityDrag_b_Coefficient_v66(self, materialIDs, newPorosityDrag_b_Coefficients, materialNames):
""" Change materialID's PorosityDrag_b_Coefficient values to new values and save to file (for HEC-RAS v6.6)
Parameters
----------
materialIDs
newManningsNValues
materialNames
Returns
-------
"""
# get Porosity and Flow Drag file name and layer name
hf = h5py.File(self.hdf_filename, 'r')
self.porosity_and_flow_drag_filename = hf['Geometry'].attrs['Porosity and Flow Drag Filename']
self.porosity_and_flow_drag_layername = hf['Geometry'].attrs['Porosity and Flow Drag Layername']
hf.close()
if gVerbose: print("Porosity and Flow Drag Filename = ", self.porosity_and_flow_drag_filename)
if gVerbose: print("Porosity and Flow Drag Layername = ", self.porosity_and_flow_drag_layername)
# Some time HEC-RAS does not save Porosity and Flow Drag filename and layername to HDF because the
# geometry association of Porosity and Flow Drag is removed after the 2D area geometry
# computation has been done.
if len(self.porosity_and_flow_drag_filename) == 0 or len(self.porosity_and_flow_drag_layername) == 0:
print("Porosity and Flow Drag Filename or Porosity and Flow Drag Layername in result HDF is empty. ")
raise Exception("Modification of PorosityDrag_b_Coefficient cannot proceed.")
else:
# read the Porosity and Flow Drag zones
if os.path.dirname(self.hdf_filename) == '':
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#check which hdf file exists: In HEC-RAS, depending on how the Porosity and Flow Drag HDF file is created, the hdf file may in different directories.
if os.path.isfile(fileBase + self.porosity_and_flow_drag_layername + b'.hdf'):
hfPorosityDrag = h5py.File(fileBase + self.porosity_and_flow_drag_layername + b'.hdf', 'r+')
elif os.path.isfile(fileBase + self.porosity_and_flow_drag_filename):
hfPorosityDrag = h5py.File(fileBase + self.porosity_and_flow_drag_filename, 'r+')
else:
raise Exception("The Porosity and Flow Drag HDF file " + fileBase + self.porosity_and_flow_drag_layername + b'.hdf' + " or " + fileBase + self.porosity_and_flow_drag_filename + " does not exist. Please check.")
dset_raster_map = hfPorosityDrag['Raster Map']
dset_variables = hfPorosityDrag['Variables'] #This dataset needs to be modified because PorosityDrag_b_Coefficient is in it.
if gVerbose:
print("Dataset shape:", dset_variables['Quadratic Drag Coefficient'].shape)
print("Dataset dtype:", dset_variables['Quadratic Drag Coefficient'].dtype)
# with dset_raster_map['ID'].astype(np.uint8):
IDs = dset_raster_map['ID']
IDs = IDs.tolist()
PorosityDrag_b_Coefficient = np.array(dset_variables['Quadratic Drag Coefficient'])
Names = dset_variables['Name']
if gVerbose:
print("IDs =", IDs)
print("PorosityDrag_b_Coefficient =", PorosityDrag_b_Coefficient)
print("Names =", Names)
print("newPorosityDrag_b_Coefficients =", newPorosityDrag_b_Coefficients)
print("materialNames =", materialNames)
for i in range(len(materialIDs)):
materialID = materialIDs[i]
if materialID in IDs:
# also check whether the name is consistent
if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"):
if gVerbose: print(" Old PorosityDrag_b_Coefficient value =", dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)],
"for material ID = ", materialID)
#print("IDs.index(materialID) = ", IDs.index(materialID))
#update the PorosityDrag_b_Coefficient value in the dset_variables dataset
#PorosityDrag_b_Coefficient[IDs.index(materialID)] = newPorosityDrag_b_Coefficients[i]
#dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)] = newPorosityDrag_b_Coefficients[i]
#This is the correct way to update hdf5 data
row_index = IDs.index(materialID)
row = dset_variables[row_index] # Fetch full structured row
row["Quadratic Drag Coefficient"] = newPorosityDrag_b_Coefficients[i] # Modify the field
dset_variables[row_index] = row # Write row back into dataset
if gVerbose: print(" New PorosityDrag_b_Coefficient value =", dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)],
"for material ID = ", materialID)
else:
print("The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case.")
print("materialID = ", materialID)
print("materialNames[i] = ", materialNames[i])
print("Names[IDs.index(materialID)] = ", Names[IDs.index(materialID)])
raise Exception(
"The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case."
"You can check the content of the Porosity and Flow Drag HDF file with HDFViewer.")
else:
raise Exception(
"The specified materialID %d is not in the Porosity and Flow Drag list. Please check." % materialID)
#dset_variables['Quadratic Drag Coefficient'][:] = PorosityDrag_b_Coefficient # assign new values to data
# save and close the HDF file
hfPorosityDrag.close()
# need to re-build 2D Porosity and Flow Drag zones information after update (not neede? If the hdf file is updated, HEC-RAS will recompute geometric information.)
#self.build2DPorosityDragZones()
[docs]
def build2DInterpolatorFromGeoTiff(self, geoTiffFileName):
""" Build 2D interpolator for from geoTiff file, e.g., terrain and Manning n ID, using Python interpolate.interp2d
Assume the data is in band 1.
This method is too slow (use with caution if the case is large).
Attributes
-------
geoTiffFileName : str
name for the geoTiff file
Returns
-------
"""
if gVerbose: print('Building 2D interpolator from GeoTiff file ...')
try:
from osgeo import gdal
except ImportError:
raise ImportError('Error in importing GDAL package. Make sure GDAL has been installed properly.')
# Read raster
source = gdal.Open(geoTiffFileName,gdal.GA_ReadOnly)
#print(source)
# Read the raster band as separate variable
#band = source.GetRasterBand(1)
# Print only selected metadata:
#print ("[ NO DATA VALUE ] = ", band.GetNoDataValue()) # none
#print ("[ MIN ] = ", band.GetMinimum())
#print ("[ MAX ] = ", band.GetMaximum())
nx, ny = source.RasterXSize, source.RasterYSize
gt = source.GetGeoTransform()
band_array = source.GetRasterBand(1).ReadAsArray().astype(np.float64)
# Compute mid-point grid spacings
ax = np.array([gt[0] + ix*gt[1] + gt[1]/2.0 for ix in range(nx)])
ay = np.array([gt[3] + iy*gt[5] + gt[5]/2.0 for iy in range(ny)])
bilinterp = interpolate.interp2d(ax, ay, band_array, kind='linear')
#close the GDAL file
source = None
return bilinterp
[docs]
def interpolatorFromGeoTiff_rasterio(self, geoTiffFileName, pointList, dataType=np.float64):
"""Interpolate from a GeoTIFF file using rasterio (preferred on Windows).
Falls back to :meth:`interpolatorFromGeoTiff` (GDAL-based) if rasterio
is not available or if opening the file fails.
Parameters
----------
geoTiffFileName: str
name of the GeoTiff file
pointList: list
list of points (x,y)
dataType : int, optional
data type in the GeoTiff (default is numpy.float)
Returns
-------
"""
try:
import rasterio
except ImportError:
# rasterio not available; use the existing GDAL-based implementation
return self.interpolatorFromGeoTiff(geoTiffFileName, pointList, dataType=dataType)
# rasterio expects a string path, but some HEC-RAS metadata
# are stored as bytes (e.g., b'.\\ManningNFromZonePolygons.tif').
if isinstance(geoTiffFileName, bytes):
geoTiffFileName = geoTiffFileName.decode('utf-8')
# check whether the geoTiff file exists
if not os.path.isfile(geoTiffFileName):
print("The geoTiff file", geoTiffFileName, "does not exists. Exiting ...")
sys.exit()
try:
with rasterio.open(geoTiffFileName) as src:
data_array = src.read(1)
forward_transform = src.transform
nodata = src.nodata
except Exception:
# Any issue with rasterio, fall back to the GDAL-based implementation
return self.interpolatorFromGeoTiff(geoTiffFileName, pointList, dataType=dataType)
# Replace nodata with a fill value so sampled points don't get -9999 (or similar).
# Use the minimum valid value in the band so elevation stays plausible; if all nodata, use 0.
if nodata is not None:
valid = data_array[data_array != nodata]
fill_value = float(np.min(valid)) if valid.size else 0.0
else:
fill_value = None
reverse_transform = ~forward_transform
interpolatedValues = []
# loop through all points in the list
for pointI in range(pointList.shape[0]):
x = pointList[pointI,0]
y = pointList[pointI,1]
px, py = reverse_transform * (x, y)
px, py = int(px + 0.5), int(py + 0.5)
# make sure px and py are within the data_array bounds
if px < 0 or px >= data_array.shape[1] or py < 0 or py >= data_array.shape[0]:
if px < 0:
px = 0
elif px >= data_array.shape[1]:
px = data_array.shape[1] - 1
if py < 0:
py = 0
elif py >= data_array.shape[0]:
py = data_array.shape[0] - 1
# Sample with the pixel coordinates. Note that py should be first because
# the index is [rows, columns] in a 2D grid in python
# Return a scalar (like the original GDAL-based implementation), not a 0-D array.
val = data_array[py, px]
if fill_value is not None and (val == nodata or (np.isnan(nodata) and np.isnan(val))):
val = fill_value
interpolatedValues.append(val)
return interpolatedValues
[docs]
def interpolatorFromGeoTiff(self, geoTiffFileName, pointList, dataType=np.float64):
"""Interpolate from a GeoTiff file given a point list.
Parameters
----------
geoTiffFileName: str
name of the GeoTiff file
pointList: list
list of points (x,y)
dataType : int, optional
data type in the GeoTiff (default is numpy.float)
Returns
-------
"""
try:
from osgeo import gdal
except ImportError:
raise ImportError('Error in importing GDAL package. Make sure GDAL has been installed properly.')
#check whether the geoTiff file exists
if not os.path.isfile(geoTiffFileName):
print("The geoTiff file", geoTiffFileName, "does not exists. Exiting ...")
sys.exit()
# Read raster
source = gdal.Open(geoTiffFileName,gdal.GA_ReadOnly)
#print(source)
# Read the raster band as separate variable
data_array = np.array(source.GetRasterBand(1).ReadAsArray())
#print(data_array.shape)
# Print only selected metadata:
#print ("[ NO DATA VALUE ] = ", band.GetNoDataValue()) # none
#print ("[ MIN ] = ", band.GetMinimum())
#print ("[ MAX ] = ", band.GetMaximum())
forward_transform = affine.Affine.from_gdal(*source.GetGeoTransform())
reverse_transform = ~forward_transform
interpolatedValues = []
#loop through all points in the list
for pointI in range(pointList.shape[0]):
x = pointList[pointI,0]
y = pointList[pointI,1]
px, py = reverse_transform * (x, y)
px, py = int(px + 0.5), int(py + 0.5)
#make sure px and py are within the data_array bounds
if px < 0 or px >= data_array.shape[1] or py < 0 or py >= data_array.shape[0]:
#if gVerbose:
# print("Warning within the interpolatorFromGeoTiff() function of RAS_2D_Data.py: The point (", x, ",", y, ") is outside the data_array bounds. Using the boundary value instead ...")
if px < 0:
px = 0
elif px >= data_array.shape[1]:
px = data_array.shape[1] - 1
if py < 0:
py = 0
elif py >= data_array.shape[0]:
py = data_array.shape[0] - 1
#Sample with the pixel coordinates. Note that py should be first because
# the index is [rows, columns] in a 2@ grid in python
interpolatedValues.append(data_array[py][px])
source = None
return interpolatedValues
[docs]
def get2DAreaCellFacePointsIndexes(self, area):
"""Get 2D Flow Area cell face points indexes for a specified 2D area
Parameters
----------
area : str
name of the area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaCellFacePointsIndexes = np.array(hf['Geometry']['2D Flow Areas'][area]
['Cells FacePoint Indexes'])
#print(hdf2DAreaCellFacePointsIndexes)
hf.close()
return hdf2DAreaCellFacePointsIndexes
[docs]
def get2DAreaCellCenterCoordiantes(self, area):
"""Get 2D Flow Area cell center coordinates for a specified 2D area
Note: these coordinates only have (x,y), no z.
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename, 'r')
#in HEC-RAS v5.0.7 and earlier, the 'Cells Center Coordinate' contains also the ghost cell centers
#Here, we only need the real centers, not the ghost.
#Get 2D Flow Areas Attributes table
twoDFlowAreaAttributs = hf['Geometry']['2D Flow Areas']['Attributes']
#index of the 2D flow area in the geometry (in case there are more than one 2D flow areas)
areaIndex = 0
bFound = False
for name in twoDFlowAreaAttributs['Name']:
if name == area:
bFound = True
break
areaIndex += 1
if not bFound:
print("The specified area with name", area, "was not found in the geometry. Exiting ...")
sys.exit()
#Get the start and end of current 2D flow area cell index
temp = hf['Geometry']['2D Flow Areas']['Cell Info']
iStart = temp[areaIndex,0]
iEnd = temp[areaIndex,1]
hdf2DAreaCellCenterCoordinates = np.array(hf['Geometry']['2D Flow Areas'][area]
['Cells Center Coordinate'])[iStart:iEnd,:]
# print(hdf2DAreaCellCenterCoordinates)
hf.close()
return hdf2DAreaCellCenterCoordinates
[docs]
def get2DAreaFacePointsCoordinates(self, area):
"""Get the face points' coordinates for a specified 2D area
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaFacePointsCoordinates = np.array(hf['Geometry']['2D Flow Areas'][area]['FacePoints Coordinate'])
#print(hdf2DAreaFacePointsCoordinates)
#add the third column to the coordinates array for the z-coordinates (RAS2D only exports x and y, not z)
z_temp = np.zeros(hdf2DAreaFacePointsCoordinates.shape[0])
hdf2DAreaFacePointsCoordinates3D = np.column_stack((hdf2DAreaFacePointsCoordinates, z_temp))
#interpolate the elevation (bathymetry) to z coordinates
self.interpolateZcoord2Points(hdf2DAreaFacePointsCoordinates3D)
#print(hdf2DAreaFacePointsCoordinates3D)
hf.close()
return hdf2DAreaFacePointsCoordinates3D
[docs]
def interpolateZcoord2Points(self, facePointsCoordinates3D):
"""Interpolate the elevation (bathymetry) to 2D area's points (z-coordinate)
Parameters
----------
facePointsCoordinates3D : numpy.ndarray
face points coordinates in 3D
Returns
-------
"""
allFacePointsCoordiantes=np.empty([facePointsCoordinates3D.shape[0],2])
for k in range(facePointsCoordinates3D.shape[0]):
x1 = facePointsCoordinates3D[k,0]
y1 = facePointsCoordinates3D[k,1]
allFacePointsCoordiantes[k,0] = x1
allFacePointsCoordiantes[k,1] = y1
# Prefer rasterio-based interpolation when available; fall back to the GDAL-based version.
allFacePointZ = self.interpolatorFromGeoTiff_rasterio(
self.terrain_tiff_filename, allFacePointsCoordiantes
)
for k in range(facePointsCoordinates3D.shape[0]):
facePointsCoordinates3D[k,2] = allFacePointZ[k]
[docs]
def get2DAreaCellsFaceOrientationInfo(self, area):
"""Get cells face and orientation info for a specified 2D area
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaCellsFaceOrientationInfo = np.array(hf['Geometry']['2D Flow Areas'][area]['Cells Face and Orientation Info'])
#print(hdf2DAreaCellsFaceOrientationInfo)
hf.close()
return hdf2DAreaCellsFaceOrientationInfo
[docs]
def get2DAreaCellsFaceOrientationValues(self, area):
"""Get cells face and orientation values for a specified 2D area
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaCellsFaceOrientationValues = np.array(hf['Geometry']['2D Flow Areas'][area]['Cells Face and Orientation Values'])
#print(hdf2DAreaCellsFaceOrientationValues)
hf.close()
return hdf2DAreaCellsFaceOrientationValues
[docs]
def get2DAreaFaceAreaElevationInfo(self, area):
"""Get faces area elevation info for a specified 2D area
Two columns: first column-starting index, second column-length of record
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaFaceAreaElevationInfo = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces Area Elevation Info'])
#print(hdf2DAreaFaceAreaElevationInfo)
hf.close()
return hdf2DAreaFaceAreaElevationInfo
[docs]
def get2DAreaFaceAreaElevationValues(self, area):
"""Get faces area elevation values for a specified 2D area
Four columns: first column-elevation, second column-area
third column-wetted perimeter, fourth column-Manning's n (a constant as of now RAS version <=6)
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaFaceAreaElevationValues = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces Area Elevation Values'])
#print(hdf2DAreaFaceAreaElevationValues)
hf.close()
return hdf2DAreaFaceAreaElevationValues
[docs]
def get2DAreaFaceFacePointIndexes(self, area):
"""Get face's facepoint indexes (two interger IDs)
Parameters
----------
area : str
name of area
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaFaceFacePointIndexes = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces FacePoint Indexes'])
#print(hdf2DAreaFaceFacePointIndexes)
hf.close()
return hdf2DAreaFaceFacePointIndexes
[docs]
def hdf2DAreaResultVar(self, area, varName):
"""Get HEC-RAS solution time series result for a given variable
Parameters
----------
area : str
name of area
varName : str
variable name
Returns
-------
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaResultVar = np.array(hf['Results']['Unsteady']['Output']['Output Blocks']
['Base Output']['Unsteady Time Series']['2D Flow Areas'][area][varName])
#only take the last row (time step)
#hdf2DAreaResultVar = hdf2DAreaResultVar[(hdf2DAreaResultVar.shape[0]-1)]
#print(varName, hdf2DAreaResultVar)
hf.close()
return hdf2DAreaResultVar
[docs]
def get2DAreaSolutionTimes(self):
"""Get 2D Flow Area solution times
Returns
-------
hdf2DAreaSolutionTimes : list
list of solution times
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaSolutionTimes = np.array(hf['Results']['Unsteady']['Output']['Output Blocks']
['Base Output']['Unsteady Time Series']['Time'])
#print(hdf2DAreaSolutionTimes)
hf.close()
return hdf2DAreaSolutionTimes
[docs]
def get2DAreaSolutionTimeDates(self):
"""Get 2D Flow Area solution time_dates
Returns
-------
hdf2DAreaSolutionTimeDates : list
list of solution time and dates
"""
hf = h5py.File(self.hdf_filename,'r')
hdf2DAreaSolutionTimeDates = np.array(hf['Results']['Unsteady']['Output']['Output Blocks']
['Base Output']['Unsteady Time Series']['Time Date Stamp'])
#print(hdf2DAreaSolutionTimeDates)
hf.close()
return hdf2DAreaSolutionTimeDates
[docs]
def load2DAreaSolutions(self):
"""Load solutions (cell depth and water surface elevation, node X and Y vel.) for all 2D areas
Returns
-------
"""
if gVerbose: print('Loading 2D area solutions ...')
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
#fetch the depth and WSE data
depth_name = '' #name of depth variable in HDF file
WSE_name = '' #name of WSE variable in HDF file
nodeVx_name = '' #name of nodal x velocity in HDF file
nodeVy_name = '' # name of nodal y velocity in HDF file
if self.version == '5.0.7':
depth_name = 'Depth'
WSE_name = 'Water Surface'
nodeVx_name = 'Node X Vel'
nodeVy_name = 'Node Y Vel'
elif self.version == '6.0.0' or self.version == '6.1.0':
depth_name = 'Cell Invert Depth'
WSE_name = 'Water Surface'
nodeVx_name = 'Node Velocity - Velocity X'
nodeVy_name = 'Node Velocity - Velocity Y'
elif self.version == '6.6': #For v6.6, the nodal velocity is not exported correctly; thus, we use the cell center velocity.
depth_name = 'Cell Invert Depth'
WSE_name = 'Water Surface'
cellVx_name = 'Cell Velocity - Velocity X'
cellVy_name = 'Cell Velocity - Velocity Y'
nodeVx_name = 'Node Velocity - Velocity X'
nodeVy_name = 'Node Velocity - Velocity Y'
else:
raise Exception("The version of HEC-RAS that produced this HDF result file is not supported.")
try:
cellDepth = self.hdf2DAreaResultVar(area, depth_name)
cellWSE = self.hdf2DAreaResultVar(area, WSE_name)
if self.version == '6.6':
cellVx = self.hdf2DAreaResultVar(area, cellVx_name)
cellVy = self.hdf2DAreaResultVar(area, cellVy_name)
except KeyError as e:
raise KeyError(f"Variable '{depth_name}' or '{WSE_name}' not found in 2D area '{area}'. Available variables may vary by HEC-RAS version.")
#slice the cell data array to get values only for cells in current 2D area.
#The HEC-RAS results also contain values at the center of boundary faces.
cellDepth = cellDepth[0:(self.TwoDAreaCellCounts[i])]
cellWSE = cellWSE[0:(self.TwoDAreaCellCounts[i])]
if self.version == '6.6':
cellVx = cellVx[0:(self.TwoDAreaCellCounts[i])]
cellVy = cellVy[0:(self.TwoDAreaCellCounts[i])]
self.TwoDAreaCellDepth.append(cellDepth)
self.TwoDAreaCellWSE.append(cellWSE)
if self.version == '6.6':
self.TwoDAreaCellVx.append(cellVx)
self.TwoDAreaCellVy.append(cellVy)
self.TwoDAreaCellVz.append(cellVx*0.0) #fake z velocity (=0)
#fetch point data
#Node X Vel and Node Y Vel
pointVx=self.hdf2DAreaResultVar(area, nodeVx_name)
pointVy=self.hdf2DAreaResultVar(area, nodeVy_name)
#deal with nan in pointVx and pointVy
pointVx = np.where(np.isnan(pointVx), 0.0, pointVx)
pointVy = np.where(np.isnan(pointVy), 0.0, pointVy)
#create pointVz which is zero
pointVz = np.zeros_like(pointVx)
#print("pointVx = ", pointVx)
#print("pointVy = ", pointVy)
self.TwoDAreaPointVx.append(pointVx)
self.TwoDAreaPointVy.append(pointVy)
self.TwoDAreaPointVz.append(pointVz)
[docs]
def build2DAreaFacePointCoordinatesList(self):
"""Build face point coordinates list
Returns
-------
"""
if gVerbose: print("Building 2D area's face point coordinates list ...")
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
#get the FacePoint coordinates in the current 2D flow area
facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area)
self.TwoDAreaFacePointCoordinatesList.append(facePointsCoordinates)
[docs]
def build2DAreaCellFaceList(self):
"""Build cell's face list
Returns
-------
"""
if gVerbose: print("Building 2D area's cell face list ...")
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
#get cells face and orientation info
cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(area)
#get cells face and orientation values
cellsFaceOrientationValues = self.get2DAreaCellsFaceOrientationValues(area)
#temp list to store the numpy arrays (face list) for each cell in current area
currAreaCellFaceList = []
#loop through cells in current area
for cellI in range(self.TwoDAreaCellCounts[i]):
start_row = cellsFaceOrientationInfo[cellI,0]
end_row = cellsFaceOrientationInfo[cellI,0] + cellsFaceOrientationInfo[cellI,1]
faceList = cellsFaceOrientationValues[start_row:end_row,0].copy() #make a copy
#print("faceList =", faceList)
currAreaCellFaceList.append(faceList)
#append the current area's cell face list to the top list
self.TwoDAreaCellFaceList.append(currAreaCellFaceList)
[docs]
def build2DAreaFaceProfile(self):
"""Build face profile
Face profile in HEC-RAS represents the sub-grid terrain.
Returns
-------
"""
if gVerbose: print("Building 2D area's face profile ...")
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
face_facePoints = self.TwoDAreaFace_FacePoints[i]
#get the FacePoint coordinates in the current 2D flow area
facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area)
#calculate how many points there are in all the profiles
# loop through all faces in current area (it is a slow process; print a progress bar)
totalNumProfilePoints = self.nFaceProfilePoints * face_facePoints.shape[0]
#array to store all profile points (for all faces)
totalProfilePoints = np.zeros((totalNumProfilePoints,3))
curAreaProfilePoints = []
#loop through all faces in current area (it is a slow process; print a progress bar)
for faceI in range(face_facePoints.shape[0]):
#print("faceI, face_facePoints.shape[0] = ", faceI, face_facePoints.shape[0])
printProgressBar(faceI, face_facePoints.shape[0], "Face profile computation in progress")
#print("faceI =", faceI)
facePoint_start = face_facePoints[faceI,0] #start ID of facePoint
facePoint_end = face_facePoints[faceI,1] #end ID of facePoint
#get the coordinates of the start and end facePoints
startFacePointCoordinates = facePointsCoordinates[facePoint_start]
endFacePointCoordinates = facePointsCoordinates[facePoint_end]
length = self.horizontalDistance(startFacePointCoordinates,endFacePointCoordinates)
stations = np.linspace(0,length,self.nFaceProfilePoints,endpoint=True)
#if faceI == 0: print(repr(stations))
#print(startFacePointCoordinates)
#print(endFacePointCoordinates)
for pointI in range(self.nFaceProfilePoints):
totalProfilePoints[pointI+self.nFaceProfilePoints*faceI,:] = (startFacePointCoordinates + (endFacePointCoordinates-startFacePointCoordinates)
*pointI/(self.nFaceProfilePoints-1))
#interpolate elevation to all profile points
self.interpolateZcoord2Points(totalProfilePoints)
#unpack the points and assign to each face profile
for faceI in range(face_facePoints.shape[0]):
profilePoints = totalProfilePoints[self.nFaceProfilePoints*faceI:(self.nFaceProfilePoints*(faceI+1)),:]
curAreaProfilePoints.append(profilePoints)
self.TwoDAreaFaceProfile.append(curAreaProfilePoints)
[docs]
def interpolateManningN_face_to_cell(self):
"""interpolate Manning's n from face to cell center
HEC-RAS 2D stores Manning's n value at faces, not cell centers. We need to
interpolate its value from face to cell center. Another option is to use the
cell center coordinates and direclty query the Manning's n layer in RAS Mapper, which
might be too complicated because HEC-RAS has default Manning's n, Land Use Cover, Override polygon, etc.
Here, the interpolation from face to cell might not be a bad choice.
For each face, HEC-RAS currently does not support varying Manning's n (horizontally and veritcally). Thus,
effectively, it is a constant value for each face. This may change in the future.
Returns
-------
"""
if gVerbose: print("Interpolating Manning's n from face to cell center ...")
#clear the list up in case this function has been called before
self.TwoDAreaCellManningN = []
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
temp_n = np.zeros(self.TwoDAreaCellCounts[i])
#loop through cells in current area
for cellI in range(self.TwoDAreaCellCounts[i]):
sum_n = 0.0
#loop through all faces of current cell
for faceI in self.TwoDAreaCellFaceList[i][cellI]:
sum_n += self.TwoDAreaFaceHydraulicInformation[i][faceI][0,3] #only take the first value because
#it is constant for each face
temp_n[cellI] = sum_n/len(self.TwoDAreaCellFaceList[i][cellI])
self.TwoDAreaCellManningN.append(temp_n)
[docs]
def buildCellManningNFromGeoTiffHDF(self):
"""Build 2D flow area cell's Manning n value from HEC-RAS's GeoTiff and HDF files for Manning n.
This method is more accurate than interpolateManningN_face_to_cell(), which uses a face to cell interpolation.
Also need to consider the case where the "Land Cover Filename" is empty and the whole domain uses the
default constant Manning's n.
Returns
-------
"""
if gVerbose: print("Building cell's Manning n values from GeoTiff and HDF ...")
self.TwoDAreaCellManningN = []
self.TwoDAreaCellManningN.append(np.zeros(self.TwoDAreaCellCounts[0]))
cell_center_coordinates = np.array(self.get2DAreaCellCenterCoordiantes(self.TwoDAreaNames[0]))
#loop through each cell
#for cellI in range(self.TwoDAreaCellCounts[0]):
# ManningN_ID = int(self.ManningNIDInterpolator(cell_center_coordinates[cellI,0],
# cell_center_coordinates[cellI,1]))
# cell_ManningN[cellI] = self.ManningNZones[ManningN_ID][1] #get the Manning n value
#get the full land cover file name (deal with relative path w.r.t. the script where RAS_2D_Data class is used)
if os.path.dirname(self.hdf_filename) == '':
fileBase = b''
else:
fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/')
#HEC-RAS v5's land cover file name stores the tiff file name, while in
#v6 it stores the hdf file name. For v6, I assume the tiff file name is
#land cover layer name.tif. Need to check this assumption.
if self.version == '5.0.7':
full_landcover_filename = fileBase+self.landcover_filename
elif self.version == '6.0.0' or self.version == '6.1.0' or self.version == '6.6':
full_landcover_filename = (fileBase+self.landcover_filename[:-4])+b'.tif' #note: it may already has the ".tif" extension
else:
errMessage = "HEC-RAS version not supported. version = " + self.version
raise Exception(errMessage)
if self.landcover_filename == b'': #if there is no "Land cover Filename" specified.
ManningN_IDs = [0] * self.TwoDAreaCellCounts[0]
else:
# Prefer rasterio-based interpolation when available; fall back to the GDAL-based version.
ManningN_IDs = self.interpolatorFromGeoTiff_rasterio(
full_landcover_filename, cell_center_coordinates
)
#set the Manning's n values for each cells
for cellI in range(self.TwoDAreaCellCounts[0]):
self.TwoDAreaCellManningN[0][cellI] = self.ManningNZones[ManningN_IDs[cellI]][1] #get the Manning n value
#bin each cell to different Manning's n zones
self.cellsInManningZones = [ [] for _ in range(len(self.ManningNZones))] # a list of lists
for cellI in range(self.TwoDAreaCellCounts[0]):
if self.cellsInManningZones[ManningN_IDs[cellI]]:
self.cellsInManningZones[ManningN_IDs[cellI]].append(cellI)
else:
self.cellsInManningZones[ManningN_IDs[cellI]] = [cellI]
#print("cellsInManningZones = ", self.cellsInManningZones)
[docs]
def buildFace_FacePoints(self):
"""Build face's facepoint list
Returns
-------
"""
if gVerbose: print("Building face's facepoints list ...")
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
#get cells face and orientation info
faceFacePointIndexes = self.get2DAreaFaceFacePointIndexes(area)
self.TwoDAreaFace_FacePoints.append(faceFacePointIndexes)
[docs]
def saveHEC_RAS2D_results_to_VTK(self,timeStep=-1,lastTimeStep=False,fileNameBase='',dir='',bFlat=False):
"""Save HEC-RAS 2D solutions to VTK files.
Note:
- Each area saved separately
- Each time saved separately
The resulted files will be RAS2D_areaName_timeSequence.vtk, e.g.,
RAS2D_SpringCreek_0001.vtk, RAS2D_SpringCreek_0002.vtk, etc.
The option lastTimeStep specifies whether only the last time step is saved (default=False).
The option timeStep specifies the particular step to be saved.
If both lastTimeSTep and timeStep are specified, lastTimeStep has the priority.
Note:
- In most HEC-RAS versions, the velocity can be exported at nodes (face points). But somehow in HEC-RAS 6.6,
the nodal velocity is not exported correctly (it has overflow float points; don't know why); however, the velocity at cell centers is correct.
Thus, we save the cell center velocity for version 6.6.
Parameters
----------
timeStep : int, optional
only the specified time step will be saved
lastTimeStep : bool, optional
specify only the last time step
dir : str, optional
directory name to write to
Returns
-------
"""
if gVerbose: print('Saving RAS-2D results to VTK ...')
#check the sanity of timeStep
if (timeStep != -1) and (not timeStep in range(len(self.solution_time))):
message = "Specified timeStep = %d not in range (0 to %d)." % (timeStep, len(self.solution_time))
sys.exit(message)
#get units for variable name appendix (like SRH-2D)
varLengthNameAppendix = ''
varVelocityNameAppendix = ''
if gVerbose: print("HEC-RAS project units =",self.units)
if self.units == 'Feet':
varLengthNameAppendix = '_ft'
varVelocityNameAppendix = '_ft_p_s'
elif self.units == 'Meter':
varLengthNameAppendix = '_m'
varVelocityNameAppendix = '_m_p_s'
else:
print("Wrong units specified in the HEC-RAS project file. Units = ", self.units)
print("Exiting ...")
sys.exit()
#loop through each 2D areas
for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))):
#print("2D Flow Area = ", area)
#get the FacePoint indexes
cellFacePointIndexes = self.get2DAreaCellFacePointsIndexes(area)
#get the FacePoint coordinates (3D, the elevation is interpolated from terrain)
facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area)
if bFlat: # if we want the mesh to be flat, then z = 0.0
facePointsCoordinates[:,2] = 0.0
#get cells face and orientation info
cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(area)
#get current 2D area's solutions
cellDepth = self.TwoDAreaCellDepth[i][:,0:self.TwoDAreaCellCounts[i]]
cellWSE = self.TwoDAreaCellWSE[i][:,0:self.TwoDAreaCellCounts[i]]
if self.version == '6.6':
cellVx = self.TwoDAreaCellVx[i][:,0:self.TwoDAreaCellCounts[i]]
cellVy = self.TwoDAreaCellVy[i][:,0:self.TwoDAreaCellCounts[i]]
cellVz = self.TwoDAreaCellVz[i][:,0:self.TwoDAreaCellCounts[i]]
pointVx = self.TwoDAreaPointVx[i]
pointVy = self.TwoDAreaPointVy[i]
pointVz = self.TwoDAreaPointVz[i]
#check if the point velocity is overflowed or underflowed. If so, set it to 0.0
pointVx[pointVx > 1.0e+10] = 0.0
pointVx[pointVx < -1.0e+10] = 0.0
pointVy[pointVy > 1.0e+10] = 0.0
pointVy[pointVy < -1.0e+10] = 0.0
pointVz[pointVz > 1.0e+10] = 0.0
pointVz[pointVz < -1.0e+10] = 0.0
#check if the point velocity is NaN. If so, set it to 0.0
pointVx[np.isnan(pointVx)] = 0.0
pointVy[np.isnan(pointVy)] = 0.0
pointVz[np.isnan(pointVz)] = 0.0
#add Manning's n value for visualization
cellManningN = self.TwoDAreaCellManningN[i]
#print("self.TwoDAreaCellDepth = ", self.TwoDAreaCellDepth)
#print("type(cellDepth) =", type(cellDepth))
#print("cellDepth =", cellDepth)
# list of vtkFileName (to be returned to caller)
vtkFileNameList = []
#loop through solution times
for timeI in range(len(self.solution_time)):
if lastTimeStep:
if timeI < (len(self.solution_time)-1):
continue
if (timeStep != -1) and (timeI != timeStep):
continue
if gVerbose: print("timeI = ", timeI)
#points
pointsVTK = vtk.vtkPoints()
pointsVTK.SetData(VN.numpy_to_vtk(facePointsCoordinates))
#cell topology information list: [num. of FP, FP0, FP1, .., num. of FP, FPxxx]
#the list start with the number of FP for a cell and then the list of FP indexes
connectivity_list = []
#type of cells (contains the number of face points
#celltypes = np.zeros(self.TwoDAreaCellCounts[i], dtype=np.int64)
cellFPCounts = np.zeros(self.TwoDAreaCellCounts[i], dtype=np.int64)
#loop through each cell in the current 2D area to get their face point indexes
for celli in range(self.TwoDAreaCellCounts[i]):
#get the number of face points (=number of faces)
numFP = cellsFaceOrientationInfo[celli,1]
#print("numFP = ", numFP)
if numFP > gMax_Nodes_per_Element:
print("The number of face points, %d, for current face is more than the maximum allowed (%d)."
% (numFP, gMax_Nodes_per_Element))
print("Exiting ...")
sys.exit()
connectivity_list.append(numFP)
for fpI in range(numFP):
connectivity_list.append(cellFacePointIndexes[celli][fpI])
cellFPCounts[celli] = numFP
connectivity = np.array(connectivity_list, dtype=np.int64)
#convert cell's number of face points to VTK cell type
vtkHandler_obj = vtkHandler()
cell_types = vtkHandler_obj.number_of_nodes_to_vtk_celltypes(cellFPCounts)
#for vtk version > 9, it seems the vtkUnstructuredGrid's SetCells() function has changed.
#The following is to convert cell_types from numpy's array to list
cell_types = cell_types.tolist()
cellsVTK = vtk.vtkCellArray()
cellsVTK.SetCells(self.TwoDAreaCellCounts[i], VN.numpy_to_vtkIdTypeArray(connectivity))
uGrid = vtk.vtkUnstructuredGrid()
uGrid.SetPoints(pointsVTK)
uGrid.SetCells(cell_types, cellsVTK)
#add solutions
#add cell center data
currentTimeCellDepth = cellDepth[timeI,:]
currentTimeCellWSE = cellWSE[timeI,:]
if self.version == '6.6':
currentTimeCellVx = cellVx[timeI,:]
currentTimeCellVy = cellVy[timeI,:]
currentTimeCellVz = cellVz[timeI,:]
currentTimePointVx = pointVx[timeI,:]
currentTimePointVy = pointVy[timeI,:]
currentTimePointVz = pointVz[timeI,:]
#combine cell center velocity components into a vector (numpy array)
if self.version == '6.6':
currentTimeCellV = np.zeros((len(currentTimeCellVx),3))
currentTimeCellV[:,0] = currentTimeCellVx
currentTimeCellV[:,1] = currentTimeCellVy
currentTimeCellV[:,2] = currentTimeCellVz
#combine point velocity components into a vector (numpy array)
currentTimePointV = np.zeros((len(currentTimePointVx),3))
currentTimePointV[:,0] = currentTimePointVx
currentTimePointV[:,1] = currentTimePointVy
currentTimePointV[:,2] = currentTimePointVz
currentTimePointElevation = facePointsCoordinates[:,2] #z coordinate of nodes
currentTimeCellManningN = cellManningN #the same; no time change
cell_data = uGrid.GetCellData() # This holds cell data
point_data = uGrid.GetPointData() # This holds point data
#cell depth
currentTimeCellDepth_array = VN.numpy_to_vtk(currentTimeCellDepth)
currentTimeCellDepth_array.SetName('Water_Depth'+varLengthNameAppendix)
cell_data.AddArray(currentTimeCellDepth_array)
#cell WSE
currentTimeCellWSE_array = VN.numpy_to_vtk(currentTimeCellWSE)
currentTimeCellWSE_array.SetName('Water_Elev'+varLengthNameAppendix)
cell_data.AddArray(currentTimeCellWSE_array)
#cell center velocity
if self.version == '6.6':
currentTimeCellV_array = VN.numpy_to_vtk(currentTimeCellV)
currentTimeCellV_array.SetName('Velocity_cell'+varVelocityNameAppendix)
cell_data.AddArray(currentTimeCellV_array)
#cell center Manning's n
currentTimeCellManningN_array = VN.numpy_to_vtk(currentTimeCellManningN)
currentTimeCellManningN_array.SetName('ManningN')
cell_data.AddArray(currentTimeCellManningN_array)
#point velocity
currentTimePointV_array = VN.numpy_to_vtk(currentTimePointV)
currentTimePointV_array.SetName('Velocity_node'+varVelocityNameAppendix)
point_data.AddArray(currentTimePointV_array)
#point elevation
currentTimePointElevation_array = VN.numpy_to_vtk(currentTimePointElevation)
currentTimePointElevation_array.SetName('Bed_Elev'+varLengthNameAppendix)
point_data.AddArray(currentTimePointElevation_array)
#add field data:
# - Time (float)
# - Time_Date (string)
#field_data = {'TIME': np.array([self.solution_time[timeI]]), 'DATE_TIME': str(self.solution_time_date[timeI],'utf-8')}
field_data = {'TIME': np.array([self.solution_time[timeI]])}
#write to vtk file
if fileNameBase=='':
fileNameBase='RAS2D_'
fileName_temp = []
if dir!= '':
fileName_temp = [dir, '/', fileNameBase, self.plan_name, '_', area.astype(str), '_', str(timeI).zfill(4),'.vtk']
else:
fileName_temp = [fileNameBase, self.plan_name, '_', area.astype(str), '_', str(timeI).zfill(4), '.vtk']
vtkFileName = "".join(fileName_temp)
# write out the ugrid
unstr_writer = vtk.vtkUnstructuredGridWriter() # this can save as vtu format
unstr_writer.SetFileName(vtkFileName)
unstr_writer.SetInputData(uGrid)
unstr_writer.Write()
# add the vtkFileName to vtkFileNameList
vtkFileNameList.append(vtkFileName)
# vtkFileNameList
return vtkFileNameList
[docs]
def exportSRHGEOMFile(self, srhgeomFileName, twoDAreaNumber = 0):
"""Export srhgeom file
Parameters
----------
srhgeomFileName : str
name of the srhgeom file
twoDAreaNumber : int, optional
2D flow area number (default = 0)
Returns
-------
"""
#only one 2D area is exported.
# get the cell's FacePoint indexes
cellFacePointIndexes = self.get2DAreaCellFacePointsIndexes(self.TwoDAreaNames[twoDAreaNumber])
# get the FacePoint coordinates
facePointsCoordinates = self.TwoDAreaFacePointCoordinatesList[twoDAreaNumber]
# get cells face and orientation info
cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(self.TwoDAreaNames[twoDAreaNumber])
#write out to srhgeom file
fname = srhgeomFileName + '.srhgeom'
if gVerbose: print("Writing SRHGEOM file: ", fname)
try:
fid = open(fname, 'w')
except IOError:
print('.srhgeom error')
sys.exit()
fid.write('SRHGEOM 30\n')
fid.write('Name \"HEC-RAS 2D Mesh %s\"\n' % srhgeomFileName)
fid.write('\n')
fid.write('GridUnit \"%s\" \n' % self.units)
#all cells
for cellI in range(self.TwoDAreaCellCounts[0]):
cell_list = cellFacePointIndexes[cellI,0:cellsFaceOrientationInfo[cellI,1]]
for i in range(len(cell_list)):
cell_list[i] += 1
fid.write("Elem ")
fid.write("%d " % (cellI+1)) #cellI+1 because SRH-2D is 1-based
fid.write(" ".join(map(str, cell_list)))
fid.write("\n")
#all points
for pointI in range(facePointsCoordinates.shape[0]):
fid.write("Node %d " % (pointI+1)) #pointI+1 because SRH-2D is 1-based
curr_point_coordinates = [facePointsCoordinates[pointI,0],
facePointsCoordinates[pointI,1],
facePointsCoordinates[pointI,2]]
fid.write(" ".join(map(str, curr_point_coordinates)))
fid.write("\n")
#NodeString
boundary_id = 0 #boundary ID counter
#loop through all boundaries (only pick those used in current 2D flow area and with type = "External")
for k in range(self.totalBoundaries):
boundary_id += 1
#check the current boundary is used by the current 2D flow area and its type
if self.TwoDAreaBoundaryNamesTypes[k][1] != self.TwoDAreaNames[twoDAreaNumber] or \
self.TwoDAreaBoundaryNamesTypes[k][2] != b"External":
continue
boundaryName = self.TwoDAreaBoundaryNamesTypes[k][0]
fid.write("NodeString %d " % boundary_id)
#loop over all node ID in the current NodeString
for i in range(self.boundaryTotalPoints[boundary_id-1]):
fid.write(" %d" % (self.boundaryPointList[boundary_id-1,i]+1)) #all point IDs are +1 becaue SRH-2D is 1-based and RAS2D is 0-based.
# 10 numbers per line or this is the last node, start a new line
if (((i + 1) % 10) == 0) or (i == (self.boundaryTotalPoints[boundary_id-1] - 1)):
fid.write("\n")
fid.write("\n")
fid.close()
print("SRHGEOM file exported successfully: ", fname)
[docs]
def exportSRHMATFile(self, srhmatFileName):
"""Export the SRHMAT file
Parameters
----------
srhmatFileName : str
name of the srhmat file to write to
Returns
-------
"""
#only the first 2D area is exported.
cell_ManningN = self.TwoDAreaCellManningN[0] #cell Manning's n
if gVerbose: print("cell_ManningN", cell_ManningN)
#number of Manning's n zones
nManningNZones = len(self.ManningNZones)
if gVerbose: print("nManningNZones", nManningNZones)
fname = srhmatFileName + '.srhmat'
if gVerbose: print("Writing SRHMAT file: ", fname)
try:
fid = open(fname, 'w')
except IOError:
print('.srhmat error')
sys.exit()
fid.write('SRHMAT 30\n')
fid.write('NMaterials %d\n' % (nManningNZones + 1)) #+1 is because SRH-2D also counts the default Manning's n in srhhydro file.
#output MatName
for matID in range(nManningNZones):
fid.write('MatName %d \"%s\" \n' % (matID + 1, self.ManningNZones[matID][0].decode("utf-8"))) # +1 because SRH-2D is 1-based
#output cells in different material categories
for matID in range(nManningNZones):
if not self.cellsInManningZones[matID]: #this Manning's n zone has no cells
continue
fid.write('Material %d ' % (matID+1))
#loop over all cells in current Manning's n zone
for cellI in range(len(self.cellsInManningZones[matID])):
fid.write(" %d" % (self.cellsInManningZones[matID][cellI]+1)) #+1 because SRH-2D is 1-based
# 10 numbers per line or this is the last cell, start a new line
if (((cellI+1) % 10) == 0) or (cellI == (len(self.cellsInManningZones[matID])-1)):
fid.write("\n")
fid.close()
print("SRHMAT file exported successfully: ", fname)
[docs]
def exportBoundariesToVTK(self, boundaryVTKFileName, dir='', twoDAreaNumber = 0):
"""Export boundaries of 2D area to VTK (for visual inspection in Paraview and check the ID of NodeString)
Parameters
----------
boundaryVTKFileName : str
name of the VTK file for output
twoDAreaNumber : int, optional
2D flow area number (default is 0)
Returns
-------
"""
#only the boundaries of one 2D area is exported.
fname = dir + "/" + boundaryVTKFileName + '.vtk'
if gVerbose: print('Writing RAS2D mesh boundaries to', fname)
try:
fid = open(fname, 'w')
except IOError:
print('boundary vtk file open error')
sys.exit()
#output header
fid.write('# vtk DataFile Version 3.0\n')
fid.write('RAS2D boundaries\n')
fid.write('ASCII\n')
fid.write('DATASET POLYDATA\n')
fid.write('\n')
#output all points in the mesh
#(a short cut instead of output boundary points only; does not matter because
#this is for inspection purpose only)
#only the first 2D flow area
area = self.TwoDAreaNames[twoDAreaNumber]
# get the FacePoint coordinates (3D, the elevation is interpolated from terrain)
facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area)
#total number of points in mesh
totalNumPoints = facePointsCoordinates.shape[0]
fid.write('POINTS %d float\n' % totalNumPoints)
#loop over all points
for k in range(facePointsCoordinates.shape[0]):
fid.write(" ".join(map(str, facePointsCoordinates[k])))
fid.write("\n")
#calculate total number of boundary points (all boundaries)
totalNumBoundaryPoints = 0
for k in range(self.totalBoundaries):
totalNumBoundaryPoints += self.boundaryTotalPoints[k]
#print("totalNumBoundaryPoints = ", totalNumBoundaryPoints)
#output lines (boundaries)
fid.write('\nLINES %d %d\n' % (self.totalBoundaries, self.totalBoundaries+totalNumBoundaryPoints))
for k in range(self.totalBoundaries):
fid.write('%d ' % self.boundaryTotalPoints[k])
fid.write(" ".join(map(str, self.boundaryPointList[k,:self.boundaryTotalPoints[k]])))
fid.write("\n")
#output line ID (as cell_data)
fid.write('\nCELL_DATA %d \n' % self.totalBoundaries)
fid.write('scalars BC_ID integer\n')
fid.write('LOOKUP_TABLE default\n')
for k in range(self.totalBoundaries):
fid.write('%d ' % (k+1)) #here k+1 is because SRH-2D is 1-based and RAS2D is 0-based.
fid.write('\n')
fid.close()
[docs]
def exportFaceProfilesToVTK(self, faceProfileVTKFileName, dir='', twoDAreaNumber = 0):
"""Export face profile of 2D area to VTK (for visual inspection in Paraview)
Parameters
----------
faceProfileVTKFileName : str
name of VTK file to write to
dir : str, optional
directory to write to
twoDAreaNumber : int, optional
2D flow area number (default is 0)
Returns
-------
"""
#check whether the face profiles have been created; if not,create them (slow calculation)
if len(self.TwoDAreaFaceProfile) == 0:
self.build2DAreaFaceProfile()
#only the face profiles of one 2D area is exported.
faceProfiles = self.TwoDAreaFaceProfile[twoDAreaNumber]
fname = dir + "/" + faceProfileVTKFileName + '.vtk'
if gVerbose: print('\nWriting all face profiles to', fname)
try:
fid = open(fname, 'w')
except IOError:
print('face profile vtk file open error')
sys.exit()
#output header
fid.write('# vtk DataFile Version 3.0\n')
fid.write('RAS2D face profiles\n')
fid.write('ASCII\n')
fid.write('DATASET POLYDATA\n')
fid.write('\n')
#output all points in the profiles (repetition of points is ok because
#this is for inspection purpose only)
#total number of points in face profiles = (number of faces X points per profile)
totalNumPoints = len(faceProfiles)*self.nFaceProfilePoints
fid.write('POINTS %d float\n' % totalNumPoints)
#loop over all points in all face profiles and export their coordinates
for faceI in range(len(faceProfiles)):
#loop over all points in current face profile
for pointI in range(self.nFaceProfilePoints):
#if pointI ==0: fid.write(" ".join(map(str, faceProfiles[faceI][pointI,:])))
fid.write(" ".join(map(str, faceProfiles[faceI][pointI,:])))
fid.write("\n")
#output lines (face profiles)
fid.write('\nLINES %d %d\n' % (len(faceProfiles), len(faceProfiles)+totalNumPoints))
profileBase = np.arange(0,self.nFaceProfilePoints)
currentLineStartingIndex = 0
for lineI in range(len(faceProfiles)):
fid.write('%d ' % self.nFaceProfilePoints)
fid.write(" ".join(map(str, profileBase + currentLineStartingIndex)))
fid.write("\n")
currentLineStartingIndex += self.nFaceProfilePoints
fid.close()
[docs]
def dump_all_data(self):
"""Dump all data to screen (for debugging purpose)
Returns
-------
"""
print("RAS2D_mesh_data class self-dump:")
print(" hdf_filename = ", self.hdf_filename)
print(" geometry_file_name = ", self.geometry_file_name)
print(" terrain_hdf_filename = ", self.terrain_hdf_filename)
print(" terrain_tiff_filename = ", self.terrain_tiff_filename)
print(" plan_filename = ", self.plan_filename)
print(" plan_name = ", self.plan_name)
print(" plan_shortID = ", self.plan_shortID)
print(" project_filename = ", self.project_filename)
print(" comp_interval = ", self.comp_interval)
print(" outp_interval = ", self.outp_interval)
print(" map_interval = ", self.map_interval)
print(" units = ", self.units)
print(" short_ID = ", self.short_ID)
print(" start_time = ", self.start_time)
print(" end_time = ", self.end_time)
print(" Dpart = ", self.Dpart)
print(" solution_time = ", self.solution_time)
print(" solution_timedates = ", self.solution_time_date)
print(" 2DAreaNames = ", self.TwoDAreaNames)
print(" 2DAreaCellCounts = ", self.TwoDAreaCellCounts)
print(" 2DAreaCellPoints = \n", self.TwoDAreaCellPoints)
print(" 2DAreaBoundaryPoints = \n", self.TwoDAreaBoundaryPoints)
print(" TwoDAreaCellDepth = \n", self.TwoDAreaCellDepth)
print(" TwoDAreaCellWSE = \n", self.TwoDAreaCellWSE)
print(" TwoDAreapointVx = \n", self.TwoDAreaPointVx)
print(" TwoDAreapointVy = \n", self.TwoDAreaPointVy)
print(" TwoDAreapointVz = \n", self.TwoDAreaPointVz)
def main():
""" Testing
Returns
-------
"""
my_ras_2d_data = RAS_2D_Data("Muncie2D.p01.hdf")
#print(my_ras_2d_data.TwoDAreaFace_FacePoints[0])
#my_ras_2d_data.saveHEC_RAS2D_results_to_VTK(timeStep=2)
#my_ras_2d_data.exportSRHGEOMFile("Muncie2D")
#my_ras_2d_data.exportSRHMATFile("Muncie2D")
#my_ras_2d_data.exportBoundariesToVTK("Muncie2D_boundaries")
my_ras_2d_data.exportFaceProfilesToVTK("Muncie2D_faceprofiles")
#dump all data to screen (debug)
#my_ras_2d_data.dump_all_data()
print("All done!")
if __name__ == "__main__":
main()