Source code for MotionModel

######################################################################
#          O  o   o o-o   o--o   o-o  o   o o--o o-o   o--o          #
#         / \ |\  | |  \  |   | o   o |\ /| |    |  \  |             #
#        o---o| \ | |   O O-Oo  |   | | O | O-o  |   O O-o           #
#        |   ||  \| |  /  |  \  o   o |   | |    |  /  |             #
#        o   oo   o o-o   o   o  o-o  o   o o--o o-o   o--o          #
######################################################################
#
# ANDROMEDE
# Copyright (C) 2023 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################

""" This module manages motion models. It allows the choice of the method to determine the more probalistic
displacement for each particule detected by the ObjectDetection module. For each particule, a horizontal and vertical
is associated. For Nearest and Tractrac methods an ID trajectory is also provided. For dense optical flow method the
detection is not used even if it should be done to intialize the computation.


"""

from numba import njit
import numpy as np
import time
import scipy.spatial as scp
from PySide6.QtWidgets import QProgressDialog
from PySide6.QtGui import Qt
import cv2

from ThirdParty import KalmanFilter
from ThirdParty import openpiv_pyprocess as pyprocess


# detect move using objects and store inside results.obj_moves variable which contains information for each
# displacement 'id_move' detected at the frame 'id_frame'
# with results.obj_moves[id_frame][id_move] = [id_traj, x, y, Ux, Uy]


[docs] def select_method(params): """Function for motion model selection the availiable methode are:nearest/nearest + Kalman Filter/Tractrac method/Opyflow method/Dense Optical Flow (Fanerback) :param dict: dictionnary parameters :rtype: int """ id_method = 0 if params.dict['MM']['method'] == 'Nearest': id_method = 1 if params.dict['MM']['method'] == 'Nearest prediction': id_method = 2 if params.dict['MM']['method'] == 'Tractrac': id_method = 3 if params.dict['MM']['method'] == 'KLT': id_method = 4 if params.dict['MM']['method'] == 'Dense optical flow': id_method = 5 if params.dict['MM']['method'] == 'PIV': id_method = 6 if params.dict['MM']['method'] == 'Manual': id_method = 7 return id_method
[docs] def process_motion_model_all(video, results, params, id_method): """Processing selected motion model :param video: video read :param dict: dictionnary parameters :param id_method: motion model method :return: list of movement detected at each time step [ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis] :rtype: list """ if id_method: results.obj_moves = [] results.trajs = [] elapsed_time = time.time() video.outputConsole.appendPlainText('Motion model using ' + str(params.dict['MM']['method']) + '...') progress = QProgressDialog('Motion Model', 'Stop', 0, 100) progress.setMinimumDuration(0) progress.setWindowModality(Qt.WindowModal) progress.setWindowTitle('Motion Model') num_images = len(video.imgs) obj_move = [] U_KF = [[], [], [], [], 0] # [objets_points, objets_id, objets_KF, object_history, id_objet] if id_method == 3: U_tractrac = initTractrac(results.objs[0], results.objs[1], id_method) if id_method == 7: results.objs = [] move_manual = results.obj_manual index = [int(move_manual[i][0]) for i in range(len(move_manual))] for id_image in range(1, num_images): progress.setLabelText('Processing frame ' + str(id_image + video.prop['window_frame'][0])) if id_method == 1: obj_move = nearest(results.objs[id_image], results.objs[id_image - 1], id_image, params) elif id_method == 2: obj_move, U_KF = nearest_prediction(U_KF, results.objs[id_image], params) elif id_method == 3: U_tractrac[2] = np.transpose(np.array([results.objs[id_image][:, 1], results.objs[id_image][:, 2]])) obj_move, U_tractrac = tractrac(U_tractrac, params) elif id_method == 4: obj_move = opyflow_andromede(video.imgs[id_image], video.imgs[id_image - 1], results.objs[id_image][:, :3], results.objs[id_image - 1][:, :3], params) elif id_method == 5: obj_move = farneback_andromede(video.imgs[id_image], video.imgs[id_image - 1], params) elif id_method == 6: obj_move = PIV(video.imgs[id_image], video.imgs[id_image - 1], params) elif id_method == 7: ind_frame = list(np.where(np.array(index) == id_image)) obj_move = [] id_traj = [] x = [] y = [] z = [] intensity = [] for ind in ind_frame: if len(ind): obj_move.append(move_manual[ind[0]][1:]) id_traj.append(move_manual[ind[0]][1]) x.append(move_manual[ind[0]][2]) y.append(move_manual[ind[0]][3]) z.append(0) intensity.append(0) objs = np.transpose(np.stack((id_traj, x, y, z, intensity))) obj_move = np.array(obj_move) results.objs.append(objs) results.obj_moves.append(obj_move) progress.setValue(int(100 * (id_image + 1) / num_images)) if progress.wasCanceled(): break elapsed_time = time.time() - elapsed_time video.outputConsole.appendPlainText(str(num_images) + ' images processed in ' + str(round(elapsed_time, 2)) + ' seconds\n') # set id_obj to 0 for no trajectory methods if id_method == 5 or id_method == 6: for id_image in range(len(results.objs)): results.objs[id_image][:, 0] = 0 video.update_process_status(4)
@njit def norm2(current_obj, prev_obj): return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2 @njit def norm3(current_obj, prev_obj, variation_size): return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2 + variation_size @njit def norm4(current_obj, prev_obj, variation_size, variation_int): return (current_obj[1] - prev_obj[1]) ** 2 + (current_obj[2] - prev_obj[2]) ** 2 + variation_size + variation_int @njit def relative_variation(current_obj, prev_obj): return abs(current_obj[3] - prev_obj[3]) / (prev_obj[3] + 1e-6), \ abs(current_obj[4] - prev_obj[4]) / (prev_obj[4] + 1e-6)
[docs] def distance_objs(current_obj, prev_objs, params, size_and_intensity): """Compute distance between selected points (current frame) and all others possibles previous point (previous frame). This method is used for nearest method (simple+Kalman filter). The parameter of likelihood allows avoiding the computation for non-realistic displacement. The distance depends on the chosen type (euclidian, euclidian + root square of area, euclidian + root square of area + normalised intensity) :param current_obj: x and y position of the input point :param prev_objs: x and y position of all points detected in the previous frame :return: list of movement detected at each time step [ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis] :rtype: list """ n_objs = len(prev_objs) likelihood = params.dict['MM']['likelihood'] dist = np.zeros(n_objs) if params.dict['MM']['type_distance'] == 'distance': if size_and_intensity: for id_obj in range(0, n_objs): variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj]) if variation_size < likelihood and variation_int < likelihood: dist[id_obj] = norm2(current_obj, prev_objs[id_obj]) else: dist[id_obj] = 10000 else: for id_obj in range(0, n_objs): dist[id_obj] = norm2(current_obj, prev_objs[id_obj]) elif params.dict['MM']['type_distance'] == 'distance+area': for id_obj in range(0, n_objs): variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj]) if variation_size < likelihood and variation_int < likelihood: dist[id_obj] = norm3(current_obj, prev_objs[id_obj], variation_size) else: dist[id_obj] = 10000 elif params.dict['MM']['type_distance'] == 'distance+area+intensity': for id_obj in range(0, n_objs): variation_size, variation_int = relative_variation(current_obj, prev_objs[id_obj]) if variation_size < likelihood and variation_int < likelihood: dist[id_obj] = norm4(current_obj, prev_objs[id_obj], variation_size, variation_int) else: dist[id_obj] = 10000 return dist
[docs] def nearest(current_objs, prev_objs, id_image, params): """find the displacement by computing the minimum distance between previous and current frame. The distance can take into account size and intensity particles. :param current_objs: x and y positions detected on the current frame :param prev_objs: x and y positions detected on the previous frame :param id_image: frame identifiant :param distance_max: maximal possible distance for computation :return: list of movement detected at each time step [ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis] :rtype: list """ maximum_distance = params.dict['MM']['distance_max'] ** 2 obj_move = [] id_traj = [] nobjs=-1 size_intensity = False if len(current_objs) > 0 and len(prev_objs): if params.dict['MM']['type_distance'] == 'distance': prev_objs = np.array(prev_objs) curr_objs = np.array(current_objs) X = [] V = [] id_traj = [] c0 = np.transpose(np.array([prev_objs[:, 1], prev_objs[:, 2]])) c1 = np.transpose(np.array([curr_objs[:, 1], curr_objs[:, 2]])) # Initialize Point Tree structure t0 = None if len(c0) > 0: t0 = scp.cKDTree(c0) distance, neighbours = t0.query(c1) neighbours = np.array(neighbours) id_count_traj = np.max(prev_objs[neighbours, 0]) + 1 # Add preliminary filters for ind_point in range(len(curr_objs)): x, y = prev_objs[neighbours[ind_point], 1], prev_objs[neighbours[ind_point], 2] vx, vy = curr_objs[ind_point, 1] - prev_objs[neighbours[ind_point], 1], curr_objs[ind_point, 2] - \ prev_objs[neighbours[ind_point], 2] norm = (vx ** 2 + vy ** 2) ** 0.5 if norm < params.dict['MM']['distance_max']: X.append([x + vx / 2, y + vy / 2]) # position of the measured velocity at half of the displacement V.append([vx, vy]) if prev_objs[neighbours[ind_point], 0] > -1: id_traj.append(prev_objs[neighbours[ind_point], 0]) current_objs[ind_point, 0] = prev_objs[neighbours[ind_point], 0] else: id_count_traj += 1 id_traj.append(id_count_traj) current_objs[ind_point, 0] = id_count_traj else: id_count_traj += 1 current_objs[ind_point, 0] = id_count_traj if not X: X = np.empty((0, 2)) V = np.empty((0, 2)) X = np.array(X) V = np.array(V) if len(X): for id_obj in range(len(X[:, 0])): obj_move.append((id_traj[id_obj], X[id_obj, 0], X[id_obj, 1], V[id_obj, 0], V[id_obj, 1])) else: if max(current_objs[:, 3]) > 0: size_intensity = True for id_obj in range(len(current_objs)): distances = distance_objs(current_objs[id_obj, :], prev_objs, params, size_intensity) id_prev = np.where(distances == distances.min()) if len(id_prev[0]) == 1: if distances[id_prev] < maximum_distance: dx = float(current_objs[id_obj, 1] - prev_objs[id_prev, 1]) dy = float(current_objs[id_obj, 2] - prev_objs[id_prev, 2]) x = float(current_objs[id_obj, 1] + prev_objs[id_prev, 1]) / 2 y = float(current_objs[id_obj, 2] + prev_objs[id_prev, 2]) / 2 if prev_objs[id_prev, 0] > -1: current_objs[id_obj, 0] = prev_objs[id_prev, 0] else: nobjs += 1 prev_objs[id_prev, 0] = nobjs current_objs[id_obj, 0] = prev_objs[id_prev, 0] obj_move.append((current_objs[id_obj, 0], x, y, dx, dy)) return np.array(obj_move)
[docs] def initTractrac(current_objs, prev_objs, id_method): """Initialize variable ofr tractrac method. The variable contains the positions of detected particles for the 2 first frames. :param current_objs: x and y positions detected on the current frame :param prev_objs: x and y positions detected on the previous frame :param id_method: type of the chosen method :return: list of variables for the movement of the particles in the 2 first frames. :rtype: list """ if id_method != 3: return [] c0 = np.transpose(np.array([prev_objs[:, 1], prev_objs[:, 2]])) c1 = np.transpose(np.array([current_objs[:, 1], current_objs[:, 2]])) c2 = [] id0 = np.zeros(c0.shape[0]) + np.nan # Initialize Point Tree structure t0, t1 = None, None if len(c0) > 0: t0 = scp.cKDTree(c0) if len(c1) > 0: t1 = scp.cKDTree(c1) ns01, ns10, i1001 = [np.array([]) for _ in range(3)] if (len(c0) > 0) & (len(c1) > 0): ns01 = t0.query(c1) ns10 = t1.query(c0) # First check of i1001 = ns10[1][ns01[1]] == np.arange(c1.shape[0]) err_u_filtered = np.zeros(c1.shape[0]) id_traj = 0 # Initialize lists err_U = np.array([]) err_U_th = 2. x_mot, u_mot, x_mot_tmp, u_mot_tmp = [np.array([]).reshape((-1, 2)) for _ in range(4)] err_mot, err_mot_temp = [np.array([]) for _ in range(2)] id_good = [] x1 = np.array([]).reshape(0, 2) um = np.array([]).reshape(0, 2) U_tractrac = [c0, c1, c2, t0, t1, ns01, i1001, id0, err_u_filtered, id_traj, err_U, x_mot, u_mot, x_mot_tmp, u_mot_tmp, err_mot, err_mot_temp, err_U_th, id_good, x1, um] return U_tractrac
def Propagate_MotionModel_KDTree(C, Xm, Um, Em, Xm_old, Um_old, Em_old, params): # KDTree X_ref = np.vstack((Xm, Xm_old)) U_ref = np.vstack((Um, Um_old)) E_ref = np.hstack((Em, Em_old)) # Make Tree tX_ref = scp.cKDTree(X_ref) # Get firsts N neighbours of query points nn = np.minimum(np.minimum(np.int8(params.dict['MM']['motion_av']), C.shape[0]), X_ref.shape[0]) # print nn distance, neighbours = tX_ref.query(C, k=nn) # Make matrices and compute local average neighbours = np.array(neighbours) values_U = U_ref[neighbours.flatten(), :].reshape(-1, nn, 2) values_E = E_ref[neighbours.flatten()].reshape(-1, nn) # Averaging over neighbours values # Option 1 : simple average of neighbouring values # Option 2 : weighted average with weight on previous model weights = np.ones(X_ref.shape[0]) # W[:Xm.shape[0]]=W[:Xm.shape[0]]*np.maximum(0,-Em) weights[-Xm_old.shape[0]:] = weights[-Xm_old.shape[0]:] * params.dict['MM']['filter_time'] # *np.maximum(0,-Em_old) weight_neighbours = weights[neighbours.flatten()].reshape(-1, nn) U_filtered = np.sum(values_U * weight_neighbours.reshape(weight_neighbours.shape[0], weight_neighbours.shape[1], -1), axis=1).reshape(-1, 2) / \ np.sum(weight_neighbours, axis=1).reshape(weight_neighbours.shape[0], -1) E_filtered = np.sum(values_E * weight_neighbours, axis=1) / np.sum(weight_neighbours, axis=1) # Find nan and replace by default 0 value id_good = np.isfinite(U_filtered[:, 0]) # Save non nan to weight next iteration Xm_old = C[id_good, :] Um_old = U_filtered[id_good, :] Em_old = E_filtered[id_good] # Replace nan if no model points where given if len(U_filtered) > 0: id_nan = np.isnan(U_filtered[:, 0]) U_filtered[id_nan, :] = 0 E_filtered[np.isnan(E_filtered)] = 2 return U_filtered, E_filtered, Xm_old, Um_old, Em_old
[docs] def tractrac(U_tractrac, params): """Main loop of the tractrac method. :param U_tractrac: variables for the 2 previous frames (connectivity, displacement, error, etc..) :param motion: (true/false)selection of the motion method (iteration to determine the most probable displacement) :param motion_it: number of iteration for motion method :param filter: (True/False) filter outliers if necessary :param filter_time: number of time step for temporal filter :return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis] :rtype: list :return: list of variables for the movement of the particles. :rtype: list """ obj_move = [] dt = 1 # params.dict['IP']['frame_step'] [C0, C1, C2, _, t1, ns01, i1001, ID0, err_U_filtered, id_traj, errU, X_mot, U_mot, X_mot_temp, U_mot_temp, err_mot, err_mot_temp, errU_th, id_good, X1, um] = U_tractrac it = 0 while it <= params.dict['MM']['motion_it']: # PREDICTION from motion model if (params.dict['MM']['motion'] == 1) & (len(id_good) + X_mot.shape[0] > 1) & (len(C1) > 0): if len(X1) == 0: [U_motion, err_U_filtered, X_mot_temp, U_mot_temp, err_mot_temp] = \ Propagate_MotionModel_KDTree(C1, np.array([]).reshape(-1, 2), np.array([]).reshape(-1, 2), np.array([]), X_mot, U_mot, err_mot, params) else: [U_motion, err_U_filtered, X_mot_temp, U_mot_temp, err_mot_temp] = \ Propagate_MotionModel_KDTree(C1, X1[id_good, :], um[id_good, :], errU[id_good], X_mot, U_mot, err_mot, params) else: U_motion = np.zeros(C1.shape) err_U_filtered = np.zeros(C1.shape[0]) # Start Iteration step ################################################# # if len(C2)==0: # Initialize for each loop # X0,X1,X1motion,X2,um,U_motion,a,t2,ns12,i2112= (np.array([[],[]]).T for i in range(10)) # idC2,errU,ID,ID_good,i_all,um= (np.array([]).T for i in range(6)) # Build new trees i1221, i2112, ns12 = [[] for _ in range(3)] t1m, ID, id_nan = [None for _ in range(3)] if len(C1) > 0: t1m = scp.cKDTree(C1 + U_motion * dt) # Points C1 moved by motion model # !!Check on original version if len(C2) > 0: t2 = scp.cKDTree(C2) # Points C2 else: t2 = None if (len(C1) > 0) & (len(C2) > 0): ns21 = t2.query(C1 + U_motion * dt) # Nearest Neighbour link C1m -> C2 ns12 = t1m.query(C2) # Nearest Neighbour link C2 -> C1m i1221 = ns12[1][ns21[1]] == np.arange(C1.shape[0]) # Check reversibility of link C2-> C1m [C1m -> C2 ] i2112 = ns21[1][ns12[1]] == np.arange(C2.shape[0]) # For next iteration if (len(C0) > 0) & (len(C1) > 0) & (len(C2) > 0): i_all = i1001 & i1221 # Keep only Unilateral associations on 3 steps 0 -> 1 -> 2 # print np.sum(i_all)/np.float(C1.shape[0]) # Update Trajectories positions X0 = C0[ns01[1][i_all], :] X1 = C1[i_all, :] X2 = C2[ns21[1][i_all], :] # Build traj ID ID = ID0[ns01[1][i_all]] # Velocities U0 = (X1 - X0) / dt U1 = (X2 - X1) / dt um = (U0 + U1) / 2. # TO CHECK whether params_dict_file['dt'] should appear or not in errU errU = np.maximum(-10., np.log10(np.amax((np.abs((U_motion[i_all, :] - um) * dt)), 1))) # Filtering outliers IS_good = errU - err_U_filtered[i_all] < errU_th # Evolution of threshold errU_th = (params.dict['MM']['filter_time'] * errU_th + params.dict['MM']['filter_std'] * np.std(errU)) / ( params.dict['MM']['filter_time'] + 1.) else: i_all, ID, errU, IS_good = [[] for _ in range(4)] X0, X1, X2, um, A = [np.array([]).reshape(-1, 2) for _ in range(5)] errU_th = 2 # Reinitialize threshold # Filter Outliers if necessary if params.dict['MM']['filter'] == 1: id_good = np.where(IS_good == 1)[0] elif len(X1) > 0: id_good = np.arange(0, len(X1[:, 1])) else: id_good = [] it += 1 # END Iteration step ################################################# if len(ID) > 0: id_nan = np.where(np.isnan(ID[id_good]))[0] if len(id_nan) > 0: # Associate new ID ID[id_good[id_nan]] = np.arange(0, len(id_nan)) + id_traj id_traj = id_traj + len(id_nan) # Keep track of last motion model used X_mot, U_mot, err_mot = X_mot_temp, U_mot_temp, err_mot_temp # Save ID for next iteration if len(C1) > 0: ID0 = np.zeros(C1.shape[0]) + np.nan ID0[i_all] = ID U_tractrac = [C1, C2, C2, t1, t2, ns12, i2112, ID0, err_U_filtered, id_traj, errU, X_mot, U_mot, X_mot_temp, U_mot_temp, err_mot, err_mot_temp, errU_th, id_good, X1, um] for id_obj in id_good: obj_move.append((ID[id_obj], X1[id_obj, 0], X1[id_obj, 1], um[id_obj, 0], um[id_obj, 1])) return np.array(obj_move), U_tractrac
[docs] def nearest_prediction(U_KF, current_objs, params): """Find the displacement by computing the minimum distance between current frame and the prediction from KF. The distance can take into account size and intensity particles. The kalman filter allows predicting position of the particles and store it if the particles has disappeared in the current frame. The filter is updated at each time step. :param U_KF: Kalman filter variables (trajectory) :param currents_objs: x and y positions detected on the current frame :param step_max_prevision: maximal number of time step for prediction without new detection :param distance_max: maximal possible distance for computation :return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along x-axis, Velocity along y axis] :rtype: list """ [objets_points, objets_id, objets_KF, obj_history, id_objet] = U_KF dt = params.dict['IP']['frame_step'] obj_move = [(0, 0, 0, 0, 0)] obj_IDs, objs_X, objs_Y, objs_Ux, objs_Uy = [[] for _ in range(5)] size_intensity = False if len(current_objs) > 0: if max(current_objs[:, 3]) > 0: size_intensity = True count_vit = 0 # Prediction of all objects + display if object_point exists for id_obj in range(len(objets_points)): obj_state = objets_KF[id_obj].predict() # predicted objects by KF objets_points[id_obj] = np.array([obj_state[0, 0], obj_state[1, 0], obj_state[4, 0], obj_state[5, 0]]) if obj_state[0, 0] < (params.dict['IP']['ROIxmax'] - params.dict['IP']['ROIxmin']) and obj_state[1, 0] < ( params.dict['IP']['ROIymax'] - params.dict['IP']['ROIymin']): # if estimation remains in the image objs_Ux.append(obj_state[2, 0]) # add the X-velocity for the corresponding ID objs_Uy.append(obj_state[3, 0]) # add the Y-velocity for the corresponding ID objs_X.append(obj_state[0, 0]) objs_Y.append(obj_state[1, 0]) obj_IDs.append(objets_id[id_obj]) count_vit += 1 new_objs = np.ones((len(current_objs))) if len(objets_points): maximum_distance = params.dict['MM']['distance_max'] ** 2 for id_obj in range(len(objets_points)): if len(current_objs) > 0: if params.dict['MM']['type_distance'] == 'distance': c1 = np.transpose(np.array([current_objs[:, 1], current_objs[:, 2]])) id_prev = [] if len(c1) > 0: t1 = scp.cKDTree(c1) distance1, neighbours = t1.query(np.array([objets_points[id_obj][0], objets_points[id_obj][1]])) id_prev = [[neighbours]] distances1 = distance1 ** 2 else: distances = distance_objs(np.insert(objets_points[id_obj], 0, -1, axis=0), current_objs, params, size_intensity) id_prev = np.where(distances == distances.min()) distances1 = distances[id_prev] if len(id_prev[0]) == 1: if distances1 < maximum_distance: objets_KF[id_obj].update(np.array(current_objs[id_prev[0], 1:5]).T) obj_history[id_obj] = 0 # test detection at last time step new_objs[id_prev] = 0 else: obj_history[id_obj] += 1 # test detection at last time step else: obj_history[id_obj] += 1 # test detection at last time step # Creation of the Kalman filter for new objects for point_id in range(len(current_objs)): if new_objs[point_id]: objets_points.append(current_objs[point_id, 1:5]) objets_KF.append(KalmanFilter.KalmanFilter(dt, [current_objs[point_id, 1], current_objs[point_id, 2]], [current_objs[point_id, 3], current_objs[point_id, 4]])) objets_id.append(id_objet) obj_history.append(0) id_objet += 1 # Cleaning ... tab_id = [] for id_point in range(len(objets_points)): if int(objets_points[id_point][0]) < 0 or \ int(objets_points[id_point][1]) < 0 or \ objets_points[id_point][0] > params.dict['IP']['ROIxmax'] or \ objets_points[id_point][1] > params.dict['IP']['ROIymax'] or \ obj_history[id_point] > params.dict['MM']['step_max_prevision']: tab_id.append(id_point) for index in sorted(tab_id, reverse=True): del objets_points[index] del objets_KF[index] del objets_id[index] del obj_history[index] U_KF = [objets_points, objets_id, objets_KF, obj_history, id_objet] for id_obj in range(len(obj_IDs)): obj_move.append((obj_IDs[id_obj], objs_X[id_obj], objs_Y[id_obj], objs_Ux[id_obj], objs_Uy[id_obj])) return np.array(obj_move), U_KF
[docs] def opyflow_andromede(curr_gray, prev_gray, curr_X, prev_X, params): """find the displacement by using method in Opyflow. It is based on the Luka Kanade algorithm. :param curr_gray: current frame in grayscale :param prev_gray: previous frame in grayscale :param curr_X: x and y positions detected on the current frame :param prev_X: x and y positions detected on the previous frame :param distance_max_opy: maximal possible distance for computation :param winSize: size of the search window at each pyramid level. :param maxLevel: 0-based maximal pyramid level number; if set to 0, pyramids are not used (single level), if set to 1, two levels are used, and so on; if pyramids are passed to input then algorithm will use as many levels as pyramids have but no more than maxLevel. :param criteria: parameter, specifying the termination criteria of the iterative search algorithm (after the specified maximum number of iterations criteria.maxCount or when the search window moves by less than criteria.epsilon. :return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along x axis, Velocity along y axis] :rtype: list """ obj_move, X, V, Id_traj = [[] for _ in range(4)] window_size = (params.dict['MM']['window_size'], params.dict['MM']['window_size']) # transformation of real coordinate points to pixel in the shifted image prev_Cc = np.float32(np.zeros((len(prev_X[:, 0]), 1, 2))) curr_Cc = np.float32(np.zeros((len(curr_X[:, 0]), 1, 2))) for ii in range(len(prev_X[1:, 0])): prev_Cc[ii, 0, 0] = np.float32(prev_X[ii, 1]) prev_Cc[ii, 0, 1] = np.float32(prev_X[ii, 2]) for ii in range(len(curr_X[:, 0])): curr_Cc[ii, 0, 0] = np.float32(curr_X[ii, 1]) curr_Cc[ii, 0, 1] = np.float32(curr_X[ii, 2]) # prev_Cc=(np.array(prev_Cc)) if prev_gray is not None and len(prev_Cc) > 0: img0, img1 = np.uint8(curr_gray * 255), np.uint8(prev_gray * 255) p1, st, err = cv2.calcOpticalFlowPyrLK( img0, img1, curr_Cc, None, winSize=window_size) p0r, st, err = cv2.calcOpticalFlowPyrLK( img1, img0, p1, None, winSize=window_size) d = abs(curr_Cc - p0r).reshape(-1, 2).max(-1) # wayBackGoodFlag determine the acceptable error when flow is calculated from A to B good = (d < 1) X_temp = curr_Cc.reshape(-1, 2) U_temp = curr_Cc.reshape(-1, 2) - p1.reshape(-1, 2) id_count = -1 id_count_traj = np.max(prev_X[:, 0]) + 1 c0 = np.transpose(np.array([prev_X[:, 1], prev_X[:, 2]])) c1 = np.transpose(np.array([curr_X[:, 1], curr_X[:, 2]])) # Initialize Point Tree structure t0, t1 = None, None if len(c0) > 0: t0 = scp.cKDTree(c0) distance, neighbours = t0.query(c1) # Make matrices and compute local average neighbours = np.array(neighbours) Id_temp = prev_X[neighbours, 0] # Add preliminary filters for [vx, vy], [x, y], good_flag, id_traj, dist1 in zip(U_temp, X_temp, good, Id_temp, distance): id_count += 1 if not good_flag: continue norm = (vx ** 2 + vy ** 2) ** 0.5 if norm > params.dict['MM']['distance_max_opy']: continue X.append([x + vx / 2, y + vy / 2]) # position of the measured velocity at half of the displacement V.append([vx, vy]) if params.dict['MM']['compute_trajectory']: if dist1 < params.dict['MM']['distance_max_opy']: if id_traj <= 0: Id_traj.append(id_count_traj) curr_X[id_count, 0] = id_count_traj id_count_traj += 1 else: Id_traj.append(id_traj) curr_X[id_count, 0] = id_traj else: Id_traj.append(id_count_traj) curr_X[id_count, 0] = id_count_traj id_count_traj += 1 else: Id_traj.append(0) if not X: X = np.empty((0, 2)) V = np.empty((0, 2)) X = np.array(X) V = np.array(V) um = V if len(X): for id_obj in range(len(X[:, 0])): obj_move.append((Id_traj[id_obj], X[id_obj, 0], X[id_obj, 1], um[id_obj, 0], um[id_obj, 1])) return np.array(obj_move)
[docs] def farneback_andromede(curr_gray, prev_gray, params): """From farneback method of openCV. :param curr_gray: current frame in grayscale :param prev_gray: previous frame in grayscale :param curr_X: x and y positions detected on the current frame :param prev_X: x and y positions detected on the previous frame :param resolutionDenseOF: resolution of the grid for result (could be lower tha pixel size) :param pyr_scale: parameter, specifying the image scale (<1) to build pyramids for each image; pyr_scale=0.5 means a classical pyramid, where each next layer is twice smaller than the previous one.levels number of pyramid layers including the initial image; levels=1 means that no extra layers are created and only the original images are used. :param winsize: averaging window size; larger values increase the algorithm robustness to image noise and give more chances for fast motion detection, but yield more blurred motion field. :param iterations: number of iterations the algorithm does at each pyramid level. :param poly_n: size of the pixel neighborhood used to find polynomial expansion in each pixel; larger values mean that the image will be approximated with smoother surfaces, yielding more robust algorithm and more blurred motion field, typically poly_n =5 or 7. :param poly_sigma: standard deviation of the Gaussian that is used to smooth derivatives used as a basis for the polynomial expansion; for poly_n=5, you can set poly_sigma=1.1, for poly_n=7, a good value would be poly_sigma=1.5. :return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along x axis, Velocity along y axis] :rtype: list """ obj_move = [] Para = params.dict['MM']['dense'] flowMap = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, Para[0], Para[1], Para[2], Para[3], Para[4], Para[5], Para[6]) res = int(params.dict['MM']['resolution_denseOF']) vecY = range(0, curr_gray.shape[0], res) vecX = range(0, curr_gray.shape[1], res) x_grid, y_grid = np.meshgrid(vecX, vecY) x = x_grid.flatten() y = y_grid.flatten() flowMapReduced = cv2.resize(flowMap, dsize=(len(vecY), len(vecX)), interpolation=cv2.INTER_CUBIC) u = flowMapReduced[..., 0].flatten() v = flowMapReduced[..., 1].flatten() for id_obj in range(len(x)): obj_move.append((-1, x[id_obj], y[id_obj], u[id_obj], v[id_obj])) return np.array(obj_move)
[docs] def PIV(curr_gray, prev_gray, params): """From pyprocess method of openPIV. :param curr_gray: current frame in grayscale :param prev_gray: previous frame in grayscale :param resolutionDenseOF: resolution of the grid for result (could be lower tha pixel size) :return: list of movement detected for current time step [ID trajectory, X position, Y position, Velocity along x axis, Velocity along y axis] :rtype: list """ obj_move = [] u, v, s2n = pyprocess.extended_search_area_piv(prev_gray.astype(np.int32), curr_gray.astype(np.int32), window_size=params.dict['MM']['window_size'], overlap=params.dict['MM']['overlap'], search_area_size=params.dict['MM']['search_area_size']) x, y = pyprocess.get_coordinates(image_size=prev_gray.shape, search_area_size=params.dict['MM']['search_area_size'], overlap=params.dict['MM']['overlap']) for id_obj in range(len(x.flatten())): obj_move.append((-1, x.flatten()[id_obj], y.flatten()[id_obj], u.flatten()[id_obj], v.flatten()[id_obj])) return np.array(obj_move)
def manual_trajectory(img_objs, n_detected_objs): moves = [ [] for _ in range(len(img_objs)-1)] for id_obj in range(n_detected_objs+1): x, y, id_img = 0, 0, -1 for img in range(len(img_objs)): if img_objs[img].shape[0] > 0: for current_obj in range(img_objs[img].shape[0]): if id_obj == int(img_objs[img][current_obj, 0]): if id_img != -1: moves[img-1].append(( id_obj, img_objs[img][current_obj, 1]+(x-img_objs[img][current_obj, 1])/(2*(img-id_img)), img_objs[img][current_obj, 2]+(y-img_objs[img][current_obj, 2])/(2*(img-id_img)), (img_objs[img][current_obj, 1]-x)/(img-id_img), (img_objs[img][current_obj, 2]-y)/(img-id_img) )) x, y = img_objs[img][current_obj, 1:3] id_img = img for id_img in range(len(moves)): moves[id_img] = np.array(moves[id_img]) return moves