gigl.distributed.graph_store.dist_server#

GiGL implementation of GLT DistServer.

Uses GiGL’s DistSamplingProducer which supports neighbor sampling and ABLP (anchor-based link prediction) via the DistNeighborSampler.

Based on alibaba/graphlearn-for-pytorch

Attributes#

R

SERVER_EXIT_STATUS_CHECK_INTERVAL

Interval (in seconds) to check exit status of server.

logger

Classes#

DistServer

A server that supports launching remote sampling workers for training clients.

Functions#

get_server()

Get the DistServer instance on the current process.

init_server(num_servers, server_rank, dataset, ...[, ...])

Initialize the current process as a server and establish connections with all other servers and clients.

wait_and_shutdown_server()

Block until all clients have shut down, then shut down the server on the current process and destroy all RPC connections.

Module Contents#

class gigl.distributed.graph_store.dist_server.DistServer(dataset)[source]#

A server that supports launching remote sampling workers for training clients.

Note that this server is enabled only when the distribution mode is a server-client framework, and the graph and feature store will be partitioned and managed by all server nodes.

Parameters:

dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books.

create_sampling_producer(sampler_input, sampling_config, worker_options, sampler_options)[source]#

Create and initialize an instance of DistSamplingProducer with a group of subprocesses for distributed sampling.

Supports both standard NodeSamplerInput and ABLPNodeSamplerInput through the unified DistNeighborSampler.

Parameters:
  • sampler_input (Union[graphlearn_torch.sampler.NodeSamplerInput, graphlearn_torch.sampler.EdgeSamplerInput, graphlearn_torch.sampler.RemoteSamplerInput, gigl.distributed.sampler.ABLPNodeSamplerInput]) – The input data for sampling.

  • sampling_config (graphlearn_torch.sampler.SamplingConfig) – Configuration of sampling meta info.

  • worker_options (graphlearn_torch.distributed.RemoteDistSamplingWorkerOptions) – Options for launching remote sampling workers by this server.

  • sampler_options (gigl.distributed.sampler_options.SamplerOptions) – Controls which sampler class is instantiated.

Returns:

A unique id of the created sampling producer on this server.

Return type:

int
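Taken together, create_sampling_producer, start_new_epoch_sampling, fetch_one_sampled_message, and destroy_sampling_producer form a producer lifecycle. The sketch below shows one plausible consumption loop; it assumes the boolean returned by fetch_one_sampled_message flags end-of-epoch, and it elides construction of the sampler input, config, and options objects:

```python
def consume_one_epoch(server, sampler_input, sampling_config,
                      worker_options, sampler_options, epoch=0):
    """Drain one epoch of sampled messages from a DistServer.

    Sketch only: assumes the second element of fetch_one_sampled_message's
    return value is an end-of-epoch flag.
    """
    producer_id = server.create_sampling_producer(
        sampler_input, sampling_config, worker_options, sampler_options
    )
    try:
        server.start_new_epoch_sampling(producer_id, epoch)
        messages = []
        while True:
            msg, exhausted = server.fetch_one_sampled_message(producer_id)
            if msg is not None:
                messages.append(msg)
            if exhausted:
                return messages
    finally:
        # Always release the producer's worker subprocesses.
        server.destroy_sampling_producer(producer_id)
```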

destroy_sampling_producer(producer_id)[source]#

Shut down and destroy a sampling producer managed by this server with its producer id.

Parameters:

producer_id (int)

Return type:

None

exit()[source]#

Set the exit flag to True.

Return type:

bool

fetch_one_sampled_message(producer_id)[source]#

Fetch a sampled message from the buffer of a specific sampling producer with its producer id.

Parameters:

producer_id (int)

Return type:

tuple[Optional[graphlearn_torch.channel.SampleMessage], bool]

get_ablp_input(request)[source]#

Get the ABLP (Anchor Based Link Prediction) input for a specific rank in distributed processing.

Note: rank and world_size here are for the process group we’re fetching for, not the process group we’re fetching from. e.g. if our compute cluster is of world size 4, and we have 2 storage nodes, then the world size this gets called with is 4, not 2.

Parameters:

request (gigl.distributed.graph_store.messages.FetchABLPInputRequest) – The ABLP fetch request, including split, node type, supervision edge type, and round-robin rank/world_size.

Returns:

A tuple containing the anchor nodes for the rank, the positive labels, and the negative labels. The positive labels have shape [N, M], where N is the number of anchor nodes and M is the number of positive labels per anchor; the negative labels are shaped analogously. The negative labels may be None if no negative labels are available.

Raises:

ValueError – If the split is invalid.

Return type:

tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]
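The shape contract above can be checked on toy data. The helper below is purely illustrative (it works on nested lists rather than tensors and is not part of the GiGL API):

```python
def check_ablp_shapes(anchors, positives, negatives):
    """Verify the [N] / [N, M] shape contract of get_ablp_input's return.

    Illustrative helper on nested lists; not part of the GiGL API.
    Returns N, the number of anchor nodes.
    """
    n = len(anchors)
    # Positive labels: one row of equal length per anchor node.
    assert len(positives) == n
    assert len({len(row) for row in positives}) <= 1
    # Negative labels may be absent entirely.
    if negatives is not None:
        assert len(negatives) == n
        assert len({len(row) for row in negatives}) <= 1
    return n
```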

get_dataset_meta()[source]#

Get the meta info of the distributed dataset managed by the current server, including partition info and graph types.

Return type:

tuple[int, int, Optional[list[gigl.src.common.types.graph_data.NodeType]], Optional[list[gigl.src.common.types.graph_data.EdgeType]]]

get_edge_dir()[source]#

Get the edge direction from the dataset.

Returns:

The edge direction.

Return type:

Literal['in', 'out']

get_edge_feature_info()[source]#

Get edge feature information from the dataset.

Returns:

Edge feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping EdgeType to FeatureInfo for heterogeneous graphs

  • None if no edge features are available

get_edge_index(edge_type, layout)[source]#
Return type:

tuple[torch.Tensor, torch.Tensor]

get_edge_partition_book(edge_type)[source]#

Gets the partition book for the specified edge type.

Parameters:

edge_type (Optional[gigl.src.common.types.graph_data.EdgeType]) – The edge type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.

Returns:

The partition book for the requested edge type, or None if no partition book is available.

Raises:

ValueError – If edge_type is mismatched with the dataset type.

Return type:

Optional[graphlearn_torch.partition.PartitionBook]

get_edge_size(edge_type, layout)[source]#
Return type:

tuple[int, int]

get_edge_types()[source]#

Get the edge types from the dataset.

Returns:

The edge types in the dataset, None if the dataset is homogeneous.

Return type:

Optional[list[gigl.src.common.types.graph_data.EdgeType]]

get_node_feature(node_type, index)[source]#
Parameters:
  • node_type (Optional[gigl.src.common.types.graph_data.NodeType])

  • index (torch.Tensor)

Return type:

torch.Tensor

get_node_feature_info()[source]#

Get node feature information from the dataset.

Returns:

Node feature information, which can be:

  • A single FeatureInfo object for homogeneous graphs

  • A dict mapping NodeType to FeatureInfo for heterogeneous graphs

  • None if no node features are available
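Since get_node_feature_info and get_edge_feature_info both return this three-way union, callers typically normalize it before use. A small illustrative helper (not part of the GiGL API) that flattens the union into (type, info) pairs:

```python
def normalize_feature_info(info):
    """Normalize the union return of get_node_feature_info() /
    get_edge_feature_info() into a list of (type_or_None, FeatureInfo) pairs.

    Illustrative helper; not part of the GiGL API.
    """
    if info is None:
        return []                    # no features available
    if isinstance(info, dict):
        return sorted(info.items())  # heterogeneous: one entry per type
    return [(None, info)]            # homogeneous: single FeatureInfo
```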

get_node_ids(request)[source]#

Get the node ids from the dataset.

Parameters:

request (gigl.distributed.graph_store.messages.FetchNodesRequest) – The node-fetch request, including split, node type, and round-robin rank/world_size.

Returns:

The node ids.

Raises:

ValueError –

  • If rank and world_size are not provided together

  • If the split is invalid

  • If the node ids are not a torch.Tensor or a dict[NodeType, torch.Tensor]

  • If a node type is provided for a homogeneous dataset

  • If the node ids are not a dict[NodeType, torch.Tensor] when no node type is provided

Note: When split=None, all nodes are queryable. This means nodes from any split (train, val, or test) may be returned. This is useful when you need to sample neighbors during inference, as neighbor nodes may belong to any split.

Return type:

torch.Tensor
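The round-robin rank/world_size semantics can be sketched on plain lists; the server's actual partitioning is an implementation detail, so the following is only a plausible illustration of the contract:

```python
def round_robin_slice(node_ids, rank=None, world_size=None):
    """Plausible sketch of a round-robin node split: rank r of world_size w
    receives every w-th node id starting at offset r. Illustrative only;
    not the server's actual implementation.
    """
    if (rank is None) != (world_size is None):
        raise ValueError("rank and world_size must be provided together")
    ids = list(node_ids)
    if rank is None:
        return ids  # no split requested: all nodes are queryable
    return ids[rank::world_size]
```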

get_node_label(node_type, index)[source]#
Parameters:
  • node_type (Optional[gigl.src.common.types.graph_data.NodeType])

  • index (torch.Tensor)

Return type:

torch.Tensor

get_node_partition_book(node_type)[source]#

Gets the partition book for the specified node type.

Parameters:

node_type (Optional[gigl.src.common.types.graph_data.NodeType]) – The node type to look up. Must be None for homogeneous datasets and non-None for heterogeneous ones.

Returns:

The partition book for the requested node type, or None if no partition book is available.

Raises:

ValueError – If node_type is mismatched with the dataset type.

Return type:

Optional[graphlearn_torch.partition.PartitionBook]

get_node_partition_id(node_type, index)[source]#
Parameters:
  • node_type (Optional[gigl.src.common.types.graph_data.NodeType])

  • index (torch.Tensor)

Return type:

Optional[torch.Tensor]

get_node_types()[source]#

Get the node types from the dataset.

Returns:

The node types in the dataset, None if the dataset is homogeneous.

Return type:

Optional[list[gigl.src.common.types.graph_data.NodeType]]

get_tensor_size(node_type)[source]#
Parameters:

node_type (Optional[gigl.src.common.types.graph_data.NodeType])

Return type:

torch.Size

shutdown()[source]#
Return type:

None

start_new_epoch_sampling(producer_id, epoch)[source]#

Start the sampling tasks of a new epoch for a specific sampling producer with its producer id.

Parameters:
  • producer_id (int)

  • epoch (int)

Return type:

None

wait_for_exit()[source]#

Block until the exit flag has been set to True.

Return type:

None
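exit() and wait_for_exit() cooperate through an exit flag: one side sets it, the other polls it at a fixed interval (cf. SERVER_EXIT_STATUS_CHECK_INTERVAL, which is 5.0 seconds in this module). A minimal sketch of that pattern, with a shortened interval for illustration:

```python
import threading
import time

class ExitFlag:
    """Sketch of the exit()/wait_for_exit() polling pattern.

    Illustrative only; not the DistServer implementation.
    """

    CHECK_INTERVAL = 0.01  # shortened here; the server checks every 5.0s

    def __init__(self):
        self._exited = threading.Event()

    def exit(self) -> bool:
        # Mirror DistServer.exit(): set the flag and report success.
        self._exited.set()
        return True

    def wait_for_exit(self) -> None:
        # Poll the flag at a fixed interval until it is set.
        while not self._exited.is_set():
            time.sleep(self.CHECK_INTERVAL)
```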

dataset[source]#
gigl.distributed.graph_store.dist_server.get_server()[source]#

Get the DistServer instance on the current process.

Return type:

DistServer

gigl.distributed.graph_store.dist_server.init_server(num_servers, server_rank, dataset, master_addr, master_port, num_clients=0, num_rpc_threads=16, request_timeout=180, server_group_name=None, is_dynamic=False)[source]#

Initialize the current process as a server and establish connections with all other servers and clients. Note that this method should be called only in the server-client distribution mode.

Parameters:
  • num_servers (int) – Number of processes participating in the server group.

  • server_rank (int) – Rank of the current process within the server group (it should be a number between 0 and num_servers-1).

  • dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition book info.

  • master_addr (str) – The master TCP address for RPC connection between all servers and clients, the value of this parameter should be same for all servers and clients.

  • master_port (int) – The master TCP port for RPC connection between all servers and clients, the value of this parameter should be same for all servers and clients.

  • num_clients (int) – Number of processes participating in the client group. If is_dynamic is True, this parameter will be ignored.

  • num_rpc_threads (int) – The number of RPC worker threads used by the current server to respond to remote requests. (Default: 16).

  • request_timeout (int) – The max timeout in seconds for remote requests; an exception is raised if it is exceeded. (Default: 180).

  • server_group_name (str) – A unique name of the server group that current process belongs to. If set to None, a default name will be used. (Default: None).

  • is_dynamic (bool) – Whether the world size is dynamic. (Default: False).

Return type:

None
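A typical server-side entrypoint pairs init_server with wait_and_shutdown_server. The sketch below assumes a GiGL installation and an already-built DistDataset; the master_addr/master_port values are placeholders and must match across all servers and clients:

```python
def serve(dataset, server_rank, num_servers, num_clients):
    """Hypothetical server entrypoint; dataset must be a prepared DistDataset."""
    from gigl.distributed.graph_store.dist_server import (
        init_server,
        wait_and_shutdown_server,
    )

    # Establish RPC connections with all other servers and clients.
    init_server(
        num_servers=num_servers,
        server_rank=server_rank,  # in [0, num_servers - 1]
        dataset=dataset,
        master_addr="localhost",  # placeholder; same value on every process
        master_port=29500,        # placeholder; same value on every process
        num_clients=num_clients,
    )
    # Block until every client disconnects, then tear down the server
    # and destroy all RPC connections.
    wait_and_shutdown_server()
```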

gigl.distributed.graph_store.dist_server.wait_and_shutdown_server()[source]#

Block until all clients have shut down, then shut down the server on the current process and destroy all RPC connections.

Return type:

None

gigl.distributed.graph_store.dist_server.R[source]#
gigl.distributed.graph_store.dist_server.SERVER_EXIT_STATUS_CHECK_INTERVAL = 5.0[source]#

Interval (in seconds) to check exit status of server.

gigl.distributed.graph_store.dist_server.logger[source]#