Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
MPAI-Private
MPAI-CAE
arp
Tape Audio Restoration
Commits
e027a97a
Commit
e027a97a
authored
Apr 24, 2023
by
Matteo
Browse files
update
parent
d4767d38
Changes
7
Hide whitespace changes
Inline
Side-by-side
.env
View file @
e027a97a
...
@@ -15,4 +15,6 @@ SPEED_W=15
...
@@ -15,4 +15,6 @@ SPEED_W=15
STANDARD_R="NAB"
STANDARD_R="NAB"
# Reading tape speed [ips].
# Reading tape speed [ips].
# Accepted values: (3.75, 7.5, 15, 30)
# Accepted values: (3.75, 7.5, 15, 30)
SPEED_R=15
SPEED_R=15
\ No newline at end of file
# Serving port
PORT=50051
\ No newline at end of file
Dockerfile
View file @
e027a97a
...
@@ -10,4 +10,4 @@ RUN poetry install --no-cache --only main
...
@@ -10,4 +10,4 @@ RUN poetry install --no-cache --only main
VOLUME
[ "/data" ]
VOLUME
[ "/data" ]
CMD
["poetry", "run", "
uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "80", "--log-config", "config/logger.yaml", "--root-path", "
/tape
-
audio
-
restoration"]
CMD
["poetry", "run", "
python", "src
/tape
_
audio
_
restoration
/server.py
"]
poetry.lock
View file @
e027a97a
...
@@ -751,14 +751,14 @@ files = [
...
@@ -751,14 +751,14 @@ files = [
[[package]]
[[package]]
name = "mpai-cae-arp"
name = "mpai-cae-arp"
version = "0.
3.2
"
version = "0.
4.1
"
description = "The MPAI CAE-ARP software API"
description = "The MPAI CAE-ARP software API"
category = "main"
category = "main"
optional = false
optional = false
python-versions = ">=3.10,<4.0"
python-versions = ">=3.10,<4.0"
files = [
files = [
{file = "mpai_cae_arp-0.
3.2
-py3-none-any.whl", hash = "sha256:
468db6aaae5a6d54d1460ede6e9a47686d12ab92680a51e14adf14dcba405aac
"},
{file = "mpai_cae_arp-0.
4.1
-py3-none-any.whl", hash = "sha256:
9601eb46c4561cae52347f8bdfbd8b575861fb6dad180b82eb0496b9dfa9f041
"},
{file = "mpai_cae_arp-0.
3.2
.tar.gz", hash = "sha256:
e447ee46f6c4ca18dd87e3b94dbf366d3f4852ad5530886fd577df9b8d32b2cd
"},
{file = "mpai_cae_arp-0.
4.1
.tar.gz", hash = "sha256:
11329c3abc906112a123130ebfb728151979af05d01ee9a1984af06b8c7be74c
"},
]
]
[package.dependencies]
[package.dependencies]
...
@@ -1345,14 +1345,14 @@ test = ["asv", "gmpy2", "mpmath", "pooch", "pytest", "pytest-cov", "pytest-timeo
...
@@ -1345,14 +1345,14 @@ test = ["asv", "gmpy2", "mpmath", "pooch", "pytest", "pytest-cov", "pytest-timeo
[[package]]
[[package]]
name = "setuptools"
name = "setuptools"
version = "67.7.
1
"
version = "67.7.
2
"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "main"
category = "main"
optional = false
optional = false
python-versions = ">=3.7"
python-versions = ">=3.7"
files = [
files = [
{file = "setuptools-67.7.
1
-py3-none-any.whl", hash = "sha256:
6f0839fbdb7e3cfef1fc38d7954f5c1c26bf4eebb155a55c9bf8faf997b9fb67
"},
{file = "setuptools-67.7.
2
-py3-none-any.whl", hash = "sha256:
23aaf86b85ca52ceb801d32703f12d77517b2556af839621c641fca11287952b
"},
{file = "setuptools-67.7.
1
.tar.gz", hash = "sha256:
bb16732e8eb928922eabaa022f881ae2b7cdcfaf9993ef1f5e841a96d32b8e0c
"},
{file = "setuptools-67.7.
2
.tar.gz", hash = "sha256:
f104fa03692a2602fa0fec6c6a9e63b6c8a968de13e17c026957dd1f53d80990
"},
]
]
[package.extras]
[package.extras]
...
@@ -1484,4 +1484,4 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
...
@@ -1484,4 +1484,4 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[metadata]
[metadata]
lock-version = "2.0"
lock-version = "2.0"
python-versions = ">=3.10,<3.12"
python-versions = ">=3.10,<3.12"
content-hash = "
5a3914f84e608f8a2b21b7010f3ecfc12f268e593396f6c81e1aee89ac022109
"
content-hash = "
b58f6b06a29fe087ece41d671018b4b385d478acbe378ccb1183152df3bb6b76
"
pyproject.toml
View file @
e027a97a
...
@@ -15,8 +15,8 @@ python = ">=3.10,<3.12"
...
@@ -15,8 +15,8 @@ python = ">=3.10,<3.12"
numpy
=
"1.23.3"
numpy
=
"1.23.3"
control
=
"0.9.3.post2"
control
=
"0.9.3.post2"
scipy
=
"^1.10.1"
scipy
=
"^1.10.1"
mpai-cae-arp
=
"^0.3.2"
rich
=
"^13.3.4"
rich
=
"^13.3.4"
mpai-cae-arp
=
"^0.4.1"
[build-system]
[build-system]
...
...
src/tape_audio_restoration/cli.py
View file @
e027a97a
...
@@ -5,9 +5,10 @@ from typing import NoReturn
...
@@ -5,9 +5,10 @@ from typing import NoReturn
from
rich.console
import
Console
from
rich.console
import
Console
from
scipy.io
import
wavfile
from
scipy.io
import
wavfile
from
mpai_cae_arp.files
import
File
,
FileType
from
mpai_cae_arp.audio.standards
import
SpeedStandard
from
mpai_cae_arp.io
import
Color
,
Style
,
pprint
from
mpai_cae_arp.time
import
seconds_to_string
from
mpai_cae_arp.audio
import
AudioWave
from
mpai_cae_arp.types.restoration
import
Restoration
,
EditingList
from
.lib
import
correction
,
get_correction_filter
,
save_file
,
check_input
from
.lib
import
correction
,
get_correction_filter
,
save_file
,
check_input
...
@@ -118,13 +119,39 @@ def main():
...
@@ -118,13 +119,39 @@ def main():
sample_rate
,
preservation_audio_file
=
wavfile
.
read
(
paf_path
)
sample_rate
,
preservation_audio_file
=
wavfile
.
read
(
paf_path
)
# get the correction filter
# get the correction filter
with
console
.
status
(
"[bold]Applying correction filter..."
,
spinner
=
"dots"
):
with
console
.
status
(
"[bold]Applying correction filter..."
,
spinner
=
"dots"
):
a
,
b
,
new_sample_rate
,
_
=
get_correction_filter
(
standard_w
,
speed_w
,
standard_r
,
speed_r
,
sample_rate
)
correction_filter
=
get_correction_filter
(
standard_w
,
speed_w
,
standard_r
,
speed_r
,
sample_rate
)
restored_audio_file
=
preservation_audio_file
restored_audio_file
=
preservation_audio_file
# if needed, apply the correction filter to the Preservation Audio file
if
correction_filter
is
None
:
if
len
(
a
)
!=
0
:
new_sample_rate
=
sample_rate
restored_audio_file
=
correction
(
a
,
b
,
preservation_audio_file
,
round
(
new_sample_rate
))
else
:
a
,
b
,
new_sample_rate
,
_
=
correction_filter
# if needed, apply the correction filter to the Preservation Audio file
if
len
(
a
)
!=
0
:
restored_audio_file
=
correction
(
a
,
b
,
preservation_audio_file
,
round
(
new_sample_rate
))
with
console
.
status
(
"[bold]Saving Restoration Audio file..."
,
spinner
=
"dots"
):
with
console
.
status
(
"[bold]Saving Restoration Audio file..."
,
spinner
=
"dots"
):
save_file
(
restored_audio_file
,
round
(
new_sample_rate
),
temp_path
,
f
"
{
files_name
}
.wav"
)
save_file
(
restored_audio_file
,
round
(
new_sample_rate
),
temp_path
,
files_name
)
editing_list_path
=
os
.
path
.
join
(
working_path
,
'temp'
,
files_name
,
'EditingList.json'
)
editing_list
=
EditingList
(
OriginalEqualisationStandard
=
standard_w
,
OriginalSampleFrequency
=
sample_rate
,
OriginalSpeedStandard
=
SpeedStandard
(
speed_w
),
Restorations
=
[
Restoration
(
PreservationAudioFileStart
=
"00:00:00.000"
,
PreservationAudioFileEnd
=
seconds_to_string
(
len
(
preservation_audio_file
)
/
sample_rate
),
RestoredAudioFileURI
=
f
"
{
temp_path
}
/RestoredAudioFile/
{
files_name
}
.wav"
,
ReadingBackwards
=
False
,
AppliedSpeedStandard
=
SpeedStandard
(
speed_r
),
AppliedSampleFrequency
=
new_sample_rate
,
AppliedEqualisationStandard
=
standard_r
,
)
]
)
editing_list
.
save_as_json_file
(
editing_list_path
)
except
ValueError
:
except
ValueError
:
exit_with_error
(
exit_with_error
(
...
@@ -137,7 +164,3 @@ def main():
...
@@ -137,7 +164,3 @@ def main():
console
.
print
(
"[green] Success! :tada:"
)
console
.
print
(
"[green] Success! :tada:"
)
quit
(
os
.
EX_OK
)
quit
(
os
.
EX_OK
)
if
__name__
==
'__main__'
:
main
()
src/tape_audio_restoration/lib.py
View file @
e027a97a
...
@@ -11,16 +11,16 @@ import array
...
@@ -11,16 +11,16 @@ import array
import
numpy
as
np
import
numpy
as
np
import
os
import
os
import
shutil
import
shutil
import
sys
from
statistics
import
mode
from
statistics
import
mode
from
control
import
c2d
,
TransferFunction
from
control
import
c2d
,
TransferFunction
from
numpy
import
ndarray
from
numpy
import
ndarray
from
scipy.io
import
wavfile
from
scipy.io
import
wavfile
from
scipy.signal
import
tf2zpk
,
zpk2tf
,
lfilter
from
scipy.signal
import
tf2zpk
,
zpk2tf
,
lfilter
from
mpai_cae_arp.audio.standards
import
EqualizationStandard
,
SpeedStandard
from
mpai_cae_arp.files
import
File
,
FileType
from
mpai_cae_arp.files
import
File
,
FileType
from
mpai_cae_arp.io
import
Color
,
Style
,
pprint
from
mpai_cae_arp.io
import
Color
,
pprint
from
mpai_cae_arp.types.irregularity
import
IrregularityFile
,
Irregularity
,
IrregularityProperties
from
mpai_cae_arp.types.irregularity
import
IrregularityFile
__copyright__
=
"Copyright 2022, Audio Innova S.r.l."
__copyright__
=
"Copyright 2022, Audio Innova S.r.l."
...
@@ -32,28 +32,54 @@ def verify_path(working_path: str, files_name: str) -> str:
...
@@ -32,28 +32,54 @@ def verify_path(working_path: str, files_name: str) -> str:
raise
NotImplementedError
raise
NotImplementedError
def
verify_equalization_and_speed
(
standard_w
:
str
,
speed_w
:
float
,
standard_r
:
str
,
speed_r
:
float
)
->
tuple
[
str
,
str
]:
def
verify_equalization_and_speed
(
standard_w
:
EqualizationStandard
|
str
,
speed_w
:
SpeedStandard
|
float
,
standard_r
:
EqualizationStandard
|
str
,
speed_r
:
SpeedStandard
|
float
)
->
tuple
[
str
,
str
]:
"""
"""
Method to verify that the equalization and speed parameters are correct.
Method to verify that the equalization and speed parameters are correct.
"""
"""
raise
NotImplementedError
raise
NotImplementedError
def
check_input
(
working_path
:
str
,
files_name
:
str
,
standard_w
:
str
,
speed_w
:
float
,
standard_r
:
str
,
speed_r
:
float
)
->
tuple
[
str
,
str
,
str
,
str
]:
def
check_input
(
working_path
:
str
,
files_name
:
str
,
standard_w
:
EqualizationStandard
,
speed_w
:
float
,
standard_r
:
EqualizationStandard
,
speed_r
:
float
)
->
tuple
[
str
,
str
,
EqualizationStandard
,
EqualizationStandard
]:
"""
"""
Method to check that passed arguments are correct and that the environment is conformant to the standard;
Method to check that passed arguments are correct and that the environment is conformant to the standard;
:param working_path: str representing the path where all files resulting from previous AIMs are stored,
:param files_name: str representing the Preservation files name, to identify the input directory,
Parameters
:param standard_w: str specifying the equalization standard used when the tape was recorded,
----------
:param speed_w: float specifying the speed used when the tape was recorded,
working_path: str
:param standard_r: str specifying the equalization standard used when the tape was read,
representing the path where all files resulting from previous AIMs are stored,
:param speed_r: float specifying the speed used when the tape was read.
files_name: str
:return: tuple consisting of three variables:
representing the Preservation files name, to identify the input directory,
1) str representing the path where the Preservation Audio File is stored;
standard_w: EqualizationStandard
2) str representing the path where the files to be processed during the current execution are stored;
specifying the equalization standard used when the tape was recorded,
3) the operating standard_w;
speed_w: float
4) the operating standard_r.
specifying the speed used when the tape was recorded,
standard_r: EqualizationStandard
specifying the equalization standard used when the tape was read,
speed_r: float
specifying the speed used when the tape was read.
Returns
-------
tuple[str, str, EqualizationStandard, EqualizationStandard]
consisting of four variables:
- str representing the path where the Preservation Audio File is stored;
- str representing the path where the files to be processed during the current execution are stored;
- the operating standard_w;
- the operating standard_r.
"""
"""
if
standard_w
is
EqualizationStandard
.
CCIR
:
standard_w
=
'CCIR'
elif
standard_w
is
EqualizationStandard
.
NAB
:
standard_w
=
'NAB'
elif
standard_w
is
EqualizationStandard
.
IEC
:
standard_w
=
'IEC'
if
standard_r
is
EqualizationStandard
.
CCIR
:
standard_r
=
'CCIR'
elif
standard_r
is
EqualizationStandard
.
NAB
:
standard_r
=
'NAB'
elif
standard_r
is
EqualizationStandard
.
IEC
:
standard_r
=
'IEC'
# Check for working path existence
# Check for working path existence
if
not
os
.
path
.
exists
(
working_path
):
if
not
os
.
path
.
exists
(
working_path
):
...
@@ -135,20 +161,30 @@ def check_input(working_path: str, files_name: str, standard_w: str, speed_w: fl
...
@@ -135,20 +161,30 @@ def check_input(working_path: str, files_name: str, standard_w: str, speed_w: fl
)
)
standard_r
=
'CCIR'
standard_r
=
'CCIR'
if
standard_w
==
'CCIR'
:
standard_w
=
EqualizationStandard
.
CCIR
elif
standard_w
==
'NAB'
:
standard_w
=
EqualizationStandard
.
NAB
if
standard_r
==
'CCIR'
:
standard_r
=
EqualizationStandard
.
CCIR
elif
standard_r
==
'NAB'
:
standard_r
=
EqualizationStandard
.
NAB
return
paf_path
,
temp_path
,
standard_w
,
standard_r
return
paf_path
,
temp_path
,
standard_w
,
standard_r
def
get_correction_filter
(
standard_w
:
str
,
speed_w
:
float
,
standard_r
:
str
,
speed_r
:
float
,
fs
:
int
)
->
tuple
[
list
,
list
,
float
,
int
]:
def
get_correction_filter
(
standard_w
:
EqualizationStandard
,
speed_w
:
float
,
standard_r
:
EqualizationStandard
,
speed_r
:
float
,
fs
:
int
)
->
tuple
[
list
,
list
,
float
,
int
]
|
None
:
"""
"""
Method to establish correct filter transfer function coefficients;
Method to establish correct filter transfer function coefficients;
Parameters
Parameters
----------
----------
standard_w :
str
standard_w :
EqualizationStandard
String specifying the equalization standard used when the tape was recorded.
String specifying the equalization standard used when the tape was recorded.
speed_w : float
speed_w : float
Float specifying the speed used when the tape was recorded.
Float specifying the speed used when the tape was recorded.
standard_r :
str
standard_r :
EqualizationStandard
String specifying the equalization standard used when the tape was read.
String specifying the equalization standard used when the tape was read.
speed_r : float
speed_r : float
Float specifying the speed used when the tape was read.
Float specifying the speed used when the tape was read.
...
@@ -165,6 +201,21 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -165,6 +201,21 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
* float specifying the operating sampling frequency;
* float specifying the operating sampling frequency;
* int informing about the case number.
* int informing about the case number.
"""
"""
if
standard_w
is
EqualizationStandard
.
CCIR
:
standard_w
=
'CCIR'
elif
standard_w
is
EqualizationStandard
.
NAB
:
standard_w
=
'NAB'
elif
standard_w
is
EqualizationStandard
.
IEC
:
standard_w
=
'IEC'
if
standard_r
is
EqualizationStandard
.
CCIR
:
standard_r
=
'CCIR'
elif
standard_r
is
EqualizationStandard
.
NAB
:
standard_r
=
'NAB'
elif
standard_r
is
EqualizationStandard
.
IEC
:
standard_r
=
'IEC'
# CCIR time constants.
# CCIR time constants.
t2_30
=
17.5
*
10
**
(
-
6
)
# time constant CCIR_30
t2_30
=
17.5
*
10
**
(
-
6
)
# time constant CCIR_30
t2_15
=
35
*
10
**
(
-
6
)
# time constant CCIR_15
t2_15
=
35
*
10
**
(
-
6
)
# time constant CCIR_15
...
@@ -230,7 +281,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -230,7 +281,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
if
speed_r
==
30
:
if
speed_r
==
30
:
print
(
'Reference case: 31'
)
print
(
'Reference case: 31'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
# Case 15
# Case 15
elif
speed_r
==
15
:
elif
speed_r
==
15
:
fs
=
2
*
fs
# Doubling sampling frequency
fs
=
2
*
fs
# Doubling sampling frequency
...
@@ -294,7 +345,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -294,7 +345,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
elif
speed_r
==
15
:
elif
speed_r
==
15
:
print
(
'Reference case: 33'
)
print
(
'Reference case: 33'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
# Case 20
# Case 20
else
:
# speed_r == 7.5
else
:
# speed_r == 7.5
fs
=
2
*
fs
# Doubling the sampling frequency
fs
=
2
*
fs
# Doubling the sampling frequency
...
@@ -358,7 +409,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -358,7 +409,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
else
:
# speed_r == 7.5
else
:
# speed_r == 7.5
print
(
'Reference case: 35'
)
print
(
'Reference case: 35'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
else
:
# standard_w == 'NAB'
else
:
# standard_w == 'NAB'
if
speed_w
==
15
:
if
speed_w
==
15
:
if
standard_r
==
'NAB'
:
if
standard_r
==
'NAB'
:
...
@@ -366,7 +417,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -366,7 +417,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
if
speed_r
==
15
:
if
speed_r
==
15
:
print
(
'Reference case: 32'
)
print
(
'Reference case: 32'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
# Case 17
# Case 17
elif
speed_r
==
7.5
:
elif
speed_r
==
7.5
:
fs
=
2
*
fs
# Doubling the sampling frequency
fs
=
2
*
fs
# Doubling the sampling frequency
...
@@ -455,7 +506,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -455,7 +506,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
elif
speed_r
==
7.5
:
elif
speed_r
==
7.5
:
print
(
'Reference case: 34'
)
print
(
'Reference case: 34'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
# Case 22
# Case 22
else
:
# speed_r == 3.75
else
:
# speed_r == 3.75
fs
=
2
*
fs
# Doubling the sampling frequency
fs
=
2
*
fs
# Doubling the sampling frequency
...
@@ -544,7 +595,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
...
@@ -544,7 +595,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
else
:
# speed_r == 3.75
else
:
# speed_r == 3.75
print
(
'Reference case: 36'
)
print
(
'Reference case: 36'
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
pprint
(
'Nothing to do!'
,
color
=
Color
.
GREEN
)
quit
(
os
.
EX_OK
)
return
None
else
:
# standard_r == 'CCIR'
else
:
# standard_r == 'CCIR'
# Case 12
# Case 12
if
speed_r
==
30
:
if
speed_r
==
30
:
...
@@ -713,18 +764,18 @@ def read_params_from_irregularity_file(irregularity_file_path: str) -> tuple[str
...
@@ -713,18 +764,18 @@ def read_params_from_irregularity_file(irregularity_file_path: str) -> tuple[str
- writing speed: 7.5
- writing speed: 7.5
"""
"""
json_content
=
File
(
irregularity_file_path
,
FileType
.
JSON
).
get_content
()
json_content
=
File
(
irregularity_file_path
,
FileType
.
JSON
).
get_content
()
irregularity_file
=
IrregularityFile
.
from_json
(
json_content
)
irregularity_file
=
IrregularityFile
(
**
json_content
)
write_speeds
=
[]
write_speeds
=
[]
read_speeds
=
[]
read_speeds
=
[]
write_eq
=
[]
write_eq
=
[]
read_eq
=
[]
read_eq
=
[]
for
irr
in
irregularity_file
.
irregularities
:
for
irr
in
irregularity_file
.
irregularities
:
if
irr
.
irregularity_
properties
is
not
None
:
if
irr
.
properties
is
not
None
:
read_eq
.
append
(
irr
.
irregularity_
properties
.
reading_equalisation
)
read_eq
.
append
(
irr
.
properties
.
reading_equalisation
)
read_speeds
.
append
(
irr
.
irregularity_
properties
.
reading_speed
)
read_speeds
.
append
(
irr
.
properties
.
reading_speed
)
write_eq
.
append
(
irr
.
irregularity_
properties
.
writing_equalisation
)
write_eq
.
append
(
irr
.
properties
.
writing_equalisation
)
write_speeds
.
append
(
irr
.
irregularity_
properties
.
writing_speed
)
write_speeds
.
append
(
irr
.
properties
.
writing_speed
)
else
:
else
:
read_eq
.
append
(
None
)
read_eq
.
append
(
None
)
read_speeds
.
append
(
None
)
read_speeds
.
append
(
None
)
...
@@ -735,56 +786,6 @@ def read_params_from_irregularity_file(irregularity_file_path: str) -> tuple[str
...
@@ -735,56 +786,6 @@ def read_params_from_irregularity_file(irregularity_file_path: str) -> tuple[str
if
all
(
x
is
None
for
x
in
[
result_rs
,
result_ws
])
or
len
(
write_speeds
)
==
0
:
if
all
(
x
is
None
for
x
in
[
result_rs
,
result_ws
])
or
len
(
write_speeds
)
==
0
:
result_ws
,
result_rs
=
7.5
,
7.5
result_ws
,
result_rs
=
7.5
,
7.5
if
all
(
x
is
None
for
x
in
[
result_we
,
result_re
])
or
len
(
write_eq
)
==
0
:
if
all
(
x
is
None
for
x
in
[
result_we
,
result_re
])
or
len
(
write_eq
)
==
0
:
result_we
,
result_re
=
'
NAB
'
,
'
NAB
'
result_we
,
result_re
=
EqualizationStandard
.
NAB
,
EqualizationStandard
.
NAB
return
result_we
,
result_ws
,
result_re
,
result_rs
return
result_we
,
result_ws
,
result_re
,
result_rs
# def main():
# """
# Main execution method.
# :return: exit codes corresponding to the execution status.
# """
# pprint("\nWelcome to ARP Tape Audio Restoration!", styles=[Style.BOLD])
# print("You are using Python version: " + sys.version)
# # Get the input from config/args.yaml or command line
# working_path, files_name, standard_w, speed_w, standard_r, speed_r = get_args()
# # Check if input is correct
# paf_path, temp_path, standard_w, standard_r = check_input(working_path, files_name, standard_w, speed_w, standard_r, speed_r)
# # Display input parameters
# print('\nInput parameters:')
# print(' WORKING_PATH: ' + working_path)
# print(' FILES_NAME: ' + files_name)
# print(' STANDARD_W: ' + standard_w)
# print(' SPEED_W: ' + str(speed_w) + ' ips')
# print(' STANDARD_R: ' + standard_r)
# print(' SPEED_R: ' + str(speed_r) + ' ips')
# # Preservation Audio File check
# print("Opening '%s'..." % paf_path)
# fs, paf = wavfile.read(paf_path)
# print('Preservation Audio File opened!')
# print(' FS: ' + str(fs) + ' Hz\n')
# # Decision stage
# a, b, fs, case = get_correction_filter(standard_w, speed_w, standard_r, speed_r, fs)
# # Casting FS to int because wavfile.write() is stupid
# fs = round(fs)
# print('Reference case: ' + str(case))
# print('Operational FS: ' + str(fs) + ' Hz.')
# # Correction phase
# if len(a) != 0:
# # Not all cases present a correction filter!
# raf = correction(a, b, paf, fs)
# save_file(raf, fs, temp_path, '1')
# else:
# # Just save Restored Audio File, but with modified fs
# save_file(paf, fs, temp_path, '1')
# # End
# pprint("Success!\n", color=Color.GREEN, styles=[Style.BOLD])
src/tape_audio_restoration/server.py
View file @
e027a97a
import
os
from
concurrent
import
futures
import
grpc
from
grpc
import
StatusCode
from
rich.console
import
Console
from
scipy.io
import
wavfile
from
mpai_cae_arp.audio.standards
import
EqualizationStandard
,
SpeedStandard
from
mpai_cae_arp.files
import
File
,
FileType
from
mpai_cae_arp.time
import
seconds_to_string
from
mpai_cae_arp.types.restoration
import
Restoration
,
EditingList
from
mpai_cae_arp.network
import
arp_pb2_grpc
as
arp_pb2_grpc
from
mpai_cae_arp.network.arp_pb2
import
(
JobRequest
,
JobResponse
,
Contact
,
InfoResponse
,
License
,
)
from
tape_audio_restoration
import
lib
PORT
=
os
.
getenv
(
"PORT"
)
or
'50051'
info
=
File
(
'config.yml'
,
FileType
.
YAML
).
get_content
()
def
error_response
(
context
,
status
,
message
):
context
.
set_code
(
status
)
context
.
set_details
(
message
)
return
JobResponse
(
status
=
"error"
,
message
=
message
)
class
TapeAudioRestorationServicer
(
arp_pb2_grpc
.
AIMServicer
):
def
__init__
(
self
,
console
:
Console
):
self
.
console
=
console
def
getInfo
(
self
,
request
,
context
)
->
InfoResponse
:
self
.
console
.
log
(
'Received request for AIM info'
)
context
.
set_code
(
StatusCode
.
OK
)
context
.
set_details
(
'Success'
)
return
InfoResponse
(
title
=
info
[
'title'
],
description
=
info
[
'description'
],
version
=
info
[
'version'
],
contact
=
Contact
(
name
=
info
[
'contact'
][
'name'
],
email
=
info
[
'contact'
][
'email'
],
),
license
=
License
(
name
=
info
[
'license_info'
][
'name'
],
url
=
info
[
'license_info'
][
'url'
],
)
)
def
work
(
self
,
request
:
JobRequest
,
context
):
self
.
console
.
log
(
'Received request for computation'
)
self
.
console
.
log
(
request
)
working_dir
:
str
=
request
.
working_dir
files_name
:
str
=
request
.
files_name
temp_dir
=
os
.
path
.
join
(
working_dir
,
"temp"
,
files_name
)
audio_irreg_2
=
os
.
path
.
join
(
temp_dir
,
'AudioAnalyser_IrregularityFileOutput2.json'
)
editing_list_path
=
os
.
path
.
join
(
temp_dir
,
'EditingList.json'
)
try
:
standard_w
,
speed_w
,
standard_r
,
speed_r
=
lib
.
read_params_from_irregularity_file
(
audio_irreg_2
)
paf_path
,
temp_path
,
standard_w
,
standard_r
=
lib
.
check_input
(
working_dir
,
files_name
,
standard_w
,
speed_w
,
standard_r
,
speed_r
)
yield
JobResponse
(
status
=
"info"
,
message
=
f
"Applying equalisation filter from
{
standard_w
}
to
{
standard_r
}
"
)
yield
JobResponse
(
status
=
"info"
,
message
=
f
"Applying speed filter from
{
speed_w
}
to
{
speed_r
}
"
)
sample_rate
,
preservation_audio_file
=
wavfile
.
read
(
paf_path
)
correction_filter
=
lib
.
get_correction_filter
(
standard_w
,
speed_w
,
standard_r
,
speed_r
,
sample_rate
)
restored_audio_file
=
preservation_audio_file
if
correction_filter
is
None
:
yield
JobResponse
(
status
=
"success"
,
message
=
"No correction filter has to be applied"
)
new_sample_rate
=
sample_rate
else
:
a
,
b
,
new_sample_rate
,
_
=
correction_filter
yield
JobResponse
(
status
=
"info"
,
message
=
f
"Converting from
{
sample_rate
}
Hz to
{
new_sample_rate
}
Hz"
)
if
len
(
a
)
!=
0
:
restored_audio_file
=
lib
.
correction
(
a
,
b
,
preservation_audio_file
,
round
(
new_sample_rate
))
lib
.
save_file
(
restored_audio_file
,
round
(
new_sample_rate
),
temp_path
,
files_name
)
yield
JobResponse
(
status
=
"success"
,
message
=
f
"Restored audio file created at
{
temp_path
}
/RestoredAudioFile/
{
files_name
}
.wav"
)
editing_list
=
EditingList
(
OriginalEqualisationStandard
=
standard_w
,
OriginalSampleFrequency
=
sample_rate
,
OriginalSpeedStandard
=
SpeedStandard
(
speed_w
),
Restorations
=
[
Restoration
(
PreservationAudioFileStart
=
"00:00:00.000"
,
PreservationAudioFileEnd
=
seconds_to_string
(
len
(
preservation_audio_file
)
/
sample_rate
),
RestoredAudioFileURI
=
f
"
{
temp_path
}
/RestoredAudioFile/
{
files_name
}
.wav"
,
ReadingBackwards
=
False
,
AppliedSpeedStandard
=
SpeedStandard
(
speed_r
),
AppliedSampleFrequency
=
new_sample_rate
,
AppliedEqualisationStandard
=
standard_r
,
)
]
)
editing_list
.
save_as_json_file
(
editing_list_path
)
yield
JobResponse
(
status
=
"success"
,
message
=
f
"Editing list created at
{
editing_list_path
}
"
)
yield
JobResponse
(
status
=
"success"
,
message
=
"Success"
)
except
Exception
as
e
:
yield
JobResponse
(
status
=
"error"
,
message
=
str
(
e
))
def
serve
(
console
):
server
=
grpc
.
server
(
futures
.
ThreadPoolExecutor
(
max_workers
=
10
))
arp_pb2_grpc
.
add_AIMServicer_to_server
(
TapeAudioRestorationServicer
(
console
),
server
)
server
.
add_insecure_port
(
f
'[::]:
{
PORT
}
'
)
server
.
start
()
server
.
wait_for_termination
()
if
__name__
==
'__main__'
:
console
=
Console
()
console
.
print
(
f
'Server started at localhost:
{
PORT
}
:satellite:'
)
serve
(
console
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment