gigl.distributed.distributed_neighborloader#

Attributes#

logger

Classes#

DistNeighborLoader

Distributed Neighbor Loader. Takes in some input nodes and samples neighbors from the dataset.

Functions#

flush()

Module Contents#

class gigl.distributed.distributed_neighborloader.DistNeighborLoader(dataset, num_neighbors, input_nodes=None, num_workers=1, batch_size=1, context=None, local_process_rank=None, local_process_world_size=None, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', prefetch_size=None, process_start_gap_seconds=60.0, max_concurrent_producer_inits=None, num_cpu_threads=None, shuffle=False, drop_last=False, sampler_options=None)[source]#

Bases: gigl.distributed.base_dist_loader.BaseDistLoader

Base class for GiGL distributed loaders.

Consolidates shared initialization logic from DistNeighborLoader and DistABLPLoader. Subclasses GLT’s DistLoader but does NOT call its __init__ — instead, it replicates the relevant attribute-setting logic to allow configurable producer classes.

Subclasses should:

  1. Call resolve_runtime() to get runtime context.

  2. Determine mode (colocated vs graph store).

  3. Call create_sampling_config() to build the SamplingConfig.

  4. For colocated mode: call create_colocated_channel() and construct the DistSamplingProducer (or subclass), then pass the producer as producer.

  5. For graph store mode: pass the RPC function (e.g. DistServer.create_sampling_producer) as producer.

  6. Call super().__init__() with the prepared data.
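A minimal, purely schematic sketch of that flow for a hypothetical colocated-mode subclass is shown below. The helper names come from the steps above, but the argument lists, the producer construction, and the MyColocatedLoader / MyDistSamplingProducer names are illustrative assumptions rather than the real BaseDistLoader signatures.

    # Schematic sketch only: argument lists are assumptions, not the real signatures.
    from gigl.distributed.base_dist_loader import BaseDistLoader


    class MyColocatedLoader(BaseDistLoader):  # hypothetical subclass
        def __init__(self, dataset, num_neighbors, batch_size=1):
            # Step 1: resolve the distributed runtime context.
            runtime = self.resolve_runtime()

            # Step 2: determine the mode; this sketch assumes colocated mode throughout.

            # Step 3: build the sampling configuration.
            sampling_config = self.create_sampling_config(num_neighbors, batch_size)

            # Step 4: colocated mode creates a shared-memory channel and a producer.
            channel = self.create_colocated_channel()
            producer = MyDistSamplingProducer(dataset, sampling_config, channel)
            # (Step 5, graph store mode, would instead pass an RPC function such as
            # DistServer.create_sampling_producer as the producer.)

            # Step 6: hand the prepared pieces to BaseDistLoader.
            super().__init__(
                dataset=dataset,
                sampling_config=sampling_config,
                runtime=runtime,
                producer=producer,
                # other documented parameters (sampler_input, dataset_schema,
                # worker_options, device, ...) omitted for brevity
            )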

Parameters:
  • dataset (Union[gigl.distributed.dist_dataset.DistDataset, gigl.distributed.graph_store.remote_dist_dataset.RemoteDistDataset]) – DistDataset (colocated) or RemoteDistDataset (graph store).

  • sampler_input – Prepared by the subclass. Single input for colocated mode, list (one per server) for graph store mode.

  • dataset_schema – Contains edge types, feature info, edge dir, etc.

  • worker_options – MpDistSamplingWorkerOptions (colocated) or RemoteDistSamplingWorkerOptions (graph store).

  • sampling_config – Configuration for sampling (created via create_sampling_config).

  • device – Target device for sampled results.

  • runtime – Resolved distributed runtime information.

  • producer – Either a pre-constructed DistSamplingProducer (colocated mode) or a callable to dispatch on the DistServer (graph store mode).

  • sampler_options (Optional[gigl.distributed.sampler_options.SamplerOptions]) – Controls which sampler class is instantiated.

  • process_start_gap_seconds (float) – Delay between each process for staggered colocated init. In graph store mode, this is the delay between each batch of concurrent producer initializations.

  • max_concurrent_producer_inits (Optional[int]) – Maximum number of leader ranks that may dispatch create_producer_fn RPCs concurrently in graph store mode. Leaders are grouped into batches of this size; each batch sleeps batch_index * process_start_gap_seconds before dispatching. Only applies to graph store mode. Defaults to None (no staggering).

  • num_neighbors (Union[list[int], dict[torch_geometric.typing.EdgeType, list[int]]])

  • input_nodes (Optional[Union[torch.Tensor, Tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor], collections.abc.Mapping[int, torch.Tensor], Tuple[gigl.src.common.types.graph_data.NodeType, collections.abc.Mapping[int, torch.Tensor]]]])

  • num_workers (int)

  • batch_size (int)

  • context (Optional[gigl.distributed.dist_context.DistributedContext])

  • local_process_rank (Optional[int])

  • local_process_world_size (Optional[int])

  • pin_memory_device (Optional[torch.device])

  • worker_concurrency (int)

  • channel_size (str)

  • prefetch_size (Optional[int])

  • num_cpu_threads (Optional[int])

  • shuffle (bool)

  • drop_last (bool)

Distributed Neighbor Loader. Takes in some input nodes and samples neighbors from the dataset. This loader should be used if you do not have any special sampling needs (e.g. generating training examples for Anchor Based Link Prediction (ABLP) tasks). That said, this loader is still useful for generating random negative examples for ABLP training.

Note: We try to adhere to the PyG DataLoader API as much as possible. See the following for reference: https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/loader/node_loader.html#NodeLoader and https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/distributed/dist_neighbor_loader.html#DistNeighborLoader
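For orientation, here is a minimal usage sketch. It assumes an already-built gigl.distributed.DistDataset named dataset (colocated mode) and a homogeneous graph; the fanouts and batch size are illustrative, not recommendations.

    import torch

    from gigl.distributed.distributed_neighborloader import DistNeighborLoader

    # `dataset` is assumed to be an already-built gigl.distributed.DistDataset
    # (colocated mode); constructing it is outside the scope of this sketch.
    seeds = torch.arange(10_000, dtype=torch.long)  # homogeneous seed node ids

    loader = DistNeighborLoader(
        dataset=dataset,
        num_neighbors=[10, 5],  # 10 neighbors per seed, then 5 per hop-1 node
        input_nodes=seeds,
        batch_size=512,
        shuffle=True,
    )

    for batch in loader:
        ...  # each `batch` is a sampled subgraph, consumed PyG-style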

Parameters:
  • dataset (DistDataset | RemoteDistDataset) – The dataset to sample from. If this is a RemoteDistDataset, we are assumed to be in "Graph Store" mode.

  • num_neighbors (list[int] or dict[Tuple[str, str, str], list[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, this may also be a dictionary denoting the number of neighbors to sample for each individual edge type. If KHopNeighborSamplerOptions is also provided, they must match.

  • context (deprecated - will be removed soon) (DistributedContext) – Distributed context information of the current process.

  • local_process_rank (deprecated - will be removed soon) (int) – Required if context provided. The local rank of the current process within a node.

  • local_process_world_size (deprecated - will be removed soon) (int) – Required if context provided. The total number of processes within a node.

  • input_nodes (Tensor | Tuple[NodeType, Tensor] | dict[int, Tensor] | Tuple[NodeType, dict[int, Tensor]]) – The nodes to start sampling from. It is of type torch.LongTensor for homogeneous graphs. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this flag must be passed in as a tuple that holds the node type and node indices. (default: None) For Graph Store mode, this must be a dict[int, Tensor] or a tuple of (NodeType, dict[int, Tensor]), where each Tensor in the dict holds the node ids to sample from, keyed by server. e.g. {0: [10, 20], 1: [30, 40]} means sample from nodes 10 and 20 on server 0, and nodes 30 and 40 on server 1. If a Graph Store input (e.g. dict[int, Tensor]) is provided in colocated mode, or a colocated input (e.g. a plain Tensor) is provided in Graph Store mode, an error will be raised. See the sketch after this parameter list for examples of each form.

  • num_workers (int) – How many workers (subprocesses to spawn) to use for distributed neighbor sampling of the current process. (default: 1).

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses the local_process_rank and torch.cuda.device_count() to assign the device. If CUDA is not available, the CPU device will be used. (default: None).

  • worker_concurrency (int) – The max sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best performance for sampling, although you may wish to explore higher or lower settings when performance tuning. (default: 4).

  • channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is: num_workers * 64MB (default: “4GB”).

  • prefetch_size (Optional[int]) – Max number of sampled messages to prefetch on the client side, per server. Only applies to Graph Store mode (remote workers); if supplied when not in Graph Store mode, an error will be raised. Lower values reduce server-side RPC thread contention when multiple loaders are active concurrently. (default: None).

  • process_start_gap_seconds (float) – Delay between each process when initializing the neighbor loader. In colocated mode, each process sleeps local_rank * process_start_gap_seconds before initializing. In graph store mode, leader ranks are grouped into batches of max_concurrent_producer_inits and each batch sleeps batch_index * process_start_gap_seconds before dispatching RPCs.

  • max_concurrent_producer_inits (Optional[int]) – Maximum number of leader ranks that may dispatch create-producer RPCs concurrently in graph store mode. Leaders are grouped into batches of this size; each batch is staggered by process_start_gap_seconds. For example, with max_concurrent_producer_inits=4 and process_start_gap_seconds=60.0, leader ranks 0-3 dispatch immediately, ranks 4-7 dispatch after 60 seconds, and so on. Only applies to graph store mode. Defaults to None (no staggering).

  • num_cpu_threads (Optional[int]) – Number of CPU threads PyTorch should use for neighbor loading during CPU training/inference, on top of the per-process parallelism. If set to None, defaults to 2 when using CPU training/inference.

  • shuffle (bool) – Whether to shuffle the input nodes. (default: False).

  • drop_last (bool) – Whether to drop the last incomplete batch. (default: False).

  • sampler_options (Optional[SamplerOptions]) – Controls which sampler class is instantiated. Pass KHopNeighborSamplerOptions to use the built-in sampler, or CustomSamplerOptions to dynamically import a custom sampler class. If None, defaults to KHopNeighborSamplerOptions(num_neighbors).
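As a sketch of the accepted num_neighbors and input_nodes forms (the node and edge type names are hypothetical placeholders):

    import torch

    # num_neighbors: per-hop fanouts shared across edge types ...
    homogeneous_fanouts = [10, 5]
    # ... or keyed by edge type for heterogeneous graphs (edge type is illustrative).
    heterogeneous_fanouts = {("user", "follows", "user"): [10, 5]}

    # input_nodes, colocated mode:
    homogeneous_seeds = torch.arange(1_000, dtype=torch.long)  # plain LongTensor of seed ids
    # ("user" stands in for a GiGL NodeType value.)
    heterogeneous_seeds = ("user", torch.arange(1_000, dtype=torch.long))

    # input_nodes, Graph Store mode: seed ids keyed by the server that owns them,
    # matching the {0: [10, 20], 1: [30, 40]} example above.
    graph_store_seeds = {
        0: torch.tensor([10, 20], dtype=torch.long),
        1: torch.tensor([30, 40], dtype=torch.long),
    }

Passing a Graph Store-style input to colocated mode, or a colocated-style input to Graph Store mode, raises an error, as noted under input_nodes.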

gigl.distributed.distributed_neighborloader.flush()[source]#
gigl.distributed.distributed_neighborloader.logger[source]#