gigl.distributed#

GLT Distributed Classes implemented in GiGL

Submodules#

Classes#

DistLinkPredictionDataset

This class is inherited from GraphLearn-for-PyTorch's DistDataset class. We override the __init__ functionality to support positive and

DistNeighborLoader

A generic data loader base that performs distributed sampling, which

DistPartitioner

This class is based on GLT's DistRandomPartitioner class (alibaba/graphlearn-for-pytorch)

DistRangePartitioner

This class is responsible for implementing range-based partitioning. Rather than using a tensor-based partition

DistributedContext

GiGL Distributed Context

Functions#

build_dataset(serialized_graph_metadata, ...[, ...])

Launches a spawned process for building and returning a DistLinkPredictionDataset instance provided some SerializedGraphMetadata

build_dataset_from_task_config_uri(task_config_uri, ...)

Builds a dataset from a provided task_config_uri as part of GiGL orchestration. Parameters to

Package Contents#

class gigl.distributed.DistLinkPredictionDataset(rank, world_size, edge_dir, graph_partition=None, node_feature_partition=None, edge_feature_partition=None, node_partition_book=None, edge_partition_book=None, positive_edge_label=None, negative_edge_label=None, node_ids=None, num_train=None, num_val=None, num_test=None)[source]#

Bases: graphlearn_torch.distributed.dist_dataset.DistDataset

This class is inherited from GraphLearn-for-PyTorch’s DistDataset class. We override the __init__ functionality to support positive and negative edges and labels. We also override the share_ipc function to correctly serialize these new fields. We additionally introduce a build function for storing the partitioned inside of this class. We assume data in this class is only in the CPU RAM, and do not support data on GPU memory, thus simplifying the logic and tooling required compared to the base DistDataset class.

Initializes the fields of the DistLinkPredictionDataset class. This function is called upon each serialization of the DistLinkPredictionDataset instance. :param rank: Rank of the current process :type rank: int :param world_size: World size of the current process :type world_size: int :param edge_dir: Edge direction of the provied graph :type edge_dir: Literal[“in”, “out”]

The below arguments are only expected to be provided when re-serializing an instance of the DistLinkPredictionDataset class after build() has been called

graph_partition (Optional[Union[Graph, Dict[EdgeType, Graph]]]): Partitioned Graph Data node_feature_partition (Optional[Union[Feature, Dict[NodeType, Feature]]]): Partitioned Node Feature Data edge_feature_partition (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Partitioned Edge Feature Data node_partition_book (Optional[Union[PartitionBook, Dict[NodeType, PartitionBook]]]): Node Partition Book edge_partition_book (Optional[Union[PartitionBook, Dict[EdgeType, PartitionBook]]]): Edge Partition Book positive_edge_label (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Positive Edge Label Tensor negative_edge_label (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Negative Edge Label Tensor node_ids (Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]]): Node IDs on the current machine num_train: (Optional[Mapping[NodeType, int]]): Number of training nodes on the current machine. Will be a dict if heterogeneous. num_val: (Optional[Mapping[NodeType, int]]): Number of validation nodes on the current machine. Will be a dict if heterogeneous. num_test: (Optional[Mapping[NodeType, int]]): Number of test nodes on the current machine. Will be a dict if heterogeneous.

Parameters:
  • rank (int)

  • world_size (int)

  • edge_dir (Literal['in', 'out'])

  • graph_partition (Optional[Union[graphlearn_torch.data.Graph, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Graph]]])

  • node_feature_partition (Optional[Union[graphlearn_torch.data.Feature, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.data.Feature]]])

  • edge_feature_partition (Optional[Union[graphlearn_torch.data.Feature, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Feature]]])

  • node_partition_book (Optional[Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.partition.PartitionBook]]])

  • edge_partition_book (Optional[Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.partition.PartitionBook]]])

  • positive_edge_label (Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]]])

  • negative_edge_label (Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]]])

  • node_ids (Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]])

  • num_train (Optional[Union[int, Dict[gigl.src.common.types.graph_data.NodeType, int]]])

  • num_val (Optional[Union[int, Dict[gigl.src.common.types.graph_data.NodeType, int]]])

  • num_test (Optional[Union[int, Dict[gigl.src.common.types.graph_data.NodeType, int]]])

build(partition_output, splitter=None)[source]#

Provided some partition graph information, this method stores these tensors inside of the class for subsequent live subgraph sampling using a GraphLearn-for-PyTorch NeighborLoader.

Note that this method will clear the following fields from the provided partition_output:
  • partitioned_edge_index

  • partitioned_node_features

  • partitioned_edge_features

We do this to decrease the peak memory usage during the build process by removing these intermediate assets.

Parameters:
  • partition_output (PartitionOutput) – Partitioned Graph to be stored in the DistLinkPredictionDataset class

  • splitter (Optional[NodeAnchorLinkSplitter]) –

    A function that takes in an edge index and returns:
    • a tuple of train, val, and test node ids, if heterogeneous

    • a dict[NodeType, tuple[train, val, test]] of node ids, if homogeneous

    Optional as not all datasets need to be split on, e.g. if we’re doing inference.

Return type:

None

abstract load(*args, **kwargs)[source]#

Load a certain dataset partition from partitioned files and create in-memory objects (Graph, Feature or torch.Tensor).

Parameters:
  • root_dir (str) – The directory path to load the graph and feature partition data.

  • partition_idx (int) – Partition idx to load.

  • graph_mode (str) – Mode for creating graphlearn_torch’s Graph, including CPU, ZERO_COPY or CUDA. (default: ZERO_COPY)

  • input_layout (str) – layout of the input graph, including CSR, CSC or COO. (default: COO)

  • feature_with_gpu (bool) – A Boolean value indicating whether the created Feature objects of node/edge features use UnifiedTensor. If True, it means Feature consists of UnifiedTensor, otherwise Feature is a PyTorch CPU Tensor, the device_group_list and device will be invliad. (default: True)

  • graph_caching (bool) – A Boolean value indicating whether to load the full graph totoploy instead of partitioned one.

  • device_group_list (List[DeviceGroup], optional) – A list of device groups used for feature lookups, the GPU part of feature data will be replicated on each device group in this list during the initialization. GPUs with peer-to-peer access to each other should be set in the same device group properly. (default: None)

  • whole_node_label_file (str) – The path to the whole node labels which are not partitioned. (default: None)

  • device – The target cuda device rank used for graph operations when graph mode is not “CPU” and feature lookups when the GPU part is not None. (default: None)

share_ipc()[source]#

Serializes the member variables of the DistLinkPredictionDatasetClass :returns: Rank on current machine

int: World size across all machines Literal[“in”, “out”]: Graph Edge Direction Optional[Union[Graph, Dict[EdgeType, Graph]]]: Partitioned Graph Data Optional[Union[Feature, Dict[NodeType, Feature]]]: Partitioned Node Feature Data Optional[Union[Feature, Dict[EdgeType, Feature]]]: Partitioned Edge Feature Data Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]]: Node Partition Book Tensor Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]: Edge Partition Book Tensor Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]: Positive Edge Label Tensor Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]: Negative Edge Label Tensor Optional[Union[int, Dict[NodeType, int]]]: Number of training nodes on the current machine. Will be a dict if heterogeneous. Optional[Union[int, Dict[NodeType, int]]]: Number of validation nodes on the current machine. Will be a dict if heterogeneous. Optional[Union[int, Dict[NodeType, int]]]: Number of test nodes on the current machine. Will be a dict if heterogeneous.

Return type:

int

property edge_dir: Literal['in', 'out']#
Return type:

Literal[‘in’, ‘out’]

property edge_features: graphlearn_torch.data.Feature | Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Feature] | None#

During serializiation, the initialized Feature type does not immediately contain the feature and id2index tensors. These fields are initially set to None, and are only populated when we retrieve the size, retrieve the shape, or index into one of these tensors. This can also be done manually with the feature.lazy_init_with_ipc_handle() function.

Return type:

Optional[Union[graphlearn_torch.data.Feature, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Feature]]]

property edge_pb: graphlearn_torch.partition.PartitionBook | Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.partition.PartitionBook] | None#
Return type:

Optional[Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.partition.PartitionBook]]]

property graph: graphlearn_torch.data.Graph | Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Graph] | None#
Return type:

Optional[Union[graphlearn_torch.data.Graph, Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.data.Graph]]]

property negative_edge_label: torch.Tensor | Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]]]

property node_features: graphlearn_torch.data.Feature | Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.data.Feature] | None#

During serializiation, the initialized Feature type does not immediately contain the feature and id2index tensors. These fields are initially set to None, and are only populated when we retrieve the size, retrieve the shape, or index into one of these tensors. This can also be done manually with the feature.lazy_init_with_ipc_handle() function.

