Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

事前準備

  1. GCP プロジェクト を作成します。
  2. 課金設定 を有効にします。
  3. API Key を作成します。
  4. Cloud Vision API と Cloud Video Intelligence API を有効にします。

Google Cloud API の認証情報を入力

Google Cloud API を REST インタフェースから利用するために、 API Key を利用します。 Google Cloud Console から API Key をコピーしましょう。


In [0]:
import getpass
APIKEY = getpass.getpass()

Cloud Vision API を使ってみよう !

Cloud Vision API では、静止画を認識して画像の中に含まれる物を特定したり、文字を抽出したり (OCR) 、Web上から同一の画像を探す事ができます。このノートブックでは、以下 4 種類の認識を行います。

  • オブジェクト検知 - 画像中の物体の名称や位置を検出します
  • 顔検出 - 画像中の顔検出や表情からの感情予測を行います
  • ロゴ検出 - 画像中に存在する有名ブランドのロゴを検出します
  • ランドマーク検知 - 有名な観光地の画像から、その地名と位置を特定します

Cloud Vision API の準備

API Discovery Service を利用して Cloud Vision API を発見します。 Cloud Vision の REST API 仕様は こちら に解説されています。


In [0]:
from googleapiclient.discovery import build

vision_service = build('vision', 'v1p3beta1', developerKey=APIKEY)

オブジェクト検知

オブジェクト検知とは、画像中のどの領域にどんなオブジェクト(例えば車やリンゴといった一般的な”物”)があるかを特定します。

Colab 環境への画像アップロード

次のセルを実行して、ローカルに保存された objects.jpg をアップロードします。


In [0]:
from google.colab import files
uploaded = files.upload()

再アップロードすると、ファイルは上書きではなく別ファイル名で保存されます。ファイルを消したい場合は以下のコマンドを実行します。


In [0]:
# !rm ./objects.jpg

Request メッセージの構築

API には画像データと、どんな検知をやりたいのかという情報を渡す必要があります。今回は REST API を使っているので、 HTTP Request の Payload に、 JSON 形式で画像データと検知のタイプを指定して渡します。画像データは GCS (Google Cloud Storage) のパスを指定する方法もありますが、今回は直接ファイルの中身を Payload に入れて送ります。直接 Payload に入れる場合は、ファイルの中身を Base64 エンコードする必要があります。


In [0]:
from base64 import b64encode

with open('objects.jpg', 'rb') as image_file:
  my_image = {
      'content': b64encode(image_file.read()).decode('utf-8')
  }

In [0]:
my_features = [
    {'type':'OBJECT_LOCALIZATION', 'model':'builtin/stable'}
]

In [0]:
my_body={
    'requests': [
        {'image': my_image, 'features': my_features}
    ]
}

Request メッセージの送信


In [0]:
response = vision_service.images().annotate(body=my_body).execute()

検出結果の可視化

オブジェクト検知では、検出されたオブジェクトの名称、矩形位置情報(画像中の座標)とその確度が返ってきます。ここではその情報を使って、画像中の何が検出されたのかを描画して確かめてみましょう。次のセルを実行すると検知したオブジェクトをハイライトする便利関数 highlight_objects が定義されます。


In [0]:
#@title 検出したオブジェクトをハイライトする関数 highlight_objects を定義

from PIL import Image, ImageDraw

def highlight_objects(image_file, objects):
  image = Image.open(image_file)
  draw = ImageDraw.Draw(image, "RGBA")
  
  width = image.getbbox()[-2]
  height = image.getbbox()[-1]
  
  for object in objects:
    n_vertex_lt = tuple(object['boundingPoly']['normalizedVertices'][0].values())
    n_vertex_rb = tuple(object['boundingPoly']['normalizedVertices'][2].values())
    
    vertex_lt = (int(n_vertex_lt[0] * width), int(n_vertex_lt[1] * height))
    vertex_rb = (int(n_vertex_rb[0] * width), int(n_vertex_rb[1] * height))
    
    # bounding box
    draw.rectangle(xy=(vertex_lt, vertex_rb), outline='red')
    
    # probability
    object['name']
    draw.text(xy=(vertex_lt[0], vertex_lt[1]-10),
              text=object['name'] + ':' + str(format(object['score'], '.3f')),
              fill='red')    
  display(image)

In [0]:
highlight_objects('objects.jpg', response['responses'][0]['localizedObjectAnnotations'])

Response メッセージの中身を確認

Responseメッセージは、JSONの形式で返ってきます。どんなデータが内包されているのか、確認してみましょう。


In [0]:
response['responses'][0]['localizedObjectAnnotations']

顔検出

顔検出では、画像中にある顔の検出、表情からの感情予測を行います。

Colab 環境に画像を用意する

下記セルを実行すると写真撮影用の関数 take_photo が定義される


In [0]:
#@title このセルを実行すると、写真撮影用の関数 "take_photo" が定義される

from IPython.display import HTML, Image
from google.colab.output import eval_js
from base64 import b64decode

VIDEO_HTML = """
<video autoplay
 width=600 height=450></video>
<script>
var video = document.querySelector('video')
navigator.mediaDevices.getUserMedia({ video: true })
  .then(stream=> video.srcObject = stream)
  
var data = new Promise(resolve=>{
  video.onclick = ()=>{
    var canvas = document.createElement('canvas')
    var [w,h] = [video.offsetWidth, video.offsetHeight]
    canvas.width = w
    canvas.height = h
    canvas.getContext('2d')
          .drawImage(video, 0, 0, w, h)
    video.srcObject.getVideoTracks()[0].stop()
    video.replaceWith(canvas)
    resolve(canvas.toDataURL('image/jpeg', %f))
  }
})
</script>
"""
def take_photo(filename='photo.jpg', quality=0.8):
  display(HTML(VIDEO_HTML % quality))
  data = eval_js("data")
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return len(binary)

In [0]:
take_photo(filename='my_face.jpg')

Request メッセージの構築

ここでは、上で撮影した画像を使います。


In [0]:
from base64 import b64encode

with open('my_face.jpg', 'rb') as image_file:
  my_image = {
      'content': b64encode(image_file.read()).decode('utf-8')
  }

In [0]:
my_features = [
    {'type':'FACE_DETECTION', 'model':'builtin/stable'},
]

In [0]:
my_body={
    'requests': [
        {'image': my_image, 'features': my_features}
    ]
}

Request メッセージの送信


In [0]:
response = vision_service.images().annotate(body=my_body).execute()

検出結果の可視化

ここでも、検出した顔の矩形情報が得られるのでそれを描画してみましょう。次のセルを実行すると検知したオブジェクトをハイライトする便利関数 highlight_faces が定義されます。


In [0]:
#@title 検出した顔をハイライトする関数 highlight_faces を定義

from PIL import Image, ImageDraw

def highlight_faces(image_file, faces):
  image = Image.open(image_file)
  draw = ImageDraw.Draw(image, "RGBA")
  
  for face in faces:
    vertex_lt = tuple(face['boundingPoly']['vertices'][0].values())
    vertex_rb = tuple(face['boundingPoly']['vertices'][2].values())
    
    # bounding box
    draw.rectangle(xy=(vertex_lt, vertex_rb), outline='red')
    
    # probability
    draw.text(xy=(vertex_lt[0], vertex_lt[1]-10),
              text=str(format(face['detectionConfidence'], '.3f')),
              fill='red')    
  display(image)

In [0]:
highlight_faces('my_face.jpg', response['responses'][0]['faceAnnotations'])

表情から読み取れる感情の予測結果表示する。


In [0]:
face_response = response['responses'][0]['faceAnnotations']

keys = ['angerLikelihood',  'joyLikelihood', 'sorrowLikelihood',
        'surpriseLikelihood', 'headwearLikelihood']

for key in keys:
  print(key, "==>", face_response[0][key])

ロゴ検知

ロゴ検知では、画像中にある(有名ブランドなどの)ロゴを検出します。

Colab 環境への画像アップロード

次のセルを実行して、ローカルに保存された googleplex.jpg をアップロードします。


In [0]:
from google.colab import files
uploaded = files.upload()

再アップロードすると、ファイルは上書きではなく別ファイル名で保存されます。ファイルを消したい場合は以下のコマンドを実行します。


In [0]:
# !rm ./googleplex.jpg

Request メッセージを構築してみよう

ここでは、 googleplex.jpg の画像を使います。Requestのしかたは先ほどと同じで、画像データと typeが異なるだけです。


In [0]:
from base64 import b64encode

with open('googleplex.jpg', 'rb') as image_file:
  my_image = {
      'content': b64encode(image_file.read()).decode('utf-8')
  }

In [0]:
my_features = [
    {'type':'LOGO_DETECTION', 'model':'builtin/stable'}
]

In [0]:
my_body={
    'requests': [
        {'image': my_image, 'features': my_features}
    ]
}

Request メッセージの送信


In [0]:
response = vision_service.images().annotate(body=my_body).execute()

検出結果を可視化しよう

ここでも、検出したロゴの矩形情報が得られるのでそれを描画してみましょう。次のセルを実行すると検知したオブジェクトをハイライトする便利関数 highlight_logos が定義されます。


In [0]:
#@title 検出したオブジェクトをハイライトする関数 highlight_logos を定義

from PIL import Image, ImageDraw

def highlight_logos(image_file, objects):
  image = Image.open(image_file)
  draw = ImageDraw.Draw(image, "RGBA")
  
  for obj in objects:
    vertex_lt = tuple(obj['boundingPoly']['vertices'][0].values())
    vertex_rb = tuple(obj['boundingPoly']['vertices'][2].values())
    
    # bounding box
    draw.rectangle(xy=(vertex_lt, vertex_rb), outline='red')
    
    # probability
    obj['description']
    draw.text(xy=(vertex_lt[0], vertex_lt[1]-10),
              text=obj['description'] + ':' + str(format(obj['score'], '.3f')),
              fill='red')    
  display(image)

In [0]:
highlight_logos('googleplex.jpg', response['responses'][0]['logoAnnotations'])

Response メッセージの中身を確認

オブジェクト検知とほぼ同じ形式です。midとは、Google Knowledge GraphのIDです。


In [0]:
response['responses'][0]['logoAnnotations']

ランドマーク検知

ランドマーク検知では、有名な観光地や場所を特定することができます。

Colab 環境への画像アップロード

次のセルを実行して、ローカルに保存された osaka.jpg をアップロードします。


In [0]:
from google.colab import files
uploaded = files.upload()

再アップロードすると、ファイルは上書きではなく別ファイル名で保存されます。ファイルを消したい場合は以下のコマンドを実行します。


In [0]:
!rm ./osaka.jpg

Request メッセージの構築


In [0]:
from base64 import b64encode

with open('osaka.jpg', 'rb') as image_file:
  my_image = {
      'content': b64encode(image_file.read()).decode('utf-8')
  }

In [0]:
my_features = [
    {'type':'LANDMARK_DETECTION', 'model':'builtin/stable'}
]

In [0]:
my_body={
    'requests': [
        {'image': my_image, 'features': my_features}
    ]
}

Request メッセージの送信


In [0]:
response = vision_service.images().annotate(body=my_body).execute()

検出結果の可視化

ここでも、検出したランドマークの矩形情報が得られるのでそれを描画してみましょう。次のセルを実行すると検知したオブジェクトをハイライトする便利関数 highlight_landmarks が定義されます。


In [0]:
#@title 検出したオブジェクトをハイライトする関数 highlight_landmarks を定義

from PIL import Image, ImageDraw

def highlight_landmarks(image_file, objects):
  image = Image.open(image_file)
  draw = ImageDraw.Draw(image, "RGBA")
  
  for obj in objects:
    vertex_lt = tuple(obj['boundingPoly']['vertices'][0].values())
    vertex_rb = tuple(obj['boundingPoly']['vertices'][2].values())
    
    # bounding box
    draw.rectangle(xy=(vertex_lt, vertex_rb), outline='red')
    
    # probability
    obj['description']
    draw.text(xy=(vertex_lt[0], vertex_lt[1]-10),
              text=obj['description'] + ':' + str(format(obj['score'], '.3f')),
              fill='red')    
  display(image)

In [0]:
highlight_landmarks('osaka.jpg', response['responses'][0]['landmarkAnnotations'])

プラス One!

Maps APIを使って検知したランドマークの位置を地図に表示しよう。地図の表示は Google Maps Static API を使っているので、別途 API を有効にする必要があります。


In [0]:
from IPython.core.display import HTML

latlng = response['responses'][0]['landmarkAnnotations'][0]['locations'][0]['latLng']

html = """
<img src="https://maps.googleapis.com/maps/api/staticmap?center={},{}&zoom=14&size=680x300&key={}">
""".format(latlng['latitude'], latlng['longitude'], APIKEY)
display(HTML(html))

Response メッセージの中身を確認


In [0]:
response['responses'][0]['landmarkAnnotations']

Tips

複数の検知を一度にリクエストすることもできます。


In [0]:
# Requestのmy_features部分を以下のように複数指定する
my_features = [
    {'type':'OBJECT_LOCALIZATION', 'model':'builtin/stable'},
    {'type':'LOGO_DETECTION', 'model':'builtin/stable'},
    {'type':'LANDMARK_DETECTION', 'model':'builtin/stable'}
]

Cloud Video Intelligence API を使ってみよう !

Cloud Video Intelligence API を使えば動画の中にある物体やシーンの切り替えなどを検知することができます。

Cloud Video Intelligence API の準備

API Discovery Service を利用して Cloud Video Intelligence API を発見します。


In [0]:
import time
from googleapiclient.discovery import build

video_service = build('videointelligence', 'v1', developerKey=APIKEY)

入力動画を確認しよう


In [0]:
#@title 動画を録画するための record_video を定義

# Install required libraries and packages
!apt-get -qq update
!apt-get -qq install -y ffmpeg
!pip install ffmpeg-python

# Define record_audio
import base64
import ffmpeg
import subprocess
import google.colab
from io import BytesIO

def record_video(file_id):
  # Record webm file from Colaboratory.
  video = google.colab._message.blocking_request(
    'user_media',
    {
      'audio': True,
      'video': True,
      'duration': -1
    },
    timeout_sec=600)

  # Convert web file into in_memory file.
  mfile = BytesIO(base64.b64decode(video[video.index(',')+1:]))

  # Store webm file locally.
  print('Generating {}.webm'.format(file_id))

  with open('{0}.webm'.format(file_id), 'wb') as f:
    mfile.seek(0)
    f.write(mfile.read())

  print('Converting {0}.webm to {0}.mp4'.format(file_id))
  !ffmpeg -y -i {file_id}.webm -r 29.97 {file_id}.mp4  

  print('Done')

In [0]:
# @title 動画を再生するための便利関数 `resume_video()` を定義
import io
import base64
from IPython.display import HTML

def resume_video(path_to_mp4):
  video = io.open(path_to_mp4, 'rb').read()
  encoded = base64.b64encode(video)
  return HTML(data="""
      <video width="640" height="360" controls>
          <source src="data:video/mp4;base64,{0}" type="video/mp4" />
      </video>""".format(encoded.decode('ascii')))

In [0]:
# 動画の録画
record_video('sample')

In [0]:
resume_video('sample.mp4')

Video Intelligence API の結果を取得する

Video Intelligence API の実行オプション


In [0]:
# @title 動画ファイルを読み込み base64 エンコードした結果を取得

from base64 import b64encode

path_to_video = "sample.mp4" #@param {type:"string"}
with open(path_to_video, 'rb') as video_file:
  input_content = b64encode(video_file.read()).decode()

In [0]:
# @title リージョン指定 (input_uri の Read Permission に注意)

location_id = "us-east1" #@param ["us-east1", "us-west1", "europe-west1", "asia-east1"]

In [0]:
# @title 有効にする機能を選択

class Features():
  def __init__(self):
    pass
  
  def get(self):
    return [k for k, v in self.__dict__.items() if v == True]
  
features = Features()
features.FEATURE_UNSPECIFIED = False #@param {type:"boolean"}
features.LABEL_DETECTION = True #@param {type:"boolean"}
features.SHOT_CHANGE_DETECTION = False #@param {type:"boolean"}
features.EXPLICIT_CONTENT_DETECTION = False #@param {type:"boolean"}
features.SPEECH_TRANSCRIPTION = False #@param {type:"boolean"}
features.TEXT_DETECTION = True #@param {type:"boolean"}
features.OBJECT_TRACKING = True #@param {type:"boolean"}

In [0]:
# @title ラベルの分析 (LABEL_DETECTION) または オブジェクト追跡 (OBJECT_TRACKING) 用の設定

class LabelDetectionConfig():
  def __init__(self):
    pass
  def get(self):
    return self.__dict__
  
label_detection_config = LabelDetectionConfig()
label_detection_config.labelDetectionMode = "FRAME_MODE" #@param ["LABEL_DETECTION_MODE_UNSPECIFIED", "SHOT_MODE", "FRAME_MODE", "SHOT_AND_FRAME_MODE"]
label_detection_config.stationaryCamera = False #@param {type: "boolean"}
label_detection_config.model = "builtin/stable" #@param ["builtin/stable", "builtin/latest"]
label_detection_config.frameConfidenceThreshold = 0.7 #@param {type:"slider", min:0.1, max:0.9, step:0.1}
label_detection_config.videoConfidenceThreshold = 0.7 #@param {type:"slider", min:0.1, max:0.9, step:0.1}

In [0]:
# @title ショット変更の分析 (SHOT_CHANGE_DETECTION) 用の設定

class ShotChangeDetectionConfig():
  def __init__(self):
    pass
  def get(self):
    return self.__dict__
  
shot_change_detection_config = ShotChangeDetectionConfig()
shot_change_detection_config.model = "builtin/stable" #@param ["builtin/stable", "builtin/latest"]

In [0]:
# @title 不適切なコンテンツの分析 (EXPLICIT_CONTENT_DETECTION) 用の設定

class ExplicitContentDetectionConfig():
  def __init__(self):
    pass
  def get(self):
    return self.__dict__
  
explicit_content_detection_config = ExplicitContentDetectionConfig()
explicit_content_detection_config.model = "builtin/stable" #@param ["builtin/stable", "builtin/latest"]

In [0]:
# @title 音声文字変換 (SPEECH_TRANSCRIPTION) 用の設定

class SpeechTranscriptionConfig():
  def __init__(self):
    pass
  def get(self):
    return self.__dict__

speech_transcription_config = SpeechTranscriptionConfig()
speech_transcription_config.languageCode = "en_US" #@param ["ja_JP", "en_US"]
speech_transcription_config.maxAlternatives = 1 #@param {type:"slider", min:0, max:30, step:1}
speech_transcription_config.filterProfanity = False #@param {type: "boolean"}
speech_transcription_config.enableAutomaticPunctuation = False #@param {type: "boolean"}
speech_transcription_config.enableSpeakerDiarization = False #@param {type: "boolean"}
speech_transcription_config.enableWordConfidence = False #@param {type: "boolean"}

In [0]:
# @title テキスト検出 (TEXT_DETECTION) 用の設定

class TextDetectionConfig():
  def __init__(self):
    pass
  def get(self):
    return self.__dict__
  
class LanguageHints():
  def __init__(self):
    pass
  def get(self):
    return [k for k, v in self.__dict__.items() if v == True]
  
text_detection_config = TextDetectionConfig()


language_hints = LanguageHints()
language_hints.ja_JP = True #@param {type: "boolean"}
language_hints.en_US = False #@param {type: "boolean"}

text_detection_config.languageHints = language_hints.get()

In [0]:
#@title request を構成するための関数 `create_request()` を定義

class Request():
  def __init__(self):
    pass
  
  def get(self):
    return self.__dict__
  

class VideoContext():
  def __init__(self):
    pass
  
  def get(self):
    return self.__dict__

def create_video_context():
  vc = VideoContext()
  list_features = features.get()
  if 'LABEL_DETECTION' in list_features or 'OBJECT_TRACKING' in list_features:
    vc.labelDetectionConfig = label_detection_config.get()
  if 'SHOT_CHANGE_DETECTION' in list_features:
    vc.shotChangeDetectionConfig = shot_change_detection_config.get()
  if 'EXPLICIT_CONTENT_DETECTION' in list_features:
    vc.explicitContentDetectionConfig = explicit_content_detection_config.get()
  if 'SPEECH_TRANSCRIPTION' in list_features:
    vc.speechTranscriptionConfig = speech_transcription_config.get()
  if 'TEXT_DETECTION' in list_features:
    vc.textDetectionConfig = text_detection_config.get()
  return vc.__dict__

def create_request():
  request = Request()
  request.inputContent = input_content
  request.features = features.get()
  request.videoContext = create_video_context()
  request.locationId = location_id
  return request.__dict__

Request メッセージの作成


In [0]:
create_request()

Response メッセージの取得


In [0]:
import time

response = video_service.videos().annotate(body=create_request()).execute()

while(True):
  output = video_service.projects().locations().operations().get(name=response['name']).execute()
  try:
    if output['done']:
      break
  except KeyError:
    print(output['metadata'])
    time.sleep(10)
    
video_intelligence_res = output

オブジェクトトラッキング

オブジェクトトラッキングの結果を反映した動画を作成する


In [0]:
# @title オブジェクトトラッキングの結果を動画にオーバーレイする `create_annotated_video()` を定義

!wget -N https://noto-website-2.storage.googleapis.com/pkgs/NotoSansCJKjp-hinted.zip
!unzip -o NotoSansCJKjp-hinted.zip -d notosans

import cv2
import numpy as np
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP, ROUND_HALF_EVEN
from PIL import Image, ImageDraw, ImageFont

def get_image_from_array(img_arr_bgr, width, height):
  img_arr_rgb = cv2.cvtColor(img_arr_bgr, cv2.COLOR_BGR2RGB)
  img = Image.fromarray(img_arr_rgb.astype('uint8'))  
  img = img.resize((width, height), Image.ANTIALIAS)
  return img

def get_frames_from_video(path_to_file, width, height):
  """
  Args:
    path_to_file: path to a video file.
  """
  ret = {}
  
  cap = cv2.VideoCapture(path_to_file)  
  curr_msec = 0.
  prev_msec = 0.
  time_resolution_msec = 0
  
  ret['frames'] = []
  while(cap.isOpened()):
    success, frame = cap.read()
    if success:
      curr_msec = cap.get(cv2.CAP_PROP_POS_MSEC)
      diff_msec = curr_msec - prev_msec
      prev_msec = curr_msec

      diff_msec = Decimal(diff_msec).quantize(
          Decimal('.001'), rounding=ROUND_HALF_UP)
      
      if time_resolution_msec < diff_msec:
        time_resolution_msec = diff_msec
      
      frame_dict = {}
      frame_dict['image'] = get_image_from_array(frame, width, height)
      ret['frames'].append(frame_dict)
    else:
      break
      
  cap.release()
  
  ret['time_resolution_msec'] = time_resolution_msec
  ret['width'] = width
  ret['height'] = height
  return ret

def convert_secstr_to_intmsec(str_val):
  return int(float(str_val.replace('s', '')) * 1000)

def merge_object_annotations(base_dict, object_annotations, annotate_fps=8):
  time_resolution_msec = base_dict['time_resolution_msec']
  entities = []

  # insert bboxes.
  for i, object_annotation in enumerate(object_annotations):
    if not 'description' in object_annotation['entity']:
      continue

    confidence = object_annotation['confidence']
      
    entity = '{:03}_{}'.format(i, object_annotation['entity']['description'])
    entities.append(entity)
    
    for anno_frame in object_annotation['frames']:
      bbox = anno_frame['normalizedBoundingBox']
      time_offset_msec = convert_secstr_to_intmsec(anno_frame['timeOffset'])
      
      index = int(time_offset_msec / time_resolution_msec)
      base_frame_dict = base_dict['frames'][index]
      
      if 'object_annotations' not in base_frame_dict:
        base_frame_dict['object_annotations'] = {}
        
      if entity not in base_frame_dict['object_annotations']:
        base_frame_dict['object_annotations'][entity] = {}
        base_entity_dict = base_frame_dict['object_annotations'][entity]
        base_entity_dict['bbox'] = bbox
        base_entity_dict['confidence'] = confidence
        base_entity_dict['time_offset'] = time_offset_msec


  interpolate_object_annotations(base_dict, entities, annotate_fps)
  return

def interpolate_object_annotations(base_dict, entities, annotate_fps=8):
  # interpolate bboxes between key frames.
  max_annotation_duration = 1000 // annotate_fps

  for entity in entities:
    cache = None
    annotation_duration = 0

    for base_frame_dict in base_dict['frames']:
      
      if 'object_annotations_i' not in base_frame_dict:
        base_frame_dict['object_annotations_i'] = {}
      
      if 'object_annotations' in base_frame_dict:
        if entity in base_frame_dict['object_annotations']:
          cache = base_frame_dict['object_annotations'][entity]
          annotation_duration = 0

      base_frame_dict['object_annotations_i'][entity] = cache
      
      if annotation_duration < max_annotation_duration:
        annotation_duration += int(base_dict['time_resolution_msec'])
      else:
        cache = None
  return

def create_annotated_images(base_dict):
  width = base_dict['width']
  height = base_dict['height']
  fnt = ImageFont.truetype('notosans/NotoSansCJKjp-Regular.otf', 10)
  
  for frame in base_dict['frames']:
    image = frame['image']
    draw = ImageDraw.Draw(image, "RGBA")
    
    object_annotations = frame['object_annotations_i']
    for entity, entity_info in object_annotations.items():
      if entity_info:
        confidence = entity_info['confidence']
        bbox = entity_info['bbox']
        
        try:
          t = bbox['top']
          l = bbox['left']
          r = bbox['right']
          b = bbox['bottom']
        except(KeyError):
          continue

        vertex_lt = (int(l * base_dict['width']), int(t * base_dict['height']))
        vertex_rb = (int(r * base_dict['width']), int(b * base_dict['height']))
        
        
        draw.rectangle(xy=(vertex_lt, vertex_rb), outline='red')
        draw.text(
            xy=(vertex_lt[0], vertex_lt[1]-10),
            text='{}'.format(entity) + ':' + str(format(confidence, '.3f')),
            fill='red', font=fnt)
        
    frame['image_annotated'] = image
  return

def create_annotated_video(
    path_to_input, object_annotations, path_to_output='output.avi',
    width=640, height=480):
  
  base = get_frames_from_video(path_to_input, width=width, height=height)
  merge_object_annotations(base, object_annotations)
  create_annotated_images(base)
  
  
  fourcc = cv2.VideoWriter_fourcc(*'XVID')
  framerate = 1000 // base['time_resolution_msec']
  video_dims = (base['width'], base['height'])
  video = cv2.VideoWriter(path_to_output, fourcc, framerate, video_dims)

  for frame in base['frames']:
    img_annotated = frame['image_annotated']
    video.write(cv2.cvtColor(np.array(img_annotated), cv2.COLOR_RGB2BGR))

  video.release()

In [0]:
key = 'objectAnnotations'

for e in video_intelligence_res['response']['annotationResults']:
  if key in list(e.keys()):
    object_annotations = e['objectAnnotations']

In [0]:
create_annotated_video(
    path_to_input=path_to_video,
    object_annotations=object_annotations,
    path_to_output='output.avi')

In [0]:
!ffmpeg -y -i output.avi -c:v libx264 -crf 19 -preset slow -c:a libfdk_aac -b:a 192k -ac 2 output.mp4

オブジェクトトラッキングの結果を確認する


In [0]:
resume_video('output.mp4')

音声認識


In [0]:
# @title 音声認識結果を出力する便利関数を定義
import textwrap

def print_speech_transcript(response, limit=10):
  num_print = 0
  for e in response['response']['annotationResults']:
    if 'speechTranscriptions' in list(e.keys()):
      for t in e['speechTranscriptions']:
        for b in textwrap.wrap(t['alternatives'][0]['transcript'], 70):
          print(b)
          num_print += 1
          if num_print > limit:
            return 

def print_speech_word(response, limit=10):
  num_print = 0
  for e in response['response']['annotationResults']:
    if 'speechTranscriptions' in list(e.keys()):
      for t in e['speechTranscriptions']:
        for o in t['alternatives'][0]['words']:
          print('{:>7}-{:>7}: {}'.format(o['startTime'], o['endTime'], o['word']))
          num_print += 1
          if num_print > limit:
            return

def print_speech_sentence(response, limit=10):
  num_print = 0
  for e in response['response']['annotationResults']:
    if 'speechTranscriptions' in list(e.keys()):
      for t in e['speechTranscriptions']:
        start_time = None
        words = []
        for o in t['alternatives'][0]['words']:
          if words == []:
            start_time = o['startTime']

          words.append(o['word'])

          if '.' == o['word'][-1]:
            print('{:>7}-{:>7} {}'.format(
                start_time, o['endTime'], ' '.join(words)))
            words = []
            start_time = None
            num_print += 1

          if num_print > limit:
            return

単語レベルのタイムスタンプを表示する


In [0]:
print_speech_word(video_intelligence_res, limit=10)

センテンスレベルのタイムスタンプを表示する


In [0]:
print_speech_sentence(video_intelligence_res, limit=10)

ラベルの分析


In [0]:
key = 'frameLabelAnnotations'

frame_label_annotations = None

for e in video_intelligence_res['response']['annotationResults']:
  if key in list(e.keys()):
    frame_label_annotations = e[key]

In [0]:
for i, annotation in enumerate(frame_label_annotations):
  entity = annotation['entity']['description']
  print('{:>20} {}'.format(entity, annotation['frames']))
  if i > 10:
    break

テキスト検出 (OCR)


In [0]:
key = 'textAnnotations'

text_annotations = None

for e in video_intelligence_res['response']['annotationResults']:
  if key in list(e.keys()):
    text_annotations = e[key]

In [0]:
num_print = 0

for t in text_annotations:
  text = t['text']
  s = t['segments'][0]['segment']
  confidence = t['segments'][0]['confidence']
  if confidence > 0.9:
    print('{:>11} - {:>11}: {}'.format(s['startTimeOffset'], s['endTimeOffset'], text))
    num_print += 1
  if num_print > 10:
    break

ショット変更の分析


In [0]:
#@title ショット分析結果を出力する便利関数を定義

def print_shots(response, limit=10):
  key = 'shotAnnotations'

  shot_annotations = None
  for e in response['response']['annotationResults']:
    if key in list(e.keys()):
      shot_annotations = e[key]

  for i, e in enumerate(shot_annotations):
    print('{:>11}-{:>11}: Scene-{:03}'.format(
      e['startTimeOffset'], e['endTimeOffset'], i))
    if i > limit:
      return

In [0]:
print_shots(video_intelligence_res, limit=10)