gigl.distributed.graph_store.storage_utils#
Composable utilities for Graph Store storage nodes.
Provides two building blocks that callers (examples, integration tests, CLI entry points) can combine with their own orchestration logic:
- build_storage_dataset() — loads a task config, converts metadata, and builds a DistDataset using DistRangePartitioner.
- run_storage_server() — initialises a GLT server, sets up a torch.distributed process group for the storage cluster, and blocks until compute nodes signal shutdown.
Functions#
| build_storage_dataset() | Build a DistDataset for a storage node from a task config. |
| run_storage_server() | Spawn sequential storage-server sessions as subprocesses. |
Module Contents#
- gigl.distributed.graph_store.storage_utils.build_storage_dataset(task_config_uri, sample_edge_direction, tf_record_uri_pattern='.*-of-.*\\.tfrecord(\\.gz)?$', splitter=None, should_load_tensors_in_parallel=True, ssl_positive_label_percentage=None)[source]#
Build a DistDataset for a storage node from a task config.

Loads the GBML config from task_config_uri, translates the protobuf metadata into SerializedGraphMetadata, and delegates to build_dataset() with DistRangePartitioner.

This should be called once per storage node (machine). A torch.distributed process group must already be initialised among all storage nodes before calling this function so that the dataset can be partitioned correctly.

- Parameters:
  - task_config_uri (gigl.common.Uri) – URI pointing to a frozen GbmlConfig protobuf.
  - sample_edge_direction (Literal['in', 'out']) – Direction for edge sampling ("in" or "out").
  - tf_record_uri_pattern (str) – Regex pattern to match TFRecord file URIs.
  - splitter (Optional[Union[gigl.utils.data_splitters.DistNodeAnchorLinkSplitter, gigl.utils.data_splitters.DistNodeSplitter]]) – Optional splitter for node-anchor-link or node splitting. If None, the dataset will not be split.
  - should_load_tensors_in_parallel (bool) – Whether to load TFRecord tensors in parallel.
  - ssl_positive_label_percentage (Optional[float]) – Fraction of edges to select as self-supervised positive labels. Must be None when supervised edge labels are already provided. For example, 0.1 selects 10% of edges.
- Returns:
  A partitioned DistDataset ready to be served.
- Return type:
  DistDataset
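To illustrate, the default tf_record_uri_pattern selects sharded TFRecord files (optionally gzip-compressed) while skipping other artifacts. A minimal sketch using Python's re module — the URIs below are hypothetical, for illustration only:

```python
import re

# Default pattern from build_storage_dataset(): matches sharded TFRecord
# files, with an optional .gz suffix for compressed shards.
TF_RECORD_URI_PATTERN = r".*-of-.*\.tfrecord(\.gz)?$"

# Hypothetical URIs for illustration only.
uris = [
    "gs://bucket/edges/data-00001-of-00010.tfrecord",
    "gs://bucket/edges/data-00001-of-00010.tfrecord.gz",
    "gs://bucket/edges/metadata.json",
]

# Keep only URIs matching the pattern; metadata.json is filtered out.
matched = [u for u in uris if re.match(TF_RECORD_URI_PATTERN, u)]
print(matched)
```

A stricter or looser pattern can be passed explicitly when the default shard naming convention does not apply.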
- gigl.distributed.graph_store.storage_utils.run_storage_server(storage_rank, cluster_info, dataset, num_server_sessions, timeout_seconds=None, num_rpc_threads=16, rpc_timeout=None)[source]#
Spawn sequential storage-server sessions as subprocesses.
Each server session requires its own spawned process because you cannot re-connect to the same GLT server process after it has been joined. This function loops over num_server_sessions, spawning _run_storage_server_session() as a subprocess each time and joining it before starting the next.

- Parameters:
  - storage_rank (int) – Rank of this storage node in the storage cluster.
  - cluster_info (gigl.env.distributed.GraphStoreInfo) – Cluster topology information.
  - dataset (gigl.distributed.dist_dataset.DistDataset) – The DistDataset to serve.
  - num_server_sessions (int) – Number of sequential server sessions to run (typically one per inference node type).
  - timeout_seconds (Optional[float]) – Timeout for joining each server subprocess. None waits indefinitely.
  - num_rpc_threads (int) – Number of RPC threads for the server, i.e. the maximum number of concurrent RPC requests it can handle. In practice, the compute world size is an upper bound.
  - rpc_timeout (Optional[int]) – Maximum timeout in seconds for remote RPC requests. If None, uses the init_server default of 180 seconds. If long-running RPCs (e.g. producer creation) time out, increase this value.
- Return type:
None