gigl.distributed.distributed_neighborloader#
Attributes#
Classes#
DistABLPLoader – A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.
DistNeighborLoader – A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.
Module Contents#
- class gigl.distributed.distributed_neighborloader.DistABLPLoader(dataset, num_neighbors, context, local_process_rank, local_process_world_size, input_nodes=None, num_workers=1, batch_size=1, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False)[source]#
Bases: DistNeighborLoader
A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.
This loader supports launching a collocated sampling worker on the current process, or launching separate sampling workers on spawned subprocesses or remote server nodes. When using the separate sampling mode, a worker group containing the information of the separate sampling workers must be provided.
Note that the separate sampling mode supports asynchronous and concurrent sampling on each separate worker, which achieves better performance and is the recommended setup. If you use a collocated sampling worker, all sampling for each seed batch will be blocking and synchronous.
When launching a collocated sampling worker or multiprocessing sampling workers (on spawned subprocesses), the distribution mode must be non-server and contain only a group of parallel worker processes. This means the graph and feature store should be partitioned among and managed by those parallel worker processes, with sampling and training tasks running on each worker process at the same time.
Otherwise, when launching remote sampling workers, the distribution mode must be a server-client framework, which contains a group of server workers and a group of client workers. The graph and feature store will be partitioned and managed by the server workers, while the client workers are responsible for training tasks and launch workers on the remote servers to perform sampling; the sampled results are consumed by the client workers through a remote message channel.
- Parameters:
- data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books. The input dataset must be provided in non-server distribution mode.
- input_data (NodeSamplerInput or EdgeSamplerInput or RemoteSamplerInput) – The input data for which neighbors or subgraphs are sampled to create mini-batches. In heterogeneous graphs, needs to be passed as a tuple that holds the node type and node indices.
- sampling_config (SamplingConfig) – The configuration info for sampling.
- to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current CUDA device (obtained via torch.cuda.current_device) will be used if available; otherwise, the CPU device will be used. (default: None)
- worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistWorkerOptions object, a single collocated sampler will be launched on the current process and the separate sampling mode will be disabled. (2) If provided with an MpDistWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a shared-memory based channel will be created for sample message passing from the multiprocessing workers to the current loader. (3) If provided with a RemoteDistWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None)
- dataset (gigl.distributed.dist_link_prediction_dataset.DistLinkPredictionDataset)
- num_neighbors (Union[List[int], Dict[torch_geometric.typing.EdgeType, List[int]]])
- context (gigl.distributed.DistributedContext)
- local_process_rank (int)
- local_process_world_size (int)
- input_nodes (Optional[Union[torch.Tensor, Tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]])
- num_workers (int)
- batch_size (int)
- pin_memory_device (Optional[torch.device])
- worker_concurrency (int)
- channel_size (str)
- process_start_gap_seconds (float)
- num_cpu_threads (Optional[int])
- shuffle (bool)
- drop_last (bool)
Neighbor loader for Anchor Based Link Prediction (ABLP) tasks.
Note that for this class, the dataset must always be heterogeneous, as we need separate edge types for positive and negative labels.
If you provide input_nodes as homogeneous input (i.e. only as a Tensor), then we will attempt to infer the positive and optional negative labels from the dataset. In this case, the output of the loader will be a torch_geometric.data.Data object; otherwise, the output will be a torch_geometric.data.HeteroData object.
- Args:
- dataset (DistLinkPredictionDataset): The dataset to sample from.
- num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]): The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, may also take in a dictionary denoting the number of neighbors to sample for each individual edge type.
- context (DistributedContext): Distributed context information of the current process.
- local_process_rank (int): The local rank of the current process within a node.
- local_process_world_size (int): The total number of processes within a node.
- input_nodes (torch.Tensor or Tuple[str, torch.Tensor]): The indices of seed nodes to start sampling from. It is of type torch.LongTensor for homogeneous graphs. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this argument must be passed as a tuple that holds the node type and node indices. (default: None)
- num_workers (int): How many workers to use (subprocesses to spawn) for distributed neighbor sampling of the current process. (default: 1)
- batch_size (int, optional): How many samples per batch to load. (default: 1)
- pin_memory_device (str, optional): The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses the local_process_rank and torch.cuda.device_count() to assign the device. If CUDA is not available, the CPU device will be used. (default: None)
- worker_concurrency (int): The maximum sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best sampling performance, although you may wish to explore higher or lower settings when performance tuning. (default: 4)
- channel_size (int or str): The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is num_workers * 64MB. (default: "4GB")
- process_start_gap_seconds (float): Delay between each process when initializing the neighbor loader. At large scales, it is recommended to set this value to between 60 and 120 seconds; otherwise multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM.
- num_cpu_threads (Optional[int]): Number of CPU threads PyTorch should use for CPU training/inference neighbor loading, on top of the per-process parallelism. Defaults to 2 if set to None when using CPU training/inference.
- shuffle (bool): Whether to shuffle the input nodes. (default: False)
- drop_last (bool): Whether to drop the last incomplete batch. (default: False)
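Below is a minimal usage sketch, not taken from the library's own documentation: it assumes a DistLinkPredictionDataset and a DistributedContext have already been built and partitioned elsewhere, and the seed-node tensor, hop counts, and other values are illustrative placeholders.

    import torch

    from gigl.distributed.distributed_neighborloader import DistABLPLoader


    def make_ablp_loader(dataset, context, local_rank, local_world_size):
        # `dataset` is assumed to be a DistLinkPredictionDataset and `context`
        # a DistributedContext prepared elsewhere; both names are placeholders.
        return DistABLPLoader(
            dataset=dataset,
            num_neighbors=[10, 10],           # two hops, 10 neighbors per hop
            context=context,
            local_process_rank=local_rank,
            local_process_world_size=local_world_size,
            input_nodes=torch.arange(1_000),  # placeholder homogeneous seed tensor
            batch_size=512,
            num_workers=2,
            channel_size="128MB",             # roughly num_workers * 64MB, per the docs above
        )

Iterating over the returned loader yields torch_geometric.data.Data objects here because the seeds are a plain Tensor; passing a (node_type, tensor) tuple instead yields torch_geometric.data.HeteroData objects.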
- class gigl.distributed.distributed_neighborloader.DistNeighborLoader(dataset, num_neighbors, context, local_process_rank, local_process_world_size, input_nodes=None, num_workers=1, batch_size=1, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False, _main_inference_port=DEFAULT_MASTER_INFERENCE_PORT, _main_sampling_port=DEFAULT_MASTER_SAMPLING_PORT)[source]#
Bases: graphlearn_torch.distributed.DistLoader
A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.
This loader supports launching a collocated sampling worker on the current process, or launching separate sampling workers on spawned subprocesses or remote server nodes. When using the separate sampling mode, a worker group containing the information of the separate sampling workers must be provided.
Note that the separate sampling mode supports asynchronous and concurrent sampling on each separate worker, which achieves better performance and is the recommended setup. If you use a collocated sampling worker, all sampling for each seed batch will be blocking and synchronous.
When launching a collocated sampling worker or multiprocessing sampling workers (on spawned subprocesses), the distribution mode must be non-server and contain only a group of parallel worker processes. This means the graph and feature store should be partitioned among and managed by those parallel worker processes, with sampling and training tasks running on each worker process at the same time.
Otherwise, when launching remote sampling workers, the distribution mode must be a server-client framework, which contains a group of server workers and a group of client workers. The graph and feature store will be partitioned and managed by the server workers, while the client workers are responsible for training tasks and launch workers on the remote servers to perform sampling; the sampled results are consumed by the client workers through a remote message channel.
- Parameters:
- data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books. The input dataset must be provided in non-server distribution mode.
- input_data (NodeSamplerInput or EdgeSamplerInput or RemoteSamplerInput) – The input data for which neighbors or subgraphs are sampled to create mini-batches. In heterogeneous graphs, needs to be passed as a tuple that holds the node type and node indices.
- sampling_config (SamplingConfig) – The configuration info for sampling.
- to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current CUDA device (obtained via torch.cuda.current_device) will be used if available; otherwise, the CPU device will be used. (default: None)
- worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistWorkerOptions object, a single collocated sampler will be launched on the current process and the separate sampling mode will be disabled. (2) If provided with an MpDistWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a shared-memory based channel will be created for sample message passing from the multiprocessing workers to the current loader. (3) If provided with a RemoteDistWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None)
- dataset (gigl.distributed.dist_link_prediction_dataset.DistLinkPredictionDataset)
- num_neighbors (Union[List[int], Dict[torch_geometric.typing.EdgeType, List[int]]])
- context (gigl.distributed.DistributedContext)
- local_process_rank (int)
- local_process_world_size (int)
- input_nodes (Optional[Union[torch.Tensor, Tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]])
- num_workers (int)
- batch_size (int)
- pin_memory_device (Optional[torch.device])
- worker_concurrency (int)
- channel_size (str)
- process_start_gap_seconds (float)
- num_cpu_threads (Optional[int])
- shuffle (bool)
- drop_last (bool)
- _main_inference_port (int)
- _main_sampling_port (int)
Note: We try to adhere to the PyG dataloader API as much as possible. See the following for reference: https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/loader/node_loader.html#NodeLoader and https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/distributed/dist_neighbor_loader.html#DistNeighborLoader
- Parameters:
- dataset (DistLinkPredictionDataset) – The dataset to sample from.
- num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, may also take in a dictionary denoting the number of neighbors to sample for each individual edge type.
- context (DistributedContext) – Distributed context information of the current process.
- local_process_rank (int) – The local rank of the current process within a node.
- local_process_world_size (int) – The total number of processes within a node.
- input_nodes (torch.Tensor or Tuple[str, torch.Tensor]) – The indices of seed nodes to start sampling from. It is of type torch.LongTensor for homogeneous graphs. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this argument must be passed as a tuple that holds the node type and node indices. (default: None)
- num_workers (int) – How many workers to use (subprocesses to spawn) for distributed neighbor sampling of the current process. (default: 1)
- batch_size (int, optional) – How many samples per batch to load. (default: 1)
- pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses the local_process_rank and torch.cuda.device_count() to assign the device. If CUDA is not available, the CPU device will be used. (default: None)
- worker_concurrency (int) – The maximum sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best sampling performance, although you may wish to explore higher or lower settings when performance tuning. (default: 4)
- channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is num_workers * 64MB. (default: "4GB")
- process_start_gap_seconds (float) – Delay between each process when initializing the neighbor loader. At large scales, it is recommended to set this value to between 60 and 120 seconds; otherwise multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM.
- num_cpu_threads (Optional[int]) – Number of CPU threads PyTorch should use for CPU training/inference neighbor loading, on top of the per-process parallelism. Defaults to 2 if set to None when using CPU training/inference.
- shuffle (bool) – Whether to shuffle the input nodes. (default: False)
- drop_last (bool) – Whether to drop the last incomplete batch. (default: False)
- _main_inference_port (int) – WARNING: You don't need to configure this unless you hit port conflict issues. Slotted for refactor. The port number to use for inference processes. In the future, the port will be automatically assigned based on availability. Currently defaults to gigl.distributed.constants.DEFAULT_MASTER_INFERENCE_PORT.
- _main_sampling_port (int) – WARNING: You don't need to configure this unless you hit port conflict issues. Slotted for refactor. The port number to use for sampling processes. In the future, the port will be automatically assigned based on availability. Currently defaults to gigl.distributed.constants.DEFAULT_MASTER_SAMPLING_PORT.
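A comparable minimal sketch for DistNeighborLoader follows, again assuming the dataset and context are prepared elsewhere; the "user" node type, the seed tensor, and the numeric settings are hypothetical placeholders rather than values prescribed by the library.

    import torch

    from gigl.distributed.distributed_neighborloader import DistNeighborLoader


    def run_neighbor_sampling(dataset, context, local_rank, local_world_size, seed_nodes):
        # `dataset` (DistLinkPredictionDataset), `context` (DistributedContext), and
        # `seed_nodes` (a LongTensor of node ids) are placeholders built elsewhere.
        loader = DistNeighborLoader(
            dataset=dataset,
            num_neighbors=[15, 10],            # applied to every edge type
            context=context,
            local_process_rank=local_rank,
            local_process_world_size=local_world_size,
            input_nodes=("user", seed_nodes),  # heterogeneous seeds: (node type, indices)
            batch_size=1024,
            shuffle=True,
            process_start_gap_seconds=60.0,    # stagger start-up at large scale
        )
        for batch in loader:
            # Each batch is a PyG HeteroData (heterogeneous seeds) or Data (homogeneous graph).
            yield batch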