gigl.distributed.graph_store.dist_server#
GiGL implementation of GLT DistServer.
Uses GiGL’s DistSamplingProducer which supports neighbor sampling and ABLP (anchor-based link prediction) via the DistNeighborSampler.
Based on alibaba/graphlearn-for-pytorch
Attributes#
- Interval (in seconds) to check exit status of the server.
Classes#
- DistServer: A server that supports launching remote sampling workers for training clients.
Functions#
- get_server: Get the DistServer instance on the current process.
- init_server: Initialize the current process as a server and establish connections with all other servers and clients.
- wait_and_shutdown_server: Block until all clients have shut down, then shut down the server on the current process.
Module Contents#
- class gigl.distributed.graph_store.dist_server.DistServer(dataset)[source]#
A server that supports launching remote sampling workers for training clients.
Note that this server is enabled only when the distribution mode is a server-client framework, and the graph and feature store will be partitioned and managed by all server nodes.
- Parameters:
dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books.
- create_sampling_producer(sampler_input, sampling_config, worker_options, sampler_options)[source]#
Create and initialize an instance of DistSamplingProducer with a group of subprocesses for distributed sampling. Supports both standard NodeSamplerInput and ABLPNodeSamplerInput through the unified DistNeighborSampler.
- Parameters:
sampler_input (Union[graphlearn_torch.sampler.NodeSamplerInput, graphlearn_torch.sampler.EdgeSamplerInput, graphlearn_torch.sampler.RemoteSamplerInput, gigl.distributed.sampler.ABLPNodeSamplerInput]) – The input data for sampling.
sampling_config (graphlearn_torch.sampler.SamplingConfig) – Configuration of sampling meta info.
worker_options (graphlearn_torch.distributed.RemoteDistSamplingWorkerOptions) – Options for launching remote sampling workers by this server.
sampler_options (gigl.distributed.sampler_options.SamplerOptions) – Controls which sampler class is instantiated.
- Returns:
A unique id of the created sampling producer on this server.
- Return type:
int
- destroy_sampling_producer(producer_id)[source]#
Shut down and destroy a sampling producer managed by this server, identified by its producer id.
- Parameters:
producer_id (int)
- Return type:
None
- fetch_one_sampled_message(producer_id)[source]#
Fetch a sampled message from the buffer of a specific sampling producer with its producer id.
- Parameters:
producer_id (int)
- Return type:
tuple[Optional[graphlearn_torch.channel.SampleMessage], bool]
- get_ablp_input(request)[source]#
Get the ABLP (Anchor Based Link Prediction) input for a specific rank in distributed processing.
Note: rank and world_size here are for the process group we’re fetching for, not the process group we’re fetching from. e.g. if our compute cluster is of world size 4, and we have 2 storage nodes, then the world size this gets called with is 4, not 2.
- Parameters:
request (gigl.distributed.graph_store.messages.FetchABLPInputRequest) – The ABLP fetch request, including split, node type, supervision edge type, and round-robin rank/world_size.
- Returns:
A tuple containing the anchor nodes for the rank, the positive labels, and the negative labels. The positive labels are of shape [N, M], where N is the number of anchor nodes and M is the number of positive labels. The negative labels are of shape [N, M], where N is the number of anchor nodes and M is the number of negative labels. The negative labels may be None if no negative labels are available.
- Raises:
ValueError – If the split is invalid.
- Return type:
tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]
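A caller of get_ablp_input typically expands the returned per-anchor label rows into flat (anchor, label) pairs for loss computation. A minimal sketch of that expansion, using plain lists in place of torch.Tensors so it stays self-contained (the helper name is hypothetical, not part of GiGL):

```python
def expand_ablp_pairs(anchors, positives, negatives=None):
    """Expand per-anchor label rows into flat (anchor, label, is_positive) triples.

    `anchors` has shape [N]; `positives` and (optionally) `negatives` have
    shape [N, M], mirroring the tuple returned by get_ablp_input.
    """
    triples = []
    for i, anchor in enumerate(anchors):
        for pos in positives[i]:
            triples.append((anchor, pos, True))
        if negatives is not None:  # negatives may be None if unavailable
            for neg in negatives[i]:
                triples.append((anchor, neg, False))
    return triples
```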
- get_dataset_meta()[source]#
Get the meta info of the distributed dataset managed by the current server, including partition info and graph types.
- Return type:
tuple[int, int, Optional[list[gigl.src.common.types.graph_data.NodeType]], Optional[list[gigl.src.common.types.graph_data.EdgeType]]]
- get_edge_dir()[source]#
Get the edge direction from the dataset.
- Returns:
The edge direction.
- Return type:
Literal[‘in’, ‘out’]
- get_edge_feature_info()[source]#
Get edge feature information from the dataset.
- Returns:
Edge feature information, which can be:
- A single FeatureInfo object for homogeneous graphs
- A dict mapping EdgeType to FeatureInfo for heterogeneous graphs
- None if no edge features are available
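Because the return value is a union of three shapes, callers often normalize it into a single dict form before use. A sketch of that pattern, with a hypothetical FeatureInfo stand-in (the real class lives in GiGL and its fields may differ):

```python
from dataclasses import dataclass
from typing import Optional, Union

# Hypothetical key used for the single-FeatureInfo (homogeneous) case.
_HOMOGENEOUS_KEY = "default"


@dataclass(frozen=True)
class FeatureInfo:
    """Stand-in for GiGL's feature metadata: feature dimension and dtype name."""
    dim: int
    dtype: str


def normalize_feature_info(
    info: Union[FeatureInfo, dict[str, FeatureInfo], None],
) -> dict[str, FeatureInfo]:
    """Collapse the three possible return shapes into one dict form."""
    if info is None:
        return {}  # no features
    if isinstance(info, FeatureInfo):
        return {_HOMOGENEOUS_KEY: info}  # homogeneous graph
    return dict(info)  # heterogeneous graph: already type-keyed
```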
- get_edge_index(edge_type, layout)[source]#
- Parameters:
edge_type (Optional[gigl.src.common.types.graph_data.EdgeType])
layout (str)
- Return type:
tuple[torch.Tensor, torch.Tensor]
- get_edge_partition_book(edge_type)[source]#
Gets the partition book for the specified edge type.
- Parameters:
edge_type (Optional[gigl.src.common.types.graph_data.EdgeType]) – The edge type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.
- Returns:
The partition book for the requested edge type, or None if no partition book is available.
- Raises:
ValueError – If edge_type is mismatched with the dataset type.
- Return type:
Optional[graphlearn_torch.partition.PartitionBook]
- get_edge_size(edge_type, layout)[source]#
- Parameters:
edge_type (Optional[gigl.src.common.types.graph_data.EdgeType])
layout (str)
- Return type:
tuple[int, int]
- get_edge_types()[source]#
Get the edge types from the dataset.
- Returns:
The edge types in the dataset, None if the dataset is homogeneous.
- Return type:
Optional[list[gigl.src.common.types.graph_data.EdgeType]]
- get_node_feature(node_type, index)[source]#
- Parameters:
node_type (Optional[gigl.src.common.types.graph_data.NodeType])
index (torch.Tensor)
- Return type:
torch.Tensor
- get_node_feature_info()[source]#
Get node feature information from the dataset.
- Returns:
Node feature information, which can be:
- A single FeatureInfo object for homogeneous graphs
- A dict mapping NodeType to FeatureInfo for heterogeneous graphs
- None if no node features are available
- get_node_ids(request)[source]#
Get the node ids from the dataset.
- Parameters:
request (gigl.distributed.graph_store.messages.FetchNodesRequest) – The node-fetch request, including split, node type, and round-robin rank/world_size.
- Returns:
The node ids.
- Raises:
ValueError –
If the rank and world_size are not provided together
If the split is invalid
If the node ids are not a torch.Tensor or a dict[NodeType, torch.Tensor]
If the node type is provided for a homogeneous dataset
If the node ids are not a dict[NodeType, torch.Tensor] when no node type is provided
Note: When split=None, all nodes are queryable, meaning nodes from any split (train, val, or test) may be returned. This is useful when sampling neighbors during inference, as neighbor nodes may belong to any split.
- Return type:
torch.Tensor
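The round-robin rank/world_size in the request shards the node ids across the processes fetching them. Assuming "round-robin" means a strided slice (an illustrative assumption, not necessarily GiGL's exact scheme), the sharding can be sketched as:

```python
def round_robin_shard(node_ids: list[int], rank: int, world_size: int) -> list[int]:
    """Strided (round-robin) shard: rank r takes elements r, r + world_size, ...

    Hypothetical helper; plain lists stand in for torch.Tensors.
    """
    if world_size <= 0 or not (0 <= rank < world_size):
        raise ValueError("rank must be in [0, world_size) and world_size positive")
    return node_ids[rank::world_size]
```

One property of this scheme: the shards across all ranks are disjoint and together cover every node id exactly once.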
- get_node_label(node_type, index)[source]#
- Parameters:
node_type (Optional[gigl.src.common.types.graph_data.NodeType])
index (torch.Tensor)
- Return type:
torch.Tensor
- get_node_partition_book(node_type)[source]#
Gets the partition book for the specified node type.
- Parameters:
node_type (Optional[gigl.src.common.types.graph_data.NodeType]) – The node type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.
- Returns:
The partition book for the requested node type, or None if no partition book is available.
- Raises:
ValueError – If node_type is mismatched with the dataset type.
- Return type:
Optional[graphlearn_torch.partition.PartitionBook]
- get_node_partition_id(node_type, index)[source]#
- Parameters:
node_type (Optional[gigl.src.common.types.graph_data.NodeType])
index (torch.Tensor)
- Return type:
Optional[torch.Tensor]
- get_node_types()[source]#
Get the node types from the dataset.
- Returns:
The node types in the dataset, None if the dataset is homogeneous.
- Return type:
Optional[list[gigl.src.common.types.graph_data.NodeType]]
- get_tensor_size(node_type)[source]#
- Parameters:
node_type (Optional[gigl.src.common.types.graph_data.NodeType])
- Return type:
torch.Size
- gigl.distributed.graph_store.dist_server.get_server()[source]#
Get the DistServer instance on the current process.
- Return type:
DistServer
- gigl.distributed.graph_store.dist_server.init_server(num_servers, server_rank, dataset, master_addr, master_port, num_clients=0, num_rpc_threads=16, request_timeout=180, server_group_name=None, is_dynamic=False)[source]#
Initialize the current process as a server and establish connections with all other servers and clients. Note that this method should be called only in the server-client distribution mode.
- Parameters:
num_servers (int) – Number of processes participating in the server group.
server_rank (int) – Rank of the current process within the server group (a number between 0 and num_servers - 1).
dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition book info.
master_addr (str) – The master TCP address for RPC connection between all servers and clients; must be the same for all servers and clients.
master_port (int) – The master TCP port for RPC connection between all servers and clients; must be the same for all servers and clients.
num_clients (int) – Number of processes participating in the client group. If is_dynamic is True, this parameter is ignored. (Default: 0).
num_rpc_threads (int) – The number of RPC worker threads used by the current server to respond to remote requests. (Default: 16).
request_timeout (int) – The maximum time in seconds to wait for remote requests before an exception is raised. (Default: 180).
server_group_name (str) – A unique name of the server group the current process belongs to. If set to None, a default name is used. (Default: None).
is_dynamic (bool) – Whether the world size is dynamic. (Default: False).
- Return type:
None
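Taken together with wait_and_shutdown_server below, a storage-node entrypoint is a thin wrapper: call init_server once, then block until clients disconnect. A sketch under the assumption that a DistDataset has already been built elsewhere (loading one is out of scope here); the rank check simply mirrors init_server's documented contract:

```python
def validate_server_rank(num_servers: int, server_rank: int) -> None:
    """init_server requires server_rank in [0, num_servers - 1]."""
    if num_servers <= 0:
        raise ValueError("num_servers must be positive")
    if not (0 <= server_rank < num_servers):
        raise ValueError(
            f"server_rank {server_rank} out of range for {num_servers} servers"
        )


def serve(dataset, num_servers: int, server_rank: int,
          master_addr: str, master_port: int) -> None:
    # Imports deferred so the validation helper above stays usable
    # without GiGL installed.
    from gigl.distributed.graph_store.dist_server import (
        init_server,
        wait_and_shutdown_server,
    )

    validate_server_rank(num_servers, server_rank)
    init_server(
        num_servers=num_servers,
        server_rank=server_rank,
        dataset=dataset,
        master_addr=master_addr,
        master_port=master_port,
        num_clients=2,  # example value; ignored when is_dynamic=True
    )
    # Block until every client disconnects, then tear down RPC.
    wait_and_shutdown_server()
```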
- gigl.distributed.graph_store.dist_server.wait_and_shutdown_server()[source]#
Block until all clients have shut down, then shut down the server on the current process and destroy all RPC connections.
- Return type:
None