Source code for oasismove.common.utilities

__author__ = "Mikael Mortensen <mikaem@math.uio.no>"
__date__ = "2014-10-03"
__copyright__ = "Copyright (C) 2014 " + __author__
__license__ = "GNU Lesser GPL version 3 or any later version"

from dolfin import (
    DirichletBC,
    Form,
    Function,
    FunctionAssigner,
    FunctionSpace,
    Matrix,
    PETScKrylovSolver,
    PETScPreconditioner,
    TestFunction,
    Timer,
    TrialFunction,
    Vector,
    VectorFunctionSpace,
    assemble,
    div,
    dx,
    grad,
    inner,
)
from ufl import Coefficient
from ufl.tensors import ListTensor


# Create some dictionaries to hold work matrices
[docs]class Mat_cache_dict(dict): """Items in dictionary are matrices stored for efficient reuse.""" def __init__(self): self.t = 0 self.forms = {}
[docs] def update_t(self, t): if self.t < t: self.t = t # Only assmeble each form once, then apply BCs for form in self.forms.keys(): assemble(form, tensor=self[(form, self.forms[form][0])]) # Copy over to other for b in self.forms[form][1:]: self[(form, b)].empty() self[(form, b)].axpy(1, self[(form, self.forms[form][0])], True) # Apply bcs for form, bcs in self.keys(): for bc in bcs: bc.apply(self[(form, bcs)])
def __missing__(self, key): form, bcs = key if (form, ()) in self.keys(): A = self[(form, ())].copy() else: A = assemble(form) for bc in bcs: bc.apply(A) # Add to forms dictionary if form in self.forms.keys(): self.forms[form].append(bcs) else: self.forms[form] = [bcs] self[key] = A return self[key]
# Create some dictionaries to hold solvers used for projection
[docs]class Solver_cache_dict(dict): """Items in dictionary are Linear algebra solvers stored for efficient reuse.""" def __missing__(self, key): assert len(key) == 4 form, bcs, solver_type, preconditioner_type = key prec = PETScPreconditioner(preconditioner_type) sol = PETScKrylovSolver(solver_type, prec) # sol.prec = prec # sol = KrylovSolver(solver_type, preconditioner_type) # sol.parameters["preconditioner"]["structure"] = "same" sol.parameters["error_on_nonconvergence"] = False sol.parameters["monitor_convergence"] = False sol.parameters["report"] = False self[key] = sol return self[key]
A_cache = Mat_cache_dict() Solver_cache = Solver_cache_dict()
[docs]def assemble_matrix(form, bcs=[]): """Assemble matrix using cache register.""" assert Form(form).rank() == 2 return A_cache[(form, tuple(bcs))]
[docs]class OasisFunction(Function): """Function with more or less efficient projection methods of associated linear form. The matvec option is provided for letting the right hand side be computed through a fast matrix vector product. Both the matrix and the Coefficient of the required vector must be provided. method = "default" Solve projection with regular linear algebra using solver_type and preconditioner_type method = "lumping" Solve through lumping of mass matrix """ def __init__( self, form, Space, bcs=[], name="x", matvec=[None, None], method="default", solver_type="cg", preconditioner_type="default", ): Function.__init__(self, Space, name=name) self.form = form self.method = method self.bcs = bcs self.matvec = matvec self.trial = trial = TrialFunction(Space) self.test = test = TestFunction(Space) Mass = inner(trial, test) * dx() self.bf = inner(form, test) * dx() self.rhs = Vector(self.vector()) if method.lower() == "default": self.A = A_cache[(Mass, tuple(bcs))] self.sol = Solver_cache[ (Mass, tuple(bcs), solver_type, preconditioner_type) ] elif method.lower() == "lumping": assert Space.ufl_element().degree() < 2 self.A = A_cache[(Mass, tuple(bcs))] ones = Function(Space) ones.vector()[:] = 1.0 self.ML = self.A * ones.vector() self.ML.set_local(1.0 / self.ML.array())
[docs] def assemble_rhs(self): """ Assemble right hand side (form*test*dx) in projection """ if not self.matvec[0] is None: mat, func = self.matvec self.rhs.zero() self.rhs.axpy(1.0, mat * func.vector()) else: assemble(self.bf, tensor=self.rhs)
def __call__(self, assemb_rhs=True): """ Compute the projection """ Timer("Projecting {}".format(self.name())) if assemb_rhs: self.assemble_rhs() for bc in self.bcs: bc.apply(self.rhs) if self.method.lower() == "default": self.sol.solve(self.A, self.vector(), self.rhs) else: self.vector().zero() self.vector().axpy(1.0, self.rhs * self.ML)
[docs]class GradFunction(OasisFunction): """ Function used for projecting gradients. Typically used for computing pressure gradient on velocity function space. """ def __init__(self, p_, Space, i=0, bcs=[], name="grad", method={}): assert len(p_.ufl_shape) == 0 assert i >= 0 and i < Space.mesh().geometry().dim() solver_type = method.get("solver_type", "cg") preconditioner_type = method.get("preconditioner_type", "default") solver_method = method.get("method", "default") low_memory_version = method.get("low_memory_version", False) OasisFunction.__init__( self, p_.dx(i), Space, bcs=bcs, name=name, method=solver_method, solver_type=solver_type, preconditioner_type=preconditioner_type, ) self.i = i Source = p_.function_space() if not low_memory_version: self.matvec = [ A_cache[(self.test * TrialFunction(Source).dx(i) * dx, ())], p_, ] if solver_method.lower() == "gradient_matrix": from fenicstools import compiled_gradient_module DG = FunctionSpace(Space.mesh(), "DG", 0) G = assemble(TrialFunction(DG) * self.test * dx()) dg = Function(DG) dP = assemble( TrialFunction(p_.function_space()).dx(i) * TestFunction(DG) * dx() ) self.WGM = compiled_gradient_module.compute_weighted_gradient_matrix( G, dP, dg )
[docs] def assemble_rhs(self, u=None): """ Assemble right hand side trial.dx(i)*test*dx. Possible Coefficient u may replace p and makes it possible to use this Function to compute both grad(p) and grad(dp), i.e., the gradient of pressure correction. """ if isinstance(u, Coefficient): self.matvec[1] = u self.bf = u.dx(self.i) * self.test * dx() if not self.matvec[0] is None: mat, func = self.matvec self.rhs.zero() self.rhs.axpy(1.0, mat * func.vector()) else: assemble(self.bf, tensor=self.rhs)
def __call__(self, u=None, assemb_rhs=True): if isinstance(u, Coefficient): self.matvec[1] = u self.bf = u.dx(self.i) * self.test * dx() if self.method.lower() == "gradient_matrix": self.vector().zero() self.vector().axpy(1.0, self.WGM * self.matvec[1].vector()) else: OasisFunction.__call__(self, assemb_rhs=assemb_rhs)
[docs]class DivFunction(OasisFunction): """ Function used for projecting divergence of vector. Typically used for computing divergence of velocity on pressure function space. """ def __init__(self, u_, Space, bcs=[], name="div", method={}): solver_type = method.get("solver_type", "cg") preconditioner_type = method.get("preconditioner_type", "default") solver_method = method.get("method", "default") low_memory_version = method.get("low_memory_version", False) OasisFunction.__init__( self, div(u_), Space, bcs=bcs, name=name, method=solver_method, solver_type=solver_type, preconditioner_type=preconditioner_type, ) Source = u_[0].function_space() if not low_memory_version: self.matvec = [ [A_cache[(self.test * TrialFunction(Source).dx(i) * dx, ())], u_[i]] for i in range(Space.mesh().geometry().dim()) ] if solver_method.lower() == "gradient_matrix": from fenicstools import compiled_gradient_module DG = FunctionSpace(Space.mesh(), "DG", 0) G = assemble(TrialFunction(DG) * self.test * dx()) dg = Function(DG) self.WGM = [] st = TrialFunction(Source) for i in range(Space.mesh().geometry().dim()): dP = assemble(st.dx(i) * TestFunction(DG) * dx) A = Matrix(G) self.WGM.append( compiled_gradient_module.compute_weighted_gradient_matrix(A, dP, dg) )
[docs] def assemble_rhs(self): """ Assemble right hand side (form*test*dx) in projection """ if not self.matvec[0] is None: self.rhs.zero() for mat, vec in self.matvec: self.rhs.axpy(1.0, mat * vec.vector()) else: assemble(self.bf, tensor=self.rhs)
def __call__(self, assemb_rhs=True): if self.method.lower() == "gradient_matrix": # Note that assembling rhs is not necessary using gradient_matrix if assemb_rhs: self.assemble_rhs() self.vector().zero() for i in range(self.function_space().mesh().geometry().dim()): self.vector().axpy(1.0, self.WGM[i] * self.matvec[i][1].vector()) else: OasisFunction.__call__(self, assemb_rhs=assemb_rhs)
[docs]class CG1Function(OasisFunction): """ Function used for projecting a CG1 space, possibly using a weighted average. Typically used for computing turbulent viscosity in LES. """ def __init__(self, form, mesh, bcs=[], name="CG1", method={}, bounded=False): solver_type = method.get("solver_type", "cg") preconditioner_type = method.get("preconditioner_type", "default") solver_method = method.get("method", "default") self.bounded = bounded Space = FunctionSpace(mesh, "CG", 1) OasisFunction.__init__( self, form, Space, bcs=bcs, name=name, method=solver_method, solver_type=solver_type, preconditioner_type=preconditioner_type, ) if solver_method.lower() == "weightedaverage": from fenicstools import compiled_gradient_module DG = FunctionSpace(mesh, "DG", 0) # Cannot use cache. Matrix will be modified self.A = assemble(TrialFunction(DG) * self.test * dx()) self.dg = dg = Function(DG) compiled_gradient_module.compute_DG0_to_CG_weight_matrix(self.A, dg) self.bf_dg = inner(form, TestFunction(DG)) * dx() def __call__(self): if self.method.lower() == "weightedaverage": assemble(self.bf_dg, tensor=self.dg.vector()) # Compute weighted average on CG1 self.vector().zero() self.vector().axpy(1.0, self.A * self.dg.vector()) self.vector().apply("insert") [bc.apply(self.vector()) for bc in self.bcs] else: OasisFunction.__call__(self) if self.bounded: self.bound()
[docs] def bound(self): self.vector().set_local(self.vector().get_local().clip(min=0)) self.vector().apply("insert")
[docs]class AssignedVectorFunction(Function): """Vector function used for postprocessing. Assign data from ListTensor components using FunctionAssigner. """ def __init__(self, u, name="Assigned Vector Function"): self.u = u assert isinstance(u, ListTensor) V = u[0].function_space() mesh = V.mesh() family = V.ufl_element().family() degree = V.ufl_element().degree() constrained_domain = V.dofmap().constrained_domain Vv = VectorFunctionSpace( mesh, family, degree, constrained_domain=constrained_domain ) Function.__init__(self, Vv, name=name) self.fa = [FunctionAssigner(Vv.sub(i), V) for i, _u in enumerate(u)] def __call__(self): for i, _u in enumerate(self.u): self.fa[i].assign(self.sub(i), _u)
[docs]class LESsource(Function): """Function used for computing the transposed source to the LES equation.""" def __init__(self, nut, u, Space, bcs=[], name=""): Function.__init__(self, Space, name=name) dim = Space.mesh().geometry().dim() test = TestFunction(Space) self.bf = [inner(inner(grad(nut), u.dx(i)), test) * dx for i in range(dim)]
[docs] def assemble_rhs(self, i=0): """Assemble right hand side.""" assemble(self.bf[i], tensor=self.vector())
[docs]class NNsource(Function): """Function used for computing the transposed source to the non-Newtonian equation.""" def __init__(self, nunn, u, Space, bcs=[], name=""): Function.__init__(self, Space, name=name) dim = Space.mesh().geometry().dim() test = TestFunction(Space) self.bf = [inner(inner(grad(nunn), u.dx(i)), test) * dx for i in range(dim)]
[docs] def assemble_rhs(self, i=0): """Assemble right hand side.""" assemble(self.bf[i], tensor=self.vector())
[docs]def homogenize(bcs): b = [] for bc in bcs: b0 = DirichletBC(bc) b0.homogenize() b.append(b0) return b