MAG240M E2E Example#
%cd ../..
# We need to change the working directory to the root of GiGL repo so we can import the necessary modules/scripts used below
Setting up GCP Project and configs#
Assuming you have a GCP project setup:
Open up resource_config.yaml and fill in all relevant fields under common_compute_config:
project
region
temp_assets_bucket
temp_regional_assets_bucket
perm_assets_bucket
temp_assets_bq_dataset_name
embedding_bq_dataset_name
gcp_service_account_email
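Before moving on, it can help to confirm that none of these fields were left blank. The following is a minimal sketch, assuming the `common_compute_config` section has already been parsed into a plain dict; `find_missing_fields` is a hypothetical helper written for this example, not part of GiGL.

```python
# Field names are the ones listed above; the dict stands in for the parsed
# `common_compute_config` section of resource_config.yaml.
REQUIRED_FIELDS = (
    "project",
    "region",
    "temp_assets_bucket",
    "temp_regional_assets_bucket",
    "perm_assets_bucket",
    "temp_assets_bq_dataset_name",
    "embedding_bq_dataset_name",
    "gcp_service_account_email",
)


def find_missing_fields(common_compute_config: dict) -> list:
    """Return the required fields that are absent or empty (hypothetical helper)."""
    return [
        name
        for name in REQUIRED_FIELDS
        if not str(common_compute_config.get(name) or "").strip()
    ]


# Placeholder values for illustration only.
example = {"project": "my-project", "region": "us-central1"}
print(find_missing_fields(example))
```

An empty list means every field listed above has a non-empty value; it says nothing about whether the buckets or datasets actually exist.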
Ensure your service account has the relevant permissions (a non-exhaustive list):
roles/bigquery.user
roles/cloudprofiler.user
roles/compute.admin
roles/dataflow.admin
roles/dataflow.worker
roles/dataproc.editor
roles/logging.logWriter
roles/monitoring.metricWriter
roles/notebooks.legacyViewer
roles/aiplatform.user
roles/dataproc.worker
roles/storage.objectAdmin : on relevant buckets
roles/artifactregistry.reader
roles/artifactregistry.writer
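One way to grant the project-level roles above is with `gcloud projects add-iam-policy-binding`. The sketch below only builds the command strings so they can be reviewed before running; the project ID and service account email are placeholders, and `grant_commands` is a hypothetical helper for this example. `roles/storage.objectAdmin` is left out because it is granted on individual buckets rather than on the project.

```python
import shlex

# Project-level roles copied from the list above.
PROJECT_ROLES = [
    "roles/bigquery.user",
    "roles/cloudprofiler.user",
    "roles/compute.admin",
    "roles/dataflow.admin",
    "roles/dataflow.worker",
    "roles/dataproc.editor",
    "roles/logging.logWriter",
    "roles/monitoring.metricWriter",
    "roles/notebooks.legacyViewer",
    "roles/aiplatform.user",
    "roles/dataproc.worker",
    "roles/artifactregistry.reader",
    "roles/artifactregistry.writer",
]


def grant_commands(project: str, service_account_email: str) -> list:
    """Build one gcloud command per role (hypothetical helper; nothing is executed)."""
    member = f"serviceAccount:{service_account_email}"
    return [
        shlex.join(
            [
                "gcloud", "projects", "add-iam-policy-binding", project,
                f"--member={member}",
                f"--role={role}",
            ]
        )
        for role in PROJECT_ROLES
    ]


# Placeholder project and service account.
commands = grant_commands("my-project", "sa@my-project.iam.gserviceaccount.com")
for command in commands:
    print(command)
```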
from gigl.common import LocalUri, GcsUri, Uri
from gigl.env.pipelines_config import get_resource_config
from gigl.src.common.types.pb_wrappers.gigl_resource_config import GiglResourceConfigWrapper
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
import datetime
# Firstly, let's give your job a name and ensure that the resource and task configs exist and can be loaded
JOB_NAME = "test_mag240m"
TEMPLATE_TASK_CONFIG_URI = LocalUri("examples/MAG240M/task_config.yaml")
RESOURCE_CONFIG_URI = LocalUri("examples/MAG240M/resource_config.yaml")
TEMPLATE_TASK_CONFIG: GbmlConfigPbWrapper = GbmlConfigPbWrapper.get_gbml_config_pb_wrapper_from_uri(gbml_config_uri=TEMPLATE_TASK_CONFIG_URI)
RESOURCE_CONFIG: GiglResourceConfigWrapper = get_resource_config(resource_config_uri=RESOURCE_CONFIG_URI)
PROJECT = RESOURCE_CONFIG.project
print(f"Successfully found task config and resource config. Script will help execute job: {JOB_NAME} on project: {PROJECT}")
# Let's run some basic checks to validate the correctness of the task and resource configs
from gigl.src.validation_check.config_validator import kfp_validation_checks
kfp_validation_checks(
    job_name=JOB_NAME,
    task_config_uri=TEMPLATE_TASK_CONFIG_URI,
    resource_config_uri=RESOURCE_CONFIG_URI,
    # config_populator is the first step in the pipeline; it populates the template task config specified above and generates a frozen config
    start_at="config_populator",
)
Compiling Src Docker images#
You will need to build and push docker images with your custom code so that individual GiGL components can leverage your code. For this experiment we will consider the MAG240M specs and code to be “custom code”, and we will walk through how to build a docker image with that code.
We will make use of scripts/build_and_push_docker_image.py for this.
Note that this builds containers/Dockerfile.src and containers/Dockerfile.dataflow.src, which have instructions to COPY the examples folder, containing all the source code for MAG240M, along with all the GiGL src code.
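The images built in this step are tagged with a timestamp so that every build gets a unique, sortable tag. A minimal sketch of that naming scheme, using a placeholder project ID and a fixed timestamp so the output is reproducible; `image_uri` is a hypothetical helper for this example:

```python
import datetime


def image_uri(project: str, image: str, now: datetime.datetime) -> str:
    """Build a gcr.io image name tagged with a YYYYmmdd-HHMMSS timestamp."""
    tag = now.strftime("%Y%m%d-%H%M%S")
    return f"gcr.io/{project}/{image}:{tag}"


# Fixed timestamp and placeholder project, for illustration only.
fixed = datetime.datetime(2024, 1, 2, 3, 4, 5)
print(image_uri("my-project", "gigl_cpu", fixed))
# gcr.io/my-project/gigl_cpu:20240102-030405
```

Because a fresh tag is generated per build, rerunning the build never overwrites an image that an earlier pipeline run may still reference.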
from scripts.build_and_push_docker_image import build_and_push_cpu_image, build_and_push_cuda_image, build_and_push_dataflow_image
curr_datetime = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG = f"gcr.io/{PROJECT}/gigl_dataflow_runtime:{curr_datetime}"
DOCKER_IMAGE_MAIN_CUDA_NAME_WITH_TAG = f"gcr.io/{PROJECT}/gigl_cuda:{curr_datetime}"
DOCKER_IMAGE_MAIN_CPU_NAME_WITH_TAG = f"gcr.io/{PROJECT}/gigl_cpu:{curr_datetime}"
build_and_push_dataflow_image(
    image_name=DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG,
)
build_and_push_cuda_image(
    image_name=DOCKER_IMAGE_MAIN_CUDA_NAME_WITH_TAG,
)
build_and_push_cpu_image(
    image_name=DOCKER_IMAGE_MAIN_CPU_NAME_WITH_TAG,
)
print(f"""We built and pushed the following docker images:
- {DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG}
- {DOCKER_IMAGE_MAIN_CUDA_NAME_WITH_TAG}
- {DOCKER_IMAGE_MAIN_CPU_NAME_WITH_TAG}
""")
We will instantiate a local runner to help orchestrate the test pipeline#
from gigl.orchestration.local.runner import Runner, PipelineConfig
runner = Runner()
pipeline_config = PipelineConfig(
    applied_task_identifier=JOB_NAME,
    task_config_uri=TEMPLATE_TASK_CONFIG_URI,
    resource_config_uri=RESOURCE_CONFIG_URI,
    custom_cuda_docker_uri=DOCKER_IMAGE_MAIN_CUDA_NAME_WITH_TAG,
    custom_cpu_docker_uri=DOCKER_IMAGE_MAIN_CPU_NAME_WITH_TAG,
    dataflow_docker_uri=DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG,
)
First we will run config populator#
The config populator takes in a template GbmlConfig and outputs a frozen GbmlConfig by populating all job-related metadata paths in sharedConfig. These are mostly GCS paths that the subsequent components read from and write to, using them as an intermediary medium for passing data between components. For example, the field sharedConfig.trainedModelMetadata is populated with a GCS URI, which tells the Trainer to write the trained model to this path and the Inferencer to read the model from this path.
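The template-to-frozen idea can be illustrated with plain dicts. This is a conceptual sketch only: the path layout, the bucket name, the `trainedModelUri` key, and the `populate_shared_config` helper are all assumptions made for the example and do not reflect how GiGL actually lays out its assets.

```python
import copy


def populate_shared_config(template: dict, job_name: str, bucket: str) -> dict:
    """Return a frozen copy of the template with job-specific paths filled in.

    Hypothetical helper; the paths below are invented for illustration.
    """
    frozen = copy.deepcopy(template)  # the template itself is left untouched
    base = f"gs://{bucket}/{job_name}"
    shared = frozen.setdefault("sharedConfig", {})
    shared["preprocessedMetadataUri"] = f"{base}/preprocessed_metadata.yaml"
    shared["trainedModelMetadata"] = {"trainedModelUri": f"{base}/trainer/model.pt"}
    return frozen


# A toy template with no paths populated yet.
template = {"sharedConfig": {}}
frozen = populate_shared_config(template, job_name="test_mag240m", bucket="my-temp-bucket")
print(frozen["sharedConfig"]["trainedModelMetadata"]["trainedModelUri"])
```

The point of freezing is that every later component receives the same fully resolved paths, so the Trainer and the Inferencer agree on where the model lives without coordinating directly.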
from gigl.src.common.utils.file_loader import FileLoader
frozen_config_uri = runner.run_config_populator(pipeline_config=pipeline_config)
frozen_config = GbmlConfigPbWrapper.get_gbml_config_pb_wrapper_from_uri(gbml_config_uri=frozen_config_uri)
file_loader = FileLoader()
print(f"Config Populator has successfully generated the following frozen config from the template ({TEMPLATE_TASK_CONFIG_URI}) :")
print(frozen_config.gbml_config_pb)
pipeline_config.task_config_uri = frozen_config_uri # We need to update the task config uri to the new frozen config uri
Next we run the preprocessor#
The Data Preprocessor reads node, edge and respective feature data from a data source, and produces preprocessed / transformed versions of all this data for subsequent components to use. It uses TensorFlow Transform (TFT) to transform the data in a distributed fashion, and allows for transformations like categorical encoding, scaling, normalization, casting and more.
In this case we are using the preprocessing spec defined in examples/MAG240M/preprocessor_config.py; take a look for more details.
You will note that the preprocessor creates a few BQ jobs to prepare the node and edge tables, and then kicks off TFT (Dataflow) jobs to do the actual preprocessing. The preprocessor will: (1) create a preprocessing spec and dump it to the path specified in the frozen config at sharedConfig.preprocessedMetadataUri; and (2) have the respective Dataflow jobs dump the preprocessed assets as .tfrecord files to the paths specified inside that preprocessing spec.
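Transformations such as scaling and categorical encoding follow an analyze-then-transform pattern: one pass over the full dataset computes statistics, and a second pass applies them row by row. The sketch below shows that pattern in plain Python. It is not the MAG240M preprocessing spec; the helper names are hypothetical. TFT offers analogous operations such as `tft.scale_to_z_score` and `tft.compute_and_apply_vocabulary`, although TFT orders vocabularies by frequency while this sketch uses order of first appearance.

```python
import math


def fit_z_score(values):
    """Analyze pass: compute the mean and population standard deviation."""
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return mean, math.sqrt(variance)


def apply_z_score(values, mean, std):
    """Transform pass: scale values to zero mean and unit variance."""
    if std == 0:
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


def fit_vocabulary(tokens):
    """Analyze pass: assign an integer id to each distinct token, by first appearance."""
    vocab = {}
    for token in tokens:
        vocab.setdefault(token, len(vocab))
    return vocab


def apply_vocabulary(tokens, vocab, oov_id=-1):
    """Transform pass: map tokens to ids, using oov_id for unseen tokens."""
    return [vocab.get(token, oov_id) for token in tokens]


# Toy feature columns, for illustration only.
years = [2000.0, 2010.0, 2020.0]
mean, std = fit_z_score(years)
scaled_years = apply_z_score(years, mean, std)

vocab = fit_vocabulary(["paper", "author", "paper"])
encoded = apply_vocabulary(["author", "institution"], vocab)
print(scaled_years, vocab, encoded)
```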
# WARN: There is an issue when trying to run dataflow jobs from inside a jupyter kernel; thus we cannot use the line
# below to run the preprocessor as you would normally in a python script.
# runner.run_data_preprocessor(pipeline_config=pipeline_config)
# Instead, we will run the preprocessor from the command line.
# Note: You can actually do this with every component; we just make use of the runner to make it easier to run the components.
!python -m gigl.src.data_preprocessor.data_preprocessor \
    --job_name=$JOB_NAME \
    --task_config_uri=$frozen_config_uri \
    --resource_config_uri=$RESOURCE_CONFIG_URI \
    --custom_worker_image_uri=$DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG
Next up is subgraph sampler#
The Subgraph Sampler receives node and edge data from the Data Preprocessor and mainly generates k-hop localized subgraphs for each node in the graph. In effect, the Subgraph Sampler lets us store the computation graph of each node independently, so downstream components do not need to hold a huge graph in memory. It uses Spark/Scala and runs on a Dataproc cluster. Based on the predefined sample schema for each task, the output samples are serialized and saved in TFRecord format.
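To make "k-hop localized subgraph" concrete, here is a minimal single-machine sketch using breadth-first search over an adjacency dict. It is only an illustration of the concept: the real component is a distributed Spark/Scala job, and this sketch keeps every neighbor and ignores features, edge types, and serialization. `k_hop_subgraph` is a hypothetical helper.

```python
from collections import deque


def k_hop_subgraph(adjacency, root, num_hops):
    """Collect the nodes and edges reachable from root within num_hops hops."""
    distance = {root: 0}
    edges = []
    queue = deque([root])
    while queue:
        node = queue.popleft()
        if distance[node] == num_hops:
            continue  # do not expand past the hop limit
        for neighbor in adjacency.get(node, []):
            edges.append((node, neighbor))
            if neighbor not in distance:
                distance[neighbor] = distance[node] + 1
                queue.append(neighbor)
    return set(distance), edges


# Toy directed graph: a -> b -> d -> e, and a -> c.
adjacency = {"a": ["b", "c"], "b": ["d"], "c": [], "d": ["e"]}
nodes, edges = k_hop_subgraph(adjacency, root="a", num_hops=2)
print(sorted(nodes), edges)
# ['a', 'b', 'c', 'd'] [('a', 'b'), ('a', 'c'), ('b', 'd')]
```

Each root node gets its own self-contained sample, which is what allows later stages to process samples independently.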
print(f"Will use the following subgraph sampler config:\n{TEMPLATE_TASK_CONFIG.gbml_config_pb.dataset_config.subgraph_sampler_config}")
print(f"The resources requested for the dataproc cluster (spark job) are as follows:\n{RESOURCE_CONFIG.subgraph_sampler_config}")
runner.run_subgraph_sampler(
    pipeline_config=pipeline_config,
)
Running Split Generator#
The Split Generator reads localized subgraph samples produced by the Subgraph Sampler, and executes logic to split the data into training, validation and test sets. Which nodes and edges end up in which split depends on the chosen splitting strategy.
Since the positive labels are user defined we use the setup printed in the cell below. More assigner and split strategies can be found in splitgenerator.lib.assigners and splitgenerator.lib.split_strategies respectively.
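As a rough illustration of what an assigner does, the sketch below deterministically assigns a key (for example an edge identifier) to a split by hashing it. This is a generic technique shown under assumed 80/10/10 ratios; it is not one of the strategies in splitgenerator.lib.split_strategies, and `assign_split` is a hypothetical helper.

```python
import hashlib


def assign_split(key: str, train: float = 0.8, val: float = 0.1) -> str:
    """Map a key to 'train', 'val' or 'test' using a stable hash of the key."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Interpret the first 8 bytes as a number in [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    if bucket < train:
        return "train"
    if bucket < train + val:
        return "val"
    return "test"


# Hypothetical edge keys, for illustration only.
keys = [f"edge-{i}" for i in range(10_000)]
splits = [assign_split(key) for key in keys]
print({name: splits.count(name) for name in ("train", "val", "test")})
```

Hashing keeps the assignment reproducible across runs and workers without any shared state, which is why it is a common choice for distributed splitting.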
print(f"Will use the following split generator config:\n{TEMPLATE_TASK_CONFIG.gbml_config_pb.dataset_config.split_generator_config}")
print(f"The resources requested for the dataproc cluster (spark job) are as follows:\n{RESOURCE_CONFIG.split_generator_config}")
runner.run_split_generator(
    pipeline_config=pipeline_config,
)
Training the model#
The Trainer component reads the outputs of the Split Generator (whose paths are specified in the frozen config), trains a GNN model on the training set, stops early based on performance on the validation set, and finally evaluates on the test set. The training logic is implemented with PyTorch Distributed Data Parallel (DDP) training, which enables distributed training on multiple GPU cards across multiple worker nodes.
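The early-stopping rule can be sketched independently of any model. The example below assumes a patience-based criterion on validation loss, which is a common choice; whether the MAG240M trainer uses exactly this rule is not stated here, and `early_stopping_epoch` is a hypothetical helper that consumes a precomputed list of losses instead of running real training.

```python
def early_stopping_epoch(validation_losses, patience):
    """Return (best_epoch, best_loss), stopping after `patience` epochs without improvement."""
    best_loss = float("inf")
    best_epoch = -1
    epochs_without_improvement = 0
    for epoch, loss in enumerate(validation_losses):
        if loss < best_loss:
            best_loss, best_epoch = loss, epoch
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                break  # validation loss has stalled; stop training
    return best_epoch, best_loss


# Made-up validation losses, one per epoch.
losses = [0.9, 0.7, 0.6, 0.65, 0.62, 0.61, 0.3]
print(early_stopping_epoch(losses, patience=2))
# (2, 0.6)
```

With a patience of 2 the loop stops after epoch 4 and never sees the later improvement at epoch 6, which shows the trade-off that the patience value controls.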
print(f"Will use the following class and respective runtime arguments that will be passed into the training class constructor: {TEMPLATE_TASK_CONFIG.trainer_config}")
print(f"The resources requested for the Vertex AI based DDP training job: {RESOURCE_CONFIG.trainer_config}")
runner.run_trainer(
    pipeline_config=pipeline_config,
)
print(
    "Model should be successfully trained and stored in the following location: "
    f"{frozen_config.trained_model_metadata_pb_wrapper.trained_model_metadata_pb.trained_model_uri}"
)
Inference#
The Inferencer component is responsible for running inference of a trained model on samples generated by the Subgraph Sampler component. At a high level, it works by applying a trained model in an embarrassingly parallel and distributed fashion across these samples, and persisting the output embeddings and/or predictions.
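"Embarrassingly parallel" means each sample can be scored without knowing anything about the others. A minimal local sketch of that idea using a thread pool; the real component runs on Dataflow, and the `toy_model`, the sample layout, and `run_inference` are all stand-ins invented for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_model(sample):
    """Stand-in for a trained model: the 'embedding' is the mean of the features."""
    features = sample["features"]
    return [sum(features) / len(features)]


def run_inference(samples, model, max_workers=4):
    """Apply the model to every sample independently and collect outputs by node id."""
    node_ids = [sample["node_id"] for sample in samples]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outputs = list(pool.map(model, samples))  # map preserves input order
    return dict(zip(node_ids, outputs))


# Toy samples, for illustration only.
samples = [
    {"node_id": 1, "features": [1.0, 3.0]},
    {"node_id": 2, "features": [2.0, 2.0, 5.0]},
]
embeddings = run_inference(samples, toy_model)
print(embeddings)
# {1: [2.0], 2: [3.0]}
```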
# WARN: There is an issue when trying to run dataflow jobs from inside a jupyter kernel; thus we cannot use the line
# below to run the inferencer as you would normally in a python script.
# runner.run_inferencer(pipeline_config=pipeline_config)
# Instead, we will run the inferencer from the command line.
# Note: You can actually do this with every component; we just make use of the runner to make it easier to run the components.
!python -m gigl.src.inference.inferencer \
    --job_name=$JOB_NAME \
    --task_config_uri=$frozen_config_uri \
    --resource_config_uri=$RESOURCE_CONFIG_URI \
    --custom_worker_image_uri=$DOCKER_IMAGE_DATAFLOW_RUNTIME_NAME_WITH_TAG \
    --cpu_docker_uri=$DOCKER_IMAGE_MAIN_CPU_NAME_WITH_TAG \
    --cuda_docker_uri=$DOCKER_IMAGE_MAIN_CUDA_NAME_WITH_TAG
# Looking at inference results
bq_emb_out_table = frozen_config.shared_config.inference_metadata.node_type_to_inferencer_output_info_map["paper_or_author"].embeddings_path
print(f"Embeddings should be successfully stored in the following location: {bq_emb_out_table}")
from gigl.src.common.utils.bq import BqUtils
bq_utils = BqUtils(project=PROJECT)
query = f"SELECT * FROM {bq_emb_out_table} LIMIT 5"
result = list(bq_utils.run_query(query=query, labels={}))
print(f"Query result: {result}")