In [44]:
import functools
import sys
# Table of periodic tasks used by the scheduling experiments below.
# Each entry carries:
#   name     - label printed when the task is selected
#   interval - period of the task
#   priority - larger number wins when several tasks compete
#   duration - how long one execution advances the simulated clock
#   nice     - not read by the visible scheduling loops
tasks = [
    # do this if nothing else is running
    {
        "name": "fast",
        "interval": 0.4,
        "priority": 1,
        "duration": 0.01,
        "nice": 0.0,
    },
    # do this if slow is not running
    {
        "name": "medium",
        "interval": 0.8,
        "priority": 2,
        "duration": 0.01,
        "nice": 0.0,
    },
    # always do this
    {
        "name": "slow",
        "interval": 2,
        "priority": 3,
        "duration": 0.5,
        "nice": 0.0,
    },
]

# Reference start time and the simulated clock; both begin at zero.
T0 = 0
t = 0
def wait_time(task, elapsed):
    """Return how long to wait until the task's interval comes round again.

    Parameters
    ----------
    task : mapping
        Must provide an ``'interval'`` entry (the task period).
    elapsed : number
        Time that has passed since the reference start.

    Returns
    -------
    float
        Remaining time to the next multiple of the interval, plus one
        machine epsilon.  When ``elapsed`` sits exactly on a multiple, the
        result is a full interval (plus epsilon), not zero.
    """
    period = task['interval']
    into_period = elapsed % period
    # Wait a hair longer than the exact remainder to avoid floating point
    # issues around the interval boundary.
    return (period - into_period) + sys.float_info.epsilon
def dead_line(task, elapsed):
    """Return the time from now until the task's next run would be finished.

    The start delay is computed exactly as in ``wait_time`` (remaining time
    to the next multiple of ``task['interval']`` plus one machine epsilon);
    ``task['duration']`` is then added on top.

    Parameters
    ----------
    task : mapping
        Must provide ``'interval'`` and ``'duration'`` entries.
    elapsed : number
        Time that has passed since the reference start.

    Returns
    -------
    float
        Start delay plus execution duration.
    """
    period = task['interval']
    # Same epsilon nudge as wait_time so both agree on the start moment.
    start_delay = (period - elapsed % period) + sys.float_info.epsilon
    return start_delay + task['duration']
# Simulate 20 scheduling decisions: find the task that would finish first,
# collect every task that would start before it finishes, and run the one
# with the highest priority among those.
for i in range(20):
    elapsed = t - T0
    # Bind the current elapsed time so callers only pass the task.
    next_wait_time = functools.partial(wait_time, elapsed=elapsed)

    # Task that would be done earliest if started at its next interval.
    first_task = min(
        tasks,
        key=lambda candidate: next_wait_time(candidate) + candidate['duration'],
    )
    # Time from now until that earliest task would be done.
    first_stop = next_wait_time(first_task) + first_task['duration']

    # Every task due to start at or before first_stop competes for the slot.
    overlapping_tasks = []
    for task in tasks:
        if next_wait_time(task) <= first_stop:
            overlapping_tasks.append(task)

    # The competing task with the highest priority wins.
    selected_task = max(
        overlapping_tasks,
        key=lambda candidate: candidate['priority'],
    )
    selected_wait = next_wait_time(selected_task)
    # TODO: give bonus to missed tasks

    # Advance the simulated clock to the start of the selected task ...
    t += selected_wait
    print('t: %.2f calling function %s' % (t, selected_task['name']))
    # ... and then past its execution.
    t += selected_task['duration']
    # TODO: update duration estimate
In [47]:
# Simulate 20 scheduling decisions using an "earliest finish first" rule:
# at every step the task whose next run would complete soonest is executed.
t = 0  # restart the simulated clock for this experiment
for i in range(20):
    elapsed = t - T0
    # Bind the current elapsed time so callers only pass the task.
    next_wait_time = functools.partial(wait_time, elapsed=elapsed)
    next_dead_line = functools.partial(dead_line, elapsed=elapsed)

    # Select the next task based on which one finishes first.
    first_task = min(tasks, key=next_dead_line)
    # How long until that task is done.  This must be evaluated for
    # first_task; the previous code read `task`, a loop variable left over
    # from another cell, and therefore reported the wrong task's deadline.
    first_stop = next_dead_line(first_task)
    selected_task = first_task
    # TODO: give bonus to missed tasks

    # Wait until the selected task is due ...
    t += next_wait_time(selected_task)
    print('t: %.2f calling function %s' % (t, selected_task['name']))
    # ... and execute it.
    t += selected_task['duration']
    # TODO: update duration estimate
In [46]:
import numpy as np

# Express every task interval as a multiple of the shortest interval.
intervals = [task['interval'] for task in tasks]
min_interval = min(intervals)
fractions = [interval / min_interval for interval in intervals]

# Ratio for the last task in the table.  The original read a `task` variable
# left over from a loop in another cell; it is bound explicitly here so the
# cell does not depend on that hidden state.
a = tasks[-1]['interval'] / min_interval

# True when every interval is a whole multiple of the shortest one.
all(fraction.is_integer() for fraction in fractions)
Out[46]:
In [ ]: