Source code for gigl.distributed.utils.degree

"""
Utility functions for computing node degrees in distributed graph settings.

This module provides functions to compute node out-degrees from graph partitions
and aggregate them across distributed machines. Degrees are computed from the
CSR (Compressed Sparse Row) topology stored in GraphLearn-Torch Graph objects.

Note: Degree tensors are not moved to shared memory and may be duplicated across
processes on the same machine.

Requirements
============

torch.distributed must be initialized before calling these functions.

Usage
=====

Access dataset.degree_tensor to lazily compute and cache the degree tensor.

Over-counting correction is handled automatically in _all_reduce_degrees by
detecting how many processes share the same machine (and thus the same data).
"""

from collections import Counter
from typing import Union

import torch
from graphlearn_torch.data import Graph
from torch_geometric.typing import EdgeType

from gigl.common.logger import Logger
from gigl.distributed.utils.device import get_device_from_process_group
from gigl.distributed.utils.networking import get_internal_ip_from_all_ranks

# Module-level logger used for the warning/info messages emitted while computing degrees.
[docs] logger = Logger()
def compute_and_broadcast_degree_tensor(
    graph: Union[Graph, dict[EdgeType, Graph]],
) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]:
    """
    Derive per-node degrees from a graph's CSR topology and sum them across ranks.

    Local degrees are read off the CSR row pointers (``indptr``) of each graph and
    then handed to ``_all_reduce_degrees``, which aggregates them over all ranks
    and corrects for processes that contribute the same data more than once.

    Args:
        graph: A single Graph (homogeneous) or a dict[EdgeType, Graph] (heterogeneous).

    Returns:
        Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: The aggregated degrees.
            - Homogeneous input: one tensor of shape [num_nodes].
            - Heterogeneous input: a dict mapping each EdgeType to its degree tensor.

    Raises:
        RuntimeError: If torch.distributed is not initialized.
        ValueError: If a homogeneous graph has no topology or no indptr. (For
            heterogeneous input a missing topology is only logged and an empty
            tensor is used for that edge type.)
    """
    if not torch.distributed.is_initialized():
        raise RuntimeError(
            "compute_and_broadcast_degree_tensor requires torch.distributed to be initialized."
        )

    # Step 1: local degrees, straight from the CSR row pointers.
    local_degrees: Union[torch.Tensor, dict[EdgeType, torch.Tensor]]
    if isinstance(graph, Graph):
        homogeneous_topo = graph.topo
        if homogeneous_topo is None or homogeneous_topo.indptr is None:
            raise ValueError("Topology/indptr not available for graph.")
        local_degrees = _compute_degrees_from_indptr(homogeneous_topo.indptr)
    else:
        degrees_by_edge_type: dict[EdgeType, torch.Tensor] = {}
        for edge_type, edge_graph in graph.items():
            edge_topo = edge_graph.topo
            if edge_topo is not None and edge_topo.indptr is not None:
                degrees_by_edge_type[edge_type] = _compute_degrees_from_indptr(
                    edge_topo.indptr
                )
                continue
            # Keep an (empty) entry for this edge type rather than dropping the key.
            logger.warning(
                f"Topology/indptr not available for edge type {edge_type}, using empty tensor."
            )
            degrees_by_edge_type[edge_type] = torch.empty(0, dtype=torch.int16)
        local_degrees = degrees_by_edge_type

    # Step 2: aggregate over all ranks (over-counting is corrected inside the helper).
    result = _all_reduce_degrees(local_degrees)

    # Step 3: log a short summary of what was computed.
    if isinstance(result, torch.Tensor):
        if result.numel() == 0:
            logger.info("Graph contained 0 nodes when computing degrees")
        else:
            logger.info(
                f"{result.size(0)} nodes, max={result.max().item()}, min={result.min().item()}"
            )
        return result

    for edge_type, degrees in result.items():
        if degrees.numel() == 0:
            logger.info(
                f"Graph contained 0 nodes for edge type {edge_type} when computing degrees"
            )
        else:
            logger.info(
                f"{edge_type}: {degrees.size(0)} nodes, max={degrees.max().item()}, min={degrees.min().item()}"
            )
    return result
def _pad_to_size(tensor: torch.Tensor, target_size: int) -> torch.Tensor:
    """Pad a 1-D tensor with trailing zeros so that it has ``target_size`` entries.

    Args:
        tensor: Tensor to pad along dimension 0.
        target_size: Desired length of dimension 0.

    Returns:
        ``tensor`` itself when it already has at least ``target_size`` entries,
        otherwise a new tensor with zeros (same dtype and device) appended.
    """
    if tensor.size(0) >= target_size:
        return tensor
    padding = torch.zeros(
        target_size - tensor.size(0),
        dtype=tensor.dtype,
        device=tensor.device,
    )
    return torch.cat([tensor, padding])


