Source code for vampy.automatedPreprocessing.preprocessing_common

import gzip
import json
from os import path

import numpy as np
import vtkmodules.numpy_interface.dataset_adapter as dsa
from morphman import vtk_clean_polydata, vtk_triangulate_surface, get_parameters, write_parameters, read_polydata, \
    vmtkscripts, vtk, write_polydata, vtkvmtk, get_curvilinear_coordinate, vtk_compute_threshold, get_vtk_array, \
    get_distance, get_number_of_arrays, vmtk_smooth_surface, get_point_data_array, create_vtk_array, \
    get_vtk_point_locator, vtk_extract_feature_edges, get_uncapped_surface, vtk_compute_connectivity, \
    vtk_compute_mass_properties, extract_single_line, get_centerline_tolerance

from vampy.automatedPreprocessing import ImportData
from vampy.automatedPreprocessing.NetworkBoundaryConditions import FlowSplitting
from vampy.automatedPreprocessing.vmtk_pointselector import vmtkPickPointSeedSelector

# Global array names
distanceToSpheresArrayName = "DistanceToSpheres"
radiusArrayName = 'MaximumInscribedSphereRadius'
cellEntityArrayName = "CellEntityIds"
dijkstraArrayName = "DijkstraDistanceToPoints"


[docs]def get_regions_to_refine(surface, provided_points, dir_path): """ Determines which regions to refine. Args: surface (vtkPoylData): Surface model provided_points (ndarray): Points defining regions to refine dir_path (str): Path to save directory Returns: region_points (list): List of points representing regions to refine """ # Check if info exists if not path.isfile(path.join(dir_path, dir_path + ".txt")): provide_region_points(surface, provided_points, dir_path) # Open info parameters = get_parameters(dir_path) dome = [] for key, value in parameters.items(): if key.startswith("region_"): dome.append(value) if not dome: dome = provide_region_points(surface, provided_points, dir_path) # Flatten list return [item for sublist in dome for item in sublist]
[docs]def provide_region_points(surface, provided_points, dir_path=None): """ Get relevant region points from user selected points on a input surface. Args: provided_points (ndarray): Point(s) representing area to refine surface (vtkPolyData): Surface model. dir_path (str): Location of info.json file Returns: points (list): List of relevant outlet IDs """ # Fix surface cleaned_surface = vtk_clean_polydata(surface) triangulated_surface = vtk_triangulate_surface(cleaned_surface) if provided_points is None: # Select seeds print("--- Please select regions to refine in rendered window") SeedSelector = vmtkPickPointSeedSelector() SeedSelector.SetSurface(triangulated_surface) SeedSelector.Execute() regionSeedIds = SeedSelector.GetTargetSeedIds() get_point = surface.GetPoints().GetPoint points = [list(get_point(regionSeedIds.GetId(i))) for i in range(regionSeedIds.GetNumberOfIds())] else: surface_locator = get_vtk_point_locator(surface) provided_points = [[provided_points[3 * i], provided_points[3 * i + 1], provided_points[3 * i + 2]] for i in range(len(provided_points) // 3)] points = [list(surface.GetPoint(surface_locator.FindClosestPoint(p))) for p in provided_points] if dir_path is not None: info = {"number_of_regions": len(points)} for i in range(len(points)): info["region_%d" % i] = points[i] write_parameters(info, dir_path) return points
[docs]def make_voronoi_diagram(surface, file_path): """ Compute the voronoi diagram of surface model. Args: surface (polydata): Capped surface model to create a Voronoi diagram of. file_path (str): Absolute path to surface model path. Returns: voronoi (vtkPolyData): Voronoi diagram of surface. """ if path.isfile(file_path): return read_polydata(file_path) voronoi = vmtkscripts.vmtkDelaunayVoronoi() voronoi.Surface = surface voronoi.RemoveSubresolutionTetrahedra = 0 voronoi.Execute() write_polydata(voronoi.VoronoiDiagram, file_path) return voronoi.VoronoiDiagram
[docs]def compute_centers_for_meshing(surface, is_atrium, case_path=None, test_capped=False): """ Compute the center of all the openings in the surface. The inlet is chosen based on the largest area for arteries (or aneurysm). However, for atrium, the outlet is chosen based on the largest area (new). Args: test_capped (bool): Check if surface is capped. surface (vtkPolyData): centers of the openings. case_path (str): path to case directory. is_atrium (bool): Check if it is an atrium model. Returns: inlet_center (list): Inlet center. outlet_centers (list): A flattened list with all the outlet centers. """ # Get cells which are open cells = vtk_extract_feature_edges(surface) if cells.GetNumberOfCells() == 0 and not test_capped: print("WARNING: The model is capped, so it is uncapped, but the method is experimental.") uncapped_surface = get_uncapped_surface(surface) compute_centers_for_meshing(uncapped_surface, is_atrium, case_path, test_capped) elif cells.GetNumberOfCells() == 0 and test_capped: return False, 0 # Compute connectivity of the cells outputs = vtk_compute_connectivity(cells) # Get connectivity array region_array = get_point_data_array("RegionId", outputs) if test_capped: return region_array.max() >= 1, region_array.max() # Get points points = [] get_point = outputs.GetPoints().GetPoint for i in range(region_array.shape[0]): points.append(get_point(i)) points = np.asarray(points) # Get area and center area = [] center = [] for i in range(int(region_array.max()) + 1): # Compute area boundary = vtk_compute_threshold(outputs, "RegionId", lower=i - 0.1, upper=i + 0.1, threshold_type="between", source=0) delaunay_filter = vtk.vtkDelaunay2D() delaunay_filter.SetInputData(boundary) delaunay_filter.Update() area.append(vtk_compute_mass_properties(delaunay_filter.GetOutput())) # Get center center.append(np.mean(points[(region_array == i).nonzero()[0]], axis=0)) # Assume multiple inlets for atrium, and multiple outlets for arteries boundary_name = "outlet" if is_atrium else "inlet" boundaries_name = "inlet%d" if is_atrium else "outlet%d" boundary_area_name = boundary_name + "_area" boundaries_area_name = boundaries_name + "_area" boundary_id = area.index(max(area)) if case_path is not None: info = {boundary_name: center[boundary_id].tolist(), boundary_area_name: area[boundary_id]} p = 0 for i in range(len(area)): if i == boundary_id: p = -1 continue info[boundaries_name % (i + p)] = center[i].tolist() info[boundaries_area_name % (i + p)] = area[i] write_parameters(info, case_path) boundary_center = center[boundary_id].tolist() # center of the inlet (artery) / outlet (atrium) center.pop(boundary_id) center_ = [item for sublist in center for item in sublist] # centers of the outlets (artery) / inlets (atrium) return center_, boundary_center
[docs]def get_centers_for_meshing(surface, is_atrium, dir_path, use_flow_extensions=False): """ Get the centers of the inlet and outlets. Args: surface (vtkPolyData): An open surface. is_atrium (bool): True if model represents atrium dir_path (str): Path to the case file. use_flow_extensions (bool): Turn on/off flow extension. Returns: inlet (list): A flatt list with the point of the inlet outlet (list): A flatt list with the points of all the outlets. """ # Check if info exists if use_flow_extensions or not path.isfile(path.join(dir_path, dir_path + ".json")): compute_centers_for_meshing(surface, is_atrium, dir_path) # Open info parameters = get_parameters(dir_path) outlets = [] inlets = [] for key, value in parameters.items(): if "area" not in key and "id" not in key and "relevant" not in key: if "outlet" in key: outlets += value elif "inlet" in key: inlets += value num_outlets = len(outlets) // 3 num_inlets = len(inlets) // 3 if is_atrium and num_inlets != 0: inlets = [] for i in range(num_inlets): inlets += parameters["inlet%d" % i] if not is_atrium and num_outlets != 0: outlets = [] for i in range(num_outlets): outlets += parameters["outlet%d" % i] # FIXIT: atrium case has several inlets (instead of inlet) and only one outlet (instead of outlets). if inlets == [] and outlets == []: inlets, outlets = compute_centers_for_meshing(surface, is_atrium, dir_path) print("The number of outlets =", len(outlets) // 3) print("The number of inlets =", len(inlets) // 3) print() return inlets, outlets
[docs]def dist_sphere_curvature(surface, centerlines, region_center, misr_max, save_path, factor): """ Determines the target edge length for each cell on the surface, including potential refinement or coarsening of certain user specified areas. Level of refinement/coarseness is determined based on surface curvature. Args: surface (vtkPolyData): Input surface model centerlines (vtkPolyData): Centerlines of input model region_center (list): Point representing region to refine misr_max (list): Maximum inscribed sphere radius in region of refinement save_path (str): Location to store processed surface factor (float): Coarsening factor, determining the level of refinement (<1) or coarsening (>1) Returns: surface (vtkPolyData): Processed surface model with info on cell specific target edge length """ # Get longest centerline length = [] for i in range(centerlines.GetNumberOfLines()): line = extract_single_line(centerlines, i) length.append(get_curvilinear_coordinate(line)[-1]) ind_longest = length.index(max(length)) # Get all bifurcations along the longest centerline bif = [] bif_id = [] longest_line = extract_single_line(centerlines, ind_longest) tol = get_centerline_tolerance(centerlines) for i in range(centerlines.GetNumberOfLines()): if i == ind_longest: continue comp_line = extract_single_line(centerlines, i) for j in range(comp_line.GetNumberOfPoints()): pnt1 = longest_line.GetPoints().GetPoint(j) pnt2 = comp_line.GetPoints().GetPoint(j) if get_distance(pnt1, pnt2) > tol: bif.append(pnt1) bif_id.append(j) break # Remove bifurcations detected twice pop = [] for i in range(len(bif)): for j in range(i + 1, len(bif)): dist = get_distance(bif[i], bif[j]) if dist < tol * 6: pop.append(j) for i in sorted(set(pop))[::-1]: bif.pop(i) bif_id.pop(i) # Set siphon to be the location of the first branch, assumes ICA as # inlet, and that the ophthalmic is segmented. bif_id.sort() siphon_point = line.GetPoints().GetPoint(bif_id[0]) distance_to_sphere = compute_distance_to_sphere(surface, siphon_point) # Add the center of the sac for i in range(len(region_center)): distance_to_sphere = compute_distance_to_sphere(distance_to_sphere, region_center[i], distance_scale=0.2 / (misr_max[i] * 2.5)) # Compute curvature curvatureFilter = vmtkscripts.vmtkSurfaceCurvature() curvatureFilter.AbsoluteCurvature = 1 curvatureFilter.MedianFiltering = 1 curvatureFilter.CurvatureType = "gaussian" curvatureFilter.Offset = 0.15 curvatureFilter.BoundedReciprocal = 1 curvatureFilter.Surface = distance_to_sphere curvatureFilter.Execute() # Multiple the surface curvatureSurface = curvatureFilter.Surface curvatureArray = get_point_data_array("Curvature", curvatureSurface) distance_to_sphere_array = get_point_data_array(distanceToSpheresArrayName, distance_to_sphere) size_array = curvatureArray * distance_to_sphere_array * factor size_vtk_array = create_vtk_array(size_array, "Size") curvatureSurface.GetPointData().AddArray(size_vtk_array) write_polydata(curvatureSurface, save_path) return distance_to_sphere
[docs]def dist_sphere_constant(surface, centerlines, region_center, misr_max, save_path, edge_length): """ Determines the target edge length for each cell on the surface, including potential refinement or coarsening of certain user specified areas. Level of refinement/coarseness is determined based on user selected region, otherwise a constant target edge length is selected. Args: surface (vtkPolyData): Input surface model centerlines (vtkPolyData): Centerlines of input model region_center (list): Point representing region to refine misr_max (list): Maximum inscribed sphere radius in region of refinement save_path (str): Location to store processed surface edge_length (float): Target edge length Returns: surface (vtkPolyData): Processed surface model with info on cell specific target edge length """ # Constant meshing method with possible refined area. # --- Compute the distanceToCenterlines distToCenterlines = vmtkscripts.vmtkDistanceToCenterlines() distToCenterlines.Surface = surface distToCenterlines.Centerlines = centerlines distToCenterlines.Execute() distance_to_sphere = distToCenterlines.Surface # Reduce element size in region for i in range(len(region_center)): distance_to_sphere = compute_distance_to_sphere(distance_to_sphere, region_center[i], min_distance=edge_length / 3, max_distance=edge_length, distance_scale=edge_length * 3 / 4 / (misr_max[i] * 2.)) element_size = edge_length + np.zeros((surface.GetNumberOfPoints(), 1)) if len(region_center) != 0: distance_to_spheres_array = get_point_data_array(distanceToSpheresArrayName, distance_to_sphere) element_size = np.minimum(element_size, distance_to_spheres_array) vtk_array = create_vtk_array(element_size, "Size") distance_to_sphere.GetPointData().AddArray(vtk_array) write_polydata(distance_to_sphere, save_path) return distance_to_sphere
[docs]def dist_sphere_diam(surface, centerlines, region_center, misr_max, save_path, factor): """ Determines the target edge length for each cell on the surface, including potential refinement or coarsening of certain user specified areas. Level of refinement/coarseness is determined based on the distance to the centerline. Args: surface (vtkPolyData): Input surface model centerlines (vtkPolyData): Centerlines of input model region_center (list): Point representing region to refine misr_max (list): Maximum inscribed sphere radius in region of refinement save_path (str): Location to store processed surface factor (float): Coarsening factor, determining the level of refinement (<1) or coarsening (>1) Returns: surface (vtkPolyData): Processed surface model with info on cell specific target edge length """ # Meshing method following Owais way. # --- Compute the distanceToCenterlines distToCenterlines = vmtkscripts.vmtkDistanceToCenterlines() distToCenterlines.Surface = surface distToCenterlines.Centerlines = centerlines distToCenterlines.UseRadiusThreshold = False distToCenterlines.Execute() distance_to_sphere = distToCenterlines.Surface # Compute element size based on diameter upper = 20 lower = 6 diameter_array = 2 * get_point_data_array("DistanceToCenterlines", distance_to_sphere) element_size = 13. / 35 * diameter_array ** 2 + lower element_size[element_size > upper] = upper element_size[element_size < lower] = lower elements_vtk = create_vtk_array(element_size, "Num elements") distance_to_sphere.GetPointData().AddArray(elements_vtk) element_size = diameter_array / element_size # Reduce element size in refinement region for i in range(len(region_center)): distance_to_sphere = compute_distance_to_sphere(distance_to_sphere, region_center[i], max_distance=100, distance_scale=0.2 / (misr_max[i] * 2.)) if len(region_center) == 0: element_size *= factor else: distance_to_spheres_array = get_point_data_array(distanceToSpheresArrayName, distance_to_sphere) element_size = np.minimum(element_size, distance_to_spheres_array) * factor vtk_array = create_vtk_array(element_size, "Size") distance_to_sphere.GetPointData().AddArray(vtk_array) write_polydata(distance_to_sphere, save_path) return distance_to_sphere
[docs]def mesh_alternative(surface): """ Subdivides and smoothes the input surface model, preparing it for volumetric meshing. Args: surface (vtkPolyData): Input surface model to be meshed alternatively Returns: surface (vtkPolyData): Smoothed model """ print("--- Meshing failed.") print("--- Proceeding with surface smooting and meshing.") surface = vmtk_smooth_surface(surface, "laplace", iterations=500) surfaceSubdivision = vmtkscripts.vmtkSurfaceSubdivision() surfaceSubdivision.Surface = surface surfaceSubdivision.Method = "butterfly" surfaceSubdivision.Execute() surface = surfaceSubdivision.Surface return vmtk_smooth_surface(surface, "laplace", iterations=500)
[docs]def compute_distance_to_sphere(surface, center_sphere, radius_sphere=0.0, distance_offset=0.0, distance_scale=0.01, min_distance=0.2, max_distance=0.3): """ Computes cell specific target edge length (distances) based on input criterion. Args: surface (vtkPolyData): Input surface model center_sphere (list): Point representing region to refine radius_sphere (list): Maximum inscribed sphere radius in region of refinement distance_offset (float): Offsets the relevant region by a offset distance_scale (float): Determines how the distance is scaled based on the distance from the relevant region min_distance (float): Minimum distance away from the relevant region before scaling starts max_distance (float): Maximum distance away from the relevant region before scaling stops Returns: surface (vtkPolyData): Modified surface model with distances """ # Check if there allready exists a distance to spheres N = surface.GetNumberOfPoints() number, names = get_number_of_arrays(surface) add = False if distanceToSpheresArrayName not in names: add = True # Get array if add: dist_array = get_vtk_array(distanceToSpheresArrayName, 1, N) surface.GetPointData().AddArray(dist_array) else: dist_array = surface.GetPointData().GetArray(distanceToSpheresArrayName) # Compute distance for i in range(N): distanceToSphere = dist_array.GetComponent(i, 0) # Get distance, but factor in size of sphere newDist = get_distance(center_sphere, surface.GetPoints().GetPoint(i)) - radius_sphere # Set offset and scale distance newDist = distance_offset + newDist * distance_scale # Capp to min distance if newDist < min_distance: newDist = min_distance # Capp to max distance if newDist > max_distance: newDist = max_distance # Keep smallest distance newDist = min(newDist, distanceToSphere) if not add else newDist dist_array.SetComponent(i, 0, newDist) return surface
[docs]def generate_mesh(surface, add_boundary_layer): """ Generates a mesh suitable for CFD from a input surface model. Args: surface (vtkPolyData): Surface model to be meshed. Returns: mesh (vtkUnstructuredGrid): Output mesh remeshedsurface (vtkPolyData): Remeshed version of the input model """ # Compute the mesh. meshGenerator = vmtkscripts.vmtkMeshGenerator() meshGenerator.Surface = surface meshGenerator.ElementSizeMode = "edgelengtharray" # Variable size mesh meshGenerator.TargetEdgeLengthArrayName = "Size" # Variable size mesh meshGenerator.LogOn = 0 if add_boundary_layer: meshGenerator.BoundaryLayer = 1 meshGenerator.NumberOfSubLayers = 4 meshGenerator.BoundaryLayerOnCaps = 0 meshGenerator.BoundaryLayerThicknessFactor = 0.85 meshGenerator.SubLayerRatio = 0.75 meshGenerator.Tetrahedralize = 1 meshGenerator.VolumeElementScaleFactor = 0.8 meshGenerator.EndcapsEdgeLengthFactor = 1.0 else: meshGenerator.BoundaryLayer = 0 meshGenerator.BoundaryLayerOnCaps = 1 # Mesh meshGenerator.Execute() # Remeshed surface, store for later remeshSurface = meshGenerator.RemeshedSurface # Full mesh mesh = meshGenerator.Mesh return mesh, remeshSurface
[docs]def find_boundaries(model_path, mean_inflow_rate, network, mesh, verbose_print, is_atrium): """ Finds inlet and outlet boundary IDs after complete meshing, including mean flow ratio and area ratios between outlets or inlets (determined by type of model) Args: model_path (str): Path to model files mean_inflow_rate (float): Flow rate network (Network): Flow splitting network based on network boundary condition mesh (vtkUnstructuredGrid): Volumetric mesh verbose_print (bool): Prints additional info if True is_atrium (bool): Determines if model represents atrium or artery """ # Extract the surface mesh of the wall wallMesh = vtk_compute_threshold(mesh, "CellEntityIds", lower=0.5, upper=1.5) boundaryReferenceSystems = vmtkscripts.vmtkBoundaryReferenceSystems() boundaryReferenceSystems.Surface = wallMesh boundaryReferenceSystems.Execute() refSystem = boundaryReferenceSystems.ReferenceSystems cellEntityIdsArray = get_vtk_array('CellEntityIds', 0, refSystem.GetNumberOfPoints()) refSystem.GetPointData().AddArray(cellEntityIdsArray) # Extract the surface mesh of the end caps boundarySurface = vtk_compute_threshold(mesh, "CellEntityIds", lower=1.5, threshold_type="lower") pointCells = vtk.vtkIdList() surfaceCellEntityIdsArray = vtk.vtkIntArray() surfaceCellEntityIdsArray.DeepCopy(boundarySurface.GetCellData().GetArray('CellEntityIds')) # Find the corresponding couple (mesh outlet ID, network ID). ids = [] for i in range(refSystem.GetNumberOfPoints()): distancePoints = 10000000 pointId = boundarySurface.FindPoint(refSystem.GetPoint(i)) boundarySurface.GetPointCells(pointId, pointCells) cellId = pointCells.GetId(0) cellEntityId = surfaceCellEntityIdsArray.GetValue(cellId) cellEntityIdsArray.SetValue(i, cellEntityId) meshPoint = refSystem.GetPoint(i) for element in network.elements: if element.IsAnOutlet(): networkPoint = element.GetOutPointsx1()[0] if element.IsAnInlet(): networkPoint = element.GetInPointsx0()[0] if vtk.vtkMath.Distance2BetweenPoints(meshPoint, networkPoint) < distancePoints: distancePoints = vtk.vtkMath.Distance2BetweenPoints(meshPoint, networkPoint) closest = element.GetId() if network.elements[closest].IsAnInlet(): verbose_print('I am the inlet, Sup?') verbose_print(network.elements[closest].GetInPointsx0()[0]) ids.insert(0, [cellEntityId, mean_inflow_rate]) else: gamma = network.elements[closest].GetGamma() ids.append([cellEntityId, gamma]) verbose_print(gamma) verbose_print(network.elements[closest].GetOutPointsx1()[0]) verbose_print('CellEntityId: %d\n' % cellEntityId) verbose_print('meshPoint: %f, %f, %f\n' % (meshPoint[0], meshPoint[1], meshPoint[2])) verbose_print(ids) # Store information for the solver. inlet_id = [ids[0][0] - 1] outlet_ids = [] area_ratios = [] for k in range(1, refSystem.GetNumberOfPoints()): outlet_ids.append(ids[k][0] - 1) area_ratios.append(ids[k][1]) info = { "mean_flow_rate": mean_inflow_rate, "area_ratio": area_ratios } # Swap outlet with inlet if meshing atrium model if is_atrium: info['inlet_ids'] = outlet_ids info['outlet_id'] = inlet_id else: info['inlet_id'] = inlet_id info['outlet_ids'] = outlet_ids write_parameters(info, model_path)
[docs]def setup_model_network(centerlines, file_name_probe_points, region_center, verbose_print, is_atrium): """ Sets up network used for network boundary condition model. Args: centerlines (vtkPolyData): Centerlines representing meshed model file_name_probe_points (str): Save path of probe points region_center (list): List of points representing region of refinement verbose_print (bool): Prints additional info if True is_atrium (bool): Determines if model is atrium or artery Returns: network (Network): Network model probe_points (ndarray): Probe points where velocity and pressure is to be sampled """ # Set the network object used in the scripts for # boundary conditions and probes. network = ImportData.Network() centerlinesBranches = ImportData.SetNetworkStructure(centerlines, network, verbose_print) if not path.isfile(file_name_probe_points): # Get the list of coordinates for the probe points along the network centerline. listProbePoints = ImportData.GetListProbePoints(centerlinesBranches, network, verbose_print) listProbePoints += region_center print("--- Saving probes points in: %s\n" % file_name_probe_points) probe_points = np.array(listProbePoints) with open(file_name_probe_points, 'w') as outfile: json.dump(probe_points.tolist(), outfile) else: with open(file_name_probe_points, 'r') as infile: probe_points = np.array(json.load(infile)) # Set the flow split and inlet boundary condition # Compute the outlet boundary condition percentages. flowSplitting = FlowSplitting() if not is_atrium and network.GetNumberOfOutlet() > 1: flowSplitting.ComputeAlphas(network, verbose_print) flowSplitting.ComputeBetas(network, verbose_print) flowSplitting.ComputeGammas(network, verbose_print) flowSplitting.CheckTotalFlowRate(network, verbose_print) return network, probe_points
[docs]def compute_flow_rate(is_atrium, inlet, parameters, flow_rate_factor): """ Computes mean flow rate used as a boundary condition for pulsatile flow condition Args: is_atrium (bool): Determines if model is atrium or artery inlet (list): List of points representing midpoint of boundaries parameters (dict): Dictionary containing model parameters flow_rate_factor (float): Factor to adjust flow rate calculation Returns: mean_inflow_rate (float): Mean inflow rate """ # FIXME: Add plausible boundary conditions for atrial flow if is_atrium: total_inlet_area = 0.0 num_inlets = len(inlet) // 3 for i in range(num_inlets): total_inlet_area += parameters["inlet%s_area" % i] mean_inflow_rate = flow_rate_factor * total_inlet_area else: mean_inflow_rate = flow_rate_factor * parameters["inlet_area"] return mean_inflow_rate
[docs]def write_mesh(compress_mesh, file_name_surface_name, file_name_vtu_mesh, file_name_xml_mesh, mesh, remeshed_surface): """ Writes the mesh to DOLFIN format, and compresses to .gz format Args: compress_mesh (bool): Compressed mesh to zipped format file_name_surface_name (str): Path to remeshed surface model file_name_vtu_mesh (str): Path to VTK mesh file_name_xml_mesh (str): Path to XML mesh mesh (vtuUnstructuredGrid): Meshed surface model remeshed_surface (vtkPolyData): Remeshed surface model """ # Write mesh in VTU format write_polydata(remeshed_surface, file_name_surface_name) write_polydata(mesh, file_name_vtu_mesh) # Write mesh to FEniCS to format if compress_mesh: file_name_xml_mesh += '.gz' writer = vtkvmtk.vtkvmtkDolfinWriter() writer.SetInputData(mesh) writer.SetFileName(file_name_xml_mesh) writer.SetBoundaryDataArrayName("CellEntityIds") writer.SetBoundaryDataIdOffset(-1) writer.SetStoreCellMarkers(1) print('--- Writing Dolfin file') writer.Write() # Compress mesh locally to bypass VMTK issue if compress_mesh: file = open(file_name_xml_mesh, 'rb') xml = file.read() file.close() gzfile = gzip.open(file_name_xml_mesh, 'wb') gzfile.write(xml) gzfile.close()
[docs]def add_flow_extension(surface, centerlines, is_inlet, extension_length=2.0, extension_mode="boundarynormal"): """ Adds flow extensions to either all inlets or all outlets with specified extension length. Args: surface (vtkPolyData): Surface model to extend centerlines (vtkPolyData): Centerlines in model. is_inlet (bool): Determines if inlet or outlet is to be extended. extension_length (float): Determines length of flow extensions. Factor is multiplied with MISR at relevant boundary extension_mode (str): Determines how extensions are place, either normal to boundary or following centerline direction Returns: surface_extended (vtkPolyData): Extended surface model """ # Mimick behaviour of vmtkflowextensionfilter boundaryExtractor = vtkvmtk.vtkvmtkPolyDataBoundaryExtractor() boundaryExtractor.SetInputData(surface) boundaryExtractor.Update() boundaries = boundaryExtractor.GetOutput() # Find outlet lengths = [] for i in range(boundaries.GetNumberOfCells()): lengths.append(get_curvilinear_coordinate(boundaries.GetCell(i))[-1]) inlet_id = lengths.index(max(lengths)) # Exclude outlet or inlets boundaryIds = vtk.vtkIdList() for i in range(centerlines.GetNumberOfLines() + 1): if is_inlet and i == inlet_id: boundaryIds.InsertNextId(i) if not is_inlet and i != inlet_id: boundaryIds.InsertNextId(i) flowExtensionsFilter = vtkvmtk.vtkvmtkPolyDataFlowExtensionsFilter() flowExtensionsFilter.SetInputData(surface) flowExtensionsFilter.SetCenterlines(centerlines) flowExtensionsFilter.SetAdaptiveExtensionLength(1) flowExtensionsFilter.SetAdaptiveNumberOfBoundaryPoints(0) flowExtensionsFilter.SetExtensionRatio(extension_length) flowExtensionsFilter.SetTransitionRatio(1.0) flowExtensionsFilter.SetCenterlineNormalEstimationDistanceRatio(1.0) if extension_mode == "centerlinedirection": flowExtensionsFilter.SetExtensionModeToUseCenterlineDirection() if extension_mode == "boundarynormal": flowExtensionsFilter.SetExtensionModeToUseNormalToBoundary() flowExtensionsFilter.SetInterpolationModeToThinPlateSpline() flowExtensionsFilter.SetBoundaryIds(boundaryIds) flowExtensionsFilter.SetSigma(1.0) flowExtensionsFilter.Update() surface_extended = flowExtensionsFilter.GetOutput() return surface_extended
[docs]def scale_surface(surface, scale_factor): """ Scale the input surface by a factor scale_factor. Args: surface (vtkPolyData): Input surface to be scaled scale_factor (float): Scaling factor Returns: scaled_surface (vtkPolyData): Scaled input surface """ surface_scaler = vmtkscripts.vmtkSurfaceScaling() surface_scaler.Surface = surface surface_scaler.ScaleFactor = scale_factor surface_scaler.Execute() # Get scaled surface scaled_surface = surface_scaler.Surface return scaled_surface
[docs]def get_furtest_surface_point(inlet, surface): """ Calculates the furthest point on a given surface from a specified inlet point. It iterates over each point on the surface, calculating its distance from the inlet and updates the furthest distance accordingly. Args: inlet (list): A list with the inlet point. surface (vtkPolyData): The surface to check for the furthest point. Returns: outlets (list): The x, y, z coordinates of the furthest point on the surface from the inlet. """ outlets = [] end_point_distance = 0 for i in range(surface.GetNumberOfPoints()): tmp_p = list(surface.GetPoint(i)) dx = get_distance(inlet, tmp_p) if dx > end_point_distance: end_point_distance = dx outlets = tmp_p return outlets
[docs]def check_if_closed_surface(surface): """ Checks if the given surface is capped (i.e., has no feature edges). Args: surface (vtkPolyData): The surface to check for capping. Returns: bool: True if the surface is capped, False otherwise. """ cells = vtk_extract_feature_edges(surface) return cells.GetNumberOfCells() == 0
[docs]def remesh_surface(surface, edge_length, element_size_mode="edgelength", exclude=None): """ Remeshes a given surface based on the specified parameters. Args: surface (vtkPolyData): The input surface to be remeshed. edge_length (float): The target edge length for remeshing. element_size_mode (str, optional): Determines the method for sizing elements during remeshing. exclude (list, optional): A list of entity IDs to be excluded during remeshing. Defaults to None. Returns: remeshed_surface (vtkPolyData): The remeshed surface. """ surface = dsa.WrapDataObject(surface) if cellEntityArrayName not in surface.CellData.keys(): surface.CellData.append(np.zeros(surface.VTKObject.GetNumberOfCells()) + 1, cellEntityArrayName) remeshing = vmtkscripts.vmtkSurfaceRemeshing() remeshing.Surface = surface.VTKObject remeshing.CellEntityIdsArrayName = cellEntityArrayName remeshing.TargetEdgeLength = edge_length remeshing.MaxEdgeLength = 1e6 remeshing.MinEdgeLength = 0.0 remeshing.TargetEdgeLengthFactor = 1.0 remeshing.TriangleSplitFactor = 5.0 remeshing.ElementSizeMode = element_size_mode if element_size_mode == "edgelength": remeshing.TargetEdgeLengthArrayName = "" else: remeshing.TargetEdgeLengthArrayName = "Size" # Variable size mesh if exclude is not None: remeshing.ExcludeEntityIds = exclude remeshing.Execute() remeshed_surface = remeshing.Surface return remeshed_surface
[docs]def dist_sphere_geodesic(surface, region_center, max_distance, save_path, edge_length): """ Determines the target edge length for each cell on the surface, including potential refinement or coarsening of certain user specified areas. Level of refinement/coarseness is determined based on user selected region and the geodesic distance from it. Args: surface (vtkPolyData): Input surface model region_center (list): Point representing region to refine save_path (str): Location to store processed surface edge_length (float): Target edge length max_distance (float): Max distance of geodesic measure Returns: surface (vtkPolyData): Processed surface model with info on cell specific target edge length """ # Define refine region point ID locator = get_vtk_point_locator(surface) point = region_center[0] region_id = locator.FindClosestPoint(point) region_ids = vtk.vtkIdList() region_ids.InsertNextId(region_id) # Compute geodesic distance to point dijkstra = vtkvmtk.vtkvmtkPolyDataDijkstraDistanceToPoints() dijkstra.SetInputData(surface) dijkstra.SetSeedIds(region_ids) dijkstra.SetDistanceOffset(0) dijkstra.SetDistanceScale(1) dijkstra.SetMinDistance(0) dijkstra.SetMaxDistance(max_distance) dijkstra.SetDijkstraDistanceToPointsArrayName(dijkstraArrayName) dijkstra.Update() geodesic_distance = dijkstra.GetOutput() # Create smooth transition between LAA and LA lumen N = surface.GetNumberOfPoints() dist_array = geodesic_distance.GetPointData().GetArray(dijkstraArrayName) for i in range(N): dist = dist_array.GetComponent(i, 0) / max_distance newDist = 1 / 3 * edge_length * (1 + 2 * dist ** 3) dist_array.SetComponent(i, 0, newDist) # Set element size based on geodesic distance element_size = get_point_data_array(dijkstraArrayName, geodesic_distance) vtk_array = create_vtk_array(element_size, "Size") geodesic_distance.GetPointData().AddArray(vtk_array) write_polydata(geodesic_distance, save_path) return geodesic_distance