#!./venv/bin/activate """ MPAI CAE-ARP Packager. Implements MPAI CAE-ARP Packager Technical Specification, providing: - Access Copy Files: 1. Restored Audio Files. 2. Editing List. 3. Set of Irregularity Images in a .zip file. 4. Irregularity File. - Preservation Master Files: 1. Preservation Audio File. 2. Preservation Audio-Visual File where the audio has been replaced with the Audio of the Preservation Audio File fully synchronised with the video. 3. Set of Irregularity Images in a .zip file. 4. Irregularity File. """ import json import os import shutil import sys import yaml from argparse import ArgumentParser, RawTextHelpFormatter from moviepy.editor import VideoFileClip, AudioFileClip __author__ = "Nadir Dalla Pozza" __copyright__ = "Copyright 2022, Audio Innova S.r.l." __credits__ = ["Niccolò Pretto", "Nadir Dalla Pozza", "Sergio Canazza"] __license__ = "GPL v3.0" __version__ = "1.0.1" __maintainer__ = "Nadir Dalla Pozza" __email__ = "nadir.dallapozza@unipd.it" __status__ = "Production" class CC: """ Variables for customizing console colors """ PURPLE = '\033[95m' CYAN = '\033[96m' DARK_CYAN = '\033[36m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def get_arguments() -> tuple[str, str]: """ Method to obtain arguments from config.yaml file or command line. Default config.yaml, ignored if a command line argument is passed. :return: tuple consisting of two strings: 1) the working path; 2) the name of the Preservation files, which is key element to retrieve necessary files. """ if len(sys.argv) > 1: # Read from command line parser = ArgumentParser( prog="python3 packager.py", formatter_class=RawTextHelpFormatter, description="A tool that implements MPAI CAE-ARP Packager Technical Specification.\n" "By default, the configuration parameters are loaded from ./config.yaml file,\n" "but, alternately, you can pass command line arguments to replace them." ) parser.add_argument( "-w", "--working-path", help="Specify the Working Path, where all input files are stored", required=True ) parser.add_argument( "-f", "--files-name", help="Specify the name of the Preservation files (without extension)", required=True ) args = parser.parse_args() working_path = args.working_path files_name = args.files_name else: # Read from configuration file config = object try: config = yaml.safe_load(open('./config.yaml', 'r')) if 'WORKING_PATH' not in config: print(CC.RED + 'WORKING_PATH key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) if 'FILES_NAME' not in config: print(CC.RED + 'FILES_NAME key not found in config.yaml!' + CC.END) quit(os.EX_CONFIG) except FileNotFoundError: print(CC.RED + 'config.yaml file not found!' + CC.END) quit(os.EX_NOINPUT) working_path = config['WORKING_PATH'] files_name = config['FILES_NAME'] return working_path, files_name def check_input(working_path: str, files_name: str) -> str: """ Method to check that passed arguments are correct and that the environment is conformant to the standard. :param working_path: str representing the path where all files resulting from previous AIMs are stored. :param files_name: str representing the Preservation files name, to identify the input directory. :return: str representing the path where the files to be processed during the current execution are stored. """ if not os.path.exists(working_path): print(CC.RED + 'The specified WORKING_PATH is non-existent!' + CC.END) quit(os.EX_CONFIG) # Check for temp directory existence temp_path = os.path.join(working_path, 'temp') if not os.path.exists(temp_path): print(CC.RED + 'WORKING_PATH structure is not conformant!' + CC.END) quit(os.EX_NOINPUT) # Check for input directory existence temp_path = os.path.join(temp_path, files_name) if not os.path.exists(temp_path): print(CC.RED + 'The specified FILES_NAME has no corresponding files!' + CC.END) quit(os.EX_NOINPUT) return temp_path def make_dir(dir_path: str) -> bool: """ Decide if the given directory has to be created or should be overridden. :param dir_path: str representing the path of the directory. :return: bool specifying if the files within the directory should be created. """ if not os.path.exists(dir_path): # Create directory os.mkdir(dir_path) print("Directory '% s' created" % dir_path) return True else: print((CC.PURPLE + "Directory '% s' already exists!" + CC.END) % dir_path) overwrite = input('Do you want to overwrite it? [y/n]: ') if overwrite.casefold() == 'y': # Overwrite directory shutil.rmtree(dir_path) os.mkdir(dir_path) print('%s overwritten' % dir_path) return True elif overwrite.casefold() != 'n': print(CC.RED + 'Unknown command, exiting' + CC.END) quit(os.EX_USAGE) return False def main(): """ Main execution method. :return: exit codes corresponding to the execution status """ print(CC.BOLD + "\nWelcome to ARP Packager!" + CC.END) print("You are using Python version: " + sys.version) # Get the input from config.yaml or command line working_path, files_name = get_arguments() # Check if input is correct temp_path = check_input(working_path, files_name) # Access Copy Files print('\n' + CC.BOLD + 'Creation of Access Copy Files...' + CC.END) # By default, AccessCopyFiles directory is created in the output path... acf_dir = 'AccessCopyFiles/' + files_name + '/' acf_path = os.path.join(working_path, acf_dir) # ...however, if it already exists, the user can decide if it has to be overridden make_acf = make_dir(acf_path) if make_acf: # Copy RestoredAudioFiles raf_path = os.path.join(temp_path, 'RestoredAudioFiles') if not os.path.exists(raf_path): print((CC.RED + "Restored Audio Files directory '% s' not found!" + CC.END) % raf_path) quit(os.EX_NOINPUT) restored_audio_files = os.listdir(raf_path) if len(restored_audio_files) == 1: print((CC.YELLOW + "Restored Audio Files directory '% s' is empty" + CC.END) % raf_path) shutil.copytree(raf_path, os.path.join(acf_path, 'RestoredAudioFiles')) print("Restored Audio Files copied") # Copy Editing List el_path = os.path.join(temp_path, 'EditingList.json') try: shutil.copy2(el_path, acf_path) except FileNotFoundError: print((CC.RED + "Editing List file '% s' not found!" + CC.END) % el_path) quit(os.EX_NOINPUT) print("Editing List copied") # Create Irregularity Images archive ii_path = os.path.join(temp_path, 'IrregularityImages') if not os.path.exists(ii_path): print((CC.RED + "Irregularity Images directory '% s' not found!" + CC.END) % ii_path) quit(os.EX_NOINPUT) irregularity_images = os.listdir(ii_path) if len(irregularity_images) == 1: print((CC.YELLOW + "Irregularity Images directory '% s' is empty" + CC.END) % ii_path) shutil.make_archive(acf_path + 'IrregularityImages', 'zip', temp_path, 'IrregularityImages') print("Irregularity Images archive created") # Copy Irregularity File if_path = os.path.join(temp_path, 'TapeIrregularityClassifier_IrregularityFileOutput2.json') try: shutil.copy2(if_path, os.path.join(acf_path, 'IrregularityFile.json')) except FileNotFoundError: print((CC.RED + "Irregularity File file '% s' not found!" + CC.END) % if_path) quit(os.EX_NOINPUT) print("Irregularity File copied") # End Access Copy Files print(CC.GREEN + CC.BOLD + "Success!" + CC.END) # Preservation Master Files print('\n' + CC.BOLD + 'Creation of Preservation Master Files...' + CC.END) # By default, PreservationMasterFiles directory is created in the output path... pmf_dir = 'PreservationMasterFiles/' + files_name + '/' pmf_path = os.path.join(working_path, pmf_dir) # ...however, if it already exists, it is possible to skip its creation make_pmf = make_dir(pmf_path) if make_pmf: # Copy Preservation Audio File audio_file = files_name + '.wav' paf_path = os.path.join(working_path, 'PreservationAudioFile/', audio_file) try: shutil.copy2(paf_path, pmf_path + 'PreservationAudioFile.wav') except FileNotFoundError: print((CC.RED + "Preservation Audio File file '% s' not found!" + CC.END) % paf_path) quit(os.EX_NOINPUT) print("Preservation Audio File copied") # Create Preservation Audio-Visual File with substituted audio video_file = files_name + '.mov' pvf_path = os.path.join(working_path, 'PreservationAudioVisualFile/', video_file) try: audio = AudioFileClip(paf_path) video = VideoFileClip(pvf_path) # Open Irregularity File to get offset irregularity_file_json = open( os.path.join(temp_path, 'TapeIrregularityClassifier_IrregularityFileOutput2.json') ) irregularity_file = json.load(irregularity_file_json) offset = irregularity_file['Offset']/1000 if offset > 0: audio = audio.subclip(t_start=offset) else: video = video.subclip(t_start=offset) video = video.set_audio(audio) video.write_videofile(pmf_path + 'PreservationAudioVisualFile.mov', bitrate='3000k', codec='mpeg4') print("Preservation Audio-Visual File created") except OSError: print((CC.RED + "Preservation Audio-Visual File file '% s' not found!" + CC.END) % pvf_path) quit(os.EX_NOINPUT) # Create Irregularity Images archive (already checked) shutil.make_archive(pmf_path + 'IrregularityImages', 'zip', temp_path, 'IrregularityImages') print("Irregularity Images archive created") # Copy Irregularity File (already checked) shutil.copy2( os.path.join(temp_path, 'TapeIrregularityClassifier_IrregularityFileOutput2.json'), os.path.join(pmf_path, 'IrregularityFile.json') ) print("Irregularity File copied") # End Preservation Master Files print(CC.GREEN + CC.BOLD + "Success!" + CC.END + '\n') else: print("\nExit") if __name__ == '__main__': main()