butterfree.load.writers package

Submodules

butterfree.load.writers.historical_feature_store_writer module

Holds the Historical Feature Store writer class.

class butterfree.load.writers.historical_feature_store_writer.HistoricalFeatureStoreWriter(db_config=None, database=None, num_partitions=None, validation_threshold: float = 0.01, debug_mode: bool = False)

Bases: butterfree.load.writers.writer.Writer

Enable writing feature sets into the Historical Feature Store.

db_config

Data lake configuration for Spark, by default AWS S3. For more information, check the 'butterfree.db.configs' module.

database

Database name to use in the Spark metastore. Defaults to the FEATURE_STORE_HISTORICAL_DATABASE environment variable.

num_partitions

Number of partitions to use when repartitioning the dataframe before saving.

validation_threshold

Lower and upper tolerance to use in the count validation. The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property. For example: with validation_threshold = 0.01 and a calculated count of 100000 records on the dataframe, no error is raised if the feature store returns a count of 99500. Use validation_threshold = 0 to validate without any tolerance.
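
The tolerance check amounts to a relative bound on the row counts. A minimal sketch of the idea, reproducing the example above (within_threshold is a hypothetical helper, not part of the API):

>>> def within_threshold(dataframe_count, written_count, threshold=0.01):
...     lower = dataframe_count * (1 - threshold)
...     upper = dataframe_count * (1 + threshold)
...     return lower <= written_count <= upper
>>> within_threshold(100000, 99500)   # inside the +/- 1% band
True
>>> within_threshold(100000, 98000)   # outside the band, validation fails
False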

debug_mode

“dry run” mode: writes the result to a temporary view instead of the feature store.

Example

Simple example of HistoricalFeatureStoreWriter instantiation. The class can be instantiated without db configurations; in that case it falls back to S3Config(), which provides default configurations for the AWS S3 service.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations, such as write mode, file format and S3 bucket, and provide them to HistoricalFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = S3Config(bucket="my_s3_bucket_name",
...                   mode="overwrite",
...                   format_="parquet")
>>> writer = HistoricalFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the available settings and their defaults, refer to the S3Config class.

We can also use HistoricalFeatureStoreWriter to validate the dataframe to be written.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) require a Spark client, a feature set and a dataframe, as described in the Writer's arguments.

P.S.: When writing, the HistoricalFeatureStoreWriter partitions the data to improve query performance. The data is stored in partition folders in AWS S3, organized by time (year, month and day).
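
For intuition, the resulting layout is what a time-partitioned write produces in plain PySpark. The sketch below is only an illustration, not the writer's internal code; dataframe, num_partitions and the target path are assumptions:

>>> (dataframe
...     .repartition(num_partitions)
...     .write
...     .mode("overwrite")
...     .partitionBy("year", "month", "day")   # same columns as PARTITION_BY
...     .parquet("s3a://my_s3_bucket_name/historical/feature_set"))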

DEFAULT_VALIDATION_THRESHOLD = 0.01
PARTITION_BY = ['year', 'month', 'day']
validate(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe: pyspark.sql.dataframe.DataFrame, spark_client: butterfree.clients.spark_client.SparkClient)

Count the dataframe rows to validate the data loaded into the Feature Store.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

Raises

AssertionError – if the count of written data does not match the count of the current feature set dataframe within the validation threshold.

write(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe: pyspark.sql.dataframe.DataFrame, spark_client: butterfree.clients.spark_client.SparkClient)

Loads the data from a feature set into the Historical Feature Store.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

If the debug_mode is set to True, a temporary table with a name in the format: historical_feature_store__{feature_set.name} will be created instead of writing to the real historical feature store.
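
In debug mode the result can be inspected straight from the Spark catalog. A sketch, assuming a feature set named "house_listings" and an active SparkSession called spark:

>>> # hypothetical feature set name; the view follows the documented pattern
>>> spark.table("historical_feature_store__house_listings").show(5)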

butterfree.load.writers.online_feature_store_writer module

Holds the Online Feature Store writer class.

class butterfree.load.writers.online_feature_store_writer.OnlineFeatureStoreWriter(db_config=None, debug_mode: bool = False, write_to_entity=False)

Bases: butterfree.load.writers.writer.Writer

Enable writing feature sets into the Online Feature Store.

db_config

Spark configuration for connecting to databases. For more information, check the 'butterfree.db.configs' module.

debug_mode

“dry run” mode: writes the result to a temporary view instead of the feature store.

write_to_entity

Option to write the data to the entity table. With this option set to True, the writer writes the feature set to a table named after the entity defined in the pipeline. It will NOT write to a table named after the feature set, as it normally does.
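
For illustration, enabling this behaviour only requires the constructor flag; a sketch, with feature_set, dataframe and spark_client assumed to exist as in the examples below:

>>> writer = OnlineFeatureStoreWriter(write_to_entity=True)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)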

Example

Simple example of OnlineFeatureStoreWriter instantiation. The class can be instantiated without db configurations; in that case it falls back to CassandraConfig(), which provides default configurations for Cassandra.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations and provide them to OnlineFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = CassandraConfig(mode="overwrite",
...                          format_="parquet",
...                          keyspace="keyspace_name")
>>> writer = OnlineFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the available settings and their defaults, refer to the CassandraConfig class.

We can also use OnlineFeatureStoreWriter to validate the dataframe to be written, using either the default or a custom config.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) require a Spark client, a feature set and a dataframe, as described in the OnlineFeatureStoreWriter class arguments.

static filter_latest(dataframe: pyspark.sql.dataframe.DataFrame, id_columns: List[Any]) → pyspark.sql.dataframe.DataFrame

Filters latest data from the dataframe.

Parameters
  • dataframe – spark dataframe containing data from a feature set.

  • id_columns – unique identifier column set for this feature set.

Returns

A dataframe containing only the latest data for each unique id in the feature set.

Return type

pyspark.sql.dataframe.DataFrame
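
The effect is comparable to a window-function dedup over the id columns ordered by the event timestamp. A sketch in plain PySpark, assuming a timestamp column; this is not necessarily how the method is implemented internally:

>>> from pyspark.sql import Window, functions as F
>>> w = Window.partitionBy(*id_columns).orderBy(F.col("timestamp").desc())
>>> latest = (dataframe
...           .withColumn("rn", F.row_number().over(w))
...           .filter(F.col("rn") == 1)
...           .drop("rn"))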

get_db_schema(feature_set: butterfree.transform.feature_set.FeatureSet)

Get desired database schema.

Parameters

feature_set – object processed with feature set metadata.

Returns

Desired database schema.
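
A quick usage sketch, assuming feature_set is an already-processed FeatureSet:

>>> writer = OnlineFeatureStoreWriter()
>>> db_schema = writer.get_db_schema(feature_set=feature_set)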

validate(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe, spark_client: butterfree.clients.spark_client.SparkClient)

Count the dataframe rows to validate the data loaded into the Feature Store.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Raises

AssertionError – if validation fails.

write(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe: pyspark.sql.dataframe.DataFrame, spark_client: butterfree.clients.spark_client.SparkClient) → Optional[pyspark.sql.streaming.StreamingQuery]

Loads the latest data from a feature set into the Feature Store.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Returns

Streaming handler if writing a streaming dataframe, None otherwise.

If debug_mode is set to True, a temporary table with a name in the format online_feature_store__my_feature_set will be created instead of writing to the real online feature store. If the dataframe is streaming, this temporary table will be updated in real time.
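
When a streaming dataframe is written in debug mode, the returned handler and the temporary view can be used together to follow the results. A sketch, assuming a streaming dataframe and an active SparkSession called spark:

>>> handler = writer.write(feature_set=feature_set,
...                        dataframe=streaming_dataframe,
...                        spark_client=spark_client)
>>> spark.table("online_feature_store__my_feature_set").show(5)
>>> handler.stop()   # stop the streaming query when done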

butterfree.load.writers.writer module

Writer entity.

class butterfree.load.writers.writer.Writer

Bases: abc.ABC

Abstract base class for Writers.

Parameters

spark_client – client for spark connections with external services.

abstract validate(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe: pyspark.sql.dataframe.DataFrame, spark_client: butterfree.clients.spark_client.SparkClient)

Count the dataframe rows to validate the data loaded into the Feature Store.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Raises

AssertionError – if validation fails.

with_(transformer: Callable, *args, **kwargs)

Define a new transformation for the Writer.

All the transformations are applied when the data is written.

Parameters
  • transformer – method that receives a dataframe and outputs a dataframe.

  • *args – args for the transformer.

  • **kwargs – kwargs for the transformer.

Returns

Writer object with new transformation
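
A transformer is just a callable that receives a dataframe (plus optional args) and returns a dataframe. A hypothetical example (filter_rows is not part of the library):

>>> def filter_rows(df, condition):
...     return df.filter(condition)
>>> writer = HistoricalFeatureStoreWriter().with_(filter_rows, "year >= 2020")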

abstract write(feature_set: butterfree.transform.feature_set.FeatureSet, dataframe: pyspark.sql.dataframe.DataFrame, spark_client: butterfree.clients.spark_client.SparkClient)

Loads the data from a feature set into the Feature Store.

Feature Store could be Online or Historical.

Parameters
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Module contents

Holds data loaders for historical and online feature store.

class butterfree.load.writers.HistoricalFeatureStoreWriter(db_config=None, database=None, num_partitions=None, validation_threshold: float = 0.01, debug_mode: bool = False)

class butterfree.load.writers.OnlineFeatureStoreWriter(db_config=None, debug_mode: bool = False, write_to_entity=False)

These classes are documented in full under the corresponding submodules above.