Decorator

Basic concept

基本的 decorator 概念就是把 function 當作參數丟入另一個 function


In [1]:
def deco(func):
    def wrapped():
        print('Before func')
        func()
        print('After func')
    return wrapped

@deco
def f1():
    print('This is f1 function')

f1()


Before func
This is f1 function
After func

Problem 1 : lost original attribute

當我們用 decorator 把原本的 function 包起來的時候, 以上面的程式碼為例, 呼叫 f1() 其實是執行 wrapped() 這時候如果我們希望取得跟 f1 本身有關的屬性就存取不到了, 取而代之的會存取到 wrapped() 的參數


In [2]:
def deco(func):
    def wrapped():
        print('Before func')
        func()
        print('After func')
    return wrapped

@deco
def f1():
    print('This is f1 function')
    
print(f1.__name__)


wrapped

Solution 1: Using the build-in functools.wraps

可以透過內建的 functolls.wraps 把原本 function 屬性保留下來


In [3]:
from functools import wraps

def deco(func):
    @wraps(func)
    def wrapped():
        print('Before func')
        func()
        print('After func')
    return wrapped

@deco
def f1():
    print('This is f1 function')
    
print(f1.__name__)


f1

Supplement: pass argument with decorator

如果執行 function 需要參數, 記得在 decorator 裡包裝 function 的時候也一並要傳入


In [4]:
import time
from functools import wraps

def deco(func):
    @wraps(func)
    def wrapped(*args, **kwargs):
        print('Before func')
        print('---')
        print('{} argument:\nargs = {}\nkwargs = {}\n'.format(
            func.__name__, args, kwargs
        ))
        func(*args, **kwargs)
        print('---')
        print('After func')
    return wrapped

@deco
def f1(*args, **kwargs):
    print('This is f1 function')
    
f1(1, '2', [3], {'4': 4}, time=time.ctime())


Before func
---
f1 argument:
args = (1, '2', [3], {'4': 4})
kwargs = {'time': 'Wed Sep 13 13:36:58 2017'}

This is f1 function
---
After func

Decorator with argument

前面在使用 decorator 的時候都是直接加上 @ + decorator name

但你是否有想過既然他是個包裝 function 的 function, 那我們是不是也可以像一般 functino 那樣給予其他的值?


In [5]:
from functools import wraps

def tag(name):
    def deco(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            print('<{}>'.format(name))
            func(*args, **kwargs)
            print('</{}>'.format(name))
        return wrapped
    return deco

@tag('p')
def content(*args, **kwargs):
    for i in args:
        print(i)

content('Hello World.', 'This is second argument.')


<p>
Hello World.
This is second argument.
</p>

Decorator assignment

Python 裏面所有的東西都是 Object, 既然是這樣那 decorator 應該也可以存成其他變數名稱並且正常使用


In [6]:
from functools import wraps

def tag(name):
    def deco(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            print('<{}>'.format(name))
            func(*args, **kwargs)
            print('</{}>'.format(name))
        return wrapped
    return deco

tag_p = tag('p')

@tag_p
def content(*args, **kwargs):
    for i in args:
        print(i)

content('Hello World.', 'This is second argument.')


<p>
Hello World.
This is second argument.
</p>

Decorator assignment (2)

既然 decorator 是包裝其他 function, 那 decorator 應該也可以包裝 decorator ?


In [7]:
from functools import wraps

def tag(name):
    def deco(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            print('<{}>'.format(name))
            func(*args, **kwargs)
            print('</{}>'.format(name))
        return wrapped
    return deco

tag_div = tag('div')
tag_p = tag('p')

@tag_div
@tag_p
def content(*args, **kwargs):
    for i in args:
        print(i)

content('Hello World.', 'This is second argument.')


<div>
<p>
Hello World.
This is second argument.
</p>
</div>

Decorator scenario

Case 1: Profiling Log

可以透過 decorator 寫出漂亮的 Pythonic code (e.g. link)

Task: 重複操作「紀錄當下時間為起始時間」->「執行 function」->「當下時間減掉起始時間為 function 耗時時間」

搭配 logging 還可以設定 log level 為 DEBUG, 這樣可以非常輕鬆的 enable/ disable 這些訊息


In [8]:
import time
import logging
from functools import wraps


LOGGER = logging.getLogger(__name__)

def func_profiling(func):
    @wraps(func)
    def wrapped(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        time_spent = time.time() - start_time
        fullname = '{}.{}'.format(func.__module__, func.__name__)
        LOGGER.debug('{}[args={}, kwargs={}] completed in {}'.format(
            fullname, args, kwargs, time_spent
        ))
        return result
    return wrapped

@func_profiling
def test_func_profiling(msg=None):
    import random
    sleep_sec = random.randrange(1,3)
    LOGGER.debug('random sleep in {} sec'.format(sleep_sec))
    time.sleep(sleep_sec)
    LOGGER.info(msg)
    LOGGER.debug('Wake up')

if __name__ == '__main__':
    """testing"""
    import sys
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(filename)12s:L%(lineno)3s [%(levelname)8s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        stream=sys.stdout
        )
    test_func_profiling('Hello World')


2017-09-13 13:37:10 <ipython-input-8-08c0ce2f790b>:L 25 [   DEBUG] random sleep in 1 sec
2017-09-13 13:37:11 <ipython-input-8-08c0ce2f790b>:L 27 [    INFO] Hello World
2017-09-13 13:37:11 <ipython-input-8-08c0ce2f790b>:L 28 [   DEBUG] Wake up
2017-09-13 13:37:11 <ipython-input-8-08c0ce2f790b>:L 16 [   DEBUG] __main__.test_func_profiling[args=('Hello World',), kwargs={}] completed in 1.0032951831817627

Case 2: Top function exception

如果你是一位有良心的工程師, 在你產品裏面應該會留有你的聯絡方式, 讓你的顧客或是交接的人在遇到 exception 的時候可以聯絡的到你


In [9]:
from functools import wraps

def author(email):
    def sub_command(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            try:
                func(*args, **kwargs)
            except ValueError:
                '''you can use your own defined exception'''
                print('some useful message to debug')
            except Exception:
                print('Unexpected exception, please contant author: {}'.format(email))
        return wrapped
    return sub_command

@author('afun@example.com')
def divide(a, b):
    print('{}/{} = {}'.format(a, b, a/b))
    return a/b

divide(6, 2)
divide(6, 0)


6/2 = 3.0
Unexpected exception, please contant author: afun@example.com

Case 3: Least recently used cache

當我們的 function output 是可預期的 (同樣的 input 會有同樣的 output), 可以透過 lru_cache 把結果儲存起來, 下一次遇到同樣的 input 時就可以直接從 cache 取出結果而不用重新計算一次

詳細細節可以參考文件


In [10]:
from functools import lru_cache

@lru_cache()
def heavy_jobs(x):
    print('do some heavy jobs with input {}'.format(x))
    return x+1000

print(heavy_jobs(1))
print(heavy_jobs(1))
print(heavy_jobs(2))
print(heavy_jobs(1))
print(heavy_jobs(2))


do some heavy jobs with input 1
1001
1001
do some heavy jobs with input 2
1002
1001
1002

Case 4: Cache to json file

對於快速的 debug 非常有幫助, 會把 exception 內容都存成 json

Hint: 這邊 demo 的時候可以操作連續兩次看哪裡不同, 試著觸發 wrapped 內的 exception


In [11]:
import json
from functools import wraps

def cache_json(filename):
    def deco(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            try:
                print('Before try wrapped')
                return json.load(open(filename))
            except FileNotFoundError:
                print('Before except wrapped')
                data = func(*args, **kwargs)
                json.dump(data, open(filename, 'w'))
                return data
        return wrapped
    return deco

@cache_json('heavy.json')
def heavy_jobs(*args, **kwargs):
    print('do heavy jobs')
    if 'count' in kwargs:
        return kwargs['count']
    return

print(heavy_jobs(user='afun', count=5))


Before try wrapped
Before except wrapped
do heavy jobs
5

Decorator trick

以上面那些 function 為例, 其實在 wrapped 之前可以做些額外的操作

Function map


In [12]:
from functools import wraps

function_map = {}

def deco(func):
    global function_map
    function_map[func.__name__] = func
    
    @wraps(func)
    def wrapped(*args, **kwargs):
        func(*args, **kwargs)
    return wrapped

@deco
def f1():
    print('This is f1')

@deco
def f2():
    print('This is f2')

print(function_map)


{'f2': <function f2 at 0x7f3c1704a400>, 'f1': <function f1 at 0x7f3c1050d730>}

Flask

Flask 是一種 light-weight 的 python web framework, 因為環境沒有裝我也沒有真正使用過所以就只展示程式碼


In [ ]:
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Index page'

@app.route('/hello')
def hello():
    return 'Hello, world!'

@app.route('/post/<int:post_id>')
def show_post(post_id):
    return 'Post {}'.format(post_id)

Context manager

Basic concept

所謂 context 就是上下文的意思, 跟 decorator 同樣的地方是他可以在執行跟結束 function 的時候有額外操作


In [13]:
class my_context(object):
    def __enter__(self):
        print('in enter')
        return 'enter'
    def __exit__(self, *excinfo):
        print('in exit')
        return 'exit'

with my_context() as f:
    print('Hello')
    print('World')


in enter
Hello
World
in exit

上面 with 的使用方式同等於下面列的程式碼


In [14]:
context = my_context()
obj = context.__enter__()
try:
    print('Hello')
    print('World')
except Exception as e:
    if context.__exit__(sys.exc_info()):
        raise e
else:
    context.__exit__()


in enter
Hello
World
in exit

Context manager decorator

內建函式庫 contextlib 中有個好用的 decorator contextmanager 可以很簡單的將你的 function 變成 context manager


In [15]:
from contextlib import contextmanager

@contextmanager
def my_context():
    print('do things in enter')
    yield 'It is a feature, not a bug!!!'
    print('do things in exit')

with my_context() as obj:
    print('Hello')
    print('World')


do things in enter
Hello
World
do things in exit

到這邊你可能會疑惑這兩邊哪裡一樣?然後 yield 的東西呢?

其實你可以把 contextlib.contextmanager 視為一個轉換工具


In [16]:
@contextmanager
def my_context():
    print('do things in enter')
    yield 'It is a feature, not a bug!!!'
    print('do things in exit')

# 透過 contextmanager 轉換, 與下面這段程式碼等價

class my_context(object):
    def __enter__(self):
        print('do things in enter')
        return 'It is a feature, not a bug!!!'
    def __exit__(self, *excinfo):
        print('do things in exit')
        return 'exit'

Contextmanager limitation

Generator 中只允許一個物件


In [17]:
from contextlib import contextmanager

@contextmanager
def my_context():
    yield 'Hello'
    yield 'World'

with my_context():
    print('line 1')
    print('line 2')


line 1
line 2
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-17-0c22725d7f6f> in <module>()
      8 with my_context():
      9     print('line 1')
---> 10     print('line 2')

/usr/lib/python3.5/contextlib.py in __exit__(self, type, value, traceback)
     68                 return
     69             else:
---> 70                 raise RuntimeError("generator didn't stop")
     71         else:
     72             if value is None:

RuntimeError: generator didn't stop

In [18]:
from contextlib import contextmanager

@contextmanager
def context_loop():
    for i in range(100):
        yield i

with context_loop():
    print('line 1')
    print('line 2')


line 1
line 2
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-18-b141e9d6bce3> in <module>()
      8 with context_loop():
      9     print('line 1')
---> 10     print('line 2')

/usr/lib/python3.5/contextlib.py in __exit__(self, type, value, traceback)
     68                 return
     69             else:
---> 70                 raise RuntimeError("generator didn't stop")
     71         else:
     72             if value is None:

RuntimeError: generator didn't stop

In [19]:
from contextlib import contextmanager

@contextmanager
def context_condition(cond):
    if cond:
        yield 'in condition'
    else:
        yield 'else case'

with context_condition(True) as f1:
    print(f1)
    print('line 1')
    print('line 2')

with context_condition(False) as f2:
    print(f2)
    print('line 3')
    print('line 4')


in condition
line 1
line 2
else case
line 3
line 4

In [20]:
from contextlib import contextmanager

@contextmanager
def context_try(cond):
    try:
        yield 'normal'
    except Exception:
        print('exception')
    finally:
        print('finally')

with context_try(True):
    print('line 1')
    print('line 2')


line 1
line 2
finally

with statement

所以說到底 with 是甚麼? 根據 PEP 343 的說法, with 基本上就是 try/finally 的實現

Abstract
    This PEP adds a new statement "with" to the Python language to make
    it possible to factor out standard uses of try/finally statements.

    In this PEP, context managers provide __enter__() and __exit__()
    methods that are invoked on entry to and exit from the body of the
    with statement.

In [21]:
fp = open('test.txt', 'w+')
try:
    fp.write('Hello world')
finally:
    fp.close()
    
# 上面這種寫法透過 with 實現的話會變成下面這種程式碼

with open('test.txt', 'w+') as fp:
    fp.write('Hello world')

with + @contextmanager

複習一下: 我們可以透過 with 來做 try/finally 的實現, 還可以透過 contextmanager 來實現上下文管理, 那如果同時使用這兩個呢?

在 try/finally 架構中實現上下文管理

先經過 __enter__ entry, 然後 try/finally 本文, 最後再透過 __exit__ 離開這個 block


In [22]:
from contextlib import contextmanager

@contextmanager
def tag(name):
    print('<{}>'.format(name))
    yield
    print('</{}>'.format(name))

with tag('div'):
    print('content 1')


<div>
content 1
</div>

In [23]:
from contextlib import contextmanager

@contextmanager
def tag(name):
    print('<{}>'.format(name))
    yield
    print('</{}>'.format(name))

# nested
with tag('div'):
    with tag('p'):
        print('content 1')

print()
        
# multiple
with tag('div'), tag('h1'):
    print('content 2')


<div>
<p>
content 1
</p>
</div>

<div>
<h1>
content 2
</h1>
</div>

Case 1: redirect stdout to file

重新把原本會輸出在 stdout 的訊息導到 file descriptor


In [24]:
from contextlib import redirect_stdout

with open('test_redirect.txt', 'w+') as f:
    with redirect_stdout(f):
        help(redirect_stdout)
        print('afun defined message')

Case 2: timeit

前面有提到透過 decorator 來做 function 的 profiling, 但假如我今天不是要對 function, 而是單純要對某個 block 的程式碼做 profiling 呢? 是否要把這個 block 包成 function 再做 profiling ?

這個時候上面提到的 @contextmanager 就派上用場了, 透過 contextmanager 在 entry point 跟 exit point 都紀錄時間來計算耗時


In [25]:
from contextlib import contextmanager
import time

@contextmanager
def timeit(name=''):
    start_time = time.time()
    yield
    elapse_time = time.time() - start_time
    print('{} - completed in {:.6f}'.format(name, elapse_time))

# note: comprehension speed seens slower when range(10)
test = [i for i in range(100000)]

with timeit('afun 1'):
    a = [i for i in test if i % 2 == 0]

with timeit('afun 2'):
    b = []
    for i in test:
        if i % 2 == 0:
            b.append(i)

@timeit('afun 3')
def func(test_list):
    result = []
    for i in test_list:
        if i % 2 == 0:
            result.append(i)
    return result

c = func(test)


afun 1 - completed in 0.006175
afun 2 - completed in 0.012578
afun 3 - completed in 0.009176

Study 1: Detail of contextmanager

contextmanger 是一個為了 with statement 做出來不用特別建立具有 __enter____exit__ class 的 decorator

我們可以觀察原始碼近一步了解內部運作機制


In [26]:
from functools import wraps

# contextmanager 原始碼
# 這邊就是我們上面提過的 decorator 寫法
# 雖然他是回傳 _GeneratorContextManager 這個 helper class, 但其實這邊主要是透過 ContextDecorator 實作
def contextmanager(func):
    @wraps(func)
    def helper(*args, **kwds):
        return _GeneratorContextManager(func, args, kwds)
    return helper

# ContextDecorator 原始碼
# _recreate_cm 會回傳自己的 instance
class ContextDecorator(object):

    def _recreate_cm(self):
        return self
    
    def __call__(self, func):
        @wraps(func)
        def inner(*args, **kwds):
            with self._recreate_cm():
                return func(*args, **kwds)
        return inner

透過上面的原始碼你可以發現 contextmanager 目前有兩點限制

  1. 沒有 func 的資訊, 雖然我們用 wrap 保存了 func 資訊, 但是沒做任何操作, 也沒有任何 entry point 可以讓我們塞程式碼來了顯示 func 資訊
  2. 沒辦法修改 return, 原因同上, 這邊沒有任何可以外部增加的程式碼或是 flag