# Written by Mikael Mortensen <mikaem@math.uio.no> (2013)
# Edited by Henrik Kjeldsberg <henrik.kjeldsberg@live.no> (2023)
import subprocess
from collections import defaultdict
from os import getpid, path
import numpy as np
from dolfin import *
[docs]def getMemoryUsage(rss=True):
mypid = str(getpid())
rss = "rss" if rss else "vsz"
process = subprocess.Popen(["ps", "-o", rss, mypid], stdout=subprocess.PIPE)
out, _ = process.communicate()
mymemory = out.split()[1]
return eval(mymemory) / 1024
parameters["linear_algebra_backend"] = "PETSc"
parameters["form_compiler"]["optimize"] = True
parameters["form_compiler"]["cpp_optimize"] = True
parameters["form_compiler"]["representation"] = "uflacs"
parameters["form_compiler"]["cpp_optimize_flags"] = "-O3"
# Default parameters for all solvers
NS_parameters = dict(
nu=0.01, # Kinematic viscosity
folder="results", # Relative folder for storing results
velocity_degree=2, # default velocity degree
pressure_degree=1, # default pressure degree
)
NS_expressions = {}
constrained_domain = None
# To solve for scalars provide a list like ['scalar1', 'scalar2']
scalar_components = []
# With diffusivities given as a Schmidt number defined by:
# Schmidt = nu / D (= momentum diffusivity / mass diffusivity)
Schmidt = defaultdict(lambda: 1.0)
Schmidt_T = defaultdict(lambda: 0.7) # Turbulent Schmidt number (LES)
Scalar = defaultdict(lambda: dict(Schmidt=1.0, family="CG", degree=1))
# The following helper functions are available in dolfin
# They are redefined here for printing only on process 0.
RED = "\033[1;37;31m%s\033[0m"
BLUE = "\033[1;37;34m%s\033[0m"
GREEN = "\033[1;37;32m%s\033[0m"
[docs]def info_blue(s, check=True):
if MPI.rank(MPI.comm_world) == 0 and check:
print(BLUE % s)
[docs]def info_green(s, check=True):
if MPI.rank(MPI.comm_world) == 0 and check:
print(GREEN % s)
[docs]def info_red(s, check=True):
if MPI.rank(MPI.comm_world) == 0 and check:
print(RED % s)
[docs]class OasisTimer(Timer):
def __init__(self, task, verbose=False):
Timer.__init__(self, task)
info_blue(task, verbose)
[docs]class OasisMemoryUsage:
def __init__(self, s):
self.memory = 0
self.memory_vm = 0
self(s)
def __call__(self, s, verbose=True):
self.prev = self.memory
self.prev_vm = self.memory_vm
self.memory = MPI.sum(MPI.comm_world, getMemoryUsage())
self.memory_vm = MPI.sum(MPI.comm_world, getMemoryUsage(False))
if MPI.rank(MPI.comm_world) == 0 and verbose:
info_blue(
"{0:26s} {1:10d} MB {2:10d} MB {3:10d} MB {4:10d} MB".format(
s,
int(self.memory - self.prev),
int(self.memory),
int(self.memory_vm - self.prev_vm),
int(self.memory_vm),
)
)
# Print memory use up til now
initial_memory_use = getMemoryUsage()
oasis_memory = OasisMemoryUsage("Start")
# Convenience functions
[docs]def strain(u):
return 0.5 * (grad(u) + grad(u).T)
[docs]def omega(u):
return 0.5 * (grad(u) - grad(u).T)
[docs]def Omega(u):
return inner(omega(u), omega(u))
[docs]def Strain(u):
return inner(strain(u), strain(u))
[docs]def QC(u):
return Omega(u) - Strain(u)
[docs]def compute_flow_quantities(
u,
L,
nu,
mesh,
t,
tstep,
dt,
h,
outlet_area=1,
boundary=None,
outlet_ids=[],
inlet_ids=[],
id_wall=0,
period=1.0,
newfolder=None,
dynamic_mesh=False,
write_to_file=False,
):
"""
Compute max and mean Reynolds numbers, CFL numbers, and fluxes through boundaries.
Args:
u (Function): Velocity field.
L (float): Characteristic length.
nu (float): Kinematic viscosity of fluid.
mesh (Mesh): Computational mesh.
t (float): Current time.
tstep (int): Current time step.
dt (float): Time step size.
h (float): Mesh element size.
outlet_area (float): Area of the outlet, default is 1.
boundary (MeshFunction): Boundary mesh function, default is None.
outlet_ids (list of int): List of outlet boundary IDs, default is empty list.
inlet_ids (list of int): List of inlet boundary IDs, default is empty list.
id_wall (int): Wall boundary ID, default is 0.
period (float): Period of oscillation, default is 1.0.
newfolder (str): Directory for output files, default is None.
dynamic_mesh (bool): Flag for dynamic mesh, default is False.
write_to_file (bool): Flag to write results to file, default is False.
"""
# Compute boundary flow rates
flux_in = []
flux_out = []
flux_wall = []
re_outlet = 0
if boundary is not None:
ds = Measure("ds", subdomain_data=boundary)
n = FacetNormal(mesh)
u_dot_n = dot(u, n)
flux_wall.append(assemble(u_dot_n * ds(id_wall)))
for inlet_id in inlet_ids:
flux_in.append(assemble(u_dot_n * ds(inlet_id)))
for outlet_id in outlet_ids:
flux_out.append(assemble(u_dot_n * ds(outlet_id)))
u_mean = flux_out[0] / outlet_area
re_outlet = u_mean * L / nu
# Compute Womersley number
omega = 2 * np.pi / period
D_outlet = np.sqrt(4 * outlet_area / np.pi)
womersley_number = D_outlet * np.sqrt(omega / nu)
# Recompute edge length if mesh has moved
if dynamic_mesh:
h = mesh.hmin()
# Compute velocity used to estimate max and mean velocity, CFL and Reynolds number
DG = FunctionSpace(mesh, "DG", 0)
U = project(sqrt(inner(u, u)), DG)
local_U = U.vector().get_local()
U_array = MPI.comm_world.gather(local_U, 0)
if MPI.rank(MPI.comm_world) == 0:
# Gather velocity arrays and compute max and mean velocity, CFL and Reynolds number
U_gathered = np.concatenate(U_array)
U_mean = np.mean(U_gathered)
U_max = np.max(U_gathered)
cfl_mean = U_mean * dt / h
cfl_max = U_max * dt / h
re_mean = U_mean * L / nu
re_max = U_max * L / nu
info_green(
"Time = {0:2.4e}, timestep = {1:6d}, max Reynolds number={2:2.3f}, mean Reynolds number={3:2.3f}, outlet Reynolds number={4:2.3f}, Womersley number={5:2.3f}, max velocity={6:2.3f}, mean velocity={7:2.3f}, max CFL={8:2.3f}, mean CFL={9:2.3f}".format(
t,
tstep,
re_max,
re_mean,
re_outlet,
womersley_number,
U_max,
U_mean,
cfl_max,
cfl_mean,
)
)
if write_to_file:
data = (
[
t,
tstep,
re_max,
re_mean,
re_outlet,
womersley_number,
U_max,
U_mean,
cfl_max,
cfl_mean,
]
+ flux_in
+ flux_out
+ flux_wall
)
write_data_to_file(newfolder, data, "flow_metrics.txt")
[docs]def write_data_to_file(save_path, data, filename):
data_path = path.join(save_path, filename)
with open(data_path, "ab") as f:
np.savetxt(f, data, fmt=" %.16f ", newline=" ")
f.write(b"\n")
[docs]def recursive_update(dst, src):
"""Update dict dst with items from src deeply ("deep update")."""
for key, val in src.items():
if key in dst and isinstance(val, dict) and isinstance(dst[key], dict):
dst[key] = recursive_update(dst[key], val)
else:
dst[key] = val
return dst
[docs]class OasisXDMFFile(XDMFFile, object):
def __init__(self, comm, filename):
XDMFFile.__init__(self, comm, filename)
[docs]def add_function_to_tstepfiles(function, newfolder, tstepfiles, tstep):
name = function.name()
tstepfolder = path.join(newfolder, "Timeseries")
tstepfiles[name] = OasisXDMFFile(
MPI.comm_world,
path.join(tstepfolder, "{}_from_tstep_{}.xdmf".format(name, tstep)),
)
tstepfiles[name].function = function
tstepfiles[name].parameters["rewrite_function_mesh"] = False
[docs]def body_force(mesh, **NS_namespace):
"""Specify body force"""
return Constant((0,) * mesh.geometry().dim())
[docs]def initialize(**NS_namespace):
"""Initialize solution."""
pass
[docs]def create_bcs(sys_comp, **NS_namespace):
"""Return dictionary of Dirichlet boundary conditions."""
return dict((ui, []) for ui in sys_comp)
[docs]def scalar_hook(**NS_namespace):
"""Called prior to scalar solve."""
pass
[docs]def scalar_source(scalar_components, **NS_namespace):
"""Return a dictionary of scalar sources."""
return dict((ci, Constant(0)) for ci in scalar_components)
[docs]def pre_solve_hook(**NS_namespace):
"""Called just prior to entering time-loop. Must return a dictionary."""
return {}
[docs]def theend_hook(**NS_namespace):
"""Called at the very end."""
pass
[docs]def problem_parameters(**NS_namespace):
"""Updates problem specific parameters, and handles restart"""
pass
[docs]def post_import_problem(
NS_parameters, mesh, commandline_kwargs, NS_expressions, **NS_namespace
):
"""Called after importing from problem."""
dynamic_mesh = NS_parameters["dynamic_mesh"]
restart_folder = NS_parameters["restart_folder"]
# Update NS_parameters with all parameters modified through command line
for key, val in commandline_kwargs.items():
if isinstance(val, dict):
NS_parameters[key].update(val)
else:
NS_parameters[key] = val
# If the mesh is a callable function, then create the mesh here.
if callable(mesh):
mesh = mesh(**NS_parameters)
# Load mesh if restarting simulation
boundary = None
if restart_folder is not None:
# Get mesh information
mesh = Mesh()
filename = path.join(restart_folder, "mesh.h5")
with HDF5File(MPI.comm_world, filename, "r") as f:
f.read(mesh, "mesh", False)
boundary = MeshFunction(
"size_t", mesh, mesh.geometry().dim() - 1, mesh.domains()
)
f.read(boundary, "boundary")
assert isinstance(mesh, Mesh)
# Returned dictionary to be updated in the NS namespace
d = dict(mesh=mesh, boundary=boundary)
d.update(NS_parameters)
d.update(NS_expressions)
return d
[docs]def u_dot_n(u, n):
return (dot(u, n) - abs(dot(u, n))) / 2
[docs]def compute_volume(mesh, t, newfolder):
volume = assemble(Constant(1.0) * dx(mesh))
data = [t, volume]
write_data_to_file(newfolder, data, "volume.txt")