In [1]:
# These are all the modules we'll be using later. Make sure you can import them
from __future__ import print_function
import matplotlib.pyplot as plt
import numpy as np
import os
import sys
import tarfile
import json
from six.moves.urllib.request import urlretrieve
In [2]:
url = 'http://orange.com/wgmn4901/data/'
def maybe_download(filename, expected_bytes, force=False):
"""Download a file if not present, and make sure it's the right size."""
if force or not os.path.exists(filename):
filename, _ = urlretrieve(url + filename, filename) # may not work as expected
statinfo = os.stat(filename)
if statinfo.st_size == expected_bytes:
print('Found and verified', filename)
else:
raise Exception(
'Failed to verify ' + filename + '. Can you get to it with a browser?')
return filename
clustersnapshot = maybe_download('clustersnapshot24022016.tar.gz', 249468)
In [3]:
def maybe_extract(filename,force=False):
root = os.path.splitext(os.path.splitext(filename)[0])[0] # remove .tar.gz
if os.path.isdir(root) and not force:
# You may override by setting force=True.
print('%s already present - Skipping extraction of %s.' % (root, filename))
else:
print('Extracting data for %s. This may take a while. Please wait.' % root)
tar = tarfile.open(filename)
sys.stdout.flush()
tar.extractall()
tar.close()
data_folders = [
os.path.join(root, d) for d in sorted(os.listdir(root))
if os.path.isdir(os.path.join(root, d))]
print('Found the following application profile in %s:' % filename)
for folder in data_folders:
print (' %s' % folder)
return data_folders
clustersnapshot_folders = maybe_extract(clustersnapshot)
In [4]:
import pandas as pd #this is how I usually import pandas
def load_application(folder):
"""Load the container data for each application."""
container_index = 0
container_name_list = []
container_starttime_list = []
container_endtime_list = []
container_task_list = []
container_node_list = []
container_node_mem_total_list = []
container_node_CPU_total_list = []
container_node_mem_rsrv_list = []
container_node_CPU_rsrv_list = []
container_mem_assigned_list = []
container_CPU_assigned_list = []
app_name_list = []
application_starttime = 100000000000000 # really big number
application_endtime = 0
for container in os.listdir(folder):
container_file = os.path.join(folder, container)
try:
with open(container_file) as data_file:
data = json.load(data_file)
container_name = data["container_id"]
container_starttime = data["container_starttime"]
if container_starttime < application_starttime:
application_starttime = container_starttime
container_endtime = data["container_endtime"]
if container_endtime > application_endtime:
application_endtime = container_endtime
"""gather info for container"""
container_task = data["container_tasks"][0]["task_type"] #First task only
app_name = data["application_info"]["appId"]
if not data["nodeInfo"]:
container_node = "null"
container_node_mem_total = 0
container_node_CPU_total = 0
container_node_mem_rsrv = 0
container_node_CPU_rsrv = 0
container_mem_assigned = 0
container_CPU_assigned = 0
else:
container_node = data["nodeInfo"]["id"]
container_node_mem_total = data["nodeInfo"]["totalNodePhyMem"]
container_node_CPU_total = data["nodeInfo"]["totalNodePhyCore"]
container_node_mem_rsrv = data["nodeInfo"]["totalPmemAllocatedContainersMB"] * 1024
container_node_CPU_rsrv = data["nodeInfo"]["totalVCoresAllocatedContainers"]
container_mem_assigned = data["MemoryAssignedMB"] * 1024
container_CPU_assigned = data["VCoreAssigned"]
"""make list to create dataframe"""
container_name_list.insert(container_index,container_name)
container_starttime_list.insert(container_index,container_starttime)
container_endtime_list.insert(container_index,container_endtime)
container_task_list.insert(container_index,container_task)
container_node_list.insert(container_index,container_node)
container_node_mem_total_list.insert(container_index,container_node_mem_total)
container_node_CPU_total_list.insert(container_index,container_node_CPU_total)
container_node_mem_rsrv_list.insert(container_index,container_node_mem_rsrv)
container_node_CPU_rsrv_list.insert(container_index,container_node_CPU_rsrv)
container_mem_assigned_list.insert(container_index,container_mem_assigned)
container_CPU_assigned_list.insert(container_index,container_CPU_assigned)
app_name_list.insert(container_index,app_name)
if not container_name:
raise Exception('Unexpected container name')
if container_starttime == 0:
raise Exception('Unexpected container start time')
if container_endtime == 0:
raise Exception('Unexpected container end time')
container_index += 1
except IOError as e:
print('Could not read:', container_file, ':', e)
DataSet = list(zip(container_starttime_list,container_endtime_list,
container_task_list,container_node_list,container_node_mem_total_list,
container_node_CPU_total_list,container_node_mem_rsrv_list,container_node_CPU_rsrv_list,
container_mem_assigned_list, container_CPU_assigned_list, app_name_list))
columns=[ 'start', 'end', 'task', 'node','n_mem','n_CPU',
'mem_rsrv','CPU_rsrv','mem','CPU','app']
DataFrame = pd.DataFrame(data = DataSet, index=container_name_list,
columns=columns)
return DataFrame, application_starttime, application_endtime
def load_snapshot(snapshot_folders):
snapshot_starttime = 100000000000000 # really big number
snapshot_endtime = 0
snapshot_dataframe = pd.DataFrame()
for application_folder in snapshot_folders:
dataframe, application_starttime, application_endtime = load_application(application_folder)
if application_starttime < snapshot_starttime:
snapshot_starttime = application_starttime
if application_endtime > snapshot_endtime:
snapshot_endtime = application_endtime
if snapshot_dataframe.empty:
snapshot_dataframe = dataframe
else:
snapshot_dataframe = snapshot_dataframe.append(dataframe)
return snapshot_dataframe, snapshot_starttime, snapshot_endtime
snapshot_dataframe,snapshot_starttime,snapshot_endtime = load_snapshot(clustersnapshot_folders)
print('Snapshot start time: %s' % snapshot_starttime)
print('Snapshot end time: %s' % snapshot_endtime)
snapshot_duration = snapshot_endtime-snapshot_starttime
print('Snapshot duration: %s ms (or ~%s s or ~%s m)' % (snapshot_duration,snapshot_duration/1000,snapshot_duration/60000))
In [5]:
# General syntax to import a library but no functions:
##import (library) as (give the library a nickname/alias)
import matplotlib.pyplot as plt
import matplotlib #only needed to determine Matplotlib version number
from random import randint
import pylab
# Enable inline plotting
%matplotlib inline
pylab.rcParams['figure.figsize'] = (15.0, 8.0)
random_application = clustersnapshot_folders[randint(0,len(clustersnapshot_folders)-1)]
print("App %s is selected for this example" % random_application.split('/')[1])
DataFrame, application_starttime, application_endtime = load_application(random_application)
DataFrameSorted = DataFrame.sort(columns="start")
value = 1
for index in DataFrameSorted.index:
starttime_rescaled = DataFrameSorted.loc[index,"start"]/1000 - snapshot_starttime/1000
endtime_rescaled = DataFrameSorted.loc[index,"end"]/1000 - snapshot_starttime/1000
task_duration = endtime_rescaled - starttime_rescaled
x = np.linspace(starttime_rescaled,endtime_rescaled,task_duration)
container_data = np.linspace(value,value,task_duration)
if DataFrameSorted.loc[index,"task"] == "MAP":
plt.plot(x,container_data,"ro")
elif DataFrameSorted.loc[index,"task"] == "REDUCE":
plt.plot(x,container_data,"bo")
elif DataFrameSorted.loc[index,"task"] == "AM":
plt.plot(x,container_data,"yo")
else:
print("not applicable for %s" % index)
value = value + 1
In [6]:
SnapshotDataFrame = snapshot_dataframe.sort(columns="node")
print('List of containers with no information on node ID:')
SnapshotDataFrame.groupby('node').count().task
SnapshotDataFrame.groupby('node').count().sum()
float(55)/323
Out[6]:
In [7]:
##### pylab.rcParams['figure.figsize'] = (15.0, 8.0)
snapshot_dataframe,snapshot_starttime,snapshot_endtime = load_snapshot(clustersnapshot_folders)
snapshot_duration = snapshot_endtime - snapshot_starttime
SnapshotDFSorted = snapshot_dataframe.sort(columns="start")
#print(DataFrameSorted)
# Initialize slot for drawing
slot = {}
for node in snapshot_dataframe.groupby('node').count().task.index:
slot[node] = 1
slot_max = 6
for index in SnapshotDFSorted.index:
if slot[SnapshotDFSorted.loc[index,"node"]] >= slot_max:
slot[SnapshotDFSorted.loc[index,"node"]] = 1 #reset slot
starttime_rescaled = SnapshotDFSorted.loc[index,"start"]/1000 - snapshot_starttime/1000
endtime_rescaled = SnapshotDFSorted.loc[index,"end"]/1000 - snapshot_starttime/1000
task_duration = endtime_rescaled - starttime_rescaled
if SnapshotDFSorted.loc[index,"node"] == "null":
current_slot = slot[SnapshotDFSorted.loc[index,"node"]]
else:
current_slot = (int(SnapshotDFSorted.loc[index,"node"][3:5]) * slot_max) + slot[SnapshotDFSorted.loc[index,"node"]]
#print(str(int(SnapshotDFSorted.loc[index,"node"][3:5])))
x = np.linspace(starttime_rescaled,endtime_rescaled-1,task_duration)
container_data = np.linspace(current_slot,current_slot,task_duration)
if SnapshotDFSorted.loc[index,"task"] == "MAP":
plt.plot(x,container_data,"ro")
elif SnapshotDFSorted.loc[index,"task"] == "REDUCE":
plt.plot(x,container_data,"bo")
elif SnapshotDFSorted.loc[index,"task"] == "AM":
plt.plot(x,container_data,"yo")
else:
print("not applicable for %s" % index)
slot[SnapshotDFSorted.loc[index,"node"]] = slot[SnapshotDFSorted.loc[index,"node"]] + 1
# draw line separate node:
x = np.linspace(0,snapshot_duration/1000-1,snapshot_duration/1000)
for line in np.linspace(1,len(slot)+3,len(slot)+3): # 4 locations for null
y_value = line * slot_max
y = np.linspace(y_value,y_value,snapshot_duration/1000)
plt.plot(x,y,'-b')
plt.text(0.2,slot_max*1-3,"null")
plt.text(0.2,slot_max*2-3,"svr01")
plt.text(0.2,slot_max*3-3,"svr02")
plt.text(0.2,slot_max*4-3,"svr03")
plt.text(0.2,slot_max*5-3,"svr04")
plt.text(0.2,slot_max*6-3,"svr05")
plt.text(0.2,slot_max*7-3,"svr06")
plt.text(0.2,slot_max*8-3,"svr07")
plt.text(0.2,slot_max*9-3,"svr08")
plt.text(0.2,slot_max*10-3,"svr09")
plt.text(0.2,slot_max*11-3,"svr10")
plt.text(0.2,slot_max*12-3,"svr11")
plt.text(0.2,slot_max*13-3,"svr12")
plt.text(0.2,slot_max*14-3,"svr13")
plt.text(0.2,slot_max*15-3,"svr14")
plt.text(0.2,slot_max*16-3,"svr15")
plt.axis('off')
Out[7]:
In [8]:
def load_application_cpu_profile(folder):
"""Load the container data for each application."""
record_index_list = []
container_name_list = []
capture_time_list = []
container_load_list = []
cpu_us_list = []
cpu_sys_list = []
cpu_id_list = []
fail_count = 0
record_index = 0
for container in os.listdir(folder):
container_file = os.path.join(folder, container)
try:
with open(container_file) as data_file:
data = json.load(data_file)
container_name = data["container_id"]
if not data["cpu_usage"]:
print('No infomation for container %s, ignore it' % container_name)
fail_count = fail_count + 1
else:
for record in data["cpu_usage"]:
container_capture_time = record["capture_time"]
container_load = record["container_load"] / 100
cpu_us = record["cpu_us"]
cpu_sys = record["cpu_sy"]
cpu_id = record["cpu_id"]
'''make list to create dataframe'''
record_index_list.insert(record_index,record_index)
container_name_list.insert(record_index,container_name)
capture_time_list.insert(record_index,container_capture_time)
container_load_list.insert(record_index,container_load)
cpu_us_list.insert(record_index,cpu_us)
cpu_sys_list.insert(record_index,cpu_sys)
cpu_id_list.insert(record_index,cpu_id)
record_index += 1
if not container_name:
raise Exception('Unexpected container name')
except IOError as e:
print('Could not read:', container_file, ':', e)
DataSet = list(zip(capture_time_list,container_name_list,container_load_list,cpu_us_list,cpu_sys_list,cpu_id_list))
columns=[ 'timestamp','name','cpu_load', 'cpu_us', 'cpu_sys', 'cpu_id']
DataFrame = pd.DataFrame(data = DataSet, index=record_index_list,columns=columns)
return DataFrame, fail_count
def load_snapshot_cpu_profile(snapshot_folder):
snapshot_dataframe = pd.DataFrame()
snapshot_failcount = 0
for application_folder in snapshot_folder:
dataframe, fail_count = load_application_cpu_profile(application_folder)
if snapshot_dataframe.empty:
snapshot_dataframe = dataframe
else:
snapshot_dataframe = snapshot_dataframe.append(dataframe)
snapshot_failcount = snapshot_failcount + fail_count
return snapshot_dataframe
snapshot_cpu_profile = load_snapshot_cpu_profile(clustersnapshot_folders)
snapshot_cpu_profile
Out[8]:
In [9]:
def get_cpu_actual_use(container_name, snapshot_dataframe, snapshot_cpu_profile):
container_starttime = snapshot_dataframe.loc[container_name,'start']
container_endtime = snapshot_dataframe.loc[container_name,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
duration = rescaled_container_endtime - rescaled_container_starttime
container_index = np.linspace(rescaled_container_starttime, rescaled_container_endtime -1,duration)
data = np.linspace(0,0,duration)
container_data2 = pd.Series(data,index=pd.to_datetime(container_index,unit='s'))
cpu_container_filtered = snapshot_cpu_profile[(snapshot_cpu_profile.name == container_name)]
index = cpu_container_filtered.timestamp.tolist()
cpu_load = cpu_container_filtered.cpu_load.tolist()
container_data1 = pd.Series(cpu_load,index=pd.to_datetime(index,unit='s'))
container_data3 = container_data1 + container_data2
container_data4 = container_data3.interpolate(method='time')
for index in container_data4.index:
if pd.isnull(container_data4[index]):
container_data4[index] = container_data4.mean()
if len(container_data4) > len(container_data2):
a = set(container_data4.index.tolist()) - set(container_data2.index.tolist())
print(a)
#print(container_data4)
#print(container_data3)
#print(container_data2.index)
#raise Exception('Return length mismatch')
return container_data4
# Filter to remove containers with no informaction of the node, or to select specific nodes
SnapshotDFFiltered = SnapshotDFSorted[(SnapshotDFSorted.node != 'null')]
for i in SnapshotDFFiltered.index:
a = get_cpu_actual_use(i,snapshot_dataframe, snapshot_cpu_profile)
plt.plot(a)
In [10]:
def load_application_mem_profile(folder):
"""Load the container data for each application."""
record_index_list = []
container_name_list = []
capture_time_list = []
container_mem_virt_list = []
container_mem_res_list = []
container_mem_shr_list = []
used_pmem_list = [] #system info
swap_cached_list = [] # system info
fail_count = 0
record_index = 0
for container in os.listdir(folder):
container_file = os.path.join(folder, container)
try:
with open(container_file) as data_file:
data = json.load(data_file)
container_name = data["container_id"]
if not data["memory_usage"]:
print('No infomation for container %s, ignore it' % container_name)
fail_count = fail_count + 1
else:
for record in data["memory_usage"]:
container_capture_time = record["capture_time"]
container_mem_virt = record["virt"]
container_mem_res = record["res"]
container_mem_shr = record["shr"]
used_pmem = record["used_pmem"]
swap_cached = record["swap_cached"]
'''make list to create dataframe'''
record_index_list.insert(record_index,record_index)
container_name_list.insert(record_index,container_name)
capture_time_list.insert(record_index,container_capture_time)
container_mem_virt_list.insert(record_index,container_mem_virt)
container_mem_res_list.insert(record_index,container_mem_res)
container_mem_shr_list.insert(record_index,container_mem_shr)
used_pmem_list.insert(record_index,used_pmem)
swap_cached_list.insert(record_index,swap_cached)
record_index += 1
if not container_name:
raise Exception('Unexpected container name')
except IOError as e:
print('Could not read:', container_file, ':', e)
DataSet = list(zip(capture_time_list,container_name_list,container_mem_virt_list,
container_mem_res_list,container_mem_shr_list,used_pmem_list,swap_cached_list))
columns=[ 'timestamp','name','virt', 'res', 'shr', 'used_pmem','swap_cached']
DataFrame = pd.DataFrame(data = DataSet, index=record_index_list,columns=columns)
return DataFrame, fail_count
def load_snapshot_mem_profile(snapshot_folder):
snapshot_dataframe = pd.DataFrame()
snapshot_failcount = 0
for application_folder in snapshot_folder:
dataframe, fail_count = load_application_mem_profile(application_folder)
if snapshot_dataframe.empty:
snapshot_dataframe = dataframe
else:
snapshot_dataframe = snapshot_dataframe.append(dataframe)
snapshot_failcount = snapshot_failcount + fail_count
return snapshot_dataframe
snapshot_mem_profile = load_snapshot_mem_profile(clustersnapshot_folders)
snapshot_mem_profile
Out[10]:
In [11]:
def get_mem_actual_use(container_name, snapshot_dataframe, snapshot_mem_profile):
container_starttime = snapshot_dataframe.loc[container_name,'start']
container_endtime = snapshot_dataframe.loc[container_name,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
duration = rescaled_container_endtime - rescaled_container_starttime
container_index = np.linspace(rescaled_container_starttime, rescaled_container_endtime -1,duration)
data = np.linspace(0,0,duration)
container_data2 = pd.Series(data,index=pd.to_datetime(container_index,unit='s'))
mem_container_filtered = snapshot_mem_profile[(snapshot_mem_profile.name == container_name)]
index = mem_container_filtered.timestamp.tolist()
# get resident memory
mem = mem_container_filtered.res.tolist()
# get share memory
#mem = mem_container_filtered.shr.tolist()
# get virt memory
#mem = mem_container_filtered.virt.tolist()
container_data1 = pd.Series(mem,index=pd.to_datetime(index,unit='s'))
container_data3 = container_data1 + container_data2
container_data4 = container_data3.interpolate(method='time')
for index in container_data4.index:
if pd.isnull(container_data4[index]):
container_data4[index] = container_data4.mean()
if len(container_data4) > len(container_data2):
a = set(container_data4.index.tolist()) - set(container_data2.index.tolist())
print(a)
#print(container_data4)
#print(container_data3)
#print(container_data2.index)
#raise Exception('Return length mismatch')
return container_data4
# Filter to remove containers with no informaction of the node, or to select specific nodes
SnapshotDFFiltered = SnapshotDFSorted[(SnapshotDFSorted.node != 'null')]
for i in SnapshotDFFiltered.index:
a = get_mem_actual_use(i,snapshot_dataframe, snapshot_mem_profile)
a = [ x / (1024*1024*1024) for x in a ]
plt.plot(a)
In [12]:
# Filter to remove containers with no informaction of the node, or to select specific nodes
SnapshotDFFiltered = SnapshotDFSorted[(SnapshotDFSorted.node != 'null')]
# Filter to select specific nodes
#SnapshotDFFiltered = SnapshotDFFiltered[(SnapshotDFFiltered.node == 'svr15.spo:45454')]
# Filter to select specific apps
#SnapshotDFFiltered = SnapshotDFFiltered[(SnapshotDFFiltered.app == 'application_1456319042576_0007')]
# get list of application
application_list = SnapshotDFFiltered.app.unique().tolist()
# limit number of app to draw
# application_list = application_list[:1]
'''FOR CPU'''
SnapshotDFFiltered_group_node = SnapshotDFFiltered.groupby("node")
total_phy_CPU = SnapshotDFFiltered_group_node.n_CPU.mean().sum() #assume capacity has no change
total_rsrv_CPU = SnapshotDFFiltered_group_node.CPU_rsrv.mean().sum() #assume capacity has no change
print('Total physical cores in these nodes: %s cores' % total_phy_CPU)
print('Total virtual cores reserved for YARN in these nodes: %s cores' % total_rsrv_CPU)
# create vector zero
rescaled_snapshot_starttime = snapshot_starttime / 1000
rescaled_snapshot_endtime = snapshot_endtime / 1000
rescaled_snapshot_duration = rescaled_snapshot_endtime - rescaled_snapshot_starttime
time = np.linspace(rescaled_snapshot_starttime, rescaled_snapshot_endtime-1,rescaled_snapshot_duration)
zero_vector = pd.Series(np.linspace(0, 0,rescaled_snapshot_duration), index=time)
base_vector = zero_vector
phy_CPU = np.linspace(total_phy_CPU,total_phy_CPU,rescaled_snapshot_duration)
rsrv_CPU = np.linspace(total_rsrv_CPU,total_rsrv_CPU,rescaled_snapshot_duration)
phy_CPU = pd.Series(phy_CPU,index=time)
rsrv_CPU = pd.Series(rsrv_CPU,index=time)
fig = plt.figure(figsize=(12,8))
ax = fig.add_subplot(111)
ax.plot(phy_CPU,'-b')
ax.plot(rsrv_CPU,'-g')
# Allocated CPU
for application in application_list:
print('Processing %s' % (application))
for index in SnapshotDFFiltered.index:
if SnapshotDFFiltered.loc[index,'app'] == application:
# get CPU allocated for that container
container_starttime = SnapshotDFFiltered.loc[index,'start']
container_endtime = SnapshotDFFiltered.loc[index,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
container_duration = rescaled_container_endtime - rescaled_container_starttime
container_time_index = np.linspace(rescaled_container_starttime,rescaled_container_endtime-1,container_duration)
container_value = np.linspace(SnapshotDFFiltered.loc[index,'CPU'],
SnapshotDFFiltered.loc[index,'CPU'],
container_duration)
container_data = pd.Series(container_value, index=container_time_index)
# checking if all points in container data belongs to the snapshot timeframe
check = set(container_time_index) - set(time)
if len(check): #not empty
raise Exception("Error, data point's mismatch")
# matching the container data with the length of the snapshot
container_data = container_data + zero_vector
container_data = container_data.fillna(0)
# do stackplot
# create drawing vector based on base_vector
container_data = container_data + base_vector
base_vector = container_data # update base_vector for next iteration
#ax.plot(container_data,'-r')
ax.plot(base_vector,'-r')
# Actual Used CPU
base_vector = zero_vector
for application in application_list:
print('Processing %s' % (application))
for index in SnapshotDFFiltered.index:
if SnapshotDFFiltered.loc[index,'app'] == application:
# get CPU allocated for that container
container_starttime = SnapshotDFFiltered.loc[index,'start']
container_endtime = SnapshotDFFiltered.loc[index,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
container_duration = rescaled_container_endtime - rescaled_container_starttime
container_time_index = np.linspace(rescaled_container_starttime,rescaled_container_endtime-1,container_duration)
'''Get actual CPU used '''
actual_data_use = get_cpu_actual_use(index,snapshot_dataframe, snapshot_cpu_profile)
actual_data_use = actual_data_use.tolist()
'''because the return may have lengh longer, we generate the container_time_index again'''
container_time_index = np.linspace(rescaled_container_starttime,
rescaled_container_starttime+len(actual_data_use)-1,
len(actual_data_use))
container_data = pd.Series(actual_data_use, index=container_time_index)
# checking if all points in container data belongs to the snapshot timeframe
check = set(container_time_index) - set(time)
if len(check): #not empty
raise Exception("Error, data point's mismatch")
# matching the container data with the length of the snapshot
container_data = container_data + zero_vector
container_data = container_data.fillna(0)
# do stackplot
# create drawing vector based on base_vector
container_data = container_data + base_vector
base_vector = container_data # update base_vector for next iteration
#ax.plot(container_data,'-y')
ax.plot(base_vector,'-y')
'''FOR MEMORY'''
total_phy_mem = SnapshotDFFiltered_group_node.n_mem.mean().sum() #assume capacity has no change
total_rsrv_mem = SnapshotDFFiltered_group_node.mem_rsrv.mean().sum() #assume capacity has no change
# convert from KB to GB
total_phy_mem = float(total_phy_mem) / (1024*1024)
total_rsrv_mem = float(total_rsrv_mem) / (1024*1024)
print('Total physical memory in these nodes: %s GBs' % total_phy_mem)
print('Total memory reserved for YARN in these nodes: %s GBs' % total_rsrv_mem)
phy_mem = np.linspace(total_phy_mem,total_phy_mem,rescaled_snapshot_duration)
rsrv_mem = np.linspace(total_rsrv_mem,total_rsrv_mem,rescaled_snapshot_duration)
phy_mem = pd.Series(phy_mem,index=time)
rsrv_mem = pd.Series(rsrv_mem,index=time)
fig_mem = plt.figure(figsize=(12,8))
ax_mem = fig_mem.add_subplot(111)
ax_mem.plot(phy_mem,'-b')
ax_mem.plot(rsrv_mem,'-g')
base_vector = zero_vector # reset base vector
for application in application_list:
print('Processing %s' % (application))
for index in SnapshotDFFiltered.index:
if SnapshotDFFiltered.loc[index,'app'] == application:
# get mem allocated for that container
container_starttime = SnapshotDFFiltered.loc[index,'start']
container_endtime = SnapshotDFFiltered.loc[index,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
container_duration = rescaled_container_endtime - rescaled_container_starttime
container_time_index = np.linspace(rescaled_container_starttime,rescaled_container_endtime-1,container_duration)
mem = float(SnapshotDFFiltered.loc[index,'mem']) / (1024*1024)
container_value = np.linspace(mem,mem,container_duration)
container_data = pd.Series(container_value, index=container_time_index)
# checking if all points in container data belongs to the snapshot timeframe
check = set(container_time_index) - set(time)
if len(check): #not empty
raise Exception("Error, data point's mismatch")
# matching the container data with the length of the snapshot
container_data = container_data + zero_vector
container_data = container_data.fillna(0)
# do stackplot
# create drawing vector based on base_vector
container_data = container_data + base_vector
base_vector = container_data # update base_vector for next iteration
#ax_mem.plot(container_data,'-r')
ax_mem.plot(base_vector,'-r')
base_vector = zero_vector # reset base vector
for application in application_list:
print('Processing %s' % (application))
for index in SnapshotDFFiltered.index:
if SnapshotDFFiltered.loc[index,'app'] == application:
# get mem allocated for that container
container_starttime = SnapshotDFFiltered.loc[index,'start']
container_endtime = SnapshotDFFiltered.loc[index,'end']
rescaled_container_starttime = container_starttime / 1000
rescaled_container_endtime = container_endtime / 1000
container_duration = rescaled_container_endtime - rescaled_container_starttime
'''Get actual CPU used '''
actual_data_use = get_mem_actual_use(index,snapshot_dataframe, snapshot_mem_profile)
actual_data_use = actual_data_use.tolist()
actual_data_use = [x / (1024*1024*1024) for x in actual_data_use]
'''because the return may have lengh longer, we generate the container_time_index again'''
container_time_index = np.linspace(rescaled_container_starttime,
rescaled_container_starttime+len(actual_data_use)-1,
len(actual_data_use))
container_data = pd.Series(actual_data_use, index=container_time_index)
# checking if all points in container data belongs to the snapshot timeframe
check = set(container_time_index) - set(time)
if len(check): #not empty
raise Exception("Error, data point's mismatch")
# matching the container data with the length of the snapshot
container_data = container_data + zero_vector
container_data = container_data.fillna(0)
# do stackplot
# create drawing vector based on base_vector
container_data = container_data + base_vector
base_vector = container_data # update base_vector for next iteration
#ax_mem.plot(container_data,'-y')
ax_mem.plot(base_vector,'-y')
# Calculate free memory of the cluster during snapshot
Out[12]:
In [13]:
# Calculate free memory
In [ ]: