現在、"Part B: 暗号化されたデータを使っての集計"は動きません。

"Part B: 暗号化されたデータを使っての集計"の実行にはnumpyが必要なのですが、numpyはPyTorchのhookの機能をもっておらず、numpyを拡張させたオブジェクト等も準備が出来ていないため、動きません。

詳細はこちらで確認することができます。 https://github.com/OpenMined/PySyft/issues/2771.

"Part B"が動くようになり次第、このセルは削除される予定です。

Part 10: 暗号化された勾配ベクトルを集計してのFederated Learning

前回のセクションでは、暗号化されたデータでのコンピューテーションを学びました。本セクションでは、Part 4で学んだFederated Learning Demo of Part 4へ戻りましょう。複数のワーカーからのモデルのアップデートの集計を担当する信頼できる集計者、"trusted aggregator"を紹介したチュートリアルです。

今回は新しい手法を採用することで、信頼できる集計者を使わないくても良いやり方を紹介します。いくら信頼できるとはいえ、誰かにデリケートな情報へのアクセス権を与えてしまうというのはベストとは言えないですよね。

と言う事で、このnotebookでは、SMPCを使う事で信頼できる集計者、"trusted aggregator"無しでセキュアに集計が出来る手法を紹介します。

Authors:

Section 1: 基本的なFederated Learning

これは、Boston Housing Datasetを題材としたベーシックなFederated Learningの例です。本セクションのコードはいくつかのサブセクションへ別れています。

セットアップ


In [ ]:
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

class Parser:
    """ハイパーパラメータ"""
    def __init__(self):
        self.epochs = 10
        self.lr = 0.001
        self.test_batch_size = 8
        self.batch_size = 8
        self.log_interval = 10
        self.seed = 1
    
args = Parser()

torch.manual_seed(args.seed)
kwargs = {}

データセットのロード


In [ ]:
with open('../data/BostonHousing/boston_housing.pickle','rb') as f:
    ((X, y), (X_test, y_test)) = pickle.load(f)

X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
# 前処理
mean = X.mean(0, keepdim=True)
dev = X.std(0, keepdim=True)
mean[:, 3] = 0. # 3番目のカラムはバイナリ,
dev[:, 3] = 1.  # なので、ノーマライズはしません
X = (X - mean) / dev
X_test = (X_test - mean) / dev
train = TensorDataset(X, y)
test = TensorDataset(X_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True, **kwargs)

ニューラルネットワークの構造


In [ ]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(13, 32)
        self.fc2 = nn.Linear(32, 24)
        self.fc3 = nn.Linear(24, 1)

    def forward(self, x):
        x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
optimizer = optim.SGD(model.parameters(), lr=args.lr)

PyTorchをHook(PyTorchを拡張)


In [ ]:
import syft as sy

hook = sy.TorchHook(torch)
bob = sy.VirtualWorker(hook, id="bob")
alice = sy.VirtualWorker(hook, id="alice")
james = sy.VirtualWorker(hook, id="james")

compute_nodes = [bob, alice]

データをワーカーへ送る 本来であれば、彼らこそがデータを元々もっているはずで、こちらから送るのはあべこべですが、チュートリアルをスムーズに進めるためには仕方がありませんね。


In [ ]:
train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    train_distributed_dataset.append((data, target))

トレーニング関数


In [ ]:
def train(epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_distributed_dataset):
        worker = data.location
        model.send(worker)

        optimizer.zero_grad()
        # update the model
        pred = model(data)
        loss = F.mse_loss(pred.view(-1), target)
        loss.backward()
        optimizer.step()
        model.get()
            
        if batch_idx % args.log_interval == 0:
            loss = loss.get()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * data.shape[0], len(train_loader),
                       100. * batch_idx / len(train_loader), loss.item()))

テスト関数


In [ ]:
def test():
    model.eval()
    test_loss = 0
    for data, target in test_loader:
        output = model(data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))

モデルをトレーニング


In [ ]:
import time

In [ ]:
t = time.time()

for epoch in range(1, args.epochs + 1):
    train(epoch)

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

性能評価


In [ ]:
test()

Section 2: 暗号化した勾配ベクトルで集計を行う

ここで、暗号化した勾配ベクトルで集計を行えるよう、サンプルのコードを少しだけ変更しましょう。変更点はtrain()関数内の1〜2行です。では、再度データやモデルを初期化してBobとAliceに分配しましょう。


In [ ]:
remote_dataset = (list(),list())

train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    remote_dataset[batch_idx % len(compute_nodes)].append((data, target))

def update(data, target, model, optimizer):
    model.send(data.location)
    optimizer.zero_grad()
    pred = model(data)
    loss = F.mse_loss(pred.view(-1), target)
    loss.backward()
    optimizer.step()
    return model

bobs_model = Net()
alices_model = Net()

bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)

models = [bobs_model, alices_model]
params = [list(bobs_model.parameters()), list(alices_model.parameters())]
optimizers = [bobs_optimizer, alices_optimizer]

トレーニングロジックの変更

唯一の変更点はこのトレーニング関数の中にあります。一行、一行見ていきましょう。

Part A: 学習:


In [ ]:
# トレーニングを行うバッチを選択します。
data_index = 0
# リモートモデルをアップデートします
# 次のステップへ進む前に何度か繰り返しても良いのですが、ここではワーカー事1回です。
for remote_index in range(len(compute_nodes)):
    data, target = remote_dataset[remote_index][data_index]
    models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

Part B: 暗号化された勾配ベクトルの集計


In [ ]:
# 平均化された暗号化されたモデルを保存するリストを作成しておきます
new_params = list()

In [ ]:
compute_nodes

In [ ]:
# 全てのパラメータに対して繰り返します
for param_i in range(len(params[0])):

    # 全てのワーカーに対して
    spdz_params = list()
    for remote_index in range(len(compute_nodes)):
        
        # 特定ワーカーの特定のモデルパラメータをコピーします
        copy_of_parameter = params[remote_index][param_i].copy()
        
        # SMPCは整数でしか動作しない(少数が使えない)ので、"Fixed Precision"を使って整数化します
        fixed_precision_param = copy_of_parameter.fix_precision()
        
        # この行で暗号化を行います。fixed_precision_param既にポインタですが、
        # shareをつかってリモートで暗号化を行う事ができます。戻り値のencrypted_paramももちろんポインタです。
        encrypted_param = fixed_precision_param.share(bob, alice, crypto_provider=james)
        
        # .get()を使って暗号化されたデータを取得しましょう。
        param = encrypted_param.get()
        
        # 後で集計(平均化)できるよう取得したデータを保存しておきます。
        spdz_params.append(param)

    
    # 各ワーカーから届いたパラメータの平均をとります。.get()でデータを取得し、その後データの型を
    # 整数から少数へ戻しています。
    new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
    
    # 集計結果を保存しておきます
    new_params.append(new_param)

Part C: 事後処理


In [ ]:
with torch.no_grad():
    for model in params:
        for param in model:
            param *= 0

    for model in models:
        model.get()

    for remote_index in range(len(compute_nodes)):
        for param_index in range(len(params[remote_index])):
            params[remote_index][param_index].set_(new_params[param_index])

それでは、上記を一つに纏めましょう

上記のステップを一つに纏めて、トレーニングループを完成させましょう。


In [ ]:
def train(epoch):
    for data_index in range(len(remote_dataset[0])-1):
        # リモートモデルをアップデート
        for remote_index in range(len(compute_nodes)):
            data, target = remote_dataset[remote_index][data_index]
            models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

        # 暗号化された勾配ベクトルの平均を取る
        new_params = list()
        for param_i in range(len(params[0])):
            spdz_params = list()
            for remote_index in range(len(compute_nodes)):
                spdz_params.append(params[remote_index][param_i].copy().fix_precision().share(bob, alice, crypto_provider=james).get())

            new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
            new_params.append(new_param)

        # 綺麗に掃除します(事後処理)
        with torch.no_grad():
            for model in params:
                for param in model:
                    param *= 0

            for model in models:
                model.get()

            for remote_index in range(len(compute_nodes)):
                for param_index in range(len(params[remote_index])):
                    params[remote_index][param_index].set_(new_params[param_index])

In [ ]:
def test():
    models[0].eval()
    test_loss = 0
    for data, target in test_loader:
        output = models[0](data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}\n'.format(test_loss))

In [ ]:
t = time.time()

for epoch in range(args.epochs):
    print(f"Epoch {epoch + 1}")
    train(epoch)
    test()

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

おめでとうございます!コミュニティに入ろう!

本チュートリアルを完了しました。おめでとうございます!もし、このチュートリアルを気に入って、プライバシーに配慮した非中央集権的なAI技術や付随する(データやモデルの)サプライチェーンにご興味があって、プロジェクトに参加したいと思われるなら、以下の方法で可能です。

PySyftのGitHubレポジトリにスターをつける

一番簡単に貢献できる方法はこのGitHubのレポジトリにスターを付けていただくことです。スターが増えると露出が増え、より多くのデベロッパーにこのクールな技術の事を知って貰えます。

Slackに入る

最新の開発状況のトラッキングする一番良い方法はSlackに入ることです。 下記フォームから入る事ができます。 http://slack.openmined.org

コードプロジェクトに参加する

コミュニティに貢献する一番良い方法はソースコードのコントリビューターになることです。PySyftのGitHubへアクセスしてIssueのページを開き、"Projects"で検索してみてください。参加し得るプロジェクトの状況を把握することができます。また、"good first issue"とマークされているIssueを探す事でミニプロジェクトを探すこともできます。

寄付

もし、ソースコードで貢献できるほどの時間は取れないけど、是非何かサポートしたいという場合は、寄付をしていただくことも可能です。寄附金の全ては、ハッカソンやミートアップの開催といった、コミュニティ運営経費として利用されます。

OpenMined's Open Collective Page


In [ ]: