gigl.distributed.dist_ablp_neighborloader#
Classes#
DistABLPLoader – Neighbor loader for Anchor Based Link Prediction (ABLP) tasks.
Module Contents#
- class gigl.distributed.dist_ablp_neighborloader.DistABLPLoader(dataset, num_neighbors, input_nodes=None, supervision_edge_type=None, num_workers=1, batch_size=1, pin_memory_device=None, worker_concurrency=4, prefetch_size=None, channel_size='4GB', process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False, context=None, local_process_rank=None, local_process_world_size=None)[source]#
Bases:
gigl.distributed.base_dist_loader.BaseDistLoader

Base class for GiGL distributed loaders. Consolidates shared initialization logic from DistNeighborLoader and DistABLPLoader. Subclasses GLT's DistLoader but does NOT call its __init__; instead, it replicates the relevant attribute-setting logic to allow configurable producer classes.

Subclasses should:
1. Call resolve_runtime() to get runtime context.
2. Determine mode (colocated vs graph store).
3. Call create_sampling_config() to build the SamplingConfig.
4. For colocated: call create_colocated_channel() and construct the DistMpSamplingProducer (or subclass), then pass the producer as sampler. For graph store: pass the RPC function (e.g. DistServer.create_sampling_producer) as sampler.
5. Call super().__init__() with the prepared data.
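The initialization pattern described above (subclassing a loader without invoking its __init__ so the producer class stays configurable) can be sketched in plain Python. All class and attribute names below are illustrative stand-ins, not GiGL's actual implementation:

```python
# Toy sketch of the init pattern: a base class subclasses a third-party
# loader but deliberately skips its __init__, replicating only the
# attribute-setting it needs. All names here are illustrative.
class ThirdPartyLoader:
    """Stand-in for GLT's DistLoader."""

    def __init__(self, data):
        self.data = data
        self.producer = "hard-coded-producer"  # the behavior we want to avoid


class ConfigurableLoader(ThirdPartyLoader):
    """Stand-in for BaseDistLoader: same attributes, configurable producer."""

    def __init__(self, data, producer):
        # NOTE: no super().__init__() call; replicate the attributes instead,
        # so the producer class remains configurable by subclasses.
        self.data = data
        self.producer = producer


loader = ConfigurableLoader(data=[0, 1, 2], producer="custom-producer")
```

The point of the pattern is that the subclass keeps the parent's attribute contract (so inherited methods still work) while swapping out the pieces the parent's __init__ would have fixed.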
- Parameters:
dataset (Union[gigl.distributed.dist_dataset.DistDataset, gigl.distributed.graph_store.remote_dist_dataset.RemoteDistDataset]) – DistDataset (colocated) or RemoteDistDataset (graph store).
sampler_input – Prepared by the subclass. A single input for colocated mode; a list (one per server) for graph store mode.
dataset_schema – Contains edge types, feature info, edge direction, etc.
worker_options – MpDistSamplingWorkerOptions (colocated) or RemoteDistSamplingWorkerOptions (graph store).
sampling_config – Configuration for the sampler (created via create_sampling_config).
device – Target device for sampled results.
runtime – Resolved distributed runtime information.
sampler – Either a pre-constructed DistMpSamplingProducer (colocated mode) or a callable to dispatch on the DistServer (graph store mode).
process_start_gap_seconds (float) – Delay between each process for staggered colocated init.
num_neighbors (Union[list[int], dict[torch_geometric.typing.EdgeType, list[int]]])
input_nodes (Optional[Union[torch.Tensor, tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor], dict[int, gigl.utils.sampling.ABLPInputNodes]]])
supervision_edge_type (Optional[Union[torch_geometric.typing.EdgeType, list[torch_geometric.typing.EdgeType]]])
num_workers (int)
batch_size (int)
pin_memory_device (Optional[torch.device])
worker_concurrency (int)
prefetch_size (Optional[int])
channel_size (str)
num_cpu_threads (Optional[int])
shuffle (bool)
drop_last (bool)
context (Optional[gigl.distributed.dist_context.DistributedContext])
local_process_rank (Optional[int])
local_process_world_size (Optional[int])
Neighbor loader for Anchor Based Link Prediction (ABLP) tasks.
Note that for this class, the dataset must always be heterogeneous, as we need separate edge types for positive and negative labels.
By default, the loader will return torch_geometric.data.HeteroData (heterogeneous) objects, but will return a torch_geometric.data.Data (homogeneous) object if the dataset is "labeled homogeneous".
The following fields may also be present:
- y_positive: dict[int, torch.Tensor] mapping from local anchor node id to a tensor of positive label node ids.
- y_negative: (Optional) dict[int, torch.Tensor] mapping from local anchor node id to a tensor of negative label node ids. This will only be present if the supervision edge type has negative labels.
NOTE: For both y_positive and y_negative, the keys and values of the dicts are the local node ids of the sampled nodes, not the global node ids. To get the global node ids, use the node field of the Data/HeteroData object, e.g. global_positive_node_id_labels = data.node[data.y_positive[local_anchor_node_id]].
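The local-to-global conversion just described can be sketched as follows; plain Python lists stand in for torch tensors here, and all ids are made up for illustration:

```python
# Sketch: mapping the local ids in y_positive back to global node ids,
# assuming `node` holds the global id of each locally sampled node.
# Lists stand in for torch tensors; all ids are illustrative.
node = [100, 200, 300, 400]   # index = local id, value = global id
y_positive = {0: [1, 2]}      # local anchor 0 -> local positive-label ids


def to_global(node, local_ids):
    """Translate local node ids into global ids via the node lookup."""
    return [node[i] for i in local_ids]


global_positive = {a: to_global(node, ls) for a, ls in y_positive.items()}
# global_positive == {0: [200, 300]}
```

With real tensors, the same lookup is the fancy-indexing expression from the note above: data.node[data.y_positive[anchor]].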
- The underlying graph engine may also add the following fields to the output Data object:
- num_sampled_nodes: If heterogeneous, a dictionary mapping from node type to the number of sampled nodes for that type, by hop. If homogeneous, a tensor denoting the number of sampled nodes, by hop.
- num_sampled_edges: If heterogeneous, a dictionary mapping from edge type to the number of sampled edges for that type, by hop. If homogeneous, a tensor denoting the number of sampled edges, by hop.
- Let's use the following homogeneous graph (https://is.gd/a8DK15) as an example:
0 -> 1 [label="Positive example" color="green"]
0 -> 2 [label="Negative example" color="red"]
0 -> {3, 4}
3 -> {5, 6}
4 -> {7, 8}
1 -> 9   # shouldn't be sampled
2 -> 10  # shouldn't be sampled
- For sampling around node 0, the fields on the output Data object will be:
y_positive: {0: torch.tensor([1])} # 1 is the only positive label for node 0
y_negative: {0: torch.tensor([2])} # 2 is the only negative label for node 0
NOTE: both label fields will instead be dict[EdgeType, dict[int, torch.Tensor]] if multiple supervision edge types are provided. e.g. if there are supervision edge types: (a, to, b) and (a, to, c), then the label fields could be:
y_positive: {(a, to, b): {0: torch.tensor([1])}, (a, to, c): {0: torch.tensor([2])}}
y_negative: {(a, to, b): {0: torch.tensor([3])}, (a, to, c): {0: torch.tensor([4])}}
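The "shouldn't be sampled" behavior in the example graph above can be illustrated with a pure-Python hop expansion: fanout sampling starts only from the anchor's structural edges, so the neighbors of label nodes are never reached. This is a toy sketch, not GiGL's actual sampler; the adjacency and hop count come from the example:

```python
# Toy sketch of the fanout expansion implied by the example graph:
# anchor 0 has structural edges to {3, 4}; label edges (0->1 positive,
# 0->2 negative) are reported via y_positive / y_negative and their
# endpoints are NOT expanded, so 9 and 10 stay unsampled.
graph = {0: [3, 4], 3: [5, 6], 4: [7, 8], 1: [9], 2: [10]}


def sample_hops(graph, seeds, num_hops):
    """Collect all nodes reachable from `seeds` within `num_hops` hops."""
    frontier, visited = set(seeds), set(seeds)
    for _ in range(num_hops):
        frontier = {n for u in frontier for n in graph.get(u, [])} - visited
        visited |= frontier
    return visited


y_positive = {0: [1]}  # structure from the example above (global ids here)
y_negative = {0: [2]}
sampled = sample_hops(graph, seeds=[0], num_hops=2)
# sampled == {0, 3, 4, 5, 6, 7, 8}; 9 and 10 are never reached.
```

Note that the real loader reports local node ids in its label fields, as described earlier; global ids are used here only to keep the sketch readable.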
- Parameters:
dataset (Union[DistDataset, RemoteDistDataset]) – The dataset to sample from. If this is a RemoteDistDataset, then we are in “Graph Store” mode.
num_neighbors (list[int] or dict[tuple[str, str, str], list[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, this may also be a dictionary specifying the number of neighbors to sample for each individual edge type.
input_nodes (Optional[Union[torch.Tensor, tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor], dict[int, gigl.utils.sampling.ABLPInputNodes]]]) – Indices of seed nodes to start sampling from.
For Colocated mode: torch.Tensor or tuple[NodeType, torch.Tensor]. If set to None in homogeneous settings, all nodes will be considered. In heterogeneous graphs, this argument must be passed as a tuple holding the node type and node indices. NOTE: We intend to migrate colocated mode to an input format similar to Graph Store mode in the future, so that users can easily control labels per anchor.
For Graph Store mode: dict[int, ABLPInputNodes]. Maps server_rank to an ABLPInputNodes dataclass containing anchor nodes, positive labels, and negative labels with explicit node type and edge type info. This is the return type of RemoteDistDataset.fetch_ablp_input().
supervision_edge_type (Optional[Union[EdgeType, list[EdgeType]]]) – The edge type(s) to use for supervision.
For Colocated mode: Must be None iff the dataset is labeled homogeneous. If set to a single EdgeType, the positive and negative labels will be stored in the y_positive and y_negative fields of the Data object. If set to a list of EdgeTypes, the labels will be stored in the same fields, keyed by EdgeType. (default: None)
For Graph Store mode: Must not be provided (must be None); the supervision edge types are inferred from the label edge type keys in ABLPInputNodes.
num_workers (int) – How many workers (subprocesses to spawn) to use for distributed neighbor sampling of the current process. (default: 1)
batch_size (int, optional) – How many samples per batch to load. (default: 1)
pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred via gigl.distributed.utils.device.get_available_device, which uses the local_process_rank and torch.cuda.device_count() to assign a device. If CUDA is not available, the CPU device will be used. (default: None)
worker_concurrency (int) – The max sampling concurrency for each sampling worker. Load testing has shown that setting worker_concurrency to 4 yields the best sampling performance, although you may wish to explore higher or lower settings when performance tuning. (default: 4)
prefetch_size (Optional[int]) – Max number of sampled messages to prefetch on the client side, per server. Only applies to Graph Store mode (remote workers). Lower values reduce server-side RPC thread contention when multiple loaders are active concurrently. If supplied outside of Graph Store mode, an error will be raised. (default: None)
channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is num_workers * 64MB. (default: "4GB")
process_start_gap_seconds (float) – Delay between each process for initializing the neighbor loader. At large scales, it is recommended to set this value between 60 and 120 seconds; otherwise multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM. (default: 60.0)
num_cpu_threads (Optional[int]) – Number of CPU threads PyTorch should use for CPU training/inference neighbor loading, on top of the per-process parallelism. Defaults to 2 if set to None when using CPU training/inference.
shuffle (bool) – Whether to shuffle the input nodes. (default: False)
drop_last (bool) – Whether to drop the last incomplete batch. (default: False)
context (deprecated - will be removed soon) (Optional[DistributedContext]) – Distributed context information of the current process.
local_process_rank (deprecated - will be removed soon) (int) – The local rank of the current process within a node.
local_process_world_size (deprecated - will be removed soon) (int) – The total number of processes within a node.
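The num_neighbors semantics described in the parameter list (one fanout entry per hop, with -1 meaning "take all neighbors") can be sketched in pure Python. This is an illustrative toy, not GiGL's actual sampler, and the graph and fanouts are made up:

```python
import random

# Toy sketch of per-hop fanout sampling: num_neighbors has one entry
# per hop; an entry of -1 keeps every neighbor, any other value caps
# the sample size. Graph and fanouts are illustrative only.
graph = {0: [1, 2, 3, 4], 1: [5, 6], 2: [7], 3: [], 4: [8, 9]}


def fanout_sample(graph, seeds, num_neighbors, rng=None):
    """Return one list of sampled nodes per hop, following the fanout."""
    rng = rng or random.Random(0)
    layers, frontier = [list(seeds)], list(seeds)
    for cap in num_neighbors:
        nxt = []
        for u in frontier:
            nbrs = graph.get(u, [])
            # cap == -1 keeps all neighbors; otherwise sample up to `cap`.
            nxt.extend(nbrs if cap == -1 else rng.sample(nbrs, min(cap, len(nbrs))))
        layers.append(nxt)
        frontier = nxt
    return layers


# Hop 1: at most 2 neighbors of the seed; hop 2: all of their neighbors.
layers = fanout_sample(graph, seeds=[0], num_neighbors=[2, -1])
```

The per-hop layer sizes produced this way correspond to the by-hop counts the loader reports in num_sampled_nodes.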