gigl.common.data.export#
Utility functions for exporting embeddings and predictions to Google Cloud Storage and BigQuery.
Note that we use Avro files here because, in our testing, they are quicker to generate and upload than Parquet files.
However, if we switch to an online upload scheme, where embeddings are uploaded as they are generated, we should investigate whether Parquet or ORC files are more performant in that setting.
Attributes#
| EMBEDDING_BIGQUERY_SCHEMA | BigQuery schema for exported embedding tables. |
Classes#
| EmbeddingExporter | Initializes an EmbeddingExporter instance, which will write embeddings to GCS with an embedding Avro schema for writing an array of floats per record. |
| GcsExporter | Initializes a GcsExporter instance. |
| PredictionExporter | Initializes a PredictionExporter instance, which will write predictions to GCS with a prediction Avro schema for writing a single float per record. |
Functions#
| load_embeddings_to_bigquery | Loads multiple Avro files containing GNN embeddings from GCS into BigQuery. |
| load_predictions_to_bigquery | Loads multiple Avro files containing GNN predictions from GCS into BigQuery. |
Module Contents#
- class gigl.common.data.export.EmbeddingExporter(export_dir, file_prefix=None, min_shard_size_threshold_bytes=0)[source]#
- Bases: GcsExporter
- Initializes an EmbeddingExporter instance, which will write embeddings to GCS with an embedding Avro schema for writing an array of floats per record.
- Parameters:
- export_dir (Uri) – URI where the Avro files will be uploaded. 
- file_prefix (Optional[str]) – An optional prefix to add to the file name. If provided, then the file names will be like $file_prefix_shard_00000000.avro.
- min_shard_size_threshold_bytes (int) – The minimum size in bytes at which the buffer will be flushed to GCS. 
 
 - add_embedding(id_batch, embedding_batch, embedding_type)[source]#
- Adds to the in-memory buffer the integer IDs and their corresponding embeddings.
- Parameters:
- id_batch (torch.Tensor) – A torch.Tensor containing integer IDs. 
- embedding_batch (torch.Tensor) – A torch.Tensor containing embeddings corresponding to the integer IDs in id_batch. 
- embedding_type (str) – A tag for the type of the embeddings, e.g., ‘user’, ‘content’, etc. 
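For illustration, a minimal usage sketch is shown below. The bucket path, file prefix, and tensor contents are placeholders, and the UriFactory helper from gigl.common is assumed to be the usual way to build the export_dir Uri.

```python
# Hypothetical usage sketch for EmbeddingExporter; paths and tensors are placeholders.
import torch

from gigl.common import UriFactory  # assumed Uri helper
from gigl.common.data.export import EmbeddingExporter

export_dir = UriFactory.create_uri("gs://my_bucket/embeddings/")

# Exiting the context manager flushes the buffer, producing one Avro shard.
with EmbeddingExporter(export_dir=export_dir, file_prefix="user_embeddings") as exporter:
    node_ids = torch.tensor([1, 2, 3])   # integer IDs
    embeddings = torch.rand(3, 128)      # one 128-dim embedding per ID
    exporter.add_embedding(
        id_batch=node_ids,
        embedding_batch=embeddings,
        embedding_type="user",
    )
```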
 
 
 
- class gigl.common.data.export.GcsExporter(export_dir, avro_schema, file_prefix=None, min_shard_size_threshold_bytes=0)[source]#
- Initializes a GcsExporter instance.
- Note that after every flush, either via exiting a context manager, by calling flush_records(), or when the buffer reaches min_shard_size_threshold_bytes, a new Avro file will be created, and subsequent calls to add_record will add to the new file. This means that after all records have been added, the export_dir may look like the below:
- gs://my_bucket/records/
├── shard_00000000.avro
├── shard_00000001.avro
└── shard_00000002.avro
- Parameters:
- export_dir (Uri) – URI where the Avro files will be uploaded. If a GCS URI, this should be a fully qualified GCS path, e.g., ‘gs://bucket_name/path/to/’. If a local URI (e.g. /tmp/gigl/records), then the directory will be created when GcsExporter is initialized.
- avro_schema (fastavro.types.Schema) – The Avro schema used when writing records.
- file_prefix (Optional[str]) – An optional prefix to add to the file name. If provided, then the file names will be like $file_prefix_shard_00000000.avro.
- min_shard_size_threshold_bytes (int) – The minimum size in bytes at which the buffer will be flushed to GCS. The buffer will contain the entire batch of records that caused it to reach the threshold, so the file sizes on GCS may be larger than this value. If set to zero, the default, then the buffer will be flushed only when flush_records is called or when the context manager is exited. An error will be thrown if this value is negative. Note that for the last shard, the buffer may be much smaller than this limit.
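Because GcsExporter takes an arbitrary fastavro schema, it can be instantiated directly when the built-in embedding and prediction schemas do not fit. The sketch below is illustrative only; the schema, record layout, paths, and the UriFactory helper are assumptions.

```python
# Hypothetical sketch of constructing GcsExporter with a custom Avro schema.
import fastavro

from gigl.common import UriFactory  # assumed Uri helper
from gigl.common.data.export import GcsExporter

# A custom fastavro schema with one integer ID and one float score per record.
score_schema = fastavro.parse_schema(
    {
        "type": "record",
        "name": "Score",
        "fields": [
            {"name": "node_id", "type": "long"},
            {"name": "score", "type": "float"},
        ],
    }
)

exporter = GcsExporter(
    export_dir=UriFactory.create_uri("/tmp/gigl/records"),  # local dirs are created on init
    avro_schema=score_schema,
    min_shard_size_threshold_bytes=64 * 1024 * 1024,  # flush roughly every 64 MiB
)
```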
 
 
- class gigl.common.data.export.PredictionExporter(export_dir, file_prefix=None, min_shard_size_threshold_bytes=0)[source]#
- Bases: GcsExporter
- Initializes a PredictionExporter instance, which will write predictions to GCS with a prediction Avro schema for writing a single float per record.
- Parameters:
- export_dir (Uri) – URI where the Avro files will be uploaded. 
- file_prefix (Optional[str]) – An optional prefix to add to the file name. If provided, then the file names will be like $file_prefix_shard_00000000.avro.
- min_shard_size_threshold_bytes (int) – The minimum size in bytes at which the buffer will be flushed to GCS. 
 
 - add_prediction(id_batch, prediction_batch, prediction_type)[source]#
- Adds to the in-memory buffer the integer IDs and their corresponding predictions.
- Parameters:
- id_batch (torch.Tensor) – A torch.Tensor containing integer IDs. 
- prediction_batch (torch.Tensor) – A torch.Tensor containing predictions corresponding to the integer IDs in id_batch. 
- prediction_type (str) – A tag for the type of the predictions, e.g., ‘user’, ‘content’, etc. 
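A sketch mirroring the embedding example above, with a single float prediction per ID; paths, values, and the UriFactory helper are placeholders/assumptions.

```python
# Hypothetical usage sketch for PredictionExporter; paths and tensors are placeholders.
import torch

from gigl.common import UriFactory  # assumed Uri helper
from gigl.common.data.export import PredictionExporter

with PredictionExporter(
    export_dir=UriFactory.create_uri("gs://my_bucket/predictions/"),
    file_prefix="user_scores",
) as exporter:
    exporter.add_prediction(
        id_batch=torch.tensor([1, 2, 3]),
        prediction_batch=torch.tensor([0.12, 0.87, 0.55]),
        prediction_type="user",
    )
```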
 
 
 
- gigl.common.data.export.load_embeddings_to_bigquery(gcs_folder, project_id, dataset_id, table_id, should_run_async=False)[source]#
- Loads multiple Avro files containing GNN embeddings from GCS into BigQuery.
- Note that this function will upload all Avro files in the GCS folder to BigQuery, recursively. So if you specify gcs_folder to be gs://MY_BUCKET/embeddings/ and there are some nested directories, e.g.:
- gs://MY_BUCKET/embeddings/shard_0000.avro
gs://MY_BUCKET/embeddings/nested/shard_0001.avro
- Both files will be uploaded to BigQuery.
- Parameters:
- gcs_folder (GcsUri) – The GCS folder containing the Avro files with embeddings. 
- project_id (str) – The GCP project ID. 
- dataset_id (str) – The BigQuery dataset ID. 
- table_id (str) – The BigQuery table ID. 
- should_run_async (bool) – Whether the load to BigQuery should happen asynchronously. Defaults to False.
 
- Returns:
- A BigQuery LoadJob object representing the load operation, which allows the user to monitor and retrieve details about the job status and result. The returned job will already be done if should_run_async=False, and will be returned immediately after creation (not necessarily complete) if should_run_async=True.
- Return type:
- LoadJob 
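A usage sketch is shown below; the project, dataset, table, and bucket names are placeholders, and GcsUri is assumed to be importable from gigl.common.

```python
# Hypothetical sketch: load previously exported embedding shards into BigQuery.
from gigl.common import GcsUri  # assumed Uri type
from gigl.common.data.export import load_embeddings_to_bigquery

load_job = load_embeddings_to_bigquery(
    gcs_folder=GcsUri("gs://my_bucket/embeddings/"),
    project_id="my-gcp-project",
    dataset_id="gnn_outputs",
    table_id="user_embeddings",
)
# With should_run_async=False (the default) the job is already done here.
print(f"Loaded {load_job.output_rows} rows")
```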
 
- gigl.common.data.export.load_predictions_to_bigquery(gcs_folder, project_id, dataset_id, table_id, should_run_async=False)[source]#
- Loads multiple Avro files containing GNN predictions from GCS into BigQuery.
- Note that this function will upload all Avro files in the GCS folder to BigQuery, recursively. So if you specify gcs_folder to be gs://MY_BUCKET/predictions/ and there are some nested directories, e.g.:
- gs://MY_BUCKET/predictions/shard_0000.avro
gs://MY_BUCKET/predictions/nested/shard_0001.avro
- Both files will be uploaded to BigQuery.
- Parameters:
- gcs_folder (GcsUri) – The GCS folder containing the Avro files with predictions. 
- project_id (str) – The GCP project ID. 
- dataset_id (str) – The BigQuery dataset ID. 
- table_id (str) – The BigQuery table ID. 
- should_run_async (bool) – Whether the load to BigQuery should happen asynchronously. Defaults to False.
 
- Returns:
- A BigQuery LoadJob object representing the load operation, which allows the user to monitor and retrieve details about the job status and result. The returned job will already be done if should_run_async=False, and will be returned immediately after creation (not necessarily complete) if should_run_async=True.
- Return type:
- LoadJob 
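A sketch of the asynchronous path, waiting on the returned LoadJob once other work is done (names are placeholders; GcsUri is assumed to come from gigl.common):

```python
# Hypothetical sketch: kick off the BigQuery load asynchronously and wait later.
from gigl.common import GcsUri  # assumed Uri type
from gigl.common.data.export import load_predictions_to_bigquery

job = load_predictions_to_bigquery(
    gcs_folder=GcsUri("gs://my_bucket/predictions/"),
    project_id="my-gcp-project",
    dataset_id="gnn_outputs",
    table_id="user_predictions",
    should_run_async=True,
)
# ... do other work while BigQuery ingests the Avro files ...
job.result()  # blocks until the load job completes, raising on failure
```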
 
- gigl.common.data.export.EMBEDDING_BIGQUERY_SCHEMA: Final[Sequence[google.cloud.bigquery.SchemaField]][source]#
