Commit 56c7ce04 authored by Matteo's avatar Matteo
Browse files

refactor script with mpai-cae-arp io

parent 53f5c7b0
clean: clean-build clean-pyc clean-test
clean-build:
rm -fr build/
rm -fr dist/
rm -fr .eggs/
find . -name '*.egg-info' -exec rm -fr {} +
find . -name '*.egg' -exec rm -fr {} +
clean-pyc:
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
clean-test:
rm -fr .tox/
rm -f .coverage
rm -fr htmlcov/
rm -fr .pytest_cache
\ No newline at end of file
...@@ -14,13 +14,14 @@ import numpy as np ...@@ -14,13 +14,14 @@ import numpy as np
import os import os
import shutil import shutil
import sys import sys
import yaml
from argparse import ArgumentParser, RawTextHelpFormatter from argparse import ArgumentParser, RawTextHelpFormatter
from control import c2d, TransferFunction from control import c2d, TransferFunction
from numpy import ndarray from numpy import ndarray
from scipy.io import wavfile from scipy.io import wavfile
from scipy.signal import tf2zpk, zpk2tf, lfilter from scipy.signal import tf2zpk, zpk2tf, lfilter
from mpai_cae_arp.io import Color, Style, prettify
from mpai_cae_arp.io import Color, Style, pprint
from mpai_cae_arp.files import File, FileType
__author__ = "Nadir Dalla Pozza" __author__ = "Nadir Dalla Pozza"
__copyright__ = "Copyright 2022, Audio Innova S.r.l." __copyright__ = "Copyright 2022, Audio Innova S.r.l."
...@@ -32,22 +33,6 @@ __email__ = "nadir.dallapozza@unipd.it" ...@@ -32,22 +33,6 @@ __email__ = "nadir.dallapozza@unipd.it"
__status__ = "Production" __status__ = "Production"
class CC:
"""
Variables for customizing console colors
"""
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARK_CYAN = '\033[36m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def get_arguments() -> tuple[str, str, str, float, str, float]: def get_arguments() -> tuple[str, str, str, float, str, float]:
""" """
Method to obtain arguments from config/args.yaml file or command line. Method to obtain arguments from config/args.yaml file or command line.
...@@ -115,36 +100,22 @@ def get_arguments() -> tuple[str, str, str, float, str, float]: ...@@ -115,36 +100,22 @@ def get_arguments() -> tuple[str, str, str, float, str, float]:
speed_r = float(args.speed_r) speed_r = float(args.speed_r)
else: else:
# Read configuration file # Read configuration file
config = object arg_names = ['WORKING_PATH', 'FILES_NAME', 'STANDARD_W', 'SPEED_W', 'STANDARD_R', 'SPEED_R']
config = {}
try: try:
config = yaml.safe_load(open('config/args.yaml', 'r')) config = File('config/args.yaml', FileType.YAML).get_content()
if 'WORKING_PATH' not in config: for argument in arg_names:
print(prettify('WORKING_PATH key not found in config/args.yaml!', color=Color.RED)) if argument not in config:
quit(os.EX_CONFIG) pprint(f'{argument} key not found in config/args.yaml!', color=Color.RED)
if 'FILES_NAME' not in config: quit(os.EX_CONFIG)
print(CC.RED + 'FILES_NAME key not found in config/args.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_W' not in config:
print(CC.RED + 'STANDARD_W key not found in config/args.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_W' not in config:
print(CC.RED + 'SPEED_W key not found in config/args.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'STANDARD_R' not in config:
print(CC.RED + 'STANDARD_R key not found in config/args.yaml!' + CC.END)
quit(os.EX_CONFIG)
if 'SPEED_R' not in config:
print(CC.RED + 'SPEED_R key not found in config/args.yaml!' + CC.END)
quit(os.EX_CONFIG)
except FileNotFoundError: except FileNotFoundError:
print(CC.RED + 'config/args.yaml file not found!' + CC.END) pprint('config/args.yaml file not found!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
working_path = config['WORKING_PATH']
files_name = config['FILES_NAME'] # unpack config dictionary
standard_w = config['STANDARD_W'] working_path, files_name, standard_w, speed_w, standard_r, speed_r = map(config.get, arg_names)
speed_w = config['SPEED_W']
standard_r = config['STANDARD_R']
speed_r = config['SPEED_R']
return working_path, files_name, standard_w, speed_w, standard_r, speed_r return working_path, files_name, standard_w, speed_w, standard_r, speed_r
...@@ -166,80 +137,88 @@ def check_input(working_path: str, files_name: str, standard_w: str, speed_w: fl ...@@ -166,80 +137,88 @@ def check_input(working_path: str, files_name: str, standard_w: str, speed_w: fl
# Check for working path existence # Check for working path existence
if not os.path.exists(working_path): if not os.path.exists(working_path):
print(CC.RED + 'The specified WORKING_PATH is non-existent!' + CC.END) pprint('The specified WORKING_PATH is non-existent!', color=Color.RED)
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
# Check for Preservation Audio File existence # Check for Preservation Audio File existence
audio_file = files_name + '.wav' audio_file = files_name + '.wav'
paf_path = os.path.join(working_path, 'PreservationAudioFile', audio_file) paf_path = os.path.join(working_path, 'PreservationAudioFile', audio_file)
if not os.path.exists(paf_path): if not os.path.exists(paf_path):
print(CC.RED + 'Preservation Audio File not found!' + CC.END) pprint('Preservation Audio File not found!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
# Check for temp directory existence # Check for temp directory existence
temp_path = os.path.join(working_path, 'temp') temp_path = os.path.join(working_path, 'temp')
if not os.path.exists(temp_path): if not os.path.exists(temp_path):
print(CC.RED + 'WORKING_PATH structure is not conformant!' + CC.END) pprint('WORKING_PATH structure is not conformant!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
# Check for input directory existence # Check for input directory existence
temp_path = os.path.join(temp_path, files_name) temp_path = os.path.join(temp_path, files_name)
if not os.path.exists(temp_path): if not os.path.exists(temp_path):
print(CC.RED + 'The specified FILES_NAME has no corresponding files!' + CC.END) pprint('WORKING_PATH structure is not conformant!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
# Configuration parameters check # Configuration parameters check
# Recording tape speed check # Recording tape speed check
if speed_w != 3.75 and speed_w != 7.5 and speed_w != 15 and speed_w != 30: if speed_w != 3.75 and speed_w != 7.5 and speed_w != 15 and speed_w != 30:
print( pprint(
CC.RED + 'Incorrect SPEED_W: \'' + str(speed_w) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END f"Incorrect SPEED_W: '{speed_w}'. Accepted value are: 3.75, 7.5, 15, 30.",
color=Color.RED
) )
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
# Reading tape speed check. # Reading tape speed check.
if speed_r != 3.75 and speed_r != 7.5 and speed_r != 15 and speed_r != 30: if speed_r != 3.75 and speed_r != 7.5 and speed_r != 15 and speed_r != 30:
print( pprint(
CC.RED + 'Incorrect SPEED_R: \'' + str(speed_r) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END f"Incorrect SPEED_R: '{speed_r}'. Accepted value are: 3.75, 7.5, 15, 30.",
color=Color.RED
) )
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
# Equalization standard check. # Equalization standard check.
if not (standard_r == 'CCIR' or standard_r == 'NAB'): if not (standard_r == 'CCIR' or standard_r == 'NAB'):
print( pprint(
CC.RED + 'Incorrect STANDARD_R: \'' + standard_r + '\'. Accepted values are: CCIR, NAB.' + CC.END f"Incorrect STANDARD_R: '{standard_r}'. Accepted values are: CCIR, NAB.",
color=Color.RED
) )
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
if not (standard_w == 'CCIR' or standard_w == 'NAB'): if not (standard_w == 'CCIR' or standard_w == 'NAB'):
print( pprint(
CC.RED + 'Incorrect STANDARD_W: \'' + standard_w + '\'. Accepted values are: CCIR, NAB.' + CC.END f"Incorrect STANDARD_W: '{standard_w}'. Accepted values are: CCIR, NAB.",
color=Color.RED
) )
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
# CCIR speed check. # CCIR speed check.
if standard_w == 'CCIR' and speed_w == 3.75: if (standard_w == 'CCIR' and speed_w == 3.75):
print( pprint(
CC.YELLOW + 'CCIR is undefined at 3.75 ips. Recording equalization standard is set to NAB.' + CC.END 'CCIR is undefined at 3.75 ips. Recording equalization standard is set to NAB.',
color=Color.YELLOW
) )
standard_w = 'NAB' standard_w = 'NAB'
if standard_r == 'CCIR' and speed_r == 3.75: if standard_r == 'CCIR' and speed_r == 3.75:
print( pprint(
CC.YELLOW + 'CCIR is undefined at 3.75 ips. Reading equalization standard is set to NAB.' + CC.END 'CCIR is undefined at 3.75 ips. Reading equalization standard is set to NAB.',
color=Color.YELLOW
) )
standard_r = 'NAB' standard_r = 'NAB'
# NAB speed check. # NAB speed check.
if standard_w == 'NAB' and speed_w == 30: if standard_w == 'NAB' and speed_w == 30:
print( pprint(
CC.YELLOW + 'NAB is undefined at 30 ips. Recording equalization standard is set to CCIR.' + CC.END 'NAB is undefined at 30 ips. Recording equalization standard is set to CCIR.',
color=Color.YELLOW
) )
standard_w = 'CCIR' standard_w = 'CCIR'
if standard_r == 'NAB' and speed_r == 30: if standard_r == 'NAB' and speed_r == 30:
print( pprint(
CC.YELLOW + 'NAB is undefined at 30 ips. Reading equalization standard is set to CCIR.' + CC.END 'NAB is undefined at 30 ips. Reading equalization standard is set to CCIR.',
color=Color.YELLOW
) )
standard_r = 'CCIR' standard_r = 'CCIR'
return paf_path, temp_path, standard_w, standard_r return paf_path, temp_path, standard_w, standard_r
def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, speed_r: float, fs: int) -> tuple[array, array, float, int]: def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, speed_r: float, fs: int) -> tuple[list, list, float, int]:
""" """
Method to establish correct filter transfer function coefficients; Method to establish correct filter transfer function coefficients;
:param standard_w: str specifying the equalization standard used when the tape was recorded, :param standard_w: str specifying the equalization standard used when the tape was recorded,
...@@ -381,7 +360,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee ...@@ -381,7 +360,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
# Case 33 # Case 33
elif speed_r == 15: elif speed_r == 15:
print('Reference case: 33') print('Reference case: 33')
print(CC.GREEN + 'Nothing to do!' + CC.END) pprint('Nothing to do!', color=Color.GREEN)
quit(os.EX_OK) quit(os.EX_OK)
# Case 20 # Case 20
else: # speed_r == 7.5 else: # speed_r == 7.5
...@@ -445,7 +424,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee ...@@ -445,7 +424,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
# Case 35 # Case 35
else: # speed_r == 7.5 else: # speed_r == 7.5
print('Reference case: 35') print('Reference case: 35')
print(CC.GREEN + 'Nothing to do!' + CC.END) pprint('Nothing to do!', color=Color.GREEN)
quit(os.EX_OK) quit(os.EX_OK)
else: # standard_w == 'NAB' else: # standard_w == 'NAB'
if speed_w == 15: if speed_w == 15:
...@@ -453,7 +432,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee ...@@ -453,7 +432,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
# Case 32 # Case 32
if speed_r == 15: if speed_r == 15:
print('Reference case: 32') print('Reference case: 32')
print(CC.GREEN + 'Nothing to do!' + CC.END) pprint('Nothing to do!', color=Color.GREEN)
quit(os.EX_OK) quit(os.EX_OK)
# Case 17 # Case 17
elif speed_r == 7.5: elif speed_r == 7.5:
...@@ -542,7 +521,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee ...@@ -542,7 +521,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
# Case 34 # Case 34
elif speed_r == 7.5: elif speed_r == 7.5:
print('Reference case: 34') print('Reference case: 34')
print(CC.GREEN + 'Nothing to do!' + CC.END) pprint('Nothing to do!', color=Color.GREEN)
quit(os.EX_OK) quit(os.EX_OK)
# Case 22 # Case 22
else: # speed_r == 3.75 else: # speed_r == 3.75
...@@ -631,7 +610,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee ...@@ -631,7 +610,7 @@ def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, spee
# Case 36 # Case 36
else: # speed_r == 3.75 else: # speed_r == 3.75
print('Reference case: 36') print('Reference case: 36')
print(CC.GREEN + 'Nothing to do!' + CC.END) pprint('Nothing to do!', color=Color.GREEN)
quit(os.EX_OK) quit(os.EX_OK)
else: # standard_r == 'CCIR' else: # standard_r == 'CCIR'
# Case 12 # Case 12
...@@ -705,7 +684,7 @@ def correction(a: array, b: array, paf: ndarray, fs: int) -> ndarray: ...@@ -705,7 +684,7 @@ def correction(a: array, b: array, paf: ndarray, fs: int) -> ndarray:
if p[i] == 0: if p[i] == 0:
# Replace pole # Replace pole
p[i] = -pole_frequency * 2 * np.pi p[i] = -pole_frequency * 2 * np.pi
print('\n' + CC.PURPLE + 'Pole at 0 Hz replaced!' + CC.END) pprint('\nPole at 0 Hz replaced!', color=Color.PURPLE)
# Back to transfer function representation # Back to transfer function representation
ap, bp = zpk2tf(z, p, k) ap, bp = zpk2tf(z, p, k)
...@@ -742,7 +721,7 @@ def save_file(file: ndarray, fs: int, temp_path: str, name: str): ...@@ -742,7 +721,7 @@ def save_file(file: ndarray, fs: int, temp_path: str, name: str):
make_raf = True make_raf = True
print("Restored Audio Files directory '% s' created" % raf_path) print("Restored Audio Files directory '% s' created" % raf_path)
else: else:
print((CC.PURPLE + "Restored Audio Files directory '% s' already exists!" + CC.END) % raf_path) pprint(f"Restored Audio Files directory '{raf_path}' already exists!", color=Color.PURPLE)
overwrite = input('Do you want to overwrite it? [y/n]: ') overwrite = input('Do you want to overwrite it? [y/n]: ')
if overwrite.casefold() == 'y': if overwrite.casefold() == 'y':
# Overwrite directory # Overwrite directory
...@@ -751,7 +730,7 @@ def save_file(file: ndarray, fs: int, temp_path: str, name: str): ...@@ -751,7 +730,7 @@ def save_file(file: ndarray, fs: int, temp_path: str, name: str):
make_raf = True make_raf = True
print('Restored Audio Files directory overwritten') print('Restored Audio Files directory overwritten')
elif overwrite.casefold() != 'n': elif overwrite.casefold() != 'n':
print(CC.RED + 'Unknown command, exiting' + CC.END) pprint('Unknown command, exiting', color=Color.RED)
quit(os.EX_USAGE) quit(os.EX_USAGE)
if make_raf: if make_raf:
print("Saving Restored Audio File to: '%s' ..." % raf_path) print("Saving Restored Audio File to: '%s' ..." % raf_path)
...@@ -764,7 +743,7 @@ def main(): ...@@ -764,7 +743,7 @@ def main():
:return: exit codes corresponding to the execution status. :return: exit codes corresponding to the execution status.
""" """
print(CC.BOLD + "\nWelcome to ARP Tape Audio Restoration!" + CC.END) pprint("\nWelcome to ARP Tape Audio Restoration!", styles=[Style.BOLD])
print("You are using Python version: " + sys.version) print("You are using Python version: " + sys.version)
# Get the input from config/args.yaml or command line # Get the input from config/args.yaml or command line
...@@ -805,7 +784,7 @@ def main(): ...@@ -805,7 +784,7 @@ def main():
save_file(paf, fs, temp_path, '1') save_file(paf, fs, temp_path, '1')
# End # End
print(CC.GREEN + CC.BOLD + "Success!" + CC.END + '\n') pprint("Success!\n", color=Color.GREEN, styles=[Style.BOLD])
if __name__ == '__main__': if __name__ == '__main__':
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment