gigl.distributed.dist_ablp_neighborloader#

Attributes#

logger

Classes#

DistABLPLoader

A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.

Module Contents#

class gigl.distributed.dist_ablp_neighborloader.DistABLPLoader(dataset, num_neighbors, input_nodes=None, supervision_edge_type=None, num_workers=1, batch_size=1, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False, context=None, local_process_rank=None, local_process_world_size=None)[source]#

Bases: graphlearn_torch.distributed.DistLoader

A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.

This loader supports launching a collocated sampling worker on the current process, or launching separate sampling workers on spawned subprocesses or remote server nodes. When using the separate sampling mode, a worker group containing the information of the separate sampling workers must be provided.

Note that the separate sampling mode supports asynchronous and concurrent sampling on each separate worker, which achieves better performance and is recommended. If you use a collocated sampling worker, all sampling for each seed batch will be blocking and synchronous.

When launching a collocated sampling worker or multiprocessing sampling workers (on spawned subprocesses), the distribution mode must be non-server and contain only a group of parallel worker processes. This means the graph and feature store should be partitioned among, and managed by, those parallel worker processes; sampling and training tasks run on each worker process at the same time.

Otherwise, when launching remote sampling workers, the distribution mode must be a server-client framework, which contains a group of server workers and a group of client workers. The graph and feature store are partitioned and managed by the server workers. The client workers are responsible for training tasks and launch workers on the remote servers to perform sampling; the sampled results are consumed by the client workers through a remote message channel.

Parameters:
  • data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books. The input dataset must be provided in non-server distribution mode.

  • input_data (NodeSamplerInput or EdgeSamplerInput or RemoteSamplerInput) – The input data for which neighbors or subgraphs are sampled to create mini-batches. In heterogeneous graphs, it must be passed as a tuple that holds the node type and node indices.

  • sampling_config (SamplingConfig) – The Configuration info for sampling.

  • to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current CUDA device (obtained via torch.cuda.current_device) will be used if available; otherwise, the CPU device will be used. (default: None).

  • worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistWorkerOptions object, a single collocated sampler will be launched on the current process, while the separate sampling mode will be disabled. (2) If provided with a MpDistWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a shared-memory based channel will be created for sample message passing from multiprocessing workers to the current loader. (3) If provided with a RemoteDistWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None).

  • dataset (gigl.distributed.dist_link_prediction_dataset.DistLinkPredictionDataset)

  • num_neighbors (Union[list[int], dict[torch_geometric.typing.EdgeType, list[int]]])

  • input_nodes (Optional[Union[torch.Tensor, tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]])

  • supervision_edge_type (Optional[torch_geometric.typing.EdgeType])

  • num_workers (int)

  • batch_size (int)

  • pin_memory_device (Optional[torch.device])

  • worker_concurrency (int)

  • channel_size (str)

  • process_start_gap_seconds (float)

  • num_cpu_threads (Optional[int])

  • shuffle (bool)

  • drop_last (bool)

  • context (Optional[gigl.distributed.dist_context.DistributedContext])

  • local_process_rank (Optional[int])

  • local_process_world_size (Optional[int])

Neighbor loader for Anchor Based Link Prediction (ABLP) tasks.

Note that for this class, the dataset must always be heterogeneous, as we need separate edge types for positive and negative labels.

By default, the loader will return torch_geometric.data.HeteroData (heterogeneous) objects, but will return a torch_geometric.data.Data (homogeneous) object if the dataset is “labeled homogeneous”.

The following fields may also be present:
  • y_positive: Dict[int, torch.Tensor] mapping from local anchor node id to a tensor of positive label node ids.

  • y_negative: (Optional) Dict[int, torch.Tensor] mapping from local anchor node id to a tensor of negative label node ids. This will only be present if the supervision edge type has negative labels.

NOTE: For both y_positive and y_negative, the keys and values of the dicts are the local node ids of the sampled nodes, not the global node ids. To get the global node ids, use the node field of the Data/HeteroData object, e.g. global_positive_node_id_labels = data.node[data.y_positive[local_anchor_node_id]].
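
For illustration, here is a minimal sketch of that mapping, assuming data is a homogeneous torch_geometric.data.Data object yielded by this loader (field names follow the description above):

# Map local anchor/label node ids back to global node ids on a sampled batch.
for local_anchor_id, local_positive_ids in data.y_positive.items():
    global_anchor_id = data.node[local_anchor_id]
    global_positive_ids = data.node[local_positive_ids]
    print(f"anchor {int(global_anchor_id)} -> positive labels {global_positive_ids.tolist()}")

# y_negative is only present if the supervision edge type has negative labels.
if hasattr(data, "y_negative"):
    for local_anchor_id, local_negative_ids in data.y_negative.items():
        global_negative_ids = data.node[local_negative_ids]
        print(f"anchor {int(data.node[local_anchor_id])} -> negative labels {global_negative_ids.tolist()}")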

The underlying graph engine may also add the following fields to the output Data object:
  • num_sampled_nodes: If heterogeneous, a dictionary mapping from node type to the number of sampled nodes for that type, by hop. If homogeneous, a tensor denoting the number of sampled nodes, by hop.

  • num_sampled_edges: If heterogeneous, a dictionary mapping from edge type to the number of sampled edges for that type, by hop. If homogeneous, a tensor denoting the number of sampled edges, by hop.
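
A hedged sketch of reading these optional fields on a homogeneous batch (they are only present if the underlying engine attached them):

# Inspect per-hop sampling counts when the engine provides them.
if hasattr(data, "num_sampled_nodes"):
    print(data.num_sampled_nodes)  # per-hop node counts (tensor for homogeneous output)
if hasattr(data, "num_sampled_edges"):
    print(data.num_sampled_edges)  # per-hop edge counts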

Let’s use the following homogeneous graph (https://is.gd/a8DK15) as an example:

0 -> 1 [label="Positive example" color="green"]
0 -> 2 [label="Negative example" color="red"]
0 -> {3, 4}
3 -> {5, 6}
4 -> {7, 8}
1 -> 9  # shouldn't be sampled
2 -> 10  # shouldn't be sampled

For sampling around node 0, the fields on the output Data object will be:
  • y_positive: {0: torch.tensor([1])} # 1 is the only positive label for node 0

  • y_negative: {0: torch.tensor([2])} # 2 is the only negative label for node 0

Parameters:
  • dataset (DistLinkPredictionDataset) – The dataset to sample from.

  • num_neighbors (list[int] or Dict[tuple[str, str, str], list[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, may also take in a dictionary denoting the number of neighbors to sample for each individual edge type.

  • context (deprecated - will be removed soon) (Optional[DistributedContext]) – Distributed context information of the current process.

  • local_process_rank (deprecated - will be removed soon) (int) – The local rank of the current process within a node.

  • local_process_world_size (deprecated - will be removed soon) (int) – The total number of processes within a node.

  • input_nodes (Optional[Union[torch.Tensor, tuple[NodeType, torch.Tensor]]]) – Indices of seed nodes to start sampling from. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this flag must be passed in as a tuple that holds the node type and node indices. (default: None)

  • num_workers (int) – How many workers to use (subprocesses to spawn) for distributed neighbor sampling of the current process. (default: 1).

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses local_process_rank and torch.cuda.device_count() to assign the device. If CUDA is not available, the CPU device will be used. (default: None).

  • worker_concurrency (int) – The max sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best sampling performance, although you may wish to explore higher or lower settings when performance tuning. (default: 4).

  • channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is: num_workers * 64MB (default: “4GB”).

  • process_start_gap_seconds (float) – Delay between each process for initializing the neighbor loader. At large scales, it is recommended to set this value between 60 and 120 seconds; otherwise, multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM.

  • num_cpu_threads (Optional[int]) – Number of CPU threads PyTorch should use for neighbor loading during CPU training/inference, in addition to the per-process parallelism. Defaults to 2 if set to None when using CPU training/inference.

  • shuffle (bool) – Whether to shuffle the input nodes. (default: False).

  • drop_last (bool) – Whether to drop the last incomplete batch. (default: False).

  • supervision_edge_type (Optional[torch_geometric.typing.EdgeType])
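
As a hedged illustration of the signature above, the sketch below constructs a loader and iterates over mini-batches. The dataset variable and all parameter values are hypothetical placeholders; building and partitioning the DistLinkPredictionDataset is out of scope here.

import torch

from gigl.distributed.dist_ablp_neighborloader import DistABLPLoader

# `my_dataset` is a hypothetical, already built and partitioned
# DistLinkPredictionDataset for a labeled-homogeneous graph.
loader = DistABLPLoader(
    dataset=my_dataset,
    num_neighbors=[10, 10],           # fan-out per hop; -1 would include all neighbors
    input_nodes=torch.arange(1_000),  # illustrative seed (anchor) node ids
    batch_size=512,
    num_workers=2,
    shuffle=True,
)

for batch in loader:
    # For a labeled-homogeneous dataset this is a torch_geometric.data.Data object;
    # batch.y_positive maps local anchor ids to tensors of local positive-label ids.
    local_anchor_ids = list(batch.y_positive.keys())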

batch_size[source]#
collect_features[source]#
data[source]#
data_partition_idx[source]#
drop_last[source]#
edge_dir[source]#
input_data[source]#
num_data_partitions[source]#
num_neighbors[source]#
sampling_config[source]#
sampling_type[source]#
shuffle[source]#
to_device[source]#
with_edge[source]#
with_weight[source]#
worker_options[source]#
gigl.distributed.dist_ablp_neighborloader.logger[source]#