######################################################################
# O o o o-o o--o o-o o o o--o o-o o--o #
# / \ |\ | | \ | | o o |\ /| | | \ | #
# o---o| \ | | O O-Oo | | | O | O-o | O O-o #
# | || \| | / | \ o o | | | | / | #
# o oo o o-o o o o-o o o o--o o-o o--o #
######################################################################
#
# ANDROMEDE
# Copyright (C) 2023 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################
""" This module manages motion models. It allows the choice of the method to determine the more probalistic
displacement for each particule detected by the ObjectDetection module. For each particule, a horizontal and vertical
is associated. For Nearest and Tractrac methods an ID trajectory is also provided. For dense optical flow method the
detection is not used even if it should be done to intialize the computation.
"""
from numba import njit
import numpy as np
import time
import scipy.spatial as scp
from PySide6.QtWidgets import QProgressDialog
from PySide6.QtGui import Qt
import cv2
from ThirdParty import KalmanFilter
from ThirdParty import openpiv_pyprocess as pyprocess
# detect move using objects and store inside results.obj_moves variable which contains information for each
# displacement 'id_move' detected at the frame 'id_frame'
# with results.obj_moves[id_frame][id_move] = [id_traj, x, y, Ux, Uy]
[docs]
def select_method(params):
"""Function for motion model selection the availiable methode are:nearest/nearest + Kalman Filter/Tractrac
method/Opyflow method/Dense Optical Flow (Fanerback)
:param dict: dictionnary parameters
:rtype: int
"""
id_method = 0
if params.dict['MM']['method'] == 'Nearest':
id_method = 1
if params.dict['MM']['method'] == 'Nearest prediction':
id_method = 2
if params.dict['MM']['method'] == 'Tractrac':
id_method = 3
if params.dict['MM']['method'] == 'KLT':
id_method = 4
if params.dict['MM']['method'] == 'Dense optical flow':
id_method = 5
if params.dict['MM']['method'] == 'PIV':
id_method = 6
if params.dict['MM']['method'] == 'Manual':
id_method = 7
return id_method
[docs]
def process_motion_model_all(video, results, params, id_method):
"""Processing selected motion model
:param video: video read
:param dict: dictionnary parameters
:param id_method: motion model method
:return: list of movement detected at each time step
[ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis]
:rtype: list
"""
if id_method:
results.obj_moves = []
results.trajs = []
elapsed_time = time.time()
video.outputConsole.appendPlainText('Motion model using ' + str(params.dict['MM']['method']) + '...')
progress = QProgressDialog('Motion Model', 'Stop', 0, 100)
progress.setMinimumDuration(0)
progress.setWindowModality(Qt.WindowModal)
progress.setWindowTitle('Motion Model')
num_images = len(video.imgs)
obj_move = []
U_KF = [[], [], [], [], 0] # [objets_points, objets_id, objets_KF, object_history, id_objet]
if id_method == 3:
U_tractrac = initTractrac(results.objs[0], results.objs[1], id_method)
if id_method == 7:
results.objs = []
move_manual = results.obj_manual
index = [int(move_manual[i][0]) for i in range(len(move_manual))]
for id_image in range(1, num_images):
progress.setLabelText('Processing frame ' + str(id_image + video.prop['window_frame'][0]))
if id_method == 1:
obj_move = nearest(results.objs[id_image], results.objs[id_image - 1], id_image, params)
elif id_method == 2:
obj_move, U_KF = nearest_prediction(U_KF, results.objs[id_image], params)
elif id_method == 3:
U_tractrac[2] = np.transpose(np.array([results.objs[id_image][:, 1], results.objs[id_image][:, 2]]))
obj_move, U_tractrac = tractrac(U_tractrac, params)
elif id_method == 4:
obj_move = opyflow_andromede(video.imgs[id_image], video.imgs[id_image - 1],
results.objs[id_image][:, :3],
results.objs[id_image - 1][:, :3], params)
elif id_method == 5:
obj_move = farneback_andromede(video.imgs[id_image], video.imgs[id_image - 1], params)
elif id_method == 6:
obj_move = PIV(video.imgs[id_image], video.imgs[id_image - 1], params)
elif id_method == 7:
ind_frame = list(np.where(np.array(index) == id_image))
obj_move = []
id_traj = []
x = []
y = []
z = []
intensity = []
for ind in ind_frame:
if len(ind):
obj_move.append(move_manual[ind[0]][1:])
id_traj.append(move_manual[ind[0]][1])
x.append(move_manual[ind[0]][2])
y.append(move_manual[ind[0]][3])
z.append(0)
intensity.append(0)
objs = np.transpose(np.stack((id_traj, x, y, z, intensity)))
obj_move = np.array(obj_move)
results.objs.append(objs)
results.obj_moves.append(obj_move)
progress.setValue(int(100 * (id_image + 1) / num_images))
if progress.wasCanceled():
break
elapsed_time = time.time() - elapsed_time
video.outputConsole.appendPlainText(str(num_images) + ' images processed in ' +
str(round(elapsed_time, 2)) + ' seconds\n')
# set id_obj to 0 for no trajectory methods
if id_method == 5 or id_method == 6:
for id_image in range(len(results.objs)):
results.objs[id_image][:, 0] = 0
video.update_process_status(4)
@njit
def norm2(current_obj, prev_obj):
return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2
@njit
def norm3(current_obj, prev_obj, variation_size):
return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2 + variation_size
@njit
def norm4(current_obj, prev_obj, variation_size, variation_int):
return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2 + variation_size + variation_int
@njit
def relative_variation(current_obj, prev_obj):
return abs(current_obj[3] - prev_obj[3]) / (prev_obj[3] + 1e-6), \
abs(current_obj[4] - prev_obj[4]) / (prev_obj[4] + 1e-6)
[docs]
def distance_objs(current_obj, prev_objs, params, size_and_intensity):
"""Compute distance between selected points (current frame) and all others possibles previous point (previous
frame). This method is used for nearest method (simple+Kalman filter). The parameter of likelihood allows
avoiding the computation for non-realistic displacement. The distance depends on the chosen type (euclidian,
euclidian + root square of area, euclidian + root square of area + normalised intensity)
:param current_obj: x and y position of the input point :param prev_objs: x and y position of all points
detected in the previous frame :return: list of movement detected at each time step [ID trajectory, X position,
Y position, Velocity along x-axis, Velocity along y axis] :rtype: list
"""
n_objs = len(prev_objs)
likelihood = params.dict['MM']['likelihood']
dist = np.zeros(n_objs)
if params.dict['MM']['type_distance'] == 'distance':
if size_and_intensity:
for id_obj in range(0, n_objs):
variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj])
if variation_size < likelihood and variation_int < likelihood:
dist[id_obj] = norm2(current_obj, prev_objs[id_obj])
else:
dist[id_obj] = 10000
else:
for id_obj in range(0, n_objs):
dist[id_obj] = norm2(current_obj, prev_objs[id_obj])
elif params.dict['MM']['type_distance'] == 'distance+area':
for id_obj in range(0, n_objs):
variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj])
if variation_size < likelihood and variation_int < likelihood:
dist[id_obj] = norm3(current_obj, prev_objs[id_obj], variation_size)
else:
dist[id_obj] = 10000
elif params.dict['MM']['type_distance'] == 'distance+area+intensity':
for id_obj in range(0, n_objs):
variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj])
if variation_size < likelihood and variation_int < likelihood:
dist[id_obj] = norm4(current_obj, prev_objs[id_obj], variation_size, variation_int)
else:
dist[id_obj] = 10000
return dist
[docs]
def nearest(current_objs, prev_objs, id_image, params):
"""find the displacement by computing the minimum distance between previous and current frame.
The distance can take into account size and intensity particles.
:param current_objs: x and y positions detected on the current frame
:param prev_objs: x and y positions detected on the previous frame
:param id_image: frame identifiant
:param distance_max: maximal possible distance for computation
:return: list of movement detected at each time step
[ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis]
:rtype: list
"""
maximum_distance = params.dict['MM']['distance_max'] ** 2
obj_move = []
id_traj = []
nobjs=-1
size_intensity = False
if len(current_objs) > 0 and len(prev_objs):
if params.dict['MM']['type_distance'] == 'distance':
prev_objs = np.array(prev_objs)
curr_objs = np.array(current_objs)
X = []
V = []
id_traj = []
c0 = np.transpose(np.array([prev_objs[:, 1], prev_objs[:, 2]]))
c1 = np.transpose(np.array([curr_objs[:, 1], curr_objs[:, 2]]))
# Initialize Point Tree structure
t0 = None
if len(c0) > 0:
t0 = scp.cKDTree(c0)
distance, neighbours = t0.query(c1)
neighbours = np.array(neighbours)
id_count_traj = np.max(prev_objs[neighbours, 0]) + 1
# Add preliminary filters
for ind_point in range(len(curr_objs)):
x, y = prev_objs[neighbours[ind_point], 1], prev_objs[neighbours[ind_point], 2]
vx, vy = curr_objs[ind_point, 1] - prev_objs[neighbours[ind_point], 1], curr_objs[ind_point, 2] - \
prev_objs[neighbours[ind_point], 2]
norm = (vx ** 2 + vy ** 2) ** 0.5
if norm < params.dict['MM']['distance_max']:
X.append([x + vx / 2, y + vy / 2]) # position of the measured velocity at half of the displacement
V.append([vx, vy])
if prev_objs[neighbours[ind_point], 0] > -1:
id_traj.append(prev_objs[neighbours[ind_point], 0])
current_objs[ind_point, 0] = prev_objs[neighbours[ind_point], 0]
else:
id_count_traj += 1
id_traj.append(id_count_traj)
current_objs[ind_point, 0] = id_count_traj
else:
id_count_traj += 1
current_objs[ind_point, 0] = id_count_traj
if not X:
X = np.empty((0, 2))
V = np.empty((0, 2))
X = np.array(X)
V = np.array(V)
if len(X):
for id_obj in range(len(X[:, 0])):
obj_move.append((id_traj[id_obj], X[id_obj, 0], X[id_obj, 1], V[id_obj, 0], V[id_obj, 1]))
else:
if max(current_objs[:, 3]) > 0:
size_intensity = True
for id_obj in range(len(current_objs)):
distances = distance_objs(current_objs[id_obj, :], prev_objs, params, size_intensity)
id_prev = np.where(distances == distances.min())
if len(id_prev[0]) == 1:
if distances[id_prev] < maximum_distance:
dx = float(current_objs[id_obj, 1] - prev_objs[id_prev, 1])
dy = float(current_objs[id_obj, 2] - prev_objs[id_prev, 2])
x = float(current_objs[id_obj, 1] + prev_objs[id_prev, 1]) / 2
y = float(current_objs[id_obj, 2] + prev_objs[id_prev, 2]) / 2
if prev_objs[id_prev, 0] > -1:
current_objs[id_obj, 0] = prev_objs[id_prev, 0]
else:
nobjs += 1
prev_objs[id_prev, 0] = nobjs
current_objs[id_obj, 0] = prev_objs[id_prev, 0]
obj_move.append((current_objs[id_obj, 0], x, y, dx, dy))
return np.array(obj_move)
[docs]
def initTractrac(current_objs, prev_objs, id_method):
"""Initialize variable ofr tractrac method.
The variable contains the positions of detected particles for the 2 first frames.
:param current_objs: x and y positions detected on the current frame
:param prev_objs: x and y positions detected on the previous frame
:param id_method: type of the chosen method
:return: list of variables for the movement of the particles in the 2 first frames.
:rtype: list
"""
if id_method != 3:
return []
c0 = np.transpose(np.array([prev_objs[:, 1], prev_objs[:, 2]]))
c1 = np.transpose(np.array([current_objs[:, 1], current_objs[:, 2]]))
c2 = []
id0 = np.zeros(c0.shape[0]) + np.nan
# Initialize Point Tree structure
t0, t1 = None, None
if len(c0) > 0:
t0 = scp.cKDTree(c0)
if len(c1) > 0:
t1 = scp.cKDTree(c1)
ns01, ns10, i1001 = [np.array([]) for _ in range(3)]
if (len(c0) > 0) & (len(c1) > 0):
ns01 = t0.query(c1)
ns10 = t1.query(c0)
# First check of
i1001 = ns10[1][ns01[1]] == np.arange(c1.shape[0])
err_u_filtered = np.zeros(c1.shape[0])
id_traj = 0
# Initialize lists
err_U = np.array([])
err_U_th = 2.
x_mot, u_mot, x_mot_tmp, u_mot_tmp = [np.array([]).reshape((-1, 2)) for _ in range(4)]
err_mot, err_mot_temp = [np.array([]) for _ in range(2)]
id_good = []
x1 = np.array([]).reshape(0, 2)
um = np.array([]).reshape(0, 2)
U_tractrac = [c0, c1, c2, t0, t1, ns01, i1001, id0, err_u_filtered, id_traj, err_U, x_mot, u_mot, x_mot_tmp,
u_mot_tmp, err_mot, err_mot_temp, err_U_th, id_good, x1, um]
return U_tractrac
def Propagate_MotionModel_KDTree(C, Xm, Um, Em, Xm_old, Um_old, Em_old, params):
# KDTree
X_ref = np.vstack((Xm, Xm_old))
U_ref = np.vstack((Um, Um_old))
E_ref = np.hstack((Em, Em_old))
# Make Tree
tX_ref = scp.cKDTree(X_ref)
# Get firsts N neighbours of query points
nn = np.minimum(np.minimum(np.int8(params.dict['MM']['motion_av']), C.shape[0]), X_ref.shape[0])
# print nn
distance, neighbours = tX_ref.query(C, k=nn)
# Make matrices and compute local average
neighbours = np.array(neighbours)
values_U = U_ref[neighbours.flatten(), :].reshape(-1, nn, 2)
values_E = E_ref[neighbours.flatten()].reshape(-1, nn)
# Averaging over neighbours values
# Option 1 : simple average of neighbouring values
# Option 2 : weighted average with weight on previous model
weights = np.ones(X_ref.shape[0])
# W[:Xm.shape[0]]=W[:Xm.shape[0]]*np.maximum(0,-Em)
weights[-Xm_old.shape[0]:] = weights[-Xm_old.shape[0]:] * params.dict['MM']['filter_time'] # *np.maximum(0,-Em_old)
weight_neighbours = weights[neighbours.flatten()].reshape(-1, nn)
U_filtered = np.sum(values_U * weight_neighbours.reshape(weight_neighbours.shape[0],
weight_neighbours.shape[1], -1), axis=1).reshape(-1, 2) / \
np.sum(weight_neighbours, axis=1).reshape(weight_neighbours.shape[0], -1)
E_filtered = np.sum(values_E * weight_neighbours, axis=1) / np.sum(weight_neighbours, axis=1)
# Find nan and replace by default 0 value
id_good = np.isfinite(U_filtered[:, 0])
# Save non nan to weight next iteration
Xm_old = C[id_good, :]
Um_old = U_filtered[id_good, :]
Em_old = E_filtered[id_good]
# Replace nan if no model points where given
if len(U_filtered) > 0:
id_nan = np.isnan(U_filtered[:, 0])
U_filtered[id_nan, :] = 0
E_filtered[np.isnan(E_filtered)] = 2
return U_filtered, E_filtered, Xm_old, Um_old, Em_old
[docs]
def tractrac(U_tractrac, params):
"""Main loop of the tractrac method.
:param U_tractrac: variables for the 2 previous frames (connectivity, displacement, error, etc..)
:param motion: (true/false)selection of the motion method (iteration to determine the most probable displacement)
:param motion_it: number of iteration for motion method
:param filter: (True/False) filter outliers if necessary
:param filter_time: number of time step for temporal filter
:return: list of movement detected for current time step
[ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis]
:rtype: list
:return: list of variables for the movement of the particles.
:rtype: list
"""
obj_move = []
dt = 1 # params.dict['IP']['frame_step']
[C0, C1, C2, _, t1, ns01, i1001, ID0, err_U_filtered, id_traj, errU, X_mot, U_mot, X_mot_temp,
U_mot_temp, err_mot, err_mot_temp, errU_th, id_good, X1, um] = U_tractrac
it = 0
while it <= params.dict['MM']['motion_it']:
# PREDICTION from motion model
if (params.dict['MM']['motion'] == 1) & (len(id_good) + X_mot.shape[0] > 1) & (len(C1) > 0):
if len(X1) == 0:
[U_motion, err_U_filtered, X_mot_temp, U_mot_temp, err_mot_temp] = \
Propagate_MotionModel_KDTree(C1, np.array([]).reshape(-1, 2), np.array([]).reshape(-1, 2),
np.array([]), X_mot, U_mot, err_mot, params)
else:
[U_motion, err_U_filtered, X_mot_temp, U_mot_temp, err_mot_temp] = \
Propagate_MotionModel_KDTree(C1, X1[id_good, :], um[id_good, :], errU[id_good], X_mot, U_mot,
err_mot,
params)
else:
U_motion = np.zeros(C1.shape)
err_U_filtered = np.zeros(C1.shape[0])
# Start Iteration step #################################################
# if len(C2)==0:
# Initialize for each loop
# X0,X1,X1motion,X2,um,U_motion,a,t2,ns12,i2112= (np.array([[],[]]).T for i in range(10))
# idC2,errU,ID,ID_good,i_all,um= (np.array([]).T for i in range(6))
# Build new trees
i1221, i2112, ns12 = [[] for _ in range(3)]
t1m, ID, id_nan = [None for _ in range(3)]
if len(C1) > 0:
t1m = scp.cKDTree(C1 + U_motion * dt) # Points C1 moved by motion model # !!Check on original version
if len(C2) > 0:
t2 = scp.cKDTree(C2) # Points C2
else:
t2 = None
if (len(C1) > 0) & (len(C2) > 0):
ns21 = t2.query(C1 + U_motion * dt) # Nearest Neighbour link C1m -> C2
ns12 = t1m.query(C2) # Nearest Neighbour link C2 -> C1m
i1221 = ns12[1][ns21[1]] == np.arange(C1.shape[0]) # Check reversibility of link C2-> C1m [C1m -> C2 ]
i2112 = ns21[1][ns12[1]] == np.arange(C2.shape[0]) # For next iteration
if (len(C0) > 0) & (len(C1) > 0) & (len(C2) > 0):
i_all = i1001 & i1221 # Keep only Unilateral associations on 3 steps 0 -> 1 -> 2
# print np.sum(i_all)/np.float(C1.shape[0])
# Update Trajectories positions
X0 = C0[ns01[1][i_all], :]
X1 = C1[i_all, :]
X2 = C2[ns21[1][i_all], :]
# Build traj ID
ID = ID0[ns01[1][i_all]]
# Velocities
U0 = (X1 - X0) / dt
U1 = (X2 - X1) / dt
um = (U0 + U1) / 2.
# TO CHECK whether params_dict_file['dt'] should appear or not in errU
errU = np.maximum(-10., np.log10(np.amax((np.abs((U_motion[i_all, :] - um) * dt)), 1)))
# Filtering outliers
IS_good = errU - err_U_filtered[i_all] < errU_th
# Evolution of threshold
errU_th = (params.dict['MM']['filter_time'] * errU_th + params.dict['MM']['filter_std'] * np.std(errU)) / (
params.dict['MM']['filter_time'] + 1.)
else:
i_all, ID, errU, IS_good = [[] for _ in range(4)]
X0, X1, X2, um, A = [np.array([]).reshape(-1, 2) for _ in range(5)]
errU_th = 2 # Reinitialize threshold
# Filter Outliers if necessary
if params.dict['MM']['filter'] == 1:
id_good = np.where(IS_good == 1)[0]
elif len(X1) > 0:
id_good = np.arange(0, len(X1[:, 1]))
else:
id_good = []
it += 1
# END Iteration step #################################################
if len(ID) > 0:
id_nan = np.where(np.isnan(ID[id_good]))[0]
if len(id_nan) > 0: # Associate new ID
ID[id_good[id_nan]] = np.arange(0, len(id_nan)) + id_traj
id_traj = id_traj + len(id_nan)
# Keep track of last motion model used
X_mot, U_mot, err_mot = X_mot_temp, U_mot_temp, err_mot_temp
# Save ID for next iteration
if len(C1) > 0:
ID0 = np.zeros(C1.shape[0]) + np.nan
ID0[i_all] = ID
U_tractrac = [C1, C2, C2, t1, t2, ns12, i2112, ID0, err_U_filtered, id_traj, errU, X_mot, U_mot, X_mot_temp,
U_mot_temp, err_mot, err_mot_temp, errU_th, id_good, X1, um]
for id_obj in id_good:
obj_move.append((ID[id_obj], X1[id_obj, 0], X1[id_obj, 1], um[id_obj, 0], um[id_obj, 1]))
return np.array(obj_move), U_tractrac
[docs]
def nearest_prediction(U_KF, current_objs, params):
"""Find the displacement by computing the minimum distance between current frame and the prediction from KF.
The distance can take into account size and intensity particles.
The kalman filter allows predicting position of the particles and store it if the particles has disappeared in the
current frame. The filter is updated at each time step.
:param U_KF: Kalman filter variables (trajectory)
:param currents_objs: x and y positions detected on the current frame
:param step_max_prevision: maximal number of time step for prediction without new detection
:param distance_max: maximal possible distance for computation
:return: list of movement detected for current time step
[ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis]
:rtype: list
"""
[objets_points, objets_id, objets_KF, obj_history, id_objet] = U_KF
dt = params.dict['IP']['frame_step']
obj_move = [(0, 0, 0, 0, 0)]
obj_IDs, objs_X, objs_Y, objs_Ux, objs_Uy = [[] for _ in range(5)]
size_intensity = False
if len(current_objs) > 0:
if max(current_objs[:, 3]) > 0:
size_intensity = True
count_vit = 0
# Prediction of all objects + display if object_point exists
for id_obj in range(len(objets_points)):
obj_state = objets_KF[id_obj].predict()
# predicted objects by KF
objets_points[id_obj] = np.array([obj_state[0, 0], obj_state[1, 0], obj_state[4, 0], obj_state[5, 0]])
if obj_state[0, 0] < (params.dict['IP']['ROIxmax'] - params.dict['IP']['ROIxmin']) and obj_state[1, 0] < (
params.dict['IP']['ROIymax'] - params.dict['IP']['ROIymin']): # if estimation remains in the image
objs_Ux.append(obj_state[2, 0]) # add the X-velocity for the corresponding ID
objs_Uy.append(obj_state[3, 0]) # add the Y-velocity for the corresponding ID
objs_X.append(obj_state[0, 0])
objs_Y.append(obj_state[1, 0])
obj_IDs.append(objets_id[id_obj])
count_vit += 1
new_objs = np.ones((len(current_objs)))
if len(objets_points):
maximum_distance = params.dict['MM']['distance_max'] ** 2
for id_obj in range(len(objets_points)):
if len(current_objs) > 0:
if params.dict['MM']['type_distance'] == 'distance':
c1 = np.transpose(np.array([current_objs[:, 1], current_objs[:, 2]]))
id_prev = []
if len(c1) > 0:
t1 = scp.cKDTree(c1)
distance1, neighbours = t1.query(np.array([objets_points[id_obj][0], objets_points[id_obj][1]]))
id_prev = [[neighbours]]
distances1 = distance1 ** 2
else:
distances = distance_objs(np.insert(objets_points[id_obj], 0, -1, axis=0), current_objs, params,
size_intensity)
id_prev = np.where(distances == distances.min())
distances1 = distances[id_prev]
if len(id_prev[0]) == 1:
if distances1 < maximum_distance:
objets_KF[id_obj].update(np.array(current_objs[id_prev[0], 1:5]).T)
obj_history[id_obj] = 0 # test detection at last time step
new_objs[id_prev] = 0
else:
obj_history[id_obj] += 1 # test detection at last time step
else:
obj_history[id_obj] += 1 # test detection at last time step
# Creation of the Kalman filter for new objects
for point_id in range(len(current_objs)):
if new_objs[point_id]:
objets_points.append(current_objs[point_id, 1:5])
objets_KF.append(KalmanFilter.KalmanFilter(dt, [current_objs[point_id, 1], current_objs[point_id, 2]],
[current_objs[point_id, 3], current_objs[point_id, 4]]))
objets_id.append(id_objet)
obj_history.append(0)
id_objet += 1
# Cleaning ...
tab_id = []
for id_point in range(len(objets_points)):
if int(objets_points[id_point][0]) < 0 or \
int(objets_points[id_point][1]) < 0 or \
objets_points[id_point][0] > params.dict['IP']['ROIxmax'] or \
objets_points[id_point][1] > params.dict['IP']['ROIymax'] or \
obj_history[id_point] > params.dict['MM']['step_max_prevision']:
tab_id.append(id_point)
for index in sorted(tab_id, reverse=True):
del objets_points[index]
del objets_KF[index]
del objets_id[index]
del obj_history[index]
U_KF = [objets_points, objets_id, objets_KF, obj_history, id_objet]
for id_obj in range(len(obj_IDs)):
obj_move.append((obj_IDs[id_obj], objs_X[id_obj], objs_Y[id_obj], objs_Ux[id_obj], objs_Uy[id_obj]))
return np.array(obj_move), U_KF
[docs]
def opyflow_andromede(curr_gray, prev_gray, curr_X, prev_X, params):
"""find the displacement by using method in Opyflow. It is based on the Luka Kanade algorithm.
:param curr_gray: current frame in grayscale
:param prev_gray: previous frame in grayscale
:param curr_X: x and y positions detected on the current frame :param prev_X: x and y positions detected on the
previous frame
:param distance_max_opy: maximal possible distance for computation
:param winSize: size of the search window at each pyramid level.
:param maxLevel: 0-based maximal pyramid level number; if set to 0, pyramids are not used (single
level), if set to 1, two levels are used, and so on; if pyramids are passed to input then algorithm will use as
many levels as pyramids have but no more than maxLevel.
:param criteria: parameter, specifying the termination
criteria of the iterative search algorithm (after the specified maximum number of iterations criteria.maxCount or
when the search window moves by less than criteria.epsilon.
:return: list of movement detected for current time step
[ID trajectory, X position, Y position, Velocity along x axis, Velocity along y axis]
:rtype: list
"""
obj_move, X, V, Id_traj = [[] for _ in range(4)]
window_size = (params.dict['MM']['window_size'], params.dict['MM']['window_size'])
# transformation of real coordinate points to pixel in the shifted image
prev_Cc = np.float32(np.zeros((len(prev_X[:, 0]), 1, 2)))
curr_Cc = np.float32(np.zeros((len(curr_X[:, 0]), 1, 2)))
for ii in range(len(prev_X[1:, 0])):
prev_Cc[ii, 0, 0] = np.float32(prev_X[ii, 1])
prev_Cc[ii, 0, 1] = np.float32(prev_X[ii, 2])
for ii in range(len(curr_X[:, 0])):
curr_Cc[ii, 0, 0] = np.float32(curr_X[ii, 1])
curr_Cc[ii, 0, 1] = np.float32(curr_X[ii, 2])
# prev_Cc=(np.array(prev_Cc))
if prev_gray is not None and len(prev_Cc) > 0:
img0, img1 = np.uint8(curr_gray * 255), np.uint8(prev_gray * 255)
p1, st, err = cv2.calcOpticalFlowPyrLK(
img0, img1, curr_Cc, None, winSize=window_size)
p0r, st, err = cv2.calcOpticalFlowPyrLK(
img1, img0, p1, None, winSize=window_size)
d = abs(curr_Cc - p0r).reshape(-1, 2).max(-1)
# wayBackGoodFlag determine the acceptable error when flow is calculated from A to B
good = (d < 1)
X_temp = curr_Cc.reshape(-1, 2)
U_temp = curr_Cc.reshape(-1, 2) - p1.reshape(-1, 2)
id_count = -1
id_count_traj = np.max(prev_X[:, 0]) + 1
c0 = np.transpose(np.array([prev_X[:, 1], prev_X[:, 2]]))
c1 = np.transpose(np.array([curr_X[:, 1], curr_X[:, 2]]))
# Initialize Point Tree structure
t0, t1 = None, None
if len(c0) > 0:
t0 = scp.cKDTree(c0)
distance, neighbours = t0.query(c1)
# Make matrices and compute local average
neighbours = np.array(neighbours)
Id_temp = prev_X[neighbours, 0]
# Add preliminary filters
for [vx, vy], [x, y], good_flag, id_traj, dist1 in zip(U_temp, X_temp, good, Id_temp, distance):
id_count += 1
if not good_flag:
continue
norm = (vx ** 2 + vy ** 2) ** 0.5
if norm > params.dict['MM']['distance_max_opy']:
continue
X.append([x + vx / 2, y + vy / 2]) # position of the measured velocity at half of the displacement
V.append([vx, vy])
if params.dict['MM']['compute_trajectory']:
if dist1 < params.dict['MM']['distance_max_opy']:
if id_traj <= 0:
Id_traj.append(id_count_traj)
curr_X[id_count, 0] = id_count_traj
id_count_traj += 1
else:
Id_traj.append(id_traj)
curr_X[id_count, 0] = id_traj
else:
Id_traj.append(id_count_traj)
curr_X[id_count, 0] = id_count_traj
id_count_traj += 1
else:
Id_traj.append(0)
if not X:
X = np.empty((0, 2))
V = np.empty((0, 2))
X = np.array(X)
V = np.array(V)
um = V
if len(X):
for id_obj in range(len(X[:, 0])):
obj_move.append((Id_traj[id_obj], X[id_obj, 0], X[id_obj, 1], um[id_obj, 0], um[id_obj, 1]))
return np.array(obj_move)
[docs]
def farneback_andromede(curr_gray, prev_gray, params):
"""From farneback method of openCV.
:param curr_gray: current frame in grayscale
:param prev_gray: previous frame in grayscale
:param curr_X: x and y positions detected on the current frame
:param prev_X: x and y positions detected on the previous frame
:param resolutionDenseOF: resolution of the grid for result (could be lower tha pixel size)
:param pyr_scale: parameter, specifying the image scale (<1) to build pyramids for each image; pyr_scale=0.5 means
a classical pyramid, where each next layer is twice smaller than the previous one.levels number of pyramid
layers including the initial image; levels=1 means that no extra layers are created and only the original images
are used.
:param winsize: averaging window size; larger values increase the algorithm robustness to image noise and give more
chances for fast motion detection, but yield more blurred motion field.
:param iterations: number of iterations the algorithm does at each pyramid level.
:param poly_n: size of the pixel neighborhood used to find polynomial expansion in each pixel; larger values mean
that the image will be approximated with smoother surfaces, yielding more robust algorithm and more blurred motion
field, typically poly_n =5 or 7.
:param poly_sigma: standard deviation of the Gaussian that is used to smooth derivatives used as a basis for the
polynomial expansion; for poly_n=5, you can set poly_sigma=1.1, for poly_n=7, a good value would be poly_sigma=1.5.
:return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along
x axis, Velocity along y axis]
:rtype: list
"""
obj_move = []
Para = params.dict['MM']['dense']
flowMap = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, Para[0], Para[1], Para[2], Para[3], Para[4],
Para[5], Para[6])
res = int(params.dict['MM']['resolution_denseOF'])
vecY = range(0, curr_gray.shape[0], res)
vecX = range(0, curr_gray.shape[1], res)
x_grid, y_grid = np.meshgrid(vecX, vecY)
x = x_grid.flatten()
y = y_grid.flatten()
flowMapReduced = cv2.resize(flowMap, dsize=(len(vecY), len(vecX)), interpolation=cv2.INTER_CUBIC)
u = flowMapReduced[..., 0].flatten()
v = flowMapReduced[..., 1].flatten()
for id_obj in range(len(x)):
obj_move.append((-1, x[id_obj], y[id_obj], u[id_obj], v[id_obj]))
return np.array(obj_move)
[docs]
def PIV(curr_gray, prev_gray, params):
"""From pyprocess method of openPIV.
:param curr_gray: current frame in grayscale :param prev_gray: previous frame in grayscale :param
resolutionDenseOF: resolution of the grid for result (could be lower tha pixel size) :return: list of movement
detected for current time step [ID trajectory, X position, Y position, Velocity along x axis, Velocity along y
axis] :rtype: list
"""
obj_move = []
u, v, s2n = pyprocess.extended_search_area_piv(prev_gray.astype(np.int32), curr_gray.astype(np.int32),
window_size=params.dict['MM']['window_size'],
overlap=params.dict['MM']['overlap'],
search_area_size=params.dict['MM']['search_area_size'])
x, y = pyprocess.get_coordinates(image_size=prev_gray.shape,
search_area_size=params.dict['MM']['search_area_size'],
overlap=params.dict['MM']['overlap'])
for id_obj in range(len(x.flatten())):
obj_move.append((-1, x.flatten()[id_obj], y.flatten()[id_obj], u.flatten()[id_obj], v.flatten()[id_obj]))
return np.array(obj_move)
def manual_trajectory(img_objs, n_detected_objs):
moves = [ [] for _ in range(len(img_objs)-1)]
for id_obj in range(n_detected_objs+1):
x, y, id_img = 0, 0, -1
for img in range(len(img_objs)):
if img_objs[img].shape[0] > 0:
for current_obj in range(img_objs[img].shape[0]):
if id_obj == int(img_objs[img][current_obj, 0]):
if id_img != -1:
moves[img-1].append((
id_obj,
img_objs[img][current_obj, 1]+(x-img_objs[img][current_obj, 1])/(2*(img-id_img)),
img_objs[img][current_obj, 2]+(y-img_objs[img][current_obj, 2])/(2*(img-id_img)),
(img_objs[img][current_obj, 1]-x)/(img-id_img),
(img_objs[img][current_obj, 2]-y)/(img-id_img)
))
x, y = img_objs[img][current_obj, 1:3]
id_img = img
for id_img in range(len(moves)):
moves[id_img] = np.array(moves[id_img])
return moves