Source code for oasismove.problems.NSfracStep.MovingVortex

import pickle
from os import getcwd

from oasismove.problems.NSfracStep import *
from oasismove.problems.NSfracStep.MovingCommon import (
    get_coordinate_map,
    get_visualization_writers,
)


[docs]def problem_parameters( commandline_kwargs, NS_parameters, NS_expressions, **NS_namespace ): """ Problem file for running CFD simulation for the MovingVortex problem inspired by the problem by Fehn et al.[1], resembling the 2D Taylor-Green vortex. The problem solves the N-S equations in the absence of body forces, with a manufactured velocity solution. The mesh velocity is also described by an analytic displacement field, describing the oscillatory boundary movement. The movement is mainly controlled by the amplitude A0, and period length of the mesh motion T_G. [1] Fehn, N., Heinz, J., Wall, W. A., & Kronbichler, M. (2021). High-order arbitrary Lagrangian–Eulerian discontinuous Galerkin methods for the incompressible Navier–Stokes equations. Journal of Computational Physics, 430, 110040. """ if "restart_folder" in commandline_kwargs.keys(): restart_folder = commandline_kwargs["restart_folder"] restart_folder = path.join(getcwd(), restart_folder) f = open( path.join( path.dirname(path.abspath(__file__)), restart_folder, "params.dat" ), "rb", ) NS_parameters.update(pickle.load(f)) NS_parameters["restart_folder"] = restart_folder globals().update(NS_parameters) else: # Read final time from command line T = float(commandline_kwargs.get("T", 1)) NS_parameters.update( # Problem specific parameters A0=0.08, # Amplitude T_G=4 * T, # Period time L=1.0, # Dimension of domain Nx=20, # Resolution in x-direction Ny=20, # Resolution in y-direction # Fluid parameters nu=0.025, # Kinematic viscosity # Simulation parameters T=T, # Simulation time dt=0.05, # Time step folder="results_moving_vortex", # Oasis parameters max_iter=2, dynamic_mesh=True, save_step=1, save_solution_frequency=5, checkpoint=500, print_intermediate_info=100, compute_error=1, velocity_degree=1, pressure_degree=1, use_krylov_solvers=True, krylov_solvers={ "monitor_convergence": False, "report": False, "relative_tolerance": 1e-8, "absolute_tolerance": 1e-8, }, ) # Define analytical and initial state expressions NS_expressions.update( dict( exact_fields=dict( u0="-sin(2*pi*x[1])*exp(-4*pi*pi*nu*t)", u1=" sin(2*pi*x[0])*exp(-4*pi*pi*nu*t)", p=" -cos(2*pi*x[0])*cos(2*pi*x[1])*exp(-8*pi*pi*nu*t)", ), initial_fields=dict( u0="-sin(2*pi*x[1])*exp(-4*pi*pi*nu*t)", u1=" sin(2*pi*x[0])*exp(-4*pi*pi*nu*t)", p=" -cos(2*pi*x[0])*cos(2*pi*x[1])*exp(-8*pi*pi*nu*t)", ), initial_fields_w=dict( w0="A*2*pi / T_G * cos(2*pi*t/T_G)*sin(2*pi*(x[1] + L/2)/L)", w1="A*2*pi / T_G * cos(2*pi*t/T_G)*sin(2*pi*(x[0] + L/2)/L)", ), total_error=np.zeros(3), ) )
[docs]def mesh(Nx, Ny, L, dt, **params): # Define mesh mesh = RectangleMesh(Point(-L / 2, -L / 2), Point(L / 2, L / 2), Nx, Ny) print_mesh_information(mesh, dt, u_mean=1.0, dim=2) return mesh
[docs]def pre_boundary_condition(mesh, **NS_namespace): # Mark geometry boundary = MeshFunction("size_t", mesh, mesh.topology().dim() - 1) boundary.set_all(0) inlet = AutoSubDomain(lambda x, b: b) inlet.mark(boundary, 1) return dict(boundary=boundary)
[docs]def create_bcs(V, t, dt, nu, sys_comp, boundary, initial_fields, **NS_namespace): for i, ui in enumerate(sys_comp): if "IPCS" in NS_parameters["solver"]: deltat = dt / 2.0 if ui == "p" else 0.0 else: deltat = 0.0 ue = Expression((initial_fields[ui]), degree=4, t=t - deltat, nu=nu) NS_expressions["bc_%s" % ui] = ue bcs = dict((ui, []) for ui in sys_comp) bc0 = DirichletBC(V, NS_expressions["bc_u0"], boundary, 1) bc1 = DirichletBC(V, NS_expressions["bc_u1"], boundary, 1) bcs["u0"] = [bc0] bcs["u1"] = [bc1] bcs["p"] = [] return bcs
[docs]def initialize(q_, q_1, q_2, VV, t, nu, dt, initial_fields, **NS_namespace): """Initialize solution. Use t=dt/2 for pressure since pressure is computed in between timesteps. """ for ui in q_: if "IPCS" in NS_parameters["solver"]: deltat = dt / 2.0 if ui == "p" else 0.0 else: deltat = 0.0 vv = interpolate( Expression((initial_fields[ui]), degree=4, t=t - deltat, nu=nu), VV[ui] ) q_[ui].vector()[:] = vv.vector()[:] if not ui == "p": q_1[ui].vector()[:] = vv.vector()[:] deltat = -dt vv = interpolate( Expression((initial_fields[ui]), degree=4, t=t - deltat, nu=nu), VV[ui] ) q_2[ui].vector()[:] = vv.vector()[:] q_1["p"].vector()[:] = q_["p"].vector()[:]
[docs]class Walls(UserExpression): def __init__(self, t, coor, A, L, T_G, x_hat_map, counter_max, **kwargs): """ (User)Expression class for describing the wall motion. """ self.t = t self.map = x_hat_map self.max = counter_max self.coor = coor self.A = A self.L = L self.T_G = T_G self.counter = -1 super().__init__(**kwargs)
[docs] def update(self, t): self.t = t
[docs] def eval(self, values, _, **kwargs): self.counter += 1 N = (self.coor + 1) % 2 values[:] = ( 2 * np.pi / self.T_G * self.A * np.cos(2 * np.pi * self.t / self.T_G) * sin(2 * np.pi * (self.map[self.counter][N] + self.L / 2) / self.L) ) if self.counter == self.max: self.counter = -1
[docs]def pre_solve_hook( V, mesh, t, nu, L, w_, T_G, A0, newfolder, velocity_degree, u_components, boundary, exact_fields, **NS_namespace, ): # Create exact solution ue_x = Expression(exact_fields["u0"], nu=nu, t=t, degree=4) ue_y = Expression(exact_fields["u1"], nu=nu, t=t, degree=4) pe = Expression(exact_fields["p"], nu=nu, t=t, degree=4) # Visualization files viz_p, viz_u, viz_ue = get_visualization_writers( newfolder, ["pressure", "velocity", "velocity_exact"] ) # Extract dof map and coordinates VV = VectorFunctionSpace(mesh, "CG", velocity_degree) u_vec = Function(VV, name="Velocity") ue_vec = Function(VV, name="Velocity") coordinates = mesh.coordinates() if velocity_degree == 1: dof_map = vertex_to_dof_map(V) else: dof_map = V.dofmap().entity_dofs(mesh, 0) counter_max, x_hat_map = get_coordinate_map(V, boundary, w_, 1) walls_x = Walls(0, 0, A0, L, T_G, x_hat_map, counter_max, element=V.ufl_element()) walls_y = Walls(0, 1, A0, L, T_G, x_hat_map, counter_max, element=V.ufl_element()) NS_expressions["bc_w0"] = walls_x NS_expressions["bc_w1"] = walls_y # Mesh velocity conditions bc_mesh = dict((ui, []) for ui in u_components) bc0 = DirichletBC(V, walls_x, boundary, 1) bc1 = DirichletBC(V, walls_y, boundary, 1) bc_mesh["u0"] = [bc0] bc_mesh["u1"] = [bc1] return dict( viz_p=viz_p, viz_ue=viz_ue, ue_vec=ue_vec, viz_u=viz_u, u_vec=u_vec, dof_map=dof_map, bc_mesh=bc_mesh, coordinates=coordinates, ue_x=ue_x, ue_y=ue_y, pe=pe, )
[docs]def update_boundary_conditions(t, dt, NS_expressions, **NS_namespace): for key, value in NS_expressions.items(): if "bc" in key: if "IPCS" in NS_parameters["solver"] and "p" in key: deltat_ = dt / 2.0 else: deltat_ = 0.0 if "w" in key: value.update(t - deltat_) else: value.t = t - deltat_
[docs]def temporal_hook( u_, q_, t, nu, VV, dt, u_vec, ue_vec, p_, viz_u, viz_p, viz_ue, initial_fields, tstep, save_solution_frequency, sys_comp, compute_error, total_error, ue_x, ue_y, pe, testing, **NS_namespace, ): """Function called at end of timestep. Plot solution and compute error by comparing to analytical solution. Remember pressure is computed in between timesteps. """ # Save solution if not testing and tstep % save_solution_frequency == 0: assign(u_vec.sub(0), u_[0]) assign(u_vec.sub(1), u_[1]) viz_u.write(u_vec, t) viz_p.write(p_, t) ue_x.t = t ue_y.t = t pe.t = t - dt / 2 uxx = interpolate(ue_x, VV["u0"]) uyy = interpolate(ue_y, VV["u1"]) pp = interpolate(pe, VV["p"]) ues = [] if tstep % compute_error == 0: err = {} for i, ui in enumerate(sys_comp): if "IPCS" in NS_parameters["solver"]: deltat_ = dt / 2.0 if ui == "p" else 0.0 else: deltat_ = 0.0 ue = Expression( (initial_fields[ui]), element=VV[ui].ufl_element(), t=t - deltat_, nu=nu ) if ui == "u0": ue = uxx assign(ue_vec.sub(0), ue) elif ui == "u1": ue = uyy assign(ue_vec.sub(1), ue) elif ui == "p": ue = pp pp.rename("p", "p") if "u" in ui: ues.append(ue) uen = norm(ue.vector()) ue.vector().axpy(-1, q_[ui].vector()) error = norm(ue.vector()) / uen err[ui] = "{0:2.6e}".format(norm(ue.vector()) / uen) total_error[i] += error * dt viz_ue.write(ue_vec, t)
[docs]def theend_hook( q_, t, dt, nu, VV, sys_comp, total_error, initial_fields, **NS_namespace ): final_error = np.zeros(len(sys_comp)) for i, ui in enumerate(sys_comp): if "IPCS" in NS_parameters["solver"] and ui == "p": deltat = dt / 2.0 else: deltat = 0.0 ue = Expression( (initial_fields[ui]), element=VV[ui].ufl_element(), t=t - deltat, nu=nu ) ue = interpolate(ue, VV[ui]) final_error[i] = errornorm(q_[ui], ue) s0 = "Total Error:" s1 = "Final Error:" for i, ui in enumerate(sys_comp): s0 += f" {ui}={total_error[i]:2.6e}" s1 += f" {ui}={final_error[i]:2.6e}" if MPI.rank(MPI.comm_world) == 0: print(s0) print(s1)