In [ ]:
# URL of the MJPEG http video stream to pull frames from (must be set before running)
url = ''
# Number of seconds to keep retrieving images from the http video stream
run_time = 60
# Upsampling levels to try, in order, when calling face.face_locations
# (higher levels detect smaller faces but are slower)
upsampling = [1,2,3]
In [ ]:
# coding: utf-8
import glob
import pickle
import sys
import time
import urllib
from collections import defaultdict
from os.path import join
from urllib.request import urlopen
import dlib
import face_recognition_models
import numpy as np
from PIL import Image
from skimage import io
import cv2
from face import face
In [ ]:
# Get a single frame from the http video stream `url`.
# MJPEG streams delimit each JPEG frame with SOI (0xffd8) / EOI (0xffd9)
# markers; accumulate bytes until one complete frame is buffered, decode it.
try:
    with urlopen(url) as stream:
        data = b''
        img = None   # decoded frame as a PIL.Image (RGB), set on success
        i = None     # raw decoded OpenCV frame (BGR)
        done = False
        while not done:
            data += stream.read(1024)
            a = data.find(b'\xff\xd8')  # JPEG start-of-image marker
            b = data.find(b'\xff\xd9')  # JPEG end-of-image marker
            if a != -1 and b != -1:
                jpg = data[a:b + 2]
                data = data[b + 2:]
                # np.frombuffer replaces the deprecated np.fromstring
                # (read-only view is fine here; imdecode copies the data).
                i = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8),
                                 cv2.IMREAD_COLOR)
                # OpenCV decodes to BGR; PIL expects RGB.
                i2 = cv2.cvtColor(i, cv2.COLOR_BGR2RGB)
                img = Image.fromarray(i2)
                done = True
except urllib.error.HTTPError:  # don't shadow the exception class name
    print("HTTPError; Wait and try again.")
except urllib.error.URLError:
    print("URLError; Check url.")
In [ ]:
# Notebook cell: display the fetched frame (last expression is auto-rendered)
img
In [ ]:
# Analyze the single frame for faces, trying progressively higher
# upsampling levels until at least one face is found.
# np.array(img) yields a writable (H, W, 3) uint8 RGB array directly;
# the deprecated np.fromstring + reshape is no longer needed (and its
# np.frombuffer replacement would be read-only, breaking cv2.rectangle).
im_arr = np.array(img)
for level in upsampling:
    print("Trying upsampling:", level)
    list_face_locations = face.face_locations(im_arr, level)
    list_face_encodings = face.face_encodings(im_arr, list_face_locations)
    for face_encoding, face_location in zip(list_face_encodings, list_face_locations):
        (top, right, bottom, left) = face_location
        # Draw a green bounding box around the detected face (in-place).
        cv2.rectangle(im_arr, (left, top), (right, bottom), (0, 255, 0), 2)
        print('found face', face_location)
    if len(list_face_locations) > 0:
        # im_arr came from a PIL image so it is already RGB; the original
        # cv2.cvtColor(im_arr, cv2.COLOR_BGR2RGB) discarded its result and
        # was a no-op — removed.
        #cv2.imwrite("/in/test.jpg",im_arr)
        image_boxes = Image.fromarray(im_arr)
        from IPython.display import display
        display(image_boxes)
        break
    else:
        print("No faces found")
In [ ]:
In [ ]:
# Repeatedly pull images from url stream for a set period of time and scan for faces
def getImage(url):
    """Fetch and decode a single JPEG frame from the MJPEG stream at `url`.

    Reads the stream in 1024-byte chunks until one complete JPEG
    (SOI 0xffd8 .. EOI 0xffd9) is buffered, then decodes it.

    Returns:
        PIL.Image in RGB channel order, or None on HTTP/URL errors.
    """
    try:
        with urlopen(url) as stream:
            data = b''
            img = None
            done = False
            while not done:
                data += stream.read(1024)
                start = data.find(b'\xff\xd8')  # JPEG start-of-image marker
                end = data.find(b'\xff\xd9')    # JPEG end-of-image marker
                if start != -1 and end != -1:
                    jpg = data[start:end + 2]
                    data = data[end + 2:]
                    # np.frombuffer replaces the deprecated np.fromstring
                    # (read-only view is fine; imdecode copies the data).
                    frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8),
                                         cv2.IMREAD_COLOR)
                    # OpenCV decodes to BGR; convert to RGB for PIL.
                    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    done = True
            return img
    except urllib.error.HTTPError:
        print("HTTPError; Wait and try again.")
        return None
    except urllib.error.URLError:
        print("URLError; Check url.")
        return None
def findCameraFace(url, sleep, end):
    """Grab one frame from `url` and scan it for faces.

    Tries each level in the module-level `upsampling` list until faces are
    found, drawing green boxes on the frame as it goes.

    Args:
        url: http video stream URL passed to getImage().
        sleep: seconds to pause before returning when no faces were found.
        end: accepted for interface compatibility; currently unused.

    Returns:
        A list with one (PIL image, annotated array, locations, encodings)
        tuple when faces were found, otherwise an empty list.
    """
    found_people = []
    print("Frame at", time.asctime(time.localtime(time.time())))
    img = getImage(url)
    if img is not None:
        # Writable (H, W, 3) uint8 RGB array. np.array copies, unlike the
        # deprecated np.fromstring + reshape (and unlike np.frombuffer,
        # whose read-only result would break cv2.rectangle).
        im_arr = np.array(img)
        for level in upsampling:
            #print("Trying upsampling:", level)
            list_face_locations = face.face_locations(im_arr, level)
            list_face_encodings = face.face_encodings(im_arr, list_face_locations)
            for face_encoding, face_location in zip(list_face_encodings, list_face_locations):
                (top, right, bottom, left) = face_location
                cv2.rectangle(im_arr, (left, top), (right, bottom), (0, 255, 0), 2)
                print('found face', face_location)
            if len(list_face_locations) > 0:
                found_people.append((img, im_arr, list_face_locations, list_face_encodings))
                return found_people
    # No faces found (or fetch failed): optionally wait before the caller
    # polls again. NOTE(review): original indentation was stripped, so the
    # exact placement of this sleep relative to the `if` is reconstructed.
    time.sleep(sleep)
    return []
# Poll the stream for faces until `run_time` seconds have elapsed,
# accumulating every detection returned by findCameraFace.
found_people = []
start = int(time.time())
end = start + run_time
while True:
    if int(time.time()) > end:
        break
    found_people += findCameraFace(url, 0, end)
print("done")
In [ ]:
# Cycle every 2 seconds through the detected faces collected from the
# http video stream, displaying each annotated frame in turn.
from IPython.display import display, clear_output
for person, found in enumerate(found_people):
    clear_output(wait=True)
    im_arr = found[1]  # annotated RGB array from findCameraFace
    # The original cv2.cvtColor(im_arr, cv2.COLOR_BGR2RGB) here discarded
    # its return value (a no-op), and im_arr is already RGB — removed.
    #cv2.imwrite("/out/face_"+str(person)+".jpg",im_arr)
    image_box = Image.fromarray(im_arr)
    display(image_box)
    time.sleep(2)
print("done")
In [ ]: