butterfree.load.writers package

Submodules

Holds the Historical Feature Store writer class.

class butterfree.load.writers.historical_feature_store_writer.HistoricalFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, num_partitions: Optional[int] = None, validation_threshold: float = 0.01, debug_mode: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None, row_count_validation: bool = True)

Bases: Writer

Enable writing feature sets into the Historical Feature Store.

db_config

Datalake configuration for Spark, by default on AWS S3. For more information check module ‘butterfree.db.configs’.

database

database name to use in the Spark metastore. Defaults to the FEATURE_STORE_HISTORICAL_DATABASE environment variable.

num_partitions

number of partitions to use when repartitioning the dataframe before saving.

validation_threshold

lower and upper tolerance to use in the count validation. The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property. For example: with validation_threshold = 0.01 and a calculated count of 100000 records on the dataframe, a count of 99500 returned by the feature store will not raise an error, since it is within the 1% tolerance. Use validation_threshold = 0 to disable the tolerance in the validation.
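As a rough sketch of the arithmetic behind this check (an illustration only, not the writer’s actual implementation):

>>> threshold = 0.01
>>> source_count, written_count = 100000, 99500
>>> lower, upper = source_count * (1 - threshold), source_count * (1 + threshold)
>>> lower <= written_count <= upper
True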

debug_mode

“dry run” mode, write the result to a temporary view.

Example

Simple example of HistoricalFeatureStoreWriter instantiation. We can instantiate this class without db configurations; in that case it uses S3Config(), which provides default configurations for the AWS S3 service.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations, like write mode, file format and S3 bucket, and provide them to HistoricalFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = MetastoreConfig(path="my_s3_bucket_name",
...                          mode="overwrite",
...                          format_="parquet")
>>> writer = HistoricalFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the settings available on S3Config and their defaults, refer to the S3Config class.

We can also write with interval mode, in which case HistoricalFeatureStoreWriter needs to use Dynamic Partition Inserts. The behaviour of the OVERWRITE keyword is controlled by the spark.sql.sources.partitionOverwriteMode configuration property: when dynamic overwrite mode is enabled, Spark only deletes the partitions for which it has data to write, and all other partitions remain intact.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter(interval_mode=True)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
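For reference, the Spark property mentioned above can be set directly on a SparkSession; a minimal sketch (whether you need to set it yourself depends on your cluster and pipeline configuration):

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")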

We can also instantiate the HistoricalFeatureStoreWriter class to validate the dataframe to be written.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) need the SparkClient, the FeatureSet and the DataFrame, to write or to validate, according to the Writer’s arguments.

P.S.: When writing, the HistoricalFeatureStoreWriter partitions the data to improve query performance. The data is stored in partition folders in AWS S3 based on time (per year, month and day).
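As an illustration of the resulting layout only (not the writer’s exact code), a time-partitioned Spark write produces folders such as year=2021/month=6/day=15; the bucket path below is hypothetical:

>>> (dataframe.write
...     .partitionBy("year", "month", "day")
...     .mode("overwrite")
...     .parquet("s3a://my_s3_bucket_name/historical/my_feature_set"))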

DEFAULT_VALIDATION_THRESHOLD = 0.01
PARTITION_BY = ['year', 'month', 'day']
check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame

Instantiate the schema check hook to check schema between dataframe and database.

Parameters:
  • client – client for Spark or Cassandra connections with external services.

  • dataframe – Spark dataframe containing data from a feature set.

  • table_name – table name where the dataframe will be saved.

  • database – database name where the dataframe will be saved.

validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Calculate dataframe rows to validate data into Feature Store.

Parameters:
  • feature_set – object processed with feature set information.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

Raises:

AssertionError – if count of written data doesn’t match count in current feature set dataframe.

write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Loads the data from a feature set into the Historical Feature Store.

Parameters:
  • feature_set – object processed with feature set information.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

If the debug_mode is set to True, a temporary table with a name in the format: historical_feature_store__{feature_set.name} will be created instead of writing to the real historical feature store.
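In debug mode the temporary view can then be inspected with plain Spark SQL; a minimal sketch, assuming an active SparkSession named spark and a feature set named my_feature_set:

>>> spark.sql("SELECT * FROM historical_feature_store__my_feature_set").show()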

Holds the Online Feature Store writer class.

class butterfree.load.writers.online_feature_store_writer.OnlineFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, debug_mode: bool = False, write_to_entity: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None)

Bases: Writer

Enable writing feature sets into the Online Feature Store.

db_config

Spark configuration for connecting to databases. For more information check the module ‘butterfree.db.configs’.

debug_mode

“dry run” mode, write the result to a temporary view.

write_to_entity

option to write the data to the entity table. With this option set to True, the writer will write the feature set to a table whose name equals the entity name defined in the pipeline. So, it WILL NOT write to a table with the name of the feature set, as it normally does.
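Enabling this behaviour is just a constructor flag; the entity table name itself comes from the pipeline definition:

>>> writer = OnlineFeatureStoreWriter(write_to_entity=True)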

Example

Simple example of OnlineFeatureStoreWriter instantiation. We can instantiate this class without db configurations; in that case it uses CassandraConfig(), which provides default configurations for CassandraDB.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations and provide them to OnlineFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = CassandraConfig(mode="overwrite",
...                          format_="parquet",
...                          keyspace="keyspace_name")
>>> writer = OnlineFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the settings available on CassandraConfig and their defaults, refer to the CassandraConfig class.

We can also instantiate the OnlineFeatureStoreWriter class to validate the data to be written, using the default or custom configs.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) need the SparkClient, the FeatureSet and the DataFrame, to write or to validate, according to the OnlineFeatureStoreWriter class arguments.

There’s an important aspect to be highlighted here: if you’re using the incremental mode, we do not check if your data is the newest before writing to the online feature store.

This behavior is known and will be fixed soon.

check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame

Instantiate the schema check hook to check schema between dataframe and database.

Parameters:
  • client – client for Spark or Cassandra connections with external services.

  • dataframe – Spark dataframe containing data from a feature set.

  • table_name – table name where the dataframe will be saved.

  • database – database name where the dataframe will be saved.

static filter_latest(dataframe: DataFrame, id_columns: List[Any]) DataFrame

Filters latest data from the dataframe.

Parameters:
  • dataframe – spark dataframe containing data from a feature set.

  • id_columns – unique identifier column set for this feature set.

Returns:

contains only the latest data for each unique id in the feature set.

Return type:

dataframe
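The library’s internal implementation may differ, but a typical way to keep only the latest row per id, assuming id and timestamp columns, is a window over the id ordered by the timestamp:

>>> from pyspark.sql import Window, functions as F
>>> window = Window.partitionBy("id").orderBy(F.col("timestamp").desc())
>>> latest = (dataframe
...     .withColumn("row_number", F.row_number().over(window))
...     .filter("row_number = 1")
...     .drop("row_number"))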

get_db_schema(feature_set: FeatureSet) List[Dict[Any, Any]]

Get desired database schema.

Parameters:

feature_set – object processed with feature set metadata.

Returns:

Desired database schema.

validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Calculate dataframe rows to validate data into Feature Store.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Raises:

AssertionError – if validation fails.

write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Optional[StreamingQuery]

Loads the latest data from a feature set into the Feature Store.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Returns:

Streaming handler if writing streaming df, None otherwise.

If the debug_mode is set to True, a temporary table with a name in the format: online_feature_store__my_feature_set will be created instead of writing to the real online feature store. If the dataframe is streaming, this temporary table will be updated in real time.
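When the dataframe is streaming, write returns the StreamingQuery handler and the temporary table can be polled while the stream runs; a hedged sketch, assuming an active SparkSession named spark, a feature set named my_feature_set and a streaming dataframe:

>>> handler = writer.write(feature_set=feature_set,
...                        dataframe=streaming_dataframe,
...                        spark_client=spark_client)
>>> spark.sql("SELECT COUNT(*) FROM online_feature_store__my_feature_set").show()
>>> handler.stop()  # stop the streaming query when finished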

Writer entity.

class butterfree.load.writers.writer.Writer(db_config: AbstractWriteConfig, debug_mode: bool = False, interval_mode: bool = False, write_to_entity: bool = False, row_count_validation: bool = True)

Bases: ABC, HookableComponent

Abstract base class for Writers.

Parameters:

spark_client – client for spark connections with external services.

abstract check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame

Instantiate the schema check hook to check schema between dataframe and database.

Parameters:
  • client – client for Spark or Cassandra connections with external services.

  • dataframe – Spark dataframe containing data from a feature set.

  • table_name – table name where the dataframe will be saved.

  • database – database name where the dataframe will be saved.

abstract validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Any

Calculate dataframe rows to validate data into Feature Store.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Raises:

AssertionError – if validation fails.

with_(transformer: Callable[[...], DataFrame], *args: Any, **kwargs: Any) Writer

Define a new transformation for the Writer.

All the transformations are used when the method consume is called.

Parameters:
  • transformer – method that receives a dataframe and outputs a dataframe.

  • *args – args for the transformer.

  • **kwargs – kwargs for the transformer.

Returns:

Writer object with new transformation
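A hedged usage sketch: add_source_column below is a user-defined transformer for illustration, not part of the library:

>>> from pyspark.sql import DataFrame, functions as F
>>> def add_source_column(df: DataFrame, value: str) -> DataFrame:
...     return df.withColumn("source", F.lit(value))
>>> writer = HistoricalFeatureStoreWriter().with_(add_source_column, value="batch")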

abstract write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Any

Loads the data from a feature set into the Feature Store.

Feature Store could be Online or Historical.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Module contents

Holds data loaders for historical and online feature store.

class butterfree.load.writers.HistoricalFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, num_partitions: Optional[int] = None, validation_threshold: float = 0.01, debug_mode: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None, row_count_validation: bool = True)

Bases: Writer

Enable writing feature sets into the Historical Feature Store.

db_config

Datalake configuration for Spark, by default on AWS S3. For more information check module ‘butterfree.db.configs’.

database

database name to use in the Spark metastore. Defaults to the FEATURE_STORE_HISTORICAL_DATABASE environment variable.

num_partitions

number of partitions to use when repartitioning the dataframe before saving.

validation_threshold

lower and upper tolerance to use in the count validation. The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property. For example: with validation_threshold = 0.01 and a calculated count of 100000 records on the dataframe, a count of 99500 returned by the feature store will not raise an error, since it is within the 1% tolerance. Use validation_threshold = 0 to disable the tolerance in the validation.

debug_mode

“dry run” mode, write the result to a temporary view.

Example

Simple example of HistoricalFeatureStoreWriter instantiation. We can instantiate this class without db configurations; in that case it uses S3Config(), which provides default configurations for the AWS S3 service.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations, like write mode, file format and S3 bucket, and provide them to HistoricalFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = MetastoreConfig(path="my_s3_bucket_name",
...                          mode="overwrite",
...                          format_="parquet")
>>> writer = HistoricalFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the settings available on S3Config and their defaults, refer to the S3Config class.

We can also write with interval mode, in which case HistoricalFeatureStoreWriter needs to use Dynamic Partition Inserts. The behaviour of the OVERWRITE keyword is controlled by the spark.sql.sources.partitionOverwriteMode configuration property: when dynamic overwrite mode is enabled, Spark only deletes the partitions for which it has data to write, and all other partitions remain intact.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter(interval_mode=True)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

We can also instantiate the HistoricalFeatureStoreWriter class to validate the dataframe to be written.

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) need the SparkClient, the FeatureSet and the DataFrame, to write or to validate, according to the Writer’s arguments.

P.S.: When writing, the HistoricalFeatureStoreWriter partitions the data to improve query performance. The data is stored in partition folders in AWS S3 based on time (per year, month and day).

DEFAULT_VALIDATION_THRESHOLD = 0.01
PARTITION_BY = ['year', 'month', 'day']
check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame

Instantiate the schema check hook to check schema between dataframe and database.

Parameters:
  • client – client for Spark or Cassandra connections with external services.

  • dataframe – Spark dataframe containing data from a feature set.

  • table_name – table name where the dataframe will be saved.

  • database – database name where the dataframe will be saved.

transformations: List[Dict[str, Any]]
validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Calculate dataframe rows to validate data into Feature Store.

Parameters:
  • feature_set – object processed with feature set information.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

Raises:

AssertionError – if count of written data doesn’t match count in current feature set dataframe.

write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Loads the data from a feature set into the Historical Feature Store.

Parameters:
  • feature_set – object processed with feature set information.

  • dataframe – spark dataframe containing data from a feature set.

  • spark_client – client for spark connections with external services.

If the debug_mode is set to True, a temporary table with a name in the format: historical_feature_store__{feature_set.name} will be created instead of writing to the real historical feature store.

class butterfree.load.writers.OnlineFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, debug_mode: bool = False, write_to_entity: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None)

Bases: Writer

Enable writing feature sets into the Online Feature Store.

db_config

Spark configuration for connecting to databases. For more information check the module ‘butterfree.db.configs’.

debug_mode

“dry run” mode, write the result to a temporary view.

write_to_entity

option to write the data to the entity table. With this option set to True, the writer will write the feature set to a table whose name equals the entity name defined in the pipeline. So, it WILL NOT write to a table with the name of the feature set, as it normally does.

Example

Simple example of OnlineFeatureStoreWriter instantiation. We can instantiate this class without db configurations; in that case it uses CassandraConfig(), which provides default configurations for CassandraDB.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

However, we can define the db configurations and provide them to OnlineFeatureStoreWriter.

>>> spark_client = SparkClient()
>>> config = CassandraConfig(mode="overwrite",
...                          format_="parquet",
...                          keyspace="keyspace_name")
>>> writer = OnlineFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)

For the settings available on CassandraConfig and their defaults, refer to the CassandraConfig class.

We can also instantiate the OnlineFeatureStoreWriter class to validate the data to be written, using the default or custom configs.

>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)

Both methods (write and validate) need the SparkClient, the FeatureSet and the DataFrame, to write or to validate, according to the OnlineFeatureStoreWriter class arguments.

There’s an important aspect to be highlighted here: if you’re using the incremental mode, we do not check if your data is the newest before writing to the online feature store.

This behavior is known and will be fixed soon.

check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame

Instantiate the schema check hook to check schema between dataframe and database.

Parameters:
  • client – client for Spark or Cassandra connections with external services.

  • dataframe – Spark dataframe containing data from a feature set.

  • table_name – table name where the dataframe will be saved.

  • database – database name where the dataframe will be saved.

static filter_latest(dataframe: DataFrame, id_columns: List[Any]) DataFrame

Filters latest data from the dataframe.

Parameters:
  • dataframe – spark dataframe containing data from a feature set.

  • id_columns – unique identifier column set for this feature set.

Returns:

contains only the latest data for each unique id in the feature set.

Return type:

dataframe

get_db_schema(feature_set: FeatureSet) List[Dict[Any, Any]]

Get desired database schema.

Parameters:

feature_set – object processed with feature set metadata.

Returns:

Desired database schema.

validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None

Calculate dataframe rows to validate data into Feature Store.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Raises:

AssertionError – if validation fails.

write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Optional[StreamingQuery]

Loads the latest data from a feature set into the Feature Store.

Parameters:
  • feature_set – object processed with feature set metadata.

  • dataframe – Spark dataframe containing data from a feature set.

  • spark_client – client for Spark connections with external services.

Returns:

Streaming handler if writing streaming df, None otherwise.

If the debug_mode is set to True, a temporary table with a name in the format: online_feature_store__my_feature_set will be created instead of writing to the real online feature store. If the dataframe is streaming, this temporary table will be updated in real time.