gigl.distributed.graph_store.storage_utils#
Composable utilities for Graph Store storage nodes.
Provides two building blocks that callers (examples, integration tests, CLI entry points) can combine with their own orchestration logic:
- build_storage_dataset() — loads a task config, converts metadata, and builds a DistDataset using DistRangePartitioner.
- run_storage_server() — initialises a GLT server, sets up a torch.distributed process group for the storage cluster, and blocks until compute nodes signal shutdown.
Functions#

build_storage_dataset() — Build a DistDataset for a storage node from a task config.

run_storage_server() — Spawn sequential storage-server sessions as subprocesses.
Module Contents#
- gigl.distributed.graph_store.storage_utils.build_storage_dataset(task_config_uri, sample_edge_direction, tf_record_uri_pattern='.*-of-.*\\.tfrecord(\\.gz)?$', splitter=None, should_load_tensors_in_parallel=True, ssl_positive_label_percentage=None)[source]#
Build a DistDataset for a storage node from a task config.

Loads the GBML config from task_config_uri, translates the protobuf metadata into SerializedGraphMetadata, and delegates to build_dataset() with DistRangePartitioner.

This should be called once per storage node (machine). A torch.distributed process group must already be initialised among all storage nodes before calling this function so that the dataset can be partitioned correctly.

- Parameters:
task_config_uri (gigl.common.Uri) – URI pointing to a frozen GbmlConfig protobuf.

sample_edge_direction (Literal['in', 'out']) – Direction for edge sampling ("in" or "out").

tf_record_uri_pattern (str) – Regex pattern to match TFRecord file URIs.

splitter (Optional[Union[gigl.utils.data_splitters.DistNodeAnchorLinkSplitter, gigl.utils.data_splitters.DistNodeSplitter]]) – Optional splitter for node-anchor-link or node splitting. If None, the dataset will not be split.

should_load_tensors_in_parallel (bool) – Whether to load TFRecord tensors in parallel.

ssl_positive_label_percentage (Optional[float]) – Fraction of edges to select as self-supervised positive labels. Must be None when supervised edge labels are already provided. For example, 0.1 selects 10% of edges.
- Returns:
A partitioned DistDataset ready to be served.
- Return type:
DistDataset
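As a quick sanity check, the default tf_record_uri_pattern can be tried against candidate file URIs with Python's re module. The URIs below are illustrative, not taken from a real bucket:

```python
import re

# Default pattern from the build_storage_dataset() signature above:
# matches sharded TFRecord files, optionally gzipped.
TF_RECORD_URI_PATTERN = r".*-of-.*\.tfrecord(\.gz)?$"

uris = [
    "gs://bucket/data/part-00000-of-00010.tfrecord",
    "gs://bucket/data/part-00003-of-00010.tfrecord.gz",
    "gs://bucket/data/_SUCCESS",
    "gs://bucket/data/part-00000-of-00010.tfrecord.bak",
]

# Only the first two URIs match; the marker file and the .bak file do not.
matched = [uri for uri in uris if re.match(TF_RECORD_URI_PATTERN, uri)]
print(matched)
```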
- gigl.distributed.graph_store.storage_utils.run_storage_server(storage_rank, cluster_info, dataset, num_server_sessions, timeout_seconds=None)[source]#
Spawn sequential storage-server sessions as subprocesses.
Each server session requires its own spawned process because you cannot re-connect to the same GLT server process after it has been joined. This function loops over num_server_sessions, spawning _run_storage_server_session() as a subprocess each time and joining it before starting the next.

- Parameters:
storage_rank (int) – Rank of this storage node in the storage cluster.
cluster_info (gigl.env.distributed.GraphStoreInfo) – Cluster topology information.
dataset (gigl.distributed.dist_dataset.DistDataset) – The DistDataset to serve.

num_server_sessions (int) – Number of sequential server sessions to run (typically one per inference node type).

timeout_seconds (Optional[float]) – Timeout for joining each server subprocess. None waits indefinitely.
- Return type:
None
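The spawn-join-repeat loop described above can be sketched with the standard multiprocessing module. This is a minimal illustration of the pattern, not the actual implementation: _serve_session is a hypothetical stand-in for _run_storage_server_session(), which in reality would serve a GLT server until compute nodes signal shutdown.

```python
import multiprocessing as mp


def _serve_session(session_idx: int) -> None:
    # Hypothetical stand-in for _run_storage_server_session(); a real
    # session would block here serving requests until shutdown is signalled.
    print(f"session {session_idx} served")


def run_sessions(num_server_sessions: int, timeout_seconds=None) -> list:
    # One fresh subprocess per session: a server process cannot be
    # re-connected to once joined, so each session gets its own process.
    exit_codes = []
    for idx in range(num_server_sessions):
        proc = mp.Process(target=_serve_session, args=(idx,))
        proc.start()
        proc.join(timeout=timeout_seconds)  # None waits indefinitely
        if proc.is_alive():
            # Mirrors a join timeout: reap the stuck process and fail loudly.
            proc.terminate()
            proc.join()
            raise TimeoutError(f"session {idx} exceeded {timeout_seconds}s")
        exit_codes.append(proc.exitcode)
    return exit_codes


if __name__ == "__main__":
    print(run_sessions(2, timeout_seconds=30.0))
```

Joining each process before starting the next is what makes the sessions strictly sequential; the timeout branch corresponds to a non-None timeout_seconds.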