"Part B: 暗号化されたデータを使っての集計"の実行にはnumpyが必要なのですが、numpyはPyTorchのhookの機能をもっておらず、numpyを拡張させたオブジェクト等も準備が出来ていないため、動きません。
詳細はこちらで確認することができます。 https://github.com/OpenMined/PySyft/issues/2771.
"Part B"が動くようになり次第、このセルは削除される予定です。
前回のセクションでは、暗号化されたデータでのコンピューテーションを学びました。本セクションでは、Part 4で学んだFederated Learning Demo of Part 4へ戻りましょう。複数のワーカーからのモデルのアップデートの集計を担当する信頼できる集計者、"trusted aggregator"を紹介したチュートリアルです。
今回は新しい手法を採用することで、信頼できる集計者を使わないくても良いやり方を紹介します。いくら信頼できるとはいえ、誰かにデリケートな情報へのアクセス権を与えてしまうというのはベストとは言えないですよね。
と言う事で、このnotebookでは、SMPCを使う事で信頼できる集計者、"trusted aggregator"無しでセキュアに集計が出来る手法を紹介します。
Authors:
In [ ]:
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
class Parser:
"""ハイパーパラメータ"""
def __init__(self):
self.epochs = 10
self.lr = 0.001
self.test_batch_size = 8
self.batch_size = 8
self.log_interval = 10
self.seed = 1
args = Parser()
torch.manual_seed(args.seed)
kwargs = {}
In [ ]:
with open('../data/BostonHousing/boston_housing.pickle','rb') as f:
((X, y), (X_test, y_test)) = pickle.load(f)
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
# 前処理
mean = X.mean(0, keepdim=True)
dev = X.std(0, keepdim=True)
mean[:, 3] = 0. # 3番目のカラムはバイナリ,
dev[:, 3] = 1. # なので、ノーマライズはしません
X = (X - mean) / dev
X_test = (X_test - mean) / dev
train = TensorDataset(X, y)
test = TensorDataset(X_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True, **kwargs)
In [ ]:
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(13, 32)
self.fc2 = nn.Linear(32, 24)
self.fc3 = nn.Linear(24, 1)
def forward(self, x):
x = x.view(-1, 13)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
model = Net()
optimizer = optim.SGD(model.parameters(), lr=args.lr)
In [ ]:
import syft as sy
hook = sy.TorchHook(torch)
bob = sy.VirtualWorker(hook, id="bob")
alice = sy.VirtualWorker(hook, id="alice")
james = sy.VirtualWorker(hook, id="james")
compute_nodes = [bob, alice]
データをワーカーへ送る 本来であれば、彼らこそがデータを元々もっているはずで、こちらから送るのはあべこべですが、チュートリアルをスムーズに進めるためには仕方がありませんね。
In [ ]:
train_distributed_dataset = []
for batch_idx, (data,target) in enumerate(train_loader):
data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
train_distributed_dataset.append((data, target))
In [ ]:
def train(epoch):
model.train()
for batch_idx, (data,target) in enumerate(train_distributed_dataset):
worker = data.location
model.send(worker)
optimizer.zero_grad()
# update the model
pred = model(data)
loss = F.mse_loss(pred.view(-1), target)
loss.backward()
optimizer.step()
model.get()
if batch_idx % args.log_interval == 0:
loss = loss.get()
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * data.shape[0], len(train_loader),
100. * batch_idx / len(train_loader), loss.item()))
In [ ]:
def test():
model.eval()
test_loss = 0
for data, target in test_loader:
output = model(data)
test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
test_loss /= len(test_loader.dataset)
print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))
In [ ]:
import time
In [ ]:
t = time.time()
for epoch in range(1, args.epochs + 1):
train(epoch)
total_time = time.time() - t
print('Total', round(total_time, 2), 's')
In [ ]:
test()
In [ ]:
remote_dataset = (list(),list())
train_distributed_dataset = []
for batch_idx, (data,target) in enumerate(train_loader):
data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
remote_dataset[batch_idx % len(compute_nodes)].append((data, target))
def update(data, target, model, optimizer):
model.send(data.location)
optimizer.zero_grad()
pred = model(data)
loss = F.mse_loss(pred.view(-1), target)
loss.backward()
optimizer.step()
return model
bobs_model = Net()
alices_model = Net()
bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)
models = [bobs_model, alices_model]
params = [list(bobs_model.parameters()), list(alices_model.parameters())]
optimizers = [bobs_optimizer, alices_optimizer]
In [ ]:
# トレーニングを行うバッチを選択します。
data_index = 0
# リモートモデルをアップデートします
# 次のステップへ進む前に何度か繰り返しても良いのですが、ここではワーカー事1回です。
for remote_index in range(len(compute_nodes)):
data, target = remote_dataset[remote_index][data_index]
models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])
In [ ]:
# 平均化された暗号化されたモデルを保存するリストを作成しておきます
new_params = list()
In [ ]:
compute_nodes
In [ ]:
# 全てのパラメータに対して繰り返します
for param_i in range(len(params[0])):
# 全てのワーカーに対して
spdz_params = list()
for remote_index in range(len(compute_nodes)):
# 特定ワーカーの特定のモデルパラメータをコピーします
copy_of_parameter = params[remote_index][param_i].copy()
# SMPCは整数でしか動作しない(少数が使えない)ので、"Fixed Precision"を使って整数化します
fixed_precision_param = copy_of_parameter.fix_precision()
# この行で暗号化を行います。fixed_precision_param既にポインタですが、
# shareをつかってリモートで暗号化を行う事ができます。戻り値のencrypted_paramももちろんポインタです。
encrypted_param = fixed_precision_param.share(bob, alice, crypto_provider=james)
# .get()を使って暗号化されたデータを取得しましょう。
param = encrypted_param.get()
# 後で集計(平均化)できるよう取得したデータを保存しておきます。
spdz_params.append(param)
# 各ワーカーから届いたパラメータの平均をとります。.get()でデータを取得し、その後データの型を
# 整数から少数へ戻しています。
new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
# 集計結果を保存しておきます
new_params.append(new_param)
In [ ]:
with torch.no_grad():
for model in params:
for param in model:
param *= 0
for model in models:
model.get()
for remote_index in range(len(compute_nodes)):
for param_index in range(len(params[remote_index])):
params[remote_index][param_index].set_(new_params[param_index])
In [ ]:
def train(epoch):
for data_index in range(len(remote_dataset[0])-1):
# リモートモデルをアップデート
for remote_index in range(len(compute_nodes)):
data, target = remote_dataset[remote_index][data_index]
models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])
# 暗号化された勾配ベクトルの平均を取る
new_params = list()
for param_i in range(len(params[0])):
spdz_params = list()
for remote_index in range(len(compute_nodes)):
spdz_params.append(params[remote_index][param_i].copy().fix_precision().share(bob, alice, crypto_provider=james).get())
new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
new_params.append(new_param)
# 綺麗に掃除します(事後処理)
with torch.no_grad():
for model in params:
for param in model:
param *= 0
for model in models:
model.get()
for remote_index in range(len(compute_nodes)):
for param_index in range(len(params[remote_index])):
params[remote_index][param_index].set_(new_params[param_index])
In [ ]:
def test():
models[0].eval()
test_loss = 0
for data, target in test_loader:
output = models[0](data)
test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
test_loss /= len(test_loader.dataset)
print('Test set: Average loss: {:.4f}\n'.format(test_loss))
In [ ]:
t = time.time()
for epoch in range(args.epochs):
print(f"Epoch {epoch + 1}")
train(epoch)
test()
total_time = time.time() - t
print('Total', round(total_time, 2), 's')
本チュートリアルを完了しました。おめでとうございます!もし、このチュートリアルを気に入って、プライバシーに配慮した非中央集権的なAI技術や付随する(データやモデルの)サプライチェーンにご興味があって、プロジェクトに参加したいと思われるなら、以下の方法で可能です。
一番簡単に貢献できる方法はこのGitHubのレポジトリにスターを付けていただくことです。スターが増えると露出が増え、より多くのデベロッパーにこのクールな技術の事を知って貰えます。
最新の開発状況のトラッキングする一番良い方法はSlackに入ることです。 下記フォームから入る事ができます。 http://slack.openmined.org
コミュニティに貢献する一番良い方法はソースコードのコントリビューターになることです。PySyftのGitHubへアクセスしてIssueのページを開き、"Projects"で検索してみてください。参加し得るプロジェクトの状況を把握することができます。また、"good first issue"とマークされているIssueを探す事でミニプロジェクトを探すこともできます。
もし、ソースコードで貢献できるほどの時間は取れないけど、是非何かサポートしたいという場合は、寄付をしていただくことも可能です。寄附金の全ては、ハッカソンやミートアップの開催といった、コミュニティ運営経費として利用されます。
In [ ]: