Imports


In [1]:
%load_ext autoreload

%autoreload 2

In [2]:
import os
import numpy as np
import skimage.io
import skimage
from scipy import ndimage
from skimage.draw import circle_perimeter_aa,circle
from skimage import transform


import matplotlib.pyplot as plt
import matplotlib
import matplotlib.animation as anim

from scipy.stats import multivariate_normal
import numba
import skvideo.io
import skvideo.datasets

import itertools

import scoring
import util

Configuration


In [21]:
test_path='test_data/All_Blacks.5846.main_glosses.mb.r480x360.mp4'
#test_path='test_data/_1276_small_3.mov'
output_path='tmp/'
filename =os.path.basename(test_path)
tracked_video_path= os.path.join(output_path,filename)
name, ext = os.path.splitext(filename)
debug_video_path = os.path.join(output_path, name + "_debug" + ext)
data = skvideo.io.ffprobe(test_path)['video']
rate = data['@r_frame_rate']
video = skvideo.io.vread(test_path)/255

print("Original video shape: %s" % str(video.shape))
video=video[0:20,:,:,:]
util.video_rgb_to_hsv(video) # convert frames to HSV (in place; the result is not reassigned)

T,h,w,c=video.shape

# new_size=(320,320)
# video_original=video
# video=np.zeros((T,new_size[0],new_size[1],c))
# for t in range(T):
#     video[t,:,:,:] = transform.resize(video_original[t,:,:,:], new_size, order=1)

T,h,w,c=video.shape

print("Resulting video shape: %s" % str(video.shape))


Original video shape: (54, 368, 480, 3)
Resulting video shape: (20, 368, 480, 3)
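
The util helpers are not included in this notebook. As a reference, a minimal sketch of what video_rgb_to_hsv could look like, assuming it converts each frame in place with skimage.color.rgb2hsv (the real util module may differ):

import skimage.color

def video_rgb_to_hsv(video):
    # Hypothetical stand-in for util.video_rgb_to_hsv: convert every frame of a
    # (T, h, w, 3) float RGB video (values in [0, 1]) to HSV, in place.
    for t in range(video.shape[0]):
        video[t] = skimage.color.rgb2hsv(video[t])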

In [22]:
def jump_penalty_euclidean(p1,p2):
  # squared Euclidean distance between the previous and current positions
  d=p1-p2
  return np.sum(d**2)
def skin_config_asllvd():
    mu_hsv=np.array([20.0,35.0,73.0])/255.0
    #mu_rgb=np.array([176,134,112])
    cov=np.array([0.3,0.5,10])
    return mu_hsv,cov
def skin_config_boston():
    mu_hsv=np.array([357,56.0,85.0])/255.0
    #mu_rgb=np.array([176,134,112])
    cov=np.array([0.3,0.7,15])
    return mu_hsv,cov

mu,cov=skin_config_asllvd()
#mu,cov=skin_config_boston()

movement_weight=0.1
optical_flow_weight=0.4
scorers=[scoring.manhattan_movement_score,
         lambda video: scoring.skin_pixel_scorer(video,mu,cov),
         scoring.optical_flow_movement_score]
scorers_labels=['manhattan_movement','pixel','optical_flow']
scorers_weights=[movement_weight,1-movement_weight-optical_flow_weight,optical_flow_weight]
local_score,scores=scoring.calculate_local_score(video,scorers,scorers_weights)


(19, 368, 480)
Movement score threshold: 0.017744
Pixel score threshold: 0.024918
Optical flow threshold: 0.421541
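
The scoring module is not shown either. One plausible reading of scoring.skin_pixel_scorer, consistent with the mu/cov configuration above and the multivariate_normal import, is a per-pixel Gaussian likelihood over HSV values; the sketch below is an assumption, not the actual implementation:

import numpy as np
from scipy.stats import multivariate_normal

def skin_pixel_scorer_sketch(video, mu, cov):
    # Hypothetical per-pixel skin score: likelihood of each HSV pixel under a
    # diagonal-covariance Gaussian skin model, normalised to [0, 1].
    T, h, w, c = video.shape
    model = multivariate_normal(mean=mu, cov=np.diag(cov))
    score = model.pdf(video.reshape(-1, c)).reshape(T, h, w)
    return score / score.max()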

Test model


In [23]:
class DPTracker:  

  def __init__(self):
    pass

  def backtrack_path(self,backtracking_path,score):
    Tm,h,w,dim_indices=backtracking_path.shape
    T=Tm+1
    path=np.zeros((T,2),dtype=int) # 2 => (x,y)
    best_index_last_frame = np.argmax(score[-1,:,:])
    x,y= np.unravel_index(best_index_last_frame , score[-1,:,:].shape)
    path[-1,:]=np.array([x,y])
    for t in reversed(range(T-1)):
      x,y= tuple(path[t+1,:])
      path[t, :] = backtracking_path[t,x,y,:]
    return path

  @numba.jit
  def track(self,video,local_score,jump_penalty_matrix):
    video = ndimage.gaussian_filter(video, sigma=(0, 2, 2, 0), order=0)
    T,h,w,c=video.shape
    self.backtracking_path=np.zeros((T-1,h,w,2),dtype=int) # 2 => (x,y) for T-1
    self.score=np.zeros((T,h,w))
    n=int((jump_penalty_matrix.shape[0]-1)/2)
    n2=n+1
    

    self.score[0,:,:]=local_score[0,:,:]
    self.local_score =local_score
        
    initial_values=np.zeros((h,w,2))

    for i in range(h):
      for j in range(w):
        initial_values[i,j,:]=np.array([i,j])
    self.backtracking_path[:,:,:,:]=initial_values
    
    print("Calculating score for frame:")
    for t in range(1,T):
      print("%d/%d - " % (t,T-1), end="")
      for i in range(n, h-n):
        for j in range(n, w-n):
          neighbourhood_score=self.score[t-1,i-n:i+n2,j-n:j+n2]
          previous_score=neighbourhood_score-jump_penalty_matrix
            
          index=np.argmax(previous_score)
          best_score=previous_score.flat[index]
          relative_x,relative_y=np.unravel_index(index, jump_penalty_matrix.shape)
          x = i + relative_x - n
          y = j + relative_y - n
          self.backtracking_path[t-1, i, j, 0] =x
          self.backtracking_path[t-1, i, j, 1] =y
          self.score[t,i,j]=self.local_score[t,i,j]+best_score #np.mean(previous_score)
    self.path=self.backtrack_path(self.backtracking_path,self.score)
    
    return self.path

def generate_jump_penalty_matrix(jump_penalty, neighbourhood):
    n=neighbourhood*2+1
    matrix=np.zeros((n,n))
    center=np.array([neighbourhood,neighbourhood])
    for i in range(n):
      for j in range(n):
        position=np.array([i,j])
        matrix[i,j]=jump_penalty(center,position)
    return matrix

neighbourhood_size=25
jump_penalty_weight=0.0003
jump_penalty_matrix=jump_penalty_weight * generate_jump_penalty_matrix(jump_penalty_euclidean,neighbourhood_size)


tracker=DPTracker()
track_result=tracker.track(video,local_score,jump_penalty_matrix)

print("Tracking coordinates")
print(track_result.T)


Calculating score for frame:
1/19 - 2/19 - 3/19 - 4/19 - 5/19 - 6/19 - 7/19 - 8/19 - 9/19 - 10/19 - 11/19 - 12/19 - 13/19 - 14/19 - 15/19 - 16/19 - 17/19 - 18/19 - 19/19 - Tracking coordinates
[[315 315 324 336 342 338 313 288 263 238 213 214 201 198 192 190 190 191
  197 202]
 [183 179 184 184 194 219 235 243 251 255 250 251 256 254 257 262 262 264
  257 254]]
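
As a sanity check on the penalty term in the recurrence above (score[t,i,j] = local_score[t,i,j] + the maximum over the neighbourhood of score[t-1] - penalty), the matrix produced for a neighbourhood of 1 is just the squared distance of each offset from the centre: staying put costs nothing, an axis-aligned step costs 1 and a diagonal step costs 2 (before scaling by jump_penalty_weight):

print(generate_jump_penalty_matrix(jump_penalty_euclidean, 1))
# [[2. 1. 2.]
#  [1. 0. 1.]
#  [2. 1. 2.]]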

Save output


In [ ]:
def draw_tracked(video,track_result):
  T,h,w,c = video.shape
  image_shape=(h,w)
  radius=5
  color=np.array([1.0,0,0])
  for t in range(T):
    x,y=tuple(track_result[t,:])
    if (x-radius>=0) and (x+radius<h) and (y-radius>=0) and (y+radius<w):
      rr,cc=circle(x,y,radius,shape=image_shape)
      video[t,rr,cc,:]=color
  return video


def generate_debug_video(video,scores,labels,output_path):
#   FFMpegWriter= anim.writers['ffmpeg']
  metadata=dict(title="Output")
#   writer=FFMpegWriter(fps=6,bitrate=16384*2,metadata=metadata)

  columns=int(np.ceil(len(scores)/2))+1
  f,axes=plt.subplots(2,columns,dpi=100)
  axes =list(itertools.chain.from_iterable(axes))
  image_ax=axes[0]
  f.set_size_inches(10,10, True)
  T,h,w,c=video.shape
  base_file,ext=os.path.splitext(output_path)
  if not os.path.exists(base_file):
    os.mkdir(base_file)
#   with writer.saving(f,debug_video_path,T):
  cbars=[]
  ranges=[]
  for s in range(len(scores)):
    ranges.append( (0,np.max(scores[s])) )
  for t in range(T):
    plt.suptitle("Frame %d/%d" % (t,T-1))
    image_ax.cla()
    plot=image_ax.imshow(video[t,:,:,:])
    
    for s in range(len(scores)):
      ax=axes[s+1]
      ax.cla()
      ax.set_title(labels[s],fontsize=10)
    
      data=scores[s][t,:,:]
      vmin,vmax=ranges[s]
      plot=ax.imshow(data,cmap='gray',vmin=vmin,vmax=vmax)  
      if t==0:
        cbar=f.colorbar(plot, ax=ax)
        cbar.ax.tick_params(labelsize=10)
        cbar_ticks = np.linspace(vmin,vmax, num=10, endpoint=True)
        cbar.set_ticks(cbar_ticks)
        cbars.append(cbar)
#       plot.set_clim([data.min(), data.max()])
#       cbar_ticks = np.linspace(data.min(), data.max(), num=10, endpoint=True)
#       cbars[s].set_ticks(cbar_ticks) 
#       plot.autoscale()
    frame_path='%s/%03d.png' % (base_file,t)
    plt.savefig(frame_path)
#       writer.grab_frame()

tracked_video=np.copy(video)
util.video_hsv_to_rgb(tracked_video)
tracked_video=tracked_video/255.0
tracked_video=draw_tracked(tracked_video,track_result)
tracked_video=tracked_video.astype('short')
show_scores=[local_score,tracker.score]+scores
show_scores_label=['local','global']+scorers_labels
generate_debug_video(tracked_video, show_scores,show_scores_label,debug_video_path)

# skvideo.io.vwrite(tracked_video_path, tracked_video, outputdict={
#   '-vcodec': 'libx264',
#   '-pix_fmt': 'yuv420p',
#   '-r': rate,
# })
plt.close("all")
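
generate_debug_video only writes per-frame PNGs into a directory named after debug_video_path. If an actual debug video file is wanted, the saved frames can be stitched together afterwards; a sketch, reusing debug_video_path and rate from the configuration cell:

import glob

frame_dir, _ = os.path.splitext(debug_video_path)
frames = [skimage.io.imread(p) for p in sorted(glob.glob(frame_dir + '/*.png'))]
skvideo.io.vwrite(debug_video_path, np.array(frames), outputdict={
  '-vcodec': 'libx264',
  '-pix_fmt': 'yuv420p',
  '-r': rate,
})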

In [ ]:
score=tracker.local_score
plt.imshow(score[T-2,:,:])
plt.colorbar()
plt.figure()
plt.imshow(score[T-3,:,:])
plt.colorbar()
plt.figure()
plt.imshow(score[T-4,:,:])
plt.colorbar()
plt.figure()
plt.imshow(tracked_video[T-1,:,:,:])
plt.colorbar()
plt.figure()
total_scores=np.sum(tracker.score,axis=(1,2)) # accumulated global score per frame
print(total_scores.shape)
plt.plot(total_scores)
plt.show()