gigl.common.data.export#
Utility functions for exporting embeddings to Google Cloud Storage and BigQuery.
Note that we use Avro files here since, based on testing, they are quicker to generate and upload than Parquet files.
However, if we switch to an online upload scheme, where we upload the embeddings as they are generated, then we should investigate whether Parquet or ORC files are more performant in that setting.
Attributes#
Classes#
EmbeddingExporter – Initializes an EmbeddingExporter instance.
Functions#
load_embeddings_to_bigquery – Loads multiple Avro files containing GNN embeddings from GCS into BigQuery.
Module Contents#
- class gigl.common.data.export.EmbeddingExporter(export_dir, file_prefix=None, min_shard_size_threshold_bytes=0)[source]#
Initializes an EmbeddingExporter instance.
Note that after every flush, whether via exiting a context manager, calling flush_embeddings(), or the buffer reaching min_shard_size_threshold_bytes, a new Avro file is created, and subsequent calls to add_embedding will write to the new file. This means that after all embeddings have been added, the export_dir may look like the following:
gs://my_bucket/embeddings/
├── shard_00000000.avro
├── shard_00000001.avro
└── shard_00000002.avro
- Parameters:
export_dir (GcsUri) – The Google Cloud Storage URI where the Avro files will be uploaded. This should be a fully qualified GCS path, e.g., ‘gs://bucket_name/path/to/’.
file_prefix (Optional[str]) – An optional prefix to add to the file name. If provided, the file names will look like {file_prefix}_shard_00000000.avro.
min_shard_size_threshold_bytes (int) – The minimum size in bytes at which the buffer will be flushed to GCS. The buffer will contain the entire batch of embeddings that caused it to reach the threshold, so the file sizes on GCS may be larger than this value. If set to zero, the default, the buffer will be flushed only when flush_embeddings is called or when the context manager is exited. An error is raised if this value is negative. Note that the last shard may be much smaller than this limit.
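Below is a minimal usage sketch of exporting embeddings in shards through the context manager. The import path for GcsUri, the 100 MB threshold, and the synthetic ID/embedding batches are illustrative assumptions, not part of this reference.

import torch

from gigl.common import GcsUri  # assumed import path for GcsUri
from gigl.common.data.export import EmbeddingExporter

export_dir = GcsUri("gs://my_bucket/embeddings/")

# Flush a new Avro shard roughly every 100 MB of buffered embeddings.
with EmbeddingExporter(
    export_dir=export_dir,
    file_prefix="user",
    min_shard_size_threshold_bytes=100 * 1024 * 1024,
) as exporter:
    for start in range(0, 4096, 1024):
        node_ids = torch.arange(start, start + 1024)  # integer IDs, shape [N]
        embeddings = torch.randn(1024, 128)           # embeddings, shape [N, D]
        exporter.add_embedding(node_ids, embeddings, embedding_type="user")
# On exit, any remaining buffered embeddings are flushed as a final (possibly smaller) shard.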
- add_embedding(id_batch, embedding_batch, embedding_type)[source]#
Adds the integer IDs and their corresponding embeddings to the in-memory buffer.
- Parameters:
id_batch (torch.Tensor) – A torch.Tensor containing integer IDs.
embedding_batch (torch.Tensor) – A torch.Tensor containing embeddings corresponding to the integer IDs in id_batch.
embedding_type (str) – A tag for the type of the embeddings, e.g., ‘user’, ‘content’, etc.
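As an illustration of the expected inputs, here is a hedged sketch that buffers one batch and flushes explicitly via flush_embeddings(); the tensor shapes, the GcsUri import path, and the 'user' tag are assumptions made for the example.

import torch

from gigl.common import GcsUri  # assumed import path for GcsUri
from gigl.common.data.export import EmbeddingExporter

exporter = EmbeddingExporter(export_dir=GcsUri("gs://my_bucket/embeddings/"))

node_ids = torch.arange(1024)        # integer IDs, shape [N]
embeddings = torch.randn(1024, 128)  # one embedding per ID, shape [N, D]
exporter.add_embedding(
    id_batch=node_ids,
    embedding_batch=embeddings,
    embedding_type="user",
)

# With the default min_shard_size_threshold_bytes=0, nothing is uploaded until an
# explicit flush (or context-manager exit).
exporter.flush_embeddings()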
- gigl.common.data.export.load_embeddings_to_bigquery(gcs_folder, project_id, dataset_id, table_id, should_run_async=False)[source]#
Loads multiple Avro files containing GNN embeddings from GCS into BigQuery.
Note that this function uploads all Avro files in the GCS folder to BigQuery recursively. So if there are nested directories, e.g.:
gs://MY_BUCKET/embeddings/shard_0000.avro
gs://MY_BUCKET/embeddings/nested/shard_0001.avro
Both files will be uploaded to BigQuery.
- Parameters:
gcs_folder (GcsUri) – The GCS folder containing the Avro files with embeddings.
project_id (str) – The GCP project ID.
dataset_id (str) – The BigQuery dataset ID.
table_id (str) – The BigQuery table ID.
should_run_async (bool) – Whether the load to BigQuery should run asynchronously. Defaults to False.
- Returns:
A BigQuery LoadJob object representing the load operation, which allows the user to monitor and retrieve details about the job status and result. The returned job will be complete if should_run_async=False, and will be returned immediately after creation (not necessarily complete) if should_run_async=True.
- Return type:
LoadJob
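For example, here is a hedged sketch of loading the exported shards into BigQuery; the project, dataset, and table names are hypothetical, and the only LoadJob members used (result, output_rows) come from the google-cloud-bigquery client.

from gigl.common import GcsUri  # assumed import path for GcsUri
from gigl.common.data.export import load_embeddings_to_bigquery

load_job = load_embeddings_to_bigquery(
    gcs_folder=GcsUri("gs://my_bucket/embeddings/"),
    project_id="my-gcp-project",      # hypothetical project
    dataset_id="gnn_embeddings",      # hypothetical dataset
    table_id="user_embeddings",       # hypothetical table
    should_run_async=True,
)

# With should_run_async=True the job is returned immediately; block until it finishes
# before reading results.
load_job.result()
print(f"Loaded {load_job.output_rows} rows into BigQuery.")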