Source code for libtts.ground_detection

"""Functions for detecting ground points and classifying points (ground and vegetation).

Current implementation: creating a Digital Terrain Model (DTM), and classifying points as ground or vegetation based on the height.

Example:
    out_gd_file, out_veg_file = libtts.run_ground_detection(infile = infile, out_gd_file = out_gd_file, out_veg_file = out_veg_file, grid_size=0.1, height_threshold = 0.5)                                                 
"""

import argparse
import numpy as np
from scipy.interpolate import griddata
import cv2

try:
    import matplotlib.pyplot as plt
    PLOTTING_ENABLED = True
except ImportError:
    PLOTTING_ENABLED = False

try:
    from plyfile import PlyData, PlyElement
    PLYFILE_ENABLED = True
except ImportError:
    PLYFILE_ENABLED = False

try:
    import laspy
    LASPY_ENABLED = True
except ImportError:
    LASPY_ENABLED = False

# --- Core Algorithm Functions ---

def _create_initial_grid(
    points: np.ndarray,
    grid_size: float
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Internal function to create a raw 2D grid of lowest Z points.
    
    Args:
        points (np.ndarray): Nx3 array of (x, y, z) points.
        grid_size (float): The size of each grid cell.
    
    Returns:
        tuple: A tuple containing:
            - grid_x (np.ndarray): 2D array of X coordinates for the grid.
            - grid_y (np.ndarray): 2D array of Y coordinates for the grid.
            - grid_z_min (np.ndarray): 2D array of minimum Z values for each grid cell.
    """
    points = np.asarray(points)[:, :3]
    min_x, min_y, _ = np.min(points, axis=0)
    max_x, max_y, _ = np.max(points, axis=0)

    x_indices = ((points[:, 0] - min_x) / grid_size).astype(int)
    y_indices = ((points[:, 1] - min_y) / grid_size).astype(int)

    n_x = int((max_x - min_x) / grid_size) + 1
    n_y = int((max_y - min_y) / grid_size) + 1

    grid_z_min = np.full((n_y, n_x), np.inf, dtype=float)
    indices_and_z = np.vstack([y_indices, x_indices, points[:, 2]]).T
    indices_and_z = indices_and_z[indices_and_z[:, 2].argsort()]

    unique_indices, first_occurrence_indices = np.unique(
        indices_and_z[:, :2], axis=0, return_index=True
    )
    unique_indices = unique_indices.astype(int)
    min_z_values = indices_and_z[first_occurrence_indices, 2]

    grid_z_min[unique_indices[:, 0], unique_indices[:, 1]] = min_z_values
    grid_z_min[grid_z_min == np.inf] = np.nan

    grid_x_coords = min_x + np.arange(n_x) * grid_size
    grid_y_coords = min_y + np.arange(n_y) * grid_size
    grid_x, grid_y = np.meshgrid(grid_x_coords, grid_y_coords)

    return grid_x, grid_y, grid_z_min


def _fill_and_smooth_grid(
    grid_z: np.ndarray,
    outlier_std_dev: float,
    gaussian_kernel_size: int
) -> np.ndarray:
    """Internal function to fill, filter, and smooth a 2D grid.

    This helper function performs three main operations on a copy of the input grid:
    1.  Removes statistical outliers by replacing them with NaN.
    2.  Fills any NaN values using linear interpolation based on valid neighbors.
    3.  Applies a Gaussian blur to smooth the entire grid.

    Args:
        grid_z (np.ndarray): A 2D NumPy array representing the Z-values of the grid.
            This grid may contain NaN values.
        outlier_std_dev (float): The number of standard deviations from the mean
            to use as a threshold for outlier removal. Any value outside
            `mean ± outlier_std_dev * std` is replaced with NaN. If this is
            set to 0 or less, this step is skipped.
        gaussian_kernel_size (int): The size of the kernel for Gaussian smoothing.
            This should be a positive, odd integer (e.g., 3, 5, 7). If this
            is set to 0 or less, this step is skipped.

    Returns:
        np.ndarray: A new 2D NumPy array of the same shape as `grid_z` that has
            been filtered, filled, and smoothed.
    """

    processed_grid = np.copy(grid_z)
    
    if outlier_std_dev > 0:
        grid_nonan = processed_grid[~np.isnan(processed_grid)]
        if grid_nonan.size > 0:
            mean_z = np.mean(grid_nonan)
            std_z = np.std(grid_nonan)
            processed_grid[processed_grid > mean_z + outlier_std_dev * std_z] = np.nan
            processed_grid[processed_grid < mean_z - outlier_std_dev * std_z] = np.nan

    nan_mask = np.isnan(processed_grid)
    if np.any(nan_mask):
        valid_coords = np.array(np.nonzero(~nan_mask)).T
        valid_values = processed_grid[~nan_mask]
        
        if valid_coords.size > 0:
            nan_coords = np.array(np.nonzero(nan_mask)).T
            interpolated_values = griddata(valid_coords, valid_values, nan_coords, method='linear')
            # nan_mask_after_linear = np.isnan(interpolated_values)
            # if np.any(nan_mask_after_linear):
            #     nearest_values = griddata(valid_coords, valid_values, nan_coords[nan_mask_after_linear], method='nearest')
            #     interpolated_values[nan_mask_after_linear] = nearest_values
            processed_grid[nan_mask] = interpolated_values

    if gaussian_kernel_size > 0:
        processed_grid = cv2.GaussianBlur(processed_grid, (gaussian_kernel_size, gaussian_kernel_size), 0)

    return processed_grid


def _save_points(
    points_xyzh: np.ndarray,
    outfile: str
):
    """
    A helper function to save point clouds in various formats.

    Args:
        points_xyzh (np.ndarray): An Nx4 array of (x, y, z, h) points.
        outfile (str): The path to the output file. Format is determined by extension.
                       Filename must contain 'xyz', 'xyh', or 'xyzh' to specify columns.
    """
    if outfile is None:
        return

    # Determine which columns to save based on filename
    if "xyzh" in outfile.lower():
        data_to_save = points_xyzh
        dtype = [('x', 'f4'), ('y', 'f4'), ('z', 'f4'), ('h', 'f4')]
    elif "xyh" in outfile.lower():
        data_to_save = points_xyzh[:, [0, 1, 3]]
        dtype = [('x', 'f4'), ('y', 'f4'), ('h', 'f4')]
    else: # Default to xyz
        data_to_save = points_xyzh[:, :3]
        dtype = [('x', 'f4'), ('y', 'f4'), ('z', 'f4')]

    # Determine file format
    if outfile.lower().endswith(".pts") or outfile.lower().endswith(".txt"):
        np.savetxt(outfile, data_to_save, fmt="%.3f")
        print(f"Saved {len(data_to_save)} points to {outfile}")
    elif outfile.lower().endswith(".ply"):
        if not PLYFILE_ENABLED:
            print("Warning: plyfile library is not installed. Cannot save .ply file.")
            return
        vertex = np.array([tuple(p) for p in data_to_save], dtype=dtype)
        el = PlyElement.describe(vertex, 'vertex')
        PlyData([el]).write(outfile)
        print(f"Saved {len(data_to_save)} points to {outfile}")
    elif outfile.lower().endswith(".las") or outfile.lower().endswith(".laz"):
        if not LASPY_ENABLED:
            print("Warning: laspy library is not installed. Cannot save .las or .laz file.")
            return
        
        header = laspy.LasHeader(point_format=3, version="1.2")
        num_dims = points_xyzh.shape[1]
        if num_dims == 4:
            header.add_extra_dim(laspy.ExtraBytesParams(name="h", type=np.float32))
        las = laspy.LasData(header)
        las.x = data_to_save[:, 0]
        las.y = data_to_save[:, 1]
        las.z = data_to_save[:, 2]
        if num_dims == 4:
            las.h = data_to_save[:, 3]
        las.write(outfile)
        print(f"Saved {len(data_to_save)} points to las file (ver 1.2)\n{outfile}")
    else:
        print(f"Warning: Unknown output file format for {outfile}. Supported formats: .pts, .txt, .ply")

# --- High-Level API Functions ---

[docs] def detect_ground( points: np.ndarray, grid_size: float = 0.5, outlier_std_dev: float = 1.0, gaussian_kernel_size: int = 5 ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Detects the ground surface and returns it as both a point cloud and a grid. Args: points (np.ndarray): The input Nx3 point cloud. grid_size (float): The resolution for the intermediate ground grid. outlier_std_dev (float): Std deviations for outlier removal. gaussian_kernel_size (int): Kernel size for smoothing. Returns: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: A tuple containing: - ground_model_points (np.ndarray): Mx3 array of the final ground grid points. - grid_x (np.ndarray): 2D array of X coordinates for the grid. - grid_y (np.ndarray): 2D array of Y coordinates for the grid. - grid_z (np.ndarray): 2D array of Z values for the grid. """ print(f"Creating ground grid with size {grid_size}...") grid_x, grid_y, grid_z_initial = _create_initial_grid(points, grid_size) print("Filling and smoothing ground grid...") grid_z_final = _fill_and_smooth_grid(grid_z_initial, outlier_std_dev, gaussian_kernel_size) # Prepare ground model point cloud for output ground_model_points = np.vstack([grid_x.ravel(), grid_y.ravel(), grid_z_final.ravel()]).T ground_model_points = ground_model_points[~np.isnan(ground_model_points).any(axis=1)] return ground_model_points, grid_x, grid_y, grid_z_final
[docs] def calculate_height_above_ground( points: np.ndarray, ground_grid_x: np.ndarray, ground_grid_y: np.ndarray, ground_grid_z: np.ndarray ) -> np.ndarray: """ Calculates the vertical distance of each point from a pre-computed ground grid. Args: points (np.ndarray): Nx3 array of (x, y, z) points to normalize. ground_grid_x (np.ndarray): 2D array of X coordinates for the ground model. ground_grid_y (np.ndarray): 2D array of Y coordinates for the ground model. ground_grid_z (np.ndarray): 2D array of Z values for the ground model. Returns: np.ndarray: An Nx4 array of the original points with an added height column (x, y, z, h). """ if ground_grid_z.size == 0: print("Warning: Ground grid is empty. Returning heights as original Z values.") return np.column_stack((points, points[:, 2])) # Create the set of known points from the grid for interpolation grid_points = np.vstack([ground_grid_x.ravel(), ground_grid_y.ravel()]).T grid_values = ground_grid_z.ravel() # Remove any NaN values from the grid before creating the interpolator valid_mask = ~np.isnan(grid_values) if not np.any(valid_mask): # If the entire grid is NaN, return zero height for all points return np.column_stack((points, np.zeros(points.shape[0]))) # Interpolate the Z value of the ground at the XY location of each point interpolated_z = griddata(grid_points[valid_mask], grid_values[valid_mask], points[:, :2], method='linear') # Fallback for points outside the convex hull of the ground grid nan_heights_mask = np.isnan(interpolated_z) if np.any(nan_heights_mask): nearest_interpolated_z = griddata(grid_points[valid_mask], grid_values[valid_mask], points[nan_heights_mask, :2], method='nearest') interpolated_z[nan_heights_mask] = nearest_interpolated_z heights = points[:, 2] - interpolated_z return np.column_stack((points, heights))
[docs] def classify_ground_and_vegetation( points: np.ndarray, height_threshold: float = 0.5, out_gd_file: str = None, out_veg_file: str = None, **ground_detection_params ) -> tuple[np.ndarray, np.ndarray]: """ Performs the full workflow: detect ground, normalize heights, and classify. Args: points (np.ndarray): The input Nx3 point cloud. height_threshold (float): Height to separate vegetation from ground. out_gd_file (str, optional): Path to save the ground points. out_veg_file (str, optional): Path to save the vegetation points. **ground_detection_params: Keyword arguments passed to detect_ground(), e.g., grid_size=0.5. Returns: tuple[np.ndarray, np.ndarray]: A tuple containing two numpy arrays: - The first element contains the points classified as ground (Nx3). - The second element contains the points classified as vegetation (Mx3). """ _, grid_x, grid_y, grid_z = detect_ground(points, **ground_detection_params) normalized_points = calculate_height_above_ground(points, grid_x, grid_y, grid_z) veg_mask = normalized_points[:, 3] > height_threshold ground_mask = ~veg_mask print(f"Found {np.sum(ground_mask)} ground points and {np.sum(veg_mask)} vegetation points.") # Save files if paths are provided _save_points(normalized_points[ground_mask], out_gd_file) _save_points(normalized_points[veg_mask], out_veg_file) return points[ground_mask], points[veg_mask]
# --- Visualization Functions ---
[docs] def plot_ground_model(grid_x: np.ndarray, grid_y: np.ndarray, grid_z: np.ndarray, title: str = "Ground Model (2D Heatmap)"): """ Creates a 2D heatmap directly from the generated ground grid. Args: grid_x (np.ndarray): 2D array of X coordinates from detect_ground. grid_y (np.ndarray): 2D array of Y coordinates from detect_ground. grid_z (np.ndarray): 2D array of Z values from detect_ground. title (str): The title for the plot. """ if not PLOTTING_ENABLED: print("Warning: Matplotlib is not installed. Skipping plot.") return if grid_z.size == 0: print("Warning: Not enough ground model data to create a plot.") return # Plotting the 2D heatmap fig, ax = plt.subplots(figsize=(10, 8)) # Get the extent from the grid coordinates extent = [grid_x.min(), grid_x.max(), grid_y.min(), grid_y.max()] im = ax.imshow(grid_z, extent=extent, origin='lower', cmap='terrain') # , interpolation='bilinear' fig.colorbar(im, ax=ax, label='Z Coordinate (Height)') ax.set_xlabel('X Coordinate') ax.set_ylabel('Y Coordinate') ax.set_title(title) ax.set_aspect('equal', adjustable='box') print("Displaying 2D ground model plot...") plt.show() plt.close()
# --- Complete Workflow ---
[docs] def run_ground_detection( infile: str, out_gd_file: str = None, out_veg_file: str = None, grid_size: float = 0.5, height_threshold: float = 0.5, **ground_detection_params ) -> list[str,str]: """ Runs the full ground detection workflow and saves the results. Args: infile (str): Path to the input point cloud file (.pts or .txt). out_gd_file (str, optional): Path to save ground points. out_veg_file (str, optional): Path to save vegetation points. grid_size (float): Size of the grid cells for DTM generation. height_threshold (float): Height threshold to classify vegetation. ground_detection_params include: - outlier_std_dev (float): Std deviations for outlier removal. - gaussian_kernel_size (int): Kernel size for smoothing. Returns: list[str,str]: A list containing: - Path to the saved ground points file. - Path to the saved vegetation points file. """ print(f"Loading points from {infile}...") if ".pts" in infile.lower() or ".txt" in infile.lower(): # Load points from .pts or .txt file points = np.loadtxt(infile) elif ".ply" in infile.lower(): # Load points from .ply file if not PLYFILE_ENABLED: raise ImportError("plyfile library is not installed. Cannot load .ply files.") ply_data = PlyData.read(infile) points = np.array([list(vertex) for vertex in ply_data['vertex'].data]) # we only need the first three columns (x, y, z) points = points[:, :3] elif ".las" in infile.lower() or ".laz" in infile.lower(): # Load points from .las or .laz file if not LASPY_ENABLED: raise ImportError("laspy library is not installed. Cannot load .las or .laz files.") las = laspy.read(infile) points = np.vstack((las.x, las.y, las.z)).T else: raise ValueError("Unsupported file format. Please use .pts, .txt, .las, or .ply files.") # Run the full workflow classify_ground_and_vegetation( points, height_threshold=height_threshold, out_gd_file=out_gd_file, out_veg_file=out_veg_file, grid_size=grid_size, **ground_detection_params ) return out_gd_file, out_veg_file
# --- Main CLI --- def main(): """Main function to run ground detection from the command line.""" parser = argparse.ArgumentParser(description="Detects and classifies ground points from a point cloud.") parser.add_argument("infile", help="Path to the input point cloud file (.pts or .txt).") parser.add_argument("--out_gd", help="Optional: Path to save ground points.") parser.add_argument("--out_veg", help="Optional: Path to save vegetation points.") parser.add_argument("--grid_size", type=float, default=0.5, help="Size of the grid cells for DTM generation.") parser.add_argument("--height_threshold", type=float, default=0.5, help="Height threshold to classify vegetation.") parser.add_argument("--plot", action="store_true", help="Display a 2D plot of the generated ground model.") args = parser.parse_args() try: print(f"Loading points from {args.infile}...") points = np.loadtxt(args.infile) except Exception as e: print(f"Error loading file: {e}") return # --- Run the full workflow --- ground_points, veg_points = classify_ground_and_vegetation( points, height_threshold=args.height_threshold, out_gd_file=args.out_gd, out_veg_file=args.out_veg, grid_size=args.grid_size ) # --- Optional Plotting --- if args.plot: # We need to re-run detect_ground to get the grid for plotting _, grid_x, grid_y, grid_z = detect_ground(points, grid_size=args.grid_size) plot_ground_model(grid_x, grid_y, grid_z) print("Processing complete.") if __name__ == "__main__": main()