In [116]:
from functools import wraps
from collections import namedtuple
import random
# import numpy as np
import logging as Log
from inspect import Signature, Parameter
from inspect import getgeneratorstate
Log.basicConfig(format='%(asctime)s %(message)s', level=Log.INFO)
In [114]:
def coroutine_next(action):
if action:
Log.info(f'action is : {action}')
def _wraps_next(func):
@wraps(func)
def next_gen(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen) # 触发协程的开始
return gen
return next_gen
return _wraps_next
@coroutine_next('go')
def averager_gen():
total = 0.0
con = 0
ave = None
try:
while True:
try:
temp = yield ave
total += temp
con += 1
ave = total/con
Log.info(f'{total}')
except StopIteration:
Log.info('Inner Stop !')
except StopIteration:
Log.info('Outer Stop !')
finally:
Log.info(f'{total}End!')
ave1 = averager_gen()
In [113]:
# next(ave1)
ave1.send(10)
# next(ave1)
getgeneratorstate(ave1)
# ss = zip(range(10, 100, 10), [ave1.send(ave) for ave in range(10, 100, 10)])
# ave1.close()
# getgeneratorstate(ave1), list(ss)
Out[113]:
In [119]:
Result = namedtuple('Result', 'count average')
data = {
'girls:kg': [
random.uniform(40, 60) for _ in range(10)
],
'girls:m': [
random.uniform(1.5, 1.8) for _ in range(10)
],
'boys:kg': [
random.uniform(60, 90) for _ in range(10)
],
'boys:m': [
random.uniform(1.6, 1.9) for _ in range(10)
]
}
# @coroutine_next('gen')
def averager_return():
total = 0.0
con = 0
ave = None
try:
while True:
try:
temp = yield
if temp is None:
break
total += temp
con += 1
ave = total/con
except StopIteration:
Log.info('Outer Stop !')
finally:
Log.info('End!')
return Result(con, ave)
# @coroutine_next('next')
def grouper(result, key):
while True:
result[key] = yield from averager_return()
def report_log(data: dict=None):
if data:
for k, v in sorted(data.items()):
group, unit = k.split(':')
Log.info(f'{v.count:<3} {group:<5} averaging {v.average:.2f}{unit}')
def main(data):
result = {}
for k, v in data.items():
Log.info(f'nmu: {k}')
gen_g = grouper(result, k)
next(gen_g)
# values iterators
for value in v:
# send value to averager_return temp
gen_g.send(value)
# send None to end while
gen_g.send(None)
report_log(result)
In [120]:
main(data=data)
In [97]:
str1 = 'chois'
f'abd {str1:^9} name'
# a = iter('dsafdsa')
Out[97]:
In [ ]: