In [ ]:
# Address of the HTTP video stream that frames are pulled from
url = ''
# How long (in seconds) the polling cell keeps pulling frames from the stream
run_time = 60
# Upsampling levels passed to face.face_locations, tried in the order listed
upsampling = [1, 2, 3]

In [ ]:
# coding: utf-8
import glob
import pickle
import sys
import time
import urllib
from collections import defaultdict
from os.path import join
from urllib.request import urlopen
import dlib
import face_recognition_models
import numpy as np
from PIL import Image
from skimage import io
import cv2
from face import face

In [ ]:
# Get single frame from http video stream url
#
# The stream is scanned for the first complete JPEG, delimited by the
# start-of-image (FF D8) and end-of-image (FF D9) markers. On success `img`
# holds the frame as an RGB PIL image; otherwise it is left as None.
img = None  # reset first so a failed run never leaves a stale frame behind
try:
    with urlopen(url) as stream:
        data = bytearray()
        while img is None:
            chunk = stream.read(1024)
            if not chunk:
                # read() returns b'' once the server closes the connection;
                # without this check the loop would spin forever.
                print("Stream ended before a complete frame was received.")
                break
            data += chunk
            soi_pos = data.find(b'\xff\xd8')
            if soi_pos == -1:
                # No frame start yet: keep only the last byte, which could be
                # the first half of a marker split across two reads.
                del data[:-1]
                continue
            # Search for the end marker *after* the start marker so that the
            # tail of a previous, partially received frame is never matched.
            eoi_pos = data.find(b'\xff\xd9', soi_pos + 2)
            if eoi_pos == -1:
                continue
            jpg = bytes(data[soi_pos:eoi_pos + 2])
            del data[:eoi_pos + 2]
            frame_bgr = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
            if frame_bgr is None:
                # Bytes between the markers were not a decodable image; try the next frame.
                continue
            # OpenCV decodes to BGR; PIL expects RGB.
            img = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
except urllib.error.HTTPError:
    print("HTTPError; Wait and try again.")
except urllib.error.URLError:
    print("URLError; Check url.")

In [ ]:
# Show the frame captured by the previous cell (rendered via the cell's rich output)
img

In [ ]:
# Analyze single frame for faces
from IPython.display import display

# np.array(img) gives a writable (height, width, channels) uint8 copy of the
# PIL image, replacing the deprecated np.fromstring + manual reshape.
im_arr = np.array(img)
for level in upsampling:
    print("Trying upsampling:",level)
    list_face_locations = face.face_locations(im_arr,level)
    list_face_encodings = face.face_encodings(im_arr , list_face_locations)
    for face_encoding, face_location in zip(list_face_encodings, list_face_locations):
        (top, right, bottom, left) = face_location
        # Draw a green box around the face directly on the frame array
        cv2.rectangle(im_arr, (left, top),(right,bottom), (0, 255, 0), 2)
        print('found face', face_location)
    if len(list_face_locations)>0:
        # im_arr is already RGB (the capture cell converted it), so it can be
        # handed to PIL as is. To save it with cv2.imwrite, convert to BGR
        # first: cv2.imwrite("/in/test.jpg", cv2.cvtColor(im_arr, cv2.COLOR_RGB2BGR))
        image_boxes = Image.fromarray(im_arr)
        display(image_boxes)
        break
    else:
        print("No faces found")

In [ ]:


In [ ]:
# Repeatedly pull images from url stream for a set period of time and scan for faces

def getImage(url):
    """Fetch one JPEG frame from an HTTP video stream.

    The response body is read in 1 KiB chunks and scanned for the first
    complete JPEG, delimited by the start-of-image (FF D8) and end-of-image
    (FF D9) markers.

    Args:
        url: address of the HTTP video stream, passed to ``urlopen``.

    Returns:
        The frame as an RGB ``PIL.Image``, or ``None`` when the request fails
        with an HTTP/URL error or the stream ends before a decodable frame
        has been received.
    """
    soi_marker = b'\xff\xd8'
    eoi_marker = b'\xff\xd9'
    try:
        with urlopen(url) as stream:
            data = bytearray()
            while True:
                chunk = stream.read(1024)
                if not chunk:
                    # read() returns b'' once the server closes the connection;
                    # without this check the loop would never terminate.
                    print("Stream ended before a complete frame was received.")
                    return None
                data += chunk
                start = data.find(soi_marker)
                if start == -1:
                    # No frame start yet: keep only the last byte, which could
                    # be the first half of a marker split across two reads.
                    del data[:-1]
                    continue
                # Look for the end marker *after* the start marker so the tail
                # of a previous, partially received frame is never matched.
                end = data.find(eoi_marker, start + 2)
                if end == -1:
                    continue
                jpg = bytes(data[start:end + 2])
                del data[:end + 2]
                frame_bgr = cv2.imdecode(
                    np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                if frame_bgr is None:
                    # Bytes between the markers were not a decodable image;
                    # keep reading and try the next frame.
                    continue
                # OpenCV decodes to BGR; PIL expects RGB.
                return Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    except urllib.error.HTTPError:
        print("HTTPError; Wait and try again.")
        return None
    except urllib.error.URLError:
        print("URLError; Check url.")
        return None
    
def findCameraFace(url, sleep, end):
    """Grab one frame from the stream and search it for faces.

    Each level in the notebook-level ``upsampling`` list is tried in order;
    the search stops at the first level that yields at least one face.

    Args:
        url: address of the HTTP video stream, forwarded to ``getImage``.
        sleep: seconds to pause before returning when no face was found
            (or no frame could be fetched).
        end: not used by this function; kept so existing callers keep working.

    Returns:
        ``[]`` when no frame was available or no face was found. Otherwise a
        one-element list holding the tuple
        ``(img, im_arr, list_face_locations, list_face_encodings)``, where
        ``img`` is the untouched PIL frame and ``im_arr`` is an array copy of
        it with a green rectangle drawn around every detected face.
    """
    print("Frame at", time.asctime())
    img = getImage(url)
    if img is not None:
        # np.array(img) gives a writable (height, width, channels) uint8 copy,
        # replacing the deprecated np.fromstring + manual reshape.
        im_arr = np.array(img)
        for level in upsampling:
            list_face_locations = face.face_locations(im_arr, level)
            if len(list_face_locations) == 0:
                # Nothing at this level: skip the encoding step and try the next one
                continue
            # Encode before drawing so the boxes do not end up in the encodings
            list_face_encodings = face.face_encodings(im_arr, list_face_locations)
            for _encoding, face_location in zip(list_face_encodings, list_face_locations):
                (top, right, bottom, left) = face_location
                cv2.rectangle(im_arr, (left, top), (right, bottom), (0, 255, 0), 2)
                print('found face', face_location)
            return [(img, im_arr, list_face_locations, list_face_encodings)]
    time.sleep(sleep)
    return []

# Poll the stream until run_time seconds have elapsed, collecting every frame
# in which at least one face was detected.
found_people = []
start = int(time.time())
end = start + run_time

while True:
    if int(time.time()) > end:
        break
    found_people.extend(findCameraFace(url, 0, end))
print("done")

In [ ]:
# Cycles every 2 seconds through detected faces on images from http video stream
from IPython.display import display, clear_output

for person, found in enumerate(found_people):
    clear_output(wait=True)
    # Index 1 of each entry is the frame array with the face boxes drawn on it.
    # It is already RGB, so it can be handed to PIL without conversion.
    im_arr = found[1]
    # To save the frame with cv2.imwrite, convert to BGR first:
    # cv2.imwrite("/out/face_"+str(person)+".jpg", cv2.cvtColor(im_arr, cv2.COLOR_RGB2BGR))
    image_box = Image.fromarray(im_arr)
    display(image_box)
    time.sleep(2)
print("done")

In [ ]: