Source code for vampy.simulation.Artery

import importlib.util
import json
import pickle
from pprint import pprint

import numpy as np
from dolfin import set_log_level, MPI

from vampy.simulation.Probe import Probes  # type: ignore
from vampy.simulation.Womersley import make_womersley_bcs, compute_boundary_geometry_acrn
from vampy.simulation.simulation_common import get_file_paths, store_u_mean, print_mesh_information, \
    store_velocity_and_pressure_h5, dump_probes

# Check for oasis and oasismove
package_name_oasis = 'oasis'
package_name_oasismove = 'oasismove'
oasis_exists = importlib.util.find_spec(package_name_oasis)
oasismove_exists = importlib.util.find_spec(package_name_oasismove)
if oasismove_exists:
    from oasismove.problems.NSfracStep import *
elif oasis_exists:
    from oasis.problems.NSfracStep import *
else:
    print("Neither oasis nor oasismove is installed. Exiting simulation..")

# FEniCS specific command to control the desired level of logging, here set to critical errors
set_log_level(50)
comm = MPI.comm_world


[docs]def problem_parameters(commandline_kwargs, NS_parameters, NS_expressions, **NS_namespace): """ Problem file for running CFD simulation in arterial models consisting of one inlet, and two or more outlets. A Womersley velocity profile is applied at the inlet, and a flow split pressure condition is applied at the outlets, following [1]. Flow rate for the inlet condition, and flow split values for the outlets are computed from the pre-processing script automatedPreProcessing.py. The simulation is run for two cycles (adjustable), but only the results/solutions from the second cycle are stored to avoid non-physiological effects from the first cycle. One cardiac cycle is set to 0.951 s from [2], and scaled by a factor of 1000, hence all parameters are in [mm] or [ms]. [1] Gin, Ron, Anthony G. Straatman, and David A. Steinman. "A dual-pressure boundary condition for use in simulations of bifurcating conduits." J. Biomech. Eng. 124.5 (2002): 617-619. [2] Hoi, Yiemeng, et al. "Characterization of volumetric flow rate waveforms at the carotid bifurcations of older adults." Physiological measurement 31.3 (2010): 291. """ if "restart_folder" in commandline_kwargs.keys(): restart_folder = commandline_kwargs["restart_folder"] f = open(path.join(restart_folder, 'params.dat'), 'rb') NS_parameters.update(pickle.load(f)) NS_parameters['restart_folder'] = restart_folder else: # Parameters are in mm and ms cardiac_cycle = float(commandline_kwargs.get("cardiac_cycle", 951)) number_of_cycles = float(commandline_kwargs.get("number_of_cycles", 2)) NS_parameters.update( # Fluid parameters nu=3.3018e-3, # Kinematic viscosity: 0.0035 Pa-s / 1060 kg/m^3 = 3.3018E-6 m^2/s = 3.3018-3 mm^2/ms # Geometry parameters id_in=[], # Inlet boundary ID id_out=[], # Outlet boundary IDs area_ratio=[], # Area ratio for the flow outlets area_inlet=[], # Area of inlet in [mm^2] # Simulation parameters cardiac_cycle=cardiac_cycle, # Duration of cardiac cycle [ms] T=cardiac_cycle * number_of_cycles, # Simulation end time [ms] dt=0.0951, # Time step size [ms] dump_probe_frequency=100, # Dump frequency for sampling velocity & pressure at probes along the centerline save_solution_frequency=5, # Save frequency for velocity and pressure field save_solution_after_cycle=1, # Store solution after 1 cardiac cycle # Oasis specific parameters checkpoint=500, # Checkpoint frequency print_intermediate_info=100, # Frequency for printing solver statistics folder="results_artery", # Preferred results folder name mesh_path=commandline_kwargs["mesh_path"], # Path to the mesh # Solver parameters velocity_degree=1, # Polynomial order of finite element for velocity. Normally linear (1) or quadratic (2) pressure_degree=1, # Polynomial order of finite element for pressure. Normally linear (1) use_krylov_solvers=True, krylov_solvers=dict(monitor_convergence=False) ) mesh_file = NS_parameters["mesh_path"].split("/")[-1] case_name = mesh_file.split(".")[0] NS_parameters["folder"] = path.join(NS_parameters["folder"], case_name) if MPI.rank(comm) == 0: print("=== Starting simulation for case: {} ===".format(case_name)) print("Running with the following parameters:") pprint(NS_parameters)
[docs]def mesh(mesh_path, **NS_namespace): # Read mesh and print mesh information mesh = Mesh(mesh_path) print_mesh_information(mesh) return mesh
[docs]def create_bcs(t, NS_expressions, V, Q, area_ratio, area_inlet, mesh, mesh_path, nu, id_in, id_out, pressure_degree, **NS_namespace): # Mesh function boundary = MeshFunction("size_t", mesh, mesh.geometry().dim() - 1, mesh.domains()) # Read case parameters info = mesh_path.split(".xml")[0] + "_info.json" with open(info) as f: info = json.load(f) id_in[:] = info['inlet_id'] id_out[:] = info['outlet_ids'] id_wall = min(id_in + id_out) - 1 Q_mean = info['mean_flow_rate'] area_ratio[:] = info['area_ratio'] area_inlet.append(info['inlet_area']) # Load normalized time and flow rate values t_values, Q_ = np.loadtxt(path.join(path.dirname(path.abspath(__file__)), "ICA_values")).T Q_values = Q_mean * Q_ # Specific flow rate = Normalized flow wave form * Prescribed flow rate t_values *= 1000 # Scale time in normalised flow wave form to [ms] _, tmp_center, tmp_radius, tmp_normal = compute_boundary_geometry_acrn(mesh, id_in[0], boundary) # Create Womersley boundary condition at inlet inlet = make_womersley_bcs(t_values, Q_values, nu, tmp_center, tmp_radius, tmp_normal, V.ufl_element()) NS_expressions["inlet"] = inlet # Initialize inlet expressions with initial time for uc in inlet: uc.set_t(t) # Create pressure boundary condition area_out = [] ds = Measure("ds", domain=mesh, subdomain_data=boundary) for i, ind in enumerate(id_out): dsi = ds(ind) area_out.append(assemble(Constant(1.0) * dsi)) bc_p = [] if MPI.rank(comm) == 0: print("=== Initial pressure and area fraction ===") for i, ID in enumerate(id_out): p_initial = area_out[i] / sum(area_out) outflow = Expression("p", p=p_initial, degree=pressure_degree) bc = DirichletBC(Q, outflow, boundary, ID) bc_p.append(bc) NS_expressions[ID] = outflow if MPI.rank(comm) == 0: print(f"Boundary ID={ID}, pressure: {p_initial:.5f}, area fraction: {area_ratio[i]:0.5f}") # No slip condition at wall wall = Constant(0.0) # Create Boundary conditions for the velocity bc_wall = DirichletBC(V, wall, boundary, id_wall) bc_inlet = [DirichletBC(V, inlet[i], boundary, id_in[0]) for i in range(3)] # Return boundary conditions in dictionary return dict(u0=[bc_inlet[0], bc_wall], u1=[bc_inlet[1], bc_wall], u2=[bc_inlet[2], bc_wall], p=bc_p)
# Oasis hook called before simulation start
[docs]def pre_solve_hook(mesh, V, Q, newfolder, mesh_path, restart_folder, velocity_degree, cardiac_cycle, save_solution_after_cycle, dt, **NS_namespace): # Mesh function boundary = MeshFunction("size_t", mesh, mesh.geometry().dim() - 1, mesh.domains()) # Create point for evaluation n = FacetNormal(mesh) eval_dict = {} rel_path = mesh_path.split(".xml")[0] + "_probe_point.json" with open(rel_path, 'r') as infile: probe_points = np.array(json.load(infile)) # Store points file in checkpoint if MPI.rank(comm) == 0: probe_points.dump(path.join(newfolder, "Checkpoint", "points")) eval_dict["centerline_u_x_probes"] = Probes(probe_points.flatten(), V) eval_dict["centerline_u_y_probes"] = Probes(probe_points.flatten(), V) eval_dict["centerline_u_z_probes"] = Probes(probe_points.flatten(), V) eval_dict["centerline_p_probes"] = Probes(probe_points.flatten(), Q) if restart_folder is None: # Get files to store results files = get_file_paths(newfolder) NS_parameters.update(dict(files=files)) else: files = NS_namespace["files"] # Save mesh as HDF5 file for post-processing with HDF5File(comm, files["mesh"], "w") as mesh_file: mesh_file.write(mesh, "mesh") # Create vector function for storing velocity Vv = VectorFunctionSpace(mesh, "CG", velocity_degree) U = Function(Vv) u_mean = Function(Vv) u_mean0 = Function(V) u_mean1 = Function(V) u_mean2 = Function(V) # Time step when solutions for post-processing should start being saved save_solution_at_tstep = int(cardiac_cycle * save_solution_after_cycle / dt) return dict(eval_dict=eval_dict, boundary=boundary, n=n, U=U, u_mean=u_mean, u_mean0=u_mean0, u_mean1=u_mean1, u_mean2=u_mean2, save_solution_at_tstep=save_solution_at_tstep)
# Oasis hook called after each time step
[docs]def temporal_hook(u_, p_, mesh, tstep, dump_probe_frequency, eval_dict, newfolder, id_in, id_out, boundary, n, save_solution_frequency, NS_parameters, NS_expressions, area_ratio, t, save_solution_at_tstep, U, area_inlet, nu, u_mean0, u_mean1, u_mean2, **NS_namespace): # Update boundary condition to current time for uc in NS_expressions["inlet"]: uc.set_t(t) # Compute flux and update pressure condition if tstep > 2: Q_ideals, Q_in, Q_outs = update_pressure_condition(NS_expressions, area_ratio, boundary, id_in, id_out, mesh, n, tstep, u_) # Compute flow rates and updated pressure at outlets, and mean velocity and Reynolds number at inlet if MPI.rank(comm) == 0 and tstep % 10 == 0: U_mean = Q_in / area_inlet[0] diam_inlet = np.sqrt(4 * area_inlet[0] / np.pi) Re = U_mean * diam_inlet / nu print("=" * 10, "Time step " + str(tstep), "=" * 10) print(f"Sum of Q_out = {sum(Q_outs):.4f}, Q_in = {Q_in:.4f}, mean velocity (inlet): {U_mean:.4f}, " + f"Reynolds number (inlet): {Re:.4f}") for i, out_id in enumerate(id_out): print(f"For outlet with boundary ID={out_id}: target flow rate: {Q_ideals[i]:.4f} mL/s, " + f"computed flow rate: {Q_outs[i]:.4f} mL/s, pressure updated to: {NS_expressions[out_id].p:.4f}") print() # Sample velocity and pressure in points/probes eval_dict["centerline_u_x_probes"](u_[0]) eval_dict["centerline_u_y_probes"](u_[1]) eval_dict["centerline_u_z_probes"](u_[2]) eval_dict["centerline_p_probes"](p_) # Store sampled velocity and pressure if tstep % dump_probe_frequency == 0: dump_probes(eval_dict, newfolder, tstep) # Save velocity and pressure for post-processing if tstep % save_solution_frequency == 0 and tstep >= save_solution_at_tstep: store_velocity_and_pressure_h5(NS_parameters, U, p_, tstep, u_, u_mean0, u_mean1, u_mean2)
# Oasis hook called after the simulation has finished
[docs]def theend_hook(u_mean, u_mean0, u_mean1, u_mean2, T, dt, save_solution_at_tstep, save_solution_frequency, **NS_namespace): store_u_mean(T, dt, save_solution_at_tstep, save_solution_frequency, u_mean, u_mean0, u_mean1, u_mean2, NS_parameters)
[docs]def beta(err, p): """ Adjusted choice of beta for the dual-pressure boundary condition. Ramped up to desired value if flow rate error (err) increases Args: err (float): Flow split error p (float): Pressure value Returns: beta (float): Variable factor in flow split method """ if p < 0: if err >= 0.1: return 0.5 else: return 1 - 5 * err ** 2 else: if err >= 0.1: return 1.5 else: return 1 + 5 * err ** 2
[docs]def update_pressure_condition(NS_expressions, area_ratio, boundary, id_in, id_out, mesh, n, tstep, u_): """ Use a dual-pressure boundary condition as pressure condition at outlet. """ ds = Measure("ds", domain=mesh, subdomain_data=boundary) Q_in = abs(assemble(dot(u_, n) * ds(id_in[0], subdomain_data=boundary))) Q_outs = [] Q_ideals = [] for i, out_id in enumerate(id_out): Q_out = abs(assemble(dot(u_, n) * ds(out_id))) Q_outs.append(Q_out) Q_ideal = area_ratio[i] * Q_in Q_ideals.append(Q_ideal) p_old = NS_expressions[out_id].p R_optimal = area_ratio[i] R_actual = Q_out / Q_in M_err = abs(R_optimal / R_actual) R_err = abs(R_optimal - R_actual) if p_old < 0: E = 1 + R_err / R_optimal else: E = -1 * (1 + R_err / R_optimal) # 1) Linear update to converge first 100 time steps of first cycle delta = (R_optimal - R_actual) / R_optimal if tstep < 100: h = 0.1 if p_old > 1 and delta < 0: NS_expressions[out_id].p = p_old else: NS_expressions[out_id].p = p_old * (1 - delta * h) # 2) Dual pressure BC else: if p_old > 2 and delta < 0: NS_expressions[out_id].p = p_old else: NS_expressions[out_id].p = p_old * beta(R_err, p_old) * M_err ** E return Q_ideals, Q_in, Q_outs