Return type:

Optional[Union[graphlearn_torch.data.Feature, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.data.Feature]]]

property node_ids: torch.Tensor | Dict[gigl.src.common.types.graph_data.NodeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]]

property node_pb: graphlearn_torch.partition.PartitionBook | Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.partition.PartitionBook] | None#
Return type:

Optional[Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.partition.PartitionBook]]]

property num_partitions: int#
Return type:

int

property partition_idx: int#
Return type:

int

property positive_edge_label: torch.Tensor | Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]]]

property test_node_ids: torch.Tensor | collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]]

property train_node_ids: torch.Tensor | collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]]

property val_node_ids: torch.Tensor | collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor] | None#
Return type:

Optional[Union[torch.Tensor, collections.abc.Mapping[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]]

class gigl.distributed.DistNeighborLoader(dataset, num_neighbors, context, local_process_rank, local_process_world_size, input_nodes=None, num_workers=1, batch_size=1, pin_memory_device=None, worker_concurrency=4, channel_size='4GB', process_start_gap_seconds=60.0, num_cpu_threads=None, shuffle=False, drop_last=False, _main_inference_port=DEFAULT_MASTER_INFERENCE_PORT, _main_sampling_port=DEFAULT_MASTER_SAMPLING_PORT)[source]#

Bases: graphlearn_torch.distributed.DistLoader

A generic data loader base that performs distributed sampling, which allows mini-batch training of GNNs on large-scale graphs when full-batch training is not feasible.

This loader supports launching a collocated sampling worker on the current process, or launching separate sampling workers on the spawned subprocesses or remote server nodes. When using the separate sampling mode, a worker group including the information of separate sampling workers should be provided.

Note that the separate sampling mode supports asynchronous and concurrent sampling on each separate worker, which will achieve better performance and is recommended to use. If you want to use a collocated sampling worker, all sampling for each seed batch will be blocking and synchronous.

When launching a collocated sampling worker or some multiprocessing sampling workers (on spwaned subprocesses), the distribution mode must be non-server and only contains a group of parallel worker processes, this means that the graph and feature store should be partitioned among all those parallel worker processes and managed by them, sampling and training tasks will run on each worker process at the same time.

Otherwise, when launching some remote sampling workers, the distribution mode must be a server-client framework, which contains a group of server workers and a group of client workers, the graph and feature store will be partitioned and managed by all server workers. All client workers are responsible for training tasks and launch some workers on remote servers to perform sampling tasks, the sampled results will be consumed by client workers with a remote message channel.

Parameters:
  • data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed patition books. The input dataset must be provided in non-server distribution mode.

  • input_data (NodeSamplerInput or EdgeSamplerInput or RemoteSamplerInput) – The input data for which neighbors or subgraphs are sampled to create mini-batches. In heterogeneous graphs, needs to be passed as a tuple that holds the node type and node indices.

  • sampling_config (SamplingConfig) – The Configuration info for sampling.

  • to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current cuda device (got by torch.cuda.current_device) will be used if available, otherwise, the cpu device will be used. (default: None).

  • worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistWorkerOptions object, a single collocated sampler will be launched on the current process, while the separate sampling mode will be disabled . (2) If provided with a MpDistWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a share-memory based channel will be created for sample message passing from multiprocessing workers to the current loader. (3) If provided with a RemoteDistWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None).

  • dataset (gigl.distributed.dist_link_prediction_dataset.DistLinkPredictionDataset)

  • num_neighbors (Union[List[int], Dict[torch_geometric.typing.EdgeType, List[int]]])

  • context (gigl.distributed.DistributedContext)

  • local_process_rank (int)

  • local_process_world_size (int)

  • input_nodes (Optional[Union[torch.Tensor, Tuple[gigl.src.common.types.graph_data.NodeType, torch.Tensor]]])

  • num_workers (int)

  • batch_size (int)

  • pin_memory_device (Optional[torch.device])

  • worker_concurrency (int)

  • channel_size (str)

  • process_start_gap_seconds (float)

  • num_cpu_threads (Optional[int])

  • shuffle (bool)

  • drop_last (bool)

  • _main_inference_port (int)

  • _main_sampling_port (int)

