Using collective learning with PyTorch
This tutorial is a simple guide to trying out the collective learning protocol with your own machine learning code. Everything runs locally.
The most flexible way to use the collective learning backends is to make a class that implements the Collective Learning MachineLearningInterface, defined in ml_interface.py. For more details on how to use the MachineLearningInterface, see here.
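For illustration, here is a minimal sketch of what implementing the interface yourself involves, assuming the interface methods that PytorchLearner implements later in this tutorial; each `...` stands for framework-specific logic you would supply:

from colearn.ml_interface import (
    MachineLearningInterface, Weights, ProposedWeights, ColearnModel)


class MyLearner(MachineLearningInterface):
    def mli_propose_weights(self) -> Weights:
        ...  # train on local data and return the newly trained weights

    def mli_test_weights(self, weights: Weights) -> ProposedWeights:
        ...  # score the proposed weights and return them with a vote

    def mli_accept_weights(self, weights: Weights):
        ...  # adopt weights that won the vote

    def mli_get_current_weights(self) -> Weights:
        ...  # return a copy of the current weights

    def mli_get_current_model(self) -> ColearnModel:
        ...  # export the current model, e.g. in ONNX format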
However, the simpler way is to use one of the helper classes we provide, which implement most of the interface for popular ML libraries. In this tutorial we are going to walk through using the PytorchLearner.
First we are going to define the model architecture, then
we are going to load the data and configure the model, and then we will run Collective Learning.
A standard script for machine learning with PyTorch looks like the one below:
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
from torchvision import transforms, datasets
import torch.utils.data
import torch.nn as nn
import torch.nn.functional as nn_func
# define some constants
batch_size = 64
seed = 42
n_rounds = 20
train_fraction = 0.9
learning_rate = 0.001
height = 28
width = 28
n_classes = 10
num_test_batches = 10
no_cuda = False
cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
# Load the data
data = datasets.MNIST('/tmp/mnist', transform=transforms.ToTensor(), download=True)
n_train = int(train_fraction * len(data))
n_test = len(data) - n_train
train_data, test_data = torch.utils.data.random_split(data, [n_train, n_test])
train_dataloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True, **kwargs)
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=True, **kwargs)
# Define the model
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4 * 4 * 50, 500)
self.fc2 = nn.Linear(500, n_classes)
def forward(self, x):
x = nn_func.relu(self.conv1(x.view(-1, 1, height, width)))
x = nn_func.max_pool2d(x, 2, 2)
x = nn_func.relu(self.conv2(x))
x = nn_func.max_pool2d(x, 2, 2)
x = x.view(-1, 4 * 4 * 50)
x = nn_func.relu(self.fc1(x))
x = self.fc2(x)
return nn_func.log_softmax(x, dim=1)
model = Net()
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = torch.nn.NLLLoss()
# Train and evaluate the model
for round_index in range(n_rounds):
# train model
model.train()
for batch_idx, (data, labels) in enumerate(train_dataloader):
opt.zero_grad()
# Data needs to be on same device as model
data = data.to(device)
labels = labels.to(device)
output = model(data)
loss = criterion(output, labels)
loss.backward()
opt.step()
# evaluate model
model.eval()
total_score = 0
all_labels = []
all_outputs = []
with torch.no_grad():
for batch_idx, (data, labels) in enumerate(test_dataloader):
if batch_idx == num_test_batches:
break
data = data.to(device)
labels = labels.to(device)
output = model(data)
total_score += criterion(output, labels)
avg_loss = float(total_score / (num_test_batches * batch_size))
print(f"Average loss at round {round} is {avg_loss}")
There are three steps:
- Load the data
- Define the model
- Train the model
In this tutorial we are going to see how to modify each step to use collective learning. We'll end up with code like this:
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import os
from typing_extensions import TypedDict
import torch.nn as nn
import torch.nn.functional as nn_func
import torch.utils.data
from torchsummary import summary
from torchvision import transforms, datasets
from colearn.training import initial_result, collective_learning_round, set_equal_weights
from colearn.utils.plot import ColearnPlot
from colearn.utils.results import Results, print_results
from colearn_pytorch.utils import categorical_accuracy
from colearn_pytorch.pytorch_learner import PytorchLearner
"""
MNIST training example using PyTorch
Used dataset:
- MNIST is a set of 60,000 black-and-white handwritten digit images of size 28x28x1 in 10 classes
What the script does:
- Loads the MNIST dataset from torchvision.datasets
- Randomly splits the dataset between multiple learners
- Runs multiple rounds of the learning process and displays a plot of the results
"""
# define some constants
n_learners = 5
batch_size = 64
testing_mode = bool(os.getenv("COLEARN_EXAMPLES_TEST", "")) # for testing
n_rounds = 20 if not testing_mode else 1
vote_threshold = 0.5
train_fraction = 0.9
vote_fraction = 0.05
learning_rate = 0.001
height = 28
width = 28
n_classes = 10
vote_batches = 2
score_name = "categorical accuracy"
no_cuda = False
cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
DataloaderKwargs = TypedDict('DataloaderKwargs', {'num_workers': int, 'pin_memory': bool}, total=False)
kwargs: DataloaderKwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
# Load the data and split for each learner.
DATA_DIR = os.environ.get('PYTORCH_DATA_DIR',
os.path.expanduser(os.path.join('~', 'pytorch_datasets')))
data = datasets.MNIST(DATA_DIR, transform=transforms.ToTensor(), download=True)
n_train = int(train_fraction * len(data))
n_vote = int(vote_fraction * len(data))
n_test = len(data) - n_train - n_vote
train_data, vote_data, test_data = torch.utils.data.random_split(data, [n_train, n_vote, n_test])
data_split = [len(train_data) // n_learners] * n_learners
learner_train_data = torch.utils.data.random_split(train_data, data_split)
learner_train_dataloaders = [torch.utils.data.DataLoader(
ds,
batch_size=batch_size, shuffle=True, **kwargs) for ds in learner_train_data]
data_split = [len(vote_data) // n_learners] * n_learners
learner_vote_data = torch.utils.data.random_split(vote_data, data_split)
learner_vote_dataloaders = [torch.utils.data.DataLoader(
ds,
batch_size=batch_size, shuffle=True, **kwargs) for ds in learner_vote_data]
data_split = [len(test_data) // n_learners] * n_learners
learner_test_data = torch.utils.data.random_split(test_data, data_split)
learner_test_dataloaders = [torch.utils.data.DataLoader(
ds,
batch_size=batch_size, shuffle=True, **kwargs) for ds in learner_test_data]
# Define the model
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4 * 4 * 50, 500)
self.fc2 = nn.Linear(500, n_classes)
def forward(self, x):
x = nn_func.relu(self.conv1(x.view(-1, 1, height, width)))
x = nn_func.max_pool2d(x, 2, 2)
x = nn_func.relu(self.conv2(x))
x = nn_func.max_pool2d(x, 2, 2)
x = x.view(-1, 4 * 4 * 50)
x = nn_func.relu(self.fc1(x))
x = self.fc2(x)
return nn_func.log_softmax(x, dim=1)
# Make n instances of PytorchLearner with model and torch dataloaders
all_learner_models = []
for i in range(n_learners):
model = Net().to(device)
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
learner = PytorchLearner(
model=model,
train_loader=learner_train_dataloaders[i],
vote_loader=learner_vote_dataloaders[i],
test_loader=learner_test_dataloaders[i],
device=device,
optimizer=opt,
criterion=torch.nn.NLLLoss(),
num_test_batches=vote_batches,
vote_criterion=categorical_accuracy,
minimise_criterion=False
)
all_learner_models.append(learner)
# Ensure all learners start with exactly the same weights
set_equal_weights(all_learner_models)
summary(all_learner_models[0].model, input_size=(width, height), device=str(device))
# Train the model using Collective Learning
results = Results()
results.data.append(initial_result(all_learner_models))
plot = ColearnPlot(score_name=score_name)
for round_index in range(n_rounds):
results.data.append(
collective_learning_round(all_learner_models,
vote_threshold, round_index)
)
print_results(results)
plot.plot_results_and_votes(results)
plot.block()
print("Colearn Example Finished!")
The first thing is to modify the data loading code. Each learner needs its own training, voting and testing subsets of the data. This is easy to do with the PyTorch random_split utility:
data_split = [len(test_data) // n_learners] * n_learners
learner_test_data = torch.utils.data.random_split(test_data, data_split)
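As a quick sanity check of the shard sizes (a worked example, assuming the standard 60,000-image MNIST training set, which datasets.MNIST returns by default):

n_total = 60_000
n_train = int(0.9 * n_total)           # 54,000 training images
n_vote = int(0.05 * n_total)           # 3,000 voting images
n_test = n_total - n_train - n_vote    # 3,000 test images
# random_split then gives each of the 5 learners a disjoint shard:
assert n_train // 5 == 10_800
assert n_vote // 5 == 600
assert n_test // 5 == 600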
The model definition is the same as before. To use collective learning, we need to create an object that implements the MachineLearningInterface. To make it easier to use the MachineLearningInterface with PyTorch, we've defined PytorchLearner. PytorchLearner implements standard training and evaluation routines as well as the MachineLearningInterface methods.
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
from typing import Optional, Callable
from collections import OrderedDict, defaultdict
try:
import torch
except ImportError:
raise Exception(
"Pytorch is not installed. To use the pytorch "
"add-ons please install colearn with `pip install colearn[pytorch]`."
)
import torch.nn
import torch.optim
import torch.utils
import torch.utils.data
from torch.nn.modules.loss import _Loss
from colearn.ml_interface import (
MachineLearningInterface,
Weights,
ProposedWeights,
ColearnModel,
convert_model_to_onnx,
ModelFormat,
DiffPrivBudget,
DiffPrivConfig,
TrainingSummary,
ErrorCodes,
)
from opacus import PrivacyEngine
_DEFAULT_DEVICE = torch.device("cpu")
class PytorchLearner(MachineLearningInterface):
"""
Pytorch learner implementation of machine learning interface
"""
def __init__(
self,
model: torch.nn.Module,
optimizer: torch.optim.Optimizer,
train_loader: torch.utils.data.DataLoader,
vote_loader: torch.utils.data.DataLoader,
test_loader: Optional[torch.utils.data.DataLoader] = None,
need_reset_optimizer: bool = True,
device=_DEFAULT_DEVICE,
criterion: Optional[_Loss] = None,
minimise_criterion=True,
vote_criterion: Optional[Callable[[torch.Tensor, torch.Tensor], float]] = None,
num_train_batches: Optional[int] = None,
num_test_batches: Optional[int] = None,
diff_priv_config: Optional[DiffPrivConfig] = None,
):
"""
:param model: Pytorch model used for training
:param optimizer: Training optimizer
:param train_loader: Train dataset
:param test_loader: Optional test dataset - if not specified, the test score is reported as 0
:param need_reset_optimizer: True to clear optimizer history before training, False to keep history
:param device: Pytorch device - CPU or GPU
:param criterion: Loss function
:param minimise_criterion: True to minimise value of criterion, False to maximise
:param vote_criterion: Function to measure model performance for voting
:param num_train_batches: Number of training batches
:param num_test_batches: Number of testing batches
:param diff_priv_config: Contains differential privacy (dp) budget related configuration
"""
# Model has to be on same device as data
self.model: torch.nn.Module = model.to(device)
self.optimizer: torch.optim.Optimizer = optimizer
self.criterion = criterion
self.train_loader: torch.utils.data.DataLoader = train_loader
self.vote_loader: torch.utils.data.DataLoader = vote_loader
self.test_loader: Optional[torch.utils.data.DataLoader] = test_loader
self.need_reset_optimizer = need_reset_optimizer
self.device = device
self.num_train_batches = num_train_batches or len(train_loader)
self.num_test_batches = num_test_batches
self.minimise_criterion = minimise_criterion
self.vote_criterion = vote_criterion
self.dp_config = diff_priv_config
self.dp_privacy_engine = PrivacyEngine()
if diff_priv_config is not None:
(
self.model,
self.optimizer,
self.train_loader,
) = self.dp_privacy_engine.make_private(
module=self.model,
optimizer=self.optimizer,
data_loader=self.train_loader,
max_grad_norm=diff_priv_config.max_grad_norm,
noise_multiplier=diff_priv_config.noise_multiplier,
)
self.vote_score = self.test(self.vote_loader)
def mli_get_current_weights(self) -> Weights:
"""
:return: The current weights of the model
"""
current_state_dict = OrderedDict()
for key in self.model.state_dict():
current_state_dict[key] = self.model.state_dict()[key].clone()
w = Weights(
weights=current_state_dict, training_summary=self.get_training_summary()
)
return w
def mli_get_current_model(self) -> ColearnModel:
"""
:return: The current model and its format
"""
return ColearnModel(
model_format=ModelFormat(ModelFormat.ONNX),
model_file="",
model=convert_model_to_onnx(self.model),
)
def set_weights(self, weights: Weights):
"""
Rewrites the weights of the current model
:param weights: Weights to be stored
"""
self.model.load_state_dict(weights.weights)
def reset_optimizer(self):
"""
Clear optimizer state (e.g. iteration counts and momentum buffers)
so that outdated history is erased.
"""
self.optimizer.__setstate__({"state": defaultdict(dict)})
def train(self):
"""
Trains the model on the training dataset
"""
if self.need_reset_optimizer:
# erase the outdated optimizer memory (momentums mostly)
self.reset_optimizer()
self.model.train()
for batch_idx, (data, labels) in enumerate(self.train_loader):
if batch_idx == self.num_train_batches:
break
self.optimizer.zero_grad()
# Data needs to be on same device as model
data = data.to(self.device)
labels = labels.to(self.device)
output = self.model(data)
loss = self.criterion(output, labels)
loss.backward()
self.optimizer.step()
def mli_propose_weights(self) -> Weights:
"""
Trains model on training set and returns new weights after training
- Current model is reverted to original state after training
:return: Weights after training
"""
current_weights = self.mli_get_current_weights()
training_summary = current_weights.training_summary
if (
training_summary is not None
and training_summary.error_code is not None
and training_summary.error_code == ErrorCodes.DP_BUDGET_EXCEEDED
):
return current_weights
self.train()
new_weights = self.mli_get_current_weights()
self.set_weights(current_weights)
training_summary = new_weights.training_summary
if (
training_summary is not None
and training_summary.error_code is not None
and training_summary.error_code == ErrorCodes.DP_BUDGET_EXCEEDED
):
current_weights.training_summary = training_summary
return current_weights
return new_weights
def mli_test_weights(self, weights: Weights) -> ProposedWeights:
"""
Tests given weights on the vote and test sets and returns them with score values
:param weights: Weights to be tested
:return: ProposedWeights - Weights with vote and test score
"""
current_weights = self.mli_get_current_weights()
self.set_weights(weights)
vote_score = self.test(self.vote_loader)
if self.test_loader:
test_score = self.test(self.test_loader)
else:
test_score = 0
vote = self.vote(vote_score)
self.set_weights(current_weights)
return ProposedWeights(
weights=weights, vote_score=vote_score, test_score=test_score, vote=vote
)
def vote(self, new_score) -> bool:
"""
Compares current model score with proposed model score and returns vote
:param new_score: Proposed score
:return: bool positive or negative vote
"""
if self.minimise_criterion:
return new_score < self.vote_score
else:
return new_score > self.vote_score
def test(self, loader: torch.utils.data.DataLoader) -> float:
"""
Tests performance of the model on specified dataset
:param loader: Dataset for testing
:return: Value of performance metric
"""
if not self.criterion:
raise Exception("Criterion is unspecified so test method cannot be used")
self.model.eval()
total_score = 0
all_labels = []
all_outputs = []
batch_idx = 0
total_samples = 0
with torch.no_grad():
for batch_idx, (data, labels) in enumerate(loader):
total_samples += labels.shape[0]
if self.num_test_batches and batch_idx == self.num_test_batches:
break
data = data.to(self.device)
labels = labels.to(self.device)
output = self.model(data)
if self.vote_criterion is not None:
all_labels.append(labels)
all_outputs.append(output)
else:
total_score += self.criterion(output, labels).item()
if batch_idx == 0:
raise Exception("No batches in loader")
if self.vote_criterion is None:
return float(total_score / total_samples)
else:
return self.vote_criterion(
torch.cat(all_outputs, dim=0), torch.cat(all_labels, dim=0)
)
def mli_accept_weights(self, weights: Weights):
"""
Updates the model with the proposed set of weights
:param weights: The new weights
"""
self.set_weights(weights)
self.vote_score = self.test(self.vote_loader)
def get_training_summary(self) -> Optional[TrainingSummary]:
"""
Differential Privacy Budget
:return: the target and consumed epsilon so far
"""
if self.dp_config is None:
return None
delta = self.dp_config.target_delta
target_epsilon = self.dp_config.target_epsilon
consumed_epsilon = self.dp_privacy_engine.get_epsilon(delta)
budget = DiffPrivBudget(
target_epsilon=target_epsilon,
consumed_epsilon=consumed_epsilon,
target_delta=delta,
consumed_delta=delta,  # delta is constant per training
)
err = (
ErrorCodes.DP_BUDGET_EXCEEDED
if consumed_epsilon >= target_epsilon
else None
)
return TrainingSummary(
dp_budget=budget,
error_code=err,
)
We create a set of PytorchLearners by passing in the model and the datasets:
all_learner_models = []
for i in range(n_learners):
model = Net().to(device)
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
learner = PytorchLearner(
model=model,
train_loader=learner_train_dataloaders[i],
vote_loader=learner_vote_dataloaders[i],
test_loader=learner_test_dataloaders[i],
device=device,
optimizer=opt,
criterion=torch.nn.NLLLoss(),
num_test_batches=vote_batches,
vote_criterion=categorical_accuracy,
minimise_criterion=False
)
all_learner_models.append(learner)
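Note the scoring configuration here: vote_criterion=categorical_accuracy together with minimise_criterion=False means proposals are scored by accuracy on the vote set, so a learner votes in favour when the proposed weights increase accuracy (rather than decrease loss, which is the default behaviour). num_test_batches=vote_batches caps how many batches are used to score each proposal, which keeps voting cheap.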
Then we give all the models the same weights to start off with:
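set_equal_weights(all_learner_models)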
And then we can move on to the final stage, which is training with Collective Learning.
The function collective_learning_round
performs one round of collective learning.
One learner is selected to train and propose an update.
The other learners vote on the update, and if the vote passes then the update is accepted.
Then a new round begins.
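To make the protocol concrete, here is a simplified sketch of the logic inside one round (illustrative only; it assumes round-robin proposer selection and a simple vote count, and the real implementation lives in colearn.training):

proposer = all_learner_models[round_index % n_learners]
proposed_weights = proposer.mli_propose_weights()
votes = [learner.mli_test_weights(proposed_weights).vote
         for learner in all_learner_models]
if sum(votes) / len(votes) >= vote_threshold:  # e.g. 3 of 5 when threshold is 0.5
    for learner in all_learner_models:
        learner.mli_accept_weights(proposed_weights)

The actual training loop from the full script is: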
# Train the model using Collective Learning
results = Results()
results.data.append(initial_result(all_learner_models))

plot = ColearnPlot(score_name=score_name)

for round_index in range(n_rounds):
    results.data.append(
        collective_learning_round(all_learner_models,
                                  vote_threshold, round_index)
    )
    print_results(results)

    # Plot the results and votes after each round
    plot.plot_results_and_votes(results)

plot.block()
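To run the example you will need colearn installed with the PyTorch extras, as the import guard in the PytorchLearner listing suggests: pip install colearn[pytorch].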