In [ ]:
%%html
<style>
.text_cell_render * {
   font-family: OfficinaSansCTT;
}
.reveal code {
    font-family: OfficinaSansCTT;
}
.text_cell_render h3 {
   font-family: OfficinaSansCTT;
}
.reveal section img {
    max-height: 500px;
    margin-left: auto;
    margin-right: auto;
}
</style>

Вопросы по прошлому занятию

  • Что найдет регулярка "^\d+.\d{1,2}.\d{1,2}\s[^A-Z]?$" ?
  • Что делает функция filter(lambda s: s.startswith("https://"), sys.stdin)?
  • Объясните своими словами, что такое yield.
  • В Python 2 была возможность получить очень странную ошибку ValueError: function 'func' accepts at least 2 arguments (2 given). В Python 3 сообщение об ошибке исправили на более информативное, но попытайтесь предположить, что надо было сделать для получения такой ошибки?
  • Как выбрать случайный элемент из списка?

Классы и магические методы


In [ ]:
a = 1
b = 3

In [ ]:
a + b
a.__add__(b)

In [ ]:
type(a)
isinstance(a, int)

In [2]:
class Animal(object):
    
    mammal = True  # class variable
    
    def __init__(self, name, voice, color="black"):
        self.name = name
        self.__voice = voice  # "приватный" или "защищенный" атрибут
        self._color = color  # "типа приватный" атрибут
        
    def make_sound(self):
        print('{0} {1} says "{2}"'.format(self._color, self.name, self.__voice))
    
    @classmethod
    def description(cls):
        print("Some animal")

In [5]:
Animal.mammal
Animal.description()
a = Animal("dog", "bark")
a.mammal


Some animal
Out[5]:
True

In [ ]:
c.__voice
c._color
dir(c)

In [ ]:


In [ ]:
class Cat(Animal):
    
    def __init__(self, color):
        super().__init__(name="cat", voice="meow", color=color)

In [ ]:
c = Cat(color="white")
isinstance(c, Animal)

In [ ]:
c1 = Cat(color="white")
c2 = Cat(color="black")
print(c1.mammal)
c1.mammal = False
print(c1.mammal)
print(c2.mammal)

In [ ]:


In [ ]:
c1 = Cat(color="white")
c2 = Cat(color="black")
print(c1.mammal)
Cat.mammal = False
print(c1.mammal)
print(c2.mammal)

In [ ]:


In [ ]:
c._color = "green"
c.make_sound()

In [ ]:
class Cat(Animal):
    
    def __init__(self, color):
        super().__init__(name="cat", voice="meow", color=color)
        
    @property
    def color(self):
        return self._color
    
    @color.setter
    def color(self, val):
        if val not in ("black", "white", "grey", "mixed"):
            raise Exception("Cat can't be {0}!".format(val))
        self._color = val

In [ ]:
c = Cat("white")
c.color

In [ ]:
c.color = "green"
c.color

Упражнение

Написать класс RangeCounter, который принимает начальное значение и шаг. У счетчика есть метод step(), который позволяет увеличить значение на размер шага. Давайте запретим менять значение напрямую, а шаг позволим устанавливать через сеттер.


In [ ]:

Более подробно о with


In [ ]:
class A(object):
    def __init__(self):
        self.sandbox = {}
        
    def __enter__(self):
        return self.sandbox
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.sandbox = {}

In [ ]:
a = A()
with a as sbox:
    sbox["foo"] = "bar"
    print(sbox)

print(a.sandbox)

In [ ]:
from contextlib import contextmanager

@contextmanager
def contextgen():
    print("enter")
    yield 1
    print("exit")

In [ ]:
with contextgen() as a:
    print(a)

Что такое потоки?

  • Системный планировщик отдает процессорное время потокам/процессам, переключая между ними контекст
  • Процессы/потоки работают "параллельно", в идеале используя несколько ядер процессора
  • Пути планировщика неисповедимы, нельзя заранее предсказать, какой процесс получит ресурсы в конкретный момент
  • Потоки надо синхронизировать согласно задачам, чтобы не было проблем с одновременным доступом
  • Пример - простая версия веб-сервера
  • Есть CPU-bound задачи и есть I/O-bound задачи - важно понимать разницу

Что такое GIL?

  • GIL - это глобальный мьютекс (механизм синхронизации) в интерпретаторе Python
  • GIL запрещает выполнять байткод Python больше чем одному потоку одновременно
  • Но это касается ТОЛЬКО байткода Python и не распространяется на I/O операции
  • Потоки Python (в отличие от потоков, скажем, в Ruby) - это полноценные потоки ОС

In [ ]:
import os
import requests
from threading import Thread
 
class DownloadThread(Thread):

    def __init__(self, url, name):
        super().__init__()
        self.url = url
        self.name = name
    
    def run(self):
        res = requests.get(self.url, stream=True)
        res.raise_for_status()        
        fname = os.path.basename(self.url)
    
        with open(fname, "wb") as savefile:
            for chunk in res.iter_content(1024):
                savefile.write(chunk)
        
        print(f"{self.name} закончил загрузку {self.url} !")

In [ ]:
def main(urls): 
    for item, url in enumerate(urls):
        thread = DownloadThread(url, f"Поток {item + 1}")
        thread.start()

main([
    "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
])

"""
В данном случае интерпретатор дожидается
завершения всех дочерних потоков.
В других языках может быть иначе!
"""

In [ ]:
import queue

class DownloadThread2(Thread):

    def __init__(self, queue, name):
        super().__init__()
        self.queue = queue
        self.name = name
    
    def run(self):
        while True:
            url = self.queue.get()
            fname = os.path.basename(url)
            
            res = requests.get(url, stream=True)
            res.raise_for_status()        
        
            with open(fname, "wb") as savefile:
                for chunk in res.iter_content(1024):
                    savefile.write(chunk)
                    
            self.queue.task_done()
            print(f"{self.name} закончил загрузку {url} !")

In [ ]:
def main(urls):
    q = queue.Queue()
    threads = [DownloadThread2(q, f"Поток {i + 1}") for i in range(2)]
    for t in threads:
        # заставляем интерпретатор НЕ ждать завершения дочерних потоков
        t.setDaemon(True)
        t.start()
    
    for url in urls:
        q.put(url)
        
    q.join()  # все обработано - выходим

main([
    "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
])

Упражнение

Реализовать "sleepsort". Предположим у нас есть короткий список чисел от 0 до 10. Чтобы их вывести в отсортированном порядке - достаточно каждый поток заставить "спать" количество секунд, равное самому числу, и только потом его выводить. В чем недостаток данного подхода?


In [ ]:

Как обойти GIL?

  • Например, использовать процессы вместо потоков.
  • Тогда проблема будет с синхронизацией и обменом сообщениями (см. pickle)
  • И процессы все-таки немного тяжелее потоков. Стартовать по процессу на каждого клиента слишком дорого.

In [ ]:
from multiprocessing import Process
from multiprocessing import Queue

Ipyparallel

  • 0MQ + Kernels
  • Поддержка платформ наподобие EC2
  • mpi4py
  • Task DAG

https://ipyparallel.readthedocs.io/en/latest/


In [ ]:
import time
from concurrent.futures import ThreadPoolExecutor
# аналогично с ProcessPoolExecutor
 
def hold_my_beer_5_sec(beer):
    time.sleep(5)
    return beer
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(hold_my_beer_5_sec, ("Балтика"))
print(future.done())
time.sleep(5)
print(future.done())
print(future.result())

In [ ]:


In [ ]:
import concurrent.futures
import requests
 
def load_url(url):
    fname = os.path.basename(url)
    res = requests.get(url, stream=True)
    res.raise_for_status()        

    with open(fname, "wb") as savefile:
        for chunk in res.iter_content(1024):
            savefile.write(chunk)
    return fname

In [ ]:
URLS = [
    "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
    "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {
        executor.submit(load_url, url): url
        for url in URLS
    }
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        print(f"URL '{future_to_url[future]}' is saved to '{future.result()}'")

Примитивы синхронизации - мьютекс


In [ ]:
m = threading.Lock()
m.acquire()
m.release()

In [ ]: