gigl.distributed.graph_store.remote_channel#
GiGL-owned remote receiving channel for graph-store sampling.
This mirrors GLT’s RemoteReceivingChannel [1] behavior, but routes fetch RPCs
through GiGL server methods so channel-based sampling works with shared
producers.
[1] alibaba/graphlearn-for-pytorch
Classes#
RemoteReceivingChannel: Pull-based receiving channel that fetches sampled messages from servers.
Module Contents#
- class gigl.distributed.graph_store.remote_channel.RemoteReceivingChannel(server_rank, channel_id, prefetch_size=2, active_mask=None, pin_memory=False)[source]#
Bases: graphlearn_torch.channel.ChannelBase (a base class that initializes a channel for sample messages and provides send() and recv() routines)
Pull-based receiving channel that fetches sampled messages from servers.
- Parameters:
server_rank (Union[int, list[int]]) – Target storage server rank(s).
channel_id (Union[int, list[int]]) – Sampling channel id(s), one per server rank.
prefetch_size (int) – Number of in-flight fetch requests per server.
active_mask (Optional[list[bool]]) – Optional per-server mask indicating which channels can produce at least one batch this epoch. Inactive servers are treated as already finished and are never polled.
pin_memory (bool) – If True, copy received tensors to CUDA-pinned host memory before returning from
recv(). Enables faster GPU transfers via DMA in the downstream collate function.
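The prefetch_size parameter caps the number of in-flight fetch requests per server. A minimal, self-contained sketch of that pipelining idea (illustrative only, not GiGL's implementation; fetch and prefetching_recv are hypothetical names standing in for the real fetch RPC):

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def fetch(server_rank, channel_id):
    # Stand-in for the real fetch RPC to a storage server (hypothetical).
    return f"msg from server {server_rank} on channel {channel_id}"


def prefetching_recv(server_ranks, channel_ids, prefetch_size=2, rounds=3):
    """Yield messages while keeping up to `prefetch_size` fetches in
    flight per server, re-issuing a fetch as each result is consumed."""
    with ThreadPoolExecutor() as pool:
        pending = deque()
        # Prime the pipeline: prefetch_size requests per server.
        for rank, cid in zip(server_ranks, channel_ids):
            for _ in range(prefetch_size):
                pending.append((rank, cid, pool.submit(fetch, rank, cid)))
        issued = {rank: prefetch_size for rank in server_ranks}
        while pending:
            rank, cid, fut = pending.popleft()
            yield fut.result()
            # Keep the pipeline full until `rounds` fetches were issued.
            if issued[rank] < rounds:
                issued[rank] += 1
                pending.append((rank, cid, pool.submit(fetch, rank, cid)))


msgs = list(prefetching_recv([0, 1], [10, 11], prefetch_size=2, rounds=3))
```

Overlapping fetches this way hides RPC latency: while one message is being consumed, up to prefetch_size further requests per server are already on the wire.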
Differences from GLT's RemoteReceivingChannel.__init__:
- active_mask parameter: marks servers with no data this epoch so they are never polled; server_end_of_epoch is initialized from this mask instead of all-False.
- pin_memory parameter: when True, recv() copies tensors to CUDA-pinned host memory for faster DMA-based GPU transfers.
- Explicit ValueError validation replaces bare assert for length checks on server_rank / channel_id / active_mask.
- Recv-count logging state (_recv_count, _log_every_n) supports periodic timing telemetry added to recv().
- Typed queue: Queue[tuple[Optional[SampleMessage], bool, int]].
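The argument validation and active_mask-driven initialization described above can be sketched as follows (an illustrative stand-alone function, not GiGL's actual constructor; init_epoch_state is a hypothetical name):

```python
from typing import Optional, Union


def init_epoch_state(
    server_rank: Union[int, list],
    channel_id: Union[int, list],
    active_mask: Optional[list] = None,
):
    """Normalize rank/channel arguments, validate lengths with ValueError
    (not bare asserts), and derive per-server end-of-epoch flags."""
    ranks = [server_rank] if isinstance(server_rank, int) else list(server_rank)
    cids = [channel_id] if isinstance(channel_id, int) else list(channel_id)
    if len(ranks) != len(cids):
        raise ValueError(
            f"server_rank ({len(ranks)}) and channel_id ({len(cids)}) "
            "must have the same length"
        )
    if active_mask is None:
        active_mask = [True] * len(ranks)
    if len(active_mask) != len(ranks):
        raise ValueError("active_mask must match server_rank in length")
    # Inactive servers are treated as already finished: never polled.
    server_end_of_epoch = [not active for active in active_mask]
    return ranks, cids, server_end_of_epoch
```

Seeding server_end_of_epoch from the mask (rather than all-False) is what lets an epoch terminate immediately for servers that have no batches to produce.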
- recv(**kwargs)[source]#
Receives the next sampled message from the channel.
This method blocks until a message is available or the epoch ends.
- Parameters:
kwargs (object) – Additional keyword arguments (unused).
- Returns:
The sampled message.
- Raises:
StopIteration – If the epoch ends and no messages are available.
Exception – If the future fails.
- Return type:
graphlearn_torch.channel.SampleMessage
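The blocking and StopIteration semantics of recv() can be sketched with a plain queue standing in for the RPC-backed channel (a toy model, not GiGL's implementation; _SketchChannel is a hypothetical class):

```python
import queue


class _SketchChannel:
    """Toy pull-based channel: recv() drains a queue of
    (message, epoch_end, server_idx) tuples and raises StopIteration
    once every server has signalled end of epoch."""

    def __init__(self, num_servers: int):
        self._queue = queue.Queue()
        self._server_end_of_epoch = [False] * num_servers

    def recv(self, **kwargs):
        while True:
            if all(self._server_end_of_epoch):
                raise StopIteration
            # Blocks until a message is available, mirroring recv().
            msg, epoch_end, server_idx = self._queue.get()
            if epoch_end:
                self._server_end_of_epoch[server_idx] = True
                continue  # keep draining the remaining servers
            return msg


ch = _SketchChannel(num_servers=2)
ch._queue.put(("batch-0", False, 0))
ch._queue.put((None, True, 0))
ch._queue.put(("batch-1", False, 1))
ch._queue.put((None, True, 1))

received = []
while True:
    try:
        received.append(ch.recv())
    except StopIteration:
        break
```

The consumer loop above shows the intended usage pattern: call recv() until StopIteration signals that every server has finished the epoch.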