gigl.distributed.graph_store.remote_dist_dataset#

Attributes#

logger

Classes#

RemoteDistDataset

Represents a dataset that is stored on a different storage cluster.

Module Contents#

class gigl.distributed.graph_store.remote_dist_dataset.RemoteDistDataset(cluster_info, local_rank, mp_sharing_dict=None)[source]#

Represents a dataset that is stored on a different storage cluster. Must be used in the GiGL graph-store distributed setup.

This class must be used on the compute (client) side of the graph-store distributed setup.

Parameters:
  • cluster_info (GraphStoreInfo) – The cluster information.

  • local_rank (int) – The local rank of the process on the compute node.

  • mp_sharing_dict (Optional[MutableMapping[str, torch.Tensor]]) – If provided, will be used to share tensors across the local machine, e.g. for get_node_ids. Must be a DictProxy, such as the return value of torch.multiprocessing.Manager().dict(); see the construction sketch below.
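
A minimal construction sketch; cluster_info is assumed to already hold the cluster's GraphStoreInfo (e.g. provided by the GiGL graph-store launch environment), and the local rank value is illustrative:

>>> import torch.multiprocessing as mp
>>> manager = mp.Manager()
>>> dataset = RemoteDistDataset(
...     cluster_info=cluster_info,       # gigl.env.distributed.GraphStoreInfo (assumed available)
...     local_rank=0,                    # local rank of this process on the compute node
...     mp_sharing_dict=manager.dict(),  # share fetched tensors across local processes
... )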

get_ablp_input(split, rank=None, world_size=None, node_type=DEFAULT_HOMOGENEOUS_NODE_TYPE, supervision_edge_type=DEFAULT_HOMOGENEOUS_EDGE_TYPE)[source]#

Fetches ABLP (Anchor Based Link Prediction) input from the storage nodes.

The returned dict maps storage rank to a tuple of (anchor_nodes, positive_labels, negative_labels) for that storage node. If rank and world_size are provided, the input is sharded across the compute nodes; otherwise, the full input from every storage node is returned.

Parameters:
  • split (Literal["train", "val", "test"]) – The split to get the input for.

  • rank (Optional[int]) – The rank of the process requesting the input. Must be provided if world_size is provided.

  • world_size (Optional[int]) – The total number of processes in the distributed setup. Must be provided if rank is provided.

  • node_type (NodeType) – The type of nodes to retrieve. Defaults to DEFAULT_HOMOGENEOUS_NODE_TYPE.

  • supervision_edge_type (EdgeType) – The edge type for supervision. Defaults to DEFAULT_HOMOGENEOUS_EDGE_TYPE.

Returns:

A dict mapping storage rank to a tuple of:

  • anchor_nodes: The anchor node ids for the split

  • positive_labels: Positive label node ids of shape [N, M], where N is the number of anchor nodes and M is the number of positive labels per anchor

  • negative_labels: Negative label node ids of shape [N, M], or None if unavailable

Return type:

dict[int, tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]]

Examples

Suppose we have 1 storage node with users [0, 1, 2, 3, 4] where:

train=[0, 1, 2], val=[3], test=[4]

And positive/negative labels defined for link prediction.

Get training ABLP input:

>>> dataset.get_ablp_input(split="train", node_type=USER)
{
    0: (
        tensor([0, 1, 2]),           # anchor nodes
        tensor([[0, 1], [1, 2], [2, 3]]),  # positive labels
        tensor([[2], [3], [4]])     # negative labels
    )
}

With sharding across 2 compute nodes (rank 0 gets first portion):

>>> dataset.get_ablp_input(split="train", rank=0, world_size=2, node_type=USER)
{
    0: (
        tensor([0]),                # first anchor node
        tensor([[0, 1]]),           # its positive labels
        tensor([[2]])               # its negative labels
    )
}

Note

The GLT sampling engine expects all processes on a given compute machine to have the same sampling input (node ids). As such, the input tensors may be duplicated across all processes on a given compute machine. To save on CPU memory, pass mp_sharing_dict to the RemoteDistDataset constructor.

get_edge_dir()[source]#

Get the edge direction from the registered dataset.

Returns:

The edge direction.

Return type:

Union[str, Literal['in', 'out']]

get_edge_feature_info()[source]#

Get edge feature information from the registered dataset.

Returns:

Edge feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping EdgeType to FeatureInfo for heterogeneous graphs

  • None if no edge features are available

get_edge_types()[source]#

Get the edge types from the registered dataset.

Returns:

The edge types in the dataset, None if the dataset is homogeneous.

Return type:

Optional[list[gigl.src.common.types.graph_data.EdgeType]]
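
For example, the return value can be used to branch on graph heterogeneity (a sketch; dataset is an already-constructed RemoteDistDataset):

>>> edge_types = dataset.get_edge_types()
>>> if edge_types is None:
...     print("homogeneous graph")
... else:
...     print(f"heterogeneous graph with {len(edge_types)} edge types")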

get_free_ports_on_storage_cluster(num_ports)[source]#

Get free ports from the storage master node.

This must be called with a torch.distributed process group initialized across the entire training cluster.

All compute ranks will receive the same free ports.

Parameters:

num_ports (int) – Number of free ports to get.

Returns:

A list of free port numbers on the storage master node.

Return type:

list[int]
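
A usage sketch, assuming the torch.distributed process group spanning the training cluster has already been initialized:

>>> import torch.distributed as dist
>>> assert dist.is_initialized()  # the cluster-wide process group must already exist
>>> ports = dataset.get_free_ports_on_storage_cluster(num_ports=2)
>>> len(ports)  # every compute rank receives the same two ports
2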

get_node_feature_info()[source]#

Get node feature information from the registered dataset.

Returns:

Node feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping NodeType to FeatureInfo for heterogeneous graphs

  • None if no node features are available
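
Both get_node_feature_info and get_edge_feature_info share this three-way return shape; a handling sketch for the node case (the branches mirror the bullets above):

>>> node_feature_info = dataset.get_node_feature_info()
>>> if node_feature_info is None:
...     print("no node features")
... elif isinstance(node_feature_info, dict):
...     print(f"heterogeneous: node types {list(node_feature_info)}")
... else:
...     print("homogeneous: a single FeatureInfo")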

get_node_ids(rank=None, world_size=None, split=None, node_type=None)[source]#

Fetches node ids from the storage nodes for the current compute node (machine).

The returned dict maps storage rank to the node ids stored on that storage node, filtered and sharded according to the provided arguments.

Parameters:
  • rank (Optional[int]) – The rank of the process requesting node ids. Must be provided if world_size is provided.

  • world_size (Optional[int]) – The total number of processes in the distributed setup. Must be provided if rank is provided.

  • split (Optional[Literal["train", "val", "test"]]) – The split of the dataset to get node ids from. If provided, the dataset must have train_node_ids, val_node_ids, and test_node_ids properties.

  • node_type (Optional[NodeType]) – The type of nodes to get. Must be provided for heterogeneous datasets.

Returns:

A dict mapping storage rank to node ids.

Return type:

dict[int, torch.Tensor]

Examples

Suppose we have 2 storage nodes and 2 compute nodes, with 16 total nodes. Nodes are partitioned across storage nodes, with splits defined as:

Storage rank 0: [0, 1, 2, 3, 4, 5, 6, 7]

train=[0, 1, 2, 3], val=[4, 5], test=[6, 7]

Storage rank 1: [8, 9, 10, 11, 12, 13, 14, 15]

train=[8, 9, 10, 11], val=[12, 13], test=[14, 15]

Get all nodes (no split filtering, no sharding):

>>> dataset.get_node_ids()
{
    0: tensor([0, 1, 2, 3, 4, 5, 6, 7]),      # All 8 nodes from storage rank 0
    1: tensor([8, 9, 10, 11, 12, 13, 14, 15]) # All 8 nodes from storage rank 1
}

Shard all nodes across 2 compute nodes (compute rank 0 gets first half from each storage):

>>> dataset.get_node_ids(rank=0, world_size=2)
{
    0: tensor([0, 1, 2, 3]),   # First 4 of all 8 nodes from storage rank 0
    1: tensor([8, 9, 10, 11])  # First 4 of all 8 nodes from storage rank 1
}

Get only training nodes (no sharding):

>>> dataset.get_node_ids(split="train")
{
    0: tensor([0, 1, 2, 3]),   # 4 training nodes from storage rank 0
    1: tensor([8, 9, 10, 11])  # 4 training nodes from storage rank 1
}

Combine split and sharding (training nodes, sharded for compute rank 0):

>>> dataset.get_node_ids(rank=0, world_size=2, split="train")
{
    0: tensor([0, 1]),  # First 2 of 4 training nodes from storage rank 0
    1: tensor([8, 9])   # First 2 of 4 training nodes from storage rank 1
}

Note

When split=None, all nodes are queryable. This means nodes from any split (train, val, or test) may be returned. This is useful when you need to sample neighbors during inference, as neighbor nodes may belong to any split.

The GLT sampling engine expects all processes on a given compute machine to have the same sampling input (node ids). As such, the input tensors may be duplicated across all processes on a given compute machine. To save on CPU memory, pass mp_sharing_dict to the RemoteDistDataset constructor.

property cluster_info: gigl.env.distributed.GraphStoreInfo[source]#
Return type:

gigl.env.distributed.GraphStoreInfo

gigl.distributed.graph_store.remote_dist_dataset.logger[source]#