Source code for pyHMT2D.Hydraulic_Models_Data.RAS_2D.RAS_2D_Data

# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 09:17:08 2021

Convert HEC-RAS 2D mesh and data to SRH-2D. The generated SRHGEOM and SRHMAT files can be loaded in SMS. 
However, current version of SMS (v13.1) can only handle mesh cell shapes of triangles and quadrilaterals.
In HEC-RAS 2D, there could be polyhedrons with more than four sides. It seems SMS will discard extra
 sides and read the mesh wrong. Future versions of SMS may handle general polygons correctly.

Regardless, HEC-RAS 2D meshes can still be used with SRH-2D because the model itself can handle 
polygons. We simply have to prepare all the input files outside of SMS. And that is the purpose
 of this code.

The developement of this code used/modified some of the functions in HaD-to-Py (Tomkovic 2016), e.g., 
the reading of HEC-RAS's 2D result files using the HDF module. 

References:

L. A. Tomkovic (2016), HaD-to-Py, https://github.com/latomkovic/HaD-to-Py (accessed Feb. 2021)

Author: Xiaofeng Liu, PhD, PE  
Penn State University
"""

import sys
import numpy as np
import h5py
import vtk
from vtk.util import numpy_support as VN
from scipy import interpolate
import affine
import os.path
import copy

from pyHMT2D.Hydraulic_Models_Data import HydraulicData

from pyHMT2D.Misc import printProgressBar, vtkHandler, dumpXMDFFileItems
from pyHMT2D.__common__ import *

from .helpers import *

[docs] class RAS_2D_Data(HydraulicData): """A class for HEC-RAS 2D data I/O, manipulation, and format conversion This class is designed to read HEC-RAS results in HDF format. It can save RAS2D results into VTK format for visualization in Paraview, parse RAS2D mesh information and convert/save to SRH-2D format, query RAS2D results (e.g., for post-processing or analysis), etc. In HEC-RAS, one plan corresponds to one HDF file. So essentially, this class is designed to read one HDF file (one plan results). Attributes ---------- hdf_filename : str the name for the HDF result file generated by HEC-RAS. plan_filename : str the name for the HEC-RAS run plan file plan_name : str the name of the HEC-RAS run plan plan_shortID : str the short name of the run, called "Short Identifier" in HEC-RAS project_filename : str the name of the HEC-RAS project file geometry_file_name : str the name of the HEC-RAS geometry file Methods ------- """ def __init__(self, hdf_filename, terrain_tiff_filename=None): """RAS_2D_Data class constructor Parameters ---------- hdf_filename : str name of the HDF file that contains HEC-RAS results terrain_tiff_filename : str, optional name of the terrain TIFF file that contains the terrain data. If not provided, the terrain TIFF file name will be extracted from the HDF file. Note: It is not very clear how HEC-RAS organizes the terrain data in the HDF file regarding the terrain TIFF file names. """ HydraulicData.__init__(self, "HEC-RAS") #check the existence of the HDF file if not os.path.isfile(hdf_filename): print("The HDF file", hdf_filename, "does not exist. Make sure to run HEC-RAS simulation to generate the plan results HDF file before calling this function. Exiting ...") sys.exit() self.hdf_filename = hdf_filename self.plan_filename = None self.plan_name = None self.plan_shortID = None self.project_filename = None #geometry file name self.geometry_file_name = None #extract the plan information from the HDF file: plan_filename, plan_name, plan_shortID, project_filename self.extract_plan_information_from_hdf_file() #extract the geometry file name from the HDF file: geometry_file_name self.extract_geometry_file_name_from_hdf_file() if gVerbose: print("geometry_file_name = ", self.geometry_file_name) if terrain_tiff_filename is not None: if gVerbose: print("Using the provided terrain TIFF file: ", terrain_tiff_filename) self.terrain_hdf_filename = None #not used self.terrain_tiff_filename = terrain_tiff_filename #check the existence of the terrain TIFF file if not os.path.isfile(self.terrain_tiff_filename): print("The terrain TIFF file", self.terrain_tiff_filename, "does not exist. Exiting ...") sys.exit() else: if gVerbose: print("Extracting the terrain TIFF file name from the HDF file") #extract the terrain TIFF file name from the HDF file self.terrain_hdf_filename, self.terrain_tiff_filename = extract_terrain_file_names(self.geometry_file_name) #check the existence of the terrain TIFF file if not os.path.isfile(self.terrain_tiff_filename): print("The terrain TIFF file", self.terrain_tiff_filename, "does not exist. Exiting ...") sys.exit() #number of points for each face's profile (subgrid terrain) #HEC-RAS's HDF file does not export face profile, although in RAS Mapper #the face profile can be plotted and tabulated. It is not clear how these face #profiles are created. Here, we use a uniformed sampled terrain points on each face line. self.nFaceProfilePoints = 20 self.comp_interval = '' self.outp_interval = '' self.map_interval = '' #get intervals self.get_intervals() self.version ='' #version of HEC-RAS that produced this HDF result file self.units = '' #units used self.short_ID = '' #short_ID of the plan #extract version self.extract_version() #extract units self.extract_units() #start and end times self.start_time = '' self.end_time = '' self.Dpart = '' #extract start and end times self.extract_start_end_time() #get solution time series self.solution_time = self.get2DAreaSolutionTimes() self.solution_time_date = self.get2DAreaSolutionTimeDates() self.TwoDAreaNames = [] #get 2D Area names self.get2DAreaNames() self.TwoDAreaCellCounts = [] #get 2D area cell count self.get2DAreaCellCounts() #get 2D area cell points self.TwoDAreaCellPoints = self.get2DAreaCellPoints() #get 2D area boundary points #the content of the "External Faces" table in HDF, which #includes "BC Line ID", "Face Index", "FP Start Index", "FP End Index", #"Station Start", and "Station End" self.TwoDAreaBoundaryPoints = self.get2DAreaBoundaryPoints() #get 2D area boundary names, types, and other information #e.g., TwoDAreaBoundaryNamesTypes[0] is list with info on ["Name", "SA-2D", "Type", "Length"] self.TwoDAreaBoundaryNamesTypes = self.get2DAreaBoundaryNamesTypes() #build boundary information for 2D flow areas #totalBoundaries: total number of boundaries #boundaryIDList: a list boundary IDs (0, 1, 2, etc.) #boundaryNameList: a list of boundary names ("inet", "outlet", ...). Its order is the same as in boundaryIDList. #boundaryTypeList: a list of boundary types ("External", "Internal", etc). Its order is the same as in boundaryIDList. #boundary2DFlowAreaNameList: a list of 2D flow areas corresponding to each boundary. Its order is the same as in boundaryIDList. #boundaryTotalPoints: a list for the total number of points in each boundary. #boundaryPointList: list of points in each boundary self.totalBoundaries, self.boundaryIDList, self.boundaryNameList, self.boundary2DFlowAreaNameList,\ self.boundaryTypeList, self.boundaryTotalPoints, self.boundaryPointList = self.build2DAreaBoundaries() #get 2D area cell face points indexes #self.TwoDAreaCellFacePointsIndexes = self.get2DAreaCellFacePointsIndexes() #interpolator for elevation (bathymetry) #self.bilinterp = self.build2DInterpolatorFromGeoTiff(self.terrain_tiff_filename) #2D area face area elevation info and values self.TwoDAreaFaceAreaElevationInfo = [] self.TwoDAreaFaceAreaElevationValues = [] #list to store face hydraulic information: a list of lists self.TwoDAreaFaceHydraulicInformation = self.build2DAreaFaceHydraulicInformation() #list to store face points' coordinates list: a list of lists #e.g., TwoDAreaFacePointCoordinatesList[0] is a list of face point coordinates in 2D flow area 0 self.TwoDAreaFacePointCoordinatesList = [] self.build2DAreaFacePointCoordinatesList() #list to store cell face list: a list of lists #e.g., TwoDAreaCellFaceList[0][1] is a list of faces for cell 1 in 2D flow area 0 self.TwoDAreaCellFaceList = [] #build TwoDAreaCellFaceList self.build2DAreaCellFaceList() #land cover (Manning n) #This can be retrived from HDF's entries: [Geometry][Land Cover Filename] and [Geometry][Land Cover Layername] self.landcover_filename = b'' self.landcover_layername = b'' #Manning n zone dictionary: {ID: [name, Manning's n]} self.ManningNZones = {} self.build2DManningNZones() #cell IDs in each Manning's n zones (for srhmat output) self.cellsInManningZones = [] #2D area Manning's n at cell center: a list for all 2D areas #e.g., TwoDAreaCellManningN[0][1] is the Manning's n value for cell 1 in 2D flow area 0 self.TwoDAreaCellManningN = [] #Set Manning's n for each cell. There are two options. #Option 1: interpolte Manning's n from face to cell center: TwoDAreaCellManningN # This option is not accurate. Better use option 2. #self.interpolateManningN_face_to_cell() #Option 2: set Manning's n for cells from HEC-RAS's Manning's n GeoTiff and HDF files self.buildCellManningNFromGeoTiffHDF() #Porosity and Flow Drag self.porosity_and_flow_drag_filename = b'' self.porosity_and_flow_drag_layername = b'' #list to store face's two FacePoints #e.g., TwoDAreaFace_FacePoints[0][1] is a list two FacePoint IDs for face 1 in 2D flow area 0 self.TwoDAreaFace_FacePoints = [] #build TwoDAreaFace_FacePoints self.buildFace_FacePoints() #list to store face profile (subgrid terrain). On each face, number of points is self.nFaceProfilePoints. #e.g., TwoDAreaFaceProfile[0][1][0:nFaceProfilePoints,3] stores the profile for face 1 in 2D flow area 0 self.TwoDAreaFaceProfile = [] #build face profile #self.build2DAreaFaceProfile() #By default, HEC-RAS 2D outputs the following time series solution data in HDF file # - Depth (in each cell, including the ghost cells) # - Node X Vel (node velocity in x-direction) # - Node Y Vel (node velocity in y-direction) # - Water Surface (water surface elevation in each cell, including the ghost cells) # - other solution variables at face centers (don't know how to deal with them yet) # load solutions for 2D areas self.TwoDAreaCellDepth = [] self.TwoDAreaCellWSE = [] self.TwoDAreaCellVx = [] self.TwoDAreaCellVy = [] self.TwoDAreaCellVz = [] self.TwoDAreaPointVx = [] self.TwoDAreaPointVy = [] self.TwoDAreaPointVz = [] self.load2DAreaSolutions()
[docs] def extract_geometry_file_name_from_hdf_file(self): """Extract the geometry file name from the HDF file """ hf = h5py.File(self.hdf_filename,'r') self.geometry_file_name = hf['Plan Data']['Plan Information'].attrs["Geometry Filename"].decode() hf.close()
[docs] def extract_plan_information_from_hdf_file(self): """Extract the plan information from the HDF file """ hf = h5py.File(self.hdf_filename,'r') self.plan_filename = hf['Plan Data']['Plan Information'].attrs["Plan Filename"].decode() self.plan_name = hf['Plan Data']['Plan Information'].attrs["Plan Name"].decode() self.plan_shortID = hf['Plan Data']['Plan Information'].attrs["Plan ShortID"].decode() self.project_filename = hf['Plan Data']['Plan Information'].attrs["Project Filename"].decode() hf.close() # HEC-RAS stores the project file as an absolute path. If the case folder was moved # after the run, that path is wrong. Check whether the project file exists in the # same directory as the HDF file; if yes, point project_filename there; otherwise error. self.project_file_name = os.path.basename(self.project_filename) hdf_dir = os.path.dirname(os.path.abspath(self.hdf_filename)) if not hdf_dir: hdf_dir = os.curdir project_path_next_to_hdf = os.path.join(hdf_dir, self.project_file_name) if gVerbose: print("project_filename (from HDF) = ", self.project_filename) if not os.path.isfile(project_path_next_to_hdf): print("The project file " + self.project_file_name + " does not exist in the directory of the HDF file (" + hdf_dir + "). Move the case folder or run HEC-RAS in this location. Exiting ...") sys.exit() self.project_filename = os.path.abspath(project_path_next_to_hdf) if gVerbose: print("project_filename (adjusted) = ", self.project_filename)
[docs] def extract_version(self): """Get the version of HEC-RAS that produced this file Version of HEC-RAS, such as "5.0.7", "6.0.0", "6.1.0" Parameters ----------- """ #check whether the hdf file exists: if not, try the plan file. if not os.path.isfile(self.hdf_filename): print("The HDF file " + self.hdf_filename + " does not exist. Make sure to run HEC-RAS before calling this function. Trying the plan file instead ...") #try the plan file if not os.path.isfile(self.plan_filename): print("The plan file " + self.plan_filename + " does not exist. ") self.version = 'Unknown' return else: with open(self.plan_filename,'r') as plan_file: for line in plan_file: if "Program Version=6.60" in line: self.version = '6.6' break elif "Program Version=6.20" in line: self.version = '6.2' break elif "Program Version=6.10" in line: self.version = '6.1' break return hf = h5py.File(self.hdf_filename, 'r') file_version = hf.attrs['File Version'] if b'5.0.7' in file_version: self.version = '5.0.7' elif b'6.0.0' in file_version: self.version = '6.0.0' elif b'6.1.0' in file_version: self.version = '6.1.0' elif b'6.2' in file_version: self.version = '6.2' elif b'6.6' in file_version: self.version = '6.6' else: #raise Exception("The file version of the HEC-RAS result file is not supported.") print("The file version of the HEC-RAS result file is not supported.") print(" Version of the HEC-RAS result file: ", file_version) print(" Supported versions: 5.0.7, 6.0.0, 6.1.0, 6.2, and 6.6") sys.exit(-1) #print("HEC-RAS file version = ", self.version) hf.close()
[docs] def extract_units(self): """Extract the units used in the HEC-RAS project Units used in HEC-RAS can be either 'Feet' or 'Meter'. Parameters ----------- """ if gVerbose: print("Extracting units from the HEC-RAS project file: ", self.project_filename) # open the HEC-RAS project file #print("self.project_filename = ", self.project_filename) with open(self.project_filename,'r') as project_file: lines = project_file.readlines() if "English Units" in lines[3]: self.units = 'Feet' elif "SI Units" in lines[3]: self.units = "Meter" else: self.units = "Unknown units" if gVerbose: print("Units = ", self.units)
[docs] def get_intervals(self): """ Get the computation, output, and map intervals """ #print(os.getcwd()) #check whether the plan file exists if not os.path.isfile(self.plan_filename): print("The HEC-RAS plan file", self.plan_filename, "does not exists. Exiting ...") sys.exit() comp_indicator = "Computation Interval=" outp_indicator = "Output Interval=" map_indicator = "Mapping Interval=" i=0 with open(self.plan_filename,'r') as plan_file: for line in plan_file: if comp_indicator in line: self.comp_interval = line[line.index(comp_indicator) + len(comp_indicator):].rpartition("\n")[0] i += 1 if i==3: break elif outp_indicator in line: self.outp_interval = line[line.index(outp_indicator) + len(outp_indicator):].rpartition("\n")[0] i += 1 if i==3: break elif map_indicator in line: self.map_interval = line[line.index(map_indicator) + len(map_indicator):].rpartition("\n")[0] i += 1 if i==3: break
[docs] def get_time_minutes(self, interval): """ pass the interval to this function to get the minute value Parameters ---------- interval : str internal in string format Returns ------- """ interval_to_minutes = {'1MIN': 1, '2MIN':2,'3MIN':3,'4MIN':4,'5MIN':5,'10MIN':10,'15MIN':15, '20MIN':20,'30MIN':30,'1HOUR':60} try: minutes = interval_to_minutes[interval] except: message = "You might need to modify the get_time_minutes() function so that there is a interval corresponding to your %s interval" %interval sys.exit(message) raise return minutes
[docs] def extract_start_end_time(self): """ Extract the start_time and end_time from the plan file Returns ------- """ #print("plan_filename = ", self.plan_filename) #print("plan_filename = ", self.plan_filename) time_indicator = "Simulation Date=" with open(self.plan_filename,'r') as plan_file: for line in plan_file: #if gVerbose: print("line = ", line) if time_indicator in line: time_line = line break times = time_line[time_line.index(time_indicator) + len(time_indicator):] self.start_time = times[0:14] self.start_time = self.start_time.replace(","," ") self.end_time = times[15:] self.end_time = self.end_time.replace(","," ") self.Dpart = self.start_time[0:9] self.Dpart = list(self.Dpart) self.Dpart[0:2] = "01" self.Dpart = ''.join(self.Dpart)
[docs] def get2DAreaNames(self): """ Get the names of 2D Flow Areas in the HEC-RAS Plan's geometry Returns ------- """ hf = h5py.File(self.hdf_filename,'r') self.TwoDAreaNames = hf['Geometry']['2D Flow Areas']['Attributes']['Name'] #hdf2DFlow = hf['Geometry']['2D Flow Areas'] #for key in hdf2DFlow: # if key in ['Attributes', 'Cell Info', 'Cell Points', 'Manning\'s n',\ # 'Names', 'Polygon Info', 'Polygon Parts', 'Polygon Points',\ # 'Tolerances']: # continue # else: # self.TwoDAreaNames.append(key) # List of 2D Area names hf.close()
[docs] def get2DAreaCellCounts(self): """ Get 2D Flow Areas cell counts Returns ------- """ hf = h5py.File(self.hdf_filename,'r') self.TwoDAreaCellCounts = hf['Geometry']['2D Flow Areas']['Attributes']['Cell Count'] hf.close()
[docs] def get2DAreaCellPoints(self): """ Get 2D Flow Area cell points Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaCellPoints = np.array(hf['Geometry']['2D Flow Areas']['Cell Points']) #print(hdf2DAreaCellPoints) hf.close() return hdf2DAreaCellPoints
[docs] def get2DAreaBoundaryPoints(self): """ Get 2D Flow Area boundary points (all points on boundaries) Returns ------- hdf2DAreaBoundaryPoints : numpy.ndarray the content of the "External Faces" table in HDF, which include "BC Line ID", "Face Index", "FP Start Index", "FP End Index", "Station Start", and "Station End" """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaBoundaryPoints = np.array(hf['Geometry']['Boundary Condition Lines']['External Faces']) #print(hdf2DAreaBoundaryPoints) hf.close() return hdf2DAreaBoundaryPoints
[docs] def get2DAreaBoundaryNamesTypes(self): """ Get 2D Flow Area boundary names, which 2D area it belongs to, type (External or Internal), and Length Returns ------- hdf2DAreaBoundaryNamesTypes : list the content of the "Attributes" table in HDF, which includes "Name", "SA-2D", "Type", and "Length" """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaBoundaryNamesTypes = hf['Geometry']['Boundary Condition Lines']['Attributes'] #print(type(hdf2DAreaBoundaryNamesTypes)) #print(hdf2DAreaBoundaryNamesTypes) #print(hdf2DAreaBoundaryNamesTypes[0]) #temp list of lists: each is for one boundary [Name, SA-2D, Type, Length] temp_2DAreaBoundaryNamesTypes = [] for areaI in range(len(hdf2DAreaBoundaryNamesTypes)): temp_2DAreaBoundaryNamesTypes.append([hdf2DAreaBoundaryNamesTypes["Name"][areaI], hdf2DAreaBoundaryNamesTypes["SA-2D"][areaI], hdf2DAreaBoundaryNamesTypes["Type"][areaI], hdf2DAreaBoundaryNamesTypes["Length"][areaI] ]) hf.close() return temp_2DAreaBoundaryNamesTypes
[docs] def build2DAreaBoundaries(self): """ Build boundaries with their point list Returns ------- """ if gVerbose: print("Building HEC-RAS 2D area boundaries ...") maxNumBC = 10 #maximum number of boundaries (adjust according to the mesh) maxPointsPerBC = 1000 #maximum number of points per boundary boundaryIDList = np.zeros(maxNumBC,dtype=int) #list of boundary IDs boundaryPointList = np.zeros((maxNumBC,maxPointsPerBC),dtype=int) #list of point IDs on each boundary boundaryTotalPoints = np.zeros(maxNumBC,dtype=int) #total number of points in each boundary #read the first row of BoundaryPoints totalBoundaries = 0 #total number of boundaries currentBoundary = 0 #current BC counter pointCounterInCurrentBoundary = 0 #point counter in current boundary boundaryIDList[currentBoundary] = self.TwoDAreaBoundaryPoints[0][0] #first BC Line ID boundaryPointList[currentBoundary,0] = self.TwoDAreaBoundaryPoints[0][2] #first point boundaryPointList[currentBoundary,1] = self.TwoDAreaBoundaryPoints[0][3] #second point pointCounterInCurrentBoundary = 2 boundaryTotalPoints[currentBoundary] = 2 totalBoundaries = 1 #loop through row 2 to end of BoundaryPoints for k in range(self.TwoDAreaBoundaryPoints.shape[0]-1): if currentBoundary == self.TwoDAreaBoundaryPoints[k+1][0]: #still the old BC line boundaryPointList[currentBoundary,pointCounterInCurrentBoundary] = self.TwoDAreaBoundaryPoints[k+1][3] pointCounterInCurrentBoundary += 1 boundaryTotalPoints[currentBoundary] += 1 else: #a new BC line totalBoundaries += 1 currentBoundary += 1 boundaryIDList[currentBoundary] = self.TwoDAreaBoundaryPoints[k+1][0] #BC Line ID boundaryPointList[currentBoundary,0] = self.TwoDAreaBoundaryPoints[k+1][2] #first point boundaryPointList[currentBoundary,1] = self.TwoDAreaBoundaryPoints[k+1][3] #second point pointCounterInCurrentBoundary = 2 boundaryTotalPoints[currentBoundary] = 2 boundaryNameList = [] boundary2DFlowAreaNameList = [] boundaryTypeList = [] for k in range(len(self.TwoDAreaBoundaryNamesTypes)): boundaryNameList.append(self.TwoDAreaBoundaryNamesTypes[k][0]) boundary2DFlowAreaNameList.append(self.TwoDAreaBoundaryNamesTypes[k][1]) boundaryTypeList.append(self.TwoDAreaBoundaryNamesTypes[k][2]) #print("Total number of boundaries: ", totalBoundaries) #print("Boundary ID list: ",boundaryIDList) #print("boundaryTotalPoints: ", boundaryTotalPoints) #print("boundaryPointList: ", boundaryPointList) return totalBoundaries, boundaryIDList, boundaryNameList, boundary2DFlowAreaNameList,\ boundaryTypeList, boundaryTotalPoints, boundaryPointList
[docs] def build2DManningNZones(self): """ Build 2D flow area's Manning n zones from the land cover (Manning's n) HDF file Assume that there is "Land Cover Filename" and "Land Cover Layername" in the result HDF file. If they don't exist (mostly for the case that the whole domain uses the default, constant Manning's n value in ["Geometry"]["2D Flow Areas"][0][1]), just use that constant. Returns ------- """ hf = h5py.File(self.hdf_filename, 'r') #check whether 'Land Cover Filename' and 'Land Cover Layername' are in the attributes if 'Land Cover Filename' not in hf['Geometry'].attrs.keys() or \ 'Land Cover Layername' not in hf['Geometry'].attrs.keys(): self.ManningNZones = {} # clear the dictionary just in case # take the default Manning's n value in row 0 and column 1. self.ManningNZones[0] = [b'default', hf['Geometry']['2D Flow Areas']['Attributes'][0][1]] hf.close() return self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename'] self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername'] if gVerbose: print("Land Cover Filename = ", self.landcover_filename) if gVerbose: print("Land Cover Layername = ", self.landcover_layername) #Some time HEC-RAS does not save land cover filename and layername to HDF because the #geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry #computation has been done. if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0: print("Land Cover Filename or Land Cover Layername in result HDF is empty. " "Will use the default Manning's n value.") self.ManningNZones = {} # clear the dictionary just in case #take the default Manning's n value in row 0 and column 1. self.ManningNZones[0] = [b'default', hf['Geometry']['2D Flow Areas']['Attributes'][0][1]] else: # read the Manning n zones (land cover zones) self.ManningNZones = {} # clear the dictionary just in case # get the path of the HDF file. if len(os.path.dirname(self.hdf_filename))==0: fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #The way the land cover and Manning's n information stored in v5 and v6 is different. if self.version == '5.0.7': hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r') #hfManningN = h5py.File(fileBase + self.landcover_filename, 'r') dset = hfManningN['IDs'] with dset.astype(np.uint8): IDs = dset[:] ManningN = np.array(hfManningN['ManningsN']) Names = hfManningN['Names'] # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(IDs)): self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]] hfManningN.close() elif self.version == '6.0.xxx': #not used any more; somehow new (>6) versions of HEC-RAS messed up hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r') #hfManningN = h5py.File(fileBase + self.landcover_filename, 'r') dset_raster_map = hfManningN['Raster Map'] dset_variables = hfManningN['Variables'] #with dset_raster_map['ID'].astype(np.uint8): IDs = dset_raster_map['ID'] ManningN = np.array(dset_variables['ManningsN']) Names = dset_variables['Name'] # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(IDs)): self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]] hfManningN.close() elif self.version == '6.1.0' or self.version == '6.0.0' or self.version == '6.6': #Note: In v6 and above, it seems HEC-RAS can handle different formats of Land Cover (Manning's n) HDF # file for backward compatability. For example, if a HEC-RAS case was originally created by v5.0.7, the # Land Cover HDF file has "IDs, ManningsN, and Names". But for cases created by v6 and later, the # Land Cover HDF fiel has "Raster Map, Variables". #check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories. if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'): hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r') elif os.path.isfile(fileBase + self.landcover_filename): hfManningN = h5py.File(fileBase + self.landcover_filename, 'r') else: raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.") if (("Raster Map" in hfManningN.keys()) and ("Variables" in hfManningN.keys())): #for new version >= 6 dset_raster_map = hfManningN['Raster Map'] dset_variables = hfManningN['Variables'] # with dset_raster_map['ID'].astype(np.uint8): IDs = dset_raster_map['ID'] ManningN = np.array(dset_variables['ManningsN']) Names = dset_variables['Name'] # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(IDs)): self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]] elif (("IDs" in hfManningN.keys()) and ("ManningsN" in hfManningN.keys()) and ("Names" in hfManningN.keys())): dset = hfManningN['IDs'] # with dset.astype(np.uint8): # IDs = dset[:] IDs = dset.astype(np.uint8)[:] ManningN = np.array(hfManningN['ManningsN']) Names = hfManningN['Names'] # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(IDs)): self.ManningNZones[IDs[i]] = [Names[i], ManningN[i]] else: hfManningN.close() hf.close() raise Exception("The format of the land cover (Manning's n) layer in the HDF file is not supported. " "Please contact the developer of pyHMT2D or raise an issue on GitHub.") hf.close() else: hf.close() raise Exception("The version of HEC-RAS that produced this HDF result file is not supported.") if gVerbose: print("self.ManningNZones = ", self.ManningNZones) hf.close()
[docs] def change_ManningsN(self, materialIDs, newManningsNValues, materialNames): """ Change materialID's Mannings n values to new values and save to file (for HEC-RAS v5, v6.1.0, and v6.6) Parameters ---------- materialIDs newManningsNValues materialNames Returns ------- """ #get land cover (Manning's n) file name and layer name hf = h5py.File(self.hdf_filename, 'r') self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename'] self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername'] hf.close() if gVerbose: print("Land Cover Filename = ", self.landcover_filename) if gVerbose: print("Land Cover Layername = ", self.landcover_layername) #Some time HEC-RAS does not save land cover filename and layername to HDF because the #geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry #computation has been done. if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0: print("Land Cover Filename or Land Cover Layername in result HDF is empty. " "Will use the default Manning's n value.") raise Exception("Modification of default constant Manning's n has not been implemented.") else: # read the Manning n zones (land cover zones) if os.path.dirname(self.hdf_filename) == '': fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories. if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'): hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+') elif os.path.isfile(fileBase + self.landcover_filename): hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+') else: raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.") dset = hfManningN['IDs'] #with dset.astype(np.uint8): # IDs = dset[:] IDs = dset.astype(np.uint8) IDs = IDs[:].tolist() ManningN_dataset = hfManningN['ManningsN'] ManningN = np.array(ManningN_dataset) Names = hfManningN['Names'] # make a copy of the original Manning's n values ManningN_new = copy.deepcopy(ManningN) # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(materialIDs)): materialID = materialIDs[i] if materialID in IDs: # also check whether the name is consistent if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"): if gVerbose: print(" Old Manning's n value =", ManningN[IDs.index(materialID)], "for material ID = ", materialID) ManningN_new[IDs.index(materialID)] = newManningsNValues[i] if gVerbose: print(" New Manning's n value =", ManningN_new[IDs.index(materialID)], "for material ID = ", materialID) else: raise Exception( "The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case." "You can check the content of the Manning's n HDF file with HDFViewer.") else: raise Exception( "The specified materialID %d is not in the Manning's n list. Please check." % materialID) ManningN_dataset[...] = ManningN_new # assign new values to data # save and close the HDF file hfManningN.close() # need to re-build 2D Manning's n zones information after update self.build2DManningNZones()
[docs] def change_ManningsN_v60(self, materialIDs, newManningsNValues, materialNames): """ Change materialID's Mannings n values to new values and save to file (for HEC-RAS v6.0.0) Parameters ---------- materialIDs newManningsNValues materialNames Returns ------- """ # get land cover (Manning's n) file name and layer name hf = h5py.File(self.hdf_filename, 'r') self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename'] self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername'] hf.close() if gVerbose: print("Land Cover Filename = ", self.landcover_filename) if gVerbose: print("Land Cover Layername = ", self.landcover_layername) # Some time HEC-RAS does not save land cover filename and layername to HDF because the # geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry # computation has been done. if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0: print("Land Cover Filename or Land Cover Layername in result HDF is empty. " "Will use the default Manning's n value.") raise Exception("Modification of default constant Manning's n has not been implemented.") else: # read the Manning n zones (land cover zones) if os.path.dirname(self.hdf_filename) == '': fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories. if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'): hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+') elif os.path.isfile(fileBase + self.landcover_filename): hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+') else: raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.") dset_raster_map = hfManningN['Raster Map'] dset_variables = hfManningN['Variables'] #This dataset needs to be modified because ManningsN is in it. # with dset_raster_map['ID'].astype(np.uint8): IDs = dset_raster_map['ID'] IDs = IDs.tolist() ManningN = np.array(dset_variables['ManningsN']) Names = dset_variables['Name'] # make a copy of the original Manning's n values #ManningN_new = copy.deepcopy(ManningN) dset_variables_new = copy.deepcopy(dset_variables) # print("IDs =", IDs) # print("ManningN =", ManningN) # print("Names =", Names) for i in range(len(materialIDs)): materialID = materialIDs[i] if materialID in IDs: # also check whether the name is consistent if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"): if gVerbose: print(" Old Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)], "for material ID = ", materialID) dset_raster_map['ManningsN'][IDs.index(materialID)] = newManningsNValues[i] if gVerbose: print(" New Manning's n value =", dset_variables_new['ManningsN'][IDs.index(materialID)], "for material ID = ", materialID) else: raise Exception( "The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case." "You can check the content of the Manning's n HDF file with HDFViewer.") else: raise Exception( "The specified materialID %d is not in the Manning's n list. Please check." % materialID) dset_variables[...] = dset_variables_new # assign new values to data # save and close the HDF file hfManningN.close() # need to re-build 2D Manning's n zones information after update self.build2DManningNZones()
[docs] def change_ManningsN_v66(self, materialIDs, newManningsNValues, materialNames): """ Change materialID's Mannings n values to new values and save to file (for HEC-RAS v6.6) Parameters ---------- materialIDs newManningsNValues materialNames Returns ------- """ # get land cover (Manning's n) file name and layer name hf = h5py.File(self.hdf_filename, 'r') self.landcover_filename = hf['Geometry'].attrs['Land Cover Filename'] self.landcover_layername = hf['Geometry'].attrs['Land Cover Layername'] hf.close() if gVerbose: print("Land Cover Filename = ", self.landcover_filename) if gVerbose: print("Land Cover Layername = ", self.landcover_layername) # Some time HEC-RAS does not save land cover filename and layername to HDF because the # geometry association of terrain or land cover (Manning's n) is removed after the 2D area geometry # computation has been done. if len(self.landcover_filename) == 0 or len(self.landcover_layername) == 0: print("Land Cover Filename or Land Cover Layername in result HDF is empty. " "Will use the default Manning's n value.") raise Exception("Modification of default constant Manning's n has not been implemented.") else: # read the Manning n zones (land cover zones) if os.path.dirname(self.hdf_filename) == '': fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #check which hdf file exists: In HEC-RAS, depending on how the land cover (Manning's n) HDF file is created, the hdf file may in different directories. if os.path.isfile(fileBase + self.landcover_layername + b'.hdf'): hfManningN = h5py.File(fileBase + self.landcover_layername + b'.hdf', 'r+') elif os.path.isfile(fileBase + self.landcover_filename): hfManningN = h5py.File(fileBase + self.landcover_filename, 'r+') else: raise Exception("The land cover (Manning's n) HDF file " + fileBase + self.landcover_layername + b'.hdf' + " or " + fileBase + self.landcover_filename + " does not exist. Please check.") dset_raster_map = hfManningN['Raster Map'] dset_variables = hfManningN['Variables'] #This dataset needs to be modified because ManningsN is in it. if gVerbose: print("Dataset shape:", dset_variables['ManningsN'].shape) print("Dataset dtype:", dset_variables['ManningsN'].dtype) # with dset_raster_map['ID'].astype(np.uint8): IDs = dset_raster_map['ID'] IDs = IDs.tolist() ManningN = np.array(dset_variables['ManningsN']) Names = dset_variables['Name'] # make a copy of the original Manning's n values #ManningN_new = copy.deepcopy(ManningN) #dset_variables_new = copy.deepcopy(dset_variables) if gVerbose: print("IDs =", IDs) print("ManningN =", ManningN) print("Names =", Names) print("newManningsNValues =", newManningsNValues) print("materialNames =", materialNames) for i in range(len(materialIDs)): materialID = materialIDs[i] if materialID in IDs: # also check whether the name is consistent if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"): if gVerbose: print(" Old Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)], "for material ID = ", materialID) #print("IDs.index(materialID) = ", IDs.index(materialID)) #update the ManningsN value in the dset_variables dataset #ManningN[IDs.index(materialID)] = newManningsNValues[i] #dset_variables['ManningsN'][IDs.index(materialID)] = newManningsNValues[i] #This is the correct way to update hdf5 data row_index = IDs.index(materialID) row = dset_variables[row_index] # Fetch full structured row row["ManningsN"] = newManningsNValues[i] # Modify the field dset_variables[row_index] = row # Write row back into dataset if gVerbose: print(" New Manning's n value =", dset_variables['ManningsN'][IDs.index(materialID)], "for material ID = ", materialID) else: print("The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case.") print("materialID = ", materialID) print("materialNames[i] = ", materialNames[i]) print("Names[IDs.index(materialID)] = ", Names[IDs.index(materialID)]) raise Exception( "The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case." "You can check the content of the Manning's n HDF file with HDFViewer.") else: raise Exception( "The specified materialID %d is not in the Manning's n list. Please check." % materialID) #dset_variables['ManningsN'][:] = ManningN # assign new values to data # save and close the HDF file hfManningN.close() # need to re-build 2D Manning's n zones information after update self.build2DManningNZones()
[docs] def modify_ManningsN(self, materialIDs, newManningsNValues, materialNames): """Modify materialID's Manning's n values to new values Assume that there is "Land Cover Filename" and "Land Cover Layername" in the result HDF file. If they don't exist (mostly for the case that the whole domain uses the default, constant Manning's n value in ["Geometry"]["2D Flow Areas"][0][1]), just use newManningsNValues[0]. Parameters ---------- materialIDs : list material ID list (0-based) newManningsNValues : list new Manning's n values in a list materialNames : list names of the material in a list (0-based) Returns ------- """ if gVerbose: print("Modify Manning's n value ...") if not isinstance(materialIDs[0], int): print("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]), ". Exit.\n") if not isinstance(newManningsNValues[0], float): print("Manning's n has to be a float. The type of newManningsNValue passed in is ", type(newManningsNValues[0]), ". Exit.\n") if self.version == '5.0.7' or self.version == '6.1.0' or self.version == '6.6': self.change_ManningsN(materialIDs, newManningsNValues, materialNames) elif self.version == '6.0.0': #v6.0.0 is special self.change_ManningsN_v60(materialIDs, newManningsNValues, materialNames) #elif self.version == '6.6': #v6.6 is special # self.change_ManningsN_v66(materialIDs, newManningsNValues, materialNames) if gVerbose: print("Finished modifying Manning's n value ...")
[docs] def modify_PorosityDrag_b_Coefficient(self, materialIDs, newPorosityDrag_b_Coefficients, materialNames): """Modify materialID's PorosityDrag_b_Coefficient values to new values Assume that there is "Porosity and Flow Drag Filename" and "Porosity and Flow Drag Layername" in the result HDF file. Parameters ---------- materialIDs : list material ID list newPorosityDrag_b_Coefficients : list new PorosityDrag_b_Coefficient values in a list materialNames : list names of the material in a list Returns ------- """ if gVerbose: print("Modify PorosityDrag_b_Coefficient value ...") if not isinstance(materialIDs[0], int): print("Material ID has to be an integer. The type of materialID passed in is ", type(materialIDs[0]), ". Exit.\n") if not isinstance(newPorosityDrag_b_Coefficients[0], float): print("PorosityDrag_b_Coefficient has to be a float. The type of newPorosityDrag_b_CoefficientValue passed in is ", type(newPorosityDrag_b_Coefficients[0]), ". Exit.\n") if self.version == '6.6': self.change_PorosityDrag_b_Coefficient_v66(materialIDs, newPorosityDrag_b_Coefficients, materialNames) else: raise Exception("PorosityDrag_b_Coefficient modification is not supported for HEC-RAS version ", self.version) if gVerbose: print("Finished modifying PorosityDrag_b_Coefficient value ...")
[docs] def change_PorosityDrag_b_Coefficient_v66(self, materialIDs, newPorosityDrag_b_Coefficients, materialNames): """ Change materialID's PorosityDrag_b_Coefficient values to new values and save to file (for HEC-RAS v6.6) Parameters ---------- materialIDs newManningsNValues materialNames Returns ------- """ # get Porosity and Flow Drag file name and layer name hf = h5py.File(self.hdf_filename, 'r') self.porosity_and_flow_drag_filename = hf['Geometry'].attrs['Porosity and Flow Drag Filename'] self.porosity_and_flow_drag_layername = hf['Geometry'].attrs['Porosity and Flow Drag Layername'] hf.close() if gVerbose: print("Porosity and Flow Drag Filename = ", self.porosity_and_flow_drag_filename) if gVerbose: print("Porosity and Flow Drag Layername = ", self.porosity_and_flow_drag_layername) # Some time HEC-RAS does not save Porosity and Flow Drag filename and layername to HDF because the # geometry association of Porosity and Flow Drag is removed after the 2D area geometry # computation has been done. if len(self.porosity_and_flow_drag_filename) == 0 or len(self.porosity_and_flow_drag_layername) == 0: print("Porosity and Flow Drag Filename or Porosity and Flow Drag Layername in result HDF is empty. ") raise Exception("Modification of PorosityDrag_b_Coefficient cannot proceed.") else: # read the Porosity and Flow Drag zones if os.path.dirname(self.hdf_filename) == '': fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #check which hdf file exists: In HEC-RAS, depending on how the Porosity and Flow Drag HDF file is created, the hdf file may in different directories. if os.path.isfile(fileBase + self.porosity_and_flow_drag_layername + b'.hdf'): hfPorosityDrag = h5py.File(fileBase + self.porosity_and_flow_drag_layername + b'.hdf', 'r+') elif os.path.isfile(fileBase + self.porosity_and_flow_drag_filename): hfPorosityDrag = h5py.File(fileBase + self.porosity_and_flow_drag_filename, 'r+') else: raise Exception("The Porosity and Flow Drag HDF file " + fileBase + self.porosity_and_flow_drag_layername + b'.hdf' + " or " + fileBase + self.porosity_and_flow_drag_filename + " does not exist. Please check.") dset_raster_map = hfPorosityDrag['Raster Map'] dset_variables = hfPorosityDrag['Variables'] #This dataset needs to be modified because PorosityDrag_b_Coefficient is in it. if gVerbose: print("Dataset shape:", dset_variables['Quadratic Drag Coefficient'].shape) print("Dataset dtype:", dset_variables['Quadratic Drag Coefficient'].dtype) # with dset_raster_map['ID'].astype(np.uint8): IDs = dset_raster_map['ID'] IDs = IDs.tolist() PorosityDrag_b_Coefficient = np.array(dset_variables['Quadratic Drag Coefficient']) Names = dset_variables['Name'] if gVerbose: print("IDs =", IDs) print("PorosityDrag_b_Coefficient =", PorosityDrag_b_Coefficient) print("Names =", Names) print("newPorosityDrag_b_Coefficients =", newPorosityDrag_b_Coefficients) print("materialNames =", materialNames) for i in range(len(materialIDs)): materialID = materialIDs[i] if materialID in IDs: # also check whether the name is consistent if materialNames[i] == Names[IDs.index(materialID)].decode("ASCII"): if gVerbose: print(" Old PorosityDrag_b_Coefficient value =", dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)], "for material ID = ", materialID) #print("IDs.index(materialID) = ", IDs.index(materialID)) #update the PorosityDrag_b_Coefficient value in the dset_variables dataset #PorosityDrag_b_Coefficient[IDs.index(materialID)] = newPorosityDrag_b_Coefficients[i] #dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)] = newPorosityDrag_b_Coefficients[i] #This is the correct way to update hdf5 data row_index = IDs.index(materialID) row = dset_variables[row_index] # Fetch full structured row row["Quadratic Drag Coefficient"] = newPorosityDrag_b_Coefficients[i] # Modify the field dset_variables[row_index] = row # Write row back into dataset if gVerbose: print(" New PorosityDrag_b_Coefficient value =", dset_variables['Quadratic Drag Coefficient'][IDs.index(materialID)], "for material ID = ", materialID) else: print("The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case.") print("materialID = ", materialID) print("materialNames[i] = ", materialNames[i]) print("Names[IDs.index(materialID)] = ", Names[IDs.index(materialID)]) raise Exception( "The materialI and material name are not consistent. Please make sure they are consistent with HEC-RAS case." "You can check the content of the Porosity and Flow Drag HDF file with HDFViewer.") else: raise Exception( "The specified materialID %d is not in the Porosity and Flow Drag list. Please check." % materialID) #dset_variables['Quadratic Drag Coefficient'][:] = PorosityDrag_b_Coefficient # assign new values to data # save and close the HDF file hfPorosityDrag.close()
# need to re-build 2D Porosity and Flow Drag zones information after update (not neede? If the hdf file is updated, HEC-RAS will recompute geometric information.) #self.build2DPorosityDragZones()
[docs] def build2DInterpolatorFromGeoTiff(self, geoTiffFileName): """ Build 2D interpolator for from geoTiff file, e.g., terrain and Manning n ID, using Python interpolate.interp2d Assume the data is in band 1. This method is too slow (use with caution if the case is large). Attributes ------- geoTiffFileName : str name for the geoTiff file Returns ------- """ if gVerbose: print('Building 2D interpolator from GeoTiff file ...') try: from osgeo import gdal except ImportError: raise ImportError('Error in importing GDAL package. Make sure GDAL has been installed properly.') # Read raster source = gdal.Open(geoTiffFileName,gdal.GA_ReadOnly) #print(source) # Read the raster band as separate variable #band = source.GetRasterBand(1) # Print only selected metadata: #print ("[ NO DATA VALUE ] = ", band.GetNoDataValue()) # none #print ("[ MIN ] = ", band.GetMinimum()) #print ("[ MAX ] = ", band.GetMaximum()) nx, ny = source.RasterXSize, source.RasterYSize gt = source.GetGeoTransform() band_array = source.GetRasterBand(1).ReadAsArray().astype(np.float64) # Compute mid-point grid spacings ax = np.array([gt[0] + ix*gt[1] + gt[1]/2.0 for ix in range(nx)]) ay = np.array([gt[3] + iy*gt[5] + gt[5]/2.0 for iy in range(ny)]) bilinterp = interpolate.interp2d(ax, ay, band_array, kind='linear') #close the GDAL file source = None return bilinterp
[docs] def interpolatorFromGeoTiff_rasterio(self, geoTiffFileName, pointList, dataType=np.float64): """Interpolate from a GeoTIFF file using rasterio (preferred on Windows). Falls back to :meth:`interpolatorFromGeoTiff` (GDAL-based) if rasterio is not available or if opening the file fails. Parameters ---------- geoTiffFileName: str name of the GeoTiff file pointList: list list of points (x,y) dataType : int, optional data type in the GeoTiff (default is numpy.float) Returns ------- """ try: import rasterio except ImportError: # rasterio not available; use the existing GDAL-based implementation return self.interpolatorFromGeoTiff(geoTiffFileName, pointList, dataType=dataType) # rasterio expects a string path, but some HEC-RAS metadata # are stored as bytes (e.g., b'.\\ManningNFromZonePolygons.tif'). if isinstance(geoTiffFileName, bytes): geoTiffFileName = geoTiffFileName.decode('utf-8') # check whether the geoTiff file exists if not os.path.isfile(geoTiffFileName): print("The geoTiff file", geoTiffFileName, "does not exists. Exiting ...") sys.exit() try: with rasterio.open(geoTiffFileName) as src: data_array = src.read(1) forward_transform = src.transform nodata = src.nodata except Exception: # Any issue with rasterio, fall back to the GDAL-based implementation return self.interpolatorFromGeoTiff(geoTiffFileName, pointList, dataType=dataType) # Replace nodata with a fill value so sampled points don't get -9999 (or similar). # Use the minimum valid value in the band so elevation stays plausible; if all nodata, use 0. if nodata is not None: valid = data_array[data_array != nodata] fill_value = float(np.min(valid)) if valid.size else 0.0 else: fill_value = None reverse_transform = ~forward_transform interpolatedValues = [] # loop through all points in the list for pointI in range(pointList.shape[0]): x = pointList[pointI,0] y = pointList[pointI,1] px, py = reverse_transform * (x, y) px, py = int(px + 0.5), int(py + 0.5) # make sure px and py are within the data_array bounds if px < 0 or px >= data_array.shape[1] or py < 0 or py >= data_array.shape[0]: if px < 0: px = 0 elif px >= data_array.shape[1]: px = data_array.shape[1] - 1 if py < 0: py = 0 elif py >= data_array.shape[0]: py = data_array.shape[0] - 1 # Sample with the pixel coordinates. Note that py should be first because # the index is [rows, columns] in a 2D grid in python # Return a scalar (like the original GDAL-based implementation), not a 0-D array. val = data_array[py, px] if fill_value is not None and (val == nodata or (np.isnan(nodata) and np.isnan(val))): val = fill_value interpolatedValues.append(val) return interpolatedValues
[docs] def interpolatorFromGeoTiff(self, geoTiffFileName, pointList, dataType=np.float64): """Interpolate from a GeoTiff file given a point list. Parameters ---------- geoTiffFileName: str name of the GeoTiff file pointList: list list of points (x,y) dataType : int, optional data type in the GeoTiff (default is numpy.float) Returns ------- """ try: from osgeo import gdal except ImportError: raise ImportError('Error in importing GDAL package. Make sure GDAL has been installed properly.') #check whether the geoTiff file exists if not os.path.isfile(geoTiffFileName): print("The geoTiff file", geoTiffFileName, "does not exists. Exiting ...") sys.exit() # Read raster source = gdal.Open(geoTiffFileName,gdal.GA_ReadOnly) #print(source) # Read the raster band as separate variable data_array = np.array(source.GetRasterBand(1).ReadAsArray()) #print(data_array.shape) # Print only selected metadata: #print ("[ NO DATA VALUE ] = ", band.GetNoDataValue()) # none #print ("[ MIN ] = ", band.GetMinimum()) #print ("[ MAX ] = ", band.GetMaximum()) forward_transform = affine.Affine.from_gdal(*source.GetGeoTransform()) reverse_transform = ~forward_transform interpolatedValues = [] #loop through all points in the list for pointI in range(pointList.shape[0]): x = pointList[pointI,0] y = pointList[pointI,1] px, py = reverse_transform * (x, y) px, py = int(px + 0.5), int(py + 0.5) #make sure px and py are within the data_array bounds if px < 0 or px >= data_array.shape[1] or py < 0 or py >= data_array.shape[0]: #if gVerbose: # print("Warning within the interpolatorFromGeoTiff() function of RAS_2D_Data.py: The point (", x, ",", y, ") is outside the data_array bounds. Using the boundary value instead ...") if px < 0: px = 0 elif px >= data_array.shape[1]: px = data_array.shape[1] - 1 if py < 0: py = 0 elif py >= data_array.shape[0]: py = data_array.shape[0] - 1 #Sample with the pixel coordinates. Note that py should be first because # the index is [rows, columns] in a 2@ grid in python interpolatedValues.append(data_array[py][px]) source = None return interpolatedValues
[docs] def get2DAreaCellFacePointsIndexes(self, area): """Get 2D Flow Area cell face points indexes for a specified 2D area Parameters ---------- area : str name of the area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaCellFacePointsIndexes = np.array(hf['Geometry']['2D Flow Areas'][area] ['Cells FacePoint Indexes']) #print(hdf2DAreaCellFacePointsIndexes) hf.close() return hdf2DAreaCellFacePointsIndexes
[docs] def get2DAreaCellCenterCoordiantes(self, area): """Get 2D Flow Area cell center coordinates for a specified 2D area Note: these coordinates only have (x,y), no z. Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename, 'r') #in HEC-RAS v5.0.7 and earlier, the 'Cells Center Coordinate' contains also the ghost cell centers #Here, we only need the real centers, not the ghost. #Get 2D Flow Areas Attributes table twoDFlowAreaAttributs = hf['Geometry']['2D Flow Areas']['Attributes'] #index of the 2D flow area in the geometry (in case there are more than one 2D flow areas) areaIndex = 0 bFound = False for name in twoDFlowAreaAttributs['Name']: if name == area: bFound = True break areaIndex += 1 if not bFound: print("The specified area with name", area, "was not found in the geometry. Exiting ...") sys.exit() #Get the start and end of current 2D flow area cell index temp = hf['Geometry']['2D Flow Areas']['Cell Info'] iStart = temp[areaIndex,0] iEnd = temp[areaIndex,1] hdf2DAreaCellCenterCoordinates = np.array(hf['Geometry']['2D Flow Areas'][area] ['Cells Center Coordinate'])[iStart:iEnd,:] # print(hdf2DAreaCellCenterCoordinates) hf.close() return hdf2DAreaCellCenterCoordinates
[docs] def get2DAreaFacePointsCoordinates(self, area): """Get the face points' coordinates for a specified 2D area Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaFacePointsCoordinates = np.array(hf['Geometry']['2D Flow Areas'][area]['FacePoints Coordinate']) #print(hdf2DAreaFacePointsCoordinates) #add the third column to the coordinates array for the z-coordinates (RAS2D only exports x and y, not z) z_temp = np.zeros(hdf2DAreaFacePointsCoordinates.shape[0]) hdf2DAreaFacePointsCoordinates3D = np.column_stack((hdf2DAreaFacePointsCoordinates, z_temp)) #interpolate the elevation (bathymetry) to z coordinates self.interpolateZcoord2Points(hdf2DAreaFacePointsCoordinates3D) #print(hdf2DAreaFacePointsCoordinates3D) hf.close() return hdf2DAreaFacePointsCoordinates3D
[docs] def interpolateZcoord2Points(self, facePointsCoordinates3D): """Interpolate the elevation (bathymetry) to 2D area's points (z-coordinate) Parameters ---------- facePointsCoordinates3D : numpy.ndarray face points coordinates in 3D Returns ------- """ allFacePointsCoordiantes=np.empty([facePointsCoordinates3D.shape[0],2]) for k in range(facePointsCoordinates3D.shape[0]): x1 = facePointsCoordinates3D[k,0] y1 = facePointsCoordinates3D[k,1] allFacePointsCoordiantes[k,0] = x1 allFacePointsCoordiantes[k,1] = y1 # Prefer rasterio-based interpolation when available; fall back to the GDAL-based version. allFacePointZ = self.interpolatorFromGeoTiff_rasterio( self.terrain_tiff_filename, allFacePointsCoordiantes ) for k in range(facePointsCoordinates3D.shape[0]): facePointsCoordinates3D[k,2] = allFacePointZ[k]
[docs] def get2DAreaCellsFaceOrientationInfo(self, area): """Get cells face and orientation info for a specified 2D area Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaCellsFaceOrientationInfo = np.array(hf['Geometry']['2D Flow Areas'][area]['Cells Face and Orientation Info']) #print(hdf2DAreaCellsFaceOrientationInfo) hf.close() return hdf2DAreaCellsFaceOrientationInfo
[docs] def get2DAreaCellsFaceOrientationValues(self, area): """Get cells face and orientation values for a specified 2D area Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaCellsFaceOrientationValues = np.array(hf['Geometry']['2D Flow Areas'][area]['Cells Face and Orientation Values']) #print(hdf2DAreaCellsFaceOrientationValues) hf.close() return hdf2DAreaCellsFaceOrientationValues
[docs] def get2DAreaFaceAreaElevationInfo(self, area): """Get faces area elevation info for a specified 2D area Two columns: first column-starting index, second column-length of record Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaFaceAreaElevationInfo = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces Area Elevation Info']) #print(hdf2DAreaFaceAreaElevationInfo) hf.close() return hdf2DAreaFaceAreaElevationInfo
[docs] def get2DAreaFaceAreaElevationValues(self, area): """Get faces area elevation values for a specified 2D area Four columns: first column-elevation, second column-area third column-wetted perimeter, fourth column-Manning's n (a constant as of now RAS version <=6) Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaFaceAreaElevationValues = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces Area Elevation Values']) #print(hdf2DAreaFaceAreaElevationValues) hf.close() return hdf2DAreaFaceAreaElevationValues
[docs] def get2DAreaFaceFacePointIndexes(self, area): """Get face's facepoint indexes (two interger IDs) Parameters ---------- area : str name of area Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaFaceFacePointIndexes = np.array(hf['Geometry']['2D Flow Areas'][area]['Faces FacePoint Indexes']) #print(hdf2DAreaFaceFacePointIndexes) hf.close() return hdf2DAreaFaceFacePointIndexes
[docs] def hdf2DAreaResultVar(self, area, varName): """Get HEC-RAS solution time series result for a given variable Parameters ---------- area : str name of area varName : str variable name Returns ------- """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaResultVar = np.array(hf['Results']['Unsteady']['Output']['Output Blocks'] ['Base Output']['Unsteady Time Series']['2D Flow Areas'][area][varName]) #only take the last row (time step) #hdf2DAreaResultVar = hdf2DAreaResultVar[(hdf2DAreaResultVar.shape[0]-1)] #print(varName, hdf2DAreaResultVar) hf.close() return hdf2DAreaResultVar
[docs] def get2DAreaSolutionTimes(self): """Get 2D Flow Area solution times Returns ------- hdf2DAreaSolutionTimes : list list of solution times """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaSolutionTimes = np.array(hf['Results']['Unsteady']['Output']['Output Blocks'] ['Base Output']['Unsteady Time Series']['Time']) #print(hdf2DAreaSolutionTimes) hf.close() return hdf2DAreaSolutionTimes
[docs] def get2DAreaSolutionTimeDates(self): """Get 2D Flow Area solution time_dates Returns ------- hdf2DAreaSolutionTimeDates : list list of solution time and dates """ hf = h5py.File(self.hdf_filename,'r') hdf2DAreaSolutionTimeDates = np.array(hf['Results']['Unsteady']['Output']['Output Blocks'] ['Base Output']['Unsteady Time Series']['Time Date Stamp']) #print(hdf2DAreaSolutionTimeDates) hf.close() return hdf2DAreaSolutionTimeDates
[docs] def load2DAreaSolutions(self): """Load solutions (cell depth and water surface elevation, node X and Y vel.) for all 2D areas Returns ------- """ if gVerbose: print('Loading 2D area solutions ...') #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #fetch the depth and WSE data depth_name = '' #name of depth variable in HDF file WSE_name = '' #name of WSE variable in HDF file nodeVx_name = '' #name of nodal x velocity in HDF file nodeVy_name = '' # name of nodal y velocity in HDF file if self.version == '5.0.7': depth_name = 'Depth' WSE_name = 'Water Surface' nodeVx_name = 'Node X Vel' nodeVy_name = 'Node Y Vel' elif self.version == '6.0.0' or self.version == '6.1.0': depth_name = 'Cell Invert Depth' WSE_name = 'Water Surface' nodeVx_name = 'Node Velocity - Velocity X' nodeVy_name = 'Node Velocity - Velocity Y' elif self.version == '6.6': #For v6.6, the nodal velocity is not exported correctly; thus, we use the cell center velocity. depth_name = 'Cell Invert Depth' WSE_name = 'Water Surface' cellVx_name = 'Cell Velocity - Velocity X' cellVy_name = 'Cell Velocity - Velocity Y' nodeVx_name = 'Node Velocity - Velocity X' nodeVy_name = 'Node Velocity - Velocity Y' else: raise Exception("The version of HEC-RAS that produced this HDF result file is not supported.") try: cellDepth = self.hdf2DAreaResultVar(area, depth_name) cellWSE = self.hdf2DAreaResultVar(area, WSE_name) if self.version == '6.6': cellVx = self.hdf2DAreaResultVar(area, cellVx_name) cellVy = self.hdf2DAreaResultVar(area, cellVy_name) except KeyError as e: raise KeyError(f"Variable '{depth_name}' or '{WSE_name}' not found in 2D area '{area}'. Available variables may vary by HEC-RAS version.") #slice the cell data array to get values only for cells in current 2D area. #The HEC-RAS results also contain values at the center of boundary faces. cellDepth = cellDepth[0:(self.TwoDAreaCellCounts[i])] cellWSE = cellWSE[0:(self.TwoDAreaCellCounts[i])] if self.version == '6.6': cellVx = cellVx[0:(self.TwoDAreaCellCounts[i])] cellVy = cellVy[0:(self.TwoDAreaCellCounts[i])] self.TwoDAreaCellDepth.append(cellDepth) self.TwoDAreaCellWSE.append(cellWSE) if self.version == '6.6': self.TwoDAreaCellVx.append(cellVx) self.TwoDAreaCellVy.append(cellVy) self.TwoDAreaCellVz.append(cellVx*0.0) #fake z velocity (=0) #fetch point data #Node X Vel and Node Y Vel pointVx=self.hdf2DAreaResultVar(area, nodeVx_name) pointVy=self.hdf2DAreaResultVar(area, nodeVy_name) #deal with nan in pointVx and pointVy pointVx = np.where(np.isnan(pointVx), 0.0, pointVx) pointVy = np.where(np.isnan(pointVy), 0.0, pointVy) #create pointVz which is zero pointVz = np.zeros_like(pointVx) #print("pointVx = ", pointVx) #print("pointVy = ", pointVy) self.TwoDAreaPointVx.append(pointVx) self.TwoDAreaPointVy.append(pointVy) self.TwoDAreaPointVz.append(pointVz)
[docs] def build2DAreaFaceHydraulicInformation(self): """For 2D flow area, build face hydraulic information table (face area elevation) Returns ------- """ if gVerbose: print("Building 2D area's face hydraulic information ...") areaFaceHydraulicInformationTable = [] #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #get face area elevation info faceAreaElevationInfo = self.get2DAreaFaceAreaElevationInfo(area) self.TwoDAreaFaceAreaElevationInfo.append(faceAreaElevationInfo) #get face area elevation values faceAreaElevationValues = self.get2DAreaFaceAreaElevationValues(area) self.TwoDAreaFaceAreaElevationValues.append(faceAreaElevationValues) #print("Total number of faces in this area: ", faceAreaElevationInfo.shape[0]) #temp list to store the numpy arrays (tables) for each face in current area currAreaFaceHydraulicInformationTable = [] #loop through all faces in this area for faceI in range(faceAreaElevationInfo.shape[0]): #print("faceI = ", faceI) faceData = np.zeros([faceAreaElevationInfo[faceI,1],4]) #the four columns: elevation, area, wetted perimeter, and Manning's n start_row = faceAreaElevationInfo[faceI,0] end_row = faceAreaElevationInfo[faceI,0] + faceAreaElevationInfo[faceI,1] faceData = faceAreaElevationValues[start_row:end_row,:].copy() #make a copy #print("faceData = ", faceData) currAreaFaceHydraulicInformationTable.append(faceData) #append the current area's faceHydraulicInformation table to the top list areaFaceHydraulicInformationTable.append(currAreaFaceHydraulicInformationTable) return areaFaceHydraulicInformationTable
[docs] def build2DAreaFacePointCoordinatesList(self): """Build face point coordinates list Returns ------- """ if gVerbose: print("Building 2D area's face point coordinates list ...") #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #get the FacePoint coordinates in the current 2D flow area facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area) self.TwoDAreaFacePointCoordinatesList.append(facePointsCoordinates)
[docs] def build2DAreaCellFaceList(self): """Build cell's face list Returns ------- """ if gVerbose: print("Building 2D area's cell face list ...") #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #get cells face and orientation info cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(area) #get cells face and orientation values cellsFaceOrientationValues = self.get2DAreaCellsFaceOrientationValues(area) #temp list to store the numpy arrays (face list) for each cell in current area currAreaCellFaceList = [] #loop through cells in current area for cellI in range(self.TwoDAreaCellCounts[i]): start_row = cellsFaceOrientationInfo[cellI,0] end_row = cellsFaceOrientationInfo[cellI,0] + cellsFaceOrientationInfo[cellI,1] faceList = cellsFaceOrientationValues[start_row:end_row,0].copy() #make a copy #print("faceList =", faceList) currAreaCellFaceList.append(faceList) #append the current area's cell face list to the top list self.TwoDAreaCellFaceList.append(currAreaCellFaceList)
[docs] def build2DAreaFaceProfile(self): """Build face profile Face profile in HEC-RAS represents the sub-grid terrain. Returns ------- """ if gVerbose: print("Building 2D area's face profile ...") #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) face_facePoints = self.TwoDAreaFace_FacePoints[i] #get the FacePoint coordinates in the current 2D flow area facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area) #calculate how many points there are in all the profiles # loop through all faces in current area (it is a slow process; print a progress bar) totalNumProfilePoints = self.nFaceProfilePoints * face_facePoints.shape[0] #array to store all profile points (for all faces) totalProfilePoints = np.zeros((totalNumProfilePoints,3)) curAreaProfilePoints = [] #loop through all faces in current area (it is a slow process; print a progress bar) for faceI in range(face_facePoints.shape[0]): #print("faceI, face_facePoints.shape[0] = ", faceI, face_facePoints.shape[0]) printProgressBar(faceI, face_facePoints.shape[0], "Face profile computation in progress") #print("faceI =", faceI) facePoint_start = face_facePoints[faceI,0] #start ID of facePoint facePoint_end = face_facePoints[faceI,1] #end ID of facePoint #get the coordinates of the start and end facePoints startFacePointCoordinates = facePointsCoordinates[facePoint_start] endFacePointCoordinates = facePointsCoordinates[facePoint_end] length = self.horizontalDistance(startFacePointCoordinates,endFacePointCoordinates) stations = np.linspace(0,length,self.nFaceProfilePoints,endpoint=True) #if faceI == 0: print(repr(stations)) #print(startFacePointCoordinates) #print(endFacePointCoordinates) for pointI in range(self.nFaceProfilePoints): totalProfilePoints[pointI+self.nFaceProfilePoints*faceI,:] = (startFacePointCoordinates + (endFacePointCoordinates-startFacePointCoordinates) *pointI/(self.nFaceProfilePoints-1)) #interpolate elevation to all profile points self.interpolateZcoord2Points(totalProfilePoints) #unpack the points and assign to each face profile for faceI in range(face_facePoints.shape[0]): profilePoints = totalProfilePoints[self.nFaceProfilePoints*faceI:(self.nFaceProfilePoints*(faceI+1)),:] curAreaProfilePoints.append(profilePoints) self.TwoDAreaFaceProfile.append(curAreaProfilePoints)
[docs] def interpolateManningN_face_to_cell(self): """interpolate Manning's n from face to cell center HEC-RAS 2D stores Manning's n value at faces, not cell centers. We need to interpolate its value from face to cell center. Another option is to use the cell center coordinates and direclty query the Manning's n layer in RAS Mapper, which might be too complicated because HEC-RAS has default Manning's n, Land Use Cover, Override polygon, etc. Here, the interpolation from face to cell might not be a bad choice. For each face, HEC-RAS currently does not support varying Manning's n (horizontally and veritcally). Thus, effectively, it is a constant value for each face. This may change in the future. Returns ------- """ if gVerbose: print("Interpolating Manning's n from face to cell center ...") #clear the list up in case this function has been called before self.TwoDAreaCellManningN = [] #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) temp_n = np.zeros(self.TwoDAreaCellCounts[i]) #loop through cells in current area for cellI in range(self.TwoDAreaCellCounts[i]): sum_n = 0.0 #loop through all faces of current cell for faceI in self.TwoDAreaCellFaceList[i][cellI]: sum_n += self.TwoDAreaFaceHydraulicInformation[i][faceI][0,3] #only take the first value because #it is constant for each face temp_n[cellI] = sum_n/len(self.TwoDAreaCellFaceList[i][cellI]) self.TwoDAreaCellManningN.append(temp_n)
[docs] def buildCellManningNFromGeoTiffHDF(self): """Build 2D flow area cell's Manning n value from HEC-RAS's GeoTiff and HDF files for Manning n. This method is more accurate than interpolateManningN_face_to_cell(), which uses a face to cell interpolation. Also need to consider the case where the "Land Cover Filename" is empty and the whole domain uses the default constant Manning's n. Returns ------- """ if gVerbose: print("Building cell's Manning n values from GeoTiff and HDF ...") self.TwoDAreaCellManningN = [] self.TwoDAreaCellManningN.append(np.zeros(self.TwoDAreaCellCounts[0])) cell_center_coordinates = np.array(self.get2DAreaCellCenterCoordiantes(self.TwoDAreaNames[0])) #loop through each cell #for cellI in range(self.TwoDAreaCellCounts[0]): # ManningN_ID = int(self.ManningNIDInterpolator(cell_center_coordinates[cellI,0], # cell_center_coordinates[cellI,1])) # cell_ManningN[cellI] = self.ManningNZones[ManningN_ID][1] #get the Manning n value #get the full land cover file name (deal with relative path w.r.t. the script where RAS_2D_Data class is used) if os.path.dirname(self.hdf_filename) == '': fileBase = b'' else: fileBase = str.encode(os.path.dirname(self.hdf_filename) + '/') #HEC-RAS v5's land cover file name stores the tiff file name, while in #v6 it stores the hdf file name. For v6, I assume the tiff file name is #land cover layer name.tif. Need to check this assumption. if self.version == '5.0.7': full_landcover_filename = fileBase+self.landcover_filename elif self.version == '6.0.0' or self.version == '6.1.0' or self.version == '6.6': full_landcover_filename = (fileBase+self.landcover_filename[:-4])+b'.tif' #note: it may already has the ".tif" extension else: errMessage = "HEC-RAS version not supported. version = " + self.version raise Exception(errMessage) if self.landcover_filename == b'': #if there is no "Land cover Filename" specified. ManningN_IDs = [0] * self.TwoDAreaCellCounts[0] else: # Prefer rasterio-based interpolation when available; fall back to the GDAL-based version. ManningN_IDs = self.interpolatorFromGeoTiff_rasterio( full_landcover_filename, cell_center_coordinates ) #set the Manning's n values for each cells for cellI in range(self.TwoDAreaCellCounts[0]): self.TwoDAreaCellManningN[0][cellI] = self.ManningNZones[ManningN_IDs[cellI]][1] #get the Manning n value #bin each cell to different Manning's n zones self.cellsInManningZones = [ [] for _ in range(len(self.ManningNZones))] # a list of lists for cellI in range(self.TwoDAreaCellCounts[0]): if self.cellsInManningZones[ManningN_IDs[cellI]]: self.cellsInManningZones[ManningN_IDs[cellI]].append(cellI) else: self.cellsInManningZones[ManningN_IDs[cellI]] = [cellI]
#print("cellsInManningZones = ", self.cellsInManningZones)
[docs] def buildFace_FacePoints(self): """Build face's facepoint list Returns ------- """ if gVerbose: print("Building face's facepoints list ...") #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #get cells face and orientation info faceFacePointIndexes = self.get2DAreaFaceFacePointIndexes(area) self.TwoDAreaFace_FacePoints.append(faceFacePointIndexes)
[docs] def saveHEC_RAS2D_results_to_VTK(self,timeStep=-1,lastTimeStep=False,fileNameBase='',dir='',bFlat=False): """Save HEC-RAS 2D solutions to VTK files. Note: - Each area saved separately - Each time saved separately The resulted files will be RAS2D_areaName_timeSequence.vtk, e.g., RAS2D_SpringCreek_0001.vtk, RAS2D_SpringCreek_0002.vtk, etc. The option lastTimeStep specifies whether only the last time step is saved (default=False). The option timeStep specifies the particular step to be saved. If both lastTimeSTep and timeStep are specified, lastTimeStep has the priority. Note: - In most HEC-RAS versions, the velocity can be exported at nodes (face points). But somehow in HEC-RAS 6.6, the nodal velocity is not exported correctly (it has overflow float points; don't know why); however, the velocity at cell centers is correct. Thus, we save the cell center velocity for version 6.6. Parameters ---------- timeStep : int, optional only the specified time step will be saved lastTimeStep : bool, optional specify only the last time step dir : str, optional directory name to write to Returns ------- """ if gVerbose: print('Saving RAS-2D results to VTK ...') #check the sanity of timeStep if (timeStep != -1) and (not timeStep in range(len(self.solution_time))): message = "Specified timeStep = %d not in range (0 to %d)." % (timeStep, len(self.solution_time)) sys.exit(message) #get units for variable name appendix (like SRH-2D) varLengthNameAppendix = '' varVelocityNameAppendix = '' if gVerbose: print("HEC-RAS project units =",self.units) if self.units == 'Feet': varLengthNameAppendix = '_ft' varVelocityNameAppendix = '_ft_p_s' elif self.units == 'Meter': varLengthNameAppendix = '_m' varVelocityNameAppendix = '_m_p_s' else: print("Wrong units specified in the HEC-RAS project file. Units = ", self.units) print("Exiting ...") sys.exit() #loop through each 2D areas for area,i in zip(self.TwoDAreaNames, range(len(self.TwoDAreaNames))): #print("2D Flow Area = ", area) #get the FacePoint indexes cellFacePointIndexes = self.get2DAreaCellFacePointsIndexes(area) #get the FacePoint coordinates (3D, the elevation is interpolated from terrain) facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area) if bFlat: # if we want the mesh to be flat, then z = 0.0 facePointsCoordinates[:,2] = 0.0 #get cells face and orientation info cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(area) #get current 2D area's solutions cellDepth = self.TwoDAreaCellDepth[i][:,0:self.TwoDAreaCellCounts[i]] cellWSE = self.TwoDAreaCellWSE[i][:,0:self.TwoDAreaCellCounts[i]] if self.version == '6.6': cellVx = self.TwoDAreaCellVx[i][:,0:self.TwoDAreaCellCounts[i]] cellVy = self.TwoDAreaCellVy[i][:,0:self.TwoDAreaCellCounts[i]] cellVz = self.TwoDAreaCellVz[i][:,0:self.TwoDAreaCellCounts[i]] pointVx = self.TwoDAreaPointVx[i] pointVy = self.TwoDAreaPointVy[i] pointVz = self.TwoDAreaPointVz[i] #check if the point velocity is overflowed or underflowed. If so, set it to 0.0 pointVx[pointVx > 1.0e+10] = 0.0 pointVx[pointVx < -1.0e+10] = 0.0 pointVy[pointVy > 1.0e+10] = 0.0 pointVy[pointVy < -1.0e+10] = 0.0 pointVz[pointVz > 1.0e+10] = 0.0 pointVz[pointVz < -1.0e+10] = 0.0 #check if the point velocity is NaN. If so, set it to 0.0 pointVx[np.isnan(pointVx)] = 0.0 pointVy[np.isnan(pointVy)] = 0.0 pointVz[np.isnan(pointVz)] = 0.0 #add Manning's n value for visualization cellManningN = self.TwoDAreaCellManningN[i] #print("self.TwoDAreaCellDepth = ", self.TwoDAreaCellDepth) #print("type(cellDepth) =", type(cellDepth)) #print("cellDepth =", cellDepth) # list of vtkFileName (to be returned to caller) vtkFileNameList = [] #loop through solution times for timeI in range(len(self.solution_time)): if lastTimeStep: if timeI < (len(self.solution_time)-1): continue if (timeStep != -1) and (timeI != timeStep): continue if gVerbose: print("timeI = ", timeI) #points pointsVTK = vtk.vtkPoints() pointsVTK.SetData(VN.numpy_to_vtk(facePointsCoordinates)) #cell topology information list: [num. of FP, FP0, FP1, .., num. of FP, FPxxx] #the list start with the number of FP for a cell and then the list of FP indexes connectivity_list = [] #type of cells (contains the number of face points #celltypes = np.zeros(self.TwoDAreaCellCounts[i], dtype=np.int64) cellFPCounts = np.zeros(self.TwoDAreaCellCounts[i], dtype=np.int64) #loop through each cell in the current 2D area to get their face point indexes for celli in range(self.TwoDAreaCellCounts[i]): #get the number of face points (=number of faces) numFP = cellsFaceOrientationInfo[celli,1] #print("numFP = ", numFP) if numFP > gMax_Nodes_per_Element: print("The number of face points, %d, for current face is more than the maximum allowed (%d)." % (numFP, gMax_Nodes_per_Element)) print("Exiting ...") sys.exit() connectivity_list.append(numFP) for fpI in range(numFP): connectivity_list.append(cellFacePointIndexes[celli][fpI]) cellFPCounts[celli] = numFP connectivity = np.array(connectivity_list, dtype=np.int64) #convert cell's number of face points to VTK cell type vtkHandler_obj = vtkHandler() cell_types = vtkHandler_obj.number_of_nodes_to_vtk_celltypes(cellFPCounts) #for vtk version > 9, it seems the vtkUnstructuredGrid's SetCells() function has changed. #The following is to convert cell_types from numpy's array to list cell_types = cell_types.tolist() cellsVTK = vtk.vtkCellArray() cellsVTK.SetCells(self.TwoDAreaCellCounts[i], VN.numpy_to_vtkIdTypeArray(connectivity)) uGrid = vtk.vtkUnstructuredGrid() uGrid.SetPoints(pointsVTK) uGrid.SetCells(cell_types, cellsVTK) #add solutions #add cell center data currentTimeCellDepth = cellDepth[timeI,:] currentTimeCellWSE = cellWSE[timeI,:] if self.version == '6.6': currentTimeCellVx = cellVx[timeI,:] currentTimeCellVy = cellVy[timeI,:] currentTimeCellVz = cellVz[timeI,:] currentTimePointVx = pointVx[timeI,:] currentTimePointVy = pointVy[timeI,:] currentTimePointVz = pointVz[timeI,:] #combine cell center velocity components into a vector (numpy array) if self.version == '6.6': currentTimeCellV = np.zeros((len(currentTimeCellVx),3)) currentTimeCellV[:,0] = currentTimeCellVx currentTimeCellV[:,1] = currentTimeCellVy currentTimeCellV[:,2] = currentTimeCellVz #combine point velocity components into a vector (numpy array) currentTimePointV = np.zeros((len(currentTimePointVx),3)) currentTimePointV[:,0] = currentTimePointVx currentTimePointV[:,1] = currentTimePointVy currentTimePointV[:,2] = currentTimePointVz currentTimePointElevation = facePointsCoordinates[:,2] #z coordinate of nodes currentTimeCellManningN = cellManningN #the same; no time change cell_data = uGrid.GetCellData() # This holds cell data point_data = uGrid.GetPointData() # This holds point data #cell depth currentTimeCellDepth_array = VN.numpy_to_vtk(currentTimeCellDepth) currentTimeCellDepth_array.SetName('Water_Depth'+varLengthNameAppendix) cell_data.AddArray(currentTimeCellDepth_array) #cell WSE currentTimeCellWSE_array = VN.numpy_to_vtk(currentTimeCellWSE) currentTimeCellWSE_array.SetName('Water_Elev'+varLengthNameAppendix) cell_data.AddArray(currentTimeCellWSE_array) #cell center velocity if self.version == '6.6': currentTimeCellV_array = VN.numpy_to_vtk(currentTimeCellV) currentTimeCellV_array.SetName('Velocity_cell'+varVelocityNameAppendix) cell_data.AddArray(currentTimeCellV_array) #cell center Manning's n currentTimeCellManningN_array = VN.numpy_to_vtk(currentTimeCellManningN) currentTimeCellManningN_array.SetName('ManningN') cell_data.AddArray(currentTimeCellManningN_array) #point velocity currentTimePointV_array = VN.numpy_to_vtk(currentTimePointV) currentTimePointV_array.SetName('Velocity_node'+varVelocityNameAppendix) point_data.AddArray(currentTimePointV_array) #point elevation currentTimePointElevation_array = VN.numpy_to_vtk(currentTimePointElevation) currentTimePointElevation_array.SetName('Bed_Elev'+varLengthNameAppendix) point_data.AddArray(currentTimePointElevation_array) #add field data: # - Time (float) # - Time_Date (string) #field_data = {'TIME': np.array([self.solution_time[timeI]]), 'DATE_TIME': str(self.solution_time_date[timeI],'utf-8')} field_data = {'TIME': np.array([self.solution_time[timeI]])} #write to vtk file if fileNameBase=='': fileNameBase='RAS2D_' fileName_temp = [] if dir!= '': fileName_temp = [dir, '/', fileNameBase, self.plan_name, '_', area.astype(str), '_', str(timeI).zfill(4),'.vtk'] else: fileName_temp = [fileNameBase, self.plan_name, '_', area.astype(str), '_', str(timeI).zfill(4), '.vtk'] vtkFileName = "".join(fileName_temp) # write out the ugrid unstr_writer = vtk.vtkUnstructuredGridWriter() # this can save as vtu format unstr_writer.SetFileName(vtkFileName) unstr_writer.SetInputData(uGrid) unstr_writer.Write() # add the vtkFileName to vtkFileNameList vtkFileNameList.append(vtkFileName) # vtkFileNameList return vtkFileNameList
[docs] def exportSRHGEOMFile(self, srhgeomFileName, twoDAreaNumber = 0): """Export srhgeom file Parameters ---------- srhgeomFileName : str name of the srhgeom file twoDAreaNumber : int, optional 2D flow area number (default = 0) Returns ------- """ #only one 2D area is exported. # get the cell's FacePoint indexes cellFacePointIndexes = self.get2DAreaCellFacePointsIndexes(self.TwoDAreaNames[twoDAreaNumber]) # get the FacePoint coordinates facePointsCoordinates = self.TwoDAreaFacePointCoordinatesList[twoDAreaNumber] # get cells face and orientation info cellsFaceOrientationInfo = self.get2DAreaCellsFaceOrientationInfo(self.TwoDAreaNames[twoDAreaNumber]) #write out to srhgeom file fname = srhgeomFileName + '.srhgeom' if gVerbose: print("Writing SRHGEOM file: ", fname) try: fid = open(fname, 'w') except IOError: print('.srhgeom error') sys.exit() fid.write('SRHGEOM 30\n') fid.write('Name \"HEC-RAS 2D Mesh %s\"\n' % srhgeomFileName) fid.write('\n') fid.write('GridUnit \"%s\" \n' % self.units) #all cells for cellI in range(self.TwoDAreaCellCounts[0]): cell_list = cellFacePointIndexes[cellI,0:cellsFaceOrientationInfo[cellI,1]] for i in range(len(cell_list)): cell_list[i] += 1 fid.write("Elem ") fid.write("%d " % (cellI+1)) #cellI+1 because SRH-2D is 1-based fid.write(" ".join(map(str, cell_list))) fid.write("\n") #all points for pointI in range(facePointsCoordinates.shape[0]): fid.write("Node %d " % (pointI+1)) #pointI+1 because SRH-2D is 1-based curr_point_coordinates = [facePointsCoordinates[pointI,0], facePointsCoordinates[pointI,1], facePointsCoordinates[pointI,2]] fid.write(" ".join(map(str, curr_point_coordinates))) fid.write("\n") #NodeString boundary_id = 0 #boundary ID counter #loop through all boundaries (only pick those used in current 2D flow area and with type = "External") for k in range(self.totalBoundaries): boundary_id += 1 #check the current boundary is used by the current 2D flow area and its type if self.TwoDAreaBoundaryNamesTypes[k][1] != self.TwoDAreaNames[twoDAreaNumber] or \ self.TwoDAreaBoundaryNamesTypes[k][2] != b"External": continue boundaryName = self.TwoDAreaBoundaryNamesTypes[k][0] fid.write("NodeString %d " % boundary_id) #loop over all node ID in the current NodeString for i in range(self.boundaryTotalPoints[boundary_id-1]): fid.write(" %d" % (self.boundaryPointList[boundary_id-1,i]+1)) #all point IDs are +1 becaue SRH-2D is 1-based and RAS2D is 0-based. # 10 numbers per line or this is the last node, start a new line if (((i + 1) % 10) == 0) or (i == (self.boundaryTotalPoints[boundary_id-1] - 1)): fid.write("\n") fid.write("\n") fid.close() print("SRHGEOM file exported successfully: ", fname)
[docs] def exportSRHMATFile(self, srhmatFileName): """Export the SRHMAT file Parameters ---------- srhmatFileName : str name of the srhmat file to write to Returns ------- """ #only the first 2D area is exported. cell_ManningN = self.TwoDAreaCellManningN[0] #cell Manning's n if gVerbose: print("cell_ManningN", cell_ManningN) #number of Manning's n zones nManningNZones = len(self.ManningNZones) if gVerbose: print("nManningNZones", nManningNZones) fname = srhmatFileName + '.srhmat' if gVerbose: print("Writing SRHMAT file: ", fname) try: fid = open(fname, 'w') except IOError: print('.srhmat error') sys.exit() fid.write('SRHMAT 30\n') fid.write('NMaterials %d\n' % (nManningNZones + 1)) #+1 is because SRH-2D also counts the default Manning's n in srhhydro file. #output MatName for matID in range(nManningNZones): fid.write('MatName %d \"%s\" \n' % (matID + 1, self.ManningNZones[matID][0].decode("utf-8"))) # +1 because SRH-2D is 1-based #output cells in different material categories for matID in range(nManningNZones): if not self.cellsInManningZones[matID]: #this Manning's n zone has no cells continue fid.write('Material %d ' % (matID+1)) #loop over all cells in current Manning's n zone for cellI in range(len(self.cellsInManningZones[matID])): fid.write(" %d" % (self.cellsInManningZones[matID][cellI]+1)) #+1 because SRH-2D is 1-based # 10 numbers per line or this is the last cell, start a new line if (((cellI+1) % 10) == 0) or (cellI == (len(self.cellsInManningZones[matID])-1)): fid.write("\n") fid.close() print("SRHMAT file exported successfully: ", fname)
[docs] def exportBoundariesToVTK(self, boundaryVTKFileName, dir='', twoDAreaNumber = 0): """Export boundaries of 2D area to VTK (for visual inspection in Paraview and check the ID of NodeString) Parameters ---------- boundaryVTKFileName : str name of the VTK file for output twoDAreaNumber : int, optional 2D flow area number (default is 0) Returns ------- """ #only the boundaries of one 2D area is exported. fname = dir + "/" + boundaryVTKFileName + '.vtk' if gVerbose: print('Writing RAS2D mesh boundaries to', fname) try: fid = open(fname, 'w') except IOError: print('boundary vtk file open error') sys.exit() #output header fid.write('# vtk DataFile Version 3.0\n') fid.write('RAS2D boundaries\n') fid.write('ASCII\n') fid.write('DATASET POLYDATA\n') fid.write('\n') #output all points in the mesh #(a short cut instead of output boundary points only; does not matter because #this is for inspection purpose only) #only the first 2D flow area area = self.TwoDAreaNames[twoDAreaNumber] # get the FacePoint coordinates (3D, the elevation is interpolated from terrain) facePointsCoordinates = self.get2DAreaFacePointsCoordinates(area) #total number of points in mesh totalNumPoints = facePointsCoordinates.shape[0] fid.write('POINTS %d float\n' % totalNumPoints) #loop over all points for k in range(facePointsCoordinates.shape[0]): fid.write(" ".join(map(str, facePointsCoordinates[k]))) fid.write("\n") #calculate total number of boundary points (all boundaries) totalNumBoundaryPoints = 0 for k in range(self.totalBoundaries): totalNumBoundaryPoints += self.boundaryTotalPoints[k] #print("totalNumBoundaryPoints = ", totalNumBoundaryPoints) #output lines (boundaries) fid.write('\nLINES %d %d\n' % (self.totalBoundaries, self.totalBoundaries+totalNumBoundaryPoints)) for k in range(self.totalBoundaries): fid.write('%d ' % self.boundaryTotalPoints[k]) fid.write(" ".join(map(str, self.boundaryPointList[k,:self.boundaryTotalPoints[k]]))) fid.write("\n") #output line ID (as cell_data) fid.write('\nCELL_DATA %d \n' % self.totalBoundaries) fid.write('scalars BC_ID integer\n') fid.write('LOOKUP_TABLE default\n') for k in range(self.totalBoundaries): fid.write('%d ' % (k+1)) #here k+1 is because SRH-2D is 1-based and RAS2D is 0-based. fid.write('\n') fid.close()
[docs] def exportFaceProfilesToVTK(self, faceProfileVTKFileName, dir='', twoDAreaNumber = 0): """Export face profile of 2D area to VTK (for visual inspection in Paraview) Parameters ---------- faceProfileVTKFileName : str name of VTK file to write to dir : str, optional directory to write to twoDAreaNumber : int, optional 2D flow area number (default is 0) Returns ------- """ #check whether the face profiles have been created; if not,create them (slow calculation) if len(self.TwoDAreaFaceProfile) == 0: self.build2DAreaFaceProfile() #only the face profiles of one 2D area is exported. faceProfiles = self.TwoDAreaFaceProfile[twoDAreaNumber] fname = dir + "/" + faceProfileVTKFileName + '.vtk' if gVerbose: print('\nWriting all face profiles to', fname) try: fid = open(fname, 'w') except IOError: print('face profile vtk file open error') sys.exit() #output header fid.write('# vtk DataFile Version 3.0\n') fid.write('RAS2D face profiles\n') fid.write('ASCII\n') fid.write('DATASET POLYDATA\n') fid.write('\n') #output all points in the profiles (repetition of points is ok because #this is for inspection purpose only) #total number of points in face profiles = (number of faces X points per profile) totalNumPoints = len(faceProfiles)*self.nFaceProfilePoints fid.write('POINTS %d float\n' % totalNumPoints) #loop over all points in all face profiles and export their coordinates for faceI in range(len(faceProfiles)): #loop over all points in current face profile for pointI in range(self.nFaceProfilePoints): #if pointI ==0: fid.write(" ".join(map(str, faceProfiles[faceI][pointI,:]))) fid.write(" ".join(map(str, faceProfiles[faceI][pointI,:]))) fid.write("\n") #output lines (face profiles) fid.write('\nLINES %d %d\n' % (len(faceProfiles), len(faceProfiles)+totalNumPoints)) profileBase = np.arange(0,self.nFaceProfilePoints) currentLineStartingIndex = 0 for lineI in range(len(faceProfiles)): fid.write('%d ' % self.nFaceProfilePoints) fid.write(" ".join(map(str, profileBase + currentLineStartingIndex))) fid.write("\n") currentLineStartingIndex += self.nFaceProfilePoints fid.close()
[docs] def dump_all_data(self): """Dump all data to screen (for debugging purpose) Returns ------- """ print("RAS2D_mesh_data class self-dump:") print(" hdf_filename = ", self.hdf_filename) print(" geometry_file_name = ", self.geometry_file_name) print(" terrain_hdf_filename = ", self.terrain_hdf_filename) print(" terrain_tiff_filename = ", self.terrain_tiff_filename) print(" plan_filename = ", self.plan_filename) print(" plan_name = ", self.plan_name) print(" plan_shortID = ", self.plan_shortID) print(" project_filename = ", self.project_filename) print(" comp_interval = ", self.comp_interval) print(" outp_interval = ", self.outp_interval) print(" map_interval = ", self.map_interval) print(" units = ", self.units) print(" short_ID = ", self.short_ID) print(" start_time = ", self.start_time) print(" end_time = ", self.end_time) print(" Dpart = ", self.Dpart) print(" solution_time = ", self.solution_time) print(" solution_timedates = ", self.solution_time_date) print(" 2DAreaNames = ", self.TwoDAreaNames) print(" 2DAreaCellCounts = ", self.TwoDAreaCellCounts) print(" 2DAreaCellPoints = \n", self.TwoDAreaCellPoints) print(" 2DAreaBoundaryPoints = \n", self.TwoDAreaBoundaryPoints) print(" TwoDAreaCellDepth = \n", self.TwoDAreaCellDepth) print(" TwoDAreaCellWSE = \n", self.TwoDAreaCellWSE) print(" TwoDAreapointVx = \n", self.TwoDAreaPointVx) print(" TwoDAreapointVy = \n", self.TwoDAreaPointVy) print(" TwoDAreapointVz = \n", self.TwoDAreaPointVz)
def main(): """ Testing Returns ------- """ my_ras_2d_data = RAS_2D_Data("Muncie2D.p01.hdf") #print(my_ras_2d_data.TwoDAreaFace_FacePoints[0]) #my_ras_2d_data.saveHEC_RAS2D_results_to_VTK(timeStep=2) #my_ras_2d_data.exportSRHGEOMFile("Muncie2D") #my_ras_2d_data.exportSRHMATFile("Muncie2D") #my_ras_2d_data.exportBoundariesToVTK("Muncie2D_boundaries") my_ras_2d_data.exportFaceProfilesToVTK("Muncie2D_faceprofiles") #dump all data to screen (debug) #my_ras_2d_data.dump_all_data() print("All done!") if __name__ == "__main__": main()