$ conda install ipyparallel
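Before a Client can connect, a set of engines must already be running. A local cluster can be started from the shell (here with 4 engines; match the count to the available cores):

$ ipcluster start -n 4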
In [1]:
from ipyparallel import Client
c = Client()
c.ids
Out[1]:
In [2]:
dview = c[:]
dview
Out[2]:
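A DirectView addresses all engines at once. As a minimal sketch (assuming the engines above are running), statements and variables can be pushed to every engine before mapping work onto them:

dview.execute("import numpy as np")       # run a statement on every engine
dview.push({"offset": 32})                # copy a variable to every engine
dview.pull("offset", block=True)          # read it back: one value per engine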
In [3]:
import numpy as np

def fahrenheit(T):
    return 9 / 5 * T + 32

temp = np.arange(0, 110, 10)
temp
Out[3]:
In [15]:
F = map(fahrenheit, temp)  # map is lazy in Python 3: it returns an iterator
F, list(F)                 # list() forces the evaluation
Out[15]:
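The same map can be handed to the engines instead. A minimal sketch using the DirectView from above; map_sync splits the sequence across the engines and blocks until every piece returns:

parallel_F = dview.map_sync(fahrenheit, temp)
parallel_F == list(map(fahrenheit, temp))   # same values as the serial map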
In [6]:
def create_prime(primes, n):
    # append n to primes only if no prime already collected divides it
    for p in primes:
        if n % p == 0:
            return primes
    primes.append(n)
    return primes
In [8]:
from functools import reduce
In [11]:
reduce(create_prime, np.arange(2, 100), [2])
Out[11]:
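reduce folds the sequence into the accumulator one element at a time; the call above is equivalent to this explicit loop (shown only for illustration):

primes = [2]                          # the initial accumulator
for n in np.arange(2, 100):
    primes = create_prime(primes, n)  # each step feeds the result back in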
In [16]:
def pyprimes(kmax):  # no need to follow the details: just a deliberately heavy prime-finding function
    p = np.zeros(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result
In [19]:
%time result = list(map(pyprimes, range(700, 1000)))  # list() forces evaluation; inside Docker this and the parallel run below take about the same time, but on a multi-core server the parallel one should be faster
In [20]:
%time parallel_result = dview.map_sync(pyprimes, range(700, 1000))  # blocking: if even one engine has not returned yet, it waits; the call finishes only when every engine is done
In [21]:
parallel_result == result
Out[21]:
In [22]:
async_result = dview.map_async(pyprimes, range(700, 1000))  # returns control immediately, even before the work is done; monitoring is up to you
In [25]:
async_result.progress  # reports how many tasks have completed so far
Out[25]:
In [24]:
async_result.get()[0][-10:]
Out[24]:
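One way to monitor an AsyncResult is to poll it until every task reports ready; a minimal sketch (ipyparallel's wait_interactive() method does much the same):

import time
while not async_result.ready():              # False until all tasks finish
    print("completed tasks:", async_result.progress)
    time.sleep(1)
final = async_result.get()                   # no longer blocks at this point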
To distribute a model across engines, use the joblib.dump and joblib.load commands from the sklearn.externals subpackage. They save the model in pickle form, with all of its current attributes, and load it back again.
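As a minimal sketch of the round trip (the filename is arbitrary):

from sklearn.externals import joblib
from sklearn.svm import SVC

clf = SVC()                          # any estimator works the same way
joblib.dump(clf, "model.pkl")        # serialize the object, attributes included
clf2 = joblib.load("model.pkl")      # restore an equivalent estimator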
In [26]:
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
In [28]:
news = fetch_20newsgroups(subset="all")
n_samples = 3000
X_train = news.data[:n_samples]
y_train = news.target[:n_samples]
model = Pipeline([
    ('vect', TfidfVectorizer(stop_words="english",
                             token_pattern=r"\b[a-z0-9_\-\.]+[a-z][a-z0-9_\-\.]+\b")),  # raw string: \b must reach the regex engine, not become a backspace
    ('svc', SVC()),
])
In [29]:
from sklearn.externals import joblib
import os
from sklearn.cross_validation import KFold, cross_val_score
In [30]:
def persist_cv_splits(X, y, K=3, name="data", suffix="_cv_%03d.pkl"):  # split the data into K folds and save each fold to disk
    cv_split_filenames = []
    cv = KFold(len(X), K, shuffle=True, random_state=0)
    for i, (train, test) in enumerate(cv):
        cv_fold = ([X[k] for k in train], y[train],
                   [X[k] for k in test], y[test])
        cv_split_filename = name + suffix % i
        cv_split_filename = os.path.abspath(cv_split_filename)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    return cv_split_filenames

cv_filenames = persist_cv_splits(X_train, y_train, name="news")
cv_filenames
Out[30]:
In [31]:
def compute_evaluation(cv_split_filename, model, params):
    from sklearn.externals import joblib  # imported inside so the function still works when shipped to an engine
    # mmap_mode="c" memory-maps the arrays (copy-on-write) instead of loading a full copy
    X_train_, y_train_, X_test_, y_test_ = joblib.load(cv_split_filename, mmap_mode="c")
    model.set_params(**params)
    model.fit(X_train_, y_train_)
    test_scores = model.score(X_test_, y_test_)
    return test_scores
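Before dispatching to the engines, the helper can be sanity-checked locally on one fold; a sketch with arbitrary parameter values:

compute_evaluation(cv_filenames[0], model, {"svc__C": 1.0, "svc__gamma": 0.1})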
In [32]:
from sklearn.grid_search import ParameterGrid

def parallel_grid_search(lb_view, model, cv_split_filenames, param_grid):  # lb_view is a load-balanced view over the engines
    all_tasks = []
    all_parameters = list(ParameterGrid(param_grid))
    for i, params in enumerate(all_parameters):
        task_for_params = []
        for j, cv_split_filename in enumerate(cv_split_filenames):
            # similar to map, but apply submits a single function call; it runs on an engine, not here
            t = lb_view.apply(compute_evaluation, cv_split_filename, model, params)
            task_for_params.append(t)
        all_tasks.append(task_for_params)
    return all_parameters, all_tasks
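Once every task has finished, the mean cross-validation score per parameter combination can be collected from the task handles. A minimal sketch; find_best is a hypothetical helper, not part of the notebook:

def find_best(all_parameters, all_tasks):
    # mean test score across the K folds for each parameter combination
    mean_scores = [np.mean([t.get() for t in task_group])
                   for task_group in all_tasks]
    best = int(np.argmax(mean_scores))
    return all_parameters[best], mean_scores[best]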
In [33]:
import datetime

def print_progress(tasks):
    # fraction of finished tasks across all parameter/fold combinations
    progress = np.mean([task.ready() for task_group in tasks for task in task_group])
    print("{0}:{1}%".format(datetime.datetime.now(), progress * 100.0))
    return int(progress * 100.0)
In [34]:
from ipyparallel import Client
client = Client()
print(client.ids)
lb_view = client.load_balanced_view()
In [35]:
from sklearn.grid_search import GridSearchCV
parameters = {
    "svc__gamma": np.logspace(-2, 1, 4),
    "svc__C": np.logspace(-1, 1, 3),
}
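This grid has 4 gamma values × 3 C values = 12 parameter combinations; with the 3 CV folds saved above, parallel_grid_search will submit 12 × 3 = 36 independent tasks to the load-balanced view.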
In [36]:
all_parameters, all_tasks = parallel_grid_search(lb_view, model, cv_filenames, parameters)
In [37]:
import time

start_time = datetime.datetime.now()
while True:
    progress = print_progress(all_tasks)
    if progress >= 100:
        break
    time.sleep(1)
print("finish")
end_time = datetime.datetime.now()
print((end_time - start_time).total_seconds())