In [1]:
from IPython.display import YouTubeVideo
YouTubeVideo('leVZjVahdKs')
Out[1]:
by: Sam.Gu@KudosData.com
May 2017 ========== Scan the QR code to become trainer's friend in WeChat ========>>
From the same API console, choose "Dashboard" on the left-hand menu and "Enable API".
Enable the following APIs for your project (search for them) if they are not already enabled:
Finally, because we are calling the APIs from Python (clients in many other languages are available), let's install the Python package (it's not installed by default on Datalab).
In [2]:
# Copyright 2016 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# !pip install --upgrade google-api-python-client
In [3]:
import io, os, subprocess, sys, time, datetime, requests, itchat, json, numpy as np
from itchat.content import *
from googleapiclient.discovery import build
from IPython.display import HTML
# Python 2
if sys.version_info[0] < 3:
import urllib2
# Python 3
else:
import urllib.request
First, visit the API console and choose "Credentials" on the left-hand menu. Choose "Create Credentials" and generate an API key for your application. You should probably restrict it by IP address to prevent abuse, but for now, just leave that field blank and delete the API key after trying out this demo.
Copy-paste your API Key here:
In [4]:
# Here I read in my own API_KEY from a file, which is not shared in Github repository.
# The key is later concatenated into a request URL, so the trailing newline and any
# surrounding whitespace are stripped. Blank lines are skipped so that a trailing
# empty line in the file cannot overwrite the key; the last non-blank line wins.
with io.open('../../API_KEY.txt') as fp:
    for line in fp:
        if line.strip():
            APIKEY = line.strip()
# You need to un-comment below line and replace 'APIKEY' variable with your own GCP API key:
# APIKEY='AIzaSyCvxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
In [5]:
# Below is for GCP Video API
# video_service = build('videointelligence', 'v1', developerKey=APIKEY)
# Client used to submit annotation requests; built against the 'v1beta1' API version.
video_service = build('videointelligence', 'v1beta1', developerKey=APIKEY)
# check video processing progress
# NOTE(review): this 'v1' client is not referenced anywhere else in the visible notebook;
# the progress check builds its operations URL by hand. Confirm whether it is still needed.
video_operation_service = build('videointelligence', 'v1', developerKey=APIKEY)
In [6]:
# Import the base64 encoding library.
import base64
# Pass the media data to an encoding function.
def encode_media(media_file):
    """Read a file in binary mode and return its content as base64 text.

    Parameters
    ----------
    media_file : str
        Path of the file to read.

    Returns
    -------
    str
        The base64 encoding of the file's bytes, decoded to a text string.
    """
    with io.open(media_file, "rb") as stream:
        raw_bytes = stream.read()
    # Python 2 decodes with 'ascii', Python 3 with 'utf-8'; base64 output only
    # contains ASCII characters, so both yield the same text.
    text_codec = 'ascii' if sys.version_info[0] < 3 else 'utf-8'
    return base64.b64encode(raw_bytes).decode(text_codec)
In [7]:
# API control parameters for Video Recognition & Processing
# Features requested from the Video API in every annotate call.
parm_video_api_features = ['LABEL_DETECTION', 'SHOT_CHANGE_DETECTION']
# parm_video_api_features = ['LABEL_DETECTION']
# parm_video_api_features = ['SHOT_CHANGE_DETECTION']
parm_video_api_display_seconds = True # True: use seconds for display; False: use microseconds to display video shot duration
parm_video_response = {} # global variable: most recent Video API response; read by the search functions and overwritten by the WeChat video handler
In [8]:
# Invoke Video API
def KudosData_VIDEO_DETECTION(video_base64):
    """Submit a base64-encoded video to the Video API and wait for the result.

    Parameters
    ----------
    video_base64 : str
        Base64-encoded video content, sent as the request's "inputContent".

    Returns
    -------
    dict
        The value returned by ``KudosData_VIDEO_DETECTION_CHECK_PROCRESS`` for
        the started operation, or an empty dict when the annotate response
        carries no operation name (the failure is also reported on stdout).
    """
    print ('[INFOR] Start Video Analysis...')
    ##########################################################################################
    # 1. Send request for video processing
    ##########################################################################################
    request1 = video_service.videos().annotate(body={
        # "inputUri": string,
        "inputContent": video_base64,
        "features": parm_video_api_features, # 'LABEL_DETECTION' & 'SHOT_CHANGE_DETECTION'
    })
    responses1 = request1.execute(num_retries=3)

    if 'name' not in responses1:
        # Without an operation name there is nothing to poll. Return an empty
        # result instead of falling through to an unbound local variable.
        print ('[ERROR] Calling Video API request failed. Please re-try.')
        return {}

    print ('Internal video/operation name assinged : %s' % responses1['name'])
    ##########################################################################################
    # 2. Check progress till completion (Video requires asynchronous long processing...)
    ##########################################################################################
    return KudosData_VIDEO_DETECTION_CHECK_PROCRESS(responses1['name'], APIKEY)
In [9]:
# Keep checking progress of Video API processing, till full completion.
def KudosData_VIDEO_DETECTION_CHECK_PROCRESS(name, apikey):
    """Poll a Video API operation until it has finished, then return it.

    The operation resource is fetched every 2 seconds. Polling stops when every
    entry of ``metadata.annotationProgress`` reports ``progressPercent == 100``,
    or when the operation itself is flagged ``done`` (which also ends the loop
    for operations that finished with an error instead of reaching 100 %).

    Parameters
    ----------
    name : str
        Operation name returned by the annotate request.
    apikey : str
        API key appended to the operations URL as the ``key`` query parameter.

    Returns
    -------
    dict
        The last operation resource fetched, parsed from JSON.
    """
    operation_url = 'https://videointelligence.googleapis.com/v1/operations/' + name + '?key=' + apikey
    # print('operation_url : ', operation_url)
    while True:
        # Python 2
        if sys.version_info[0] < 3:
            response_operation = json.loads(urllib2.urlopen(operation_url).read())
        # Python 3
        else:
            response_operation = json.loads(urllib.request.urlopen(operation_url).read().decode('utf-8'))

        # Guard against a resource without progress metadata: treat it as "no progress yet".
        annotation_progress = response_operation.get('metadata', {}).get('annotationProgress', [])
        # 'progress %' of each requested Video API feature; a missing value counts as 0.
        video_api_progress_pct = np.array(
            [feature.get('progressPercent', 0) for feature in annotation_progress], dtype=float)

        overall_progress = np.mean(video_api_progress_pct) if video_api_progress_pct.size else 0.0
        print (' overall progress : {} %'.format(overall_progress))

        all_features_complete = video_api_progress_pct.size > 0 and video_api_progress_pct.min() == 100
        if all_features_complete or response_operation.get('done'):
            if 'error' in response_operation:
                print ('[ERROR] Video API operation failed : {}'.format(response_operation['error']))
            else:
                print (' Video API Completed !\n')
            return response_operation

        # Only wait when another poll is actually needed.
        time.sleep(2)
In [10]:
# return a list of shot locations, based on list of object name
def KudosData_search_location(list_object_name):
    """Return the shot locations of one label in the latest video response.

    Only ``list_object_name[0]`` is looked up. It is compared for exact equality
    with each label's 'description' in the module-level ``parm_video_response``,
    and the 'locations' list of the first matching label is returned.

    Parameters
    ----------
    list_object_name : list of str
        List whose first element is the label description to search for.

    Returns
    -------
    list
        The matching label's locations, or an empty list when the keyword list
        is empty, no video response is available yet, the response has no
        label annotations, or no label matches.
    """
    if not list_object_name:
        return []
    # parm_video_response starts out as {} and stays that way until a video has
    # been analysed, so every level of the lookup has to tolerate missing keys.
    annotation_results = parm_video_response.get('response', {}).get('annotationResults') or [{}]
    for label in annotation_results[0].get('labelAnnotations', []):
        if label.get('description') == list_object_name[0]:
            return label.get('locations', [])
    return [] # If not found, return empty list []
### simple score integration: Certainty Factor
### CF = CF1 + CF2 * ( 1 - CF1 )
def KudosData_CF(cf1, cf2):
    """Combine two confidence scores as cf1 + cf2 * (1 - cf1)."""
    remaining_doubt = 1 - cf1
    return cf1 + cf2 * remaining_doubt
# return a list of shot-locations, based on matched shot-locations of two objects
def KudosData_match_location(list_location_1, list_location_2):
    """Pair up locations that refer to the same shot and merge their confidence.

    Every location of ``list_location_1`` is compared with every location of
    ``list_location_2``. Two locations match when both their 'level' and their
    'segment' values are equal. For each match, a shallow copy of the second
    location is emitted with its 'confidence' replaced by
    ``KudosData_CF(first confidence, second confidence)``.

    Returns
    -------
    list of dict
        One merged location per matching pair, in iteration order; empty when
        nothing matches.
    """
    list_location_matched = []
    for first in list_location_1:
        for second in list_location_2:
            same_shot = (first['level'] == second['level']
                         and first['segment'] == second['segment'])
            if not same_shot:
                continue
            merged = second.copy()
            merged['confidence'] = KudosData_CF(first['confidence'], second['confidence'])
            list_location_matched.append(merged)
    return list_location_matched
def KudosData_search(list_object_name):
    """Find the shot locations in which all given labels appear.

    The locations of the first keyword are looked up with
    ``KudosData_search_location`` and then narrowed down, one keyword at a
    time, with ``KudosData_match_location``, which also merges the confidence
    scores. One keyword returns that label's locations unchanged; two keywords
    return the pairwise match; further keywords keep refining the result
    instead of being ignored.

    Parameters
    ----------
    list_object_name : list of str
        Label descriptions to search for.

    Returns
    -------
    list
        Matching locations; empty when the keyword list is empty or when no
        location satisfies every keyword.
    """
    if not list_object_name:
        return []
    list_location = KudosData_search_location([list_object_name[0]])
    for object_name in list_object_name[1:]:
        list_location = KudosData_match_location(list_location,
                                                 KudosData_search_location([object_name]))
    return list_location
In [11]:
def KudosData_video_generate_reply(parm_video_response):
    """Build the text reply that summarises a Video API response.

    The reply lists every shot with its time range, followed by every label
    with the number of locations it was found at and, per location, its
    confidence, segment and level. Time offsets are divided by 1000000 and
    labelled 'seconds' when the module-level ``parm_video_api_display_seconds``
    flag is true; otherwise they are shown undivided and labelled 'microseconds'.

    Parameters
    ----------
    parm_video_response : dict
        Operation result whose annotations live under
        ['response']['annotationResults'][0]. Missing 'response',
        'annotationResults', 'shotAnnotations', 'labelAnnotations' or per-label
        'locations' entries are treated as empty, so the corresponding section
        reports a count of 0 instead of raising KeyError.

    Returns
    -------
    str
        The formatted multi-line reply.
    """
    video_reply = u'[ Video Analysis 视频分析结果 ]'

    # Resolve the annotation lists once; fall back to empty lists when a section is absent.
    annotation_results = parm_video_response.get('response', {}).get('annotationResults') or [{}]
    list_shots = annotation_results[0].get('shotAnnotations', [])
    list_labels = annotation_results[0].get('labelAnnotations', [])

    # 1. Detect shot changes in the video
    if parm_video_api_display_seconds: # our pre-defined control parm
        parm_denominator = 1000000 # display results in seconds
    else:
        parm_denominator = 1 # display results in microseconds
    time_unit = 'seconds' if parm_video_api_display_seconds else 'microseconds'

    video_reply += '\n' + (u'\n[ 视频场景片段 ]')
    video_reply += '\n' + (u'片段总数 No. scenery/shot : {}'.format(len(list_shots)))
    for i, shot in enumerate(list_shots):
        if i == 0:
            # The first shot is always reported as starting at 0.
            video_reply += '\n' + (u'片段 Shot 1 : 0 ~ {} {}'
                                   .format(int(shot['endTimeOffset'])/parm_denominator
                                           , time_unit
                                           ))
        else:
            video_reply += '\n' + (u'片段 Shot {} : {} ~ {} {}'
                                   .format(i+1
                                           , int(shot['startTimeOffset'])/parm_denominator
                                           , int(shot['endTimeOffset'])/parm_denominator
                                           , time_unit
                                           ))

    # 2. Recognize objects in the video / shots
    video_reply += '\n' + (u'\n[ 视频物体识别 ]')
    video_reply += '\n' + (u'物体总数 No. objects : {}'.format(len(list_labels)))
    for i, label in enumerate(list_labels):
        list_locations = label.get('locations', [])
        # each object's name:
        video_reply += '\n'
        video_reply += '\n' + (u'物体 Obj {} : {}\n位于{}处片段 Locations: '.format(
            i+1
            , label['description']
            , len(list_locations)
            ))
        # each object's location in video:
        for j, location in enumerate(list_locations):
            video_reply += '\n' + (' [{}] Confidence : {} | {} | {} '.format(
                j+1
                , location['confidence']
                , location['segment']
                , location['level']
                ))
    return video_reply
< Start of interactive demo >
In [12]:
# video_file = 'reference/video.mp4'
# video_file = 'reference/SampleVideo_360x240_1mb.mp4'
video_file = 'reference/SampleVideo_360x240_2mb.mp4'
In [13]:
HTML(data='''<video alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''
.format(encode_media(video_file)))
Out[13]:
In [14]:
# Send API request & Obtain results:
parm_video_response = KudosData_VIDEO_DETECTION(encode_media(video_file))
# Obtain results, without resending API request:
# parm_video_response = KudosData_VIDEO_DETECTION_CHECK_PROCRESS('asia-east1.11590064400054192243', APIKEY)
In [15]:
parm_video_response
# metadata in response: version of the Video API used, the different Video APIs called, and start/end processing time.
# parm_video_response['metadata']
# actual video API detection results in response:
# parm_video_response['response']['annotationResults'][0]
Out[15]:
In [16]:
# parm_video_response['response']['annotationResults'][0]['shotAnnotations']
In [17]:
# Offsets are divided by this value before display: 1000000 when displaying
# seconds, 1 when displaying microseconds.
parm_denominator = 1000000 if parm_video_api_display_seconds else 1
time_unit = 'seconds' if parm_video_api_display_seconds else 'microseconds'

print(u'[ 视频场景片段 ]')
shot_annotations = parm_video_response['response']['annotationResults'][0]['shotAnnotations']
print(u'片段总数 No. scenery/shots : {}'.format(len(shot_annotations)))
for shot_index, shot in enumerate(shot_annotations):
    if shot_index == 0:
        # The first shot is always shown as starting at 0.
        print(u'片段 Shot 1 : 0 ~ {} {}'.format(
            int(shot['endTimeOffset'])/parm_denominator, time_unit))
        continue
    print(u'片段 Shot {} : {} ~ {} {}'.format(
        shot_index + 1,
        int(shot['startTimeOffset'])/parm_denominator,
        int(shot['endTimeOffset'])/parm_denominator,
        time_unit))
In [18]:
# parm_video_response['response']['annotationResults'][0]['labelAnnotations']
In [19]:
print(u'[ 视频物体识别 ]')
label_annotations = parm_video_response['response']['annotationResults'][0]['labelAnnotations']
print(u'物体总数 No. objects : {}'.format(len(label_annotations)))
print('')
# One line per detected label: its 1-based number and its description.
for label_number, label in enumerate(label_annotations, start=1):
    print(u'物体 Obj {} : {}'.format(label_number, label['description']))
In [20]:
print(u'[ 视频物体识别 ]')
label_annotations = parm_video_response['response']['annotationResults'][0]['labelAnnotations']
print(u'物体总数 No. objects : {}'.format(len(label_annotations)))
for label_number, label in enumerate(label_annotations, start=1):
    # Header for this label: number, description and how many locations it has.
    print('')
    print(u'物体 Obj {} : {}\n位于{}处片段 Locations: '.format(
        label_number, label['description'], len(label['locations'])))
    # One line per location: confidence, segment and level.
    for location_number, location in enumerate(label['locations'], start=1):
        print(' [{}] Confidence : {} | {} | {} '.format(
            location_number,
            location['confidence'],
            location['segment'],
            location['level']))
In [21]:
KudosData_search(['Animal'])
Out[21]:
In [22]:
KudosData_search(['Sky'])
Out[22]:
In [23]:
KudosData_search(['Sam'])
Out[23]:
[ 搜索视频内容 ] 'Animal'
[ 搜索视频内容 ] 'Sky'
In [24]:
# CF = 0.81967264 + 0.4363379 * ( 1 - 0.81967264 ) = ?
print('CF (combined confidence score) = {}'.format(0.81967264 + 0.4363379 * ( 1 - 0.81967264 )))
In [25]:
# * CF = 0.496 + 0.8764819 * ( 1 - 0.496 ) = ?
print('CF (combined confidence score) = {}'.format(0.496 + 0.8764819 * ( 1 - 0.496 )))
In [26]:
KudosData_search(['Animal', 'Sky'])
Out[26]:
In [27]:
KudosData_search(['Animal', 'Grassland'])
Out[27]:
[提问] 如果是三个或更多关键词呢,怎么处理? [Question] Which video shot is most relevant for: Animal, Sky, and Grassland? And why?
In [28]:
HTML(data='''<video alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''
.format(encode_media(video_file)))
Out[28]:
In [29]:
KudosData_search(['Animal', 'SamGu'])
Out[29]:
< End of interactive demo >
In [30]:
print(KudosData_video_generate_reply(parm_video_response))
In [ ]:
itchat.auto_login(hotReload=True) # hotReload=True: keep the login state cached after the program exits, so restarting within a certain period does not require scanning the QR code again.
# itchat.auto_login(enableCmdQR=-2) # enableCmdQR=-2: show the QR image in the command line
In [ ]:
# @itchat.msg_register([VIDEO], isGroupChat=True)
@itchat.msg_register([VIDEO])
def download_files(msg):
    """Download a received video, analyse it and return the formatted result.

    The message's file is downloaded, base64-encoded and sent through
    ``KudosData_VIDEO_DETECTION``. The raw result is stored in the module-level
    ``parm_video_response`` so that later keyword searches can read it, and the
    text produced by ``KudosData_video_generate_reply`` is printed and returned.
    """
    # Stored globally so the keyword-search handlers can access the latest result.
    global parm_video_response

    msg.download(msg.fileName)
    downloaded_name = msg['FileName']
    print('\nDownloaded video file name is: %s' % downloaded_name)

    # Call the video analysis API on the downloaded file.
    parm_video_response = KudosData_VIDEO_DETECTION(encode_media(downloaded_name))

    # Format the API result as reply text.
    video_analysis_reply = KudosData_video_generate_reply(parm_video_response)
    print ('')
    print(video_analysis_reply)
    return video_analysis_reply
In [ ]:
# Single-chat mode: keyword-based video search:
@itchat.msg_register([TEXT])
def text_reply(msg):
    """Search the latest video result for the comma-separated keywords in a text message.

    The message text is split on ',' and each piece is stripped of surrounding
    whitespace before being passed to ``KudosData_search``. The reply starts
    with a fixed header, followed either by a "no result" marker or by one
    line per search hit. The reply is printed and returned.
    """
    list_keywords = [keyword.strip() for keyword in msg['Text'].split(',')]
    search_responses = KudosData_search(list_keywords) # a list of locations

    search_reply = u'[ Video Search 视频搜索结果 ]' + '\n'
    if len(search_responses) > 0:
        for hit in search_responses:
            search_reply += '\n' + str(hit)
    else:
        search_reply += u'[ Nill 无结果 ]'

    print ('')
    print (search_reply)
    return search_reply
In [ ]:
# Group-chat mode: keyword-based video search:
# NOTE(review): this function shares its name with the single-chat handler defined
# earlier in the notebook; presumably each decorator call registers its own
# function object, so the rebinding of the name is harmless. Confirm.
@itchat.msg_register([TEXT], isGroupChat=True)
def text_reply(msg):
    """Answer a group-chat text message with keyword search results.

    Nothing is done (and None is returned) unless ``msg['isAt']`` is truthy.
    Otherwise the message text is split on ',' into stripped keywords, which
    are passed to ``KudosData_search``; the formatted reply is printed and
    returned.

    NOTE(review): the nesting below is inferred. Every statement after the
    ``if`` uses names assigned inside it, so all of them are placed under it;
    confirm against the original notebook.
    """
    if msg['isAt']:
        # NOTE(review): msg['Text'] is split as-is; if it still contains the
        # @-mention, that text ends up inside the first keyword. Verify.
        list_keywords = [x.strip() for x in msg['Text'].split(',')]
        # call video search function:
        search_responses = KudosData_search(list_keywords) # return is a list
        # Format search results:
        search_reply = u'[ Video Search 视频搜索结果 ]' + '\n'
        if len(search_responses) == 0:
            search_reply += u'[ Nill 无结果 ]'
        else:
            for i in range(len(search_responses)): search_reply += '\n' + str(search_responses[i])
        print ('')
        print (search_reply)
        return search_reply
In [ ]:
itchat.run()
[提问] 是不是可以用图片来搜索视频内容?需要怎么处理? [Question] Can we use an image as input to search video content? How?
In [ ]:
# interrupt kernel, then logout
itchat.logout() # log out safely