-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
150 lines (123 loc) · 4.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from pathlib import Path
from syftbox.lib import Client
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import re
import json
class SimpleNN(nn.Module):
    """Two-layer fully connected classifier for 28x28 inputs.

    Each sample is flattened to 784 features, passed through a 128-unit
    hidden layer with ReLU, and projected onto 10 output logits.
    """

    def __init__(self):
        super().__init__()
        # Attribute names double as state-dict key prefixes ("fc1.*", "fc2.*").
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return one row of 10 logits per flattened sample in ``x``."""
        flat = x.view(-1, 28 * 28)
        hidden = torch.relu(self.fc1(flat))
        return self.fc2(hidden)
def get_model_file(path: Path):
    """Return the name of a pretrained MNIST model file found in ``path``.

    A file qualifies when its name is ``pretrained_mnist_label_<digit>.pt``
    with exactly one digit.

    Args:
        path: Directory to search (non-recursive).

    Returns:
        The bare file name (not the full path) of the alphabetically first
        matching file, or ``None`` when ``path`` is not an existing directory
        or contains no matching file.
    """
    pattern = r"^pretrained_mnist_label_[0-9]\.pt$"
    try:
        entries = os.listdir(path)
    except (FileNotFoundError, NotADirectoryError):
        # A peer without this folder simply has no model to offer.
        return None

    # os.listdir returns entries in arbitrary order; sort so the same file is
    # chosen on every run when several candidates exist.
    model_files = sorted(entry for entry in entries if re.match(pattern, entry))
    return model_files[0] if model_files else None
def aggregate_model(
    participants: list[str], datasite_path: Path, global_model_path: Path
):
    """Average the participants' pretrained weights into one global model.

    For every participant, the model file in ``<datasite_path>/<participant>/
    public`` is loaded. Peers without a model file, or whose state dict keys
    differ from ``SimpleNN``'s, are skipped. The remaining state dicts are
    averaged element-wise with equal weight, loaded into a fresh ``SimpleNN``
    and saved to ``global_model_path``.

    Args:
        participants: Folder names of the peers to aggregate.
        datasite_path: Directory that contains one folder per peer.
        global_model_path: Destination file for the averaged state dict.

    Returns:
        The ``SimpleNN`` holding the averaged weights, or ``None`` when no
        peer contributed a compatible model (nothing is saved in that case).
    """
    global_model = SimpleNN()
    expected_keys = global_model.state_dict().keys()

    weight_sums = {}
    aggregated_peers = []
    for user_folder in participants:
        public_folder_path: Path = Path(datasite_path) / user_folder / "public"
        model_file = get_model_file(public_folder_path)
        if model_file is None:
            continue

        # NOTE(security): torch.load unpickles the file, and these files come
        # from other peers' public folders. Consider weights_only=True if the
        # supported torch versions allow it.
        user_model_state = torch.load(str(public_folder_path / model_file))

        # A peer whose model has a different architecture than the global
        # model cannot be averaged in; skip it before touching any sums.
        if user_model_state.keys() != expected_keys:
            continue

        for key in expected_keys:
            if key in weight_sums:
                weight_sums[key] += user_model_state[key]
            else:
                # Clone so the in-place additions never alias a loaded tensor.
                weight_sums[key] = user_model_state[key].clone()
        aggregated_peers.append(user_folder)

    if not aggregated_peers:
        return None

    # Divide by the number of models actually summed, not by the number of
    # listed participants, so skipped peers do not shrink the weights.
    n_contributors = len(aggregated_peers)
    aggregated_model_weights = {
        key: total / n_contributors for key, total in weight_sums.items()
    }

    print(f"Aggregated models from {aggregated_peers}")
    global_model.load_state_dict(aggregated_model_weights)
    torch.save(global_model.state_dict(), str(global_model_path))
    return global_model
def network_participants(datasite_path: Path, participants_json_file_path: Path):
    """Return the configured participants that have a folder on disk.

    The JSON file must contain a ``"participants"`` list. A name is kept when
    a directory with that name exists directly under ``datasite_path`` and it
    is not one of the reserved folders (``apps``, ``.syft``).

    Args:
        datasite_path: Directory that contains one folder per peer.
        participants_json_file_path: JSON file listing the wanted peers.

    Returns:
        The present participants, de-duplicated, in the order they appear in
        the JSON file.
    """
    exclude_dir = {"apps", ".syft"}
    datasite_root = Path(datasite_path)

    all_users = {
        entry.name
        for entry in datasite_root.iterdir()
        if entry.name not in exclude_dir and entry.is_dir()
    }

    with open(participants_json_file_path, "r", encoding="utf-8") as f:
        requested = json.load(f)["participants"]

    # dict.fromkeys drops duplicates while keeping the file's order, so the
    # result is stable from run to run (a set intersection is not).
    requested = list(dict.fromkeys(requested))

    # Split the request before filtering, so the missing list is meaningful.
    missing_participants = [name for name in requested if name not in all_users]
    participants = [name for name in requested if name in all_users]

    if missing_participants:
        print(f"Participants without a datasite folder: {missing_participants}")
    print(f"Pretrained model aggregator participants: {participants}")
    return participants
def evaluate_global_model(global_model: nn.Module, dataset_path: Path) -> float:
    """Return the classification accuracy of ``global_model`` in percent.

    The file at ``dataset_path`` is loaded with ``torch.load`` and must
    unpack into an ``(images, labels)`` pair of tensors with the same first
    dimension. The model is switched to eval mode and left in that mode.

    Args:
        global_model: Model whose output has one score per class along dim 1.
        dataset_path: Path to the saved ``(images, labels)`` pair.

    Returns:
        Accuracy in the range 0-100, or ``0.0`` when the dataset is empty.
    """
    global_model.eval()

    images, labels = torch.load(str(dataset_path))
    dataset = TensorDataset(images, labels)
    # Accuracy does not depend on sample order, so there is no need to
    # shuffle (and no need to consume global RNG state).
    data_loader = DataLoader(dataset, batch_size=64, shuffle=False)

    correct = 0
    total = 0
    with torch.no_grad():
        for batch_images, batch_labels in data_loader:
            outputs = global_model(batch_images)
            # Index of the highest score per sample is the predicted class.
            _, predicted = torch.max(outputs, 1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0
    return 100 * correct / total
if __name__ == "__main__":
    # Script entry point: aggregate the peers' models, evaluate the result,
    # and write a JSON summary into this client's datasite.
    client = Client.load()

    # The parent of this client's datasite is treated as the directory that
    # holds one folder per peer.
    participants = network_participants(
        client.datasite_path.parent, Path("participants.json")
    )

    global_model = aggregate_model(
        participants,
        client.datasite_path.parent,
        client.datasite_path / "public" / "global_model.pth",
    )

    # Stays None when there was nothing to aggregate, so the results file can
    # still be written (as JSON null) instead of raising NameError.
    accuracy = None
    if global_model is not None:
        dataset_path = Path("./mnist_dataset.pt")
        accuracy = evaluate_global_model(global_model, dataset_path)
        print(f"Global model accuracy: {accuracy:.2f}%")
    else:
        print("No models to aggregate")

    output_dir: Path = (
        Path(client.datasite_path) / "app_pipelines" / "model_aggregator"
    )
    # parents=True also creates "app_pipelines" when it does not exist yet.
    output_dir.mkdir(parents=True, exist_ok=True)

    with open(output_dir / "results.json", "w", encoding="utf-8") as json_file:
        json.dump(
            {"accuracy": accuracy, "participants": participants},
            json_file,
            indent=4,
        )