In [1]:
from tool import Package
from tool import Pipeline
from cross.cross import Cross
from cross.config import Config
from simpy import Environment
from random import choice
import pandas as pd
import matplotlib.pyplot as plt
import re
import reprlib
from collections import abc
from itertools import count, takewhile, filterfalse, accumulate
import itertools
import functools
class CrossTestConfig(Config):
# RANDOM_SEED = 57
# NUM_PACKAGES = 100
# INTERVAL_TIME = 10
TYPE_PIP_LINE = Pipeline
# ==========================测试机配置参数===================================
# 本次杭州仿真模为一个入口队列一个机器
# 测试机器ID列表
ID_TEST_MACHINE = ['test1', 'test2']
# 测试机器出端口id列表
ID_NEXT_MACHINE = ['next_1']
# 测试机器资源
TEST_MACHINE_RESOURCE = 0 # 如果测试机器内部无资源调用,设置为0,否则设置资源数(如人力)
# 测试机单资源处理时延
PROCESS_TIME = None # 如果测试机器没有处理货物延时,设置为None,否则设置为对应延时
class LogicTest(object):
""""""
def __init__(self, env, test_cls, config):
self.env = env
self.test_cls = test_cls
self.config = config
self.pipline = {}
self.pipline_dic = {}
self.path = []
def _path_gen(self):
""""""
for o in self.config.ID_LAST_MACHINE:
tmp1 = []
for t in self.config.ID_TEST_MACHINE:
tmp1.append((o, t)) # (''.join([o,'_', t]))
tmp2 = []
for d in self.config.ID_NEXT_MACHINE:
tmp2.append((t, d)) # (''.join([t, '_', d]))
self.path.append((o, t, d))
self.pipline_dic[t] = tmp2
self.pipline_dic[o] = tmp1
def _pipline_generator(self):
for v in self.pipline_dic.values():
self.pipline.update({it: self.config.TYPE_PIP_LINE for it in v })
def generator(self):
""""""
self._path_gen()
self._pipline_generator()
def packages_generator(self):
""""""
for num in range(self.config.NUM_PACKAGES):
yield self.env.timeout(self.config.INTERVAL_TIME)
package = Package(self.env, None, num, choice(self.path))
package.pop_mark()
# print(package.next_pipeline)
self.pipline[package.next_pipeline](self.env, 10, package.next_pipeline).put(package)
def get_input_pip_line(self, id_test_machine):
""""""
if_find = False
for gi in self.config.ID_LAST_MACHINE:
for pip in self.pipline_dic[gi]:
if pip[1] == id_test_machine:
if_find = True
return pip
else:
continue
if not if_find:
raise ValueError('test machine id error!')
def get_pipelines_dict(self):
""""""
return self.pipline_dic
def get_reusource_dic(self):
""""""
return {id: self.config.TEST_MACHINE_RESOURCE for id in self.config.ID_TEST_MACHINE}
if __name__ == '__main__':
env = Environment()
t1 = LogicTest(env, Cross, CrossTestConfig)
t1.generator()
env.process(t1.packages_generator())
p1 = t1.pipline
p2 = t1.pipline_dic
p3 = t1.path
r1 = t1.get_reusource_dic()
for tid in t1.config.ID_TEST_MACHINE:
pid = t1.get_input_pip_line(tid)
input_pip_line = t1.pipline[pid](env, 10, pid)
t1.test_cls(env=env,
machine_id=tid,
equipment_id = tid,
input_pip_line=input_pip_line,
pipelines_dict=t1.get_pipelines_dict,
resource_dict=t1.get_reusource_dic())
env.run(until= 100)
In [4]:
pd.DataFrame(p3)
Out[4]:
In [11]:
p1
Out[11]:
In [4]:
p2
Out[4]:
In [55]:
from datetime import datetime
datetime.strptime('2017-09-11 03:42:22', '%Y-%m-%d %H:%M:%S')
Out[55]:
In [56]:
def gen_concatenate(iterators):
'''
Chain a sequence of iterators together into a single sequence.
'''
for it in iterators:
yield from it
In [60]:
from functools import partial
import math
points = [ (1, 2), (3, 4), (5, 6), (7, 8), (0, 0), (-1, -1) ]
pt = (0, 0)
def distance(p1, p2):
x1, y1 = p1
x2, y2 = p2
return math.hypot(x2 - x1, y2 - y1)
points.sort(key=partial(distance, pt), reverse=True);points
Out[60]:
In [2]:
def xx(x):
return x*x
print([x for x in map(xx, range(10))])
In [59]:
from urllib.request import urlopen
def urltemplate(template):
def opener(**kwargs):
return urlopen(template.format_map(kwargs))
return opener
# Example use
yahoo = urltemplate('http://finance.yahoo.com/d/quotes.csv?s={names}&f={fields}')
for line in yahoo(names='IBM,AAPL,FB,CI', fields='sl1c1v'):
print(line.decode('utf-8'))
In [15]:
# from multiprocessing import
def add(a,b):
return a+b
def send_handler():
accend = 0
def handle_result(result):
nonlocal accend
accend += 1
print('[{}] Got {}'.format(accend, result))
return handle_result
apply_an
Out[15]:
In [2]:
res1 = [x*3 for x in gen_ab()]
res1
Out[2]:
In [165]:
res2 = (x*3 for x in gen_ab())
[x for x in res2]
Out[165]:
In [95]:
s = 'abcd,efgsaf,end,dsa===='
type(s)
# while True:
# try:
# print(next(it))
# except StopIteration:
# del it
# break
# isinstance(it, abc.Iterator)
Out[95]:
In [2]:
def gen_con(begin, step, end):
first = type(begin + step)(begin)
ap_gen = count(first, step)
if end is not None:
ap_gen = takewhile(lambda n : n<end, ap_gen)
return ap_gen
In [21]:
g1 = (x for x in gen_con(1, .5, 5))
isinstance(g1, abc.Iterable)
c= 'Aardvarkuueee'
i = filterfalse(lambda x : x.lower() in 'aeiou', c)
[x for x in i]
Out[21]:
In [5]:
from random import randint
def rand_limit():
return randint(1, 5)
d6_it = iter(rand_limit, 1)
[x for x in d6_it]
Out[5]:
In [48]:
a = [1,3,4]
a[:] = [3,4]
a
Out[48]:
In [4]:
from contextlib import contextmanager
@contextmanager
def list_transaction(orig_list):
working = list(orig_list)
yield working
orig_list[:] = working
item = [1,2,3]
with list_transaction(item) as w:
w.append(1)
w.append(1)
w.append(1)
try:
raise RuntimeError('oops')
except RuntimeError:
pass
item
Out[4]:
In [63]:
from collections import namedtuple
day_name = namedtuple('DAY', 'day, hour, min')
day_name
In [54]:
from datetime import datetime
t1 = datetime.now()
datetime.strptime('17-09-26', '%y-%m-%d')
Out[54]:
In [71]:
from simpy import Environment
from collections.abc import Generator
evn = Environment()
def gen():
yield
env1 = env.process(gen())
isinstance(env, Generator)
Out[71]:
In [7]:
for x in range(7):
print(x)
if x ==8:
break
else:
print('endQ')
In [8]:
from collections import defaultdict
dic1 = defaultdict(list)
dic1['a'].append(['a', 'b'])
list1 = []
list1.extend([3,4,5,])
list1.extend([6,7,8])
list1
Out[8]:
In [49]:
from functools import wraps
registry = []
def register(leve=None):
print(leve)
def _wraps(func):
@wraps(func)
def _register(*arg, **kwargs):
# print('running register(%s)' % func)
registry.append(func)
return func(*arg, **kwargs)
return _register
return _wraps
@register(leve='int')
def f1():
print('running f1()')
return 'zz'
@register(leve='float')
def f2():
print('running f2()')
return 'yy'
def f3():
print('running f3()')
pass
def main():
lst = []
for _ in range(2):
re = f1()
lst.append(re)
return lst
x = main()
print(x, '\n', registry)
In [6]:
from collections import namedtuple
Event = namedtuple('Even', 'ok value')
event1 = Event._make([True, '400'])
class StopSimulation(Exception):
"""Indicates that the simulation should stop now."""
@classmethod
def callback(cls, event):
"""Used as callback in :meth:`BaseEnvironment.run()` to stop the
simulation when the *until* event occurred."""
if event.ok:
return cls(event.value)
else:
return event.value
StopSimulation.callback(event1)
Out[6]:
In [27]:
it1 = iter('abcd')
#
In [65]:
def _n_con(n):
while n > 0:
c = yield n
print(f'I have receive {c} sir!')
n -=1
def _com_n_con(n):
yield from _n_con(n)
nc =_com_n_con(5)
In [66]:
next(nc)
Out[66]:
In [117]:
import os
import fnmatch
import re
top_dir = 'D:\Git\python\python_test\cook_book_sim'
def file_dir_gen(pat, top):
for path, dirlist, filelist in os.walk(top):
for f in fnmatch.filter(filelist, pat):
yield os.path.join(path, f)
def file_open_gen(filename):
for file in filename:
if file:
f = open(file, 'rt')
yield f
f.close()
def gen_concatenate(iterator):
for it in iterator:
yield from it
def line_pat_gen(pat, lines):
""""""
pat = re.compile(pat)
for line in lines:
if pat.search(line):
yield line
In [124]:
files = file_dir_gen(pat='*.log', top=top_dir)
files_h = file_open_gen(files)
lines = gen_concatenate(files_h)
# for line in line_pat_gen('<sim>', lines):
# print(line)
In [128]:
import pickle
data1 = {'a': [1, 2.0, 3, 4+6j],
'b': ('string', u'Unicode string'),
'c': None}
selfref_list = [1, 2, 3]
selfref_list.append(selfref_list)
output = open('data.pkl', 'wb')
pickle.dump(obj=data1, file=output)
pickle.dump(obj=selfref_list, file=output, protocol=-1)
output.close()
In [133]:
import pprint
pkl_file = open('data.pkl', 'rb')
pkdata = pickle.load(pkl_file)
print(pkdata)