Commit 35dfb875 authored by Matteo's avatar Matteo
Browse files

update

parent 2ef1c0f4
This diff is collapsed.
[tool.poetry]
name = "audio-analyzer"
version = "0.0.1"
version = "0.1.0"
description = "MPAI CAE-ARP Audio Analyser"
authors = ["Matteo Spanio <dev2@audioinnova.com>"]
license = "GPLv3"
......@@ -11,6 +11,8 @@ python = "^3.10"
mpai-cae-arp = "^0.2.3"
numpy = "1.23.3"
rich = "^13.3.3"
pandas = "^2.0.0"
scikit-learn = "^1.2.2"
[tool.poetry.group.docs.dependencies]
......@@ -32,7 +34,7 @@ build-backend = "poetry.core.masonry.api"
[tool.ruff]
line-length = 88
src = ["src"]
src = ["src", "tests"]
select = [
"E", # pycodestyle
"F", # pyflakes
......@@ -41,5 +43,5 @@ select = [
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q -n auto -W error::RuntimeWarning"
addopts = "-ra -q -W error::RuntimeWarning"
testpaths = ["tests"]
\ No newline at end of file
import argparse
import os
import time
import sys
from rich.console import Console
import segment_finder as sf
from mpai_cae_arp.types.irregularity import IrregularityFile
def get_args() -> argparse.Namespace:
    """Parse the command-line arguments for the analyser.

    Returns
    -------
    argparse.Namespace
        Carries ``working_directory`` (str) and ``files_name`` (list[str],
        one or more values); both flags are mandatory.
    """
    cli = argparse.ArgumentParser()
    # Declare both required flags from a small spec table.
    for flags, options in (
        (("--working-directory", "-w"), {"help": "Working directory", "required": True}),
        (("--files-name", "-f"), {"nargs": "+", "help": "Files name", "required": True}),
    ):
        cli.add_argument(*flags, **options)
    return cli.parse_args()
from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.io import prettify, Style
def get_args() -> tuple[str, str]:
    """Resolve the working directory and file name for the analysis.

    CLI flags win whenever any argument is present; otherwise the
    WORKING_DIRECTORY and FILES_NAME environment variables are consulted
    (either element of the returned tuple may be None if unset).
    """
    if len(sys.argv) <= 1:
        # No CLI arguments at all: fall back to the environment.
        return os.getenv("WORKING_DIRECTORY"), os.getenv("FILES_NAME")

    cli = argparse.ArgumentParser(
        prog="audio-analyzer",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=f"A tool that implements {prettify('MPAI CAE-ARP Audio Analyser', styles=[Style.BOLD])} Technical Specification.",
        epilog="For support, please contact Matteo Spanio <dev2@audioinnova.com>.\n"
               "This software is licensed under the GNU General Public License v3.0."
    )
    # NOTE(review): "were" in the help text below looks like a typo for
    # "where" — left untouched since it is user-visible runtime text.
    cli.add_argument("--working-directory", "-w", help="The path were the AIW will find and save the files")
    cli.add_argument("--files-name", "-f", help=f"The name of the files to be analyzed {prettify('without extension', styles=[Style.UNDERLINE])}")
    parsed = cli.parse_args()
    return parsed.working_directory, parsed.files_name
def exit_with_error(error_message: str, console) -> None:
    """Print *error_message* in red on *console* and abort with EX_USAGE.

    Parameters
    ----------
    error_message: str
        Human-readable description of what went wrong.
    console
        A ``rich.console.Console``-like object exposing ``print``.
    """
    # The block previously contained two consecutive prints of the same
    # message (a merge artifact); the message is now emitted exactly once,
    # keeping the newer :t-rex: variant.
    console.print(f"[red bold]Error: {error_message} :t-rex:")
    # quit() raises SystemExit; os.EX_USAGE signals a command-line usage error.
    quit(os.EX_USAGE)
def main():
args = get_args()
def main() -> None:
console = Console()
console.print("[bold]Welcome to ARP Audio Analyzer!")
working_directory = args.working_directory
files_name = args.files_name
console.print("[bold]Welcome to ARP Audio Analyser!")
with console.status("Reading input files...", spinner="dots"):
time.sleep(1)
audio_src = os.path.join(working_directory, "PreservationAudioFile", f"{files_name[0]}.wav")
video_src = os.path.join(working_directory, "PreservationAudioVisualFile", f"{files_name[0]}.mov")
working_directory, files_name = get_args()
if any(map(lambda x: x is None, [working_directory, files_name])):
exit_with_error("{}\n{}".format(
"Working directory or files name not specified!",
"Try -h/--help to know more about Audio Analyser usage"), console)
with console.status("[purple]Reading input files", spinner="dots"):
audio_src = os.path.join(working_directory, "PreservationAudioFile", f"{files_name}.wav")
video_src = os.path.join(working_directory, "PreservationAudioVisualFile", f"{files_name}.mov")
console.log(audio_src)
console.log(video_src)
audio_exists = os.path.exists(audio_src)
video_exists = os.path.exists(video_src)
......@@ -49,20 +61,44 @@ def main():
exit_with_error("Input files not found!", console)
# create irregularity file 1
with console.status("Creating irregularity file 1...", spinner="dots"):
with console.status("[purple]Creating irregularity file 1", spinner="dots"):
irreg1 = sf.create_irreg_file(audio_src, video_src)
console.log(f"Found {len(irreg1.irregularities)} irregularities from Audio source")
File(f"{working_directory}/IrregularityFile1.json", FileType.JSON).write_content(irreg1.to_json())
console.log("[geen]Irregularity file 1 created")
# create irregularity file 2
with console.status("Creating irregularity file 2...", spinner="dots"):
video_irreg_1 = {}
with console.status("[purple]Creating irregularity file 2", spinner="dots"):
video_irreg_1 = {
"Irregularities": [
{
"TimeLabel": "00:03:00.000",
"Source": "v",
"IrregularityID": "09a0b0c0-d0e0-f000-0000-000000000000"
},
{
"TimeLabel": "00:03:01.000",
"Source": "v",
"IrregularityID": "09a0b0c0-d0e0-f000-0000-000000000001"
}
],
"Offset": 170
}
console.log("Video irregularity file 1 found")
irreg2 = sf.merge_irreg_files(irreg1, IrregularityFile.from_json(video_irreg_1))
File(f"{working_directory}/IrregularityFile2.json", FileType.JSON).write_content(irreg2.to_json())
console.log("[geen]Irregularity file 2 created")
with console.status("Extracting audio irregularities...", spinner="bouncingBall"):
with console.status("[cyan]Extracting audio irregularities", spinner="dots"):
sf.extract_audio_irregularities(audio_src, irreg2, working_directory)
console.log("[green]Audio irregularities extracted")
# classify audio irregularities
with console.status("[cyan bold]Classifying audio irregularities", spinner="monkey"):
sf.classify_audio_irregularities(working_directory)
console.print("[green bold]Success!")
console.print("[green bold]Success! :tada:")
quit(os.EX_OK)
if __name__ == "__main__":
......
"""
The :py:mod:`ml` contains a set of facilities for data analysis: it is structured in three main modules, one :mod:`ml.classification` where can be found functions for fitting and evaluating classifiers, one :mod:`ml.clusters` where can be found functions for fitting and evaluating clustering algorithms, :mod:`ml.datasets` contains functions to easily get the datasets described in :ref:`datasets` and :mod:`ml.visualization` where can be found functions for visualizing data and results. All the modules interface with :py:mod:`sklearn` and :py:mod:`pandas` modules.
This module is specific for analyzing data from the :ref:`datasets` module, instead of :mod:`audiohandler` which as been designed for represent audio data in any kind of applications.
"""
from ._classification import load_model, generate_classifier
from ._data_structures import Classifier, ClassificationResult
__all__ = [
"load_model", "generate_classifier", "Classifier",
"ClassificationResult"
]
import pickle
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
from ml.datasets import load_berio_nono, load_pretto
from ._data_structures import Classifier
from ._constants import CLASSIFICATION_MAPPING, MODELS_PATH
def load_model(model_name: str) -> Classifier:
    """Load a trained classifier from disk.

    Available models are:

    - pretto_classifier
    - pretto_and_berio_nono_classifier

    If the pickled model file is missing, it is trained on demand via
    :func:`generate_classifier` and then loaded again.

    Parameters
    ----------
    model_name: str
        the name of the model to be loaded

    Raises
    ------
    ValueError
        if the model name is not valid

    Returns
    -------
    Classifier
        the classifier loaded from disk
    """
    models = {
        "pretto_classifier":
            MODELS_PATH.joinpath('pretto_classifier.pkl'),
        "pretto_and_berio_nono_classifier":
            MODELS_PATH.joinpath('pretto_and_berio_nono_classifier.pkl')
    }

    try:
        model_path = models[model_name]
    except KeyError:
        # The docstring promises ValueError for unknown names; previously a
        # bare KeyError leaked out of the dict lookup.
        raise ValueError(f"unknown model name: {model_name!r}") from None

    try:
        with open(model_path, 'rb') as f:
            return Classifier(pickle.load(f))
    except FileNotFoundError:
        # Train and persist the model, then retry the load once.
        generate_classifier(model_path)
        return load_model(model_name)
def generate_classifier(dest_path) -> None:
    """Train a random-forest classifier on the combined Pretto and
    Berio-Nono datasets and pickle the fitted model to *dest_path*.

    Parameters
    ----------
    dest_path
        Filesystem path (str or Path-like) where the pickled model is written.
    """
    data = pd.concat([load_pretto(), load_berio_nono()])
    # Map the textual class labels to the integer ids used for training.
    data = data.replace(CLASSIFICATION_MAPPING)

    # 'noise_type' is metadata and 'label' is the target — neither is a
    # feature.  (The redundant axis=1 alongside columns= was dropped.)
    X = data.drop(columns=['noise_type', 'label'])
    y = data.label

    # Hyper-parameters presumably come from a prior search — TODO confirm.
    rfc = RandomForestClassifier(n_estimators=111,
                                 criterion="log_loss",
                                 max_features="log2",
                                 min_samples_leaf=1,
                                 n_jobs=-1)
    rfc.fit(X, y)

    with open(dest_path, 'wb') as f:
        pickle.dump(rfc, f)
from importlib import resources
# Dotted module path of the package directory that ships the pickled models.
MODELS_FOLDER = 'ml.classification.models'
# Traversable handle on that directory, resolved via importlib.resources so
# it works both from a source checkout and from an installed distribution.
MODELS_PATH = resources.files(MODELS_FOLDER)
# The five speed/equalization tokens, in canonical order.
_LABELS = ('3N', '7C', '7N', '15C', '15N')

# Maps every "<writing>_<reading>" label pair to a dense integer class id,
# row-major over _LABELS x _LABELS ('3N_3N' -> 0 ... '15N_15N' -> 24).
CLASSIFICATION_MAPPING = {
    f"{writing}_{reading}": class_id
    for class_id, (writing, reading) in enumerate(
        (w, r) for w in _LABELS for r in _LABELS)
}

# Reverse lookup: integer class id back to its "<writing>_<reading>" label.
INVERSE_CLASSIFICATION_MAPPING = {
    class_id: label
    for label, class_id in CLASSIFICATION_MAPPING.items()
}
from dataclasses import dataclass
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from mpai_cae_arp.audio.standards import EqualizationStandard, SpeedStandard
from ._constants import INVERSE_CLASSIFICATION_MAPPING
@dataclass
class ClassificationResult:
    """
    A class to represent the result of a classification.

    Since the classification recognizes four pieces of information, they are
    stored in a container class so that all of them can be found in the same
    place but, when necessary, only the part that is needed can be used.

    The pieces of information are:

    - the reading speed of the tape,
    - the writing speed,
    - the reading post-emphasis equalization curve,
    - the writing pre-emphasis equalization curve.
    """

    # Speed the tape was recorded (written) at.
    writing_speed: SpeedStandard
    # Speed the tape was played back (read) at.
    reading_speed: SpeedStandard
    # Pre-emphasis equalization curve used when writing.
    writing_equalization: EqualizationStandard
    # Post-emphasis equalization curve used when reading.
    reading_equalization: EqualizationStandard
def _decode_label(token: str) -> tuple[SpeedStandard, EqualizationStandard]:
    """Split a token such as '7C' into its speed and equalization parts.

    The leading digits ('3'/'7'/'15') select the speed standard and the
    trailing letter ('C'/'N') selects the equalization curve.
    """
    speeds = {'3': SpeedStandard.III, '7': SpeedStandard.IV, '15': SpeedStandard.V}
    curves = {'C': EqualizationStandard.CCIR, 'N': EqualizationStandard.NAB}
    return speeds[token[:-1]], curves[token[-1]]


# Maps every "<writing>_<reading>" class label to its decoded
# ClassificationResult(writing_speed, reading_speed, writing_eq, reading_eq).
_MAP_CLASS_TO_RESULT = {}
for _writing in ('3N', '7C', '7N', '15C', '15N'):
    for _reading in ('3N', '7C', '7N', '15C', '15N'):
        _w_speed, _w_eq = _decode_label(_writing)
        _r_speed, _r_eq = _decode_label(_reading)
        _MAP_CLASS_TO_RESULT[f"{_writing}_{_reading}"] = ClassificationResult(
            _w_speed, _r_speed, _w_eq, _r_eq)
class Classifier:
    """Thin wrapper around a fitted estimator that decodes its integer
    predictions into :class:`ClassificationResult` objects."""

    model: RandomForestClassifier | DecisionTreeClassifier | KNeighborsClassifier

    def __init__(self, model):
        self.model = model

    def predict(self, x: pd.DataFrame) -> pd.DataFrame:
        """Predict the class of each row of *x* and decode the result.

        Raw integer predictions are mapped back to their string labels and
        then to ClassificationResult instances, in a 'classification' column.
        """
        raw = self.model.predict(x)
        decoded = (pd.DataFrame(raw, columns=['classification'])
                   .replace(INVERSE_CLASSIFICATION_MAPPING)
                   .replace(_MAP_CLASS_TO_RESULT))
        return decoded

    def get_model_description(self) -> str:
        """Return the string representation of the wrapped estimator."""
        return str(self.model)
"""
The :mod:`ml.datasets` module includes utilities to load datasets.
"""
from ._loaders import load_pretto, load_berio_nono, _filter_dataset
__all__ = ['load_pretto', 'load_berio_nono', '_filter_dataset']
import itertools
from importlib import resources
import pandas as pd
_DATA_MODULE = 'ml.datasets'
def _filter_dataset(data: pd.DataFrame,
labels: list | None = None,
noise_type: str | None = None,
combination: bool = False):
df = data
if labels is not None:
if combination:
df = data[data['label'].isin(
['_'.join(l) for l in itertools.product(labels, labels)])]
else:
df = data[data['label'].isin(labels)]
if noise_type is not None:
df = df[df.noise_type == noise_type]
return df
def load_pretto(filters: dict | None = None, return_X_y: bool = False):
    """Load and return the Pretto dataset (classification).

    ================= ============================
    Classes           25
    Samples per noise 2075 (A), 5050 (B), 1933 (C)
    Samples total     9058
    Dimensionality    15
    Features          string, float
    ================= ============================

    Read more in the :ref:`Datasets <pretto>`.

    Parameters
    ----------
    filters: dict | None
        Optional filtering criteria forwarded to ``_filter_dataset``;
        recognized keys are 'labels', 'noise_type' and 'combination'.
    return_X_y: bool
        When True, return ``(X, y)`` with the 'label' column split off as
        the target instead of a single DataFrame.

    Examples
    --------
    .. doctest::

        >>> from ml.datasets import load_pretto
        >>> data = load_pretto(filters={'labels': ['7C', '7N'], 'noise_type': None, 'combination': True})
        >>> data.noise_type.unique()
        array(['A', 'B', 'C'], dtype=object)
        >>> data.label.unique()
        array(['7C_7C', '7C_7N', '7N_7C', '7N_7N'], dtype=object)
    """
    data = pd.read_csv(resources.files(_DATA_MODULE).joinpath('data/train.csv'))

    if filters is not None:
        # .get() yields None for missing keys, which _filter_dataset treats
        # as "no filtering" for that criterion.
        data = _filter_dataset(data, filters.get('labels'),
                               filters.get('noise_type'),
                               filters.get('combination'))
    if return_X_y:
        return data.drop("label", axis=1), data["label"]
    return data
def load_berio_nono(filters: dict | None = None, return_X_y: bool = False):
    """Load and return the Berio-Nono dataset (classification).

    ================= ============================
    Classes           4
    Samples per noise 1231 (A), 1796 (B), 9175 (C)
    Samples total     12202
    Dimensionality    15
    Features          string, float
    ================= ============================

    Read more in the :ref:`Datasets <berio-nono>`.

    Parameters
    ----------
    filters: dict | None
        Optional filtering criteria forwarded to ``_filter_dataset``;
        recognized keys are 'labels', 'noise_type' and 'combination'.
    return_X_y: bool
        When True, return ``(X, y)`` with the 'label' column split off as
        the target instead of a single DataFrame.

    Examples
    --------
    .. doctest::

        >>> from ml.datasets import load_berio_nono
        >>> data = load_berio_nono(filters={'labels': ['7C', '7N'], 'noise_type': None, 'combination': True})
        >>> data.noise_type.unique()
        array(['A', 'B', 'C'], dtype=object)
        >>> data.label.unique()
        array(['7C_7C', '7N_7N'], dtype=object)
    """
    data = pd.read_csv(resources.files(_DATA_MODULE).joinpath('data/test.csv'))

    if filters is not None:
        # .get() yields None for missing keys, which _filter_dataset treats
        # as "no filtering" for that criterion.
        data = _filter_dataset(data, filters.get('labels'),
                               filters.get('noise_type'),
                               filters.get('combination'))
    if return_X_y:
        return data.drop("label", axis=1), data["label"]
    return data
This diff is collapsed.
This diff is collapsed.
import os
import tempfile
from uuid import uuid4
import numpy as np
from mpai_cae_arp.audio import AudioWave, Noise
from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.types.irregularity import Irregularity, IrregularityFile, Source
from mpai_cae_arp.time import frames_to_seconds, seconds_to_frames
from mpai_cae_arp.time import frames_to_seconds, seconds_to_frames, seconds_to_string, time_to_seconds
temp_dir = tempfile.gettempdir()
TMP_CHANNELS_MAP = os.path.join(temp_dir, "channels_map.json")
......@@ -27,17 +26,22 @@ def calculate_offset(audio: AudioWave, video: AudioWave) -> float:
float
"""
corr = np.correlate(audio.array, video.array, mode="full")
lags = np.arange(-len(audio.array) + 1, len(video.array))
lag_idx = np.argmax(np.abs(corr))
# corr = np.correlate(audio.array, video.array, mode="full")
# lags = np.arange(-len(audio.array) + 1, len(video.array))
# lag_idx = np.argmax(np.abs(corr))
return lags[lag_idx] / audio.samplerate
# return lags[lag_idx] / audio.samplerate
return 150
def get_irregularities_from_audio(audio_src: AudioWave) -> list[Irregularity]:
input_channels: list[AudioWave] = []
for channel in audio_src.channels:
input_channels.append(audio_src.get_channel(channel))
if audio_src.channels > 1:
for channel in range(audio_src.channels):
input_channels.append(audio_src.get_channel(channel))
else:
input_channels.append(audio_src)
channels_map = {}
......@@ -52,12 +56,12 @@ def get_irregularities_from_audio(audio_src: AudioWave) -> list[Irregularity]:
id = uuid4()
irreg_list.append(
Irregularity(
uuid=id,
irregularity_ID=id,
source=Source.AUDIO,
time_label=frames_to_seconds(start, audio.samplerate)
time_label= seconds_to_string(frames_to_seconds(start, audio.samplerate))
)
)
channels_map[id] = idx
channels_map[str(id)] = idx
File(TMP_CHANNELS_MAP, FileType.JSON).write_content(channels_map)
......@@ -69,32 +73,57 @@ def create_irreg_file(audio_src: str, video_src: str) -> IrregularityFile:
audio = AudioWave.from_file(audio_src, bufferize=True)
offset = calculate_offset(audio, video_src)
return IrregularityFile(get_irregularities_from_audio(audio), offset=offset)
irregularities = get_irregularities_from_audio(audio)
irregularities.sort(key=lambda x: time_to_seconds(x.time_label))
return IrregularityFile(irregularities=irregularities, offset=offset)
# NOTE(review): the span below interleaves the pre- and post-change versions
# of merge_irreg_files from a diff view — it is not valid Python as-is and
# must be reconstructed before use.  The post-change version appears to:
# pick the non-None (or larger) of the two offsets via match/case, then
# concatenate both irregularity lists and sort them chronologically with
# time_to_seconds(time_label).
def merge_irreg_files(
    file1: IrregularityFile,
    file2: IrregularityFile) -> IrregularityFile:
    # pre-change body: np.argmax returns the *index* of the larger offset,
    # not the offset itself — presumably the bug the rewrite fixes; confirm.
    new_file = IrregularityFile(
        irregularities=file1.irregularities + file2.irregularities,
        offset=np.argmax([file1.offset, file2.offset]))
    file2: IrregularityFile
) -> IrregularityFile:
    # post-change offset selection: tolerate a None offset on either side.
    match file1.offset, file2.offset:
        case None, _:
            offset=file2.offset
        case _, None:
            offset=file1.offset
        case _, _:
            offset=max(file1.offset, file2.offset)

    # pre-change sort keyed on the raw time_label string:
    new_file.irregularities.sort(key=lambda x: x.time_label)
    # post-change: sort numerically by the label converted to seconds.
    irregularities = file1.irregularities + file2.irregularities
    irregularities.sort(key=lambda x: time_to_seconds(x.time_label))
    new_file = IrregularityFile(
        irregularities=irregularities, offset=offset)
    return new_file
# NOTE(review): this span interleaves the pre- and post-change versions of
# extract_audio_irregularities from a diff view — it is not valid Python
# as-is.  The post-change version takes the audio file *path* (audio_src),
# loads it itself, and writes a half-second clip per irregularity into
# <path>/AudioBlocks/<irregularity_ID>.wav; reconstruct before use.
def extract_audio_irregularities(
    audio: AudioWave,
    audio_src: str,
    irreg_file: IrregularityFile,
    path: str) -> None:
    path: str
) -> None:
    # Channel index per irregularity id, persisted earlier by
    # get_irregularities_from_audio.
    channels_map = File(TMP_CHANNELS_MAP, FileType.JSON).get_content()
    os.makedirs(f"{path}/AudioBlocks", exist_ok=True)
    audio = AudioWave.from_file(audio_src, bufferize=True)

    for irreg in irreg_file.irregularities:
        # pre-change branch: audio-sourced irregularities only, keyed by the
        # raw UUID and a time_label already expressed in seconds.
        if irreg.source == Source.AUDIO:
            chunk = audio.get_channel(channels_map[irreg.irregularity_ID])[
                seconds_to_frames(
                    irreg.time_label, audio.samplerate
                ):seconds_to_frames(
                    irreg.time_label, audio.samplerate)+audio.samplerate//2]
            chunk.save(f"{path}/AudioBlocks/{irreg.irregularity_ID}.wav")
        # post-change branch: string UUID keys, time_label converted from a
        # "HH:MM:SS.mmm" string; irregularities without a channel entry are
        # cut from the full (multi-channel) audio.
        if channels_map.get(str(irreg.irregularity_ID)) is None:
            audio[seconds_to_frames(
                time_to_seconds(irreg.time_label), audio.samplerate
            ):seconds_to_frames(
                time_to_seconds(irreg.time_label), audio.samplerate)+audio.samplerate//2]\
                .save(f"{path}/AudioBlocks/{irreg.irregularity_ID}.wav")
        else:
            audio.get_channel(channels_map[str(irreg.irregularity_ID)])[
                seconds_to_frames(
                    time_to_seconds(irreg.time_label), audio.samplerate
                ):seconds_to_frames(
                    time_to_seconds(irreg.time_label), audio.samplerate)+audio.samplerate//2]\
                .save(f"{path}/AudioBlocks/{irreg.irregularity_ID}.wav")

    # The channel map is single-use scratch state; remove it once consumed.
    os.remove(TMP_CHANNELS_MAP)
import os
import tempfile
import uuid
import numpy as np
from mpai_cae_arp.audio import AudioWave
from mpai_cae_arp.types.irregularity import Irregularity, IrregularityFile, Source
import segment_finder as sf
def test_calculate_offset():
    # NOTE(review): the changed calculate_offset returns a hard-coded 150
    # (its cross-correlation body is commented out), so this assertion
    # matches the pre-change behaviour — confirm which version is under test.
    audio = AudioWave(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 24, 1, 8000)
    video = AudioWave(np.array([0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 24, 1, 8000)
    offset = sf.calculate_offset(audio, video)
    assert offset == 0.0
def test_get_irregularities_from_audio():
    # A short, smooth 10-sample ramp is expected to yield no irregularities.
    audio = AudioWave(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 24, 1, 8000)
    irregularities = sf.get_irregularities_from_audio(audio)
    assert irregularities == []
def test_merge_irreg_files():
    """Merging keeps every irregularity and adopts the larger offset."""

    def single_irreg_file(time_label: str, offset: float) -> IrregularityFile:
        # One audio-sourced irregularity with a fresh UUID.
        return IrregularityFile(
            irregularities=[
                Irregularity(
                    irregularity_ID=uuid.uuid4(),
                    source=Source.AUDIO,
                    time_label=time_label,
                )
            ],
            offset=offset,
        )

    merged = sf.merge_irreg_files(
        single_irreg_file("00:10:00.000", 0.0),
        single_irreg_file("00:00:00.000", 1.0),
    )

    assert merged.offset == 1.0
    assert len(merged.irregularities) == 2
def test_extract_audio_irregularities():
    # NOTE(review): the changed extract_audio_irregularities takes the audio
    # file *path* (audio_src: str) as its first argument, but an AudioWave
    # instance is passed here — verify this test against the current
    # signature before relying on it.
    audio = AudioWave(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 24, 1, 8000)
    irregularities = sf.get_irregularities_from_audio(audio)
    irreg_file = IrregularityFile(irregularities=irregularities, offset=0.0)
    sf.extract_audio_irregularities(audio, irreg_file, tempfile.gettempdir())
    # Every audio-sourced irregularity should have produced a clip on disk.
    for irreg in irreg_file.irregularities:
        if irreg.source == Source.AUDIO:
            assert os.path.exists(f"{tempfile.gettempdir()}/AudioBlocks/{irreg.irregularity_ID}.wav")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment