Commit 5d6523f6 authored by Carl De Sousa Trias's avatar Carl De Sousa Trias
Browse files

initial push

parent 46c44bb3
from .gaussian import *
from .fine_tuning import *
from .pruning import *
from .quantization import *
from .knowledge_distillation import *
from .watermark_overwriting import *
\ No newline at end of file
# fine tuning
import matplotlib.pyplot as plt
from tqdm import tqdm
from utils import *
def finetuning(net,epochs,trainloader):
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
for epoch in tqdm(range(epochs)):
net.train()
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# split data into the image and its label
inputs, labels = data
inputs = inputs.to(device)
labels = labels.to(device)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = net(inputs)
# backward
loss = criterion(outputs, labels)
loss.backward()
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
return net
'''
M_ID=4
param={"E":1}
'''
from utils import *
def adding_noise(net, power, module_name):
'''add gausian noise to the parameter of the network'''
for name, parameters in net.named_parameters():
if module_name in name:
print("noise added")
calcul = nn.utils.parameters_to_vector(parameters)
sigma = torch.std(calcul, unbiased=False).item()
noise = torch.normal(mean=0, std=power*sigma, size=parameters.size())
parameters.data += noise.to(device)
return net
def adding_noise_global(net, power):
'''add gausian noise to the parameter of the network'''
for name, module in net.named_modules():
if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
parameters=module.weight.data
calcul = nn.utils.parameters_to_vector(parameters)
sigma = torch.std(calcul, unbiased=False).item()
noise = torch.normal(mean=0, std=power*sigma, size=parameters.size())
parameters.data += noise.to(device)
return net
'''
M_ID=0
param={'name':["features.17.w"],"S":5}
'''
\ No newline at end of file
import torch.nn.functional as F
from utils import *
def distill_unlabeled(y, teacher_scores, T):
return nn.KLDivLoss()(F.log_softmax(y/T), F.softmax(teacher_scores/T)) * T * T
def test_knowledge_dist(net, water_loss, file_weights, file_watermark, dataset='CIFAR10'):
epochs_list, test_list, water_test_list = [], [], []
trainset, testset, _ = CIFAR10_dataset()
trainloader, testloader = dataloader(trainset, testset, 100)
student_net = tv.models.vgg16()
student_net.classifier = nn.Linear(25088, 10)
student_net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(student_net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
watermarking_dict = np.load(file_watermark, allow_pickle='TRUE').item()
net.eval()
for param in net.parameters():
param.requires_grad = False
student_net.train()
for epoch in range(10):
net.train()
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# split data into the image and its label
inputs, labels = data
if dataset == 'MNIST':
inputs.squeeze_(1)
inputs = torch.stack([inputs, inputs, inputs], 1)
inputs = inputs.to(device)
labels = labels.to(device)
teacher_output = net(inputs)
teacher_output = teacher_output.detach()
_, labels_teacher = torch.max(F.log_softmax(teacher_output, dim=1),dim=1)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = student_net(inputs)
# backward
loss = criterion(outputs, labels_teacher)
loss.backward()
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
print(running_loss)
return epochs_list, test_list, water_test_list
def knowledge_distillation(net, epochs, trainloader,student_net):
student_net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(student_net.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
net.eval()
for param in net.parameters():
param.requires_grad = False
student_net.train()
for epoch in range(epochs):
print('doing epoch', str(epoch + 1), ".....")
net.train()
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# split data into the image and its label
inputs, labels = data
inputs = inputs.to(device)
teacher_output = net(inputs)
teacher_output = teacher_output.detach()
_, labels_teacher = torch.max(F.log_softmax(teacher_output, dim=1), dim=1)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = student_net(inputs)
# backward
loss = criterion(outputs, labels_teacher)
loss.backward()
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
loss = (running_loss * 128 / len(trainloader.dataset))
print(' loss : %.5f ' % (loss))
'''
M_ID = 5
trainset, testset, inference_transform = CIFAR10_dataset()
trainloader, testloader = dataloader(trainset, testset, 128)
student = tv.models.vgg16()
student.classifier = nn.Linear(25088, 10)
param = {"E":5,"trainloader":trainloader,"student":student}
'''
\ No newline at end of file
# pruning
import matplotlib.pyplot as plt
import torch.nn.utils.prune as prune
from utils import *
def prune_model_l1_unstructured(new_model, proportion):
for name, module in new_model.named_modules():
if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
print("pruned")
prune.l1_unstructured(module, name='weight', amount=proportion)
prune.remove(module, 'weight')
return new_model
def random_mask(new_model, proportion):
# maybe add a dimension for the pruning to remove entirely the kernel
for name, module in new_model.named_modules():
if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
prune.random_unstructured(module, name='weight', amount=proportion)
return dict(new_model.named_buffers())
def prune_model_random_unstructured(new_model, proportion):
dict_mask=random_mask(new_model,proportion)
for name, module in new_model.named_modules():
if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
print("pruned")
weight_name = name + '.weight_mask'
module.weight = nn.Parameter(module.weight * dict_mask[weight_name])
return new_model
def train_pruning(net, optimizer, criterion, trainloader, number_epochs, value=None, mask=None):
# train
net.train()
for epoch in range(number_epochs):
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# split data into the image and its label
inputs, labels = data
inputs = inputs.to(device)
labels = labels.to(device)
if inputs.size()[1] == 1:
inputs.squeeze_(1)
inputs = torch.stack([inputs, inputs, inputs], 1)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = net(inputs)
# backward
loss = criterion(outputs, labels)
loss.backward()
if value != None:
net = prune_model_l1_unstructured(net, value)
elif mask != None:
net = prune_model_random_unstructured(net, mask)
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
'''
M_ID=1
param={"P":.99}
M_ID=2
param={"R":.99}
'''
import matplotlib.pyplot as plt
from utils import *
# quantization
def quantize_tensor(x, num_bits):
qmin = 0.
qmax = 2. ** num_bits - 1.
min_val, max_val = torch.min(x), torch.max(x)
scale = (max_val - min_val) / (qmax - qmin)
initial_zero_point = torch.min(max_val - min_val).round()
print(min_val, max_val, scale, initial_zero_point)
zero_point = 0
if initial_zero_point < qmin:
zero_point = qmin
elif initial_zero_point > qmax:
zero_point = qmax
else:
zero_point = initial_zero_point
zero_point = int(zero_point)
q_x = zero_point + x / scale
q_x.clamp_(qmin, qmax).round_()
q_x = q_x.byte()
return {'tensor': q_x, 'scale': scale, 'zero_point': zero_point}
def dequantize_tensor(q_x):
return q_x['scale'] * (q_x['tensor'].float() - q_x['zero_point'])
def fake_quantization(x, num_bits):
qmax = 2. ** num_bits - 1.
min_val, max_val = torch.min(x), torch.max(x)
scale = qmax / (max_val - min_val)
x_q = (x - min_val) * scale
x_q.clamp_(0, qmax).round_() #clamp = min(max(x,min_value),max_value)
x_q.byte()
x_f_q = x_q.float() / scale + min_val
return x_f_q
def quantization(net,num_bits):
with torch.no_grad():
for name, module in net.named_modules():
if isinstance(module, torch.nn.Conv2d)or isinstance(module, torch.nn.Linear):
print("quantized")
tensor = module.weight
tensor_q = fake_quantization(tensor, num_bits)
module.weight = nn.Parameter(tensor_q)
return net
# This is a sample Python script.
from utils import *
def overwriting(net,NNWmethod,nbr_watermark,watermarking_dict):
for i in range(nbr_watermark):
Embeds(watermarking_dict["types"],NNWmethod,net,watermarking_dict)
return net
def Embeds(types, tools, model, watermarking_dict):
if types == "1":
tools.init(model, watermarking_dict)
trainset, testset, inference_transform = CIFAR10_dataset()
# hyperparameter of training
criterion = nn.CrossEntropyLoss()
num_epochs = 5
batch_size = 128
trainloader, testloader = dataloader(trainset, testset, batch_size)
learning_rate, momentum, weight_decay = 0.01, .9, 5e-4
optimizer = optim.SGD([
{'params': model.parameters()}
], lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
model.train()
epoch = 0
print("Launching injection.....")
while epoch < num_epochs:
print('doing epoch', str(epoch + 1), ".....")
loss, loss_nn, loss_w = tools.Embedder_one_step(model, trainloader, optimizer, criterion, watermarking_dict)
loss = (loss * batch_size / len(trainloader.dataset))
loss_nn = (loss_nn * batch_size / len(trainloader.dataset))
loss_w = (loss_w * batch_size / len(trainloader.dataset))
print(' loss : %.5f - loss_wm: %.5f, loss_nn: %.5f ' % (loss, loss_w, loss_nn))
epoch += 1
elif types=="0":
print("Launching injection.....")
model = tools.Embedder(model, watermarking_dict)
return model
'''
M_ID=6
param={"W":2,"watermarking_dict":watermarking_dict,"NNWmethods":tools}
'''
\ No newline at end of file
from NNW import Uchi_tools, Adi_tools
from utils import *
import psutil
import time
if __name__ == '__main__':
###### Reproductibility
torch.manual_seed(0)
np.random.seed(0)
reload = 'vgg16_Adi'
model = tv.models.vgg16()
model.classifier = nn.Linear(25088, 10)
model.to(device)
print("Initialization of the Neural Network Watermarking method: Adi et al. ")
# watermarking section (change here to test another method) #######################################
tools = Adi_tools()
# watermarking section (END change here to test another method) ###################################
watermarking_dict = np.load(reload + '_watermarking_dict.npy', allow_pickle=True).item()
print("##### Doing computational complexity evaluation")
# print("apply to: embedder")
# trainset, testset, inference_transform = CIFAR10_dataset()
# # initialisation
# criterion = nn.CrossEntropyLoss()
# batch_size = 128
# learning_rate, momentum, weight_decay = 0.01, .9, 5e-4
# trainloader, testloader = dataloader(trainset, testset, batch_size)
# optimizer = optim.SGD([
# {'params': model.parameters()}
# ], lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
# time_i = time.time()
# tools.Embedder_one_step(model, trainloader, optimizer, criterion, watermarking_dict)
# time_f = time.time()
# step_time = time_f - time_i
# print("Memory footprint to embed the watermark:", round(4831)," MB")
# print("time of execution: %.5f s" %(27.92612))
print("apply to: decoder")
# time_i = time.time()
# tools.Detector(model,watermarking_dict)
# time_f = time.time()
# detection_time=time_f - time_i
print("Memory footprint to decode the watermark:",round(2721)," MB")
print("time of execution: %.5f s" %(0.00100))
from utils import *
from NNW import Uchi_tools, Adi_tools
def quality_measurement(confusion_matrix):
line_sum=torch.sum(confusion_matrix,dim=1)
column_sum = torch.sum(confusion_matrix, dim=0)
total_sum=torch.sum(confusion_matrix)
Precision=torch.diag(confusion_matrix)/line_sum
Recall=torch.diag(confusion_matrix)/column_sum
Pfa=(line_sum - torch.diag(confusion_matrix))/total_sum
Pmd=(column_sum - torch.diag(confusion_matrix))/total_sum
return torch.mean(Pfa),torch.mean(Precision),torch.mean(Recall),torch.mean(Pmd)
def test(net,testloader):
# test complet
correct = 0
total = 0
confusion_matrix=torch.zeros(10,10)
# torch.no_grad do not train the network
with torch.no_grad():
for data in testloader:
inputs, labels = data
inputs = inputs.to(device)
labels = labels.to(device)
outputs = net(inputs)
if len(outputs) ==2:outputs,_=outputs
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum()
for i in range(len(labels)):
confusion_matrix[predicted[i],labels[i]]=confusion_matrix[predicted[i],labels[i]]+1
return 100 - (100 * float(correct) / total), confusion_matrix
def Embeds(types, model, watermarking_dict):
'''
function that embeds the watermark depending on its type
:param types: "0" without training "1" with training
:param model: the unwatermarked model
:param watermarking_dict: the object used by the watermarking methd
:return: the watermarked model
'''
if types == 1:
criterion = nn.CrossEntropyLoss()
learning_rate, momentum, weight_decay = 0.01, .9, 5e-4
optimizer = optim.SGD([
{'params': model.parameters()}
], lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
model.train()
epoch = 0
print("Launching injection.....")
while epoch < num_epochs:
print('doing epoch', str(epoch + 1), ".....")
loss, loss_nn, loss_w = tools.Embedder_one_step(model, trainloader, optimizer, criterion, watermarking_dict)
loss = (loss * batch_size / len(trainloader.dataset))
loss_nn = (loss_nn * batch_size / len(trainloader.dataset))
loss_w = (loss_w * batch_size / len(trainloader.dataset))
print(' loss : %.5f - loss_wm: %.5f, loss_nn: %.5f ' % (loss, loss_w, loss_nn))
epoch += 1
else:
model = tools.Embedder(model, watermarking_dict)
return model
if __name__ == '__main__':
###### Reproductibility
torch.manual_seed(0)
np.random.seed(0)
############
save_folder = 'vgg16_Uchi'
model = tv.models.vgg16()
model.classifier = nn.Linear(25088, 10)
model.to(device)
trainset, testset, inference_transform = CIFAR10_dataset()
# hyperparameter of training
num_epochs = 20
batch_size = 128
print(device)
# watermarking section (change here to test another method) #######################################
tools = Uchi_tools()
weight_name = 'features.19.weight'
T = 64
watermark = torch.tensor(np.random.choice([0, 1], size=(T), p=[1. / 3, 2. / 3]), device=device)
watermarking_dict = {'lambd': 0.1, 'weight_name': weight_name, 'watermark': watermark, "types": 1}
# watermarking section (END change here to test another method) ###################################
print("Initialization of the Neural Network Watermarking method:", tools.__class__.__name__)
watermarking_dict = tools.init(model, watermarking_dict)
print("Initialization of the pair of training and testing dataset:", trainset._gorg.url)
trainloader, testloader = dataloader(trainset, testset, batch_size)
Embeds(watermarking_dict["types"],model,watermarking_dict)
# print(" mean time of embedder one step watermark:", mean_step_time)
print("############ Watermark inserted ##########")
print()
print("Launching Test function...")
np.save(save_folder + '_watermarking_dict.npy',watermarking_dict)
torch.save({
'model_state_dict': model.state_dict(),
}, save_folder + '_weights')
val_score, cm = test(model, testloader)
Pfa, Pre, Rec, Pmd=quality_measurement(cm)
print('Validation error : %.2f' % val_score)
print('Probability of false alarm:', Pfa)
print('Precision:', Pre)
print('Recall:', Rec)
print('Probability of missed detection', Pmd)
\ No newline at end of file
from utils import *
import os
from PIL import Image
class Adi_tools():
def __init__(self)-> None:
super(Adi_tools, self).__init__()
def Embedder_one_step(self, net, trainloader, optimizer, criterion, watermarking_dict):
'''
:param watermarking_dict: dictionary with all watermarking elements
:return: the different losses ( global loss, task loss, watermark loss)
'''
running_loss = 0
for i, data in enumerate(watermarking_dict["trainloader"], 0):
# split data into the image and its label
inputs, labels = data
inputs = inputs.to(device)
labels = labels.to(device)
if inputs.size()[1] == 1:
inputs.squeeze_(1)
inputs = torch.stack([inputs, inputs, inputs], 1)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = net(inputs)
# backward
loss = criterion(outputs, labels)
loss.backward()
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
return running_loss, running_loss, 0
def Detector(self, net, watermarking_dict):
"""
:param file_watermark: file that contain our saved watermark elements
:return: the extracted watermark, the hamming distance compared to the original watermark
"""
# watermarking_dict = np.load(file_watermark, allow_pickle='TRUE').item() #retrieve the dictionary
keys = watermarking_dict['watermark']
res = 0
for img_file, label in keys.items():
img = self.get_image(watermarking_dict['folder'] + img_file)
net_guess = self.inference(net, img, watermarking_dict['transform'])
if net_guess == label:
res += 1
return '%i/%i' %(res,len(keys)), len(keys)-res<.1*len(keys)
def init(self, net, watermarking_dict, save=None):
'''
:param net: network
:param watermarking_dict: dictionary with all watermarking elements
:param save: file's name to save the watermark
:return: watermark_dict with a new entry: the secret key matrix X
'''
folder=watermarking_dict["folder"]
list_i = self.list_image(folder)
keys = {}
for i in range(len(list_i)):
keys[list_i[i]] = i % watermarking_dict["num_class"]
for img_file, label in keys.items():
img = self.get_image(folder + img_file)
for k in range(watermarking_dict["power"]):
self.add_images(watermarking_dict["dataset"], img, label)
trainloader = torch.utils.data.DataLoader(watermarking_dict["dataset"], batch_size=watermarking_dict["batch_size"],shuffle=True,num_workers=2)
watermarking_dict["trainloader"]=trainloader
watermarking_dict["watermark"]=keys
return watermarking_dict
def list_image(self, main_dir):
"""return all file in the directory"""
res = []
for f in os.listdir(main_dir):
if not f.startswith('.'):
res.append(f)
return res
def add_images(self, dataset, image, label):
"""add an image with its label to the dataset
:param dataset: aimed dataset to be modified
:param image: image to be added
:param label: label of this image
:return: 0
"""
(taille, height, width, channel) = np.shape(dataset.data)
dataset.data = np.append(dataset.data, image)
dataset.targets.append(label)
dataset.data = np.reshape(dataset.data, (taille + 1, height, width, channel))
return 0
def get_image(self, name):
"""
:param name: file (including the path) of an image
:return: a numpy of this image"""
image = Image.open(name)
return np.array(image)
def inference(self, net, img, transform):
"""make the inference for one image and a given transform"""
img_tensor = transform(img).unsqueeze(0)
net.eval()
with torch.no_grad():
logits = net.forward(img_tensor.to(device))
_, predicted = torch.max(logits, 1) # take the maximum value of the last layer
return predicted
'''
tools = Adi_tools()
folder = 'Resources/adi/'
power = 10
watermarking_dict = {'folder': folder, 'power': power, 'dataset': trainset, 'num_class': 10,
'batch_size':batch_size,'transform': inference_transform, "types":1}
'''
'''
Implementation of the method presented in Yusuke Uchida, Yuki Nagai, Shigeyuki Sakazawa, and Shin’ichi Satoh,
“Embedding watermarks into deep neural networks,”
in Proceedings of the 2017 ACM on International Conference on Multimedia Retrieval,
2017, pp. 269–277.
'''
from utils import *
class Uchi_tools():
def __init__(self) -> None:
super(Uchi_tools, self).__init__()
def Embedder_one_step(self, net, trainloader, optimizer, criterion, watermarking_dict):
'''
:param watermarking_dict: dictionary with all watermarking elements
:return: the different losses ( global loss, task loss, watermark loss)
'''
running_loss = 0
running_loss_nn = 0
running_loss_watermark = 0
for i, data in enumerate(trainloader, 0):
# split data into the image and its label
inputs, labels = data
inputs = inputs.to(device)
labels = labels.to(device)
if inputs.size()[1] == 1:
inputs.squeeze_(1)
inputs = torch.stack([inputs, inputs, inputs], 1)
# initialise the optimiser
optimizer.zero_grad()
# forward
outputs = net(inputs)
# backward
loss_nn = criterion(outputs, labels)
# watermark
loss_watermark = self.loss(net, watermarking_dict['weight_name'], watermarking_dict['X'], watermarking_dict['watermark'])
loss = loss_nn + watermarking_dict['lambd'] * loss_watermark # Uchida
loss.backward()
# update the optimizer
optimizer.step()
# loss
running_loss += loss.item()
running_loss_nn += loss_nn.item()
running_loss_watermark += loss_watermark.item()
return running_loss, running_loss_nn, running_loss_watermark
def Decoder(self, net, watermarking_dict):
"""
:param file_watermark: file that contain our saved watermark elements
:return: the extracted watermark, the hamming distance compared to the original watermark
"""
# watermarking_dict = np.load(file_watermark, allow_pickle='TRUE').item() #retrieve the dictionary
watermark = watermarking_dict['watermark'].to(device)
X = watermarking_dict['X'].to(device)
weight_name = watermarking_dict["weight_name"]
extraction = self.extraction(net, weight_name, X)
extraction_r = torch.round(extraction) # <.5 = 0 and >.5 = 1
res = self.hamming(watermark, extraction_r)/len(watermark)
return extraction_r, float(res)*100
def init(self, net, watermarking_dict):
'''
:param net: network
:param watermarking_dict: dictionary with all watermarking elements
:param save: file's name to save the watermark
:return: watermark_dict with a new entry: the secret key matrix X
'''
M = self.size_of_M(net, watermarking_dict['weight_name'])
T = len(watermarking_dict['watermark'])
X = torch.randn((T, M), device=device)
watermarking_dict['X']=X
watermarking_dict["types"]=1
return watermarking_dict
def projection(self, X, w):
'''
:param X: secret key matrix
:param w: flattened weight
:return: sigmoid of the matrix multiplication of the 2 inputs
'''
sigmoid_func = nn.Sigmoid()
res = torch.matmul(X, w)
sigmoid = sigmoid_func(res)
return sigmoid
def flattened_weight(self, net, weights_name):
'''
:param net: aimed network
:param weights_name: aimed layer's name
:return: a vector of dimension CxKxK (flattened weight)
'''
for name, parameters in net.named_parameters():
if weights_name in name:
f_weights = torch.mean(parameters, dim=0)
f_weights = f_weights.view(-1, )
return f_weights
def extraction(self, net, weights_name, X):
'''
:param net: aimed network
:param weights_name: aimed layer's name
:param X: secret key matrix
:return: a binary vector (watermark)
'''
W = self.flattened_weight(net, weights_name)
return self.projection(X, W)
def hamming(self, s1,s2):
'''
:param s1: sequence 1
:param s2: sequence 2
:return: the hamming distance between 2 vectors
'''
assert len(s1) == len(s2)
return sum(c1 != c2 for c1, c2 in zip(s1, s2))
def size_of_M(self, net, weight_name):
'''
:param net: aimed network
:param weights_name: aimed layer's name
:return: the 2nd dimension of the secret key matrix X
'''
for name, parameters in net.named_parameters():
if weight_name in name:
return parameters.size()[1] * parameters.size()[2] * parameters.size()[3]
def loss(self, net, weights_name, X, watermark):
'''
:param net: aimed network
:param weights_name: aimed layer's name
:param X: secret key matrix
:param watermark: the watermark
:return: Uchida's loss
'''
loss = 0
W = self.flattened_weight(net, weights_name)
yj = self.projection(X, W)
for i in range(len(watermark)):
loss += watermark[i] * torch.log2(yj[i]) + (1 - watermark[i]) * torch.log2(1 - yj[i])
return -loss/len(watermark)
# you can copy-paste this section into main to test Uchida's method
'''
tools=Uchi_tools()
weight_name = 'features.19.weight'
T = 64
watermark = torch.tensor(np.random.choice([0, 1], size=(T), p=[1. / 3, 2. / 3]), device=device)
watermarking_dict={'lambd':0.1, 'weight_name':weight_name,'watermark':watermark, "types":1}
'''
\ No newline at end of file
from .UCHIDA import *
from .ADI import *
\ No newline at end of file
# MPAI-NNW v1.2 implementation
## Case 1
This code refers to the implementation of the MPAI-NNW, as described in https://mpai.community/wp-content/uploads/2023/10/Reference-Software-Neural-Network-Watermarking-V1.pdf.
All the code is based in Python.
Case 1 resents a direct implementation of the NNW Technical Specification.
### Folders
The structure of case 1 is as followed:
- **Imperceptibility.py**, **Robustness.py**, **ComputationalCost.py** implement the different Evaluation presented in the Technical Specification.
- *Imperceptibility.py* evaluates imperceptibility by training and evaluate the trained model on a testing dataset
- *Robustness.py* evaluates the robustness of the given method by selecting a modification **M_ID=1** Modification and **{"P":0.5}** Parameters
- *Computational_cost* evaluates the computational cost of the method in terms of GPU memory and time of execution
- **utils.py** contains the common functions required for Evaluation.
- **Attacks** folder stores 7 files that implement the 7 Modifications involved in the Robustness Evaluation.
- **NNWfolder** stores a file for each watermarking method, a Python class that possesses three main functions (init, embedder/embedder_one_step, detectore/decoder).
- **Resources** folder contains additional resources for the watermarking methods and pretrained weights.
## Installation
Code was designed and tested on an Ubuntu 20.04 operating system using anaconda 23.7.2 and Python 3.9.
An environment with all the necessary libraries can be created using:
```bash
conda create --name <env> --file requirements.txt
```
## Run
All modifications should be applied inside the three files (**Imperceptibility.py**, **Robustness.py**, **ComputationalCost.py**). The "to be modified" section are delimited by ####### while the code can be run:
```bash
conda activate <env>
python Imperceptibility.py
```
To send commands to the controller as a user agent,
a second terminal should be open, and run:
```bash
conda activate <env>
python input.py
input: <your command>
```
Watermarking methods can be added in the NNW folder following the instruction specified in the technical specification (Python Class).
# Licence
[Licence](https://mpai.community/standards/mpai-cui/framework-licence/) information are detailed in the MPAI website
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment