Source code for vasp.automatedPostprocessing.postprocessing_h5py.create_spectrograms_chromagrams

# Copyright (c) 2023 David Bruneau
# SPDX-License-Identifier: GPL-3.0-or-later

"""
This script creates spectrograms, power spectral density and chromagrams from formatted matrices (.npz files)"
"""

import logging
from pathlib import Path
from typing import Union, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

from vasp.automatedPostprocessing.postprocessing_h5py import spectrograms as spec
from vasp.automatedPostprocessing.postprocessing_common import read_parameters_from_file


[docs] def create_spectrogram_composite(case_name: str, quantity: str, df: pd.DataFrame, start_t: float, end_t: float, num_windows_per_sec: float, overlap_frac: float, window: str, lowcut: float, min_color: float, max_color: float, image_folder: Union[str, Path], flow_rate_file: Optional[Union[str, Path]] = None, amplitude_file: Optional[Union[str, Path]] = None, power_scaled: bool = False, ylim: Optional[float] = None) -> None: """ Create a composite spectrogram figure. Args: case_name (str): Path to simulation results. quantity (str): Quantity to postprocess. df (pd.DataFrame): Input dataframe. start_t (float): Start time for analysis. end_t (float): End time for analysis. num_windows_per_sec (float): Number of windows per second. overlap_frac (float): Overlap fraction for windows. window (str): Type of window for spectrogram. lowcut (float): Lowcut frequency for high-pass filter. min_color (float): Minimum value for the color range. max_color (float): Maximum value for the color range. image_folder (str or Path): Folder to save the images. flow_rate_file (str or Path, optional): File containing flow rate data. amplitude_file (str or Path, optional): File containing amplitude data. power_scaled (bool, optional): Whether to scale the power. ylim (float, optional): Y-axis limit for the plot. """ # Calculate number of windows (you can adjust this equation to fit your temporal/frequency resolution needs) num_windows = np.round(num_windows_per_sec * (end_t - start_t)) + 3 # Get sampling constants logging.info("--- Getting sampling constants\n") T, _, fs = spec.get_sampling_constants(df, start_t, end_t) # High-pass filter dataframe for spectrogram logging.info("--- Filtering time data") df_filtered = spec.filter_time_data(df, fs, lowcut=lowcut, highcut=15000.0, order=6, btype='highpass') # PSD calculation logging.info("\n--- Calculating power spectral density of filtered dataframe") Pxx_array, freq_array = spec.get_psd(df_filtered, fs) # Plot PSD plt.plot(freq_array, Pxx_array) plt.xlabel('Freq. (Hz)') plt.ylabel('input units^2/Hz') # Set ylim if provided if ylim is not None: plt.xlim([0, ylim]) psd_filename = f"{quantity}_psd_{case_name}" path_to_psd_figure = Path(image_folder) / (psd_filename + '.png') # Save the figure logging.info(f"--- Saving PSD figure to {path_to_psd_figure}\n") plt.savefig(path_to_psd_figure) # Create composite figure if amplitude_file and flow_rate_file: fig1, axes = plt.subplots(5, sharex=True, gridspec_kw={'height_ratios': [1, 3, 1, 1, 1]}) assert isinstance(axes, np.ndarray) and axes.shape == (5,) ax1, ax2, ax3, ax4, ax5 = axes elif flow_rate_file: fig1, axes = plt.subplots(4, sharex=True, gridspec_kw={'height_ratios': [1, 3, 1, 1]}) assert isinstance(axes, np.ndarray) and axes.shape == (4,) ax1, ax2, ax3, ax4 = axes elif amplitude_file: fig1, axes = plt.subplots(4, sharex=True, gridspec_kw={'height_ratios': [3, 1, 1, 1]}) assert isinstance(axes, np.ndarray) and axes.shape == (4,) ax2, ax3, ax4, ax5 = axes else: fig1, axes = plt.subplots(3, sharex=True, gridspec_kw={'height_ratios': [3, 1, 1]}) assert isinstance(axes, np.ndarray) and axes.shape == (3,) ax2, ax3, ax4 = axes fig1.set_size_inches(7.5, 9) logging.info("--- Computing average spectrogram") # Specs with Reyynolds number bins, freqs, Pxx, max_val, min_val, lower_thresh = \ spec.compute_average_spectrogram(df_filtered, fs, num_windows, overlap_frac, window, start_t, end_t, min_color, scaling="spectrum", filter_data=False, thresh_method="old") bins = bins + start_t # Need to shift bins so that spectrogram timing is correct spec.plot_spectrogram(fig1, ax2, bins, freqs, Pxx, ylim, color_range=[min_color, max_color]) # Chromagram ------------------------------------------------------------ n_fft = spec.shift_bit_length(int(df.shape[1] / num_windows)) * 2 n_chroma = 24 logging.info("\n--- Recomputing average spectrogram without filter") # Recalculate spectrogram without filtering the data bins_raw, freqs_raw, Pxx_raw, max_val_raw, min_val_raw, lower_thresh_raw = \ spec.compute_average_spectrogram(df, fs, num_windows, overlap_frac, window, start_t, end_t, min_color, scaling="spectrum", filter_data=False, thresh_method="old") bins_raw = bins_raw + start_t # Need to shift bins so that spectrogram timing is correct # Reverse the log of the data Pxx_raw = np.exp(Pxx_raw) # Calculate chromagram # Normalize so that all chroma in column sum to 1 (other option is "max", which sets the max value # in each column to 1) norm = "sum" chroma = spec.chromagram_from_spectrogram(Pxx_raw, fs, n_fft, n_chroma=n_chroma, norm=norm) if power_scaled: chroma_power = chroma * (Pxx.max(axis=0) - min_color) spec.plot_chromagram(fig1, ax3, bins_raw, chroma_power) else: spec.plot_chromagram(fig1, ax3, bins_raw, chroma) # Hack to make all the x axes of the subplots align divider2 = make_axes_locatable(ax4) cax2 = divider2.append_axes("right", size="5%", pad=0.9) cax2.remove() # Calculate SBI chroma_entropy = spec.calc_chroma_entropy(chroma, n_chroma) # Plot SBI if power_scaled: chroma_entropy_power = chroma_entropy * (Pxx.max(axis=0) - min_color) ax4.plot(bins, chroma_entropy_power) else: ax4.plot(bins, chroma_entropy) ax4.set_ylabel('SBI') # Plot Flow Rate or inlet velocity from input file if flow_rate_file: flow_rates = np.loadtxt(flow_rate_file) flow_rates = flow_rates[np.where((flow_rates[:, 0] > start_t) & (flow_rates[:, 0] < end_t))] ax1.plot(flow_rates[:, 0], flow_rates[:, 1]) ax1.set_ylabel('Flow Rate (normalized)') # Hack to make all the x axes of the subplots align divider = make_axes_locatable(ax1) cax = divider.append_axes("right", size="5%", pad=0.9) cax.remove() if amplitude_file: # Hack to make all the x axes of the subplots align divider = make_axes_locatable(ax5) cax = divider.append_axes("right", size="5%", pad=0.9) cax.remove() output_amplitudes = np.genfromtxt(amplitude_file, delimiter=',') output_amplitudes = output_amplitudes[output_amplitudes[:, 0] >= start_t] output_amplitudes = output_amplitudes[output_amplitudes[:, 0] <= end_t] ax5.plot(output_amplitudes[:, 0], output_amplitudes[:, 10], label=case_name) ax5.set_ylabel("Amplitude") ax5.set_xlabel('Time [s]') else: ax4.set_xlabel('Time [s]') # Name composite figure and save composite_figure_name = f"{quantity}_{case_name}_{num_windows}_windows_thresh{min_color}_composite_figure" if power_scaled: composite_figure_name += "_power_scaled" composite_figure_path = Path(image_folder) / (composite_figure_name + '.png') logging.info(f"\n--- Saving composite figure to {composite_figure_path}\n") fig1.savefig(composite_figure_path) # create separate spectrogram figure fig2, ax2_1 = plt.subplots() fig2.set_size_inches(7.5, 5) title = f"Pxx max = {max_val:.2e}, Pxx min = {min_val:.2e}, threshold Pxx = {lower_thresh}" spec.plot_spectrogram(fig2, ax2_1, bins, freqs, Pxx, ylim, title=title, x_label="Time (s)", color_range=[min_color, max_color]) # Save data to files (spectrogram) spec_filename = f"{quantity}_{case_name}_{num_windows}_windows_thresh{min_color}_spectrogram" path_to_spec = Path(image_folder) / (spec_filename + '.png') logging.info(f"--- Saving spectrogram figure to {path_to_spec}\n") fig2.savefig(path_to_spec) output_csv_path = path_to_spec.with_suffix('.csv') data_csv = np.append(freqs[np.newaxis].T, Pxx, axis=1) bins_txt = np.array2string(bins, max_line_width=10000, precision=2, separator=',').replace("[", "").replace("]", "") np.savetxt(output_csv_path, data_csv, header=bins_txt, delimiter=",") # Save data to files (chromagram) chroma_filename = f"{quantity}_{case_name}_{num_windows}_windows_chromagram" path_to_chroma = Path(image_folder) / (chroma_filename + '.png') chroma_y = np.linspace(0, 1, chroma.shape[0]) output_csv_path = path_to_chroma.with_suffix('.csv') data_csv = np.append(chroma_y[np.newaxis].T, chroma, axis=1) bins_txt = \ np.array2string(bins_raw, max_line_width=10000, precision=2, separator=',').replace("[", "").replace("]", "") np.savetxt(output_csv_path, data_csv, header=bins_txt, delimiter=",") # Save data to files (SBI) sbi_filename = f"{quantity}_{case_name}_{num_windows}_windows_SBI" path_to_sbi = Path(image_folder) / (sbi_filename + '.png') csv_output_path = path_to_sbi.with_suffix('.csv') data_csv = np.array([bins, chroma_entropy]).T np.savetxt(csv_output_path, data_csv, header="t (s), SBI", delimiter=",")
[docs] def main(): # Load in case-specific parameters args = spec.read_command_line_spec() # Create logger and set log level logging.basicConfig(level=args.log_level, format="%(message)s") # Load parameters from default_parameters.json parameters = read_parameters_from_file(args.folder) # Extract parameters fsi_region = parameters["fsi_region"] if args.fsi_region is None else args.fsi_region fluid_domain_id = parameters["dx_f_id"] solid_domain_id = parameters["dx_s_id"] end_time = args.end_time if args.end_time is not None else parameters["T"] save_deg = args.save_deg if args.save_deg is not None else (1 if args.quantity == 'p' else parameters["save_deg"]) # Create or read in spectrogram dataframe quantity, df, case_name, image_folder, visualization_hi_pass_folder = \ spec.read_spectrogram_data(args.folder, args.mesh_path, save_deg, args.stride, args.start_time, end_time, args.n_samples, args.sampling_region, args.fluid_sampling_domain_id, args.solid_sampling_domain_id, fsi_region, args.quantity, args.interface_only, args.component, args.point_ids, fluid_domain_id, solid_domain_id, sampling_method=args.sampling_method) # Should these files be used? # amplitude_file = Path(visualization_hi_pass_folder) / args.amplitude_file_name # flow_rate_file = Path(args.folder) / args.flow_rate_file_name # Create spectrograms create_spectrogram_composite(case_name, quantity, df, args.start_time, end_time, args.num_windows_per_sec, args.overlap_frac, args.window, args.lowcut, args.min_color, args.max_color, image_folder, flow_rate_file=None, amplitude_file=None, power_scaled=False, ylim=args.ylim) if args.sampling_method == "PointList": spec.sonify_point(case_name, quantity, df, args.start_time, end_time, args.overlap_frac, args.lowcut, image_folder)
if __name__ == '__main__': main()