In [1]:
import logging
# Re-import the logging module before configuring it.
# NOTE(review): presumably this is needed so that basicConfig() takes effect
# on a kernel where logging was already set up - confirm.
reload(logging)
logging.basicConfig(
    datefmt='%I:%M:%S',
    format='%(asctime)-9s %(levelname)-8s: %(message)s')

# The root logger reports messages at INFO level
logging.getLogger().setLevel(logging.INFO)

# Loggers of the CGroups modules: both are set to INFO as well; change the
# level used here to tune how verbose the CGroups logging is.
for logger_name in ('cgroups', 'cgroups.cpuset'):
    logging.getLogger(logger_name).setLevel(logging.INFO)
In [2]:
import json
import operator
import devlib
import trappy
import bart
from bart.sched.SchedMultiAssert import SchedMultiAssert
from wlgen import RTA, Periodic
In [3]:
import os
# Provide a default location for the Android SDK.
# setdefault() keeps an ANDROID_HOME already exported in the environment the
# notebook was started from, so the hardcoded path below is only a fallback
# and does not override a user's own SDK installation.
os.environ.setdefault('ANDROID_HOME', '/ext/android-sdk-linux/')
In [4]:
from env import TestEnv
my_conf = {
# # JUNO Linux
# "platform" : "linux",
# "board" : "juno",
# "host" : "192.168.0.1",
# "username" : "root",
# "password" : "",
# "exclude_modules" : ['hwmon'],
# JUNO Android
"platform" : "android",
"board" : "juno",
"host" : "192.168.0.1",
"exclude_modules" : ['hwmon'],
# RT-App calibration values
"rtapp-calib" : {
'0': 363, '1': 138, '2': 139, '3': 352, '4': 353, '5': 361
},
# List of additional devlib modules to install
"modules" : ['cgroups', 'bl', 'cpufreq'],
# List of additional binary tools to install
"tools" : ['rt-app', 'trace-cmd'],
# FTrace events to collect
"ftrace" : {
"events" : [
"sched_switch"
],
"buffsize" : 10240
}
}
te = TestEnv(my_conf)
target = te.target
# Report target connection
logging.info('Connected to %s target', target.abi)
print "DONE"
In [5]:
# Report the CGroup controllers available on the target
logging.info('%14s - Available controllers:', 'CGroup')
ssys = target.cgroups.list_subsystems()
# Each entry is unpacked as a 4-tuple; only the first three fields (name,
# hierarchy id and number of cgroups) are reported, the last one is ignored
for (name, hierarchy_id, num_cgroups, _unused) in ssys:
    logging.info(
        '%14s - %10s (hierarchy id: %d) has %d cgroups',
        'CGroup', name, hierarchy_id, num_cgroups)
In [6]:
# Get a reference to the CPUSet controller from the target's cgroups module.
# This handle is what the following cells use to list the existing cpuset
# groups and to get the '/LITTLE' one.
cpuset = target.cgroups.controller('cpuset')
In [7]:
# Get the list of CGroups currently configured for the cpuset controller.
# The list is kept in `cgroups`, which the next cell iterates again to dump
# the configuration of each group.
cgroups = cpuset.list_all()
logging.info('Existing CGroups:')
# One log line per entry returned by list_all()
for cg in cgroups:
    logging.info(' %s', cg)
In [8]:
# Dump the configuration of each cpuset CGroup: for every group name listed
# in `cgroups`, read its attributes and report the 'cpus' one
for cgname in cgroups:
    cgroup = cpuset.cgroup(cgname)
    cpus = cgroup.get()['cpus']
    logging.info(
        '%s:%-15s cpus: %s',
        cpuset.kind, cgroup.name, cpus)
In [9]:
# Create a LITTLE partition: get a handle on the '/LITTLE' group of the
# cpuset controller.
# NOTE(review): presumably cgroup() creates the group on the target when it
# does not exist yet - confirm against the devlib cgroups module.
cpuset_littles = cpuset.cgroup('/LITTLE')
In [10]:
# Check the attributes available for this control group: the value returned
# by get() on the cpuset LITTLE group is printed as indented JSON
print "LITTLE:\n", json.dumps(cpuset_littles.get(), indent=4)
In [11]:
# Tune the CPUs and MEMs attributes of the cpuset LITTLE group: they must be
# initialized for the group to be usable.
# cpus is set to the target's LITTLE CPUs (target.bl.littles), mems to 0.
cpuset_littles.set(cpus=target.bl.littles, mems=0)
# Print the attributes again, now that they have been set
print "LITTLE:\n", json.dumps(cpuset_littles.get(), indent=4)
In [12]:
# Description of a periodic big task: 80% duty cycle over a 100ms period,
# running for 5s
task = Periodic(
    duration_s=5,
    duty_cycle_pct=80,
    period_ms=100).get()

# One task per each CPU in the target ('task0', 'task1', ...); every entry
# refers to the same task description
tasks = {
    'task{}'.format(cpu_id): task
    for cpu_id, _ in enumerate(target.core_names)
}

# Configure RTA to run all these tasks as a 'profile' workload named
# 'simple', using the calibration provided by the test environment.
# The trailing ';' keeps the notebook from echoing the value of conf().
rtapp = RTA(target, 'simple', calibration=te.calibration())
rtapp.conf(run_dir=target.working_directory, params=tasks, kind='profile');
In [13]:
# Test execution of all these tasks into the LITTLE cluster: the workload is
# run with the name of the cpuset LITTLE group as its cgroup, te.ftrace as
# tracer and the test environment results folder (te.res_dir) as out_dir.
# The value returned by run() is kept in `trace` for the plot and analysis
# cells that follow.
trace = rtapp.run(ftrace=te.ftrace, cgroup=cpuset_littles.name, out_dir=te.res_dir)
In [14]:
# Check tasks residency on the LITTLE cluster by plotting the trace
# collected by the previous run
trappy.plotter.plot_trace(trace)
In [15]:
# Compute and visualize tasks residencies on LITTLE cluster CPUs.
# The assertion object is built from the collected trace and the topology
# of the test environment (presumably execnames="task" selects the rt-app
# tasks by name - confirm against the BART documentation).
s = SchedMultiAssert(trappy.Run(trace), te.topology, execnames="task")
# Residency on the LITTLE CPUs (target.bl.littles), requested as a percentage
residencies = s.getResidency('cluster', target.bl.littles, percent=True)
print json.dumps(residencies, indent=4)
In [16]:
# Assert that ALL tasks have always executed only on LITTLE cluster:
# the residency on the LITTLE CPUs, as a percentage, is compared against
# 99.9 using operator.ge (>=).
# NOTE(review): rank is the number of entries in `residencies`; presumably
# this requires every selected task to satisfy the check - confirm against
# the BART SchedMultiAssert documentation.
s.assertResidency('cluster', target.bl.littles,
99.9, operator.ge, percent=True, rank=len(residencies))
Out[16]:
In [17]:
# Get a reference to the CPU controller from the target's cgroups module
# (the same way the cpuset controller was obtained earlier)
cpu = target.cgroups.controller('cpu')
In [18]:
# Get a handle on the '/LITTLE' group of the CPU controller.
# NOTE: despite sharing the '/LITTLE' name, this is a group of the 'cpu'
# controller and is kept in its own variable, separate from the
# `cpuset_littles` handle of the cpuset controller.
cpu_littles = cpu.cgroup('/LITTLE')
In [19]:
# Check the attributes available for this control group: the value returned
# by get() on the CPU controller LITTLE group is printed as indented JSON
print "LITTLE:\n", json.dumps(cpu_littles.get(), indent=4)
In [20]:
# Set the 'shares' attribute of the CPU controller LITTLE group to 512.
# The commented-out call below is an alternative configuration, based on a
# CFS bandwidth limit (50000us quota over a 100000us period) instead of
# shares:
# cpu_littles.set(cfs_period_us=100000, cfs_quota_us=50000)
cpu_littles.set(shares=512)
# Print the attributes again, now that the group has been tuned
print "LITTLE:\n", json.dumps(cpu_littles.get(), indent=4)
In [21]:
# Run the same workload again, this time passing the name of the CPU
# controller LITTLE group as its cgroup.
# NOTE(review): unlike the run in the cpuset group, no out_dir is passed
# here - confirm that the default output location is the intended one.
trace = rtapp.run(ftrace=te.ftrace, cgroup=cpu_littles.name)
In [22]:
# Plot the trace collected by the run in the CPU controller LITTLE group,
# to check where the tasks have been executed
trappy.plotter.plot_trace(trace)