"""
Problem file for aneurysm FSI simulation
"""
from pathlib import Path
import numpy as np
from vampy.simulation.Womersley import make_womersley_bcs, compute_boundary_geometry_acrn
from vampy.simulation.simulation_common import print_mesh_information
from turtleFSI.problems import *
from dolfin import HDF5File, Mesh, MeshFunction, facets, assemble, sqrt, FacetNormal, ds, \
DirichletBC, Measure, inner, parameters, VectorFunctionSpace, Function, XDMFFile
from vasp.simulations.simulation_common import load_probe_points, calculate_and_print_flow_properties, \
print_probe_points, InterfacePressure
# set compiler arguments
parameters["form_compiler"]["quadrature_degree"] = 6
parameters["form_compiler"]["optimize"] = True
# The "ghost_mode" has to do with the assembly of form containing the facet
# normals n('+') within interior boundaries (dS). for 3D mesh the value should
# be "shared_vertex", for 2D mesh "shared_facet", the default value is "none"
parameters["ghost_mode"] = "shared_vertex"
_compiler_parameters = dict(parameters["form_compiler"])
[docs]
def set_problem_parameters(default_variables, **namespace):
# Compute some solid parameters
# Need to stay here since mus_s and lambda_s are functions of nu_s and E_s
E_s_val = 1E6
nu_s_val = 0.45
mu_s_val = E_s_val / (2 * (1 + nu_s_val)) # 0.345E6
lambda_s_val = nu_s_val * 2. * mu_s_val / (1. - 2. * nu_s_val)
default_variables.update(dict(
# Temporal parameters
T=0.002, # Simulation end time
dt=0.001, # Timne step size
theta=0.501, # Theta scheme parameter
save_step=1, # Save frequency of files for visualisation
save_solution_after_tstep=951, # Start saving the solution after this time step for the mean value
checkpoint_step=50, # Save frequency of checkpoint files
# Linear solver parameters
linear_solver="mumps",
atol=1e-10, # Absolute tolerance in the Newton solver
rtol=1e-9, # Relative tolerance in the Newton solver
recompute=20, # Recompute the Jacobian matix within time steps
recompute_tstep=20, # Recompute the Jacobian matix over time steps
# boundary condition parameters
inlet_id=2, # inlet id for the fluid
inlet_outlet_s_id=11, # inlet and outlet id for solid
fsi_id=22, # id for fsi surface
rigid_id=11, # "rigid wall" id for the fluid
outer_id=33, # id for the outer surface of the solid
# Fluid parameters
Q_mean=1.25E-06,
P_mean=11200,
T_Cycle=0.951, # Used to define length of flow waveform
rho_f=1.000E3, # Fluid density [kg/m3]
mu_f=3.5E-3, # Fluid dynamic viscosity [Pa.s]
dx_f_id=1, # ID of marker in the fluid domain
# mesh lifting parameters (see turtleFSI for options)
extrapolation="laplace",
extrapolation_sub_type="constant",
# Solid parameters
rho_s=1.0E3, # Solid density [kg/m3]
mu_s=mu_s_val, # Solid shear modulus or 2nd Lame Coef. [Pa]
nu_s=nu_s_val, # Solid Poisson ratio [-]
lambda_s=lambda_s_val, # Solid Young's modulus [Pa]
dx_s_id=2, # ID of marker in the solid domain
# FSI parameters
fsi_region=[0.123, 0.134, 0.063, 0.004], # x, y, and z coordinate of FSI region center,
# and radius of FSI region sphere
# Simulation parameters
folder="aneurysm_results", # Folder name generated for the simulation
mesh_path="mesh/file_aneurysm.h5",
FC_file="FC_MCA_10", # File name containing the fourier coefficients for the flow waveform
P_FC_File="FC_Pressure", # File name containing the fourier coefficients for the pressure waveform
compiler_parameters=_compiler_parameters, # Update the defaul values of the compiler arguments (FEniCS)
save_deg=2, # Degree of the functions saved for visualisation
scale_probe=True, # Scale the probe points to meters
))
return default_variables
[docs]
def get_mesh_domain_and_boundaries(mesh_path, fsi_region, fsi_id, rigid_id, outer_id, **namespace):
# Read mesh
mesh = Mesh()
hdf = HDF5File(mesh.mpi_comm(), mesh_path, "r")
hdf.read(mesh, "/mesh", False)
boundaries = MeshFunction("size_t", mesh, 2)
hdf.read(boundaries, "/boundaries")
domains = MeshFunction("size_t", mesh, 3)
hdf.read(domains, "/domains")
print_mesh_information(mesh)
# Only consider FSI in domain within this sphere
sph_x = fsi_region[0]
sph_y = fsi_region[1]
sph_z = fsi_region[2]
sph_rad = fsi_region[3]
i = 0
for submesh_facet in facets(mesh):
idx_facet = boundaries.array()[i]
if idx_facet == fsi_id or idx_facet == outer_id:
mid = submesh_facet.midpoint()
dist_sph_center = sqrt((mid.x() - sph_x) ** 2 + (mid.y() - sph_y) ** 2 + (mid.z() - sph_z) ** 2)
if dist_sph_center > sph_rad:
boundaries.array()[i] = rigid_id # changed "fsi" idx to "rigid wall" idx
i += 1
return mesh, domains, boundaries
[docs]
def create_bcs(t, DVP, mesh, boundaries, mu_f,
fsi_id, inlet_id, inlet_outlet_s_id,
rigid_id, psi, F_solid_linear, p_deg, FC_file,
Q_mean, P_FC_File, P_mean, T_Cycle, **namespace):
# Load fourier coefficients for the velocity and scale by flow rate
An, Bn = np.loadtxt(Path(__file__).parent / FC_file).T
# Convert to complex fourier coefficients
Cn = (An - Bn * 1j) * Q_mean
_, tmp_center, tmp_radius, tmp_normal = compute_boundary_geometry_acrn(mesh, inlet_id, boundaries)
# Create Womersley boundary condition at inlet
tmp_element = DVP.sub(1).sub(0).ufl_element()
inlet = make_womersley_bcs(T_Cycle, None, mu_f, tmp_center, tmp_radius, tmp_normal, tmp_element, Cn=Cn)
# Initialize inlet expressions with initial time
for uc in inlet:
uc.set_t(t)
# Create Boundary conditions for the velocity
u_inlet = [DirichletBC(DVP.sub(1).sub(i), inlet[i], boundaries, inlet_id) for i in range(3)]
u_inlet_s = DirichletBC(DVP.sub(1), ((0.0, 0.0, 0.0)), boundaries, inlet_outlet_s_id)
# Solid Displacement BCs
d_inlet = DirichletBC(DVP.sub(0), (0.0, 0.0, 0.0), boundaries, inlet_id)
d_inlet_s = DirichletBC(DVP.sub(0), (0.0, 0.0, 0.0), boundaries, inlet_outlet_s_id)
d_rigid = DirichletBC(DVP.sub(0), (0.0, 0.0, 0.0), boundaries, rigid_id)
# Assemble boundary conditions
bcs = u_inlet + [d_inlet, u_inlet_s, d_inlet_s, d_rigid]
# Load Fourier coefficients for the pressure
An_P, Bn_P = np.loadtxt(Path(__file__).parent / P_FC_File).T
# Apply pulsatile pressure at the fsi interface by modifying the variational form
n = FacetNormal(mesh)
dSS = Measure("dS", domain=mesh, subdomain_data=boundaries)
interface_pressure = InterfacePressure(t=0.0, t_ramp_start=0.0, t_ramp_end=0.2, An=An_P,
Bn=Bn_P, period=T_Cycle, P_mean=P_mean, degree=p_deg)
F_solid_linear += interface_pressure * inner(n('+'), psi('+')) * dSS(fsi_id)
# Create inlet subdomain for computing the flow rate inside post_solve
dsi = ds(inlet_id, domain=mesh, subdomain_data=boundaries)
inlet_area = assemble(1.0 * dsi)
return dict(bcs=bcs, inlet=inlet, interface_pressure=interface_pressure, F_solid_linear=F_solid_linear, n=n,
dsi=dsi, inlet_area=inlet_area)
[docs]
def initiate(mesh_path, scale_probe, mesh, v_deg, p_deg, **namespace):
probe_points = load_probe_points(mesh_path)
# In case the probe points are in mm, scale them to meters
if scale_probe:
probe_points = probe_points * 0.001
Vv = VectorFunctionSpace(mesh, "CG", v_deg)
V = FunctionSpace(mesh, "CG", p_deg)
d_mean = Function(Vv)
u_mean = Function(Vv)
p_mean = Function(V)
return dict(probe_points=probe_points, d_mean=d_mean, u_mean=u_mean, p_mean=p_mean)
[docs]
def pre_solve(t, inlet, interface_pressure, **namespace):
for uc in inlet:
# Update the time variable used for the inlet boundary condition
uc.set_t(t)
# Multiply by cosine function to ramp up smoothly over time interval 0-250 ms
if t < 0.25:
uc.scale_value = -0.5 * np.cos(np.pi * t / 0.25) + 0.5
else:
uc.scale_value = 1.0
# Update pressure condition
interface_pressure.update(t)
return dict(inlet=inlet, interface_pressure=interface_pressure)
[docs]
def post_solve(dvp_, n, dsi, dt, mesh, inlet_area, mu_f, rho_f, probe_points, t,
save_solution_after_tstep, d_mean, u_mean, p_mean, **namespace):
d = dvp_["n"].sub(0, deepcopy=True)
v = dvp_["n"].sub(1, deepcopy=True)
p = dvp_["n"].sub(2, deepcopy=True)
print_probe_points(v, p, probe_points)
calculate_and_print_flow_properties(dt, mesh, v, inlet_area, mu_f, rho_f, n, dsi)
if t >= save_solution_after_tstep * dt:
# Here, we accumulate the velocity filed in u_mean
d_mean.vector().axpy(1, d.vector())
u_mean.vector().axpy(1, v.vector())
p_mean.vector().axpy(1, p.vector())
return dict(u_mean=u_mean, d_mean=d_mean, p_mean=p_mean)
else:
return None
[docs]
def finished(d_mean, u_mean, p_mean, visualization_folder, save_solution_after_tstep, T, dt, **namespace):
# Divide the accumulated vectors by the number of time steps
num_steps = T / dt - save_solution_after_tstep + 1
for data in [d_mean, u_mean, p_mean]:
data.vector()[:] = data.vector()[:] / num_steps
# Save u_mean as a XDMF file using the checkpoint
data_names = [
(d_mean, "d_mean.xdmf"),
(u_mean, "u_mean.xdmf"),
(p_mean, "p_mean.xdmf")
]
for vector, data_name in data_names:
file_path = Path(visualization_folder) / data_name
with XDMFFile(MPI.comm_world, str(file_path)) as f:
f.write_checkpoint(vector, data_name, 0, XDMFFile.Encoding.HDF5)