Pizzafire

A multithreaded job queue for distributed neural network tasks. It automates the deployment and termination of multiple AWS EC2 g2.2xlarge instances, shutting each one down as soon as it is no longer needed to minimize cost. Commands are executed and activity is monitored over SSH.

This notebook is set up to apply A Neural Algorithm of Artistic Style (Gatys, Ecker & Bethge, 2015) to video or to very large images.

Style Transfer

Read more here: https://github.com/jcjohnson/neural-style

Setup

  1. Register for AWS: https://aws.amazon.com/

  2. Install boto3 and configure AWS credentials. Follow these instructions: https://github.com/boto/boto3 (a minimal credentials file is sketched after this list).

  3. Create a security group in the AWS console called "ssh-http".

     Enable inbound SSH (TCP port 22)
     Enable inbound HTTP (TCP port 80)
     Enable all outbound traffic.
  4. Configure the parameters of your job. See the Configure and Jobs sections below for examples.

  5. Run all code!
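
For step 2, boto3 reads credentials from ~/.aws/credentials and the region from ~/.aws/config. A minimal sketch (placeholder key values, region of your choice), plus a quick smoke test:

In [ ]:
# ~/.aws/credentials (placeholder values):
# [default]
# aws_access_key_id = YOUR_ACCESS_KEY_ID
# aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
#
# ~/.aws/config:
# [default]
# region = us-east-1

# Smoke test: if credentials are configured, this lists up to 5 instances without erroring.
import boto3
print list(boto3.resource('ec2').instances.limit(5))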


In [ ]:
# @author CJ Carr, 2016 
# http://github.com/cortexelus/pizzafire

import time
import sys
import random
import threading
import boto3
from ntpath import basename

# Should a particular thread take another job, or terminate?
# (estimated_process_time, termination_margin, and termination_time are set in the Configure section below)
def should_terminate():
    # current time + time it would take to process a job + extra time allotted for shutdown
    return time.time() + estimated_process_time + termination_margin >= termination_time

# Should all the threads and VMs terminate?
def should_terminate_all():
    # current time + extra time allotted for shutdown 
    return time.time() + termination_margin >= termination_time
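
For example, with the Configure values below (maximum_hours = 1, estimated_process_time = 4*60, termination_margin = 1.5*60), a worker stops claiming new jobs once fewer than 5.5 minutes remain before termination_time, and the nanny force-terminates everything in the final 1.5 minutes.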

In [ ]:
# This is so we don't have to call flush() every time we print
# See: http://stackoverflow.com/questions/107705/disable-output-buffering
class Unbuffered(object):
    def __init__(self, stream):
        self.stream = stream
    def write(self, data):
        try: 
            self.stream.write(data)
            self.stream.flush()
        except:
            # couldn't print to stdout for some reason
            # maybe because I switched iPython cells in the middle of it trying to print 
            # whatever, not that important, don't raise the exception 
            pass
    def __getattr__(self, attr):
        return getattr(self.stream, attr)
sys.stdout = Unbuffered(sys.stdout)

In [ ]:
# style_image = local image whose style we are transferring
# content = local image or directory containing individual frames of the video we are processing
# directory = True if content is directory, False if content is image
def add_to_jobs(style_image, content, directory=False):
    if(directory):
        files = !ls "{content}"
        content_directory = content
    else:
        files = [content]
        content_directory = "."
    # status = incomplete | complete | processing
    # last_modified = timestamp of last change to status
    # host = public_dns_address of AWS machine
    # content = filename of content image to process (directory relative to ipynb file)
    # style = filename of style image to process (directory relative to ipynb file)
    new_jobs = [{"status": "incomplete", "host": "", "last_modified": start_time, "style": style_image, "content": content_directory + "/" + f } for f in files]
    jobs.extend(new_jobs)
    #print jobs

In [ ]:
# Nanny thread
# Terminates all VMs before time limit, just in case 
def nanny():
    global workers, jobs, vm_ids, start_time
    time.sleep(30) # wait a bit before thread checking 
    while True: 
        #print "Nanny: Checking threads. . ."
        # check if all jobs are complete
        if all_jobs_complete():
            print "All jobs complete"
            break
        # check if time limit is really close (within terminate_margin)
        if should_terminate_all():
            print "Nanny: The hour draws near..."
            break
        # if there is free time to process another job, check for dead threads, restart them
        if not should_terminate():
            for i in xrange(len(workers)):
                thread = workers[i]
                if not thread.is_alive():
                    # There are still jobs to do, but this thread has terminated
                    # Return any jobs it left "processing" to "incomplete" so another worker can claim them
                    dead_host = ec2.Instance(id=vm_ids[i]).public_dns_name
                    for job in jobs:
                        if job["status"] == "processing" and job["host"] == dead_host:
                            job["status"] = "incomplete"
                            job["last_modified"] = time.time()
                    # restart thread
                    new_thread = threading.Thread(target=worker, args=(i,vm_ids[i],))
                    workers[i] = new_thread
                    new_thread.start()
        time.sleep(1) # wait a sec just to make sure new threads appear alive
        # At this point check if all threads have died, if so, end the process early
        if all(not t.is_alive() for t in workers):
            print "All workers have finished, although not all jobs were completed"
            break 
        time.sleep(15)
    # Terminate everything 
    print "Terminating all VMs..."
    [ec2.Instance(id=vm_id).terminate() for vm_id in vm_ids]
    complete = sum(1 for j in jobs if j["status"] == "complete")
    incomplete = sum(1 for j in jobs if j["status"] == "incomplete")
    processing = sum(1 for j in jobs if j["status"] == "processing")
    print "Complete: %i\nIncomplete: %i\nStuck Processing: %i\n" % (complete, incomplete, processing)
    print "\nDone."
    print "Took %s minutes" % ((time.time()-start_time)/60)
    sys.stdout.flush()
    
# Return true if all jobs are complete
def all_jobs_complete():
    global jobs
    return all(j["status"] == "complete" for j in jobs)

In [ ]:
# Worker thread. Give it an arbitrary worker id i, and the id of an ec2 instance. 
# 1. Waits for ec2 machine to change to "running" state
# 2. Looks for available jobs and does them. Repeat.
# Terminates when all jobs are done, or if time limit is close. 
def worker(i, vm_id):
    """thread worker function"""
    global jobs, jobs_lock, estimated_process_time, shuffle_jobs, vm_ids
    time.sleep(1)
    print 'Worker: %s %s' % (i, vm_id)
    time.sleep(5) # sometimes the instance doesn't immediately exist
    while ec2.Instance(id=vm_id).state["Name"] != 'running':
        print "%s is still asleep (%s)" % (i, vm_id)
        time.sleep(5)
    print "%s is awake (%s)" % (i, vm_id)
    vm = ec2.Instance(id=vm_id)
    host = vm.public_dns_name
    
    # Look for jobs
    while True:
        # Should Thread Terminate because of time?
        if should_terminate():
            print "The hour draws near. Terminate this VM before amazon charges us another hour. "
            break  
            
        # Should thread terminate because all jobs are complete?
        if all_jobs_complete():
            print "All jobs complete."
            break
            
        # Claim a job
        while(jobs_lock):
            # wait for lock to free up before mutating jobs
            time.sleep(1)
        jobs_lock = True # lock jobs so other threads can't mutate it
        
        next_jobs = [j for j in jobs if j['status']=='incomplete'] # gather incomplete jobs
        if(shuffle_jobs):
            # randomize jobs. this way one machine doesn't get stuck trying to do the same job over and over (if it keeps failing)
            random.shuffle(next_jobs)
        if len(next_jobs):
            my_job = next_jobs[0] # claim the next incomplete job
            
            # UPDATE JOB STATUS 
            my_job["status"] = "processing"
            my_job["host"] = host
            my_job["last_modified"] = time.time()
            
            jobs_lock = False # free lock so other threads can mutate it
            print "Claimed job"
            print my_job
        else:
            jobs_lock = False 
            print "No available jobs, but not all jobs complete. Wait just in case a job becomes available "
            # 
            time.sleep(30)
            continue

        try: 
            # Do job
            doJob(my_job)
            
            time.sleep(1)
            # Success!
            print "Job success %s" % (host)
            print my_job
            while(jobs_lock): time.sleep(1)
            jobs_lock = True
            my_job["status"] = "complete"
            my_job["last_modified"] = time.time()
            jobs_lock = False # free lock so other threads can mutate it
            print "Finished job"
            print my_job
        except:
            # Job failed.
            # (A stray raise here previously made all of this recovery code unreachable
            # and killed the worker on the first failure.)
            time.sleep(2)
            print "Job fail %s" % (host)
            print my_job
            print sys.exc_info()
            # Return job to "incomplete" so another worker can claim it
            while(jobs_lock): time.sleep(1)
            jobs_lock = True
            my_job["status"] = "incomplete"
            my_job["last_modified"] = time.time()
            jobs_lock = False
        
    ### Thread Finished
    # Terminate VM
    
    print "Terminating %s..." % host 
    vm.terminate()
    
    return
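
The jobs_lock boolean used above is a cooperative busy-wait, not an atomic lock: two threads can both observe it as False and claim the same job. A minimal sketch of the same claim step using threading.Lock (hypothetical names, not wired into the code above):

In [ ]:
import threading

jobs_mutex = threading.Lock() # hypothetical replacement for the jobs_lock boolean

def claim_next_job(jobs):
    # Atomically claim the first incomplete job, or return None if there is none.
    with jobs_mutex:
        for job in jobs:
            if job["status"] == "incomplete":
                job["status"] = "processing"
                job["last_modified"] = time.time()
                return job
    return None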

In [ ]:
# Send local file to user@host:dir over scp 
def scpSend(file, user, host, dir):
    for x in xrange(10): 
        # try 10 times to send file 
        try:
            !scp -o "StrictHostKeyChecking no" "{file}" {user}@{host}:{dir}
            break # success
        except:
            # file is probably in use by another thread, or by the operating system
            # note: the ! shell magic doesn't raise on a nonzero exit status,
            # so in practice this mostly guards against local errors
            # (see the checked variant after this cell)
            if(x<9):
                pass
            else:
                print "Failed to SCP file %s to %s" % (file, host)
                raise RuntimeError("Failed to SCP file %s to %s" % (file, host))
        time.sleep(5) # wait 5 seconds before trying again

# awesome function that executes a string of commands (separated by newlines) on user@host through ssh
# remember to escape single quotes
def sshx(commands, user, host):
    commands = "'" + commands + "'"
    tmpfile = "__tmp." + str(random.randint(0,9999999)) + ".sh"
    !echo {commands} > {tmpfile}
    time.sleep(1)
    !cat {tmpfile}| ssh -o "StrictHostKeyChecking no" {user}@{host}
    time.sleep(1)
    !rm {tmpfile}
    sys.stdout.flush()
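
As noted above, the ! shell magic swallows nonzero exit statuses, so the retry loop in scpSend mostly guards against local errors. A sketch of a variant that actually retries failed transfers, using subprocess (hypothetical name scpSendChecked, same arguments):

In [ ]:
import subprocess

# Hypothetical stricter variant of scpSend: check_call raises CalledProcessError
# on a nonzero exit status, so failed transfers really are retried.
def scpSendChecked(file, user, host, dir, attempts=10):
    for x in xrange(attempts):
        try:
            subprocess.check_call(["scp", "-o", "StrictHostKeyChecking no",
                                   file, "%s@%s:%s" % (user, host, dir)])
            return # success
        except subprocess.CalledProcessError:
            if x == attempts - 1:
                raise RuntimeError("Failed to SCP file %s to %s" % (file, host))
            time.sleep(5) # wait 5 seconds before trying again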

In [ ]:
# For Debugging: stub that simulates work by sleeping 10-80 seconds
# (running the next cell redefines doJob with the real implementation)
def doJob(job):
    time.sleep(random.randint(10, 80))

In [ ]:
# Moves images to machine over scp, executes code over ssh, wget files back to local machine
def doJob(job):
    global user, num_iterations, image_size
    host = job['host']
    
    # move images over to machine with scp
    style_filename = basename(job['style'])
    content_filename = basename(job['content'])
    scpSend(job['style'], user, host, "~/neural-style")
    scpSend(job['content'], user, host, "~/neural-style")
    
    # execute code 
    sshx(pathStuff+"\n\
cd ~/neural-style \n\
rm output* \n\
th neural_style.lua -num_iterations %s -style_image \"%s\" -content_image \"%s\" -image_size %s -backend cudnn -output_image output.png \n\
tar -cvzf output.tar.gz output_* \n\
"%(num_iterations, style_filename,content_filename,image_size),user,host)
    
    # Save the output 
    # make dirs if they don't exist (-p suppresses the error when they already do)
    !mkdir -p ./nn
    !mkdir -p ./nn/archive
    # wget the files over http, store locally
    wget = !wget --tries=3 http://{host}/nn/neural-style/output.png -O "./nn/{style_filename}_{content_filename}.png"
    # if output.png doesn't exist on the vm, raise exception
    if(wgetFail(wget)):
        !rm "./nn/{style_filename}_{content_filename}.png" # remove the blank file wget makes
        raise RuntimeError("Neural style did not finish. No output.png was created.")
    wget = !wget --tries=3 http://{host}/nn/neural-style/output.tar.gz -O "./nn/archive/{style_filename}_{content_filename}.tar.gz"
    if(wgetFail(wget)):
        !rm "./nn/archive/{style_filename}_{content_filename}.tar.gz" # remove the blank file wget makes
        raise RuntimeError("Failed to retrieve output.tar.gz")

# Returns True if the output from !wget indicates the download failed
# Not sure if this would be different on different systems
def wgetFail(wget):
    return not ("HTTP request sent, awaiting response... 200 OK" in "\n".join(wget))

In [ ]:
# Need to set this pathStuff every time we send commands over SSH
# bash loads different profiles when run non-interactively like this
# See: http://stackoverflow.com/questions/415403/whats-the-difference-between-bashrc-bash-profile-and-environment
# Alternatively these could have been set on the machine in the right profile before I made the image. 
pathStuff = """
export PATH=$PATH:/usr/local/cuda/bin
export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH;

# added by Anaconda 2.1.0 installer
export PATH="/home/ubuntu/anaconda/bin:$PATH"
export PATH=~/torch-distro/install/bin:$PATH; export LD_LIBRARY_PATH=~/torch-distro/install/lib:$LD_LIBRARY_PATH;

# Adding caffe to PYTHONPATH
export PYTHONPATH="/home/ubuntu/caffe/python:$PYTHONPATH"
export PATH=/home/ubuntu/torch-distro/install/bin:$PATH  # Added automatically by torch-dist
export LD_LIBRARY_PATH=/home/ubuntu/torch-distro/install/lib:$LD_LIBRARY_PATH  # Added automatically by torch-dist
export DYLD_LIBRARY_PATH=/home/ubuntu/torch-distro/install/lib:$DYLD_LIBRARY_PATH  # Added automatically by torch-dist

export LD_LIBRARY_PATH=/home/ubuntu/cudnn-6.5-linux-x64-v2-rc2:$LD_LIBRARY_PATH

"""

In [ ]:
ec2 = boto3.resource('ec2')
# Deploys a new EC2 instance of given machine_type (e.g. "g2.2xlarge"), returns id of new machine
def deployVM(machine_type):
    new_vm = ec2.create_instances(
        ImageId = neuralstyle_ami,
        MinCount = 1,
        MaxCount = 1,
        KeyName = 'exocortex2', # NOTE: replace with the name of your own EC2 key pair
        InstanceType = machine_type,
        SecurityGroups = ['ssh-http']
    )
    print "Creating new instance... %s" % machine_type
    return new_vm[0].id
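
boto3 resources also expose waiters, which could replace the polling loop at the top of worker(). A hedged sketch (hypothetical name deployAndWait):

In [ ]:
# Optional: block until a freshly deployed instance reaches the "running" state,
# instead of polling state["Name"] in a sleep loop.
def deployAndWait(machine_type):
    vm_id = deployVM(machine_type)
    ec2.Instance(id=vm_id).wait_until_running() # boto3 waiter; polls EC2 internally
    return vm_id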

Configure


In [ ]:
# VM image we start with
# Everything is already installed on this AMI, it's ready to execute stuff
neuralstyle_ami = "ami-e095ca8a"
# If this no longer works, I have stopped paying to host this AMI. 
# Instead follow these instructions: https://gist.github.com/elad/e3a9e3cc609996b13454

# m3.medium for debugging ($0.07/hour) 
#machine_type = 'm3.medium'
# g2.2xlarge for real processing ($0.65/hour)
machine_type = 'g2.2xlarge'

# how many machines we deploy:
number_of_vms = 4 # Double-check your instance limit for this machine type; the default is 10.

# how long do we run our VMs? Amazon EC2 bills you by rounding up to the hour. 
maximum_hours = 1
# size of output image style transfer
image_size = 400 # anything over 700 may cause GPU memory error
# number of iterations of style transfer
num_iterations = 400 

# AWS machines use user "ubuntu"
user = "ubuntu"
# randomize jobs, this way if one machine keeps failing (example: not enough available memory)
# it doesn't get stuck with the same job, and other machines will pick up those jobs instead
shuffle_jobs = False
# start time
start_time = time.time()
estimated_process_time = 4*60 # about n minutes per render 
termination_time = start_time + 60*60*maximum_hours
termination_margin = 1.5*60 # give a margin of n minutes for shutdown

jobs = [] # list of jobs 
jobs_lock = False # thread lock used when threads mutate jobs
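
With these settings the worst-case bill is number_of_vms * maximum_hours * hourly rate: 4 * 1 * $0.65 = $2.60 for g2.2xlarge (2016 pricing). At roughly 4 minutes per render with a 1.5 minute shutdown margin, each VM finishes about (60 - 1.5) / 4, or roughly 14 jobs per hour, so 4 VMs cover roughly 56 frames in a billed hour.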

Jobs


In [ ]:
### Applying Style Transfer to still image 
# add_to_jobs("inputs/style.png", "inputs/content.png", False)

### Applying Style Transfer to Video
# Split video into individual frame images using VLC, Quicktime, etc. 
# Leave images in their own directory.
# add_to_jobs("inputs/style.png", "inputs/video_frame_sequence", True)

### Applying Style Transfer to Large Images
# You will likely run out of GPU memory if you want your output image to be larger than 800px * 800px. 
# Therefore, split a large content image into smaller slices, leave them in their own directory.
# Use ImageSlicer.ipynb to do this.
# Recombine them later. 

# add_to_jobs("inputs/style.png", "inputs/large_image_slices", True)

# add_to_jobs(style, content, content_is_directory)
# style = local image whose style we are transferring
# content = local content image OR directory containing video frames
# content_is_directory = True | False 

# PIZZAFIRE
add_to_jobs("inputs/pizza.png", "inputs/FireLoop1_seq", True)

Deploy Machines, Execute Threads


In [ ]:
# DEPLOY instances first, then assign them threads.
# If a worker thread terminates for some reason, we can recover the instance and run a new worker
vm_ids = []
for i in xrange(number_of_vms):
    # Deploy an AWS ec2 instance
    vm_id = deployVM(machine_type)
    vm_ids.append(vm_id)
    print "Deployed %s, %s" % (machine_type, vm_id)

# worker threads each own a machine, claim jobs, process jobs, and
# terminate their machine when all jobs are done, before time limit
workers = []
for i in xrange(number_of_vms):
    t = threading.Thread(target=worker, args=(i,vm_ids[i],))
    workers.append(t)
    t.start()

# nanny thread double checks worker threads, terminates all VMs before time limit
# this isn't necessary, but since processing costs $$, it's good to double check 
# Outputs status of jobs at end
t = threading.Thread(target=nanny)
t.start()

# When finished, double check AWS console to make sure instances terminated.

# If you stop this process or restart the kernel, you need to terminate the EC2 instances in the AWS console.
# https://console.aws.amazon.com/ec2
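
# Emergency teardown (uncomment and run manually if the run was interrupted
# and vm_ids is still in memory; otherwise use the console):
# for vm_id in vm_ids:
#     ec2.Instance(id=vm_id).terminate()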

# Make animated gifs here: http://gifmaker.me/

In [ ]: