Математическая статистика

Практическое задание 0

В данном задании предлагается решить 4 простых задачи на использование функций библиотеки numpy. Хоть само задание и не относится к курсу статистики, оно является важным в условиях отсутствия курса по Питону. Решение этих задач поможет научить писать простой и понятный код, работающий при этом в десятки или даже в сотни раз быстрее. Нам же это облегчит процесс проверки.

Правила:

  • Задание считается выполненным, если решено не менее трех задач.
  • Успешное выполнение задание является допуском для выполнения следующих практических заданий.
  • В случае неуспешного выполнения задания допускаются две попытки повторной сдачи. Мы будем стараться отвечать в течении трех дней.
  • Выполненную работу нужно отправить на почту probability.diht@yandex.ru, указав тему письма "[номер группы] Фамилия Имя - Задание 0". Квадратные скобки обязательны.
  • Прислать нужно ноутбук и его pdf-версию. Названия файлов должны быть такими: 0.N.ipynb и 0.N.pdf, где N - ваш номер из таблицы с оценками.
  • В данном задании весь присылаемый код должен корректно работать на Python 3.5.

Во всех заданиях предполагается, что все аргументы функций, которые нужно реализовать, имеют тип numpy.array либо являются числами. Возвращать нужно также либо numpy.array, либо число. Кроме того, предполагается, что все аргументы корректны, и проверять их на корректность не нужно.

При реализации запрещается пользоваться любыми циклами, в том числе стандартными функциями языка, которые заменяют циклы. Можно использовать любые функции библиотек numpy или scipy, кроме функции numpy.fromfunction и декторатора numpy.vectorize.


In [3]:
import numpy as np
import scipy.stats as sps
import matplotlib.pyplot as plt

%matplotlib inline

Задача 1. Напишите функцию, реализующую матричное умножение. При вычислении разрешается создавать объекты размерности три. Запрещается пользоваться функциями, реализующими матричное умножение (numpy.dot, операция @, операция умножения в классе numpy.matrix). Авторское решение занимает одну строчку.


In [12]:
def matrix_multiplication(A, B): 
    '''Возвращает матрицу, которая является результатом
    матричного умножения матриц A и B.
    
    '''
    # Запоминаем размерности: мы умножаем матрицу размера nxm на матрицу размера mxk.
    n, m, k = A.shape[0], A.shape[1], B.shape[1]
    # Считаем внешнее произведение матриц. Заметим, что тогда получается матрица размера (nm)x(mk), таким образом,
    # матрица разбивает на nk подматриц размера mxm.
    C = np.array(np.outer(A, B.T))
    # Давайте так и разобьем их на подматрицы. К сожалению, сразу сделать numpy.reshape не выдаст желаемого результата
    # из-за "сканирования по строчкам". Сначала разобьем по вертикали на k столбцов нашу матрицу.
    D = np.array(np.hsplit(C, k))
    # Теперь можно сделать уже нормальный reshape, и все будет как надо. Мы получили матрицу nxk, элементами которой являются
    # матрицы размера mxm. Единственное, она получилась транспонированной. Пока что так даже удобнее, поскольку транспонировать
    # ее в таком виде будет криво (какие-то сложности из-за излишней многомерности).
    E = D.reshape(k, n, m, m)
    # Ну давайте посчитаем след каждой такой матрицы размера mxm. Совершенно случайно он оказывается элементом искомого
    # произведения матриц (именно это было замечено при выполнении numpy.outer). Таким образом, все хорошо, разве что
    # не забудем все же транспонировать матрицу.
    # P. S. Заметим, что каждой операцией мы получаем матрицу, а потом только один раз ее используем. Так что при большом
    # желании это все упихивается в одну никому не понятную строчку:
    # np.trace(np.array(np.hsplit(np.array(np.outer(A, B.T)), B.shape[1])).
    #                            reshape(B.shape[1], A.shape[0], A.shape[1], A.shape[1]), axis1=2, axis2=3).T
    # Может, так будет быстрее.
    return np.trace(E, axis1=2, axis2=3).T

Проверьте правильность реализации на случайных матрицах. Должен получится ноль.


In [13]:
A = sps.uniform.rvs(size=(10, 20))
B = sps.uniform.rvs(size=(20, 30))
np.abs(matrix_multiplication(A, B) - A @ B).sum()


Out[13]:
1.3011813848606835e-13

А вот в таком стиле вы присылали бы нам свои работы, если не стали бы делать это задание.


In [14]:
def stupid_matrix_multiplication(A, B):
    C = [[0 for j in range(len(B[0]))] for i in range(len(A))]
    for i in range(len(A)):
        for j in range(len(B[0])):
            for k in range(len(B)):
                C[i][j] += A[i][k] * B[k][j]
    return C

Проверьте, насколько быстрее работает ваш код по сравнению с неэффективной реализацией stupid_matrix_multiplication. Эффективный код должен работать почти в 200 раз быстрее. Для примера посмотрите также, насколько быстрее работают встроенные numpy-функции.


In [15]:
A = sps.uniform.rvs(size=(400, 200))
B = sps.uniform.rvs(size=(200, 300))

%time C1 = matrix_multiplication(A, B)
%time C2 = A @ B  # python 3.5
%time C3 = np.matrix(A) * np.matrix(B)
%time C4 = stupid_matrix_multiplication(A, B)


---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
<ipython-input-15-49ebb7c90123> in <module>()
      2 B = sps.uniform.rvs(size=(200, 300))
      3 
----> 4 get_ipython().magic('time C1 = matrix_multiplication(A, B)')
      5 get_ipython().magic('time C2 = A @ B  # python 3.5')
      6 get_ipython().magic('time C3 = np.matrix(A) * np.matrix(B)')

C:\Users\EvgenyShlykov\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in magic(self, arg_s)
   2156         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2157         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2158         return self.run_line_magic(magic_name, magic_arg_s)
   2159 
   2160     #-------------------------------------------------------------------------

C:\Users\EvgenyShlykov\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in run_line_magic(self, magic_name, line)
   2077                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2078             with self.builtin_trap:
-> 2079                 result = fn(*args,**kwargs)
   2080             return result
   2081 

<decorator-gen-60> in time(self, line, cell, local_ns)

C:\Users\EvgenyShlykov\Anaconda3\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

C:\Users\EvgenyShlykov\Anaconda3\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
   1178         else:
   1179             st = clock2()
-> 1180             exec(code, glob, local_ns)
   1181             end = clock2()
   1182             out = None

<timed exec> in <module>()

<ipython-input-12-f84c8799d2d1> in matrix_multiplication(A, B)
      8     # Считаем внешнее произведение матриц. Заметим, что тогда получается матрица размера (nm)x(mk), таким образом,
      9     # матрица разбивает на nk подматриц размера mxm.
---> 10     C = np.array(np.outer(A, B.T))
     11     # Давайте так и разобьем их на подматрицы. К сожалению, сразу сделать numpy.reshape не выдаст желаемого результата
     12     # из-за "сканирования по строчкам". Сначала разобьем по вертикали на k столбцов нашу матрицу.

C:\Users\EvgenyShlykov\Anaconda3\lib\site-packages\numpy\core\numeric.py in outer(a, b, out)
   1091     a = asarray(a)
   1092     b = asarray(b)
-> 1093     return multiply(a.ravel()[:, newaxis], b.ravel()[newaxis,:], out)
   1094 
   1095 

MemoryError: 

Ниже для примера приведена полная реализация функции. Ваc мы, конечно, не будем требовать проверять входные данные на корректность, но документации к функциям нужно писать.


In [16]:
def matrix_multiplication(A, B):
    '''Возвращает матрицу, которая является результатом
    матричного умножения матриц A и B.
    
    '''
    
    # Если A или B имеют другой тип, нужно выполнить преобразование типов
    A = np.array(A)
    B = np.array(B)
    
    # Проверка данных входных данных на корректность
    assert A.ndim == 2 and B.ndim == 2, 'Размер матриц не равен 2'
    assert A.shape[1] == B.shape[0], ('Матрицы размерностей '
                                      '{} и {} неперемножаемы'.format(A.shape,
                                                                      B.shape))
    
    # Запоминаем размерности: мы умножаем матрицу размера nxm на матрицу размера mxk.
    n, m, k = A.shape[0], A.shape[1], B.shape[1]
    # Считаем внешнее произведение матриц. Заметим, что тогда получается матрица размера (nm)x(mk), таким образом,
    # матрица разбивает на nk подматриц размера mxm.
    C = np.array(np.outer(A, B.T))
    # Давайте так и разобьем их на подматрицы. К сожалению, сразу сделать numpy.reshape не выдаст желаемого результата
    # из-за "сканирования по строчкам". Сначала разобьем по вертикали на k столбцов нашу матрицу.
    D = np.array(np.hsplit(C, k))
    # Теперь можно сделать уже нормальный reshape, и все будет как надо. Мы получили матрицу nxk, элементами которой являются
    # матрицы размера mxm. Единственное, она получилась транспонированной. Пока что так даже удобнее, поскольку транспонировать
    # ее в таком виде будет криво (какие-то сложности из-за излишней многомерности).
    E = D.reshape(k, n, m, m)
    # Ну давайте посчитаем след каждой такой матрицы размера mxm. Совершенно случайно он оказывается элементом искомого
    # произведения матриц (именно это было замечено при выполнении numpy.outer). Таким образом, все хорошо, разве что
    # не забудем все же транспонировать матрицу.
    # P. S. Заметим, что каждой операцией мы получаем матрицу, а потом только один раз ее используем. Так что при большом
    # желании это все упихивается в одну никому не понятную строчку:
    # np.trace(np.array(np.hsplit(np.array(np.outer(A, B.T)), B.shape[1])).
    #                            reshape(B.shape[1], A.shape[0], A.shape[1], A.shape[1]), axis1=2, axis2=3).T
    # Может, так будет быстрее.
    return np.trace(E, axis1=2, axis2=3).T

Задача 2. Напишите функцию, которая по входной последовательности $X = (X_1, ..., X_n)$ строит последовательность $S = (S_1, ..., S_n)$, где $S_k = \frac{X_1 + ... + X_k}{k}$. Авторское решение занимает одну строчку.


In [17]:
def cumavg(X):
    ''' Возвращает по входной последовательности $X = (X_1, ..., X_n)$
    последовательность $S = (S_1, ..., S_n)$, где $S_k = \frac{X_1 + ... + X_k}{k}$.
    
    '''
    # Считаем частичные суммы, а затем покоординатно делим их на числа 1, 2 и так далее по размеру входной последовательности.
    return X.cumsum() / np.linspace(1, X.size, X.size)

Постройте график зависимости $S_k$ от $k$. График должен быть в виде ломанной линии с достаточно крупными точками. Размер фигуры 15 на 5, сетка в виде пунктирной линии.


In [18]:
S = cumavg(sps.uniform.rvs(size=100))

plt.figure(figsize=(15, 5))  # Размер фигуры 15 на 5.
plt.plot(S, '-o')  # Ставим массив S на график, соединяем точки ломаной и ставим достаточно крупные точки.
plt.grid(linestyle='--')  # Делаем решетку пунктирными линиям.
plt.show()  # Показываем график.


Проверьте корректность работы реализации, а также ее эффективность. Эффективный код должен работать в 50 раз быстрее.


In [19]:
def stupid_cumavg(X):
    S = [0 for i in range(len(X))]
    for i in range(len(X)):
        S[i] = X[i] + S[i - 1]
    for i in range(len(X)):
        S[i] /= i + 1
    return S

X = sps.uniform.rvs(size=10 ** 7)

%time S1 = cumavg(X)
%time S2 = stupid_cumavg(X)

np.abs(S1 - S2).sum()


Wall time: 196 ms
Wall time: 8.58 s
Out[19]:
0.0

Задача 3. Дана матрица $A = (a_{ij})$ размера $n \times m$. Вычислите величину $$\frac{1}{m} \sum_{j=1}^m \min_{i=1, ..., n} a_{ij},$$ то есть средний минимум по столбцам. Авторское решение занимает одну строчку.


In [20]:
def avgmin(A):
    '''
    Возвращает средний минимум по столбцам матрицы A.
    
    '''
    # Получаем массив минимумов по всем столбцам, из которого находим среднее значение.
    return  A.min(axis=0).mean()

Проверьте корректность работы реализации, а также ее эффективность. Эффективный код должен работать почти в 200 раз быстрее. Обратите внимание, что разность чисел может быть не равна нулю из-за ошибок округления, но должна иметь малый порядок.


In [21]:
def stupid_avgmin(A):
    N, M = len(A), len(A[0])
    min_col = [min([A[i][j] for i in range(N)]) for j in range(M)]
    return sum(min_col) / M

N, M = 5000, 10000
A = sps.uniform.rvs(size=(N, M))

%time S1 = avgmin(A)
%time S2 = stupid_avgmin(A)

print(np.abs(S1 - S2))


Wall time: 96.1 ms
Wall time: 22.9 s
3.7947076037e-19

Задача 4. Дан массив $X$. Требуется построить новый массив, в котором все четные элементы $X$ заменить на число $v$ (если оно не указано, то на ноль). Все нечетные элементы исходного массива нужно возвести в квадрат и записать в обратном порядке относительно позиций этих элементов. Массив $X$ при этом должен остаться без изменений.


In [22]:
def func4(X, v=0):
    '''
    Вовзращает новый массив по массиву X по правилу: все четные элементы $X$
    заменены на число $v$ (если оно не указано, то на ноль). Все нечетные элементы исходного массива
    возведены в квадрат и записаны в обратном порядке относительно позиций этих элементов.  
    
    '''
    res = X.copy()  # Копируем массив X в переменную res.
    # Маска для нечетных чисел в массиве.
    mask = X % 2 != 0
    # Берем подматрицу их нечетных чисел, возводим все числа в ней в квадрат, а потом реверсим.
    res[mask] = (X[mask] ** 2)[::-1]
    # Оставляем все нечетные числа как есть, а четные превращаем в v; пользуемся v * True == v и v * False == 0.
    return res * mask + v * np.logical_not(mask)

Проверьте корректность работы реализации, а также ее эффективность. Эффективный код должен работать в 20 раз быстрее.


In [23]:
def stupid_func4(X, v=0):
    odd = [elem ** 2 for elem in X if elem % 2]
    
    new_X = []
    j = len(odd) - 1
    for i in range(len(X)):
        if X[i] % 2:
            new_X.append(odd[j])
            j -= 1
        else:
            new_X.append(v)
    
    return new_X

X = sps.randint.rvs(size=10 ** 7, low=0, high=100)

%time A1 = func4(X)
%time A2 = stupid_func4(X)

np.abs(A1 - A2).sum()


Wall time: 313 ms
Wall time: 7 s
Out[23]:
0

Вопрос: За счет чего достигается такая эффективность методов numpy?

Ответ: Пакет numpy предоставляет $n$-мерные однородные массивы (все элементы одного типа); в них нельзя вставить или удалить элемент в произвольном месте. Если задачу можно решить, произведя некоторую последовательность операций над массивами, то это будет столь же эффективно, как в C или matlab - львиная доля времени тратится в библиотечных функциях, написанных на C.