Source code for oasismove.problems.__init__

# Written by Mikael Mortensen <mikaem@math.uio.no> (2013)
# Edited by Henrik Kjeldsberg <henrik.kjeldsberg@live.no> (2023)

import subprocess
from collections import defaultdict
from os import getpid, path

import numpy as np
from dolfin import *


[docs]def getMemoryUsage(rss=True): mypid = str(getpid()) rss = "rss" if rss else "vsz" process = subprocess.Popen(["ps", "-o", rss, mypid], stdout=subprocess.PIPE) out, _ = process.communicate() mymemory = out.split()[1] return eval(mymemory) / 1024
parameters["linear_algebra_backend"] = "PETSc" parameters["form_compiler"]["optimize"] = True parameters["form_compiler"]["cpp_optimize"] = True parameters["form_compiler"]["representation"] = "uflacs" parameters["form_compiler"]["cpp_optimize_flags"] = "-O3" # Default parameters for all solvers NS_parameters = dict( nu=0.01, # Kinematic viscosity folder="results", # Relative folder for storing results velocity_degree=2, # default velocity degree pressure_degree=1, # default pressure degree ) NS_expressions = {} constrained_domain = None # To solve for scalars provide a list like ['scalar1', 'scalar2'] scalar_components = [] # With diffusivities given as a Schmidt number defined by: # Schmidt = nu / D (= momentum diffusivity / mass diffusivity) Schmidt = defaultdict(lambda: 1.0) Schmidt_T = defaultdict(lambda: 0.7) # Turbulent Schmidt number (LES) Scalar = defaultdict(lambda: dict(Schmidt=1.0, family="CG", degree=1)) # The following helper functions are available in dolfin # They are redefined here for printing only on process 0. RED = "\033[1;37;31m%s\033[0m" BLUE = "\033[1;37;34m%s\033[0m" GREEN = "\033[1;37;32m%s\033[0m"
[docs]def info_blue(s, check=True): if MPI.rank(MPI.comm_world) == 0 and check: print(BLUE % s)
[docs]def info_green(s, check=True): if MPI.rank(MPI.comm_world) == 0 and check: print(GREEN % s)
[docs]def info_red(s, check=True): if MPI.rank(MPI.comm_world) == 0 and check: print(RED % s)
[docs]class OasisTimer(Timer): def __init__(self, task, verbose=False): Timer.__init__(self, task) info_blue(task, verbose)
[docs]class OasisMemoryUsage: def __init__(self, s): self.memory = 0 self.memory_vm = 0 self(s) def __call__(self, s, verbose=True): self.prev = self.memory self.prev_vm = self.memory_vm self.memory = MPI.sum(MPI.comm_world, getMemoryUsage()) self.memory_vm = MPI.sum(MPI.comm_world, getMemoryUsage(False)) if MPI.rank(MPI.comm_world) == 0 and verbose: info_blue( "{0:26s} {1:10d} MB {2:10d} MB {3:10d} MB {4:10d} MB".format( s, int(self.memory - self.prev), int(self.memory), int(self.memory_vm - self.prev_vm), int(self.memory_vm), ) )
# Print memory use up til now initial_memory_use = getMemoryUsage() oasis_memory = OasisMemoryUsage("Start") # Convenience functions
[docs]def strain(u): return 0.5 * (grad(u) + grad(u).T)
[docs]def omega(u): return 0.5 * (grad(u) - grad(u).T)
[docs]def Omega(u): return inner(omega(u), omega(u))
[docs]def Strain(u): return inner(strain(u), strain(u))
[docs]def QC(u): return Omega(u) - Strain(u)
[docs]def compute_flow_quantities( u, L, nu, mesh, t, tstep, dt, h, outlet_area=1, boundary=None, outlet_ids=[], inlet_ids=[], id_wall=0, period=1.0, newfolder=None, dynamic_mesh=False, write_to_file=False, ): """ Compute max and mean Reynolds numbers, CFL numbers, and fluxes through boundaries. Args: u (Function): Velocity field. L (float): Characteristic length. nu (float): Kinematic viscosity of fluid. mesh (Mesh): Computational mesh. t (float): Current time. tstep (int): Current time step. dt (float): Time step size. h (float): Mesh element size. outlet_area (float): Area of the outlet, default is 1. boundary (MeshFunction): Boundary mesh function, default is None. outlet_ids (list of int): List of outlet boundary IDs, default is empty list. inlet_ids (list of int): List of inlet boundary IDs, default is empty list. id_wall (int): Wall boundary ID, default is 0. period (float): Period of oscillation, default is 1.0. newfolder (str): Directory for output files, default is None. dynamic_mesh (bool): Flag for dynamic mesh, default is False. write_to_file (bool): Flag to write results to file, default is False. """ # Compute boundary flow rates flux_in = [] flux_out = [] flux_wall = [] re_outlet = 0 if boundary is not None: ds = Measure("ds", subdomain_data=boundary) n = FacetNormal(mesh) u_dot_n = dot(u, n) flux_wall.append(assemble(u_dot_n * ds(id_wall))) for inlet_id in inlet_ids: flux_in.append(assemble(u_dot_n * ds(inlet_id))) for outlet_id in outlet_ids: flux_out.append(assemble(u_dot_n * ds(outlet_id))) u_mean = flux_out[0] / outlet_area re_outlet = u_mean * L / nu # Compute Womersley number omega = 2 * np.pi / period D_outlet = np.sqrt(4 * outlet_area / np.pi) womersley_number = D_outlet * np.sqrt(omega / nu) # Recompute edge length if mesh has moved if dynamic_mesh: h = mesh.hmin() # Compute velocity used to estimate max and mean velocity, CFL and Reynolds number DG = FunctionSpace(mesh, "DG", 0) U = project(sqrt(inner(u, u)), DG) local_U = U.vector().get_local() U_array = MPI.comm_world.gather(local_U, 0) if MPI.rank(MPI.comm_world) == 0: # Gather velocity arrays and compute max and mean velocity, CFL and Reynolds number U_gathered = np.concatenate(U_array) U_mean = np.mean(U_gathered) U_max = np.max(U_gathered) cfl_mean = U_mean * dt / h cfl_max = U_max * dt / h re_mean = U_mean * L / nu re_max = U_max * L / nu info_green( "Time = {0:2.4e}, timestep = {1:6d}, max Reynolds number={2:2.3f}, mean Reynolds number={3:2.3f}, outlet Reynolds number={4:2.3f}, Womersley number={5:2.3f}, max velocity={6:2.3f}, mean velocity={7:2.3f}, max CFL={8:2.3f}, mean CFL={9:2.3f}".format( t, tstep, re_max, re_mean, re_outlet, womersley_number, U_max, U_mean, cfl_max, cfl_mean, ) ) if write_to_file: data = ( [ t, tstep, re_max, re_mean, re_outlet, womersley_number, U_max, U_mean, cfl_max, cfl_mean, ] + flux_in + flux_out + flux_wall ) write_data_to_file(newfolder, data, "flow_metrics.txt")
[docs]def write_data_to_file(save_path, data, filename): data_path = path.join(save_path, filename) with open(data_path, "ab") as f: np.savetxt(f, data, fmt=" %.16f ", newline=" ") f.write(b"\n")
[docs]def recursive_update(dst, src): """Update dict dst with items from src deeply ("deep update").""" for key, val in src.items(): if key in dst and isinstance(val, dict) and isinstance(dst[key], dict): dst[key] = recursive_update(dst[key], val) else: dst[key] = val return dst
[docs]class OasisXDMFFile(XDMFFile, object): def __init__(self, comm, filename): XDMFFile.__init__(self, comm, filename)
[docs]def add_function_to_tstepfiles(function, newfolder, tstepfiles, tstep): name = function.name() tstepfolder = path.join(newfolder, "Timeseries") tstepfiles[name] = OasisXDMFFile( MPI.comm_world, path.join(tstepfolder, "{}_from_tstep_{}.xdmf".format(name, tstep)), ) tstepfiles[name].function = function tstepfiles[name].parameters["rewrite_function_mesh"] = False
[docs]def body_force(mesh, **NS_namespace): """Specify body force""" return Constant((0,) * mesh.geometry().dim())
[docs]def initialize(**NS_namespace): """Initialize solution.""" pass
[docs]def create_bcs(sys_comp, **NS_namespace): """Return dictionary of Dirichlet boundary conditions.""" return dict((ui, []) for ui in sys_comp)
[docs]def scalar_hook(**NS_namespace): """Called prior to scalar solve.""" pass
[docs]def scalar_source(scalar_components, **NS_namespace): """Return a dictionary of scalar sources.""" return dict((ci, Constant(0)) for ci in scalar_components)
[docs]def pre_solve_hook(**NS_namespace): """Called just prior to entering time-loop. Must return a dictionary.""" return {}
[docs]def theend_hook(**NS_namespace): """Called at the very end.""" pass
[docs]def problem_parameters(**NS_namespace): """Updates problem specific parameters, and handles restart""" pass
[docs]def post_import_problem( NS_parameters, mesh, commandline_kwargs, NS_expressions, **NS_namespace ): """Called after importing from problem.""" dynamic_mesh = NS_parameters["dynamic_mesh"] restart_folder = NS_parameters["restart_folder"] # Update NS_parameters with all parameters modified through command line for key, val in commandline_kwargs.items(): if isinstance(val, dict): NS_parameters[key].update(val) else: NS_parameters[key] = val # If the mesh is a callable function, then create the mesh here. if callable(mesh): mesh = mesh(**NS_parameters) # Load mesh if restarting simulation boundary = None if restart_folder is not None: # Get mesh information mesh = Mesh() filename = path.join(restart_folder, "mesh.h5") with HDF5File(MPI.comm_world, filename, "r") as f: f.read(mesh, "mesh", False) boundary = MeshFunction( "size_t", mesh, mesh.geometry().dim() - 1, mesh.domains() ) f.read(boundary, "boundary") assert isinstance(mesh, Mesh) # Returned dictionary to be updated in the NS namespace d = dict(mesh=mesh, boundary=boundary) d.update(NS_parameters) d.update(NS_expressions) return d
[docs]def u_dot_n(u, n): return (dot(u, n) - abs(dot(u, n))) / 2
[docs]def compute_volume(mesh, t, newfolder): volume = assemble(Constant(1.0) * dx(mesh)) data = [t, volume] write_data_to_file(newfolder, data, "volume.txt")