def _clamp_to_int16(tensor: torch.Tensor) -> torch.Tensor:
    """Clamp tensor values to int16 max and convert dtype.

    Only the upper bound is clamped; values above the int16 maximum saturate
    instead of wrapping around during the dtype conversion.
    """
    max_int16 = torch.iinfo(torch.int16).max
    return tensor.clamp(max=max_int16).to(torch.int16)


def _compute_degrees_from_indptr(indptr: torch.Tensor) -> torch.Tensor:
    """Compute degrees from CSR row pointers: degree[i] = indptr[i+1] - indptr[i].

    Args:
        indptr: CSR row-pointer tensor.

    Returns:
        An int16 tensor with one entry per consecutive pair of row pointers.
        Degrees above the int16 maximum are saturated at that maximum; a plain
        dtype cast would silently wrap them around to negative values.
    """
    return _clamp_to_int16(indptr[1:] - indptr[:-1])


def _all_reduce_degrees(
    local_degrees: Union[torch.Tensor, dict[EdgeType, torch.Tensor]],
) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]:
    """All-reduce degree tensors across ranks, handling both homogeneous and heterogeneous cases.

    For heterogeneous graphs, iterates over the edge types in local_degrees.
    All partitions are expected to have entries for all edge types (even if some
    have empty tensors), because every rank must issue the same sequence of
    collective calls.

    Moves tensors to GPU for the all-reduce if using NCCL backend (which requires
    CUDA), otherwise keeps tensors on CPU (for Gloo backend).

    Over-counting correction:
        In distributed training, multiple processes on the same machine often share
        the same graph partition data (via shared memory). When we all-reduce
        degrees, each process contributes its "local" degrees - but if 4 processes
        on one machine all read the same partition, that partition's degrees get
        summed 4 times instead of 1.

        Example: Machine A has 2 processes sharing partition with degrees [3, 5, 2].
                 Machine B has 2 processes sharing partition with degrees [1, 4, 6].

        Without correction: all-reduce sums = [3+3+1+1, 5+5+4+4, 2+2+6+6] = [8, 18, 16] (wrong!)
        With correction: divide by local_world_size (2 per machine) = [4, 9, 8]
                         (correct: [3+1, 5+4, 2+6])

        This function detects how many processes share the same machine by comparing
        IP addresses, then divides by that count to correct the over-counting.

        NOTE(review): the divisor is the number of ranks sharing *this* rank's IP,
        so the correction is exact only when every machine runs the same number of
        ranks and co-located ranks contribute identical degrees - confirm this
        holds for all launch configurations.

    Args:
        local_degrees: Either a single tensor (homogeneous) or dict mapping EdgeType
            to tensors (heterogeneous). For heterogeneous graphs, all partitions
            must have entries for all edge types.

    Returns:
        Aggregated degree tensors in the same format as input (int16, on CPU).

    Raises:
        RuntimeError: If torch.distributed is not initialized.
    """
    if not torch.distributed.is_initialized():
        raise RuntimeError(
            "_all_reduce_degrees requires torch.distributed to be initialized."
        )

    # Compute local_world_size: number of processes on the same machine sharing data
    all_ips = get_internal_ip_from_all_ranks()
    my_rank = torch.distributed.get_rank()
    my_ip = all_ips[my_rank]
    local_world_size = Counter(all_ips)[my_ip]

    # NCCL backend requires CUDA tensors; Gloo works with CPU
    device = get_device_from_process_group()

    def reduce_tensor(tensor: torch.Tensor) -> torch.Tensor:
        """All-reduce a single tensor with size sync and over-counting correction."""
        # Synchronize max size across all ranks so every rank reduces equally sized buffers
        local_size = torch.tensor([tensor.size(0)], dtype=torch.long, device=device)
        torch.distributed.all_reduce(local_size, op=torch.distributed.ReduceOp.MAX)
        max_size = int(local_size.item())

        # Pad, convert to int64 (all_reduce doesn't support int16), move to device
        padded = _pad_to_size(tensor, max_size).to(torch.int64).to(device)
        torch.distributed.all_reduce(padded, op=torch.distributed.ReduceOp.SUM)

        # Correct for over-counting, move back to CPU, and clamp to int16
        # TODO (mkolodner-sc): Potentially want to parameterize this in the future if we want degrees higher than the int16 max.
        return _clamp_to_int16((padded // local_world_size).cpu())

    # Homogeneous case
    if isinstance(local_degrees, torch.Tensor):
        return reduce_tensor(local_degrees)

    # Heterogeneous case: all-reduce each edge type
    # Sort edge types for deterministic ordering across ranks
    result: dict[EdgeType, torch.Tensor] = {}
    for edge_type in sorted(local_degrees):
        result[edge_type] = reduce_tensor(local_degrees[edge_type])
    return result