Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
MPAI-NNW
NNT ReferenceSoftware
Commits
f4318483
Commit
f4318483
authored
Jul 22, 2025
by
Carl De Sousa Trias
Browse files
Update NNW/__init__.py, NNW/UCHIDA.py, NNW/ADI.py
parent
0c77fda5
Pipeline
#73
canceled with stages
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
NNW/ADI.py
0 → 100644
View file @
f4318483
from
utils
import
*
import
os
from
PIL
import
Image
class Adi_tools():
    """Black-box watermarking through trigger images (Adi et al. style).

    The watermark is a set of trigger images with secret labels that the
    network is trained to memorise; ownership is verified by counting how
    many trigger images the network still classifies with their secret label.
    Relies on module-level `device`, `torch` and `np` (from `utils import *`).
    """

    def __init__(self) -> None:
        super(Adi_tools, self).__init__()

    def Embedder_one_step(self, net, trainloader, optimizer, criterion, watermarking_dict):
        '''Run one training epoch over the trigger-augmented loader.

        :param net: network being watermarked
        :param trainloader: unused; the loader is read from
            watermarking_dict["trainloader"] (parameter kept for interface
            parity with the other NNW tools)
        :param optimizer: optimizer updating net's parameters
        :param criterion: task loss function
        :param watermarking_dict: dictionary with all watermarking elements
        :return: the different losses (global loss, task loss, watermark loss);
            here the watermark is embedded through the data itself, so the
            global and task losses coincide and the watermark loss is 0
        '''
        running_loss = 0
        for data in watermarking_dict["trainloader"]:
            # split data into the image and its label
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            if inputs.size()[1] == 1:
                # grayscale batch: replicate the single channel into 3 channels
                inputs.squeeze_(1)
                inputs = torch.stack([inputs, inputs, inputs], 1)
            # initialise the optimiser
            optimizer.zero_grad()
            # forward
            outputs = net(inputs)
            # backward
            loss = criterion(outputs, labels)
            loss.backward()
            # update the optimizer
            optimizer.step()
            # accumulate the epoch loss
            running_loss += loss.item()
        return running_loss, running_loss, 0

    def Detector(self, net, watermarking_dict):
        """Count how many trigger images the network labels with their secret label.

        :param net: network under test
        :param watermarking_dict: dictionary with all watermarking elements
            (reads 'watermark', 'folder' and 'transform')
        :return: a 'matched/total' string, and a boolean that is True when
            fewer than 10% of the trigger images are misclassified
        """
        keys = watermarking_dict['watermark']
        res = 0
        for img_file, label in keys.items():
            img = self.get_image(watermarking_dict['folder'] + img_file)
            net_guess = self.inference(net, img, watermarking_dict['transform'])
            if net_guess == label:
                res += 1
        # detection succeeds when at least 90% of the triggers are recovered
        return '%i/%i' % (res, len(keys)), len(keys) - res < .1 * len(keys)

    def init(self, net, watermarking_dict, save=None):
        '''Build the trigger set and the trigger-augmented training loader.

        :param net: network (unused here, kept for interface parity)
        :param watermarking_dict: dictionary with all watermarking elements
            (reads 'folder', 'num_class', 'power', 'dataset', 'batch_size')
        :param save: file's name to save the watermark (currently unused)
        :return: watermark_dict with two new entries: 'watermark' (the
            trigger-image -> secret-label mapping) and 'trainloader'
        '''
        folder = watermarking_dict["folder"]
        list_i = self.list_image(folder)
        # assign each trigger image a secret label, cycling over the classes
        keys = {img_file: i % watermarking_dict["num_class"]
                for i, img_file in enumerate(list_i)}
        for img_file, label in keys.items():
            img = self.get_image(folder + img_file)
            # 'power' controls how many copies of each trigger join the dataset
            for _ in range(watermarking_dict["power"]):
                self.add_images(watermarking_dict["dataset"], img, label)
        trainloader = torch.utils.data.DataLoader(
            watermarking_dict["dataset"],
            batch_size=watermarking_dict["batch_size"],
            shuffle=True,
            num_workers=2)
        watermarking_dict["trainloader"] = trainloader
        watermarking_dict["watermark"] = keys
        return watermarking_dict

    def list_image(self, main_dir):
        """Return all non-hidden file names in the directory."""
        return [f for f in os.listdir(main_dir) if not f.startswith('.')]

    def add_images(self, dataset, image, label):
        """Add an image with its label to the dataset, in place.

        :param dataset: aimed dataset to be modified (needs a .data ndarray of
            shape (N, H, W, C) and a .targets list)
        :param image: image to be added; must match the dataset's (H, W, C)
        :param label: label of this image
        :return: 0
        """
        (taille, height, width, channel) = np.shape(dataset.data)
        # np.append flattens the array, so reshape back with one extra sample
        dataset.data = np.append(dataset.data, image)
        dataset.targets.append(label)
        dataset.data = np.reshape(dataset.data, (taille + 1, height, width, channel))
        return 0

    def get_image(self, name):
        """
        :param name: file (including the path) of an image
        :return: a numpy array of this image
        """
        image = Image.open(name)
        return np.array(image)

    def inference(self, net, img, transform):
        """Make the inference for one image and a given transform.

        :return: the predicted class index (tensor of shape (1,))
        """
        img_tensor = transform(img).unsqueeze(0)
        net.eval()
        with torch.no_grad():
            logits = net.forward(img_tensor.to(device))
        # take the class with the maximum value of the last layer
        _, predicted = torch.max(logits, 1)
        return predicted
# Example setup (copy-paste into main to test Adi's method):
'''
tools = Adi_tools()
folder = 'Resources/adi/'
power = 10
watermarking_dict = {'folder': folder, 'power': power, 'dataset': trainset, 'num_class': 10,
'batch_size':batch_size,'transform': inference_transform, "types":1}
'''
NNW/UCHIDA.py
0 → 100644
View file @
f4318483
'''
Implementation of the method presented in Yusuke Uchida, Yuki Nagai, Shigeyuki Sakazawa, and Shin’ichi Satoh,
“Embedding watermarks into deep neural networks,”
in Proceedings of the 2017 ACM on International Conference on Multimedia Retrieval,
2017, pp. 269–277.
'''
from
utils
import
*
class Uchi_tools():
    """White-box watermarking of network weights (Uchida et al., ICMR 2017).

    A binary watermark is embedded into the mean filter of a chosen
    convolutional layer through a regularisation term, and extracted by
    projecting the flattened mean filter with a secret key matrix X.
    Relies on module-level `device`, `torch` and `nn` (from `utils import *`).
    """

    def __init__(self) -> None:
        super(Uchi_tools, self).__init__()

    def Embedder_one_step(self, net, trainloader, optimizer, criterion, watermarking_dict):
        '''Run one training epoch with the watermark regulariser added.

        :param net: network being watermarked
        :param trainloader: loader of the task training data
        :param optimizer: optimizer updating net's parameters
        :param criterion: task loss function
        :param watermarking_dict: dictionary with all watermarking elements
            (reads 'weight_name', 'X', 'watermark' and 'lambd')
        :return: the different losses (global loss, task loss, watermark loss)
        '''
        running_loss = 0
        running_loss_nn = 0
        running_loss_watermark = 0
        for data in trainloader:
            # split data into the image and its label
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            if inputs.size()[1] == 1:
                # grayscale batch: replicate the single channel into 3 channels
                inputs.squeeze_(1)
                inputs = torch.stack([inputs, inputs, inputs], 1)
            # initialise the optimiser
            optimizer.zero_grad()
            # forward
            outputs = net(inputs)
            # task loss
            loss_nn = criterion(outputs, labels)
            # watermark regulariser (Uchida)
            loss_watermark = self.loss(net,
                                       watermarking_dict['weight_name'],
                                       watermarking_dict['X'],
                                       watermarking_dict['watermark'])
            loss = loss_nn + watermarking_dict['lambd'] * loss_watermark
            loss.backward()
            # update the optimizer
            optimizer.step()
            # accumulate the epoch losses
            running_loss += loss.item()
            running_loss_nn += loss_nn.item()
            running_loss_watermark += loss_watermark.item()
        return running_loss, running_loss_nn, running_loss_watermark

    def Decoder(self, net, watermarking_dict):
        """Extract the watermark and compare it with the embedded one.

        :param net: network under test
        :param watermarking_dict: dictionary with all watermarking elements
            (reads 'watermark', 'X' and 'weight_name')
        :return: the extracted (rounded) watermark, and the bit-error rate
            as a percentage of the watermark length
        """
        watermark = watermarking_dict['watermark'].to(device)
        X = watermarking_dict['X'].to(device)
        weight_name = watermarking_dict["weight_name"]
        extraction = self.extraction(net, weight_name, X)
        # threshold the sigmoid outputs: <.5 -> 0 and >.5 -> 1
        extraction_r = torch.round(extraction)
        res = self.hamming(watermark, extraction_r) / len(watermark)
        return extraction_r, float(res) * 100

    def init(self, net, watermarking_dict):
        '''Draw the secret key matrix X for the given layer and watermark.

        :param net: network
        :param watermarking_dict: dictionary with all watermarking elements
            (reads 'weight_name' and 'watermark')
        :return: watermark_dict with a new entry: the secret key matrix X
        '''
        M = self.size_of_M(net, watermarking_dict['weight_name'])
        T = len(watermarking_dict['watermark'])
        X = torch.randn((T, M), device=device)
        watermarking_dict['X'] = X
        watermarking_dict["types"] = 1
        return watermarking_dict

    def projection(self, X, w):
        '''
        :param X: secret key matrix
        :param w: flattened weight
        :return: sigmoid of the matrix multiplication of the 2 inputs
        '''
        sigmoid_func = nn.Sigmoid()
        res = torch.matmul(X, w)
        return sigmoid_func(res)

    def flattened_weight(self, net, weights_name):
        '''
        :param net: aimed network
        :param weights_name: aimed layer's name (substring match; if several
            parameter names match, the last one wins)
        :return: a vector of dimension CxKxK (flattened mean filter)
        :raises ValueError: if no parameter name contains weights_name
        '''
        f_weights = None
        for name, parameters in net.named_parameters():
            if weights_name in name:
                # average over the output-channel dimension, then flatten
                f_weights = torch.mean(parameters, dim=0)
                f_weights = f_weights.view(-1, )
        if f_weights is None:
            # previously this fell through with a confusing UnboundLocalError
            raise ValueError("no parameter matching %r" % weights_name)
        return f_weights

    def extraction(self, net, weights_name, X):
        '''
        :param net: aimed network
        :param weights_name: aimed layer's name
        :param X: secret key matrix
        :return: a vector of sigmoid activations in (0, 1) (soft watermark)
        '''
        W = self.flattened_weight(net, weights_name)
        return self.projection(X, W)

    def hamming(self, s1, s2):
        '''
        :param s1: sequence 1
        :param s2: sequence 2
        :return: the hamming distance between 2 vectors
        '''
        assert len(s1) == len(s2)
        return sum(c1 != c2 for c1, c2 in zip(s1, s2))

    def size_of_M(self, net, weight_name):
        '''
        :param net: aimed network
        :param weight_name: aimed layer's name (must name a 4-D conv weight)
        :return: the 2nd dimension of the secret key matrix X, i.e. CxKxK
        :raises ValueError: if no parameter name contains weight_name
        '''
        for name, parameters in net.named_parameters():
            if weight_name in name:
                size = parameters.size()
                return size[1] * size[2] * size[3]
        # previously this silently returned None
        raise ValueError("no parameter matching %r" % weight_name)

    def loss(self, net, weights_name, X, watermark):
        '''Binary cross-entropy (base 2) between the watermark and its projection.

        :param net: aimed network
        :param weights_name: aimed layer's name
        :param X: secret key matrix
        :param watermark: the watermark
        :return: Uchida's loss
        '''
        loss = 0
        W = self.flattened_weight(net, weights_name)
        yj = self.projection(X, W)
        for i in range(len(watermark)):
            loss += watermark[i] * torch.log2(yj[i]) \
                    + (1 - watermark[i]) * torch.log2(1 - yj[i])
        return -loss / len(watermark)
# Example setup (copy-paste into main to test Uchida's method):
'''
tools=Uchi_tools()
weight_name = 'features.19.weight'
T = 64
watermark = torch.tensor(np.random.choice([0, 1], size=(T), p=[1. / 3, 2. / 3]), device=device)
watermarking_dict={'lambd':0.1, 'weight_name':weight_name,'watermark':watermark, "types":1}
'''
\ No newline at end of file
NNW/__init__.py
0 → 100644
View file @
f4318483
from
.UCHIDA
import
*
from
.ADI
import
*
\ No newline at end of file
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment