"""
Problem file for AVF FSI simulation
"""
import numpy as np
from turtleFSI.problems import *
from dolfin import HDF5File, Mesh, MeshFunction, facets, UserExpression, FacetNormal, ds, \
DirichletBC, Measure, inner, parameters, assemble, Constant, SpatialCoordinate
from vasp.simulations.simulation_common import load_probe_points, calculate_and_print_flow_properties, \
print_probe_points
# set compiler arguments
parameters["form_compiler"]["quadrature_degree"] = 6
parameters["form_compiler"]["optimize"] = True
# The "ghost_mode" has to do with the assembly of form containing the facet
# normals n('+') within interior boundaries (dS). for 3D mesh the value should
# be "shared_vertex", for 2D mesh "shared_facet", the default value is "none"
parameters["ghost_mode"] = "shared_vertex"
_compiler_parameters = dict(parameters["form_compiler"])
[docs]
def set_problem_parameters(default_variables, **namespace):
# Compute some solid parameters
# Need to stay here due to arithmetics
E_s_val_artery = 1E6 # artery Young modulus (elasticity) [Pa]
E_s_val_vein = 1E6 # vein Young modulus (elasticity) [Pa]
nu_s_val = 0.45 # Poisson ratio (compressibility)
mu_s_val_artery = E_s_val_artery / (2 * (1 + nu_s_val)) # artery Shear modulus
mu_s_val_vein = E_s_val_vein / (2 * (1 + nu_s_val)) # vein Shear modulus
lambda_s_val_artery = nu_s_val * 2. * mu_s_val_artery / (1. - 2. * nu_s_val) # artery Solid 1rst Lamé coef. [Pa]
lambda_s_val_vein = nu_s_val * 2. * mu_s_val_vein / (1. - 2. * nu_s_val) # vein Solid 1rst Lamé coef. [Pa]
default_variables.update(dict(
# Temporal parameters
T=3, # Simulation end time (3 cardiac cycles)
dt=0.0001, # Time step size
theta=0.501, # Theta scheme (implicit/explicit time stepping)
save_step=1, # Save frequency of files for visualisation
checkpoint_step=500, # checkpoint frequency
# Linear solver parameters
linear_solver="mumps",
atol=1e-7, # Absolute tolerance in the Newton solver
rtol=1e-7, # Relative tolerance in the Newton solver
recompute=30, # Recompute the Jacobian matix within time steps
recompute_tstep=10, # Number of time steps before recompute Jacobian
# boundary condition parameters
inlet_id1=3, # inlet 1 id (PA)
inlet_id2=2, # inlet 2 id (DA)
outlet_id1=4, # outlet id (V)
rigid_id=[11, 1011], # "rigid wall" id for the fluid and mesh problem
fsi_id=[22, 1022], # fsi interface
outlet_s_id=44, # solid outlet id
outer_id=[33, 1033], # outer wall surface
ds_s_id=[33, 1033], # ID of solid external boundary(for Robin BC)
vel_t_ramp=0.2, # time for velocity ramp
p_t_ramp_start=0.05, # pressure ramp start time
p_t_ramp_end=0.2, # pressure ramp end time
# Fluid parameters
rho_f=1.025E3, # Fluid density [kg/m3]
mu_f=3.5E-3, # Fluid dynamic viscosity [Pa.s]
dx_f_id=1, # ID of marker in the fluid domain
# mesh lifting parameters (see turtleFSI for options)
extrapolation="laplace", # laplace, elastic, biharmonic, no-extrapolation
extrapolation_sub_type="constant", # constant, small_constant, volume, volume_change, bc1, bc2
# Solid parameters
rho_s=[1.0E3, 1.0E3], # Solid density [kg/m3]
mu_s=[mu_s_val_artery, mu_s_val_vein], # Solid shear modulus or 2nd Lame Coef. [Pa]
nu_s=nu_s_val, # Solid Poisson ratio [-]
lambda_s=[lambda_s_val_artery, lambda_s_val_vein], # Solid 1rst Lamé coef. [Pa]
material_model="MooneyRivlin", # Material model
dx_s_id=[2, 1002], # ID of marker in the solid domain
solid_properties=[{"dx_s_id": 2, "material_model": "MooneyRivlin", "rho_s": 1.0E3, "mu_s": mu_s_val_artery,
"lambda_s": lambda_s_val_artery, "C01": 0.03e6, "C10": 0.0, "C11": 2.2e6},
{"dx_s_id": 1002, "material_model": "MooneyRivlin", "rho_s": 1.0E3, "mu_s": mu_s_val_vein,
"lambda_s": lambda_s_val_vein, "C01": 0.003e6, "C10": 0.0, "C11": 0.538e6}],
# Robin BC parameters
robin_bc=True, # Robin BC
k_s=1E5, # elastic response necesary for RobinBC
c_s=1E1, # viscoelastic response necesary for RobinBC
# FSI parameters
fsi_region=[0.33642, 0.0873934, 0.0369964, 0.002], # [x, y, z, radius]
# Simulation parameters
mesh_path="mesh/avf.h5",
patient_data_path="avf.csv",
folder="avf_results", # Folder where the results will be stored
compiler_parameters=_compiler_parameters, # Update the defaul values of the compiler arguments (FEniCS)
save_deg=2, # Degree of the functions saved for visualisation
scale_probe=True, # Scale the probe points to meters
))
return default_variables
[docs]
def get_mesh_domain_and_boundaries(mesh_path, fsi_region, fsi_id, rigid_id, outer_id, **namespace):
# Import mesh file
mesh = Mesh()
hdf = HDF5File(mesh.mpi_comm(), mesh_path, "r")
hdf.read(mesh, "/mesh", False)
boundaries = MeshFunction("size_t", mesh, 2)
hdf.read(boundaries, "/boundaries")
domains = MeshFunction("size_t", mesh, 3)
hdf.read(domains, "/domains")
# Only consider FSI in domain within this sphere
sph_x = fsi_region[0]
sph_y = fsi_region[1]
sph_z = fsi_region[2]
sph_rad = fsi_region[3]
i = 0
for submesh_facet in facets(mesh):
idx_facet = boundaries.array()[i]
if idx_facet == fsi_id[0]:
mid = submesh_facet.midpoint()
dist_sph_center = np.sqrt((mid.x() - sph_x) ** 2 + (mid.y() - sph_y) ** 2 + (mid.z() - sph_z) ** 2)
if dist_sph_center > sph_rad:
boundaries.array()[i] = rigid_id[0] # changed "fsi" idx to "rigid wall" idx
if idx_facet == fsi_id[1]:
mid = submesh_facet.midpoint()
dist_sph_center = np.sqrt((mid.x() - sph_x) ** 2 + (mid.y() - sph_y) ** 2 + (mid.z() - sph_z) ** 2)
if dist_sph_center > sph_rad:
boundaries.array()[i] = rigid_id[1] # changed "fsi" idx to "rigid wall" idx
if idx_facet == outer_id[0]:
mid = submesh_facet.midpoint()
dist_sph_center = np.sqrt((mid.x() - sph_x) ** 2 + (mid.y() - sph_y) ** 2 + (mid.z() - sph_z) ** 2)
if dist_sph_center > sph_rad:
boundaries.array()[i] = rigid_id[0] # changed "outer" idx to "rigid wall" idx
if idx_facet == outer_id[1]:
mid = submesh_facet.midpoint()
dist_sph_center = np.sqrt((mid.x() - sph_x) ** 2 + (mid.y() - sph_y) ** 2 + (mid.z() - sph_z) ** 2)
if dist_sph_center > sph_rad:
boundaries.array()[i] = rigid_id[1] # changed "outer" idx to "rigid wall" idx
i += 1
return mesh, domains, boundaries
# Define velocity inlet parabolic profile
[docs]
class VelInPara(UserExpression):
def __init__(self, t, dt, vel_t_ramp, n, dsi, mesh, interp_velocity, **kwargs):
self.t = t
self.dt = dt
self.t_ramp = vel_t_ramp
self.interp_velocity = interp_velocity
self.number = int(self.t / self.dt)
self.n = n # normal direction
self.dsi = dsi # surface integral element
self.d = mesh.geometry().dim()
self.x = SpatialCoordinate(mesh)
# Compute area of boundary tesselation by integrating 1.0 over all facets
self.A = assemble(Constant(1.0, name="one") * self.dsi)
# Compute barycenter by integrating x components over all facets
self.c = [assemble(self.x[i] * self.dsi) / self.A for i in range(self.d)]
# Compute radius by taking max radius of boundary points
self.r = np.sqrt(self.A / np.pi)
super().__init__(**kwargs)
[docs]
def update(self, t):
self.t = t
if self.number + 1 < len(self.interp_velocity):
self.number = int(self.t / self.dt)
[docs]
def eval(self, value, x):
# Define the parabola
r2 = (x[0] - self.c[0]) ** 2 + (x[1] - self.c[1]) ** 2 + (x[2] - self.c[2]) ** 2 # radius**2
fact_r = 1 - (r2 / self.r ** 2)
# Define the velocity ramp with sigmoid
if (self.t < self.t_ramp) and (self.t_ramp > 0.0):
fact = self.interp_velocity[self.number] * (-0.5 * np.cos((np.pi / (self.t_ramp)) * (self.t)) + 0.5)
value[0] = -self.n[0] * fact_r * fact
value[1] = -self.n[1] * fact_r * fact
value[2] = -self.n[2] * fact_r * fact
else:
value[0] = -self.n[0] * (self.interp_velocity[self.number]) * fact_r
value[1] = -self.n[1] * (self.interp_velocity[self.number]) * fact_r
value[2] = -self.n[2] * (self.interp_velocity[self.number]) * fact_r
[docs]
def value_shape(self):
return (3,)
# Define the pressure profile
[docs]
class InnerP(UserExpression):
def __init__(self, t, dt, p_t_ramp_start, p_t_ramp_end, interp_P, **kwargs):
self.t = t
self.dt = dt
self.interp_P = interp_P
self.number = int(self.t / self.dt)
self.p_t_ramp_start = p_t_ramp_start
self.p_t_ramp_end = p_t_ramp_end
super().__init__(**kwargs)
[docs]
def update(self, t):
self.t = t
if self.number + 1 < len(self.interp_P):
self.number = int(self.t / self.dt)
[docs]
def eval(self, value, x):
# Define the pressure ramp with sigmoid
if self.t < self.p_t_ramp_start:
value[0] = 0.0
elif self.t < self.p_t_ramp_end:
value[0] = self.interp_P[self.number] * (-0.5 * np.cos((np.pi / (self.p_t_ramp_end - self.p_t_ramp_start))
* (self.t - self.p_t_ramp_start)) + 0.5)
else:
value[0] = self.interp_P[self.number]
[docs]
def value_shape(self):
return ()
# Define boundary conditions
[docs]
def create_bcs(DVP, mesh, boundaries, T, dt, fsi_id, inlet_id1, inlet_id2, rigid_id, psi, F_solid_linear,
vel_t_ramp, p_t_ramp_start, p_t_ramp_end, p_deg, v_deg, patient_data_path, **namespace):
if MPI.rank(MPI.comm_world) == 0:
print("Create bcs")
# Fluid velocity BCs
dsi1 = ds(inlet_id1, domain=mesh, subdomain_data=boundaries)
dsi2 = ds(inlet_id2, domain=mesh, subdomain_data=boundaries)
n = FacetNormal(mesh)
ndim = mesh.geometry().dim()
ni1 = np.array([assemble(n[i] * dsi1) for i in range(ndim)])
ni2 = np.array([assemble(n[i] * dsi2) for i in range(ndim)])
n_len1 = np.sqrt(sum([ni1[i] ** 2 for i in range(ndim)]))
n_len2 = np.sqrt(sum([ni2[i] ** 2 for i in range(ndim)]))
normal1 = ni1 / n_len1
normal2 = ni2 / n_len2
# Get patient specific velocity and pressure data from csv file
# We assume that the data is in the following order: PA, DA, PV in the csv file
# Also we assume that the data has first row as header, so we skip it
patient_data = np.loadtxt(patient_data_path, skiprows=1, delimiter=",", usecols=(0, 1, 2))
v_PA = patient_data[:, 0]
v_DA = patient_data[:, 1]
PV = patient_data[:, 2]
len_v = len(v_PA)
t_v = np.arange(len(v_PA))
num_t = int(T / dt) # 30.000 timesteps = 3s (T) / 0.0001s (dt)
tnew = np.linspace(0, len_v, num=num_t)
interp_DA = np.array(np.interp(tnew, t_v, v_DA))
interp_PA = np.array(np.interp(tnew, t_v, v_PA))
# pressure interpolation (velocity and pressure waveforms must be syncronized)
interp_P = np.array(np.interp(tnew, t_v, PV))
# Create Parabolic profile for Proximal Artey (PA) and Distal Artey (DA)
u_inflow_exp1 = VelInPara(t=0.0, dt=dt, vel_t_ramp=vel_t_ramp, n=normal1, dsi=dsi1, mesh=mesh,
interp_velocity=interp_PA, degree=v_deg)
u_inflow_exp2 = VelInPara(t=0.0, dt=dt, vel_t_ramp=vel_t_ramp, n=normal2, dsi=dsi2, mesh=mesh,
interp_velocity=interp_DA, degree=v_deg)
# Impose the pulsatile parabolic inlet velocity at the PA and DA
u_inlet1 = DirichletBC(DVP.sub(1), u_inflow_exp1, boundaries, inlet_id1)
u_inlet2 = DirichletBC(DVP.sub(1), u_inflow_exp2, boundaries, inlet_id2)
# Impose velocity 0 for rigid solid regions
u_inlet_s1 = DirichletBC(DVP.sub(1), ((0.0, 0.0, 0.0)), boundaries, rigid_id[0])
u_inlet_s2 = DirichletBC(DVP.sub(1), ((0.0, 0.0, 0.0)), boundaries, rigid_id[1])
# Solid Displacement BCs, fix the rigid solid regions
d_inlet1 = DirichletBC(DVP.sub(0), ((0.0, 0.0, 0.0)), boundaries, inlet_id1)
d_inlet2 = DirichletBC(DVP.sub(0), ((0.0, 0.0, 0.0)), boundaries, inlet_id2)
d_inlet_s1 = DirichletBC(DVP.sub(0), ((0.0, 0.0, 0.0)), boundaries, rigid_id[0])
d_inlet_s2 = DirichletBC(DVP.sub(0), ((0.0, 0.0, 0.0)), boundaries, rigid_id[1])
# Assign InnerP on the reference domain (FSI interface)
p_out_bc_val = InnerP(t=0.0, dt=dt, interp_P=interp_P, p_t_ramp_start=p_t_ramp_start,
p_t_ramp_end=p_t_ramp_end, degree=p_deg)
dSS = Measure("dS", domain=mesh, subdomain_data=boundaries)
n = FacetNormal(mesh)
F_solid_linear += p_out_bc_val * inner(n('+'), psi('+')) * dSS(fsi_id[0]) + p_out_bc_val * \
inner(n('+'), psi('+')) * dSS(fsi_id[1])
# Assemble Dirichlet boundary conditions
bcs = [u_inlet1, u_inlet2, u_inlet_s1, u_inlet_s2, d_inlet1, d_inlet2, d_inlet_s1, d_inlet_s2]
# Create inlet subdomain for computing the flow rate inside post_solve
inlet_area = assemble(1.0 * dsi1)
return dict(bcs=bcs, u_inflow_exp1=u_inflow_exp1, u_inflow_exp2=u_inflow_exp2, p_out_bc_val=p_out_bc_val,
F_solid_linear=F_solid_linear, n=n, inlet_area=inlet_area, dsi1=dsi1)
[docs]
def initiate(mesh_path, scale_probe, **namespace):
probe_points = load_probe_points(mesh_path)
# In case the probe points are in mm, scale them to meters
if scale_probe:
probe_points = probe_points * 0.001
return dict(probe_points=probe_points)
[docs]
def pre_solve(t, u_inflow_exp1, u_inflow_exp2, p_out_bc_val, **namespace):
# Update the time variable used for the inlet boundary condition
u_inflow_exp1.update(t)
u_inflow_exp2.update(t)
p_out_bc_val.update(t)
return dict(u_inflow_exp1=u_inflow_exp1, u_inflow_exp2=u_inflow_exp2, p_out_bc_val=p_out_bc_val)
[docs]
def post_solve(dvp_, n, dsi1, dt, mesh, inlet_area, mu_f, rho_f, probe_points, **namespace):
v = dvp_["n"].sub(1, deepcopy=True)
p = dvp_["n"].sub(2, deepcopy=True)
print_probe_points(v, p, probe_points)
calculate_and_print_flow_properties(dt, mesh, v, inlet_area, mu_f, rho_f, n, dsi1)