Commit e1615572 authored by Nadir Dalla Pozza's avatar Nadir Dalla Pozza
Browse files

Refactoring

parent 5d9dae35
......@@ -13,8 +13,9 @@ The *Tape Audio Restoration* is written in Python 3.9 which is therefore require
```
pip install pyyaml
```
[NumPy](https://numpy.org) and [SciPy](https://scipy.org) are required for correcting the audio of the Preservation Audio File. You can install them with:
[Control](https://python-control.readthedocs.io/en/0.9.3.post2/index.html), [NumPy](https://numpy.org) and [SciPy](https://scipy.org) are required for correcting the audio of the Preservation Audio File. You can install them with:
```
pip install control
pip install numpy
pip install scipy
```
......@@ -32,14 +33,14 @@ pip install -r requirements.txt
Once the libraries are installed, you should customise the configuration file `config.yaml`.
There are nine required parameters:
1. `WORKING_PATH` that specifies the working path where all input files are stored and where all output files will be saved;
2. `PRESERVATION_FILE_NAME` that specifies the name of the Preservation Audio File to be considered. Use an empty string
2. `FILES_NAME` that specifies the name of the Preservation Audio File to be considered. Use an empty string
to plot filter frequency response;
3. `STANDARD_W` that specifies the equalisation standard used to record the tape;
4. `SPEED_W` that specifies the speed used when recording the tape;
5. `STANDARD_R` that specifies the equalisation standard used to read the tape;
6. `SPEED_R` that specifies the speed used when reading the tape;
7. `FS` that specifies the sampling frequency to be used for the filters and the impulse responses.
This parameter won't be considered if `PRESERVATION_FILE_NAME` parameter is not an empty string;
This parameter won't be considered if `FILES_NAME` parameter is not an empty string;
8. `BPS` that specifies the bits-per-sample to be used when saving the filter impulse response as WAV;
9. `PLOTS` that specifies if the plots of the frequency response should be shown.
......@@ -107,7 +108,7 @@ Please note that:
* Corresponding input files shall present the same name;
* The name of Irregularity Files given above is ***mandatory***.
With this structure, `PRESERVATION_FILE_NAME` parameter could be equal to `File1` or `File2`.
With this structure, `FILES_NAME` parameter could be equal to `File1` or `File2`.
You can now launch the *Tape Audio Restoration* from the command line with:
```
......@@ -127,6 +128,7 @@ This project was developed by:
* Sergio Canazza (University of Padova).
This project takes advantage of the following libraries:
* [Control](https://python-control.readthedocs.io/en/0.9.3.post2/index.html);
* [Matplotlib](https://matplotlib.org);
* [NumPy](https://numpy.org);
* [PyYaml](https://pyyaml.org);
......
# Working path
WORKING_PATH: "/Users/nadir/Documents/MPAI-CAE/Workflow"
# Name of the Preservation Audio File (without extension)
# Use an empty string to plot filters frequency response
PRESERVATION_FILE_NAME: "sample12_W7N_R15C"
FILES_NAME: "sample12_W7N_R15C"
# PRESERVATION_FILE_NAME: "BornToDie"
# PRESERVATION_FILE_NAME: ""
# Equalisation standard used to record the tape.
......@@ -17,15 +16,6 @@ STANDARD_R: "CCIR"
# Reading tape speed [ips].
# Accepted values: (3.75, 7.5, 15, 30)
SPEED_R: 15
# Sampling frequency to be used for filters and impulse response [Hz].
# Accepted values: (44100, 48000, 96000)
# NOTE: Overwritten with input file one, if any.
FS: 96000
# Bits per sample to be used when saving the filter impulse response audio file.
# Accepted values: (8, 16, 24, 32)
# NOTE_1: Low values (8, 16) may introduce oscillations when exporting the filter impulse response in .wav format.
# NOTE_2: Overwritten with input file one, if any.
BPS: 24
# Enable plotting filter frequency responses
# Accepted values: (true, false)
PLOTS: false
\ No newline at end of file
control==0.9.3.post2
matplotlib==3.6.2
numpy==1.23.5
PyYAML==6.0
scipy==1.10.0
......@@ -9,12 +9,16 @@ It identifies and restore portions of the Preservation Audio File, providing:
- Editing List
"""
import array
import matplotlib.pyplot as plt
import numpy as np
import os
import shutil
import sys
import yaml
from argparse import ArgumentParser, RawTextHelpFormatter
from control import c2d, TransferFunction
from numpy import ndarray
from scipy.io import wavfile
from scipy.signal import freqs, freqz, tf2zpk, zpk2tf, lfilter
......@@ -28,8 +32,10 @@ __email__ = "nadir.dallapozza@unipd.it"
__status__ = "Production"
# Class for customizing Console Colors
class CC:
"""
Variables for customizing console colors
"""
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARK_CYAN = '\033[36m'
......@@ -42,204 +48,254 @@ class CC:
END = '\033[0m'
def save_file(file):
if not os.path.exists(temp_path):
# Create directory
os.mkdir(temp_path)
print("temp directory '% s' created" % temp_path)
raf_path = os.path.join(temp_path, 'RestoredAudioFiles')
make_raf = False
if not os.path.exists(raf_path):
# Create directory
os.mkdir(raf_path)
make_raf = True
print("Restored Audio Files directory '% s' created" % raf_path)
else:
print((CC.PURPLE + "Restored Audio Files directory '% s' already exists!" + CC.END) % raf_path)
overwrite = input('Do you want to overwrite it? [y/n]: ')
if overwrite.casefold() == 'y':
# Overwrite directory
shutil.rmtree(raf_path)
os.mkdir(raf_path)
make_raf = True
print('Restored Audio Files directory overwritten')
elif overwrite.casefold() != 'n':
print(CC.RED + 'Unknown command, exiting' + CC.END)
quit(os.EX_USAGE)
if make_raf:
print("Saving Restored Audio File to: '%s' ..." % raf_path)
wavfile.write(os.path.join(raf_path, '1.wav'), FS, file)
if __name__ == '__main__':
# Read configuration file
config = object
try:
config = yaml.safe_load(open('config.yaml', 'r'))
if 'WORKING_PATH' not in config:
print(CC.RED + 'WORKING_PATH key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'PRESERVATION_FILE_NAME' not in config:
print(CC.RED + 'PRESERVATION_FILE_NAME key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_W' not in config:
print(CC.RED + 'STANDARD_W key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_W' not in config:
print(CC.RED + 'SPEED_W key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_R' not in config:
print(CC.RED + 'STANDARD_R key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_R' not in config:
print(CC.RED + 'SPEED_R key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'FS' not in config:
print(CC.RED + 'FS key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'BPS' not in config:
print(CC.RED + 'BPS key not found in config.yaml!' + CC.END)
def get_arguments() -> tuple[str, str, str, float, str, float, bool]:
"""
Method to obtain arguments from config.yaml file or command line.
Default config.yaml, ignored if a command line argument is passed.
:return: tuple consisting of nine variables:
1) str specifying the working path;
2) str specifying the name of the Preservation files, which is key element to retrieve necessary files;
3) str specifying the equalization standard used when the tape was recorded;
4) float specifying the speed used when the tape was recorded;
5) str specifying the equalization standard used when the tape was read;
6) float specifying the speed used when the tape was read;
7) bool specifying if filter figures should be plotted.
"""
if len(sys.argv) > 1:
# Read from command line
parser = ArgumentParser(
prog="python3 tapeAudioRestoration.py",
formatter_class=RawTextHelpFormatter,
description="A tool that implements MPAI CAE-ARP Tape Audio Restoration Technical Specification.\n"
"By default, the configuration parameters are loaded from ./config.yaml file,\n"
"but, alternately, you can pass command line arguments to replace them."
)
parser.add_argument(
"-w",
"--working-path",
help="Specify the Working Path, where all input files are stored",
required=True
)
parser.add_argument(
"-f",
"--files-name",
help="Specify the name of the Preservation files (without extension)",
required=True
)
parser.add_argument(
"-ew",
"--equalization-w",
help="Specify the name of the equalization standard used when the tape was recorded",
required=True
)
parser.add_argument(
"-sw",
"--speed-w",
help="Specify the speed used when the tape was recorded",
required=True
)
parser.add_argument(
"-er",
"--equalization-r",
help="Specify the name of the equalization standard used when the tape was read",
required=True
)
parser.add_argument(
"-sr",
"--speed-r",
help="Specify the speed used when the tape was read",
required=True
)
parser.add_argument(
"-p",
"--plot-figures",
help="Specify if filter figures should be plotted [true, false].",
required=True
)
args = parser.parse_args()
working_path = args.working_path
files_name = args.files_name
standard_w = args.equalization_w
speed_w = float(args.speed_w)
standard_r = args.equalization_r
speed_r = float(args.speed_r)
plots = False
if args.plot_figures in ('true', 'True'):
plots = True
elif args.plot_figures not in ('false', 'False'):
print(CC.RED + 'Invalid PLOT input argument!' + CC.END)
quit(os.EX_CONFIG)
if 'PLOTS' not in config:
print(CC.RED + 'PLOTS key not found in config.yaml!' + CC.END)
else:
# Read configuration file
config = object
try:
config = yaml.safe_load(open('config.yaml', 'r'))
if 'WORKING_PATH' not in config:
print(CC.RED + 'WORKING_PATH key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'FILES_NAME' not in config:
print(CC.RED + 'FILES_NAME key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_W' not in config:
print(CC.RED + 'STANDARD_W key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_W' not in config:
print(CC.RED + 'SPEED_W key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_R' not in config:
print(CC.RED + 'STANDARD_R key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_R' not in config:
print(CC.RED + 'SPEED_R key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'PLOTS' not in config:
print(CC.RED + 'PLOTS key not found in config.yaml!' + CC.END)
quit(os.EX_CONFIG)
except FileNotFoundError:
print(CC.RED + 'config.yaml file not found!' + CC.END)
quit(os.EX_NOINPUT)
working_path = config['WORKING_PATH']
files_name = config['FILES_NAME']
standard_w = config['STANDARD_W']
speed_w = config['SPEED_W']
standard_r = config['STANDARD_R']
speed_r = config['SPEED_R']
plots = False
if config['PLOTS'] in (True, 'true', 'True'):
plots = True
elif config['PLOTS'] not in (False, 'false', 'False'):
print(CC.RED + 'Invalid PLOT input argument!' + CC.END)
quit(os.EX_CONFIG)
except FileNotFoundError:
print(CC.RED + 'config.yaml file not found!' + CC.END)
return working_path, files_name, standard_w, speed_w, standard_r, speed_r, plots
def check_input(working_path: str, files_name: str, standard_w: str, speed_w: float, standard_r: str, speed_r: float) -> tuple[str, str, str, str]:
"""
Method to check that passed arguments are correct and that the environment is conformant to the standard;
:param working_path: str representing the path where all files resulting from previous AIMs are stored,
:param files_name: str representing the Preservation files name, to identify the input directory,
:param standard_w: str specifying the equalization standard used when the tape was recorded,
:param speed_w: float specifying the speed used when the tape was recorded,
:param standard_r: str specifying the equalization standard used when the tape was read,
:param speed_r: float specifying the speed used when the tape was read.
:return: tuple consisting of three variables:
1) str representing the path where the Preservation Audio File is stored;
2) str representing the path where the files to be processed during the current execution are stored;
3) the operating standard_w;
4) the operating standard_r.
"""
# Check for working path existence
if not os.path.exists(working_path):
print(CC.RED + 'The specified WORKING_PATH is non-existent!' + CC.END)
quit(os.EX_CONFIG)
# Check for Preservation Audio File existence
audio_file = files_name + '.wav'
paf_path = os.path.join(working_path, 'PreservationAudioFile', audio_file)
if not os.path.exists(paf_path):
print(CC.RED + 'Preservation Audio File not found!' + CC.END)
quit(os.EX_NOINPUT)
# Check for temp directory existence
temp_path = os.path.join(working_path, 'temp')
if not os.path.exists(temp_path):
print(CC.RED + 'WORKING_PATH structure is not conformant!' + CC.END)
quit(os.EX_NOINPUT)
# Check for input directory existence
temp_path = os.path.join(temp_path, files_name)
if not os.path.exists(temp_path):
print(CC.RED + 'The specified FILES_NAME has no corresponding files!' + CC.END)
quit(os.EX_NOINPUT)
# config.yaml variables
WORKING_PATH = config['WORKING_PATH']
PRESERVATION_FILE_NAME = config['PRESERVATION_FILE_NAME']
STANDARD_W = config['STANDARD_W']
SPEED_W = config['SPEED_W']
STANDARD_R = config['STANDARD_R']
SPEED_R = config['SPEED_R']
FS = config['FS']
BPS = config['BPS']
PLOTS = config['PLOTS']
# Output path
temp_path = os.path.join(WORKING_PATH, 'temp', PRESERVATION_FILE_NAME)
# Configuration parameters check
# Recording tape speed check
if SPEED_W != 3.75 and SPEED_W != 7.5 and SPEED_W != 15 and SPEED_W != 30:
if speed_w != 3.75 and speed_w != 7.5 and speed_w != 15 and speed_w != 30:
print(
CC.RED + 'Incorrect SPEED_W: \'' + str(SPEED_W) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END
CC.RED + 'Incorrect SPEED_W: \'' + str(speed_w) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END
)
quit(os.EX_CONFIG)
# Reading tape speed check.
if SPEED_R != 3.75 and SPEED_R != 7.5 and SPEED_R != 15 and SPEED_R != 30:
if speed_r != 3.75 and speed_r != 7.5 and speed_r != 15 and speed_r != 30:
print(
CC.RED + 'Incorrect SPEED_R: \'' + str(SPEED_R) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END
CC.RED + 'Incorrect SPEED_R: \'' + str(speed_r) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END
)
quit(os.EX_CONFIG)
# Equalization standard check.
if not (STANDARD_R == 'CCIR' or STANDARD_R == 'NAB'):
if not (standard_r == 'CCIR' or standard_r == 'NAB'):
print(
CC.RED + 'Incorrect STANDARD_R: \'' + STANDARD_R + '\'. Accepted values are: CCIR, NAB.' + CC.END
CC.RED + 'Incorrect STANDARD_R: \'' + standard_r + '\'. Accepted values are: CCIR, NAB.' + CC.END
)
quit(os.EX_CONFIG)
if not (STANDARD_W == 'CCIR' or STANDARD_W == 'NAB'):
if not (standard_w == 'CCIR' or standard_w == 'NAB'):
print(
CC.RED + 'Incorrect STANDARD_W: \'' + STANDARD_W + '\'. Accepted values are: CCIR, NAB.' + CC.END
CC.RED + 'Incorrect STANDARD_W: \'' + standard_w + '\'. Accepted values are: CCIR, NAB.' + CC.END
)
quit(os.EX_CONFIG)
# CCIR speed check.
if STANDARD_W == 'CCIR' and SPEED_W == 3.75:
if standard_w == 'CCIR' and speed_w == 3.75:
print(
CC.YELLOW + 'CCIR is undefined at 3.75 ips. Recording equalization standard is set to NAB.' + CC.END
)
STANDARD_W = 'NAB'
if STANDARD_R == 'CCIR' and SPEED_R == 3.75:
standard_w = 'NAB'
if standard_r == 'CCIR' and speed_r == 3.75:
print(
CC.YELLOW + 'CCIR is undefined at 3.75 ips. Reading equalization standard is set to NAB.' + CC.END
)
STANDARD_R = 'NAB'
standard_r = 'NAB'
# NAB speed check.
if STANDARD_W == 'NAB' and SPEED_W == 30:
if standard_w == 'NAB' and speed_w == 30:
print(
CC.YELLOW + 'NAB is undefined at 30 ips. Recording equalization standard is set to CCIR.' + CC.END
)
STANDARD_W = 'CCIR'
if STANDARD_R == 'NAB' and SPEED_R == 30:
standard_w = 'CCIR'
if standard_r == 'NAB' and speed_r == 30:
print(
CC.YELLOW + 'NAB is undefined at 30 ips. Reading equalization standard is set to CCIR.' + CC.END
)
STANDARD_R = 'CCIR'
# Sampling frequency check.
if len(PRESERVATION_FILE_NAME) == 0:
if FS != 44100 and FS != 48000 and FS != 96000:
print(
CC.RED + 'Incorrect FS: \'' + str(FS) + '\'. Accepted values are: 44100, 48000, 96000.' + CC.END
)
quit(os.EX_CONFIG)
# Bits per sample check.
if BPS != 8 and BPS != 16 and BPS != 24 and BPS != 32:
print(
CC.RED + 'Incorrect BPS: \'' + str(BPS) + '\'. Accepted values are: 8, 16, 24, 32.' + CC.END
)
quit(os.EX_CONFIG)
# Display input parameters
print('Input parameters:')
print(' WORKING_PATH: ' + WORKING_PATH)
print(' PRESERVATION_FILE_NAME: ' + PRESERVATION_FILE_NAME)
print(' STANDARD_W: ' + STANDARD_W)
print(' SPEED_W: ' + str(SPEED_W) + ' ips')
print(' STANDARD_R: ' + STANDARD_R)
print(' SPEED_R: ' + str(SPEED_R) + ' ips')
if len(PRESERVATION_FILE_NAME) == 0:
print(' FS: ' + str(FS) + ' Hz')
print(' BPS: ' + str(BPS) + '\n')
# Preservation Audio File check
paf = []
if len(PRESERVATION_FILE_NAME) > 0:
audio_file = PRESERVATION_FILE_NAME + '.wav'
paf_path = os.path.join(WORKING_PATH, 'PreservationAudioFile', audio_file)
try:
print("Opening '%s'..." % paf_path)
FS, paf = wavfile.read(paf_path)
print('Preservation Audio File opened!')
print('Overwritten parameters:')
print(' FS: ' + str(FS) + ' Hz\n')
except OSError:
print(CC.RED + "Preservation Audio File not found!" + CC.END)
quit(os.EX_NOINPUT)
# Equalization standard time constants
standard_r = 'CCIR'
return paf_path, temp_path, standard_w, standard_r
def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, speed_r: float, fs: int) -> tuple[array, array, float, int]:
"""
Method to establish correct filter transfer function coefficients;
:param standard_w: str specifying the equalization standard used when the tape was recorded,
:param speed_w: float specifying the speed used when the tape was recorded,
:param standard_r: str specifying the equalization standard used when the tape was read,
:param speed_r: float specifying the speed used when the tape was read,
:param fs: float specifying the sampling frequency.
:return: tuple consisting of four variables:
1) array representing the filter numerator coefficients;
2) array representing the filter denominator coefficients;
3) float specifying the operating sampling frequency;
4) int informing about the case number.
"""
# CCIR time constants.
t2_30 = 17.5 * 10**(-6) # time constant CCIR_30
t2_15 = 35 * 10**(-6) # time constant CCIR_15
t2_7 = 70 * 10**(-6) # time constant CCIR_7.5
t2_30 = 17.5 * 10 ** (-6) # time constant CCIR_30
t2_15 = 35 * 10 ** (-6) # time constant CCIR_15
t2_7 = 70 * 10 ** (-6) # time constant CCIR_7.5
# NAB time constants.
t3 = 3180 * 10**(-6)
t4_15 = 50 * 10**(-6) # time constant NAB_15
t4_7 = 50 * 10**(-6) # time constant NAB_7.5
t4_3 = 90 * 10**(-6) # time constant NAB_3.75
# Decision stage
a = [] # Filter numerator
b = [] # Filter denominator
case = 0 # Reference case number
t3 = 3180 * 10 ** (-6)
t4_15 = 50 * 10 ** (-6) # time constant NAB_15
t4_7 = 50 * 10 ** (-6) # time constant NAB_7.5
t4_3 = 90 * 10 ** (-6) # time constant NAB_3.75
a = []
b = []
case = -1
# This section will establish which time constants must be modified to obtain the desired equalisation standard.
if STANDARD_W == 'CCIR':
if SPEED_W == 30:
if STANDARD_R == 'NAB':
if standard_w == 'CCIR':
if speed_w == 30:
if standard_r == 'NAB':
# Case 1
if SPEED_R == 15:
FS = 2 * FS # Doubling the sampling frequency
if speed_r == 15:
fs = 2 * fs # Doubling the sampling frequency
# Correction filter: NABw15_mod + CCIRr30
# - NAB constants divided by 2
t3 = t3 / 2
......@@ -252,8 +308,8 @@ if __name__ == '__main__':
# Plot information
case = 1
# Case 2
elif SPEED_R == 7.5:
FS = 4 * FS # Quadrupling the sampling frequency
elif speed_r == 7.5:
fs = 4 * fs # Quadrupling the sampling frequency
# Correction filter: NABw7.5_mod + CCIRr30
# - NAB constants divided by 4
t3 = t3 / 4
......@@ -266,8 +322,8 @@ if __name__ == '__main__':
# Plot information
case = 2
# Case 3
else: # SPEED_R == 3.75
FS = 8 * FS # Multiplying by 8 the sampling frequency
else: # speed_r == 3.75
fs = 8 * fs # Multiplying by 8 the sampling frequency
# Correction filter: NABw3.75_mod + CCIRr30
# - NAB constants divided by 8
t3 = t3 / 8
......@@ -279,26 +335,26 @@ if __name__ == '__main__':
b = [t3 * t4, t3, 0]
# Plot information
case = 3
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 31
if SPEED_R == 30:
if speed_r == 30:
print('Reference case: 31')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
# Case 15
elif SPEED_R == 15:
FS = 2 * FS # Doubling sampling frequency
elif speed_r == 15:
fs = 2 * fs # Doubling sampling frequency
# Plot information
case = 15
# Case 16
else: # SPEED_R == 7.5
FS = 4 * FS # Quadrupling the sampling frequency
else: # speed_r == 7.5
fs = 4 * fs # Quadrupling the sampling frequency
# Plot information
case = 16
elif SPEED_W == 15:
if STANDARD_R == 'NAB':
elif speed_w == 15:
if standard_r == 'NAB':
# Case 28
if SPEED_R == 15:
if speed_r == 15:
# No speed change
# Correction filter: NABw15 + CCIRr15
# - NAB_15 constants not altered
......@@ -311,8 +367,8 @@ if __name__ == '__main__':
# Plot information
case = 28
# Case 6
elif SPEED_R == 7.5:
FS = 2 * FS # Doubling the sampling frequency
elif speed_r == 7.5:
fs = 2 * fs # Doubling the sampling frequency
# Correction filter: NABw7.5_mod + CCIRr15
# - NAB constants divided by 2
t3 = t3 / 2
......@@ -325,8 +381,8 @@ if __name__ == '__main__':
# Plot information
case = 6
# Case 7
else: # SPEED_R == 3.75
FS = 4 * FS # Quadrupling the sampling frequency
else: # speed_r == 3.75
fs = 4 * fs # Quadrupling the sampling frequency
# Correction filter: NABw3.75_mod + CCIRr15
# - NAB constants divided by 4
t3 = t3 / 4
......@@ -338,27 +394,27 @@ if __name__ == '__main__':
b = [t3 * t4, t3, 0]
# Plot information
case = 7
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 19
if SPEED_R == 30:
FS = FS / 2 # Halving the sampling frequency
if speed_r == 30:
fs = fs / 2 # Halving the sampling frequency
# Plot information
case = 19
# Case 33
elif SPEED_R == 15:
elif speed_r == 15:
print('Reference case: 33')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
# Case 20
else: # SPEED_R == 7.5
FS = 2 * FS # Doubling the sampling frequency
else: # speed_r == 7.5
fs = 2 * fs # Doubling the sampling frequency
# Plot information
case = 20
else: # SPEED_W == 7.5
if STANDARD_R == 'NAB':
else: # speed_w == 7.5
if standard_r == 'NAB':
# Case 10
if SPEED_R == 15:
FS = FS / 2 # Halving the sampling frequency
if speed_r == 15:
fs = fs / 2 # Halving the sampling frequency
# Correction filter: NABw15_mod + CCIRr7.5
# - NAB constants multiplied by 2
t3 = t3 * 2
......@@ -371,7 +427,7 @@ if __name__ == '__main__':
# Plot information
case = 10
# Case 30
elif SPEED_R == 7.5:
elif speed_r == 7.5:
# No speed change
# Correction filter: NABw7.5 + CCIRr7.5
# - NAB_7.5 constant not altered
......@@ -384,8 +440,8 @@ if __name__ == '__main__':
# Plot information
case = 30
# Case 11
else: # SPEED_R == 3.75
FS = 2 * FS # Doubling the sampling frequency
else: # speed_r == 3.75
fs = 2 * fs # Doubling the sampling frequency
# Correction filter: NABw3.75_mod + CCIRr7.5
# - NAB constants divided by 2
t3 = t3 / 2
......@@ -397,33 +453,33 @@ if __name__ == '__main__':
b = [t3 * t4, t3, 0]
# Plot information
case = 11
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 23
if SPEED_R == 30:
FS = FS / 4 # Quartering the sampling frequency
if speed_r == 30:
fs = fs / 4 # Quartering the sampling frequency
# Plot information
case = 23
# Case 24
elif SPEED_R == 15:
FS = FS / 2 # Halving the sampling frequency
elif speed_r == 15:
fs = fs / 2 # Halving the sampling frequency
# Plot information
case = 24
# Case 35
else: # SPEED_R == 7.5
else: # speed_r == 7.5
print('Reference case: 35')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
else: # STANDARD_W == 'NAB'
if SPEED_W == 15:
if STANDARD_R == 'NAB':
else: # standard_w == 'NAB'
if speed_w == 15:
if standard_r == 'NAB':
# Case 32
if SPEED_R == 15:
if speed_r == 15:
print('Reference case: 32')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
# Case 17
elif SPEED_R == 7.5:
FS = 2 * FS # Doubling the sampling frequency
elif speed_r == 7.5:
fs = 2 * fs # Doubling the sampling frequency
# Correction filter: NABw7.5_mod + NABr15
# - NABw constants divided by 2
t3_mod = t3 / 2
......@@ -436,8 +492,8 @@ if __name__ == '__main__':
# Plot information
case = 17
# Case 18
else: # SPEED_R == 3.75
FS = 4 * FS # Quadrupling the sampling frequency
else: # speed_r == 3.75
fs = 4 * fs # Quadrupling the sampling frequency
# Correction filter: NABw3.75_mod + NABr15
# - NAB constants divided by 4
t3_mod = t3 / 4
......@@ -449,10 +505,10 @@ if __name__ == '__main__':
b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod]
# Plot information
case = 18
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 4
if SPEED_R == 30:
FS = FS / 2 # Halving the sampling frequency
if speed_r == 30:
fs = fs / 2 # Halving the sampling frequency
# Correction filter: CCIRw30_mod + NABr15
# - CCIR_30 constant multiplied by 2
t2 = t2_30 * 2
......@@ -464,7 +520,7 @@ if __name__ == '__main__':
# Plot information
case = 4
# Case 27
elif SPEED_R == 15:
elif speed_r == 15:
# No speed change
# Correction filter: CCIRw15 + NABr15
# - CCIR_15 constant not altered
......@@ -477,8 +533,8 @@ if __name__ == '__main__':
# Plot information
case = 27
# Case 5
else: # SPEED_R == 7.5
FS = FS * 2 # Doubling the sampling frequency
else: # speed_r == 7.5
fs = fs * 2 # Doubling the sampling frequency
# Correction filter: CCIRw7.5_mod + NABr15
# - CCIR_7.5 constant divided by 2
t2 = t2_7 / 2
......@@ -489,11 +545,11 @@ if __name__ == '__main__':
b = [t2 * t3, t2 + t3, 1]
# Plot information
case = 5
elif SPEED_W == 7.5:
if STANDARD_R == 'NAB':
elif speed_w == 7.5:
if standard_r == 'NAB':
# Case 21
if SPEED_R == 15:
FS = FS / 2 # Halving the sampling frequency
if speed_r == 15:
fs = fs / 2 # Halving the sampling frequency
# Correction filter: NABw15_mod + NABr7.5
# - NABw constants multiplied by 2
t3_mod = t3 * 2
......@@ -506,13 +562,13 @@ if __name__ == '__main__':
# Plot information
case = 21
# Case 34
elif SPEED_R == 7.5:
elif speed_r == 7.5:
print('Reference case: 34')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
# Case 22
else: # SPEED_R == 3.75
FS = 2 * FS # Doubling the sampling frequency
else: # speed_r == 3.75
fs = 2 * fs # Doubling the sampling frequency
# Correction filter: NABw3.75_mod + NABr7.5
# - NABw constants divided by 2
t3_mod = t3 / 2
......@@ -524,10 +580,10 @@ if __name__ == '__main__':
b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod]
# Plot information
case = 22
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 8
if SPEED_R == 30:
FS = FS / 4 # Quartering the sampling frequency
if speed_r == 30:
fs = fs / 4 # Quartering the sampling frequency
# Correction filter: CCIRw30_mod + NABr7.5
# - CCIR_30 constant multiplied by 4
t2 = t2_30 * 4
......@@ -539,8 +595,8 @@ if __name__ == '__main__':
# Plot information
case = 8
# Case 9
elif SPEED_R == 15:
FS = FS / 2 # Halving the sampling frequency
elif speed_r == 15:
fs = fs / 2 # Halving the sampling frequency
# Correction filter: CCIRw15_mod + NABr7.5
# - CCIR_15 constant multiplied by 2
t2 = t2_15 * 2
......@@ -552,7 +608,7 @@ if __name__ == '__main__':
# Plot information
case = 9
# Case 29
else: # SPEED_R == 7.5
else: # speed_r == 7.5
# No speed change
# Correction filter: CCIRw7.5 + NABr7.5
# - CCIR_7.5 constant not altered
......@@ -564,11 +620,11 @@ if __name__ == '__main__':
b = [t2 * t3, t2 + t3, 1]
# Plot information
case = 29
else: # SPEED_W == 3.75
if STANDARD_R == 'NAB':
else: # speed_w == 3.75
if standard_r == 'NAB':
# Case 25
if SPEED_R == 15:
FS = FS / 4 # Quartering the sampling frequency
if speed_r == 15:
fs = fs / 4 # Quartering the sampling frequency
# Correction filter: NABw15_mod + NABr3.75
# - NAB constants multiplied by 4
t3_mod = t3 * 4
......@@ -581,8 +637,8 @@ if __name__ == '__main__':
# Plot information
case = 25
# Case 26
elif SPEED_R == 7.5:
FS = FS / 2 # Halving the sampling frequency
elif speed_r == 7.5:
fs = fs / 2 # Halving the sampling frequency
# Correction filter: NABw7.5_mod + NABr3.75
# - NAB constants multiplied by 2
t3_mod = t3 * 2
......@@ -595,14 +651,14 @@ if __name__ == '__main__':
# Plot information
case = 26
# Case 36
else: # SPEED_R == 3.75
else: # speed_r == 3.75
print('Reference case: 36')
print(CC.GREEN + 'Nothing to do!' + CC.END)
quit(os.EX_OK)
else: # STANDARD_R == 'CCIR'
else: # standard_r == 'CCIR'
# Case 12
if SPEED_R == 30:
FS = FS / 8 # Dividing by 8 the sampling frequency
if speed_r == 30:
fs = fs / 8 # Dividing by 8 the sampling frequency
# Correction filter: CCIRw30_mod + NABr3.75
# - CCIR_30 constant multiplied by 8
t2 = t2_30 * 8
......@@ -614,8 +670,8 @@ if __name__ == '__main__':
# Plot information
case = 12
# Case 13
elif SPEED_R == 15:
FS = FS / 4 # Quartering the sampling frequency
elif speed_r == 15:
fs = fs / 4 # Quartering the sampling frequency
# Correction filter: CCIRw15_mod + NABr3.75
# - CCIR_15 constant multiplied by 4
t2 = t2_15 * 4
......@@ -627,8 +683,8 @@ if __name__ == '__main__':
# Plot information
case = 13
# Case 14
else: # SPEED_R == 7.5
FS = FS / 2 # Halving the sampling frequency
else: # speed_r == 7.5
fs = fs / 2 # Halving the sampling frequency
# Correction filter: CCIRw7.5_mod + NABr3.75
# - CCIR_7.5 constant multiplied by 2
t2 = t2_7 * 2
......@@ -639,124 +695,205 @@ if __name__ == '__main__':
b = [t2 * t3, t2 + t3, 1]
# Plot information
case = 14
return a, b, fs, case
def correction(a: array, b: array, paf: ndarray, fs: int, plots: bool) -> ndarray:
"""
Apply a correction filter to a Preservation Audio File;
:param a: array of coefficients, specifying the numerator of filter transfer function,
:param b: array of coefficients, specifying in the denominator of filter transfer function,
:param paf: ndarray specifying the raw audio data of the Preservation Audio File,
:param fs: int specifying the operational sampling frequency,
:param plots: bool specifying if filter plots should be displayed.
:return: the corrected audio as a Restored Audio File.
"""
# Analog transfer function
h_a = TransferFunction(a, b)
# Analog frequency vector
w_a = np.logspace(np.log10(1), np.log10(fs * np.pi), 5000)
if plots:
# Analog filter frequency response
w_t, h_t = freqs(a, b, worN=w_a)
# Plot analog graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(w_t / (2 * np.pi), 20 * np.log10(abs(h_t)))
plt.xlim([1, 24000])
plt.xlabel('Frequency')
plt.ylim([-40, 40])
plt.ylabel('Amplitude response [dB]')
plt.grid(True)
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(w_t / (2 * np.pi), np.angle(h_t) * 180 / np.pi)
plt.xlim([1, 24000])
plt.xlabel('Frequency')
plt.ylabel('Phase [deg]')
plt.grid(True)
# Digital transfer function through bilinear digitisation
h_d = c2d(h_a, 1 / fs, 'bilinear')
num_d = h_d.num[0][0] # Inspect Hd.num to see why [0][0] is needed...
den_d = h_d.den[0][0] # Same story here
# Digital frequency vector
w_d = np.logspace(np.log10(1), np.log10(fs / 2), 5000)
if plots:
# Digital filter frequency response
w_n, h_n = freqz(num_d, den_d, worN=w_d, fs=fs)
# Plot digital graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(w_n, 20 * np.log10(abs(h_n)), '--')
plt.legend(['Analog', 'Bilinear'])
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(w_n, np.angle(h_n) * 180 / np.pi, '--')
plt.legend(['Analog', 'Bilinear'])
# Pole check
# New pole frequency
pole_frequency = 2
# Move to zero-pole representation
z, p, k = tf2zpk(a, b)
# Check if the function presents a pole at 0 Hz
for i in range(len(p)):
if p[i] == 0:
# Replace pole
p[i] = -pole_frequency * 2 * np.pi
print('\n' + CC.PURPLE + 'Pole at 0 Hz replaced!' + CC.END)
# Back to transfer function representation
ap, bp = zpk2tf(z, p, k)
# Analog transfer function
hp_a = TransferFunction(ap, bp)
if plots:
# Analog filter frequency response
wp_t, hp_t = freqs(ap, bp, worN=w_a)
# Plot analog graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(wp_t / (2 * np.pi), 20 * np.log10(abs(hp_t)))
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(wp_t / (2 * np.pi), np.angle(hp_t) * 180 / np.pi)
# Digital transfer function through bilinear digitisation
hp_d = c2d(hp_a, 1 / fs, 'bilinear')
num_d = hp_d.num[0][0]
den_d = hp_d.den[0][0]
if plots:
# Digital filter frequency response
wp_n, hp_n = freqz(num_d, den_d, worN=w_d, fs=fs)
# Plot digital graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(wp_n, 20 * np.log10(abs(hp_n)), '--')
plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital'])
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(wp_n, np.angle(hp_n) * 180 / np.pi, '--')
plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital'])
if plots:
plt.show()
print('\nFiltering Preservation Audio File...')
# Filter Preservation Audio File
raf = lfilter(num_d, den_d, paf, axis=0)
# Again, wavfile.write() is stupid, and you must cast everything to not destroy your ears...
raf = np.rint(raf).astype(paf.dtype)
return raf
def save_file(file: ndarray, fs: int, temp_path: str, name: str):
"""
Save an audio file to the given path with name 1.wav;
:param file: ndarray specifying the raw audio data,
:param fs: int specifying the operational sampling frequency,
:param temp_path: str specifying the path where the file will be saved,
:param name: str specifying the file name.
:return: exit codes corresponding to the execution status.
"""
raf_path = os.path.join(temp_path, 'RestoredAudioFiles')
make_raf = False
if not os.path.exists(raf_path):
# Create directory
os.mkdir(raf_path)
make_raf = True
print("Restored Audio Files directory '% s' created" % raf_path)
else:
print((CC.PURPLE + "Restored Audio Files directory '% s' already exists!" + CC.END) % raf_path)
overwrite = input('Do you want to overwrite it? [y/n]: ')
if overwrite.casefold() == 'y':
# Overwrite directory
shutil.rmtree(raf_path)
os.mkdir(raf_path)
make_raf = True
print('Restored Audio Files directory overwritten')
elif overwrite.casefold() != 'n':
print(CC.RED + 'Unknown command, exiting' + CC.END)
quit(os.EX_USAGE)
if make_raf:
print("Saving Restored Audio File to: '%s' ..." % raf_path)
wavfile.write(os.path.join(raf_path, name + '.wav'), fs, file)
def main():
"""
Main execution method.
:return: exit codes corresponding to the execution status.
"""
print(CC.BOLD + "\nWelcome to ARP Tape Audio Restoration!" + CC.END)
print("You are using Python version: " + sys.version)
# Get the input from config.yaml or command line
working_path, files_name, standard_w, speed_w, standard_r, speed_r, plots = get_arguments()
# Check if input is correct
paf_path, temp_path, standard_w, standard_r = check_input(working_path, files_name, standard_w, speed_w, standard_r, speed_r)
# Display input parameters
print('\nInput parameters:')
print(' WORKING_PATH: ' + working_path)
print(' FILES_NAME: ' + files_name)
print(' STANDARD_W: ' + standard_w)
print(' SPEED_W: ' + str(speed_w) + ' ips')
print(' STANDARD_R: ' + standard_r)
print(' SPEED_R: ' + str(speed_r) + ' ips')
# Preservation Audio File check
print("Opening '%s'..." % paf_path)
fs, paf = wavfile.read(paf_path)
print('Preservation Audio File opened!')
print(' FS: ' + str(fs) + ' Hz\n')
# Decision stage
a, b, fs, case = get_correction_filter(standard_w, speed_w, standard_r, speed_r, fs)
# Casting FS to int because wavfile.write() is stupid
FS = round(FS)
fs = round(fs)
print('Reference case: ' + str(case))
print('Operational sampling frequency: ' + str(FS) + ' Hz.')
# Correction filter
print('Operational FS: ' + str(fs) + ' Hz.')
# Not all cases present a correction filter!
# Correction phase
if len(a) != 0:
# Analog transfer function
H_a = TransferFunction(a, b)
# Analog frequency vector
w_a = np.logspace(np.log10(1), np.log10(FS * np.pi), 5000)
if PLOTS:
# Analog filter frequency response
w_t, h_t = freqs(a, b, worN=w_a)
# Plot analog graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(w_t / (2 * np.pi), 20 * np.log10(abs(h_t)))
plt.xlim([1, 24000])
plt.xlabel('Frequency')
plt.ylim([-40, 40])
plt.ylabel('Amplitude response [dB]')
plt.grid(True)
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(w_t / (2 * np.pi), np.angle(h_t) * 180 / np.pi)
plt.xlim([1, 24000])
plt.xlabel('Frequency')
plt.ylabel('Phase [deg]')
plt.grid(True)
# Digital transfer function through bilinear digitisation
H_d = c2d(H_a, 1/FS, 'bilinear')
num_d = H_d.num[0][0] # Inspect Hd.num to see why [0][0] is needed...
den_d = H_d.den[0][0] # Same story here
# Digital frequency vector
w_d = np.logspace(np.log10(1), np.log10(FS / 2), 5000)
if PLOTS:
# Digital filter frequency response
w_n, h_n = freqz(num_d, den_d, worN=w_d, fs=FS)
# Plot digital graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(w_n, 20 * np.log10(abs(h_n)), '--')
plt.legend(['Analog', 'Bilinear'])
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(w_n, np.angle(h_n) * 180 / np.pi, '--')
plt.legend(['Analog', 'Bilinear'])
# Pole check
# New pole frequency
pole_frequency = 2
# Move to zero-pole representation
z, p, k = tf2zpk(a, b)
# Check if the function presents a pole at 0 Hz
for i in range(len(p)):
if p[i] == 0:
# Replace pole
p[i] = -pole_frequency * 2 * np.pi
print('\n' + CC.PURPLE + 'Pole at 0 Hz replaced!' + CC.END)
# Back to transfer function representation
ap, bp = zpk2tf(z, p, k)
# Analog transfer function
Hp_a = TransferFunction(ap, bp)
if PLOTS:
# Analog filter frequency response
wp_t, hp_t = freqs(ap, bp, worN=w_a)
# Plot analog graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(wp_t / (2 * np.pi), 20 * np.log10(abs(hp_t)))
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(wp_t / (2 * np.pi), np.angle(hp_t) * 180 / np.pi)
# Digital transfer function through bilinear digitisation
Hp_d = c2d(Hp_a, 1 / FS, 'bilinear')
num_d = Hp_d.num[0][0]
den_d = Hp_d.den[0][0]
if PLOTS:
# Digital filter frequency response
wp_n, hp_n = freqz(num_d, den_d, worN=w_d, fs=FS)
# Plot digital graph
# - Magnitude
plt.subplot(2, 1, 1)
plt.semilogx(wp_n, 20 * np.log10(abs(hp_n)), '--')
plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital'])
# - Phase
plt.subplot(2, 1, 2)
plt.semilogx(wp_n, np.angle(hp_n) * 180 / np.pi, '--')
plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital'])
if PLOTS:
plt.show()
if len(PRESERVATION_FILE_NAME) > 0:
print('\nFiltering Preservation Audio File...')
# Filter Preservation Audio File
raf = lfilter(num_d, den_d, paf, axis=0)
# Again, wavfile.write() is stupid, and you must cast everything to not destroy your ears...
raf = np.rint(raf).astype(paf.dtype)
# Save Restored Audio File
save_file(raf)
# Not all cases present a correction filter!
raf = correction(paf, a, b, fs, plots)
save_file(raf, fs, temp_path, '1')
else:
# Save Restored Audio File
save_file(paf)
# Just save Restored Audio File, but with modified fs
save_file(paf, fs, temp_path, '1')
# End
print(CC.GREEN + CC.BOLD + "Success!" + CC.END + '\n')
if __name__ == '__main__':
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment