In [48]:
import time
import collections
#import worker
import server
import medareda_lib
from datetime import timedelta
import urllib3
urllib3.disable_warnings()
import worker_status
#import work
In [49]:
import server
class Office(object):
def __init__(self,work):
self.work = work
self.basename = "%s-%s" %('medaredaworker',self.work)
name = ''
def server_states(self):
return server.get_server_states()
def countServers(self):
return server.countServers(self.basename)
def countWorkers(self):
return worker_status.countWorkers(self.basename)
def addWorker(self):
image_name = '%s-i' %self.basename
#image_name = 'medareda-worker-iprice_image'
name = server.createServer(self.basename, image_name)
worker_status.addWorker(name)
def removeWorker(self): # deleteServer
delete_name = server.delete(self.basename)
print 'DELETED', delete_name
worker_status.removeWorker(delete_name)
def getProcessQueueLength(self):
conn = medareda_lib.get_conn()
cur = conn.cursor()
sql = "SELECT count(*) from iPrice where status = 'wait' "
cur.execute(sql)
results = cur.fetchall()
conn.commit()
conn.close()
count = results[0][0]
return count
def getProcessQueueTime(self):
conn = medareda_lib.get_conn()
cur = conn.cursor()
sql = "SELECT min(date),max(date) from iPrice where status = 'wait' "
cur.execute(sql)
results = cur.fetchall()
conn.commit()
conn.close()
print results
min_date = results[0][0]
max_date = results[0][1] # could use now
if max_date == None: # ( no waiting rows)
return timedelta(seconds=0)
queue_time = max_date - min_date
return queue_time
def reasonToStartServer(self):
if self.countServers() > 20: # max servers
return False
if self.countServers() < 2:
print 'TO FEW WORKERS'
return True
queue_length = self.getProcessQueueLength()
if queue_length > 100: # TODO max queue lenght
print 'QUEUE LENGHT TO LONG', queue_length
return True
if self.getProcessQueueTime() > timedelta(seconds=30):
print "PROCESSING TO SLOW"
return True
return False
def reasonToStopServer(self):
if self.countServers() < 3 : # min workers 2
self.False
if self.getProcessQueueLength() < self.countWorkers():
print "MORE SERVERS THAN WORK"
return True
#if self.getProcessQueueTime() > 1: # min
# return True
return False
def manageWorkers(self):
# get queue length (work)
# get max time in queue (work)
# get number of workers building
# get number of workers working
if self.reasonToStartServer(): # work > 100 or time in queue > 1hr
self.addWorker()
elif self.reasonToStopServer(): # que < servers * 2
time.sleep(5)
if self.reasonToStopServer():
self.removeWorker()
# else:
# time.sleep(5)
print 'x'
pass
def shutDown(self):
print 'Office.shutDown', self.basename
for server_name, status in worker_status.getServersNameStatus(self.basename):
print 'Found', server_name,status
if status == 'work':
worker_status.updateWorkerStatus(server_name,'cooldown')
time.sleep(5)
server.deleteServer(server_name)
worker_status.removeWorker(server_name)
elif status in ('standby','cooldown'):
server.deleteServer(server_name)
worker_status.removeWorker(server_name)
#server.shut_down_all(self.basename)
#return server.countServers(self.basename)
class OfficeManager(object):
def __init__(self,name,num_start_workers):
self.office = Office(name) # 1-2-1 at this stage, each mgr has one office, busness level logic
self.setNumWorkers(num_start_workers)
# to think about, servers with the same name
def addWorkers(self, num_workers):
for i in range(num_workers):
self.office.addWorker()
def removeWorkers(self, num_workers):
for i in range(num_workers):
self.office.removeWorker()
def setNumWorkers(self, num_workers):
workers = self.office.countWorkers()
need_to_start = num_workers - workers
if need_to_start > 0:
self.addWorkers(need_to_start)
def shutOffice(self):
self.office.shutDown() # cant delete when severs have the same name
test
In [50]:
def testOfficeManager():
work = 'iprice'
start_workers = 4
o_mgr = OfficeManager(work,start_workers)
#o_mgr.addWorkers(2)
#o_mgr.removeWorkers(1)
#o_mgr.setNumWorkers(7) # TODO remove workers ??
#o_mgr.shutOffice()
# o_mgr.getWorkCount()
## o_mgr.getWorkWait()
# for 5 mins
# min standby
# max stanby
# min workers
'''
read_varaibles
if stop = True
break
if standby < min_standby:
createStandby(minstandby)
if standby > max_standby:
deleteStandby(maxStandby)
if work_count > 10:
worker_start()
if work_count == 0:
worker_cooldown()
remove any cooldown servers
pass
'''
testOfficeManager()
print 'Done'
In [ ]:
In [58]:
work = 'iprice'
print 'start'
def test():
office = Office(work)
count = office.countServers()
print 'Server count =', count
count = office.countWorkers()
print 'Worker count =', count
#office.addWorker()
#office.removeWorker()
#office.getAmountOfWork() # queue lenght * weight
# for 1 hour, while time < now + 1hr
#print office.getProcessQueueTime()
#office.manageWorkers()
#office.shutDown()
test()
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [5]:
import mock
import time
#class workerTestCase(unittest.TestCase):
@mock.patch('server._create_server')
@mock.patch('server._delete')
def test_office(c,d):
#w = worker.Worker1('iPrice')
#time.sleep(5)
#w.delete()
#def test_office(office):
office = Office('iPrice')
print 'Pre Number of servers in office', office.countServers()
print 'Pre Number of workers in office =', office.countWorkers()
#office.addWorker()
#time.sleep(5)
office.removeWorker()
print 'Post Number of servers in office', office.countServers()
print 'Post Number of workers in office =', office.countWorkers()
test_office()
In [ ]:
In [ ]:
In [ ]:
In [ ]: