In [1]:
from tool import Package
from tool import Pipeline
from cross.cross import Cross
from cross.config import Config
from simpy import Environment
from random import choice
import pandas as pd
import matplotlib.pyplot as plt
import re
import reprlib
from collections import abc
from itertools import count, takewhile, filterfalse, accumulate
import itertools
import functools


class CrossTestConfig(Config):
    # RANDOM_SEED = 57
    # NUM_PACKAGES = 100
    # INTERVAL_TIME = 10
    TYPE_PIP_LINE = Pipeline
    # ==========================测试机配置参数===================================
    # 本次杭州仿真模为一个入口队列一个机器
    # 测试机器ID列表
    ID_TEST_MACHINE = ['test1', 'test2']
    # 测试机器出端口id列表
    ID_NEXT_MACHINE = ['next_1']
    # 测试机器资源
    TEST_MACHINE_RESOURCE = 0  # 如果测试机器内部无资源调用,设置为0,否则设置资源数(如人力)
    # 测试机单资源处理时延
    PROCESS_TIME = None  # 如果测试机器没有处理货物延时,设置为None,否则设置为对应延时
    

class LogicTest(object):
    """"""
    def __init__(self, env, test_cls, config):
        self.env = env
        self.test_cls = test_cls
        self.config = config
        self.pipline = {}
        self.pipline_dic = {}
        self.path = []
    
    def _path_gen(self):
        """"""
        for o in self.config.ID_LAST_MACHINE:
            tmp1 = []
            for t in self.config.ID_TEST_MACHINE:
                tmp1.append((o, t))  # (''.join([o,'_', t]))
                tmp2 = []
                for d in self.config.ID_NEXT_MACHINE:
                    tmp2.append((t, d))  # (''.join([t, '_', d]))
                    self.path.append((o, t, d))
                self.pipline_dic[t] = tmp2
            self.pipline_dic[o] = tmp1
    
    def _pipline_generator(self):
        for v in self.pipline_dic.values():
            self.pipline.update({it: self.config.TYPE_PIP_LINE for it in v })
        
    def generator(self):
        """"""
        self._path_gen()
        self._pipline_generator()
    
    def packages_generator(self):
        """"""
        for num in range(self.config.NUM_PACKAGES):
            yield self.env.timeout(self.config.INTERVAL_TIME)
            package = Package(self.env, None, num, choice(self.path))
            package.pop_mark()
            # print(package.next_pipeline)
            self.pipline[package.next_pipeline](self.env, 10, package.next_pipeline).put(package)
    
    def get_input_pip_line(self, id_test_machine):
        """"""
        if_find = False
        for gi in self.config.ID_LAST_MACHINE:
            for pip in self.pipline_dic[gi]:
                if pip[1] == id_test_machine:
                    if_find = True
                    return pip
                else:
                    continue
        if not if_find:
            raise ValueError('test machine id error!')
    
    def get_pipelines_dict(self):
        """"""
        return self.pipline_dic
    
    def get_reusource_dic(self):
        """"""
        return {id: self.config.TEST_MACHINE_RESOURCE for id in self.config.ID_TEST_MACHINE}
        
if __name__ == '__main__':
    env = Environment()
    t1 = LogicTest(env, Cross, CrossTestConfig)
    t1.generator()
    env.process(t1.packages_generator())
    p1 = t1.pipline
    p2 = t1.pipline_dic
    p3 = t1.path
    r1 = t1.get_reusource_dic()
    for tid in t1.config.ID_TEST_MACHINE:
        pid = t1.get_input_pip_line(tid)
        input_pip_line = t1.pipline[pid](env, 10, pid)
        t1.test_cls(env=env, 
                    machine_id=tid,
                    equipment_id = tid,
                    input_pip_line=input_pip_line,
                    pipelines_dict=t1.get_pipelines_dict, 
                    resource_dict=t1.get_reusource_dic())
        
    env.run(until= 100)

In [4]:
pd.DataFrame(p3)


Out[4]:
0 1 2
0 package_generator test1 next_1
1 package_generator test2 next_1

In [11]:
p1


Out[11]:
{('package_generator', 'test1'): tool.items.Pipeline,
 ('package_generator', 'test2'): tool.items.Pipeline,
 ('test1', 'next_1'): tool.items.Pipeline,
 ('test2', 'next_1'): tool.items.Pipeline}

In [4]:
p2


Out[4]:
{'package_generator': [('package_generator', 'test1'),
  ('package_generator', 'test2')],
 'test1': [('test1', 'next_1')],
 'test2': [('test2', 'next_1')]}

In [55]:
from datetime import datetime
datetime.strptime('2017-09-11 03:42:22', '%Y-%m-%d %H:%M:%S')


Out[55]:
datetime.datetime(2017, 9, 11, 3, 42, 22)

In [56]:
def gen_concatenate(iterators):
    '''
    Chain a sequence of iterators together into a single sequence.
    '''
    for it in iterators:
        yield from it

In [60]:
from functools import partial
import math

points = [ (1, 2), (3, 4), (5, 6), (7, 8), (0, 0), (-1, -1) ]
pt = (0, 0)

def distance(p1, p2):
    x1, y1 = p1
    x2, y2 = p2
    return math.hypot(x2 - x1, y2 - y1)

points.sort(key=partial(distance, pt), reverse=True);points


Out[60]:
[(7, 8), (5, 6), (3, 4), (1, 2), (-1, -1), (0, 0)]

In [2]:
def xx(x):
    return x*x

print([x for x in map(xx, range(10))])


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

http url open


In [59]:
from urllib.request import urlopen

def urltemplate(template):
    def opener(**kwargs):
        return urlopen(template.format_map(kwargs))
    return opener

# Example use
yahoo = urltemplate('http://finance.yahoo.com/d/quotes.csv?s={names}&f={fields}')
for line in yahoo(names='IBM,AAPL,FB,CI', fields='sl1c1v'):
    print(line.decode('utf-8'))


"IBM",145.8110,+0.6814,4000231

"AAPL",150.578,-1.312,41329657

"FB",162.8200,-7.7204,38650871

"CI",181.46,-1.28,787922

回调函数


In [15]:
# from multiprocessing import 

def add(a,b):
    return a+b

def send_handler():
    accend = 0
    def handle_result(result):
        nonlocal accend
        accend += 1
        print('[{}] Got {}'.format(accend, result))
    return handle_result

apply_an


Out[15]:
3

In [2]:
res1 = [x*3 for x in gen_ab()]
res1


---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-2-81f2c1c2062c> in <module>()
      1 from queue import Queue
      2 from functools import wraps
----> 3 from multiprocessing import apply_async
      4 
      5 class Async:

ImportError: cannot import name 'apply_async'
Out[2]:
['AAA', 'BBB']

In [165]:
res2 = (x*3 for x in gen_ab())
[x for x in res2]


begin
next
end
Out[165]:
['AAA', 'BBB']

In [95]:
s = 'abcd,efgsaf,end,dsa===='
type(s)
# while True:
#     try:
#         print(next(it))
#     except StopIteration:
#         del it
#         break

# isinstance(it, abc.Iterator)


Out[95]:
str

In [2]:
def gen_con(begin, step, end):
    first = type(begin + step)(begin)
    ap_gen = count(first, step)
    if end is not None:
        ap_gen = takewhile(lambda n : n<end, ap_gen)
    return ap_gen

In [21]:
g1 = (x for x in gen_con(1, .5, 5))
isinstance(g1, abc.Iterable)
c= 'Aardvarkuueee'
i = filterfalse(lambda x : x.lower() in 'aeiou', c)
[x for x in i]


Out[21]:
['r', 'd', 'v', 'r', 'k']

In [5]:
from random import randint

def rand_limit():
    return randint(1, 5)

d6_it = iter(rand_limit, 1)
[x for x in d6_it]


Out[5]:
[4, 5, 2, 4, 2, 2, 2, 3, 3]

In [48]:
a = [1,3,4]
a[:] = [3,4]
a


Out[48]:
[3, 4]

In [4]:
from contextlib import contextmanager
@contextmanager
def list_transaction(orig_list):
    working = list(orig_list)
    yield working
    orig_list[:] = working
    
item = [1,2,3]

with list_transaction(item) as w:
    w.append(1)
    w.append(1)
    w.append(1)
    try:
        raise RuntimeError('oops')
    except RuntimeError:
        pass
    

item


Out[4]:
[1, 2, 3, 1, 1, 1]

In [63]:
from collections import namedtuple

day_name = namedtuple('DAY', 'day, hour, min')
day_name

In [54]:
from datetime import datetime

t1 = datetime.now()
datetime.strptime('17-09-26', '%y-%m-%d')


Out[54]:
datetime.datetime(2017, 9, 26, 0, 0)

In [71]:
from simpy import Environment
from collections.abc import Generator
evn = Environment()
def gen():
    yield
    
env1 = env.process(gen())
isinstance(env, Generator)


Out[71]:
False

In [7]:
for x in range(7):
    print(x)
    if x ==8:
        break
else:
    print('endQ')


0
1
2
3
4
5
6
endQ

In [8]:
from collections import defaultdict

dic1 = defaultdict(list)
dic1['a'].append(['a', 'b'])
list1 = []
list1.extend([3,4,5,])
list1.extend([6,7,8])
list1


Out[8]:
[3, 4, 5, 6, 7, 8]

In [49]:
from functools import wraps
registry = []

def register(leve=None):
    print(leve)
    def _wraps(func):
        @wraps(func)
        def _register(*arg, **kwargs):
#             print('running register(%s)' % func)
            registry.append(func)
            return func(*arg, **kwargs)
        return _register
    return _wraps

@register(leve='int')
def f1():
    print('running f1()')
    return 'zz'

@register(leve='float')
def f2():
    print('running f2()')
    return 'yy'
def f3():
    print('running f3()')
    pass

def main():
    lst = []
    for _ in range(2):
        re = f1()
        lst.append(re)
    return lst
x = main()

print(x, '\n', registry)


int
float
running f1()
running f1()
['zz', 'zz'] 
 [<function f1 at 0x00000265411666A8>, <function f1 at 0x00000265411666A8>]

In [6]:
from collections import namedtuple
Event = namedtuple('Even', 'ok value')

event1 = Event._make([True, '400'])

class StopSimulation(Exception):
    """Indicates that the simulation should stop now."""

    @classmethod
    def callback(cls, event):
        """Used as callback in :meth:`BaseEnvironment.run()` to stop the
        simulation when the *until* event occurred."""
        if event.ok:
            return cls(event.value)
        else:
            return event.value

StopSimulation.callback(event1)


Out[6]:
__main__.StopSimulation('400')

In [27]:
it1 = iter('abcd')
#

In [65]:
def _n_con(n):
    while n > 0:
        c = yield n
        print(f'I have receive {c} sir!')
        n -=1
def _com_n_con(n):
    yield from _n_con(n)
    
nc =_com_n_con(5)

In [66]:
next(nc)


Out[66]:
5

数据处理管道

  • yield ; yield from

In [117]:
import os
import fnmatch
import re


top_dir = 'D:\Git\python\python_test\cook_book_sim'
def file_dir_gen(pat, top):
    for path, dirlist, filelist in os.walk(top):
        for f in fnmatch.filter(filelist, pat):
            yield os.path.join(path, f)

            
def file_open_gen(filename):
    for file in filename:
        if file:
            f = open(file, 'rt')
        yield f
        f.close()


def gen_concatenate(iterator):
    for it in iterator:
        yield from it

        
def line_pat_gen(pat, lines):
    """"""
    pat = re.compile(pat)
    for line in lines:
        if pat.search(line):
            yield line

In [124]:
files = file_dir_gen(pat='*.log', top=top_dir)
files_h = file_open_gen(files)
lines = gen_concatenate(files_h)
# for line in line_pat_gen('<sim>', lines):
#     print(line)

In [128]:
import pickle

data1 = {'a': [1, 2.0, 3, 4+6j],
         'b': ('string', u'Unicode string'),
         'c': None}

selfref_list = [1, 2, 3]
selfref_list.append(selfref_list)

output = open('data.pkl', 'wb')

pickle.dump(obj=data1, file=output)

pickle.dump(obj=selfref_list, file=output, protocol=-1)

output.close()

In [133]:
import pprint

pkl_file = open('data.pkl', 'rb')

pkdata = pickle.load(pkl_file)

print(pkdata)


{'a': [1, 2.0, 3, (4+6j)], 'b': ('string', 'Unicode string'), 'c': None}