Note: We try to adhere to pyg dataloader api as much as possible. See the following for reference: https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/loader/node_loader.html#NodeLoader https://pytorch-geometric.readthedocs.io/en/2.5.2/_modules/torch_geometric/distributed/dist_neighbor_loader.html#DistNeighborLoader

Parameters:
  • dataset (DistLinkPredictionDataset) – The dataset to sample from.

  • num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – The number of neighbors to sample for each node in each iteration. If an entry is set to -1, all neighbors will be included. In heterogeneous graphs, may also take in a dictionary denoting the amount of neighbors to sample for each individual edge type.

  • context (DistributedContext) – Distributed context information of the current process.

  • local_process_rank (int) – The local rank of the current process within a node.

  • local_process_world_size (int) – The total number of processes within a node.

  • input_nodes (torch.Tensor or Tuple[str, torch.Tensor]) – The indices of seed nodes to start sampling from. It is of type torch.LongTensor for homogeneous graphs. If set to None for homogeneous settings, all nodes will be considered. In heterogeneous graphs, this flag must be passed in as a tuple that holds the node type and node indices. (default: None)

  • num_workers (int) – How many workers to use (subprocesses to spwan) for distributed neighbor sampling of the current process. (default: 1).

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • pin_memory_device (str, optional) – The target device that the sampled results should be copied to. If set to None, the device is inferred based off of (got by gigl.distributed.utils.device.get_available_device). Which uses the local_process_rank and torch.cuda.device_count() to assign the device. If cuda is not available, the cpu device will be used. (default: None).

  • worker_concurrency (int) – The max sampling concurrency for each sampling worker. Load testing has showed that setting worker_concurrency to 4 yields the best performance for sampling. Although, you may whish to explore higher/lower settings when performance tuning. (default: 4).

  • channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. Can be modified for performance tuning; a good starting point is: num_workers * 64MB (default: “4GB”).

  • process_start_gap_seconds (float) – Delay between each process for initializing neighbor loader. At large scales, it is recommended to set this value to be between 60 and 120 seconds – otherwise multiple processes may attempt to initialize dataloaders at overlapping times, which can cause CPU memory OOM.

  • num_cpu_threads (Optional[int]) – Number of cpu threads PyTorch should use for CPU training/inference neighbor loading; on top of the per process parallelism. Defaults to 2 if set to None when using cpu training/inference.

  • shuffle (bool) – Whether to shuffle the input nodes. (default: False).

  • drop_last (bool) – Whether to drop the last incomplete batch. (default: False).

  • _main_inference_port (int) – WARNING: You don’t need to configure this unless port conflict issues. Slotted for refactor. The port number to use for inference processes. In future, the port will be automatically assigned based on availability. Currently defaults to: gigl.distributed.constants.DEFAULT_MASTER_INFERENCE_PORT

  • _main_sampling_port (int) – WARNING: You don’t need to configure this unless port conflict issues. Slotted for refactor. The port number to use for sampling processes. In future, the port will be automatically assigned based on availability. Currently defaults to: gigl.distributed.constants.DEFAULT_MASTER_SAMPLING_PORT

class gigl.distributed.DistPartitioner(should_assign_edges_by_src_node=False, node_ids=None, node_features=None, edge_index=None, edge_features=None, positive_labels=None, negative_labels=None)[source]#

This class is based on GLT’s DistRandomPartitioner class (alibaba/graphlearn-for-pytorch) and has been optimized for better flexibility and memory management. We assume that init_rpc() and init_worker_group have been called to initialize the rpc and context, respectively, prior to this class. This class aims to partition homogeneous and heterogeneous input data, such as nodes, node features, edges, edge features, and any supervision labels across multiple machines. This class also produces partition books for edges and nodes, which are 1-d tensors that indicate which rank each node id and edge id are stored on. For example, the node partition book

[0, 0, 1, 2]

Means that node 0 is on rank 0, node 1 is on rank 0, node 2 is on rank 1, and node 3 is on rank 2.

In this class, node and edge id and feature tensors can be passed in either through the constructor or the public register functions. It is required to have registered these tensors to the class prior to partitioning. For optimal memory management, it is recommended that the reference to these large tensors be deleted after being registered to the class but before partitioning, as maintaining both original and intermediate tensors can cause OOM concerns. Registering these tensors is available through both the constructor and the register functions to support the multiple use ways customers can use partitioning:

Option 1: User wants to Partition just the nodes of a graph

` partitioner = DistPartitioner() # Customer doesn't have to pass in excessive amounts of parameters to the constructor to partition only nodes partitioner.register_nodes(node_ids) del node_ids # Del reference to node_ids outside of DistPartitioner to allow memory cleanup within the class partitioner.partition_nodes() # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, which may happen in cases where only a subset of inputs are partitioned (i.e no feats or labels) gc.collect() `

Option 2: User wants to partition all parts of a graph together and in sequence

``` partitioner = DistPartitioner(node_ids, edge_index, node_features, edge_features, pos_labels, neg_labels) # Register is called in the __init__ functions and doesn’t need to be called at all outside the class. del (

node_ids, edge_index, node_features, edge_features, pos_labels, neg_labels

) # Del reference to tensors outside of DistPartitioner to allow memory cleanup within the class partitioner.partition() # We may optionally want to call gc.collect() to ensure that any lingering memory is cleaned up, which may happen in cases where only a subset of inputs are partitioned (i.e no feats or labels) gc.collect() ```

The use case for only partitioning one entity through Option 1 may be in cases where we want to further parallelize some of the workload, since the previous GLT use case only had access to Partition() which calls partitioning of entities in sequence.

For optimal memory management, it is recommended that the reference to these large tensors be deleted after being registered to the class but before partitioning, as maintaining both original and intermediate tensors can cause OOM concerns.

Once all desired tensors are registered, you can either call the partition function to partition all registered fields or partition each field individually through the public partition_{entity_type} functions. With the partition function, fields which are not registered will return None. Note that each entity type should only be partitioned once, since registered fields are cleaned up after partitioning for optimal memory impact.

From GLT’s description of DistRandomPartitioner:

Each distributed partitioner will process a part of the full graph and feature data, and partition them. A distributed partitioner’s rank is corresponding to a partition index, and the number of all distributed partitioners must be same with the number of output partitions. During partitioning, the partitioned results will be sent to other distributed partitioners according to their ranks. After partitioning, each distributed partitioner will own a partitioned graph with its corresponding rank and further save the partitioned results into the local output directory.

Initializes the parameters of the partitioner. Also optionally takes in node and edge tensors as arguments and registers them to the partitioner. Registered entities should be a dictionary of Dict[[NodeType or EdgeType], torch.Tensor] if heterogeneous or a torch.Tensor if homogeneous. This class assumes the distributed context has already been initialized outside of this class with the glt.distributed.init_worker_group() function and that rpc has been initialized with glt_distributed.init_rpc(). :param should_assign_edges_by_src_node: Whether edges should be assigned to the machine of the source nodes during partitioning :type should_assign_edges_by_src_node: bool :param node_ids: Optionally registered node ids from input. Tensors should be of shape [num_nodes_on_current_rank] :type node_ids: Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]] :param node_features: Optionally registered node feats from input. Tensors should be of shope [num_nodes_on_current_rank, node_feat_dim] :type node_features: Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]] :param edge_index: Optionally registered edge indexes from input. Tensors should be of shape [2, num_edges_on_current_rank] :type edge_index: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param edge_features: Optionally registered edge features from input. Tensors should be of shape [num_edges_on_current_rank, edge_feat_dim] :type edge_features: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param positive_labels: Optionally registered positive labels from input. Tensors should be of shape [2, num_pos_labels_on_current_rank] :type positive_labels: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param negative_labels: Optionally registered negative labels from input. Tensors should be of shape [2, num_neg_labels_on_current_rank] :type negative_labels: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]

Parameters:
partition()[source]#

Calls partition on all registered fields. Note that at minimum nodes and edges must be registered when using this function. :returns: Reshuffled Outputs of Partitioning :rtype: PartitionOutput

Return type:

gigl.types.graph.PartitionOutput

partition_edge_index_and_edge_features(node_partition_book)[source]#

Partitions edges of a graph, including edge indices and edge features. If there are no edge features, only edge indices are partitioned. If heterogeneous, partitions edges/features for all edge types. Must call partition_node first to get the node partition book as input. :param node_partition_book: The computed Node Partition Book :type node_partition_book: Union[PartitionBook, Dict[NodeType, PartitionBook]]

Returns:

Union[

Tuple[GraphPartitionData, FeaturePartitionData, PartitionBook], Tuple[Dict[EdgeType, GraphPartitionData], Dict[EdgeType, FeaturePartitionData], Dict[EdgeType, PartitionBook]],

]: Partitioned Graph Data, Feature Data, and corresponding edge partition book, is a dictionary if heterogeneous

Parameters:

node_partition_book (Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.partition.PartitionBook]])

Return type:

Union[Tuple[gigl.types.graph.GraphPartitionData, Optional[gigl.types.graph.FeaturePartitionData], graphlearn_torch.partition.PartitionBook], Tuple[Dict[gigl.src.common.types.graph_data.EdgeType, gigl.types.graph.GraphPartitionData], Optional[Dict[gigl.src.common.types.graph_data.EdgeType, gigl.types.graph.FeaturePartitionData]], Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.partition.PartitionBook]]]

partition_labels(node_partition_book, is_positive)[source]#

Partitions positive or negative labels of a graph. If heterogeneous, partitions labels for all edge type. Must call partition_node first to get the node partition book as input. :param node_partition_book: The computed Node Partition Book :type node_partition_book: Union[PartitionBook, Dict[NodeType, PartitionBook]] :param is_positive: Whether positive labels are currently being registered. If False, negative labels will be partitioned. :type is_positive: bool

Returns:

Returns the edge indices for partitioned positive or negative label, dependent on the is_positive flag

Return type:

Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]

Parameters:
  • node_partition_book (Union[graphlearn_torch.partition.PartitionBook, Dict[gigl.src.common.types.graph_data.NodeType, graphlearn_torch.partition.PartitionBook]])

  • is_positive (bool)

partition_node()[source]#

Partitions nodes of a graph. If heterogeneous, partitions nodes for all node types.

Returns:

Partition Book of input nodes or Dict if heterogeneous

Return type:

Union[PartitionBook, Dict[NodeType, PartitionBook]]

partition_node_features(node_partition_book)[source]#

Partitions node features of a graph. If heterogeneous, partitions features for all node type. Must call partition_node first to get the node partition book as input.

Parameters:

node_partition_book (Union[PartitionBook, Dict[NodeType, PartitionBook]]) – The Computed Node Partition Book

Returns:

Feature Partition Data of ids and features or Dict if heterogeneous.

Return type:

Union[FeaturePartitionData, Dict[NodeType, FeaturePartitionData]]

register_edge_features(edge_features)[source]#

Registers the edge features to the partitioner.

For optimal memory management, it is recommended that the reference to edge_features tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_edge_index is responsible for determining total number of edges across all ranks and inferrring edge ids. :param edge_features: Input edge features which is either a torch.Tensor if homogeneous or a Dict if heterogeneous :type edge_features: Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]

Parameters:

edge_features (Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]])

Return type:

None

register_edge_index(edge_index)[source]#

Registers the edge_index to the partitioner. Also computes additional fields for partitioning such as the total number of edges across all ranks and the number of edges on the current rnak.

For optimal memory management, it is recommended that the reference to edge_index tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. :param edge_index: Input edge index which is either a torch.Tensor if homogeneous or a Dict if heterogeneous :type edge_index: Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]

Parameters:

edge_index (Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.EdgeType, torch.Tensor]])

Return type:

None

register_labels(label_edge_index, is_positive)[source]#

Registers the positive or negative label to the partitioner. Note that for the homogeneous case, all edge types of the graph must be present in the label edge index dictionary.

For optimal memory management, it is recommended that the reference to the label tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_edge_index is responsible for determining total number of edges across all ranks and inferring edge ids. :param label_edge_index: Input positive or negative labels which is either a torch.Tensor if homogeneous or a Dict if heterogeneous :type label_edge_index: Union[torch.Tensor, Dict[EdgeType, torch.Tensor]] :param is_positive: Whether positive labels are currently being registered. If False, labels will be registered as negative :type is_positive: bool

Parameters:
Return type:

None

register_node_features(node_features)[source]#

Registers the node features to the partitioner.

For optimal memory management, it is recommended that the reference to node_features tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_node_ids is responsible for determining total number of nodes across all ranks. :param node_features: Input node features which is either a torch.Tensor if homogeneous or a Dict if heterogeneous :type node_features: Union[torch.Tensor, Dict[NodeType, torch.Tensor]]

Parameters:

node_features (Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.NodeType, torch.Tensor]])

Return type:

None

register_node_ids(node_ids)[source]#

Registers the node ids to the partitioner. Also computes additional fields for partitioning such as the total number of nodes across all ranks.

For optimal memory management, it is recommended that the reference to the node_id tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. :param node_ids: Input node_ids which is either a torch.Tensor if homogeneous or a Dict if heterogeneous :type node_ids: Union[torch.Tensor, Dict[NodeType, torch.Tensor]]

Parameters:

node_ids (Union[torch.Tensor, Dict[gigl.src.common.types.graph_data.NodeType, torch.Tensor]])

Return type:

None

class gigl.distributed.DistRangePartitioner(should_assign_edges_by_src_node=False, node_ids=None, node_features=None, edge_index=None, edge_features=None, positive_labels=None, negative_labels=None)[source]#

Bases: gigl.distributed.dist_partitioner.DistPartitioner

This class is responsible for implementing range-based partitioning. Rather than using a tensor-based partition book, this approach stores the upper bound of ids for each rank. For example, a range partition book [4, 8, 12] stores edge ids 0-3 on the 0th rank, 4-7 on the 1st rank, and 8-11 on the 2nd rank. While keeping the same id-indexing pattern for rank lookup as the tensor-based partitioning, this partition book does a search through these partition bounds to fetch the ranks, rather than using a direct index lookup. For example, to get the rank of node ids 1 and 6 by doing node_pb[[1, 6]], the range partition book uses torch.searchsorted on the partition bounds to return [0, 1], the ranks of each of these ids. As a result, the range-based partition book trades off more efficient memory storage for a slower lookup time for indices.

Initializes the parameters of the partitioner. Also optionally takes in node and edge tensors as arguments and registers them to the partitioner. Registered entities should be a dictionary of Dict[[NodeType or EdgeType], torch.Tensor] if heterogeneous or a torch.Tensor if homogeneous. This class assumes the distributed context has already been initialized outside of this class with the glt.distributed.init_worker_group() function and that rpc has been initialized with glt_distributed.init_rpc(). :param should_assign_edges_by_src_node: Whether edges should be assigned to the machine of the source nodes during partitioning :type should_assign_edges_by_src_node: bool :param node_ids: Optionally registered node ids from input. Tensors should be of shape [num_nodes_on_current_rank] :type node_ids: Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]] :param node_features: Optionally registered node feats from input. Tensors should be of shope [num_nodes_on_current_rank, node_feat_dim] :type node_features: Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]] :param edge_index: Optionally registered edge indexes from input. Tensors should be of shape [2, num_edges_on_current_rank] :type edge_index: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param edge_features: Optionally registered edge features from input. Tensors should be of shape [num_edges_on_current_rank, edge_feat_dim] :type edge_features: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param positive_labels: Optionally registered positive labels from input. Tensors should be of shape [2, num_pos_labels_on_current_rank] :type positive_labels: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]] :param negative_labels: Optionally registered negative labels from input. Tensors should be of shape [2, num_neg_labels_on_current_rank] :type negative_labels: Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]

Parameters:
partition_edge_index_and_edge_features(node_partition_book)[source]#

Partitions edges of a graph, including edge indices and edge features. If heterogeneous, partitions edges for all edge types. You must call partition_node first to get the node partition book as input. The difference between this function and its parent is that we no longer need to check that the edge_ids have been pre-computed as a prerequisite for partitioning edges and edge features.

Parameters:

node_partition_book (Union[PartitionBook, Dict[NodeType, PartitionBook]]) – The computed Node Partition Book

Returns:

Union[

Tuple[GraphPartitionData, FeaturePartitionData, PartitionBook], Tuple[Dict[EdgeType, GraphPartitionData], Dict[EdgeType, FeaturePartitionData], Dict[EdgeType, PartitionBook]],

]: Partitioned Graph Data, Feature Data, and corresponding edge partition book, is a dictionary if heterogeneous

Return type:

Union[tuple[gigl.types.graph.GraphPartitionData, Optional[gigl.types.graph.FeaturePartitionData], graphlearn_torch.partition.PartitionBook], tuple[Dict[gigl.src.common.types.graph_data.EdgeType, gigl.types.graph.GraphPartitionData], Optional[Dict[gigl.src.common.types.graph_data.EdgeType, gigl.types.graph.FeaturePartitionData]], Dict[gigl.src.common.types.graph_data.EdgeType, graphlearn_torch.partition.PartitionBook]]]

register_edge_index(edge_index)[source]#

Registers the edge_index to the partitioner. Unlike the tensor-based partitioner, this register pattern does not automatically infer edge ids,as they are not needed for partitioning.

For optimal memory management, it is recommended that the reference to edge_index tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns.

Parameters:

edge_index (Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]) – Input edge index which is either a torch.Tensor if homogeneous or a Dict if heterogeneous

Return type:

None

class gigl.distributed.DistributedContext[source]#

GiGL Distributed Context

global_rank: int#
global_world_size: int#
main_worker_ip_address: str#
gigl.distributed.build_dataset(serialized_graph_metadata, distributed_context, sample_edge_direction, should_load_tensors_in_parallel=True, partitioner_class=None, node_tf_dataset_options=TFDatasetOptions(), edge_tf_dataset_options=TFDatasetOptions(), should_convert_labels_to_edges=False, splitter=None, _ssl_positive_label_percentage=None, _dataset_building_port=DEFAULT_MASTER_DATA_BUILDING_PORT)[source]#

Launches a spawned process for building and returning a DistLinkPredictionDataset instance provided some SerializedGraphMetadata :param serialized_graph_metadata: Metadata about TFRecords that are serialized to disk :type serialized_graph_metadata: SerializedGraphMetadata :param distributed_context: Distributed context containing information for master_ip_address, rank, and world size :type distributed_context: DistributedContext :param sample_edge_direction: Whether edges in the graph are directed inward or outward. Note that this is

listed as a possible string to satisfy type check, but in practice must be a Literal[“in”, “out”].

Parameters:
  • should_load_tensors_in_parallel (bool) – Whether tensors should be loaded from serialized information in parallel or in sequence across the [node, edge, pos_label, neg_label] entity types.

  • partitioner_class (Optional[Type[DistPartitioner]]) – Partitioner class to partition the graph inputs. If provided, this must be a DistPartitioner or subclass of it. If not provided, will initialize use the DistPartitioner class.

  • node_tf_dataset_options (TFDatasetOptions) – Options provided to a tf.data.Dataset to tune how serialized node data is read.

  • edge_tf_dataset_options (TFDatasetOptions) – Options provided to a tf.data.Dataset to tune how serialized edge data is read.

  • should_convert_labels_to_edges (bool) – Whether to convert labels to edges in the graph. If this is set to true, the output dataset will be heterogeneous.

  • splitter (Optional[NodeAnchorLinkSplitter]) – Optional splitter to use for splitting the graph data into train, val, and test sets. If not provided (None), no splitting will be performed.

  • _ssl_positive_label_percentage (Optional[float]) – Percentage of edges to select as self-supervised labels. Must be None if supervised edge labels are provided in advance. Slotted for refactor once this functionality is available in the transductive splitter directly

  • _dataset_building_port (int) – WARNING: You don’t need to configure this unless port conflict issues. Slotted for refactor. The RPC port to use to build the dataset. In future, the port will be automatically assigned based on availability. Currently defaults to: gigl.distributed.constants.DEFAULT_MASTER_DATA_BUILDING_PORT

  • serialized_graph_metadata (gigl.common.data.load_torch_tensors.SerializedGraphMetadata)

  • distributed_context (gigl.distributed.dist_context.DistributedContext)

  • sample_edge_direction (Union[Literal["in", "out"], str])

Returns:

Built GraphLearn-for-PyTorch Dataset class

Return type:

DistLinkPredictionDataset

gigl.distributed.build_dataset_from_task_config_uri(task_config_uri, distributed_context, is_inference=True)[source]#

Builds a dataset from a provided task_config_uri as part of GiGL orchestration. Parameters to this step should be provided in the inferenceArgs field of the GbmlConfig for inference or the trainerArgs field of the GbmlConfig for training. The current parsable arguments are here are - sample_edge_direction: Direction of the graph - should_use_range_partitioning: Whether we should be using range-based partitioning - should_load_tensors_in_parallel: Whether TFRecord loading should happen in parallel across entities :param task_config_uri: URI to a GBML Config :type task_config_uri: str :param distributed_context: Distributed context containing information for

master_ip_address, rank, and world size

Parameters:
  • is_inference (bool) – Whether the run is for inference or training. If True, arguments will be read from inferenceArgs. Otherwise, arguments witll be read from trainerArgs.

  • task_config_uri (str)

  • distributed_context (DistributedContext)

Return type:

gigl.distributed.dist_link_prediction_dataset.DistLinkPredictionDataset