gigl.distributed.graph_store.remote_dist_dataset#

Attributes#

logger

Classes#

RemoteDistDataset

Represents a dataset that is stored on a different storage cluster.

Module Contents#

class gigl.distributed.graph_store.remote_dist_dataset.RemoteDistDataset(cluster_info, local_rank)[source]#

Represents a dataset that is stored on a different storage cluster. Must be used in the GiGL graph-store distributed setup.

This class must be used on the compute (client) side of the graph-store distributed setup.

Parameters:
  • cluster_info (GraphStoreInfo) – The cluster information.

  • local_rank (int) – The local rank of the process on the compute node.

fetch_ablp_input(split, rank=None, world_size=None, anchor_node_type=None, supervision_edge_type=None)[source]#

Fetch ABLP (Anchor Based Link Prediction) input from the storage nodes.

The returned dict maps each storage rank to an ABLPInputNodes dataclass for that storage node. If both rank and world_size are provided, the input is sharded across the compute nodes using contiguous server assignments. If both are None, the input is returned unsharded for all storage nodes.

The ABLPInputNodes dataclass carries explicit node type information and keys the label tensors by their label EdgeType, making it unambiguous which node types the positive/negative labels correspond to.

Parameters:
  • split (Literal['train', 'val', 'test']) – The split to get the input for.

  • rank (Optional[int]) – The compute rank requesting data. When None (together with world_size), all data is returned unsharded from all storage nodes.

  • world_size (Optional[int]) – The total number of compute processes. When None (together with rank), all data is returned unsharded from all storage nodes.

  • anchor_node_type (Optional[gigl.src.common.types.graph_data.NodeType]) – The type of the anchor nodes to retrieve. Must be provided for heterogeneous graphs. Must be None for labeled homogeneous graphs.

  • supervision_edge_type (Optional[gigl.src.common.types.graph_data.EdgeType]) – The edge type for supervision. Must be provided for heterogeneous graphs. Must be None for labeled homogeneous graphs.

Returns:

A dict mapping storage rank to an ABLPInputNodes containing:

  • anchor_node_type: The node type of the anchor nodes, or DEFAULT_HOMOGENEOUS_NODE_TYPE for labeled homogeneous graphs.

  • anchor_nodes: 1D tensor of anchor node IDs for the split.

  • positive_labels: Dict mapping positive label EdgeType to a 2D tensor [N, M].

  • negative_labels: Optional dict mapping negative label EdgeType to a 2D tensor [N, M].

Return type:

dict[int, ABLPInputNodes]

Raises:

ValueError – If only one of rank or world_size is provided.

Example

Suppose we have 2 storage nodes and 2 compute nodes. Storage rank 0 has anchor nodes [0, 1, 2] (train), storage rank 1 has anchor nodes [3, 4, 5] (train), with positive/negative labels for link prediction.

Shard training ABLP input across 2 compute nodes (contiguous — each rank gets entire servers):

>>> dataset.fetch_ablp_input(split="train", rank=0, world_size=2)
{
    0: ABLPInputNodes(
        anchor_nodes=tensor([0, 1, 2]),
        labels={...},
    ),
    1: ABLPInputNodes(
        anchor_nodes=tensor([]),
        labels={...},
    ),
}
>>> dataset.fetch_ablp_input(split="train", rank=1, world_size=2)
{
    0: ABLPInputNodes(
        anchor_nodes=tensor([]),
        labels={...},
    ),
    1: ABLPInputNodes(
        anchor_nodes=tensor([3, 4, 5]),
        labels={...},
    ),
}

With 3 storage nodes and 2 compute nodes, server 1 is fractionally split. Storage rank 0 has anchors [0, 1], rank 1 has [2, 3], rank 2 has [4, 5]:

>>> dataset.fetch_ablp_input(split="train", rank=0, world_size=2)
{
    0: ABLPInputNodes(
        anchor_nodes=tensor([0, 1]),
        labels={...},
    ),
    1: ABLPInputNodes(
        anchor_nodes=tensor([2]),    # First half of storage 1
        labels={...},
    ),
    2: ABLPInputNodes(
        anchor_nodes=tensor([]),     # Nothing from storage 2
        labels={...},
    ),
}
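
On the compute side, the per-storage results are typically merged before use. A minimal sketch with a stand-in ABLPInputNodes dataclass (field names taken from the Returns section above; plain lists stand in for tensors, and the real class lives in gigl):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ABLPInputNodes:
    # Stand-in mirroring the documented fields; the real dataclass lives in gigl.
    anchor_node_type: str
    anchor_nodes: list            # stands in for a 1D tensor of node ids
    positive_labels: dict         # label EdgeType -> 2D [N, M] label tensor
    negative_labels: Optional[dict] = None

def merge_anchor_nodes(per_storage):
    """Concatenate anchor node ids across storage ranks, in rank order."""
    merged = []
    for storage_rank in sorted(per_storage):
        merged.extend(per_storage[storage_rank].anchor_nodes)
    return merged

# Shape of the rank-0 result from the first example above.
result = {
    0: ABLPInputNodes("user", [0, 1, 2], {"user-to-user": [[10], [11], [12]]}),
    1: ABLPInputNodes("user", [], {"user-to-user": []}),
}
print(merge_anchor_nodes(result))  # → [0, 1, 2]
```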

fetch_edge_dir()[source]#

Fetch the edge direction from the registered dataset.

Returns:

The edge direction.

Return type:

Union[str, Literal['in', 'out']]

fetch_edge_feature_info()[source]#

Fetch edge feature information from the registered dataset.

Returns:

Edge feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping EdgeType to FeatureInfo for heterogeneous graphs

  • None if no edge features are available

Return type:

Union[FeatureInfo, dict[EdgeType, FeatureInfo], None]
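
Calling code usually needs to handle all three return shapes. A sketch with a stand-in FeatureInfo (the real dataclass lives in gigl; the `dim`/`dtype` fields here are assumptions for illustration):

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass(frozen=True)
class FeatureInfo:
    # Hypothetical stand-in for gigl's FeatureInfo.
    dim: int
    dtype: str

def edge_feature_dims(
    info: Union[FeatureInfo, dict, None],
) -> dict:
    """Normalize the three possible return shapes into a dict of feature dims."""
    if info is None:
        return {}                       # no edge features
    if isinstance(info, FeatureInfo):
        return {"default": info.dim}    # homogeneous graph
    return {edge_type: fi.dim for edge_type, fi in info.items()}  # heterogeneous

print(edge_feature_dims(None))                                      # → {}
print(edge_feature_dims(FeatureInfo(dim=16, dtype="float32")))      # → {'default': 16}
print(edge_feature_dims({"user-to-item": FeatureInfo(8, "float32")}))  # → {'user-to-item': 8}
```

fetch_node_feature_info returns the same three shapes keyed by NodeType and can be handled identically.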

fetch_edge_partition_book(edge_type=None)[source]#

Fetches the partition book for the specified edge type.

Parameters:

edge_type (Optional[gigl.src.common.types.graph_data.EdgeType]) – The edge type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.

Returns:

The partition book for the requested edge type, or None if no partition book is available.

Return type:

Optional[graphlearn_torch.partition.PartitionBook]

fetch_edge_types()[source]#

Fetch the edge types from the registered dataset.

Returns:

The edge types in the dataset, None if the dataset is homogeneous.

Return type:

Optional[list[gigl.src.common.types.graph_data.EdgeType]]

fetch_free_ports_on_storage_cluster(num_ports)[source]#

Get free ports from the storage master node.

This must be called with a torch.distributed process group initialized across the entire training cluster.

All compute ranks will receive the same free ports.

Parameters:

num_ports (int) – Number of free ports to get.

Returns:

A list of free port numbers on the storage master node.

Return type:

list[int]
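
A "free port" here means one the OS is not currently using. One common way to discover such ports on a host is to bind a socket to port 0 and read back the OS-assigned port; this is a sketch of the idea only, not necessarily how the storage master implements it:

```python
import socket

def find_free_ports(num_ports: int) -> list:
    """Return num_ports distinct ports that were free at call time."""
    socks, ports = [], []
    try:
        for _ in range(num_ports):
            # Binding to port 0 asks the OS to pick an unused port.
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(("", 0))
            socks.append(s)
            ports.append(s.getsockname()[1])
    finally:
        # Keep all sockets open until every port is collected so the
        # same port is not handed out twice, then release them.
        for s in socks:
            s.close()
    return ports

ports = find_free_ports(3)
print(ports)
```

Note the usual caveat with this pattern: a port is only guaranteed free at the moment it is probed, so the caller should bind it promptly.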

fetch_node_feature_info()[source]#

Fetch node feature information from the registered dataset.

Returns:

Node feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping NodeType to FeatureInfo for heterogeneous graphs

  • None if no node features are available

Return type:

Union[FeatureInfo, dict[NodeType, FeatureInfo], None]

fetch_node_ids(rank=None, world_size=None, split=None, node_type=None)[source]#

Fetch node ids from the storage nodes for the current compute node (machine).

The returned dict maps storage rank to the node ids stored on that storage node, filtered and sharded according to the provided arguments.

Storage servers are assigned to compute nodes in contiguous blocks. Each compute node fetches all data from its assigned server(s) and receives empty tensors for unassigned ones. When both rank and world_size are None, all data is returned unsharded from every storage server.

Parameters:
  • rank (Optional[int]) – The compute rank requesting data. When None (together with world_size), all data is returned unsharded from all storage nodes.

  • world_size (Optional[int]) – The total number of compute processes. When None (together with rank), all data is returned unsharded from all storage nodes.

  • split (Optional[Literal['train', 'val', 'test']]) – The split of the dataset to get node ids from. If provided, the dataset must have train_node_ids, val_node_ids, and test_node_ids properties.

  • node_type (Optional[gigl.src.common.types.graph_data.NodeType]) – The type of nodes to get. Must be provided for heterogeneous datasets. Must be None for labeled homogeneous graphs.

Raises:

ValueError – If only one of rank or world_size is provided.

Returns:

A dict mapping storage rank to node ids.

Return type:

dict[int, torch.Tensor]

Example

Suppose we have 2 storage nodes and 2 compute nodes, with 16 total nodes. Nodes are partitioned across storage nodes, with splits defined as:

Storage rank 0: [0, 1, 2, 3, 4, 5, 6, 7]
    train=[0, 1, 2, 3], val=[4, 5], test=[6, 7]
Storage rank 1: [8, 9, 10, 11, 12, 13, 14, 15]
    train=[8, 9, 10, 11], val=[12, 13], test=[14, 15]

Get all nodes (no split filtering, no sharding):

>>> dataset.fetch_node_ids()
{
    0: tensor([0, 1, 2, 3, 4, 5, 6, 7]),
    1: tensor([8, 9, 10, 11, 12, 13, 14, 15]),
}

Shard training nodes across 2 compute nodes (contiguous — each rank gets entire servers):

>>> dataset.fetch_node_ids(rank=0, world_size=2, split="train")
{
    0: tensor([0, 1, 2, 3]),  # All training nodes from storage 0
    1: tensor([]),             # Nothing from storage 1
}
>>> dataset.fetch_node_ids(rank=1, world_size=2, split="train")
{
    0: tensor([]),             # Nothing from storage 0
    1: tensor([8, 9, 10, 11]), # All training nodes from storage 1
}

With 3 storage nodes and 2 compute nodes, server 1 is fractionally split:

>>> dataset.fetch_node_ids(rank=0, world_size=2, split="train")
{
    0: tensor([0, 1, 2, 3]),  # All of storage 0
    1: tensor([8, 9]),         # First half of storage 1
    2: tensor([]),             # Nothing from storage 2
}

Note

When split=None, all nodes are queryable. This means nodes from any split (train, val, or test) may be returned. This is useful when you need to sample neighbors during inference, as neighbor nodes may belong to any split.
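
The contiguous assignment in the examples above can be sketched in plain Python. This is a hypothetical reconstruction of the arithmetic (the real implementation lives inside gigl and may differ): each compute rank owns the server-index span [rank * S/W, (rank + 1) * S/W), and a server that straddles a boundary is split fractionally.

```python
def shard_contiguous(per_storage, rank, world_size):
    """Assign each compute rank a contiguous span of storage servers.

    per_storage: list of id-lists, one per storage rank.
    Returns a dict mapping storage rank -> the slice of its ids that
    this compute rank fetches (possibly empty).
    """
    num_servers = len(per_storage)
    span = num_servers / world_size          # servers owned per compute rank
    lo, hi = rank * span, (rank + 1) * span  # this rank's server-index span
    out = {}
    for server, ids in enumerate(per_storage):
        # The overlap of [server, server + 1) with [lo, hi), taken as a
        # fraction of this server, selects the matching slice of its ids.
        a, b = max(lo, server), min(hi, server + 1)
        if b <= a:
            out[server] = []
        else:
            start = round((a - server) * len(ids))
            end = round((b - server) * len(ids))
            out[server] = ids[start:end]
    return out

# 3 storage servers, 2 compute ranks: server 1 is split fractionally,
# matching the last example above.
train = [[0, 1, 2, 3], [8, 9, 10, 11], [16, 17, 18, 19]]
print(shard_contiguous(train, rank=0, world_size=2))
# → {0: [0, 1, 2, 3], 1: [8, 9], 2: []}
```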

fetch_node_partition_book(node_type=None)[source]#

Fetches the partition book for the specified node type.

Parameters:

node_type (Optional[gigl.src.common.types.graph_data.NodeType]) – The node type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.

Returns:

The partition book for the requested node type, or None if no partition book is available.

Return type:

Optional[graphlearn_torch.partition.PartitionBook]
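
Conceptually, a partition book answers "which storage partition holds node i?". graphlearn_torch's PartitionBook is tensor-backed; this list-backed stand-in only illustrates the lookup:

```python
# Hypothetical stand-in: entry i is the partition id that holds node i.
partition_book = [0, 0, 0, 0, 1, 1, 1, 1]  # 8 nodes across 2 partitions

def partition_of(pb, node_id):
    """Look up which partition holds the given node."""
    return pb[node_id]

def nodes_on_partition(pb, partition):
    """Invert the book: all node ids assigned to a partition."""
    return [node_id for node_id, p in enumerate(pb) if p == partition]

print(partition_of(partition_book, 5))        # → 1
print(nodes_on_partition(partition_book, 0))  # → [0, 1, 2, 3]
```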

fetch_node_types()[source]#

Fetch the node types from the registered dataset.

Returns:

The node types in the dataset, None if the dataset is homogeneous.

Return type:

Optional[list[gigl.src.common.types.graph_data.NodeType]]

property cluster_info: gigl.env.distributed.GraphStoreInfo[source]#
Return type:

gigl.env.distributed.GraphStoreInfo

gigl.distributed.graph_store.remote_dist_dataset.logger[source]#