gigl.distributed.distributed_neighborloader#
Classes#
DistNeighborLoader – Distributed Neighbor Loader.
Module Contents#
- class gigl.distributed.distributed_neighborloader.DistNeighborLoader(dataset, num_neighbors, input_nodes=None, num_workers=1, batch_size=1, context=None, local_process_rank=None, local_process_world_size=None, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', prefetch_size=None, process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False)[source]#
Bases:
gigl.distributed.base_dist_loader.BaseDistLoader

Base class for GiGL distributed loaders.

Consolidates shared initialization logic from DistNeighborLoader and DistABLPLoader. Subclasses GLT's DistLoader but does NOT call its __init__; instead, it replicates the relevant attribute-setting logic to allow configurable producer classes.

Subclasses should:
1. Call resolve_runtime() to get runtime context.
2. Determine mode (colocated vs graph store).
3. Call create_sampling_config() to build the SamplingConfig.
4. For colocated: call create_colocated_channel() and construct the DistMpSamplingProducer (or subclass), then pass the producer as sampler. For graph store: pass the RPC function (e.g. DistServer.create_sampling_producer) as sampler.
5. Call super().__init__() with the prepared data.
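The five-step contract above can be sketched with stubbed stand-ins. Note that every class and method body below is a minimal placeholder, not the real GiGL/GLT implementation; only the method names and the initialization order from the docstring are taken from the source.

```python
# Minimal placeholders standing in for the real GiGL/GLT classes; only the
# documented initialization ORDER is modeled, not real distributed sampling.
class SamplingConfig:
    def __init__(self, num_neighbors, batch_size):
        self.num_neighbors = num_neighbors
        self.batch_size = batch_size

class DistMpSamplingProducer:  # stand-in for the colocated producer
    def __init__(self, config):
        self.config = config

class BaseDistLoader:
    def __init__(self, sampler, sampling_config):
        # GLT's DistLoader.__init__ is NOT called; the relevant
        # attributes are set directly instead.
        self.sampler = sampler
        self.sampling_config = sampling_config

class MyLoader(BaseDistLoader):
    def __init__(self, num_neighbors, batch_size, colocated=True):
        runtime = self.resolve_runtime()                          # step 1
        mode = "colocated" if colocated else "graph_store"        # step 2
        config = self.create_sampling_config(num_neighbors,       # step 3
                                             batch_size)
        if mode == "colocated":                                   # step 4
            sampler = DistMpSamplingProducer(config)              # producer
        else:
            # graph store mode: an RPC callable dispatched on the server
            sampler = lambda: "DistServer.create_sampling_producer"
        super().__init__(sampler=sampler, sampling_config=config) # step 5
        self.runtime = runtime

    def resolve_runtime(self):
        return {"rank": 0, "world_size": 1}  # illustrative value only

    def create_sampling_config(self, num_neighbors, batch_size):
        return SamplingConfig(num_neighbors, batch_size)

loader = MyLoader(num_neighbors=[10, 5], batch_size=32)
```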
- Parameters:
dataset (Union[gigl.distributed.dist_dataset.DistDataset, gigl.distributed.graph_store.remote_dist_dataset.RemoteDistDataset]) – DistDataset (colocated) or RemoteDistDataset (graph store).
sampler_input – Prepared by the subclass. Single input for colocated mode, list (one per server) for graph store mode.
dataset_schema – Contains edge types, feature info, edge dir, etc.
worker_options – MpDistSamplingWorkerOptions (colocated) or RemoteDistSamplingWorkerOptions (graph store).
sampling_config – Configuration for the sampler (created via create_sampling_config).
device – Target device for sampled results.
runtime – Resolved distributed runtime information.
sampler – Either a pre-constructed DistMpSamplingProducer (colocated mode) or a callable to dispatch on the DistServer (graph store mode).
process_start_gap_seconds (float) – Delay between each process for staggered colocated init.
num_neighbors (Union[list[int], dict[torch_geometric.typing.EdgeType, list[int]]])
input_nodes (Optional[Union[torch.Tensor, Tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor], collections.abc.Mapping[int, torch.Tensor], Tuple[gigl.src.common.types.graph_data.NodeType, collections.abc.Mapping[int, torch.Tensor]]]])
num_workers (int)
batch_size (int)
context (Optional[gigl.distributed.dist_context.DistributedContext])
local_process_rank (Optional[int])
local_process_world_size (Optional[int])
pin_memory_device (Optional[torch.device])
worker_concurrency (int)
channel_size (str)
prefetch_size (Optional[int])
num_cpu_threads (Optional[int])
shuffle (bool)
drop_last (bool)
Distributed Neighbor Loader. Takes in some input nodes and samples neighbors from the dataset. This loader should be used if you do not have any special sampling needs, such as generating training examples for Anchor Based Link Prediction (ABLP) tasks. This loader is, however, useful for generating random negative examples for ABLP training.
Note: We try to adhere to the PyG DataLoader API as much as possible. See the following for reference: https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/loader/node_loader.html#NodeLoader https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/distributed/dist_neighbor_loader.html#DistNeighborLoader
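The per-hop fanout behavior governed by num_neighbors can be illustrated with a small pure-Python sketch. Nothing here uses GiGL or PyG; plain lists stand in for tensors, and an entry of -1 keeps every neighbor, mirroring the documented semantics.

```python
import random

def sample_hops(adjacency, seeds, num_neighbors, seed=0):
    """Per-hop neighbor sampling: num_neighbors[i] caps the fanout at hop i;
    an entry of -1 keeps every neighbor of each frontier node."""
    rng = random.Random(seed)
    frontier, sampled = list(seeds), []
    for fanout in num_neighbors:
        next_frontier = []
        for node in frontier:
            nbrs = adjacency.get(node, [])
            if fanout != -1 and len(nbrs) > fanout:
                nbrs = rng.sample(nbrs, fanout)  # cap this node's fanout
            next_frontier.extend(nbrs)
        sampled.append(next_frontier)
        frontier = next_frontier  # next hop expands the sampled nodes
    return sampled

adjacency = {0: [1, 2, 3], 1: [4, 5], 2: [6], 3: [], 4: [], 5: [], 6: []}
# Hop 0 keeps at most 2 of node 0's neighbors; hop 1 keeps all of theirs.
hops = sample_hops(adjacency, seeds=[0], num_neighbors=[2, -1])
```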
- Parameters:
dataset (DistDataset | RemoteDistDataset) – The dataset to sample from. If this is a RemoteDistDataset, then we are assumed to be in "Graph Store" mode.
num_neighbors (list[int] or dict[Tuple[str, str, str], list[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, may also take in a dictionary denoting the amount of neighbors to sample for each individual edge type.
context (deprecated - will be removed soon) (DistributedContext) – Distributed context information of the current process.
local_process_rank (deprecated - will be removed soon) (int) – Required if context provided. The local rank of the current process within a node.
local_process_world_size (deprecated - will be removed soon) (int) – Required if context provided. The total number of processes within a node.
input_nodes (Tensor | Tuple[NodeType, Tensor] | dict[int, Tensor] | Tuple[NodeType, dict[int, Tensor]]) – The nodes to start sampling from. It is of type torch.LongTensor for homogeneous graphs. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this flag must be passed in as a tuple that holds the node type and node indices. (default: None) For Graph Store mode, this must be a tuple of (NodeType, dict[int, Tensor]) or dict[int, Tensor], where each Tensor in the dict contains the node ids to sample from, keyed by server rank. e.g. {0: [10, 20], 1: [30, 40]} means sample from nodes 10 and 20 on server 0, and nodes 30 and 40 on server 1. If a Graph Store input (e.g. dict[int, Tensor]) is provided in colocated mode, or a colocated input (e.g. Tensor) is provided in Graph Store mode, an error will be raised.
num_workers (int) – How many workers (subprocesses to spawn) to use for distributed neighbor sampling of the current process. (default: 1).
batch_size (int, optional) – How many samples per batch to load. (default: 1).
pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses the local_process_rank and torch.cuda.device_count() to assign the device. If CUDA is not available, the CPU device will be used. (default: None).
worker_concurrency (int) – The max sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best sampling performance, although you may wish to explore higher or lower settings when performance tuning. (default: 4).
channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is num_workers * 64MB. (default: "4GB").
prefetch_size (Optional[int]) – Max number of sampled messages to prefetch on the client side, per server. Only applicable in Graph Store mode (remote workers); if supplied when not in Graph Store mode, an error will be raised. Lower values reduce server-side RPC thread contention when multiple loaders are active concurrently. (default: None).
process_start_gap_seconds (float) – Delay between each process for initializing the neighbor loader. At large scales, it is recommended to set this value to between 60 and 120 seconds; otherwise, multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM.
num_cpu_threads (Optional[int]) – Number of CPU threads PyTorch should use for neighbor loading during CPU training/inference, in addition to the per-process parallelism. Defaults to 2 if set to None when using CPU training/inference.
shuffle (bool) – Whether to shuffle the input nodes. (default: False).
drop_last (bool) – Whether to drop the last incomplete batch. (default: False).
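The mode-dependent input_nodes shapes described above can be checked with a small validator. This helper is hypothetical and not part of GiGL; plain lists stand in for tensors, and only the documented mismatch rule (Graph Store input in colocated mode, or vice versa, raises an error) is modeled.

```python
def validate_input_nodes(input_nodes, graph_store_mode):
    """Mirror the documented rule: Graph Store mode expects node ids keyed
    by server rank (dict[int, Tensor], optionally wrapped in a
    (node_type, dict) tuple); colocated mode expects a single tensor-like
    sequence (optionally wrapped in a (node_type, tensor) tuple)."""
    # Unwrap the heterogeneous (node_type, payload) tuple form.
    payload = input_nodes[1] if isinstance(input_nodes, tuple) else input_nodes
    is_per_server = isinstance(payload, dict)
    if graph_store_mode and not is_per_server:
        raise ValueError("Graph Store mode requires node ids keyed by server rank")
    if not graph_store_mode and is_per_server:
        raise ValueError("Colocated mode takes a single tensor of node ids")
    return payload

# Graph Store mode: sample nodes 10, 20 on server 0 and 30, 40 on server 1.
per_server = {0: [10, 20], 1: [30, 40]}
validate_input_nodes(per_server, graph_store_mode=True)
```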