from os import path
import numpy as np
from dolfin import FunctionSpace, Function, MPI, VectorFunctionSpace, Timer, project, sqrt, inner, HDF5File, XDMFFile, \
assign, CellDiameter, Mesh, TestFunction, list_timings, TimingClear, TimingType
from vampy.automatedPostprocessing.postprocessing_common import read_command_line, get_dataset_names, \
rate_of_dissipation, rate_of_strain
def compute_flow_and_simulation_metrics(folder, nu, dt, velocity_degree, T, times_to_average, save_frequency,
                                        start_cycle, step, average_over_cycles):
    """
    Computes several flow field characteristics and metrics for the velocity field.

    Reads in the .h5 solution stored in the 'Solution' folder within the results folder,
    and stores the post-processing files in the `FlowMetrics` folder.

    Args:
        folder (str): The folder path where the data files are located.
        nu (float): The kinematic viscosity value.
        dt (float): The time step size.
        velocity_degree (int): The degree of the velocity function space.
        T (float): Length of one cardiac cycle, in [ms]
        times_to_average (list): A list of time points to perform phase averaging. Needs to be in the interval [0, T).
        save_frequency (int): The frequency of saving data during the simulation.
        start_cycle (int): The starting cycle number for averaging.
        step (int): The step size for extracting data.
        average_over_cycles (bool): A flag indicating whether to perform cycle averaging.
    """
    # File paths
    file_path_u = path.join(folder, "u.h5")
    file_path_u_avg = path.join(folder, "u_avg.h5")
    mesh_path = path.join(folder, "mesh.h5")
    file_u = HDF5File(MPI.comm_world, file_path_u, "r")

    # Get names of data to extract
    start = 0
    if MPI.rank(MPI.comm_world) == 0:
        print("Reading dataset names")
    dataset_names = get_dataset_names(file_u, start=start, step=step)

    # Extract specific time steps if phase averaging
    # Number of stored snapshots that make up one cardiac cycle, given the sampling stride
    saved_time_steps_per_cycle = int(T / dt / save_frequency / step)
    number_of_cycles = int(len(dataset_names) / saved_time_steps_per_cycle)
    cycles_to_average = None

    # Get mesh information
    mesh = Mesh()
    with HDF5File(MPI.comm_world, mesh_path, "r") as mesh_file:
        mesh_file.read(mesh, "mesh", False)

    # Function space
    DG = FunctionSpace(mesh, 'DG', 0)
    V = VectorFunctionSpace(mesh, "CG", velocity_degree)
    Vv = FunctionSpace(mesh, "CG", velocity_degree)

    if MPI.rank(MPI.comm_world) == 0:
        print("Computing average velocity – u_avg(x,t)")

    # Define u_avg(x,t)
    u = Function(V)
    u_avg = Function(V)

    # Read velocity and compute cycle averaged velocity (only if not already cached on disk)
    if not path.exists(file_path_u_avg):
        compute_u_avg(dataset_names, file_path_u_avg, file_u, number_of_cycles, saved_time_steps_per_cycle, start_cycle,
                      u, u_avg)

    # Perform phase averaging (Average over cycles at specified time point(s))
    if len(times_to_average) != 0:
        dataset_dict, dataset_dict_avg, number_of_files = \
            get_files_for_phase_averaging(times_to_average, dt, save_frequency, step, number_of_cycles, start_cycle,
                                          saved_time_steps_per_cycle, dataset_names)
    # Perform cycle averaging (Average per cycle) and time averaging (Average of all cycles)
    else:
        dataset_dict, dataset_dict_avg, number_of_files = \
            get_files_for_cycle_averaging(dataset_names, file_path_u_avg, number_of_cycles, saved_time_steps_per_cycle,
                                          start_cycle)
        if average_over_cycles:
            cycles_to_average = [cycle + 1 for cycle in range(number_of_cycles)]

    # Compute flow and simulation metrics
    for time_to_average, dataset in dataset_dict.items():
        if len(times_to_average) != 0 and MPI.rank(MPI.comm_world) == 0:
            print(f"Phase averaging results over {number_of_files} cycles at t={time_to_average} ms")
        define_functions_and_iterate_dataset(time_to_average, dataset, dataset_dict_avg[time_to_average], dt, file_u,
                                             file_path_u_avg, file_path_u, mesh, nu, number_of_files, DG, V, Vv,
                                             cycles_to_average, saved_time_steps_per_cycle)
def compute_u_avg(dataset_names, file_path_u_avg, file_u, n_cycles, saved_time_steps_per_cycle,
                  start_cycle, u, u_avg):
    """
    Iterate over saved time steps and compute average velocity based on save sampling parameters.

    For each saved step within a cycle, accumulates the velocity across the cycles from
    `start_cycle` to `n_cycles`, divides by the number of included cycles, and appends the
    result to `file_path_u_avg` in HDF5 format.

    Args:
        dataset_names (dict): Contains timestep and respective names of solution timesteps
        file_path_u_avg (str): File path to average velocity solution
        file_u (HDF5File): Open HDF5 file holding the velocity solution
        n_cycles (int): Number of cardiac cycles
        saved_time_steps_per_cycle (int): Determines number of saved time steps per cycle
        start_cycle (int): Determines which cardiac cycle to start sampling from
        u (Function): Function storing velocity
        u_avg (Function): Function for storing average velocity
    """
    for save_step in range(saved_time_steps_per_cycle):
        time = -1
        for cycle in range(start_cycle - 1, n_cycles):
            data = dataset_names[save_step + cycle * saved_time_steps_per_cycle]
            # Set time step (timestamp of the first included cycle at this phase)
            if time == -1:
                time = int(file_u.attributes(data)["timestamp"])
            # Accumulate velocity
            file_u.read(u, data)
            u_avg.vector().axpy(1, u.vector())

        # Average over pre-defined amount of cycles
        u_avg.vector()[:] /= (n_cycles - start_cycle + 1)

        if MPI.rank(MPI.comm_world) == 0:
            print("=" * 10, f"Computing average velocity at time: {time} ms", "=" * 10)

        # Save average velocity to HDF5 format; create the file on the first step, append afterwards
        file_mode = "w" if save_step == 0 else "a"
        viz_u_avg = HDF5File(MPI.comm_world, file_path_u_avg, file_mode=file_mode)
        viz_u_avg.write(u_avg, "/velocity", time)
        viz_u_avg.close()

        # Reset u_avg vector before accumulating the next phase
        u_avg.vector().zero()
def get_files_for_cycle_averaging(dataset_names, file_path_u_avg, number_of_cycles, saved_time_steps_per_cycle,
                                  start_cycle):
    """
    Build the dataset-name dictionaries used for cycle/time averaging.

    Args:
        dataset_names (list): A list of dataset names.
        file_path_u_avg (str): The file path for averaging the dataset.
        number_of_cycles (int): The total number of cycles.
        saved_time_steps_per_cycle (int): The number of time steps saved per cycle.
        start_cycle (int): The starting cycle number for averaging.

    Returns:
        dataset_dict (dict): A dictionary containing dataset names for cycle averaging.
        dataset_dict_avg (dict): A dictionary containing averaged dataset names.
        number_of_files (int): The total number of files.
    """
    # Drop every snapshot belonging to the cycles before the starting cycle
    first_index = (start_cycle - 1) * saved_time_steps_per_cycle
    sliced_names = dataset_names[first_index:]
    dataset_dict = {"": sliced_names}

    # The mean-velocity file holds one cycle's worth of names; repeat them once per included cycle
    included_cycles = number_of_cycles - start_cycle + 1
    file_u_avg = HDF5File(MPI.comm_world, file_path_u_avg, "r")
    dataset_dict_avg = {"": get_dataset_names(file_u_avg, start=0, step=1) * included_cycles}

    return dataset_dict, dataset_dict_avg, len(sliced_names)
def get_files_for_phase_averaging(times_to_average, dt, save_frequency, step, number_of_cycles, start_cycle,
                                  saved_time_steps_per_cycle, dataset_names):
    """
    Build the dataset-name dictionaries used for phase averaging at the selected time points.

    Args:
        times_to_average (list): A list of time points to perform phase averaging.
        dt (float): The time step size.
        save_frequency (int): The frequency of saving data during the simulation.
        step (int): The step size for extracting data.
        number_of_cycles (int): The total number of cycles.
        start_cycle (int): The starting cycle number for averaging.
        saved_time_steps_per_cycle (int): The number of time steps saved per cycle.
        dataset_names (list): A list of dataset names.

    Returns:
        dataset_dict (dict): A dictionary mapping time points to corresponding dataset names.
        dataset_dict_avg (dict): A dictionary mapping time points to averaged dataset names.
        number_of_files (int): The total number of files.
    """
    dataset_dict = {}
    dataset_dict_avg = {}

    for time_point in times_to_average:
        key = f"_{time_point}"

        # Index of this time point within a single cycle, given the sampling stride
        index_in_cycle = int(time_point / dt / save_frequency / step)

        # Same phase repeated in every cycle, skipping the cycles before start_cycle
        indices = [index_in_cycle + cycle * saved_time_steps_per_cycle
                   for cycle in range(start_cycle - 1, number_of_cycles)]

        # Names at the corresponding times (clamped to the first snapshot when the index would go negative)
        dataset_dict[key] = [dataset_names[max(0, i - 1)] for i in indices]
        dataset_dict_avg[key] = [dataset_names[max(0, index_in_cycle - 1)]] * len(indices)

    # Number of files for the last requested time point
    number_of_files = len(dataset_dict[f"_{times_to_average[-1]}"])

    return dataset_dict, dataset_dict_avg, number_of_files
