$ conda install ipyparallel
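A cluster of engines has to be running before the Client below can connect to anything. One common way to start one locally (4 engines here; adjust to the number of cores available):
$ ipcluster start -n 4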
In [4]:
from ipyparallel import Client
c = Client()
c.ids
Out[4]:
In [5]:
dview = c[:]
dview
Out[5]:
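The DirectView above addresses every engine at once. A minimal sketch (the variable name a is only illustrative) that pushes a value to each engine, pulls it back, and runs a function everywhere:
dview.push(dict(a=1), block=True)       # define a on every engine
dview.pull('a', block=True)             # read it back: one value per engine
dview.apply_sync(lambda x: x + 1, 10)   # run a function on every engine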
In [6]:
import numpy as np

def fahrenheit(T):
    # use a float ratio so the conversion is also correct under Python 2 integer division
    return 9.0 / 5 * T + 32

temp = np.arange(0, 110, 10)
temp
Out[6]:
In [7]:
F = list(map(fahrenheit, temp))  # wrap in list() so the values are materialized on Python 3 as well
F
Out[7]:
In [8]:
def create_prime(primes, n):
    # if n is divisible by any known prime, it is not prime: return the list unchanged
    for p in primes:
        if n % p == 0:
            return primes
    primes.append(n)
    return primes
In [9]:
from functools import reduce  # needed on Python 3; reduce is a builtin on Python 2

reduce(create_prime, np.arange(2, 100), [2])
Out[9]:
In [12]:
def pyprimes(kmax):
    # pure-Python prime search: p holds the primes found so far
    p = np.zeros(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            # n is not divisible by any smaller prime, so record it
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result
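pyprimes refers to the global name np, so when it is shipped to the engines they need numpy imported under that name as well; one way to do that (a sketch using the DirectView) is:
dview.execute('import numpy as np', block=True)  # make np available on every engine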
In [13]:
%time result = list(map(pyprimes, range(700, 1000)))
In [14]:
%time parallel_result = dview.map_sync(pyprimes, range(700, 1000))
In [15]:
parallel_result == result
Out[15]:
In [16]:
async_result = dview.map_async(pyprimes, range(700, 1000))
In [21]:
async_result.progress
Out[21]:
In [24]:
async_result.get()[0][-10:]
Out[24]:
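A few other AsyncResult helpers are useful while the work is still running; a small sketch:
async_result.ready()    # True once all chunks have finished
async_result.elapsed    # seconds since the tasks were submitted
async_result.wait(1)    # block for at most one second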
To process a model in a distributed fashion, use the joblib.dump and joblib.load commands from the sklearn.externals subpackage.
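In its simplest form this is just serializing an object to disk so that another process or engine can load it back; a minimal round-trip sketch (the filename example.pkl is only illustrative):
from sklearn.externals import joblib
import numpy as np
joblib.dump(np.arange(5), "example.pkl")  # any picklable object works: arrays, CV folds, fitted models
joblib.load("example.pkl")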
In [36]:
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline

news = fetch_20newsgroups(subset="all")
n_samples = 3000
X_train = news.data[:n_samples]
y_train = news.target[:n_samples]

model = Pipeline([
    ('vect', TfidfVectorizer(stop_words="english",
                             token_pattern=r"\b[a-z0-9_\-\.]+[a-z][a-z0-9_\-\.]+\b")),
    ('svc', SVC()),
])
In [28]:
from sklearn.externals import joblib
import os
from sklearn.cross_validation import KFold, cross_val_score

def persist_cv_splits(X, y, K=3, name="data", suffix="_cv_%03d.pkl"):
    """Dump each cross-validation fold to its own pickle file and return the filenames."""
    cv_split_filenames = []
    cv = KFold(len(X), K, shuffle=True, random_state=0)
    for i, (train, test) in enumerate(cv):
        cv_fold = ([X[k] for k in train], y[train],
                   [X[k] for k in test], y[test])
        cv_split_filename = name + suffix % i
        cv_split_filename = os.path.abspath(cv_split_filename)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    return cv_split_filenames

cv_filenames = persist_cv_splits(X_train, y_train, name="news")
cv_filenames
Out[28]:
In [29]:
def compute_evaluation(cv_split_filename, model, params):
    # imported inside the function so it is available on the worker engines
    from sklearn.externals import joblib
    X_train_, y_train_, X_test_, y_test_ = joblib.load(cv_split_filename, mmap_mode="c")
    model.set_params(**params)
    model.fit(X_train_, y_train_)
    test_scores = model.score(X_test_, y_test_)
    return test_scores
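Before submitting dozens of tasks it can be worth running the evaluation once in the local process; a sketch using the first split and purely illustrative parameter values:
compute_evaluation(cv_filenames[0], model, {"svc__gamma": 0.1, "svc__C": 1.0})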
In [30]:
from sklearn.grid_search import ParameterGrid

def parallel_grid_search(lb_view, model, cv_split_filenames, param_grid):
    """Submit one asynchronous task per (parameter set, CV fold) pair."""
    all_tasks = []
    all_parameters = list(ParameterGrid(param_grid))
    for i, params in enumerate(all_parameters):
        task_for_params = []
        for j, cv_split_filename in enumerate(cv_split_filenames):
            t = lb_view.apply(compute_evaluation, cv_split_filename, model, params)
            task_for_params.append(t)
        all_tasks.append(task_for_params)
    return all_parameters, all_tasks
In [31]:
import datetime

def print_progress(tasks):
    # fraction of submitted tasks that have finished
    progress = np.mean([task.ready() for task_group in tasks for task in task_group])
    print("{0}: {1}%".format(datetime.datetime.now(), progress * 100.0))
    return int(progress * 100.0)
In [32]:
from ipyparallel import Client
client = Client()
print(client.ids)
lb_view = client.load_balanced_view()
In [40]:
from sklearn.grid_search import GridSearchCV
parameters = {
    "svc__gamma": np.logspace(-2, 1, 4),
    "svc__C": np.logspace(-1, 1, 3),
}
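This grid has 4 gamma values and 3 C values, so 12 parameter combinations; with 3 CV splits that is 36 tasks in total. A quick check:
from sklearn.grid_search import ParameterGrid
len(list(ParameterGrid(parameters))) * len(cv_filenames)  # 12 * 3 = 36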
In [41]:
all_parameters, all_tasks = parallel_grid_search(lb_view, model, cv_filenames, parameters)
In [ ]:
import time

start_time = datetime.datetime.now()
while True:
    progress = print_progress(all_tasks)
    if progress >= 100:
        break
    time.sleep(1)
print("finish")
end_time = datetime.datetime.now()
print((end_time - start_time).total_seconds())
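Once the loop reports 100%, every task result can be pulled back with .get(). A sketch (not part of the original run) that averages the fold scores per parameter combination and reports the best one:
mean_scores = [np.mean([t.get() for t in task_group]) for task_group in all_tasks]
best_index = int(np.argmax(mean_scores))
print(mean_scores[best_index], all_parameters[best_index])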