Module music_df.script_helpers
Functions
def get_csv_path(raw_path: str, config) ‑> str-
Expand source code
def get_csv_path(raw_path: str, config) -> str:
    """Rewrite a CSV path using the optional prefix settings on ``config``.

    Args:
        raw_path: The path to rewrite.
        config: Any object. Its optional ``csv_prefix_to_strip`` and
            ``csv_prefix_to_add`` attributes are read; a missing attribute
            is treated the same as ``None``.

    Returns:
        ``raw_path`` with ``csv_prefix_to_strip`` removed from its start
        (when set and actually a prefix of the path) and then
        ``csv_prefix_to_add`` prepended (when set).
    """
    prefix_to_strip = getattr(config, "csv_prefix_to_strip", None)
    if prefix_to_strip is not None:
        # Anchor the removal at the start of the path: a plain
        # ``str.replace(..., 1)`` would also delete a match occurring in the
        # middle of a path that does not start with the prefix.
        raw_path = raw_path.removeprefix(prefix_to_strip)

    prefix_to_add = getattr(config, "csv_prefix_to_add", None)
    if prefix_to_add is not None:
        raw_path = prefix_to_add + raw_path

    return raw_path
Expand source code
def get_csv_title(raw_path, config) -> str:
    """Derive a title from a CSV path: strip the configured prefix and the extension.

    Args:
        raw_path: The path to convert.
        config: Any object. Its optional ``csv_prefix_to_strip`` attribute is
            read; a missing attribute is treated the same as ``None``.

    Returns:
        ``raw_path`` without a leading ``csv_prefix_to_strip`` (when set and
        actually a prefix of the path) and without its final extension, as
        determined by ``os.path.splitext``.
    """
    prefix_to_strip = getattr(config, "csv_prefix_to_strip", None)
    if prefix_to_strip is not None:
        # Only strip a genuine prefix; ``str.replace(..., 1)`` would also
        # remove a match from the middle of the path.
        raw_path = raw_path.removeprefix(prefix_to_strip)

    title, _extension = os.path.splitext(raw_path)
    return title
Expand source code
def get_itos(dictionary_paths: list[str] | str) -> dict[str, list[str]]: if isinstance(dictionary_paths, str): dictionary_paths = [dictionary_paths] out = {} for dictionary_path in dictionary_paths: feature_name = os.path.basename(dictionary_path).rsplit("_", maxsplit=1)[0] out[feature_name] = get_single_itos(dictionary_path) return out def get_single_itos(dictionary_path: str, specials_to_prepend: list[str] | None = None) ‑> list[str]-
Expand source code
def get_single_itos( dictionary_path: str, specials_to_prepend: list[str] | None = None ) -> list[str]: if not specials_to_prepend: specials_to_prepend = [] with open(dictionary_path) as inf: data = inf.readlines() return specials_to_prepend + [ line.split(" ", maxsplit=1)[0] for line in data if line and not line.startswith("madeupword") ] def get_single_stoi(dictionary_path: str) ‑> dict[str, int]-
Expand source code
def get_single_stoi(dictionary_path: str) -> dict[str, int]:
    """Read a dictionary file into a token-to-index mapping.

    The token of each line is the text before the first space. Blank lines
    and lines starting with ``"madeupword"`` are skipped and do not consume
    an index. If a token occurs more than once, its last index wins.

    Args:
        dictionary_path: Path of the dictionary file (read as UTF-8).

    Returns:
        A dict mapping each token to its 0-based position among the kept
        lines.
    """
    tokens = []
    with open(dictionary_path, encoding="utf-8") as inf:
        for raw_line in inf:
            # Strip the line ending before testing for emptiness; lines read
            # from a file are never empty while they still hold the newline.
            line = raw_line.rstrip("\r\n")
            if not line.strip() or line.startswith("madeupword"):
                continue
            tokens.append(line.split(" ", maxsplit=1)[0])
    return {token: i for i, token in enumerate(tokens)}
Expand source code
def get_stoi(dictionary_paths: list[str]) -> dict[str, dict[str, int]]: out = {} for dictionary_path in dictionary_paths: feature_name = os.path.basename(dictionary_path).rsplit("_", maxsplit=1)[0] out[feature_name] = get_single_stoi(dictionary_path) return out def plot_item_from_logits(metadata_row,
logits,
config,
feature_vocab,
pdf_path: str | None = None,
csv_path: str | None = None,
music_df: pandas.DataFrame | None = None,
title: str | None = None,
entropy_to_transparency: bool = False,
keep_intermediate_files: bool = False,
write_csv: bool = False,
feature_name: str | None = None,
sync: bool = False,
number_every_nth_note: int | None = None,
number_specified_notes: Sequence[int] | None = None,
start_i: int | None = None,
end_i: int | None = None,
quantize: int | None = None,
concat_df_columns: tuple[tuple[str, ...], ...] = (),
binary_decision_threshold: float | None = None)-
Expand source code
def plot_item_from_logits(
    metadata_row,
    logits,
    config,
    feature_vocab,
    pdf_path: str | None = None,
    csv_path: str | None = None,
    music_df: pd.DataFrame | None = None,
    title: str | None = None,
    entropy_to_transparency: bool = False,
    keep_intermediate_files: bool = False,
    write_csv: bool = False,
    feature_name: str | None = None,
    sync: bool = False,
    number_every_nth_note: int | None = None,
    number_specified_notes: Sequence[int] | None = None,
    start_i: int | None = None,
    end_i: int | None = None,
    quantize: int | None = None,
    concat_df_columns: tuple[tuple[str, ...], ...] = (),
    binary_decision_threshold: float | None = None,
):
    """Turn per-note logits into predictions and plot them against the score.

    Args:
        metadata_row: Row providing ``df_indices`` (a sequence of ``music_df``
            index labels, or its string literal) and ``csv_path``.
        logits: Per-note logits; the first and last rows are dropped when
            ``config.data_has_start_and_stop_tokens`` is truthy.
        config: Reads ``n_specials``, ``make_score_pdfs``,
            ``make_piano_rolls``, ``column_types`` and, as a fallback,
            ``feature_name``.
        feature_vocab: Indexable mapping from predicted index (after
            subtracting ``config.n_specials``) to label.
        pdf_path: Output path; required when ``config.make_score_pdfs``.
        csv_path: Passed on only when ``write_csv`` is true.
        music_df: Score table; read from ``metadata_row.csv_path`` if None.
        title: Title for the piano-roll plot.
        entropy_to_transparency: If true, the per-note entropy of the softmax
            distribution is passed to ``show_score_and_predictions``.
        keep_intermediate_files, number_every_nth_note,
        number_specified_notes: Forwarded to ``show_score_and_predictions``.
        feature_name: Name of the plotted feature.
        sync: If true, logits are passed through ``sync_array_by_df`` on the
            ``"onset"`` column.
        start_i, end_i: Optional positional crop (``end_i`` exclusive)
            applied to the notes, the logits and ``df_indices`` alike.
        quantize: If truthy, ``music_df`` is quantized with this ``tpq``.
        concat_df_columns: Column groups passed one at a time to
            ``concatenate_features``.
        binary_decision_threshold: If not None, class 1 is predicted wherever
            its softmax probability exceeds this value (binary logits only).

    Raises:
        ValueError: If the humdrum export requirements are unavailable.
    """
    if HUMDRUM_UNAVAILABLE:
        raise ValueError("Install music_df humdrum_export requirements")

    if getattr(config, "data_has_start_and_stop_tokens", False):
        logits = logits[1:-1]

    if music_df is None:
        music_df = read_csv(metadata_row.csv_path)
    assert music_df is not None

    for concat_columns in concat_df_columns:
        music_df = concatenate_features(music_df, concat_columns)

    df_indices = metadata_row.df_indices
    if isinstance(df_indices, str):
        df_indices = ast.literal_eval(df_indices)

    # This former strategy for cropping led to incorrect results sometimes:
    # cropped_df = crop_df(music_df, start_i=min(df_indices), end_i=max(df_indices))
    cropped_df = music_df.loc[df_indices]
    assert cropped_df.type.unique().tolist() == ["note"]

    # somewhat confusingly, we crop music_df separately in
    # show_score_and_predictions below (there, we also need to retrieve the
    # preceding time signature, etc.) which is why we keep it around here
    notes_df = cropped_df.reset_index(drop=True)

    # In case logits were ragged, only take the logits corresponding to notes
    logits = logits[: len(notes_df)]

    # start_i / end_i are positions among the notes, so df_indices must be
    # sliced positionally in step with notes_df and logits. Comparing the
    # index *labels* against these positions would leave df_indices out of
    # step with the predictions.
    if end_i is not None:
        notes_df = notes_df.iloc[:end_i]
        logits = logits[:end_i]
        df_indices = list(df_indices)[:end_i]
    if start_i is not None:
        notes_df = notes_df.iloc[start_i:]
        logits = logits[start_i:]
        df_indices = list(df_indices)[start_i:]

    if sync:
        logits = sync_array_by_df(logits, notes_df, sync_col_name_or_names="onset")

    if entropy_to_transparency or (binary_decision_threshold is not None):
        probs = softmax(logits)

    if entropy_to_transparency:
        entropy = -np.sum(probs * np.log2(probs), axis=1)
    else:
        entropy = None

    # ``is not None`` (rather than truthiness) so that a threshold of 0.0 is
    # honoured, consistent with the check that computes ``probs`` above.
    if binary_decision_threshold is not None:
        assert logits.shape[-1] == 2, "binary_decision_threshold requires binary logits"
        assert config.n_specials == 0, (
            "binary_decision_threshold not implemented where config.n_specials != 0"
        )
        predicted_indices = np.where(probs[:, 1] > binary_decision_threshold, 1, 0)
    else:
        predicted_indices = logits.argmax(axis=-1)

    # Shift so that index 0 is the first non-special entry of feature_vocab.
    predicted_indices -= config.n_specials
    if predicted_indices.min() < 0:
        LOGGER.warning(
            f"Predicted at least one special token in {metadata_row.csv_path}; "
            "replacing with 0"
        )
        predicted_indices[predicted_indices < 0] = 0

    predictions = [feature_vocab[i] for i in predicted_indices]

    if quantize:
        music_df = quantize_df(music_df, tpq=quantize)

    if config.make_score_pdfs:
        assert pdf_path is not None
        return_code = show_score_and_predictions(
            music_df=music_df,
            feature_name=feature_name,
            predicted_feature=predictions,
            prediction_indices=df_indices,
            pdf_path=pdf_path,
            csv_path=csv_path if write_csv else None,
            col_type=config.column_types.get(feature_name, str),
            entropy=entropy,
            keep_intermediate_files=keep_intermediate_files,
            number_every_nth_note=number_every_nth_note,
            number_specified_notes=number_specified_notes,
            number_notes_offset=(start_i if start_i is not None else 0),
        )
        # A falsy return code is treated as success.
        if not return_code:
            LOGGER.info(f"Wrote {pdf_path}")

    if config.make_piano_rolls:
        _fig, ax = plt.subplots()
        # TODO: (Malcolm 2023-09-29) save to a png rather than displaying
        plot_predictions(
            music_df,
            feature_name if feature_name is not None else config.feature_name,
            predictions,
            df_indices,
            ax=ax,
            title=title,
        )
        plt.show()
tokens,
config,
pdf_path: str | None = None,
csv_path: str | None = None,
music_df: pandas.DataFrame | None = None,
title: str | None = None,
feature_name: str | None = None,
write_csv=False,
keep_intermediate_files: bool = False,
start_i: int | None = None,
end_i: int | None = None,
quantize: int | None = None,
concat_df_columns: tuple[tuple[str, ...], ...] = (),
number_specified_notes: Sequence[int] | None = None)-
Expand source code
def plot_item_from_tokens(
    metadata_row,
    tokens,
    config,
    pdf_path: str | None = None,
    csv_path: str | None = None,
    music_df: pd.DataFrame | None = None,
    title: str | None = None,
    feature_name: str | None = None,
    write_csv=False,
    keep_intermediate_files: bool = False,
    start_i: int | None = None,
    end_i: int | None = None,
    quantize: int | None = None,
    concat_df_columns: tuple[tuple[str, ...], ...] = (),
    number_specified_notes: Sequence[int] | None = None,
):
    """Plot already-decoded tokens against the score they belong to.

    ``tokens`` are passed unchanged as the predicted feature values, paired
    with ``metadata_row.df_indices`` (parsed with ``ast.literal_eval`` when
    it is a string). ``music_df`` is read from ``metadata_row.csv_path`` when
    not supplied, each group in ``concat_df_columns`` is passed to
    ``concatenate_features``, and the table is quantized when ``quantize`` is
    truthy. Depending on ``config.make_score_pdfs`` and
    ``config.make_piano_rolls`` a score PDF is written and/or a piano roll is
    shown.

    Raises:
        ValueError: If the humdrum export requirements are unavailable.
        NotImplementedError: If ``start_i`` or ``end_i`` is given; cropping
            is not supported here.
    """
    if HUMDRUM_UNAVAILABLE:
        raise ValueError("Install music_df humdrum_export requirements")

    if music_df is None:
        music_df = read_csv(metadata_row.csv_path)
    assert music_df is not None

    for column_group in concat_df_columns:
        music_df = concatenate_features(music_df, column_group)

    note_indices = metadata_row.df_indices
    if isinstance(note_indices, str):
        note_indices = ast.literal_eval(note_indices)

    if quantize:
        music_df = quantize_df(music_df, tpq=quantize)

    # Cropping is rejected outright rather than silently ignored.
    if end_i is not None or start_i is not None:
        raise NotImplementedError

    if config.make_score_pdfs:
        assert pdf_path is not None
        csv_destination = csv_path if write_csv else None
        notes_offset = 0 if start_i is None else start_i
        failed = show_score_and_predictions(
            music_df=music_df,
            feature_name=feature_name,
            predicted_feature=tokens,
            prediction_indices=note_indices,
            pdf_path=pdf_path,
            csv_path=csv_destination,
            col_type=config.column_types.get(feature_name, str),
            keep_intermediate_files=keep_intermediate_files,
            number_every_nth_note=config.number_every_nth_note,
            number_specified_notes=number_specified_notes,
            number_notes_offset=notes_offset,
        )
        # A falsy return code is treated as success.
        if not failed:
            LOGGER.info(f"Wrote {pdf_path}")

    if config.make_piano_rolls:
        fig, ax = plt.subplots()
        # TODO: (Malcolm 2023-09-29) save to a png rather than displaying
        if feature_name is not None:
            plotted_feature = feature_name
        else:
            plotted_feature = config.feature_name
        plot_predictions(
            music_df,
            plotted_feature,
            tokens,
            note_indices,
            ax=ax,
            title=title,
        )
        plt.show()
Expand source code
def read_config(config_path, config_cls):
    """Build a ``config_cls`` instance from the YAML file at ``config_path``.

    The file's top-level mapping is passed to ``config_cls`` as keyword
    arguments. An empty file is treated as an empty mapping, so
    ``config_cls`` is then called without arguments. If the resulting config
    has a truthy ``debug`` attribute, the post-mortem debug hook is installed
    via ``set_debug_hook``.

    Args:
        config_path: Path of the YAML file.
        config_cls: Callable accepting the file's keys as keyword arguments.

    Returns:
        The constructed config object.
    """
    with open(config_path) as inf:
        loaded = yaml.safe_load(inf)

    # safe_load returns None for an empty document; ``**None`` would raise an
    # unhelpful TypeError before config_cls could report what is missing.
    config = config_cls(**({} if loaded is None else loaded))

    # Tolerate config classes without a ``debug`` field, as read_config_oc does.
    if getattr(config, "debug", False):
        set_debug_hook()
    return config
Expand source code
def read_config_oc(config_path: str | None, cli_args: list[str] | None, config_cls):
    """Build a ``config_cls`` instance from an OmegaConf file and/or CLI overrides.

    At least one of ``config_path`` and ``cli_args`` must be given. When both
    are present, the file config is merged first and the CLI config second.
    The merged config is passed to ``config_cls`` as keyword arguments. If
    the result has a truthy ``debug`` attribute, ``set_debug_hook`` is called.
    """
    assert config_path is not None or cli_args is not None

    # Merge order matters: file first, command line second.
    layers = []
    if config_path is not None:
        layers.append(OmegaConf.load(config_path))
    if cli_args is not None:
        layers.append(OmegaConf.from_cli(cli_args))

    config = config_cls(**OmegaConf.merge(*layers))
    if getattr(config, "debug", False):
        set_debug_hook()
    return config
Expand source code
def set_debug_hook():
    """Replace ``sys.excepthook`` with one that prints the traceback and opens pdb.

    The installed hook prints the exception to stdout and then starts a
    post-mortem pdb session on its traceback.
    """

    def custom_excepthook(exc_type, exc_value, exc_traceback):
        # Show the failure first, then drop into the debugger at the frame
        # where it happened.
        traceback.print_exception(
            exc_type, exc_value, exc_traceback, file=sys.stdout
        )
        pdb.post_mortem(exc_traceback)

    sys.excepthook = custom_excepthook
Expand source code
def softmax(a):
    """Return the softmax of ``a`` along its last axis.

    The maximum of each slice along the last axis is subtracted before
    exponentiating. This leaves the result mathematically unchanged but
    keeps ``np.exp`` from overflowing on large logits, which would otherwise
    produce ``inf / inf = nan``.

    Args:
        a: Array-like of logits with at least one dimension.

    Returns:
        An array of the same shape whose values along the last axis are
        non-negative and sum to 1.
    """
    a = np.asarray(a)
    shifted = a - np.max(a, axis=-1, keepdims=True)
    z = np.exp(shifted)
    return z / np.sum(z, axis=-1, keepdims=True)
Expand source code
def spinner(stop_event):
    """Animate a text spinner on stdout until ``stop_event`` is set.

    Cycles through ``| / - \\``, showing each frame for 0.1 seconds and then
    writing a backspace so the next frame is written in the same place.
    ``stop_event.is_set()`` is checked before every frame.
    """
    for frame in itertools.cycle(["|", "/", "-", "\\"]):
        if stop_event.is_set():
            break
        sys.stdout.write(frame)
        # Flush so the frame is displayed immediately despite buffering.
        sys.stdout.flush()
        time.sleep(0.1)
        # Step back one column so the next frame overwrites this one.
        sys.stdout.write("\b")
Expand source code
@contextmanager
def spinning_wheel():
    """Context manager that runs ``spinner`` in a thread for the ``with`` body.

    The spinner thread is started on entry. On exit, including when the body
    raises, the stop event is set and the thread is joined.
    """
    finished = threading.Event()
    worker = threading.Thread(target=spinner, args=(finished,))
    worker.start()
    try:
        yield
    finally:
        # Signal the spinner loop to end, then wait for the thread to exit.
        finished.set()
        worker.join()