gigl.distributed.graph_store.storage_utils#

Composable utilities for Graph Store storage nodes.

Provides two building blocks, build_storage_dataset() and run_storage_server(), that callers (examples, integration tests, CLI entry points) can combine with their own orchestration logic.

Attributes#

logger

Functions#

build_storage_dataset(task_config_uri, ...[, ...])

Build a DistDataset for a storage node from a task config.

run_storage_server(storage_rank, cluster_info, ...[, ...])

Spawn sequential storage-server sessions as subprocesses.

Module Contents#

gigl.distributed.graph_store.storage_utils.build_storage_dataset(task_config_uri, sample_edge_direction, tf_record_uri_pattern='.*-of-.*\\.tfrecord(\\.gz)?$', splitter=None, should_load_tensors_in_parallel=True, ssl_positive_label_percentage=None)[source]#

Build a DistDataset for a storage node from a task config.

Loads the GBML config from task_config_uri, translates the protobuf metadata into SerializedGraphMetadata, and delegates to build_dataset() with DistRangePartitioner.

This should be called once per storage node (machine). A torch.distributed process group must already be initialised among all storage nodes before calling this function so that the dataset can be partitioned correctly.

Parameters:
  • task_config_uri (gigl.common.Uri) – URI pointing to a frozen GbmlConfig protobuf.

  • sample_edge_direction (Literal['in', 'out']) – Direction for edge sampling ("in" or "out").

  • tf_record_uri_pattern (str) – Regex pattern to match TFRecord file URIs.

  • splitter (Optional[Union[gigl.utils.data_splitters.DistNodeAnchorLinkSplitter, gigl.utils.data_splitters.DistNodeSplitter]]) – Optional splitter for node-anchor-link or node splitting. If None, the dataset will not be split.

  • should_load_tensors_in_parallel (bool) – Whether to load TFRecord tensors in parallel.

  • ssl_positive_label_percentage (Optional[float]) – Fraction of edges to select as self-supervised positive labels. Must be None when supervised edge labels are already provided. For example, 0.1 selects 10% of edges.

Returns:

A partitioned DistDataset ready to be served.

Return type:

gigl.distributed.dist_dataset.DistDataset
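
For illustration, a minimal sketch of calling this function on a storage node. The Gloo backend, the environment-variable rendezvous, and the task-config URI below are assumptions made for the example, not requirements of this function beyond the documented process-group precondition.

import torch.distributed as dist

from gigl.common import UriFactory
from gigl.distributed.graph_store.storage_utils import build_storage_dataset

# Assumed setup: RANK / WORLD_SIZE / MASTER_ADDR / MASTER_PORT describe the
# storage nodes, so the resulting process group spans exactly those nodes and
# the dataset can be range-partitioned across them.
dist.init_process_group(backend="gloo")

# Placeholder URI for a frozen GbmlConfig produced by an earlier pipeline step.
dataset = build_storage_dataset(
    task_config_uri=UriFactory.create_uri("gs://my-bucket/frozen_gbml_config.yaml"),
    sample_edge_direction="in",
)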

gigl.distributed.graph_store.storage_utils.run_storage_server(storage_rank, cluster_info, dataset, num_server_sessions, timeout_seconds=None, num_rpc_threads=16, rpc_timeout=None)[source]#

Spawn sequential storage-server sessions as subprocesses.

Each server session requires its own spawned process because you cannot re-connect to the same GLT server process after it has been joined. This function loops over num_server_sessions, spawning _run_storage_server_session() as a subprocess each time and joining it before starting the next.

Parameters:
  • storage_rank (int) – Rank of this storage node in the storage cluster.

  • cluster_info (gigl.env.distributed.GraphStoreInfo) – Cluster topology information.

  • dataset (gigl.distributed.dist_dataset.DistDataset) – The DistDataset to serve.

  • num_server_sessions (int) – Number of sequential server sessions to run (typically one per inference node type).

  • timeout_seconds (Optional[float]) – Timeout for joining each server subprocess. None waits indefinitely.

  • num_rpc_threads (int) – The number of RPC threads the server uses, which caps how many RPC requests it can handle concurrently. Set it to the maximum number of concurrent RPCs the server must handle; in practice, the compute world size is an upper bound.

  • rpc_timeout (Optional[int]) – The maximum timeout in seconds for remote RPC requests. If None, uses the init_server default of 180 seconds. If long-running RPCs (e.g. producer creation) time out, increase this value to avoid timeout errors.

Return type:

None
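
As a sketch of how a caller's own orchestration logic might wire the two utilities together; the serve() wrapper name and the single-session choice are illustrative assumptions, not part of this module.

from gigl.distributed.graph_store.storage_utils import run_storage_server

def serve(storage_rank, cluster_info, dataset):
    # Run one sequential server session (e.g. a single inference node type),
    # waiting indefinitely for each spawned server subprocess to join.
    run_storage_server(
        storage_rank=storage_rank,
        cluster_info=cluster_info,   # gigl.env.distributed.GraphStoreInfo
        dataset=dataset,             # DistDataset from build_storage_dataset()
        num_server_sessions=1,
        timeout_seconds=None,
        num_rpc_threads=16,
    )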

gigl.distributed.graph_store.storage_utils.logger[source]#