Source code for CrossSection

######################################################################
#          O  o   o o-o   o--o   o-o  o   o o--o o-o   o--o          #
#         / \ |\  | |  \  |   | o   o |\ /| |    |  \  |             #
#        o---o| \ | |   O O-Oo  |   | | O | O-o  |   O O-o           #
#        |   ||  \| |  /  |  \  o   o |   | |    |  /  |             #
#        o   oo   o o-o   o   o  o-o  o   o o--o o-o   o--o          #
######################################################################
#
# ANDROMEDE
# Copyright (C) 2023 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################

import numpy as np
from scipy.interpolate import griddata
from skimage.measure import profile_line


[docs] class CrossSection: """ This class deals with Cross plotting and calculation """ def __init__(self, Xstart, Xend, WS, K, Npoint, CoeffR): """ Initialize cross-section :param Xstart: starting point of the profile ([X,Y]) :param Xend: ending point of the profile ([X,Y]) :param WS: water surface level :param K: strickler coefficient :param CoeffR: ratio between averaged and maximum velocity """ # point extreme du profil self.Xstart = Xstart self.Xend = Xend self.Npoint = Npoint self.CoeffR = CoeffR # profil topo mesuré self.X = [] self.Y = [] self.Z = [] self.V = [] self.q = [] self.Q = [] self.Vexp = [] self.qexp = [] self.Qexp = [] # profil de vitesses ensurface self.xsurface = [] self.ysurface = [] self.zsurface = [] self.Vsurface = [[], [], [], [], []] # vitesse suivant x et y self.distanceSurface = [] self.indH = [] self.Hexp = [] self.K = K self.S = [] self.Smean = [] self.WS = WS self.Rh = [] self.Ah = [] self.Pm = [] self.waterdepth = [] self.WSextent = [] self.distance = []
[docs] def ProjectionWithMNTXYZ(self, MNT, origin): """ Provide the bathymetry along the cross-section if topographic data are in a XYZ format :param MNT: Digital Elevation Model format (XYZ, 3 columns) :param origin: XY coordinates :return: position X and Y for N points between the starting and ending point. :rtype: list :return: bathymetry for point in cross-section :rtype: list """ x = MNT[:, 0] - origin[0] y = MNT[:, 1] - origin[1] z = MNT[:, 2] distance = [] x_min, x_max = np.min((self.Xstart[0], self.Xend[0])), np.max((self.Xstart[0], self.Xend[0])) y_min, y_max = np.min((self.Xstart[1], self.Xend[1])), np.max((self.Xstart[1], self.Xend[1])) xtopo = np.linspace(x_min, x_max, self.Npoint) ytopo = np.linspace(y_min, y_max, self.Npoint) indx1 = np.min(np.where(xtopo == self.Xstart[0])) indx2 = np.min(np.where(xtopo == self.Xend[0])) indy1 = np.min(np.where(ytopo == self.Xstart[1])) indy2 = np.min(np.where(ytopo == self.Xend[1])) self.X = list(np.linspace(xtopo[indx1], xtopo[indx2], self.Npoint)) self.Y = list(np.linspace(ytopo[indy1], ytopo[indy2], self.Npoint)) for ii in range(len(self.X)): distance.append(((self.X[ii] - self.X[0]) ** 2 + (self.Y[ii] - self.Y[0]) ** 2) ** 0.5) x1, y1 = np.meshgrid(xtopo, ytopo) grid_MNT = griddata((x, y), z, (x1, y1), method='linear') Z_temp = profile_line(grid_MNT, (indy1, indx1), (indy2, indx2)) d_temp = np.linspace(0, np.max(distance), len(Z_temp)) self.Z = list(np.interp(distance, d_temp, Z_temp))
[docs] def ProjectionVelocity(self, X, Y, velocity_field): """ Provide the velocity along the cross-section from velocity stored in a andromede result grid :param X: X position of each cross-section point (2D array) :param Y: Y position of each cross-section point (2D array) :param velocity_field: andromede result grid (X,Y,Vx,VY,V,...) :return: Surface velocity for point in cross-section X,Y and associated dat (fluctuations,..) :rtype: list """ # suppression Nan Value x = X.flatten() y = Y.flatten() Vx = velocity_field[0].flatten() Vy = velocity_field[1].flatten() Vf = velocity_field[3].flatten() uncertainty = velocity_field[6].flatten() indnan = np.bitwise_not(np.isnan(Vx)) x = x[indnan] y = y[indnan] Vx = Vx[indnan] Vy = Vy[indnan] Vf = Vf[indnan] if len(uncertainty): uncertainty = uncertainty[indnan] distance = [] method = 'linear' xtopo = X[0, :] ytopo = Y[:, 0] x1, y1 = np.meshgrid(xtopo, ytopo) indx1 = np.argmin(abs(xtopo - self.Xstart[0])) indx2 = np.argmin(abs(xtopo - self.Xend[0])) indy1 = np.argmin(abs(ytopo - self.Xstart[1])) indy2 = np.argmin(abs(ytopo - self.Xend[1])) self.xsurface = list(np.linspace(xtopo[indx1], xtopo[indx2], self.Npoint)) self.ysurface = list(np.linspace(ytopo[indy1], ytopo[indy2], self.Npoint)) for ii in range(len(self.xsurface)): distance.append( ((self.xsurface[ii] - self.xsurface[0]) ** 2 + (self.ysurface[ii] - self.ysurface[0]) ** 2) ** 0.5) grid_Vx = griddata((x, y), Vx, (X, Y), method=method) grid_Vy = griddata((x, y), Vy, (X, Y), method=method) grid_fluctuation = griddata((x, y), Vf, (X, Y), method=method) X_temp = profile_line(x1, (indy1, indx1), (indy2, indx2)) Y_temp = profile_line(y1, (indy1, indx1), (indy2, indx2)) Z1_temp = profile_line(grid_Vx, (indy1, indx1), (indy2, indx2)) Z2_temp = profile_line(grid_Vy, (indy1, indx1), (indy2, indx2)) ZFluctuation = profile_line(grid_fluctuation, (indy1, indx1), (indy2, indx2)) if len(uncertainty): grid_uncertainty = griddata((x, y), uncertainty, (X, Y), method=method) ZUncertainty = profile_line(grid_uncertainty, (indy1, indx1), (indy2, indx2)) d_temp = np.linspace(0, np.max(distance), len(Z1_temp)) # test bathymetrie importée if len(self.X) == 0: self.X = list(np.interp(distance, d_temp, X_temp)) self.Y = list(np.interp(distance, d_temp, Y_temp)) self.Z = list(0 * np.interp(distance, d_temp, X_temp)) self.Vsurface[0] = list(np.interp(distance, d_temp, Z1_temp)) self.Vsurface[1] = list(np.interp(distance, d_temp, Z2_temp)) self.Vsurface[2] = list( (np.interp(distance, d_temp, Z1_temp) ** 2 + np.interp(distance, d_temp, Z2_temp) ** 2) ** 0.5) self.Vsurface[3] = list(np.interp(distance, d_temp, ZFluctuation)) if len(uncertainty): self.Vsurface[4] = list(np.interp(distance, d_temp, ZUncertainty)) else: self.Vsurface[4] = None
[docs] def addBankPoint(self): """ Add point corresponding to interface air-water """ for ii in range(len(self.X) - 1): if ((self.Z[ii + 1] < self.WS) and (self.Z[ii] > self.WS)) or ( (self.Z[ii + 1] > self.WS) and (self.Z[ii] < self.WS)): if self.Z[ii + 1] > self.Z[ii]: distance_temp = np.interp(self.WS, (self.Z[ii], self.Z[ii + 1]), (self.distance[ii], self.distance[ii + 1])) else: distance_temp = np.interp(self.WS, (self.Z[ii + 1], self.Z[ii]), (self.distance[ii], self.distance[ii + 1])) X_temp = np.interp(distance_temp, (self.distance[ii], self.distance[ii + 1]), (self.X[ii], self.X[ii + 1])) Y_temp = np.interp(distance_temp, (self.distance[ii], self.distance[ii + 1]), (self.Y[ii], self.Y[ii + 1])) self.Z.append(self.WS) self.X.append(X_temp) self.Y.append(Y_temp) self.distance.append(distance_temp) self.WSextent.append(distance_temp) self.X = (list(zip(*sorted(zip(self.distance, self.X))))[1]) self.Y = (list(zip(*sorted(zip(self.distance, self.Y))))[1]) self.Z = (list(zip(*sorted(zip(self.distance, self.Z))))[1]) self.distance.sort()
[docs] def distanceBord(self): """ Compute distance along the cross-section :return: distance from the starting point :rtype: list """ self.distance = [] self.distanceSurface = [] if len(self.X) == len(self.Y) and len(self.xsurface) == len(self.ysurface): for ii in range(len(self.X)): self.distance.append(((self.X[ii] - self.X[0]) ** 2 + (self.Y[ii] - self.Y[0]) ** 2) ** 0.5) for ii in range(len(self.xsurface)): self.distanceSurface.append( ((self.xsurface[ii] - self.xsurface[0]) ** 2 + (self.ysurface[ii] - self.ysurface[0]) ** 2) ** 0.5) else: print('inconsistent data')
[docs] def computeHydraulicGeometry(self): """ Compute wetted perimeter, wetted area and hydrualic radius for each subsection between 2 successive points of the profile """ self.waterdepth = [0 for _ in range(len(self.Z))] self.Ah = [0 for _ in range(len(self.Z))] self.Pm = [0 for _ in range(len(self.Z))] self.Rh = [0 for _ in range(len(self.Z))] for ii in range(len(self.Z) - 1): self.waterdepth[ii] = self.WS - self.Z[ii] self.Ah[ii] = ((self.WS - self.Z[ii + 1]) + (self.WS - self.Z[ii])) / 2 * ( self.distance[ii + 1] - self.distance[ii]) if self.Ah[ii] < 0: self.Ah[ii] = 0 if np.isnan(self.Ah[ii]): self.Ah[ii] = 0 if self.waterdepth[ii] < 0: self.waterdepth[ii] = 0 self.Pm[ii] = ( ((self.Z[ii + 1] - self.Z[ii]) ** 2 + (self.distance[ii + 1] - self.distance[ii]) ** 2) ** 0.5) self.Rh[ii] = self.Ah[ii] / self.Pm[ii] self.Rh[-1] = self.Rh[-2] self.Ah[-1] = self.Ah[-2]
[docs] def interpVelocityOnGroundPoints(self): """ Interpolate velocity of DEM data to compute discharge """ self.Vsurface[0] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[0])) self.Vsurface[1] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[1])) self.Vsurface[2] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[2]))
[docs] def dischargeExperimental(self): """ Compute discharge with velocity measurements and bathymetry extracted from given DEM """ self.Vexp = [0 for _ in range(len(self.Vsurface[0]))] self.qexp = [0 for _ in range(len(self.Vsurface[0]))] if self.Xstart[1] == self.Xend[1]: alpha = np.pi / 2 else: alpha = np.arctan((self.Xstart[0] - self.Xend[0]) / (self.Xstart[1] - self.Xend[1])) for ii in range(len(self.Vsurface[0])): self.Vexp[ii] = ((self.Vsurface[0][ii] ** 2 + self.Vsurface[1][ii] ** 2) ** 0.5) if self.Vsurface[0][ii] == 0: beta = self.Vsurface[1][ii] / abs(self.Vsurface[1][ii]) * np.pi / 2 else: beta = np.arctan( self.Vsurface[1][ii] / self.Vsurface[0][ii]) # angle du vecteur vitesse par rapport à Ox self.qexp[ii] = self.Vexp[ii] * np.cos(beta - alpha) * self.Ah[ii] Qexp = np.sum(self.qexp) * self.CoeffR self.Qexp = Qexp
[docs] def findXYH(self, pointH): """ Find points on the profile nearer than given water depth measurements. """ self.indH = [0 for _ in range(len(pointH))] self.Hexp = [0 for _ in range(len(pointH))] for i in range((len(pointH))): distance = [] for ii in range(len(self.X)): distance.append(((self.X[ii] - pointH[i][0]) ** 2 + (self.Y[ii] - pointH[i][1]) ** 2) ** 0.5) self.indH[i] = np.where(distance == np.min(distance)) for i in range(len(pointH)): self.Hexp[i] = pointH[i][2]
[docs] def findS(self): """ Compute friction slope from given water depths """ self.S = [0 for _ in range(len(self.indH))] for i in range((len(self.indH))): ind = int(self.indH[i][0]) self.S[i] = (self.Vsurface[2][ind] * self.CoeffR / (self.K * self.Hexp[i] ** (2 / 3))) ** 2 self.Smean = np.nanmean(self.S)
[docs] def BathymetryFromVelocityExp(self): """ Compute equivalent bathymetry assuming a constant friction slope on the section """ self.waterdepth = [0 for _ in range(len(self.X))] self.Z = [0 for _ in range(len(self.X))] for ii in range(len(self.X)): if np.isnan(self.Vsurface[2][ii]): self.Vsurface[2][ii] = 0 self.waterdepth[ii] = (self.Vsurface[2][ii] ** self.CoeffR / self.Smean ** 0.5 / self.K) ** (3 / 2) self.Z[ii] = self.WS - self.waterdepth[ii]
[docs] def dischargeVelocityFromSlope(self, S): """ Compute discharge assuming a constant friction slope on the section """ self.V = [0 for _ in range(len(self.Rh))] self.q = [0 for _ in range(len(self.Rh))] for ii in range(len(self.Rh)): self.V[ii] = self.K * self.Rh[ii] ** (2 / 3) * S ** 0.5 self.q[ii] = self.V[ii] * self.Ah[ii] Q = np.sum(self.q) self.Q = Q