######################################################################
# O o o o-o o--o o-o o o o--o o-o o--o #
# / \ |\ | | \ | | o o |\ /| | | \ | #
# o---o| \ | | O O-Oo | | | O | O-o | O O-o #
# | || \| | / | \ o o | | | | / | #
# o oo o o-o o o o-o o o o--o o-o o--o #
######################################################################
#
# ANDROMEDE
# Copyright (C) 2023 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################
import numpy as np
from scipy.interpolate import griddata
from skimage.measure import profile_line
[docs]
class CrossSection:
""" This class deals with Cross plotting and calculation
"""
def __init__(self, Xstart, Xend, WS, K, Npoint, CoeffR):
"""
Initialize cross-section
:param Xstart: starting point of the profile ([X,Y])
:param Xend: ending point of the profile ([X,Y])
:param WS: water surface level
:param K: strickler coefficient
:param CoeffR: ratio between averaged and maximum velocity
"""
# point extreme du profil
self.Xstart = Xstart
self.Xend = Xend
self.Npoint = Npoint
self.CoeffR = CoeffR
# profil topo mesuré
self.X = []
self.Y = []
self.Z = []
self.V = []
self.q = []
self.Q = []
self.Vexp = []
self.qexp = []
self.Qexp = []
# profil de vitesses ensurface
self.xsurface = []
self.ysurface = []
self.zsurface = []
self.Vsurface = [[], [], [], [], []] # vitesse suivant x et y
self.distanceSurface = []
self.indH = []
self.Hexp = []
self.K = K
self.S = []
self.Smean = []
self.WS = WS
self.Rh = []
self.Ah = []
self.Pm = []
self.waterdepth = []
self.WSextent = []
self.distance = []
[docs]
def ProjectionWithMNTXYZ(self, MNT, origin):
"""
Provide the bathymetry along the cross-section if topographic data are in a XYZ format
:param MNT: Digital Elevation Model format (XYZ, 3 columns)
:param origin: XY coordinates
:return: position X and Y for N points between the starting and ending point.
:rtype: list
:return: bathymetry for point in cross-section
:rtype: list
"""
x = MNT[:, 0] - origin[0]
y = MNT[:, 1] - origin[1]
z = MNT[:, 2]
distance = []
x_min, x_max = np.min((self.Xstart[0], self.Xend[0])), np.max((self.Xstart[0], self.Xend[0]))
y_min, y_max = np.min((self.Xstart[1], self.Xend[1])), np.max((self.Xstart[1], self.Xend[1]))
xtopo = np.linspace(x_min, x_max, self.Npoint)
ytopo = np.linspace(y_min, y_max, self.Npoint)
indx1 = np.min(np.where(xtopo == self.Xstart[0]))
indx2 = np.min(np.where(xtopo == self.Xend[0]))
indy1 = np.min(np.where(ytopo == self.Xstart[1]))
indy2 = np.min(np.where(ytopo == self.Xend[1]))
self.X = list(np.linspace(xtopo[indx1], xtopo[indx2], self.Npoint))
self.Y = list(np.linspace(ytopo[indy1], ytopo[indy2], self.Npoint))
for ii in range(len(self.X)):
distance.append(((self.X[ii] - self.X[0]) ** 2 + (self.Y[ii] - self.Y[0]) ** 2) ** 0.5)
x1, y1 = np.meshgrid(xtopo, ytopo)
grid_MNT = griddata((x, y), z, (x1, y1), method='linear')
Z_temp = profile_line(grid_MNT, (indy1, indx1), (indy2, indx2))
d_temp = np.linspace(0, np.max(distance), len(Z_temp))
self.Z = list(np.interp(distance, d_temp, Z_temp))
[docs]
def ProjectionVelocity(self, X, Y, velocity_field):
"""
Provide the velocity along the cross-section from velocity stored in a andromede result grid
:param X: X position of each cross-section point (2D array)
:param Y: Y position of each cross-section point (2D array)
:param velocity_field: andromede result grid (X,Y,Vx,VY,V,...)
:return: Surface velocity for point in cross-section X,Y and associated dat (fluctuations,..)
:rtype: list
"""
# suppression Nan Value
x = X.flatten()
y = Y.flatten()
Vx = velocity_field[0].flatten()
Vy = velocity_field[1].flatten()
Vf = velocity_field[3].flatten()
uncertainty = velocity_field[6].flatten()
indnan = np.bitwise_not(np.isnan(Vx))
x = x[indnan]
y = y[indnan]
Vx = Vx[indnan]
Vy = Vy[indnan]
Vf = Vf[indnan]
if len(uncertainty):
uncertainty = uncertainty[indnan]
distance = []
method = 'linear'
xtopo = X[0, :]
ytopo = Y[:, 0]
x1, y1 = np.meshgrid(xtopo, ytopo)
indx1 = np.argmin(abs(xtopo - self.Xstart[0]))
indx2 = np.argmin(abs(xtopo - self.Xend[0]))
indy1 = np.argmin(abs(ytopo - self.Xstart[1]))
indy2 = np.argmin(abs(ytopo - self.Xend[1]))
self.xsurface = list(np.linspace(xtopo[indx1], xtopo[indx2], self.Npoint))
self.ysurface = list(np.linspace(ytopo[indy1], ytopo[indy2], self.Npoint))
for ii in range(len(self.xsurface)):
distance.append(
((self.xsurface[ii] - self.xsurface[0]) ** 2 + (self.ysurface[ii] - self.ysurface[0]) ** 2) ** 0.5)
grid_Vx = griddata((x, y), Vx, (X, Y), method=method)
grid_Vy = griddata((x, y), Vy, (X, Y), method=method)
grid_fluctuation = griddata((x, y), Vf, (X, Y), method=method)
X_temp = profile_line(x1, (indy1, indx1), (indy2, indx2))
Y_temp = profile_line(y1, (indy1, indx1), (indy2, indx2))
Z1_temp = profile_line(grid_Vx, (indy1, indx1), (indy2, indx2))
Z2_temp = profile_line(grid_Vy, (indy1, indx1), (indy2, indx2))
ZFluctuation = profile_line(grid_fluctuation, (indy1, indx1), (indy2, indx2))
if len(uncertainty):
grid_uncertainty = griddata((x, y), uncertainty, (X, Y), method=method)
ZUncertainty = profile_line(grid_uncertainty, (indy1, indx1), (indy2, indx2))
d_temp = np.linspace(0, np.max(distance), len(Z1_temp))
# test bathymetrie importée
if len(self.X) == 0:
self.X = list(np.interp(distance, d_temp, X_temp))
self.Y = list(np.interp(distance, d_temp, Y_temp))
self.Z = list(0 * np.interp(distance, d_temp, X_temp))
self.Vsurface[0] = list(np.interp(distance, d_temp, Z1_temp))
self.Vsurface[1] = list(np.interp(distance, d_temp, Z2_temp))
self.Vsurface[2] = list(
(np.interp(distance, d_temp, Z1_temp) ** 2 + np.interp(distance, d_temp, Z2_temp) ** 2) ** 0.5)
self.Vsurface[3] = list(np.interp(distance, d_temp, ZFluctuation))
if len(uncertainty):
self.Vsurface[4] = list(np.interp(distance, d_temp, ZUncertainty))
else:
self.Vsurface[4] = None
[docs]
def addBankPoint(self):
"""
Add point corresponding to interface air-water
"""
for ii in range(len(self.X) - 1):
if ((self.Z[ii + 1] < self.WS) and (self.Z[ii] > self.WS)) or (
(self.Z[ii + 1] > self.WS) and (self.Z[ii] < self.WS)):
if self.Z[ii + 1] > self.Z[ii]:
distance_temp = np.interp(self.WS, (self.Z[ii], self.Z[ii + 1]),
(self.distance[ii], self.distance[ii + 1]))
else:
distance_temp = np.interp(self.WS, (self.Z[ii + 1], self.Z[ii]),
(self.distance[ii], self.distance[ii + 1]))
X_temp = np.interp(distance_temp, (self.distance[ii], self.distance[ii + 1]),
(self.X[ii], self.X[ii + 1]))
Y_temp = np.interp(distance_temp, (self.distance[ii], self.distance[ii + 1]),
(self.Y[ii], self.Y[ii + 1]))
self.Z.append(self.WS)
self.X.append(X_temp)
self.Y.append(Y_temp)
self.distance.append(distance_temp)
self.WSextent.append(distance_temp)
self.X = (list(zip(*sorted(zip(self.distance, self.X))))[1])
self.Y = (list(zip(*sorted(zip(self.distance, self.Y))))[1])
self.Z = (list(zip(*sorted(zip(self.distance, self.Z))))[1])
self.distance.sort()
[docs]
def distanceBord(self):
"""
Compute distance along the cross-section
:return: distance from the starting point
:rtype: list
"""
self.distance = []
self.distanceSurface = []
if len(self.X) == len(self.Y) and len(self.xsurface) == len(self.ysurface):
for ii in range(len(self.X)):
self.distance.append(((self.X[ii] - self.X[0]) ** 2 + (self.Y[ii] - self.Y[0]) ** 2) ** 0.5)
for ii in range(len(self.xsurface)):
self.distanceSurface.append(
((self.xsurface[ii] - self.xsurface[0]) ** 2 + (self.ysurface[ii] - self.ysurface[0]) ** 2) ** 0.5)
else:
print('inconsistent data')
[docs]
def computeHydraulicGeometry(self):
"""
Compute wetted perimeter, wetted area and hydrualic radius for each subsection between 2 successive points of
the profile
"""
self.waterdepth = [0 for _ in range(len(self.Z))]
self.Ah = [0 for _ in range(len(self.Z))]
self.Pm = [0 for _ in range(len(self.Z))]
self.Rh = [0 for _ in range(len(self.Z))]
for ii in range(len(self.Z) - 1):
self.waterdepth[ii] = self.WS - self.Z[ii]
self.Ah[ii] = ((self.WS - self.Z[ii + 1]) + (self.WS - self.Z[ii])) / 2 * (
self.distance[ii + 1] - self.distance[ii])
if self.Ah[ii] < 0:
self.Ah[ii] = 0
if np.isnan(self.Ah[ii]):
self.Ah[ii] = 0
if self.waterdepth[ii] < 0:
self.waterdepth[ii] = 0
self.Pm[ii] = (
((self.Z[ii + 1] - self.Z[ii]) ** 2 + (self.distance[ii + 1] - self.distance[ii]) ** 2) ** 0.5)
self.Rh[ii] = self.Ah[ii] / self.Pm[ii]
self.Rh[-1] = self.Rh[-2]
self.Ah[-1] = self.Ah[-2]
[docs]
def interpVelocityOnGroundPoints(self):
"""
Interpolate velocity of DEM data to compute discharge
"""
self.Vsurface[0] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[0]))
self.Vsurface[1] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[1]))
self.Vsurface[2] = list(np.interp(self.distance, self.distanceSurface, self.Vsurface[2]))
[docs]
def dischargeExperimental(self):
"""
Compute discharge with velocity measurements and bathymetry extracted from given DEM
"""
self.Vexp = [0 for _ in range(len(self.Vsurface[0]))]
self.qexp = [0 for _ in range(len(self.Vsurface[0]))]
if self.Xstart[1] == self.Xend[1]:
alpha = np.pi / 2
else:
alpha = np.arctan((self.Xstart[0] - self.Xend[0]) / (self.Xstart[1] - self.Xend[1]))
for ii in range(len(self.Vsurface[0])):
self.Vexp[ii] = ((self.Vsurface[0][ii] ** 2 + self.Vsurface[1][ii] ** 2) ** 0.5)
if self.Vsurface[0][ii] == 0:
beta = self.Vsurface[1][ii] / abs(self.Vsurface[1][ii]) * np.pi / 2
else:
beta = np.arctan(
self.Vsurface[1][ii] / self.Vsurface[0][ii]) # angle du vecteur vitesse par rapport à Ox
self.qexp[ii] = self.Vexp[ii] * np.cos(beta - alpha) * self.Ah[ii]
Qexp = np.sum(self.qexp) * self.CoeffR
self.Qexp = Qexp
[docs]
def findXYH(self, pointH):
"""
Find points on the profile nearer than given water depth measurements.
"""
self.indH = [0 for _ in range(len(pointH))]
self.Hexp = [0 for _ in range(len(pointH))]
for i in range((len(pointH))):
distance = []
for ii in range(len(self.X)):
distance.append(((self.X[ii] - pointH[i][0]) ** 2 + (self.Y[ii] - pointH[i][1]) ** 2) ** 0.5)
self.indH[i] = np.where(distance == np.min(distance))
for i in range(len(pointH)):
self.Hexp[i] = pointH[i][2]
[docs]
def findS(self):
"""
Compute friction slope from given water depths
"""
self.S = [0 for _ in range(len(self.indH))]
for i in range((len(self.indH))):
ind = int(self.indH[i][0])
self.S[i] = (self.Vsurface[2][ind] * self.CoeffR / (self.K * self.Hexp[i] ** (2 / 3))) ** 2
self.Smean = np.nanmean(self.S)
[docs]
def BathymetryFromVelocityExp(self):
"""
Compute equivalent bathymetry assuming a constant friction slope on the section
"""
self.waterdepth = [0 for _ in range(len(self.X))]
self.Z = [0 for _ in range(len(self.X))]
for ii in range(len(self.X)):
if np.isnan(self.Vsurface[2][ii]):
self.Vsurface[2][ii] = 0
self.waterdepth[ii] = (self.Vsurface[2][ii] ** self.CoeffR / self.Smean ** 0.5 / self.K) ** (3 / 2)
self.Z[ii] = self.WS - self.waterdepth[ii]
[docs]
def dischargeVelocityFromSlope(self, S):
"""
Compute discharge assuming a constant friction slope on the section
"""
self.V = [0 for _ in range(len(self.Rh))]
self.q = [0 for _ in range(len(self.Rh))]
for ii in range(len(self.Rh)):
self.V[ii] = self.K * self.Rh[ii] ** (2 / 3) * S ** 0.5
self.q[ii] = self.V[ii] * self.Ah[ii]
Q = np.sum(self.q)
self.Q = Q