First, let's install the libraries we need. This first cell downloads and installs them on our Colab virtual machine, and the next cell imports them.
In [ ]:
!pip install google-cloud-automl
!apt-get install libmagickwand-dev
!pip install pillow
!pip install --upgrade protobuf
!pip install --upgrade google-cloud-videointelligence
You might have to restart your runtime to load these packages.
In [ ]:
import sys
import os
import json
import math
from google.colab import auth
from google.colab import files
import pandas as pd
from PIL import Image, ImageDraw
from matplotlib import pyplot as plt
import numpy as np
from google.cloud import automl
from google.cloud import videointelligence_v1p3beta1 as videointelligence
from google.oauth2 import service_account
Next, create a new GCP account (if you don't have one already) and create a new project. Then run the cell below to authenticate yourself to this notebook:
In [ ]:
auth.authenticate_user()
Fill in your info below by specifying your project ID. You'll also need to choose a bucket name (it should start with gs://) and a name for your service account, which will be used to create a key file that's downloaded to this notebook so we can call the AutoML API. service_account_name can be anything.
In [ ]:
project_id = 'YOUR_PROJECT_ID' #@param {type: "string"}
bucket = 'gs://YOUR_BUCKET' #@param {type: "string"}
service_account_name="ANY_RANDOM_NAME" #@param {type: "string"}
In [ ]:
!gcloud config set project {project_id}
!gsutil mb {bucket}
!gcloud iam service-accounts create {service_account_name}
!gcloud iam service-accounts keys create ./key.json --iam-account {service_account_name}@{project_id}.iam.gserviceaccount.com
# Enable the Video Intelligence API and AutoML
!gcloud services enable videointelligence.googleapis.com
!gcloud services enable automl.googleapis.com
# Give your service account permission to access the API
!gcloud projects add-iam-policy-binding {project_id} --member="serviceAccount:{service_account_name}@{project_id}.iam.gserviceaccount.com" --role="roles/editor"
In [ ]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./key.json"
In this section, we'll analyze body poses (the "skeletons") in our video using the Video Intelligence API. The API tracks all sorts of features, including features of the face, but we'll just focus on the body.
To do this, you'll need to upload a video you'd like to analyze to a Cloud Storage bucket. The fastest way to do this is from the command line: gsutil cp YOUR_VIDEO_FILE.mp4 gs://YOUR_BUCKET_NAME. Fill in the name of the file you upload here:
In [ ]:
file_to_analyze = 'YOUR_SPORTS_VIDEO.mp4' #@param {type: "string"}
In [ ]:
# Verify that you see that file listed here...
# This cell should print the name of the file you just uploaded
!gsutil ls {bucket}/{file_to_analyze}
Now let's run the Video Intelligence API's Person Detection feature on the uploaded video. We pass this function the input path to our file in cloud storage as well as an output path where we'd like the results to be written.
In [ ]:
input_uri = os.path.join(bucket, file_to_analyze)
output_uri = os.path.join(bucket, 'output.json')
In [ ]:
# This function comes from the docs
# https://cloud.google.com/video-intelligence/docs/people-detection
def detect_person(input_uri, output_uri):
    """Detects people in a video."""
    client = videointelligence.VideoIntelligenceServiceClient(
        credentials=service_account.Credentials.from_service_account_file('./key.json'))

    # Configure the request
    config = videointelligence.types.PersonDetectionConfig(
        include_bounding_boxes=True,
        include_attributes=True,
        include_pose_landmarks=True,
    )
    context = videointelligence.types.VideoContext(person_detection_config=config)

    # Start the asynchronous request
    operation = client.annotate_video(
        input_uri=input_uri,
        output_uri=output_uri,
        features=[videointelligence.enums.Feature.PERSON_DETECTION],
        video_context=context,
    )

    return operation
In [ ]:
# If you get a permissions error here, you might have to modify the permissions
# on your bucket to allow your service account to access it. Do that in the
# GCP storage console/UI.
operation = detect_person(input_uri, output_uri)
We've called an asynchronous function here (detect_person) because long videos can take a while to analyze. You can check the status of the analysis by calling operation.done():
In [ ]:
print(f"Operation ${operation.operation.name} is done? {operation.done()}")
Note that even if you restart this notebook, the Video Intelligence API will still be analyzing your video in the cloud! So you won't lose any progress.
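If you'd rather wait inside the notebook, a minimal polling sketch (just a convenience, not part of the original flow) could look like this:
In [ ]:
import time

# Poll the long-running operation every 30 seconds until it finishes
while not operation.done():
    print("Still processing...")
    time.sleep(30)
print("Analysis complete!")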
Once the operation is finished, we can download the results from our cloud storage bucket:
In [ ]:
# Note! This won't work unless operation.done() == True!
!mkdir tmp
!gsutil cp {output_uri} tmp
Results are written to cloud storage as a json file. Let's load them!
In [ ]:
data = json.load(open('./tmp/output.json'))
These json files are usually pretty big, so don't print them! Instead, let's just inspect the structure:
In [ ]:
print(data.keys())
# We only care about annotation_results[0] because we only have one video
print(len(data['annotation_results'][0]['person_detection_annotations']))
It's easy to get lost in all these nested fields! What we really want is the data stored in data['annotation_results'][0]['person_detection_annotations']. Let's grab it:
In [ ]:
people_annotations = data['annotation_results'][0]['person_detection_annotations']
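If you're curious what one of these entries looks like before we parse it, here's a quick, optional peek (the keys come from the API's JSON output; the exact contents depend on your video):
In [ ]:
# Inspect the nesting of the first person's annotations
print(people_annotations[0].keys())
print(people_annotations[0]['tracks'][0].keys())
print(people_annotations[0]['tracks'][0]['timestamped_objects'][0].keys())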
In people_annotations, every entry corresponds to a person, and each person has a unique set of tracks, or tracked segments. We'll use a helper function to parse through the data and rearrange it to make it easier to use for our analyses:
In [ ]:
'''
This helper function takes in a person and rearranges the data so it's in
a timeline, which will make it easier for us to work with
'''
def analyzePerson(person):
    frames = []
    for track in person['tracks']:
        # Convert timestamps to seconds
        for ts_obj in track['timestamped_objects']:
            time_offset = ts_obj['time_offset']
            timestamp = 0
            if 'nanos' in time_offset:
                timestamp += time_offset['nanos'] / 10**9
            if 'seconds' in time_offset:
                timestamp += time_offset['seconds']
            if 'minutes' in time_offset:
                timestamp += time_offset['minutes'] * 60
            frame = {'timestamp': timestamp}
            for landmark in ts_obj['landmarks']:
                frame[landmark['name'] + '_x'] = landmark['point']['x']
                # Subtract the y value from 1 because positions are calculated
                # from the top left corner
                frame[landmark['name'] + '_y'] = 1 - landmark['point']['y']
            frames.append(frame)
    frames.sort(key=lambda x: x['timestamp'])
    return frames
We'll also store the data in a pandas DataFrame (for convenience), and sort the data points by timestamp:
In [ ]:
annotationsPd = pd.concat(
    [pd.DataFrame(analyzePerson(annotation)) for annotation in people_annotations],
    ignore_index=True)
annotationsPd = annotationsPd.sort_values('timestamp', ascending=True)
Phew! The hard bit (parsing the data) is over. Now we can take a look at the results!
In [ ]:
annotationsPd.head()
Out[ ]:
As you can see above, we've organized the data by the position of each body part by timestamp. Note that this works because there's actually only one person in my video: me!
The first thing you might want to do is plot the positions of various body parts over time. Since I was analyzing a bunch of shots of my serve, I thought I'd look at the position of my wrists to try to determine the start and end time of a serve. Try replacing the wrist values with anything else you're interested in tracking:
In [ ]:
plt.figure()
annotationsPd.plot('timestamp', ['left_wrist_y', 'right_wrist_y'], figsize=(20, 5))
plt.title("Left and Right Wrist Positions Over Time")
plt.savefig("wrist_pos")
From the plot above, you can actually identify the time of my serve pretty easily! First, I throw the tennis ball up with my left hand (peak in left wrist). Then, a few seconds later, I hit the ball with my racket (peak in right wrist y).
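If you'd rather find those peaks programmatically than eyeball the chart, a rough sketch using scipy.signal.find_peaks might look like this (the height and distance values are placeholders you'd tune for your own video):
In [ ]:
from scipy.signal import find_peaks

# Placeholder thresholds: only count frames where the right wrist is in the
# top ~20% of the frame, and require peaks to be at least 20 samples apart.
wrist_y = annotationsPd['right_wrist_y'].to_numpy()
peaks, _ = find_peaks(wrist_y, height=0.8, distance=20)
print("Candidate contact times (seconds):", annotationsPd['timestamp'].to_numpy()[peaks])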
The plot above is sort of useful, but what would be even better is understanding the angles of my elbow, knee, and so on. To compute angles, we'll need a tiny helper class for 2D points:
In [ ]:
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y
To compute the angle made by three points, we use the Law of Cosines. Did you forget about this? I did! Imagine a triangle with side lengths a, b, and c. Then, to find 𝛾 (the angle across from side c), the formula is:
\begin{equation*} \gamma = \cos^{-1}\frac{a^2+b^2 - c^2}{2ab} \end{equation*}
There's a good explanation and code sample here, from which the function below (which computes the angle with atan2) is borrowed:
In [ ]:
def getAngle(a, b, c):
    ang = math.degrees(math.atan2(c.y-b.y, c.x-b.x) - math.atan2(a.y-b.y, a.x-b.x))
    # Normalize negative angles into the 0-360 range
    return ang + 360 if ang < 0 else ang
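As a cross-check of the formula above, here's a sketch (the helper name getInteriorAngle is just for illustration) that computes the interior angle at point b directly with the Law of Cosines. Note that it always returns a value between 0 and 180 degrees, whereas the atan2-based getAngle returns a directed angle that can exceed 180:
In [ ]:
def getInteriorAngle(a, b, c):
    # Law of Cosines: the angle at vertex b is across from side ac
    ab = math.hypot(a.x - b.x, a.y - b.y)
    bc = math.hypot(c.x - b.x, c.y - b.y)
    ac = math.hypot(c.x - a.x, c.y - a.y)
    return math.degrees(math.acos((ab**2 + bc**2 - ac**2) / (2 * ab * bc)))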
Let's compute some useful angles below:
In [ ]:
def computeElbowAngle(row, which='right'):
    wrist = Point(row[f'{which}_wrist_x'], row[f'{which}_wrist_y'])
    elbow = Point(row[f'{which}_elbow_x'], row[f'{which}_elbow_y'])
    shoulder = Point(row[f'{which}_shoulder_x'], row[f'{which}_shoulder_y'])
    return getAngle(wrist, elbow, shoulder)

def computeShoulderAngle(row, which='right'):
    elbow = Point(row[f'{which}_elbow_x'], row[f'{which}_elbow_y'])
    shoulder = Point(row[f'{which}_shoulder_x'], row[f'{which}_shoulder_y'])
    hip = Point(row[f'{which}_hip_x'], row[f'{which}_hip_y'])
    return getAngle(hip, shoulder, elbow)

def computeKneeAngle(row, which='right'):
    hip = Point(row[f'{which}_hip_x'], row[f'{which}_hip_y'])
    knee = Point(row[f'{which}_knee_x'], row[f'{which}_knee_y'])
    ankle = Point(row[f'{which}_ankle_x'], row[f'{which}_ankle_y'])
    return getAngle(ankle, knee, hip)
In [ ]:
# For a single timestamp (here, the last row)...
row = annotationsPd.iloc[-1]
print("Elbow angle: " + str(computeElbowAngle(row)))
print("Shoulder angle: " + str(computeShoulderAngle(row)))
print("Knee angle: " + str(computeKneeAngle(row)))
Sweet! Now let's plot those angles over time.
In [ ]:
annotationsPd['right_elbow_angle'] = annotationsPd.apply(computeElbowAngle, axis=1)
annotationsPd['right_shoulder_angle'] = annotationsPd.apply(computeShoulderAngle, axis=1)
annotationsPd['right_knee_angle'] = annotationsPd.apply(computeKneeAngle, axis=1)
Now let's plot the results!
In [ ]:
plt.figure()
annotationsPd.plot('timestamp', ['right_elbow_angle'], figsize=(20, 5), color='blue')
plt.title("Right Elbow Angle over Time")
plt.savefig("right_elbow_angle")
annotationsPd.plot('timestamp', ['right_shoulder_angle'], figsize=(20, 5), color='purple')
plt.title("Right Shoulder Angle over Time")
plt.savefig("right_shoulder_angle")
annotationsPd.plot('timestamp', ['right_knee_angle'], figsize=(20, 5))
plt.title("Right Knee Angle over Time")
plt.savefig("right_knee_angle")
Now, these angles might not be very useful on their own. But when we combine them with position data, we can tell what the angle of my arm was at the height of my serve. In particular, let's take a look at the angle of my elbow and shoulder when my right wrist was at the highest point in the serve.
In [ ]:
fig = plt.figure()
ax=fig.add_subplot(111, label="1")
annotationsPd.plot('timestamp', ['right_wrist_y'], figsize=(20, 5), ax=ax, color='red')
plt.title("Right Elbow Angle over Time")
ax2=fig.add_subplot(111, label="2", frame_on=False)
annotationsPd.plot('timestamp', ['right_elbow_angle'], figsize=(20, 5), ax=ax2)
#annotationsPd.plot.scatter('right_wrist_y', 'right_elbow_angle')
Out[ ]:
These charts might be difficult to read, but they tell me that when my arm is most extended, the angle of my elbow is about 200 degrees.
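If you'd rather not read the value off a chart, a small sketch like this pulls out the row where the right wrist is highest and prints the angles directly:
In [ ]:
# Find the frame where the right wrist is at its highest point
df = annotationsPd.reset_index(drop=True)
peak_row = df.loc[df['right_wrist_y'].idxmax()]
print(f"At t={peak_row['timestamp']:.2f}s:")
print(f"  right elbow angle:    {peak_row['right_elbow_angle']:.1f} degrees")
print(f"  right shoulder angle: {peak_row['right_shoulder_angle']:.1f} degrees")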
To compute the speed of the ball, I used AutoML Vision Object Detection, a GUI-based way to train a deep neural network. Check out the blog post to see how I trained the model. Here, we'll just use it.
Since we've trained a vision model, we'll first need to convert our video into images. First, upload a video file:
In [ ]:
uploaded = files.upload()
for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(uploaded[fn])))
Next, we'll use a command line tool called ffmpeg to convert the video into still images.
In [ ]:
filename, _ = uploaded.popitem()
Below, I use the ffmpeg command to generate snapshots from my video at 20 frames per second. I take a 2 second segment (-t 00:00:02) that starts from one second in (-ss 00:00:01). This aligns with my first serve.
In [ ]:
!mkdir tmp/snapshots
!ffmpeg -i {filename} -vf fps=20 -ss 00:00:01 -t 00:00:02 tmp/snapshots/%03d.jpg
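Optionally, you can confirm that ffmpeg produced the expected number of frames (roughly 40 for a 2 second clip at 20 fps):
In [ ]:
!ls tmp/snapshots | wc -l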
Now let's analyze those snapshots. Grab your AutoML model id:
In [ ]:
model_id = 'YOUR_MODEL_ID' #@param {type: "string"}
In [ ]:
def getAutoMLPrediction(filename):
    with open(filename, 'rb') as ff:
        content = ff.read()

    prediction_client = automl.PredictionServiceClient()
    name = 'projects/{}/locations/us-central1/models/{}'.format(project_id, model_id)
    params = {"score_threshold": "0.7"}  # this metric changes the sensitivity of your model
    image = automl.types.Image(image_bytes=content)
    payload = automl.types.ExamplePayload(image=image)
    return prediction_client.predict(name, payload, params)
In [ ]:
def getBallsCoords(filename):
    res = getAutoMLPrediction(filename)
    return [obj.image_object_detection.bounding_box.normalized_vertices for obj in res.payload]
In [ ]:
# Only keep the jpg snapshots so the file list lines up with the results below
snapshotFiles = [f for f in os.listdir('tmp/snapshots') if f.endswith('.jpg')]
snapshotFiles.sort()
print(f"Analyzing {len(snapshotFiles)} images")
Now that we're able to track the ball, let's make a pretty image so we can see what's actually going on:
In [ ]:
def makeBallImage(filename, coords):
    im = Image.open(filename)
    im.thumbnail((int(im.width * 0.2), int(im.height * 0.2)))
    draw = ImageDraw.Draw(im)
    for coord in coords:
        draw.rectangle([(coord[0].x * im.width, coord[0].y * im.height),
                        (coord[1].x * im.width, coord[1].y * im.height)])
    return im
In [ ]:
# Call the AutoML API--this could take a while!
coords = [getBallsCoords('tmp/snapshots/' + filename) for filename in snapshotFiles if 'jpg' in filename]
In [ ]:
imgs = [makeBallImage('tmp/snapshots/' + filename, coord) for filename, coord in zip(snapshotFiles, coords) if 'jpg' in filename]
!mkdir snapshot_annotated
for idx, im in enumerate(imgs):
    plt.imshow(np.asarray(im))
    plt.savefig('snapshot_annotated/file%d.png' % idx)

# Create a cute video of your serves!
!ffmpeg -framerate 20 -i snapshot_annotated/file%01d.png -vcodec mpeg4 -y ball_tracking.mp4
!ffmpeg -i ball_tracking.mp4 ball_tracking.gif
The code above analyzes the snapshots and creates a video and a gif you can check out in the files ball_tracking.mp4 and ball_tracking.gif, respectively.
Okay, now that we've tracked the ball, let's compute its position and then its speed!
In [ ]:
# For simplicity, we'll just plot one corner of the bounding box
# around the ball
coord_x = [ball[0].x for frame in coords for ball in frame]
coord_y = [1 - ball[0].y for frame in coords for ball in frame]
timestamps = [x/20 for x in range(len(coord_x))] # 20 frames per second
Here we can plot the ball in space, and see how it leaves my hand and then flies across the court.
In [ ]:
plt.title("Position of tennis ball during serve")
plt.xlabel("X Position")
plt.ylabel("Y Position")
plt.scatter(coord_x, coord_y)
plt.savefig("serve_position_x_y.png")
To determine the speed, let's look at the distance the ball travels over time:
In [ ]:
plt.title("Y position of tennis ball during serve over time")
plt.xlabel("seconds")
plt.ylabel("Y position")
plt.scatter(timestamps, coord_y)
plt.savefig("ball_position_over_time.png")
You can see that 0.5 to 0.7 seconds is when the ball has been hit and is traveling across the court. So, to compute the speed, let's divide distance by time!
In [ ]:
# Get the first data point from 0.5 seconds
start_x = coord_x[timestamps.index(0.5)]
end_x = coord_x[-1]
start_y = coord_y[timestamps.index(0.5)]
end_y = coord_y[-1]
# Compute the Euclidean distance
distance = math.sqrt((start_x - end_x)**2 + (start_y - end_y)**2)
time = timestamps[-1] - 0.5
print(f"The speed of your serve was {distance/time}")