#!/usr/bin/env python """ MPAI CAE-ARP Tape Audio Restoration. Implements MPAI CAE-ARP Tape Audio Restoration Technical Specification. It identifies and restore portions of the Preservation Audio File, providing: - Restored Audio Files; - Editing List """ import array import matplotlib.pyplot as plt import numpy as np import os import shutil import sys import yaml from argparse import ArgumentParser, RawTextHelpFormatter from control import c2d, TransferFunction from numpy import ndarray from scipy.io import wavfile from scipy.signal import freqs, freqz, tf2zpk, zpk2tf, lfilter __author__ = "Nadir Dalla Pozza" __copyright__ = "Copyright 2022, Audio Innova S.r.l." __credits__ = ["Niccolò Pretto", "Nadir Dalla Pozza", "Sergio Canazza"] __license__ = "GPL v3.0" __version__ = "1.0.1" __maintainer__ = "Nadir Dalla Pozza" __email__ = "nadir.dallapozza@unipd.it" __status__ = "Production" class CC: """ Variables for customizing console colors """ PURPLE = '\033[95m' CYAN = '\033[96m' DARK_CYAN = '\033[36m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def get_arguments() -> tuple[str, str, str, float, str, float, bool]: """ Method to obtain arguments from config.yaml file or command line. Default config.yaml, ignored if a command line argument is passed. :return: tuple consisting of nine variables: 1) str specifying the working path; 2) str specifying the name of the Preservation files, which is key element to retrieve necessary files; 3) str specifying the equalization standard used when the tape was recorded; 4) float specifying the speed used when the tape was recorded; 5) str specifying the equalization standard used when the tape was read; 6) float specifying the speed used when the tape was read; 7) bool specifying if filter figures should be plotted. """ if len(sys.argv) > 1: # Read from command line parser = ArgumentParser( prog="python3 tapeAudioRestoration.py", formatter_class=RawTextHelpFormatter, description="A tool that implements MPAI CAE-ARP Tape Audio Restoration Technical Specification.\n" "By default, the configuration parameters are loaded from ./config.yaml file,\n" "but, alternately, you can pass command line arguments to replace them." ) parser.add_argument( "-w", "--working-path", help="Specify the Working Path, where all input files are stored", required=True ) parser.add_argument( "-f", "--files-name", help="Specify the name of the Preservation files (without extension)", required=True ) parser.add_argument( "-ew", "--equalization-w", help="Specify the name of the equalization standard used when the tape was recorded", required=True ) parser.add_argument( "-sw", "--speed-w", help="Specify the speed used when the tape was recorded", required=True ) parser.add_argument( "-er", "--equalization-r", help="Specify the name of the equalization standard used when the tape was read", required=True ) parser.add_argument( "-sr", "--speed-r", help="Specify the speed used when the tape was read", required=True ) parser.add_argument( "-p", "--plot-figures", help="Specify if filter figures should be plotted [true, false].", required=True ) args = parser.parse_args() working_path = args.working_path files_name = args.files_name standard_w = args.equalization_w speed_w = float(args.speed_w) standard_r = args.equalization_r speed_r = float(args.speed_r) plots = False if args.plot_figures in ('true', 'True'): plots = True elif args.plot_figures not in ('false', 'False'): print(CC.RED + 'Invalid PLOT input argument!' + CC.END) quit(os.EX_CONFIG) else: # Read configuration file config = object try: config = yaml.safe_load(open('config.yaml', 'r')) if 'WORKING_PATH' not in config: print(CC.RED + 'WORKING_PATH key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'FILES_NAME' not in config: print(CC.RED + 'FILES_NAME key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'STANDARD_W' not in config: print(CC.RED + 'STANDARD_W key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'SPEED_W' not in config: print(CC.RED + 'SPEED_W key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'STANDARD_R' not in config: print(CC.RED + 'STANDARD_R key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'SPEED_R' not in config: print(CC.RED + 'SPEED_R key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'PLOTS' not in config: print(CC.RED + 'PLOTS key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) except FileNotFoundError: print(CC.RED + 'config.yaml file not found!' + CC.END) quit(os.EX_NOINPUT) working_path = config['WORKING_PATH'] files_name = config['FILES_NAME'] standard_w = config['STANDARD_W'] speed_w = config['SPEED_W'] standard_r = config['STANDARD_R'] speed_r = config['SPEED_R'] plots = False if config['PLOTS'] in (True, 'true', 'True'): plots = True elif config['PLOTS'] not in (False, 'false', 'False'): print(CC.RED + 'Invalid PLOT input argument!' + CC.END) quit(os.EX_CONFIG) return working_path, files_name, standard_w, speed_w, standard_r, speed_r, plots def check_input(working_path: str, files_name: str, standard_w: str, speed_w: float, standard_r: str, speed_r: float) -> tuple[str, str, str, str]: """ Method to check that passed arguments are correct and that the environment is conformant to the standard; :param working_path: str representing the path where all files resulting from previous AIMs are stored, :param files_name: str representing the Preservation files name, to identify the input directory, :param standard_w: str specifying the equalization standard used when the tape was recorded, :param speed_w: float specifying the speed used when the tape was recorded, :param standard_r: str specifying the equalization standard used when the tape was read, :param speed_r: float specifying the speed used when the tape was read. :return: tuple consisting of three variables: 1) str representing the path where the Preservation Audio File is stored; 2) str representing the path where the files to be processed during the current execution are stored; 3) the operating standard_w; 4) the operating standard_r. """ # Check for working path existence if not os.path.exists(working_path): print(CC.RED + 'The specified WORKING_PATH is non-existent!' + CC.END) quit(os.EX_CONFIG) # Check for Preservation Audio File existence audio_file = files_name + '.wav' paf_path = os.path.join(working_path, 'PreservationAudioFile', audio_file) if not os.path.exists(paf_path): print(CC.RED + 'Preservation Audio File not found!' + CC.END) quit(os.EX_NOINPUT) # Check for temp directory existence temp_path = os.path.join(working_path, 'temp') if not os.path.exists(temp_path): print(CC.RED + 'WORKING_PATH structure is not conformant!' + CC.END) quit(os.EX_NOINPUT) # Check for input directory existence temp_path = os.path.join(temp_path, files_name) if not os.path.exists(temp_path): print(CC.RED + 'The specified FILES_NAME has no corresponding files!' + CC.END) quit(os.EX_NOINPUT) # Configuration parameters check # Recording tape speed check if speed_w != 3.75 and speed_w != 7.5 and speed_w != 15 and speed_w != 30: print( CC.RED + 'Incorrect SPEED_W: \'' + str(speed_w) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END ) quit(os.EX_CONFIG) # Reading tape speed check. if speed_r != 3.75 and speed_r != 7.5 and speed_r != 15 and speed_r != 30: print( CC.RED + 'Incorrect SPEED_R: \'' + str(speed_r) + '\'. Accepted value are: 3.75, 7.5, 15, 30.' + CC.END ) quit(os.EX_CONFIG) # Equalization standard check. if not (standard_r == 'CCIR' or standard_r == 'NAB'): print( CC.RED + 'Incorrect STANDARD_R: \'' + standard_r + '\'. Accepted values are: CCIR, NAB.' + CC.END ) quit(os.EX_CONFIG) if not (standard_w == 'CCIR' or standard_w == 'NAB'): print( CC.RED + 'Incorrect STANDARD_W: \'' + standard_w + '\'. Accepted values are: CCIR, NAB.' + CC.END ) quit(os.EX_CONFIG) # CCIR speed check. if standard_w == 'CCIR' and speed_w == 3.75: print( CC.YELLOW + 'CCIR is undefined at 3.75 ips. Recording equalization standard is set to NAB.' + CC.END ) standard_w = 'NAB' if standard_r == 'CCIR' and speed_r == 3.75: print( CC.YELLOW + 'CCIR is undefined at 3.75 ips. Reading equalization standard is set to NAB.' + CC.END ) standard_r = 'NAB' # NAB speed check. if standard_w == 'NAB' and speed_w == 30: print( CC.YELLOW + 'NAB is undefined at 30 ips. Recording equalization standard is set to CCIR.' + CC.END ) standard_w = 'CCIR' if standard_r == 'NAB' and speed_r == 30: print( CC.YELLOW + 'NAB is undefined at 30 ips. Reading equalization standard is set to CCIR.' + CC.END ) standard_r = 'CCIR' return paf_path, temp_path, standard_w, standard_r def get_correction_filter(standard_w: str, speed_w: float, standard_r: str, speed_r: float, fs: int) -> tuple[array, array, float, int]: """ Method to establish correct filter transfer function coefficients; :param standard_w: str specifying the equalization standard used when the tape was recorded, :param speed_w: float specifying the speed used when the tape was recorded, :param standard_r: str specifying the equalization standard used when the tape was read, :param speed_r: float specifying the speed used when the tape was read, :param fs: float specifying the sampling frequency. :return: tuple consisting of four variables: 1) array representing the filter numerator coefficients; 2) array representing the filter denominator coefficients; 3) float specifying the operating sampling frequency; 4) int informing about the case number. """ # CCIR time constants. t2_30 = 17.5 * 10 ** (-6) # time constant CCIR_30 t2_15 = 35 * 10 ** (-6) # time constant CCIR_15 t2_7 = 70 * 10 ** (-6) # time constant CCIR_7.5 # NAB time constants. t3 = 3180 * 10 ** (-6) t4_15 = 50 * 10 ** (-6) # time constant NAB_15 t4_7 = 50 * 10 ** (-6) # time constant NAB_7.5 t4_3 = 90 * 10 ** (-6) # time constant NAB_3.75 a = [] b = [] case = -1 # This section will establish which time constants must be modified to obtain the desired equalisation standard. if standard_w == 'CCIR': if speed_w == 30: if standard_r == 'NAB': # Case 1 if speed_r == 15: fs = 2 * fs # Doubling the sampling frequency # Correction filter: NABw15_mod + CCIRr30 # - NAB constants divided by 2 t3 = t3 / 2 t4 = t4_15 / 2 # - CCIR_30 constant not altered t2 = t2_30 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 1 # Case 2 elif speed_r == 7.5: fs = 4 * fs # Quadrupling the sampling frequency # Correction filter: NABw7.5_mod + CCIRr30 # - NAB constants divided by 4 t3 = t3 / 4 t4 = t4_7 / 4 # - CCIR_30 constant not altered t2 = t2_30 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 2 # Case 3 else: # speed_r == 3.75 fs = 8 * fs # Multiplying by 8 the sampling frequency # Correction filter: NABw3.75_mod + CCIRr30 # - NAB constants divided by 8 t3 = t3 / 8 t4 = t4_3 / 8 # - CCIR_30 constant not altered t2 = t2_30 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 3 else: # standard_r == 'CCIR' # Case 31 if speed_r == 30: print('Reference case: 31') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) # Case 15 elif speed_r == 15: fs = 2 * fs # Doubling sampling frequency # Plot information case = 15 # Case 16 else: # speed_r == 7.5 fs = 4 * fs # Quadrupling the sampling frequency # Plot information case = 16 elif speed_w == 15: if standard_r == 'NAB': # Case 28 if speed_r == 15: # No speed change # Correction filter: NABw15 + CCIRr15 # - NAB_15 constants not altered t4 = t4_15 # - CCIR_30 constant not altered t2 = t2_15 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 28 # Case 6 elif speed_r == 7.5: fs = 2 * fs # Doubling the sampling frequency # Correction filter: NABw7.5_mod + CCIRr15 # - NAB constants divided by 2 t3 = t3 / 2 t4 = t4_7 / 2 # - CCIR_15 constant not altered t2 = t2_15 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 6 # Case 7 else: # speed_r == 3.75 fs = 4 * fs # Quadrupling the sampling frequency # Correction filter: NABw3.75_mod + CCIRr15 # - NAB constants divided by 4 t3 = t3 / 4 t4 = t4_3 / 4 # - CCIR_15 constant not altered t2 = t2_15 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 7 else: # standard_r == 'CCIR' # Case 19 if speed_r == 30: fs = fs / 2 # Halving the sampling frequency # Plot information case = 19 # Case 33 elif speed_r == 15: print('Reference case: 33') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) # Case 20 else: # speed_r == 7.5 fs = 2 * fs # Doubling the sampling frequency # Plot information case = 20 else: # speed_w == 7.5 if standard_r == 'NAB': # Case 10 if speed_r == 15: fs = fs / 2 # Halving the sampling frequency # Correction filter: NABw15_mod + CCIRr7.5 # - NAB constants multiplied by 2 t3 = t3 * 2 t4 = t4_15 * 2 # - CCIR_7.5 constant not altered t2 = t2_7 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 10 # Case 30 elif speed_r == 7.5: # No speed change # Correction filter: NABw7.5 + CCIRr7.5 # - NAB_7.5 constant not altered t4 = t4_7 # - CCIR_7.5 constant not altered t2 = t2_7 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 30 # Case 11 else: # speed_r == 3.75 fs = 2 * fs # Doubling the sampling frequency # Correction filter: NABw3.75_mod + CCIRr7.5 # - NAB constants divided by 2 t3 = t3 / 2 t4 = t4_3 / 2 # - CCIR_7.5 constant not altered t2 = t2_7 # Filter coefficients a = [t2 * t3, t2 + t3, 1] b = [t3 * t4, t3, 0] # Plot information case = 11 else: # standard_r == 'CCIR' # Case 23 if speed_r == 30: fs = fs / 4 # Quartering the sampling frequency # Plot information case = 23 # Case 24 elif speed_r == 15: fs = fs / 2 # Halving the sampling frequency # Plot information case = 24 # Case 35 else: # speed_r == 7.5 print('Reference case: 35') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) else: # standard_w == 'NAB' if speed_w == 15: if standard_r == 'NAB': # Case 32 if speed_r == 15: print('Reference case: 32') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) # Case 17 elif speed_r == 7.5: fs = 2 * fs # Doubling the sampling frequency # Correction filter: NABw7.5_mod + NABr15 # - NABw constants divided by 2 t3_mod = t3 / 2 t4_mod = t4_7 / 2 # - NABr constant not altered t4 = t4_15 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 17 # Case 18 else: # speed_r == 3.75 fs = 4 * fs # Quadrupling the sampling frequency # Correction filter: NABw3.75_mod + NABr15 # - NAB constants divided by 4 t3_mod = t3 / 4 t4_mod = t4_3 / 4 # - NABr constant not altered t4 = t4_15 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 18 else: # standard_r == 'CCIR' # Case 4 if speed_r == 30: fs = fs / 2 # Halving the sampling frequency # Correction filter: CCIRw30_mod + NABr15 # - CCIR_30 constant multiplied by 2 t2 = t2_30 * 2 # - NAB_15 constant not altered t4 = t4_15 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 4 # Case 27 elif speed_r == 15: # No speed change # Correction filter: CCIRw15 + NABr15 # - CCIR_15 constant not altered t2 = t2_15 # - NAB_15 constant not altered t4 = t4_15 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 27 # Case 5 else: # speed_r == 7.5 fs = fs * 2 # Doubling the sampling frequency # Correction filter: CCIRw7.5_mod + NABr15 # - CCIR_7.5 constant divided by 2 t2 = t2_7 / 2 # - NAB_15 constant not altered t4 = t4_15 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 5 elif speed_w == 7.5: if standard_r == 'NAB': # Case 21 if speed_r == 15: fs = fs / 2 # Halving the sampling frequency # Correction filter: NABw15_mod + NABr7.5 # - NABw constants multiplied by 2 t3_mod = t3 * 2 t4_mod = t4_15 * 2 # - NABr constant not altered t4 = t4_7 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 21 # Case 34 elif speed_r == 7.5: print('Reference case: 34') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) # Case 22 else: # speed_r == 3.75 fs = 2 * fs # Doubling the sampling frequency # Correction filter: NABw3.75_mod + NABr7.5 # - NABw constants divided by 2 t3_mod = t3 / 2 t4_mod = t4_3 / 2 # - NABr constant not altered t4 = t4_7 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 22 else: # standard_r == 'CCIR' # Case 8 if speed_r == 30: fs = fs / 4 # Quartering the sampling frequency # Correction filter: CCIRw30_mod + NABr7.5 # - CCIR_30 constant multiplied by 4 t2 = t2_30 * 4 # - NAB_7.5 constant not altered t4 = t4_7 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 8 # Case 9 elif speed_r == 15: fs = fs / 2 # Halving the sampling frequency # Correction filter: CCIRw15_mod + NABr7.5 # - CCIR_15 constant multiplied by 2 t2 = t2_15 * 2 # - NAB_7.5 constant not altered t4 = t4_7 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 9 # Case 29 else: # speed_r == 7.5 # No speed change # Correction filter: CCIRw7.5 + NABr7.5 # - CCIR_7.5 constant not altered t2 = t2_7 # - NAB_7.5 constant not altered t4 = t4_7 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 29 else: # speed_w == 3.75 if standard_r == 'NAB': # Case 25 if speed_r == 15: fs = fs / 4 # Quartering the sampling frequency # Correction filter: NABw15_mod + NABr3.75 # - NAB constants multiplied by 4 t3_mod = t3 * 4 t4_mod = t4_15 * 4 # - NABr constant not altered t4 = t4_3 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 25 # Case 26 elif speed_r == 7.5: fs = fs / 2 # Halving the sampling frequency # Correction filter: NABw7.5_mod + NABr3.75 # - NAB constants multiplied by 2 t3_mod = t3 * 2 t4_mod = t4_7 * 2 # - NABr constant not altered t4 = t4_3 # Filter coefficients a = [t3 * t3_mod * t4, t3 * (t3_mod + t4), t3] b = [t3 * t3_mod * t4_mod, t3_mod * (t3 + t4_mod), t3_mod] # Plot information case = 26 # Case 36 else: # speed_r == 3.75 print('Reference case: 36') print(CC.GREEN + 'Nothing to do!' + CC.END) quit(os.EX_OK) else: # standard_r == 'CCIR' # Case 12 if speed_r == 30: fs = fs / 8 # Dividing by 8 the sampling frequency # Correction filter: CCIRw30_mod + NABr3.75 # - CCIR_30 constant multiplied by 8 t2 = t2_30 * 8 # - NAB_3.75 constant not altered t4 = t4_3 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 12 # Case 13 elif speed_r == 15: fs = fs / 4 # Quartering the sampling frequency # Correction filter: CCIRw15_mod + NABr3.75 # - CCIR_15 constant multiplied by 4 t2 = t2_15 * 4 # - NAB_3.75 constant not altered t4 = t4_3 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 13 # Case 14 else: # speed_r == 7.5 fs = fs / 2 # Halving the sampling frequency # Correction filter: CCIRw7.5_mod + NABr3.75 # - CCIR_7.5 constant multiplied by 2 t2 = t2_7 * 2 # - NAB_3.75 constant not altered t4 = t4_3 # Filter coefficients a = [t3 * t4, t3, 0] b = [t2 * t3, t2 + t3, 1] # Plot information case = 14 return a, b, fs, case def correction(a: array, b: array, paf: ndarray, fs: int, plots: bool) -> ndarray: """ Apply a correction filter to a Preservation Audio File; :param a: array of coefficients, specifying the numerator of filter transfer function, :param b: array of coefficients, specifying in the denominator of filter transfer function, :param paf: ndarray specifying the raw audio data of the Preservation Audio File, :param fs: int specifying the operational sampling frequency, :param plots: bool specifying if filter plots should be displayed. :return: the corrected audio as a Restored Audio File. """ # Analog transfer function h_a = TransferFunction(a, b) # Analog frequency vector w_a = np.logspace(np.log10(1), np.log10(fs * np.pi), 5000) if plots: # Analog filter frequency response w_t, h_t = freqs(a, b, worN=w_a) # Plot analog graph # - Magnitude plt.subplot(2, 1, 1) plt.semilogx(w_t / (2 * np.pi), 20 * np.log10(abs(h_t))) plt.xlim([1, 24000]) plt.xlabel('Frequency') plt.ylim([-40, 40]) plt.ylabel('Amplitude response [dB]') plt.grid(True) # - Phase plt.subplot(2, 1, 2) plt.semilogx(w_t / (2 * np.pi), np.angle(h_t) * 180 / np.pi) plt.xlim([1, 24000]) plt.xlabel('Frequency') plt.ylabel('Phase [deg]') plt.grid(True) # Digital transfer function through bilinear digitisation h_d = c2d(h_a, 1 / fs, 'bilinear') num_d = h_d.num[0][0] # Inspect Hd.num to see why [0][0] is needed... den_d = h_d.den[0][0] # Same story here # Digital frequency vector w_d = np.logspace(np.log10(1), np.log10(fs / 2), 5000) if plots: # Digital filter frequency response w_n, h_n = freqz(num_d, den_d, worN=w_d, fs=fs) # Plot digital graph # - Magnitude plt.subplot(2, 1, 1) plt.semilogx(w_n, 20 * np.log10(abs(h_n)), '--') plt.legend(['Analog', 'Bilinear']) # - Phase plt.subplot(2, 1, 2) plt.semilogx(w_n, np.angle(h_n) * 180 / np.pi, '--') plt.legend(['Analog', 'Bilinear']) # Pole check # New pole frequency pole_frequency = 2 # Move to zero-pole representation z, p, k = tf2zpk(a, b) # Check if the function presents a pole at 0 Hz for i in range(len(p)): if p[i] == 0: # Replace pole p[i] = -pole_frequency * 2 * np.pi print('\n' + CC.PURPLE + 'Pole at 0 Hz replaced!' + CC.END) # Back to transfer function representation ap, bp = zpk2tf(z, p, k) # Analog transfer function hp_a = TransferFunction(ap, bp) if plots: # Analog filter frequency response wp_t, hp_t = freqs(ap, bp, worN=w_a) # Plot analog graph # - Magnitude plt.subplot(2, 1, 1) plt.semilogx(wp_t / (2 * np.pi), 20 * np.log10(abs(hp_t))) # - Phase plt.subplot(2, 1, 2) plt.semilogx(wp_t / (2 * np.pi), np.angle(hp_t) * 180 / np.pi) # Digital transfer function through bilinear digitisation hp_d = c2d(hp_a, 1 / fs, 'bilinear') num_d = hp_d.num[0][0] den_d = hp_d.den[0][0] if plots: # Digital filter frequency response wp_n, hp_n = freqz(num_d, den_d, worN=w_d, fs=fs) # Plot digital graph # - Magnitude plt.subplot(2, 1, 1) plt.semilogx(wp_n, 20 * np.log10(abs(hp_n)), '--') plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital']) # - Phase plt.subplot(2, 1, 2) plt.semilogx(wp_n, np.angle(hp_n) * 180 / np.pi, '--') plt.legend(['Analog', 'Bilinear', 'Pole - Analog', 'Pole - Digital']) if plots: plt.show() print('\nFiltering Preservation Audio File...') # Filter Preservation Audio File raf = lfilter(num_d, den_d, paf, axis=0) # Again, wavfile.write() is stupid, and you must cast everything to not destroy your ears... raf = np.rint(raf).astype(paf.dtype) return raf def save_file(file: ndarray, fs: int, temp_path: str, name: str): """ Save an audio file to the given path with name 1.wav; :param file: ndarray specifying the raw audio data, :param fs: int specifying the operational sampling frequency, :param temp_path: str specifying the path where the file will be saved, :param name: str specifying the file name. :return: exit codes corresponding to the execution status. """ raf_path = os.path.join(temp_path, 'RestoredAudioFiles') make_raf = False if not os.path.exists(raf_path): # Create directory os.mkdir(raf_path) make_raf = True print("Restored Audio Files directory '% s' created" % raf_path) else: print((CC.PURPLE + "Restored Audio Files directory '% s' already exists!" + CC.END) % raf_path) overwrite = input('Do you want to overwrite it? [y/n]: ') if overwrite.casefold() == 'y': # Overwrite directory shutil.rmtree(raf_path) os.mkdir(raf_path) make_raf = True print('Restored Audio Files directory overwritten') elif overwrite.casefold() != 'n': print(CC.RED + 'Unknown command, exiting' + CC.END) quit(os.EX_USAGE) if make_raf: print("Saving Restored Audio File to: '%s' ..." % raf_path) wavfile.write(os.path.join(raf_path, name + '.wav'), fs, file) def main(): """ Main execution method. :return: exit codes corresponding to the execution status. """ print(CC.BOLD + "\nWelcome to ARP Tape Audio Restoration!" + CC.END) print("You are using Python version: " + sys.version) # Get the input from config.yaml or command line working_path, files_name, standard_w, speed_w, standard_r, speed_r, plots = get_arguments() # Check if input is correct paf_path, temp_path, standard_w, standard_r = check_input(working_path, files_name, standard_w, speed_w, standard_r, speed_r) # Display input parameters print('\nInput parameters:') print(' WORKING_PATH: ' + working_path) print(' FILES_NAME: ' + files_name) print(' STANDARD_W: ' + standard_w) print(' SPEED_W: ' + str(speed_w) + ' ips') print(' STANDARD_R: ' + standard_r) print(' SPEED_R: ' + str(speed_r) + ' ips') # Preservation Audio File check print("Opening '%s'..." % paf_path) fs, paf = wavfile.read(paf_path) print('Preservation Audio File opened!') print(' FS: ' + str(fs) + ' Hz\n') # Decision stage a, b, fs, case = get_correction_filter(standard_w, speed_w, standard_r, speed_r, fs) # Casting FS to int because wavfile.write() is stupid fs = round(fs) print('Reference case: ' + str(case)) print('Operational FS: ' + str(fs) + ' Hz.') # Correction phase if len(a) != 0: # Not all cases present a correction filter! raf = correction(paf, a, b, fs, plots) save_file(raf, fs, temp_path, '1') else: # Just save Restored Audio File, but with modified fs save_file(paf, fs, temp_path, '1') # End print(CC.GREEN + CC.BOLD + "Success!" + CC.END + '\n') if __name__ == '__main__': main()