gigl.distributed.graph_store.remote_channel#

GiGL-owned remote receiving channel for graph-store sampling.

This mirrors GLT’s RemoteReceivingChannel [1] behavior, but routes fetch RPCs through GiGL server methods so channel-based sampling works with shared producers.

[1] alibaba/graphlearn-for-pytorch

Attributes#

logger

Classes#

RemoteReceivingChannel

A base class that initializes a channel for sample messages and provides send() and recv() routines.

Module Contents#

class gigl.distributed.graph_store.remote_channel.RemoteReceivingChannel(server_rank, channel_id, prefetch_size=2, active_mask=None, pin_memory=False)[source]#

Bases: graphlearn_torch.channel.ChannelBase

Pull-based receiving channel that fetches sampled messages from servers.

Initializes a channel for sample messages and provides send() and recv() routines.

Parameters:
  • server_rank (Union[int, list[int]]) – Target storage server rank(s).

  • channel_id (Union[int, list[int]]) – Sampling channel id(s), one per server rank.

  • prefetch_size (int) – Number of in-flight fetch requests per server.

  • active_mask (Optional[list[bool]]) – Optional per-server mask indicating which channels can produce at least one batch this epoch. Inactive servers are treated as already finished and are never polled.

  • pin_memory (bool) – If True, copy received tensors to CUDA-pinned host memory before returning from recv(). Enables faster GPU transfers via DMA in the downstream collate function.

Differences from GLT’s RemoteReceivingChannel.__init__:

  • active_mask parameter: marks servers with no data this epoch so they are never polled; server_end_of_epoch is initialized from this mask instead of all-False.

  • pin_memory parameter: when True, recv() copies tensors to CUDA-pinned host memory for faster DMA-based GPU transfers.

  • Explicit ValueError validation replaces bare assert for length checks on server_rank / channel_id / active_mask.

  • Recv-count logging state (_recv_count, _log_every_n) supports periodic timing telemetry added to recv().

  • Typed queue: Queue[tuple[Optional[SampleMessage], bool, int]].

recv(**kwargs)[source]#

Receive the next sampled message fetched from the servers.

This method blocks until a message is available or the epoch ends.

Parameters:

kwargs (object) – Additional keyword arguments (unused).

Returns:

The sampled message.

Raises:
  • StopIteration – If the epoch ends and no messages are available.

  • Exception – Re-raised if an in-flight fetch future fails.

Return type:

graphlearn_torch.channel.SampleMessage

reset()[source]#

Reset all state to start a new epoch.

Return type:

None

send(msg, **kwargs)[source]#
Send a sample message into the channel; the implementing channel should process the message data properly.

Parameters:
  • msg (graphlearn_torch.channel.SampleMessage) – The sample message to send.

  • kwargs (object)

Return type:

None

gigl.distributed.graph_store.remote_channel.logger[source]#