def define_functions_and_iterate_dataset(time_to_average, dataset, dataset_avg, dt, file_u, file_path_u_avg,
                                         file_path_u, mesh, nu, number_of_files, DG, V, Vv, cycles_to_average,
                                         saved_time_steps_per_cycle):
    """
    Defines functions and vector functions for all metrics to be computed, iterates through the dataset and computes
    several flow and simulation metrics (CFL, rate of strain/dissipation, l+/t+, Kolmogorov scales, kinetic and
    turbulent kinetic energy, turbulent dissipation).
    After iteration, saves the metrics to .xdmf file format.

    Args:
        time_to_average (str): Dict-key suffix for the averaged time point (e.g. "_0.2"), or "" for time averaging
        dataset (list): Contains velocity solution dataset names
        dataset_avg (list): Contains average velocity solution dataset names
        dt (float): Time step in [ms]
        file_u (HDF5File): Open HDF5 file holding the velocity solution
        file_path_u_avg (str): File path to average velocity solution
        file_path_u (str): File path to velocity solution
        mesh (Mesh): Volumetric mesh of computational domain
        nu (float): Kinematic viscosity
        number_of_files (int): Number of dataset files to iterate over
        DG (FunctionSpace): Discontinous Galerkin function space
        V (VectorFunctionSpace): Continous Galerkin function space for vector solutions
        Vv (FunctionSpace): Continous Galerkin function space for scalar solutions
        cycles_to_average (list): List of which cycles to average, or None when phase averaging
        saved_time_steps_per_cycle (int): Determines number of saved time steps per cycle
    """
    # Functions for storing values
    v = TestFunction(DG)
    u = Function(V)
    u_avg = Function(V)
    u_time_avg = Function(V)
    u_time_cycle_avg = Function(V)
    u_prime = Function(V)

    # Plus-values
    l_plus_avg = Function(DG)
    l_plus_cycle_avg = Function(DG)
    l_plus = Function(DG)
    t_plus_avg = Function(DG)
    t_plus_cycle_avg = Function(DG)
    t_plus = Function(DG)

    # Kolmogorov scales
    length_scale = Function(DG)
    length_scale_avg = Function(DG)
    length_scale_cycle_avg = Function(DG)
    time_scale = Function(DG)
    time_scale_avg = Function(DG)
    time_scale_cycle_avg = Function(DG)
    velocity_scale = Function(DG)
    velocity_scale_avg = Function(DG)
    velocity_scale_cycle_avg = Function(DG)

    # Inner grad(u), grad(u)
    turbulent_dissipation = Function(DG)
    turbulent_dissipation_avg = Function(DG)
    turbulent_dissipation_cycle_avg = Function(DG)
    strain = Function(DG)
    strain_avg = Function(DG)
    strain_cycle_avg = Function(DG)
    dissipation = Function(DG)
    dissipation_avg = Function(DG)
    dissipation_cycle_avg = Function(DG)

    # Energy
    kinetic_energy = Function(Vv)
    kinetic_energy_avg = Function(Vv)
    kinetic_energy_cycle_avg = Function(Vv)
    turbulent_kinetic_energy = Function(Vv)
    turbulent_kinetic_energy_avg = Function(Vv)
    turbulent_kinetic_energy_cycle_avg = Function(Vv)

    # Velocity components (u2 stays zero for 2D meshes, so the 3D energy formula still applies)
    u0 = Function(Vv)
    u1 = Function(Vv)
    u2 = Function(Vv)
    u0_prime = Function(Vv)
    u1_prime = Function(Vv)
    u2_prime = Function(Vv)

    # CFL
    CFL = Function(DG)
    CFL_avg = Function(DG)
    CFL_cycle_avg = Function(DG)

    # Characteristic edge length
    h = CellDiameter(mesh)
    characteristic_edge_length = project(h, DG)

    # Create XDMF files for saving metrics
    if cycles_to_average is None:
        # Phase averaging: bake the time point into the file name; no per-cycle outputs
        save_path = file_path_u.replace("u.h5", "%s{}%s.xdmf".format(time_to_average))
        cycles_to_average = []
    else:
        save_path = file_path_u.replace("u.h5", "%s%s.xdmf")
        number_of_files = len(cycles_to_average)
    save_path = save_path.replace("Solutions", "FlowMetrics")

    metric_names = ["characteristic_edge_length", "u_time_avg", "l_plus", "t_plus", "CFL", "strain", "length_scale",
                    "time_scale", "velocity_scale", "dissipation", "kinetic_energy", "turbulent_kinetic_energy",
                    "turbulent_dissipation"]
    metric_variables_cycle_avg = [characteristic_edge_length, u_time_cycle_avg, l_plus_cycle_avg, t_plus_cycle_avg,
                                  CFL_cycle_avg, strain_cycle_avg, length_scale_cycle_avg, time_scale_cycle_avg,
                                  velocity_scale_cycle_avg, dissipation_cycle_avg, kinetic_energy_cycle_avg,
                                  turbulent_kinetic_energy_cycle_avg, turbulent_dissipation_cycle_avg]
    metric_variables_avg = [characteristic_edge_length, u_time_avg, l_plus_avg, t_plus_avg, CFL_avg, strain_avg,
                            length_scale_avg, time_scale_avg, velocity_scale_avg, dissipation_avg, kinetic_energy_avg,
                            turbulent_kinetic_energy_avg, turbulent_dissipation_avg]
    metric_dict_cycle = dict(zip(metric_names, metric_variables_cycle_avg))
    metric_dict = dict(zip(metric_names, metric_variables_avg))

    # Iteration counters at which a full cycle has been accumulated and should be stored
    counters_to_save = [saved_time_steps_per_cycle * cycle for cycle in cycles_to_average]
    cycle_names = [""] + ["_cycle_{:02d}".format(cycle) for cycle in cycles_to_average]

    # One XDMF file per metric (and per cycle, when cycle averaging)
    metrics = {}
    for cycle_name in cycle_names:
        for vn in metric_dict.keys():
            metrics[vn + cycle_name] = XDMFFile(MPI.comm_world, save_path % (vn, cycle_name))
            metrics[vn + cycle_name].parameters["rewrite_function_mesh"] = False
            metrics[vn + cycle_name].parameters["flush_output"] = True

    # Get u average
    file_u_avg = HDF5File(MPI.comm_world, file_path_u_avg, "r")

    if MPI.rank(MPI.comm_world) == 0:
        print("=" * 10, "Starting post processing", "=" * 10)

    counter = 0
    # Guards against division by zero in the Kolmogorov-scale formulas
    tolerance = 1E-12
    for data, data_avg in zip(dataset, dataset_avg):
        counter += 1

        # Read velocity and cycle averaged velocity (u is reused as a read buffer for both)
        file_u_avg.read(u, data_avg)
        assign(u_avg, u)
        file_u.read(u, data)

        if MPI.rank(MPI.comm_world) == 0:
            time = file_u.attributes(data)["timestamp"]
            print("=" * 10, f"Time: {time} ms", "=" * 10)

        # Compute time averaged velocity
        t0 = Timer("Time averaged velocity")
        u_time_cycle_avg.vector().axpy(1, u_avg.vector())
        t0.stop()

        # Compute CFL
        t0 = Timer("CFL number")
        u_mag = project(sqrt(inner(u, u)), DG)
        CFL.vector().set_local(u_mag.vector().get_local() / characteristic_edge_length.vector().get_local() * dt)
        CFL.vector().apply("insert")
        CFL_cycle_avg.vector().axpy(1, CFL.vector())
        t0.stop()

        # Compute rate-of-strain
        t0 = Timer("Rate of strain")
        rate_of_strain(strain, u, v, mesh, h)
        strain_cycle_avg.vector().axpy(1, strain.vector())
        t0.stop()

        # Compute l+
        t0 = Timer("l plus")
        u_star = np.sqrt(strain.vector().get_local() * nu)
        l_plus.vector().set_local(u_star * characteristic_edge_length.vector().get_local() / nu)
        l_plus.vector().apply("insert")
        l_plus_cycle_avg.vector().axpy(1, l_plus.vector())
        t0.stop()

        # Compute t+
        t0 = Timer("t plus")
        t_plus.vector().set_local(u_star ** 2 * dt / nu)
        t_plus.vector().apply("insert")
        t_plus_cycle_avg.vector().axpy(1, t_plus.vector())
        t0.stop()

        # Compute Kolmogorov
        t0 = Timer("Dissipation")
        rate_of_dissipation(dissipation, u, v, mesh, h, nu)
        dissipation_cycle_avg.vector().axpy(1, dissipation.vector())
        t0.stop()

        # Compute u_prime (fluctuating velocity component)
        t0 = Timer("u prime")
        u_prime.vector().set_local(u.vector().get_local() - u_avg.vector().get_local())
        u_prime.vector().apply("insert")
        t0.stop()

        # Compute Turbulent dissipation
        t0 = Timer("Turbulent dissipation")
        rate_of_dissipation(turbulent_dissipation, u_prime, v, mesh, h, nu)
        turbulent_dissipation_cycle_avg.vector().axpy(1, turbulent_dissipation.vector())
        eps = turbulent_dissipation.vector().get_local()
        t0.stop()

        # Compute length scale
        t0 = Timer("Length scale")
        length_scale.vector().set_local((nu ** 3 / (eps + tolerance)) ** (1. / 4))
        length_scale.vector().apply("insert")
        length_scale_cycle_avg.vector().axpy(1, length_scale.vector())
        t0.stop()

        # Compute time scale
        t0 = Timer("Time scale")
        time_scale.vector().set_local((nu / (eps + tolerance)) ** 0.5)
        time_scale.vector().apply("insert")
        time_scale_cycle_avg.vector().axpy(1, time_scale.vector())
        t0.stop()

        # Compute velocity scale
        t0 = Timer("Velocity scale")
        velocity_scale.vector().set_local((eps * nu) ** (1. / 4))
        velocity_scale.vector().apply("insert")
        velocity_scale_cycle_avg.vector().axpy(1, velocity_scale.vector())
        t0.stop()

        # Compute both kinetic energy and turbulent kinetic energy
        t0 = Timer("Kinetic energy")
        assign(u0, u.sub(0))
        assign(u1, u.sub(1))
        if mesh.geometry().dim() == 3:
            assign(u2, u.sub(2))
        kinetic_energy.vector().set_local(
            0.5 * (u0.vector().get_local() ** 2 + u1.vector().get_local() ** 2 + u2.vector().get_local() ** 2))
        kinetic_energy.vector().apply("insert")
        kinetic_energy_cycle_avg.vector().axpy(1, kinetic_energy.vector())
        t0.stop()

        t0 = Timer("Turbulent kinetic energy")
        assign(u0_prime, u_prime.sub(0))
        assign(u1_prime, u_prime.sub(1))
        if mesh.geometry().dim() == 3:
            assign(u2_prime, u_prime.sub(2))
        turbulent_kinetic_energy.vector().set_local(
            0.5 * (u0_prime.vector().get_local() ** 2
                   + u1_prime.vector().get_local() ** 2
                   + u2_prime.vector().get_local() ** 2))
        turbulent_kinetic_energy.vector().apply("insert")
        turbulent_kinetic_energy_cycle_avg.vector().axpy(1, turbulent_kinetic_energy.vector())
        t0.stop()

        # Periodically dump and clear the FEniCS timing table
        if counter % 10 == 0:
            list_timings(TimingClear.clear, [TimingType.wall])

        # A full cycle has been accumulated: store its average and fold it into the total
        if len(cycles_to_average) != 0 and counter == counters_to_save[0]:
            # Get cycle number
            cycle = int(counters_to_save[0] / saved_time_steps_per_cycle)
            if MPI.rank(MPI.comm_world) == 0:
                print("========== Storing cardiac cycle {} ==========".format(cycle))

            # Get average over sampled time steps ([1:] skips characteristic_edge_length, which is constant)
            for metric in list(metric_dict_cycle.values())[1:]:
                metric.vector()[:] = metric.vector()[:] / saved_time_steps_per_cycle

            # Store solution
            for name, metric in metric_dict_cycle.items():
                metrics[name + "_cycle_{:02d}".format(cycle)].write_checkpoint(metric, name)

            # Append solution to total solution
            for metric_avg, metric_cycle_avg in zip(list(metric_dict.values())[1:],
                                                    list(metric_dict_cycle.values())[1:]):
                metric_cycle_avg.vector().apply("insert")
                metric_avg.vector().axpy(1, metric_cycle_avg.vector())

            # Reset tmp solutions
            for metric_cycle_avg in list(metric_dict_cycle.values())[1:]:
                metric_cycle_avg.vector().zero()

            counters_to_save.pop(0)

    # Get average over sampled time steps
    metrics_dict_to_save = metric_dict if len(cycles_to_average) != 0 else metric_dict_cycle
    if len(cycles_to_average) != 0:
        number_of_files = len(cycles_to_average)
    for metric in metrics_dict_to_save.values():
        metric.vector()[:] = metric.vector()[:] / number_of_files

    # Store average data
    if MPI.rank(MPI.comm_world) == 0:
        print("=" * 10, "Saving flow and simulation metrics", "=" * 10)
    for name, metric in metrics_dict_to_save.items():
        metrics[name].write_checkpoint(metric, name)

    # Print summary info (mean/max/min reduced across MPI ranks)
    if MPI.rank(MPI.comm_world) == 0:
        print("=" * 10, "Flow and simulation metrics summary", "=" * 10)
    for metric_name, metric_value in metrics_dict_to_save.items():
        sum_ = MPI.sum(MPI.comm_world, np.sum(metric_value.vector().get_local()))
        num = MPI.sum(MPI.comm_world, metric_value.vector().get_local().shape[0])
        mean = sum_ / num
        max_ = MPI.max(MPI.comm_world, metric_value.vector().get_local().max())
        min_ = MPI.min(MPI.comm_world, metric_value.vector().get_local().min())
        if MPI.rank(MPI.comm_world) == 0:
            print(metric_name, "mean:", mean)
            print(metric_name, "max:", max_)
            print(metric_name, "min:", min_)

    if MPI.rank(MPI.comm_world) == 0:
        print("=" * 10, "Post processing finished", "=" * 10)
        save_folder = save_path.rsplit("/", 1)[0]
        print(f"Results saved to: {save_folder}")
def main_metrics():
    """Parse command-line arguments and run the flow-metrics computation."""
    args = read_command_line()
    compute_flow_and_simulation_metrics(folder=args[0], nu=args[1], dt=args[3], velocity_degree=args[4],
                                        T=args[7], save_frequency=args[8], times_to_average=args[9],
                                        start_cycle=args[10], step=args[11], average_over_cycles=args[12])
if __name__ == '__main__':
    # Delegate to main_metrics(), which performs exactly this argument parsing
    # and dispatch — avoids keeping two copies of the same unpacking logic in sync.
    main_metrics()