gigl.src.data_preprocessor.lib.transform.utils#

Attributes#

Classes#

AnalyzeAndBuildTFTransformFn

A PTransform that analyzes ingested feature RecordBatches and builds a TFTransform function from the given preprocessing_fn.

GenerateAndVisualizeStats

A PTransform that computes dataset statistics over ingested feature RecordBatches and writes a stats output and Facets visualization report.

IngestRawFeatures

A PTransform that ingests raw features from a data reference into a PCollection of pyarrow RecordBatches.

InstanceDictToTFExample

A DoFn that uses a feature spec to serialize a raw instance dict (read from some tabular data) as a TFExample.

ReadExistingTFTransformFn

A PTransform that loads a previously saved TFTransform function from a given directory.

WriteTFSchema

A PTransform that writes a TF schema proto to a target GCS URI.

Functions#

get_load_data_and_transform_pipeline_component(...[, ...])

Generate a Beam pipeline to conduct transformation, given a source feature table in BQ and an output path in GCS.

Module Contents#

class gigl.src.data_preprocessor.lib.transform.utils.AnalyzeAndBuildTFTransformFn(tensor_adapter_config, preprocessing_fn)[source]#

Bases: apache_beam.PTransform

A transform object used to modify one or more PCollections.

Subclasses must define an expand() method that will be used when the transform is applied to some arguments. Typical usage pattern will be:

input | CustomTransform(…)

The expand() method of the CustomTransform object passed in will be called with input as an argument.

Parameters:
  • tensor_adapter_config (tfx_bsl.tfxio.tensor_adapter.TensorAdapterConfig)

  • preprocessing_fn (Callable[[gigl.src.data_preprocessor.lib.types.TFTensorDict], gigl.src.data_preprocessor.lib.types.TFTensorDict])

expand(features)[source]#
Parameters:

features (apache_beam.pvalue.PCollection[pyarrow.RecordBatch])

Return type:

apache_beam.pvalue.PCollection[Any]

preprocessing_fn[source]#
tensor_adapter_config[source]#
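The `input | CustomTransform(…)` pattern described above works because Beam routes the piped value into the transform's expand() method. A minimal pure-Python stand-in of that dispatch (this sketch does not use apache_beam; the class names are illustrative only):

```python
# Minimal stand-in (not apache_beam) showing how the
# `input | CustomTransform(...)` pattern dispatches to expand().
class PTransformSketch:
    def expand(self, pcoll):
        raise NotImplementedError

    # `pcoll | transform` falls through to the transform's __ror__,
    # which calls expand(pcoll) -- mirroring Beam's behavior.
    def __ror__(self, pcoll):
        return self.expand(pcoll)


class DoubleElements(PTransformSketch):
    def expand(self, pcoll):
        return [x * 2 for x in pcoll]


result = [1, 2, 3] | DoubleElements()
# result == [2, 4, 6]
```

In real Beam code, expand() receives and returns PCollections (as in the expand(features) signature above) rather than plain lists.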
class gigl.src.data_preprocessor.lib.transform.utils.GenerateAndVisualizeStats(facets_report_uri, stats_output_uri)[source]#

Bases: apache_beam.PTransform

A PTransform that computes dataset statistics over ingested feature RecordBatches and writes a stats output and Facets visualization report to the given URIs.

Parameters:
  • facets_report_uri

  • stats_output_uri
expand(features)[source]#
Parameters:

features (apache_beam.pvalue.PCollection[pyarrow.RecordBatch])

Return type:

apache_beam.pvalue.PCollection[tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList]

facets_report_uri[source]#
stats_output_uri[source]#
class gigl.src.data_preprocessor.lib.transform.utils.IngestRawFeatures(data_reference, feature_spec, schema, beam_record_tfxio)[source]#

Bases: apache_beam.PTransform

A PTransform that ingests raw features from the given data reference into a PCollection of pyarrow RecordBatches, using the provided feature spec, schema, and TFXIO reader.

Parameters:
  • data_reference

  • feature_spec

  • schema

  • beam_record_tfxio
expand(pbegin)[source]#
Parameters:

pbegin (apache_beam.pvalue.PBegin)

Return type:

apache_beam.pvalue.PCollection[pyarrow.RecordBatch]

beam_record_tfxio[source]#
data_reference[source]#
feature_spec[source]#
schema[source]#
class gigl.src.data_preprocessor.lib.transform.utils.InstanceDictToTFExample(feature_spec, schema)[source]#

Bases: apache_beam.DoFn

Uses a feature spec to process a raw instance dict (read from some tabular data) as a TFExample. These instance dict inputs could allow us to read tabular input data from BQ, GCS, or anything else. As long as we have a way of yielding instance dicts and parsing them with a feature spec, we should be able to transform this data into TFRecords during ingestion, which allows for more efficient operations in TFT. See https://www.tensorflow.org/tfx/transform/get_started#the_tfxio_format.

Parameters:
  • feature_spec (gigl.src.data_preprocessor.lib.types.FeatureSpecDict)

  • schema (tensorflow_metadata.proto.v0.schema_pb2.Schema)

process(element)[source]#

Method to use for processing elements.

This is invoked by DoFnRunner for each element of an input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
  ...

The full set of parameters is:

  • DoFn.ElementParam: element to be processed, should not be mutated.

  • DoFn.SideInputParam: a side input that may be used when processing.

  • DoFn.TimestampParam: timestamp of the input element.

  • DoFn.WindowParam: Window the input element belongs to.

  • DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

  • DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

  • DoFn.KeyParam: key associated with the element.

  • DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

  • DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.
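The parameter list above works through default-value sentinels: the runner inspects the process() signature, and a default of, say, DoFn.TimestampParam signals that the DoFn wants the element's timestamp passed in. A rough stdlib-only sketch of that mechanism (the sentinel names here are stand-ins, not the real Beam objects):

```python
import inspect

# Stand-in sentinels, analogous to DoFn.ElementParam / DoFn.TimestampParam.
ELEMENT_PARAM = object()
TIMESTAMP_PARAM = object()


def process(element=ELEMENT_PARAM, timestamp=TIMESTAMP_PARAM):
    return element, timestamp


# A runner can inspect the defaults to learn that this process()
# accepts the element's timestamp alongside the element itself.
wants_timestamp = any(
    p.default is TIMESTAMP_PARAM
    for p in inspect.signature(process).parameters.values()
)
```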

Parameters:
  • element (gigl.src.data_preprocessor.lib.types.InstanceDict) – The element to be processed

  • *args – side inputs

  • **kwargs – other keyword arguments.

Returns:

An Iterable of output elements or None.

Return type:

Iterable[bytes]

feature_spec[source]#
schema[source]#
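As a rough illustration of the process() contract above (parse a raw instance dict against a feature spec, yield serialized bytes), here is a stdlib-only stand-in; the spec format and JSON serialization are simplified placeholders, not the real FeatureSpecDict or TFExample encoding:

```python
import json

# Hypothetical stand-in for a feature spec: column name -> expected type.
feature_spec = {"user_id": int, "score": float}


def instance_dict_to_record(instance, spec):
    # Mirrors the shape of InstanceDictToTFExample.process: parse the raw
    # instance dict with the spec, then yield serialized bytes.
    parsed = {name: typ(instance[name]) for name, typ in spec.items()}
    yield json.dumps(parsed, sort_keys=True).encode()


records = list(instance_dict_to_record({"user_id": "7", "score": "0.5"}, feature_spec))
```

The real DoFn instead builds a tf.train.Example from the feature spec and schema, so that downstream TFT operations consume TFRecords efficiently.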
class gigl.src.data_preprocessor.lib.transform.utils.ReadExistingTFTransformFn(tf_transform_directory)[source]#

Bases: apache_beam.PTransform

A PTransform that loads a previously saved TFTransform function from tf_transform_directory.

Parameters:

tf_transform_directory (gigl.common.Uri)

expand(pbegin)[source]#
Parameters:

pbegin (apache_beam.pvalue.PBegin)

Return type:

apache_beam.pvalue.PCollection[Any]

tf_transform_directory[source]#
class gigl.src.data_preprocessor.lib.transform.utils.WriteTFSchema(schema, target_uri, schema_descriptor)[source]#

Bases: apache_beam.PTransform

A PTransform that writes the given TF schema proto to target_uri, labeled with schema_descriptor.

Parameters:
  • schema (tensorflow_metadata.proto.v0.schema_pb2.Schema)

  • target_uri (gigl.common.GcsUri)

  • schema_descriptor (str)

expand(pbegin)[source]#
Parameters:

pbegin (apache_beam.pvalue.PBegin)

Return type:

apache_beam.pvalue.PDone

schema[source]#
schema_descriptor[source]#
target_uri[source]#
gigl.src.data_preprocessor.lib.transform.utils.get_load_data_and_transform_pipeline_component(applied_task_identifier, data_reference, preprocessing_spec, transformed_features_info, num_shards, custom_worker_image_uri=None)[source]#

Generate a Beam pipeline to conduct transformation, given a source feature table in BQ and an output path in GCS.

Parameters:
  • applied_task_identifier

  • data_reference

  • preprocessing_spec

  • transformed_features_info

  • num_shards

  • custom_worker_image_uri
Return type:

apache_beam.Pipeline

gigl.src.data_preprocessor.lib.transform.utils.logger[source]#