gigl.distributed.graph_store.storage_utils#

Composable utilities for Graph Store storage nodes.

Provides two building blocks that callers (examples, integration tests, CLI entry points) can combine with their own orchestration logic: build_storage_dataset() for loading and partitioning the dataset, and run_storage_server() for serving it.

Attributes#

Functions#

build_storage_dataset(task_config_uri, ...[, ...])

Build a DistDataset for a storage node from a task config.

run_storage_server(storage_rank, cluster_info, ...[, ...])

Spawn sequential storage-server sessions as subprocesses.

Module Contents#

gigl.distributed.graph_store.storage_utils.build_storage_dataset(task_config_uri, sample_edge_direction, tf_record_uri_pattern='.*-of-.*\\.tfrecord(\\.gz)?$', splitter=None, should_load_tensors_in_parallel=True, ssl_positive_label_percentage=None)[source]#

Build a DistDataset for a storage node from a task config.

Loads the GBML config from task_config_uri, translates the protobuf metadata into SerializedGraphMetadata, and delegates to build_dataset() with DistRangePartitioner.

This should be called once per storage node (machine). A torch.distributed process group must already be initialised among all storage nodes before calling this function so that the dataset can be partitioned correctly.

Parameters:
  • task_config_uri (gigl.common.Uri) – URI pointing to a frozen GbmlConfig protobuf.

  • sample_edge_direction (Literal['in', 'out']) – Direction for edge sampling ("in" or "out").

  • tf_record_uri_pattern (str) – Regex pattern to match TFRecord file URIs.

  • splitter (Optional[Union[gigl.utils.data_splitters.DistNodeAnchorLinkSplitter, gigl.utils.data_splitters.DistNodeSplitter]]) – Optional splitter for node-anchor-link or node splitting. If None, the dataset will not be split.

  • should_load_tensors_in_parallel (bool) – Whether to load TFRecord tensors in parallel.

  • ssl_positive_label_percentage (Optional[float]) – Fraction of edges to select as self-supervised positive labels. Must be None when supervised edge labels are already provided. For example, 0.1 selects 10% of edges.

Returns:

A partitioned DistDataset ready to be served.

Return type:

gigl.distributed.dist_dataset.DistDataset
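The default tf_record_uri_pattern shown in the signature above targets the usual sharded TFRecord naming scheme. A minimal sketch of what it does and does not match, using only the standard-library re module (the file names below are illustrative, not from the source):

```python
import re

# Default value of tf_record_uri_pattern, copied from the documented signature.
TF_RECORD_URI_PATTERN = r".*-of-.*\.tfrecord(\.gz)?$"

pattern = re.compile(TF_RECORD_URI_PATTERN)

# Typical sharded TFRecord URIs match, with or without gzip compression.
assert pattern.match("gs://bucket/data/edges-00001-of-00010.tfrecord")
assert pattern.match("nodes-00000-of-00004.tfrecord.gz")

# Files without a shard suffix, or with another extension, do not match.
assert pattern.match("nodes.tfrecord") is None
assert pattern.match("edges-00001-of-00010.tfrecord.bz2") is None
```

Passing a custom pattern restricts which serialized tensor files the storage node loads; the default accepts any `*-of-*` shard, optionally gzipped.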

gigl.distributed.graph_store.storage_utils.run_storage_server(storage_rank, cluster_info, dataset, num_server_sessions, timeout_seconds=None)[source]#

Spawn sequential storage-server sessions as subprocesses.

Each server session requires its own spawned process because a GLT server process cannot be reconnected to after it has been joined. This function loops over num_server_sessions, spawning _run_storage_server_session() as a subprocess each time and joining it before starting the next.

Parameters:
  • storage_rank (int) – Rank of this storage node in the storage cluster.

  • cluster_info (gigl.env.distributed.GraphStoreInfo) – Cluster topology information.

  • dataset (gigl.distributed.dist_dataset.DistDataset) – The DistDataset to serve.

  • num_server_sessions (int) – Number of sequential server sessions to run (typically one per inference node type).

  • timeout_seconds (Optional[float]) – Timeout for joining each server subprocess. None waits indefinitely.

Return type:

None
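The spawn-and-join loop described above can be sketched with the standard-library multiprocessing module. This is an illustration of the control flow only, not the gigl implementation: `_run_session` stands in for `_run_storage_server_session()`, and the `"fork"` start method is used purely to keep the sketch self-contained.

```python
import multiprocessing as mp

def _run_session(session_idx: int) -> None:
    # Placeholder for _run_storage_server_session(); a real storage node
    # would serve the partitioned dataset here until clients disconnect.
    print(f"session {session_idx} served")

def run_sessions(num_server_sessions: int, timeout_seconds=None) -> list:
    # Mirrors run_storage_server's loop: a fresh subprocess per session,
    # joined before the next one starts, because a joined server process
    # cannot be reconnected to.
    ctx = mp.get_context("fork")  # "fork" keeps this illustration simple
    exitcodes = []
    for session_idx in range(num_server_sessions):
        proc = ctx.Process(target=_run_session, args=(session_idx,))
        proc.start()
        proc.join(timeout=timeout_seconds)  # None waits indefinitely
        if proc.exitcode != 0:
            raise RuntimeError(
                f"session {session_idx} exited with code {proc.exitcode}"
            )
        exitcodes.append(proc.exitcode)
    return exitcodes
```

Running one process per session (rather than one long-lived server) is what allows each inference node type to get a clean server session.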

gigl.distributed.graph_store.storage_utils.logger[source]#