butterfree.load.writers package
Submodules
Holds the Historical Feature Store writer class.
- class butterfree.load.writers.historical_feature_store_writer.HistoricalFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, num_partitions: Optional[int] = None, validation_threshold: float = 0.01, debug_mode: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None, row_count_validation: bool = True)¶
Bases:
Writer
Enable writing feature sets into the Historical Feature Store.
- db_config¶
Datalake configuration for Spark, by default on AWS S3. For more information check module ‘butterfree.db.configs’.
- database¶
database name to use in the Spark metastore. Defaults to the value of the FEATURE_STORE_HISTORICAL_DATABASE environment variable.
- num_partitions¶
number of partitions to use when repartitioning the dataframe before saving.
- validation_threshold¶
lower and upper tolerance to use in count validation. The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property. For example: with validation_threshold = 0.01 and a calculated count of 100000 records on the dataframe, no error is thrown if the feature store returns a count of 99500, because that is within 1% of the expected count. Use validation_threshold = 0 to apply no tolerance in the validation.
- debug_mode¶
“dry run” mode, write the result to a temporary view.
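The tolerance arithmetic described for validation_threshold can be sketched in plain Python. This is only an illustration, not Butterfree's actual code: the function name within_tolerance and the exact form of the comparison (a symmetric, inclusive band around the dataframe count) are assumptions.

```python
def within_tolerance(dataframe_count: int, written_count: int,
                     threshold: float = 0.01) -> bool:
    """Return True when written_count lies within +/- threshold of dataframe_count.

    Hypothetical helper: the real writer may compare the counts differently.
    """
    lower = dataframe_count * (1 - threshold)
    upper = dataframe_count * (1 + threshold)
    return lower <= written_count <= upper


# 100000 rows in the dataframe with a 1% tolerance: roughly 99000 to 101000 passes.
print(within_tolerance(100000, 99500))   # inside the band
print(within_tolerance(100000, 98000))   # outside the band

# With threshold = 0 only an exact match passes.
print(within_tolerance(100000, 99999, threshold=0))
```

Under these assumptions, a returned count of 99500 is accepted while 98000 would fail the validation.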
Example
Simple example of instantiating the HistoricalFeatureStoreWriter class. We can instantiate this class without db configurations, in which case the class uses S3Config(), which provides default configurations for the AWS S3 service.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
However, we can define the db configurations, such as write mode, file format and S3 bucket, and provide them to HistoricalFeatureStoreWriter.
>>> spark_client = SparkClient()
>>> config = MetastoreConfig(path="my_s3_bucket_name",
...                          mode="overwrite",
...                          format_="parquet")
>>> writer = HistoricalFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
For the settings available on S3Config and their defaults, see the S3Config class.
We can also write in interval mode, in which case HistoricalFeatureStoreWriter uses Dynamic Partition Inserts. The behaviour of the OVERWRITE keyword is controlled by the spark.sql.sources.partitionOverwriteMode configuration property. When dynamic overwrite mode is enabled, Spark only deletes the partitions for which it has data to write. All other partitions remain intact.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter(interval_mode=True)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
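The difference between dynamic and static partition overwrite can be illustrated without Spark. The sketch below simulates a partitioned table with a plain dictionary; the overwrite function, the row layout and the "value" field are hypothetical and only model the behaviour described above. It assumes an overwrite with no static partition specification, so static mode drops every existing partition.

```python
from typing import Dict, List, Tuple

Partition = Tuple[int, int, int]        # (year, month, day)
Table = Dict[Partition, List[dict]]     # partition key -> rows stored in it


def overwrite(table: Table, new_rows: List[dict], mode: str = "dynamic") -> Table:
    """Simulate an overwrite on a table partitioned by year, month and day."""
    # Group the incoming rows by their partition key.
    incoming: Table = {}
    for row in new_rows:
        key = (row["year"], row["month"], row["day"])
        incoming.setdefault(key, []).append(row)

    if mode == "static":
        # Static mode (no partition spec): all existing partitions are dropped.
        return incoming

    # Dynamic mode: only partitions present in the incoming data are replaced.
    result = dict(table)
    result.update(incoming)
    return result


table = {
    (2020, 1, 1): [{"year": 2020, "month": 1, "day": 1, "value": "old"}],
    (2020, 1, 2): [{"year": 2020, "month": 1, "day": 2, "value": "old"}],
}
new_rows = [{"year": 2020, "month": 1, "day": 2, "value": "new"}]

dynamic_result = overwrite(table, new_rows, mode="dynamic")
static_result = overwrite(table, new_rows, mode="static")

print(sorted(dynamic_result))  # both partitions are still present
print(sorted(static_result))   # only the rewritten partition remains
```

In the dynamic case the partition for 2020-01-01 keeps its old rows, which is the behaviour interval mode relies on.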
We can instantiate the HistoricalFeatureStoreWriter class to validate the dataframe to be written.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)
Both methods (write and validate) need the Spark client, feature set and dataframe in order to write or validate, according to the Writer’s arguments.
P.S.: When writing, HistoricalFeatureStoreWriter partitions the data to improve query performance. The data is stored in partition folders in AWS S3 based on time (per year, month and day).
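To make the time-based folder layout concrete, the sketch below builds the partition folder for a given timestamp. It assumes the key=value directory naming that Spark uses when writing partitioned data; the partition_path helper and the base path are hypothetical and not part of Butterfree.

```python
from datetime import datetime

# Mirrors the PARTITION_BY attribute documented for the writer.
PARTITION_BY = ["year", "month", "day"]


def partition_path(base_path: str, timestamp: datetime) -> str:
    """Build a key=value partition folder path for a record's timestamp."""
    # datetime exposes year, month and day as attributes with the same names.
    parts = [f"{column}={getattr(timestamp, column)}" for column in PARTITION_BY]
    return "/".join([base_path.rstrip("/")] + parts)


# Hypothetical base location for a feature set.
print(partition_path("s3a://my_s3_bucket_name/historical/my_feature_set",
                     datetime(2020, 1, 5)))
```

A query that filters on year, month and day only needs to read the matching folders, which is where the performance gain comes from.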
- DEFAULT_VALIDATION_THRESHOLD = 0.01¶
- PARTITION_BY = ['year', 'month', 'day']¶
- check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame ¶
Instantiate the schema check hook to check schema between dataframe and database.
- Parameters:
client – client for Spark or Cassandra connections with external services.
dataframe – Spark dataframe containing data from a feature set.
table_name – table name where the dataframe will be saved.
database – database name where the dataframe will be saved.
- validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Count the dataframe rows to validate the data written into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Raises:
AssertionError – if count of written data doesn’t match count in current feature set dataframe.
- write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Loads the data from a feature set into the Historical Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
If the debug_mode is set to True, a temporary table with a name in the format: historical_feature_store__{feature_set.name} will be created instead of writing to the real historical feature store.
Holds the Online Feature Store writer class.
- class butterfree.load.writers.online_feature_store_writer.OnlineFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, debug_mode: bool = False, write_to_entity: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None)¶
Bases:
Writer
Enable writing feature sets into the Online Feature Store.
- db_config¶
Spark configuration for connecting to databases. For more information check the module ‘butterfree.db.configs’.
- debug_mode¶
“dry run” mode, write the result to a temporary view.
- write_to_entity¶
option to write the data to the entity table. With this option set to True, the writer will write the feature set to a table with the name equal to the entity name, defined on the pipeline. So, it WILL NOT write to a table with the name of the feature set, as it normally does.
Example
Simple example of instantiating the OnlineFeatureStoreWriter class. We can instantiate this class without db configurations, in which case the class uses CassandraConfig(), which provides default configurations for CassandraDB.
>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
However, we can define the db configurations and provide them to OnlineFeatureStoreWriter.
>>> spark_client = SparkClient()
>>> config = CassandraConfig(mode="overwrite",
...                          format_="parquet",
...                          keyspace="keyspace_name")
>>> writer = OnlineFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
For the settings available on CassandraConfig and their defaults, see the CassandraConfig class.
We can instantiate the OnlineFeatureStoreWriter class to run the validation, using the default or custom configs.
>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)
Both methods (write and validate) need the Spark client, feature set and dataframe in order to write or validate, according to the OnlineFeatureStoreWriter class arguments.
There’s an important aspect to be highlighted here: if you’re using the incremental mode, we do not check if your data is the newest before writing to the online feature store.
This behavior is known and will be fixed soon.
- check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame ¶
Instantiate the schema check hook to check schema between dataframe and database.
- Parameters:
client – client for Spark or Cassandra connections with external services.
dataframe – Spark dataframe containing data from a feature set.
table_name – table name where the dataframe will be saved.
database – database name where the dataframe will be saved.
- static filter_latest(dataframe: DataFrame, id_columns: List[Any]) DataFrame ¶
Filters latest data from the dataframe.
- Parameters:
dataframe – spark dataframe containing data from a feature set.
id_columns – unique identifier column set for this feature set.
- Returns:
dataframe containing only the latest data for each unique id in the feature set.
- Return type:
dataframe
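The idea behind filter_latest can be sketched on plain Python rows. This is an illustration under stated assumptions, not the library's implementation: the real method operates on a Spark DataFrame, and the timestamp_column parameter and its default name "timestamp" are hypothetical.

```python
from typing import Any, Dict, List


def filter_latest(rows: List[Dict[str, Any]], id_columns: List[str],
                  timestamp_column: str = "timestamp") -> List[Dict[str, Any]]:
    """Keep, for each unique id, only the row with the greatest timestamp."""
    latest: Dict[tuple, Dict[str, Any]] = {}
    for row in rows:
        # The id may span several columns, so the key is a tuple of their values.
        key = tuple(row[column] for column in id_columns)
        current = latest.get(key)
        if current is None or row[timestamp_column] > current[timestamp_column]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"id": 1, "timestamp": 10, "feature": "a"},
    {"id": 1, "timestamp": 20, "feature": "b"},
    {"id": 2, "timestamp": 15, "feature": "c"},
]
latest_rows = filter_latest(rows, id_columns=["id"])
print(latest_rows)
```

For id 1 only the row with timestamp 20 survives, while id 2 keeps its single row.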
- get_db_schema(feature_set: FeatureSet) List[Dict[Any, Any]] ¶
Get desired database schema.
- Parameters:
feature_set – object processed with feature set metadata.
- Returns:
Desired database schema.
- validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Count the dataframe rows to validate the data written into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Raises:
AssertionError – if validation fails.
- write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Optional[StreamingQuery] ¶
Loads the latest data from a feature set into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Returns:
Streaming handler if writing streaming df, None otherwise.
If debug_mode is set to True, a temporary table with a name in the format online_feature_store__my_feature_set will be created instead of writing to the real online feature store. If the dataframe is streaming, this temporary table will be updated in real time.
Writer entity.
- class butterfree.load.writers.writer.Writer(db_config: AbstractWriteConfig, debug_mode: bool = False, interval_mode: bool = False, write_to_entity: bool = False, row_count_validation: bool = True)¶
Bases:
ABC, HookableComponent
Abstract base class for Writers.
- Parameters:
spark_client – client for spark connections with external services.
- abstract check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame ¶
Instantiate the schema check hook to check schema between dataframe and database.
- Parameters:
client – client for Spark or Cassandra connections with external services.
dataframe – Spark dataframe containing data from a feature set.
table_name – table name where the dataframe will be saved.
database – database name where the dataframe will be saved.
- abstract validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Any ¶
Count the dataframe rows to validate the data written into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Raises:
AssertionError – if validation fails.
- with_(transformer: Callable[[...], DataFrame], *args: Any, **kwargs: Any) Writer ¶
Define a new transformation for the Writer.
All the transformations are used when the method consume is called.
- Parameters:
transformer – method that receives a dataframe and outputs a dataframe.
*args – args for the transformer.
**kwargs – kwargs for the transformer.
- Returns:
Writer object with the new transformation.
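The chaining pattern behind with_ can be sketched as follows. MiniWriter is a hypothetical stand-in, not the Butterfree Writer: plain lists take the place of Spark dataframes, and both the apply method and the dictionary keys used to store each transformation are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List


class MiniWriter:
    """Hypothetical stand-in for a Writer that records chained transformations."""

    def __init__(self) -> None:
        self.transformations: List[Dict[str, Any]] = []

    def with_(self, transformer: Callable[..., Any],
              *args: Any, **kwargs: Any) -> "MiniWriter":
        # Store the transformation and return self so that calls can be chained.
        self.transformations.append(
            {"transformer": transformer, "args": args, "kwargs": kwargs}
        )
        return self

    def apply(self, data: Any) -> Any:
        # Apply every stored transformation in registration order.
        for step in self.transformations:
            data = step["transformer"](data, *step["args"], **step["kwargs"])
        return data


def add(values: List[int], amount: int) -> List[int]:
    return [value + amount for value in values]


def scale(values: List[int], factor: int = 1) -> List[int]:
    return [value * factor for value in values]


writer = MiniWriter().with_(add, 1).with_(scale, factor=10)
result = writer.apply([1, 2, 3])
print(result)  # [20, 30, 40]
```

Returning the writer itself from with_ is what allows several transformations to be declared in a single expression.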
- abstract write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Any ¶
Loads the data from a feature set into the Feature Store.
Feature Store could be Online or Historical.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
Module contents
Holds data loaders for the historical and online feature stores.
- class butterfree.load.writers.HistoricalFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, num_partitions: Optional[int] = None, validation_threshold: float = 0.01, debug_mode: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None, row_count_validation: bool = True)¶
Bases:
Writer
Enable writing feature sets into the Historical Feature Store.
- db_config¶
Datalake configuration for Spark, by default on AWS S3. For more information check module ‘butterfree.db.configs’.
- database¶
database name to use in the Spark metastore. Defaults to the value of the FEATURE_STORE_HISTORICAL_DATABASE environment variable.
- num_partitions¶
number of partitions to use when repartitioning the dataframe before saving.
- validation_threshold¶
lower and upper tolerance to use in count validation. The default value is defined in the DEFAULT_VALIDATION_THRESHOLD property. For example: with validation_threshold = 0.01 and a calculated count of 100000 records on the dataframe, no error is thrown if the feature store returns a count of 99500, because that is within 1% of the expected count. Use validation_threshold = 0 to apply no tolerance in the validation.
- debug_mode¶
“dry run” mode, write the result to a temporary view.
Example
Simple example of instantiating the HistoricalFeatureStoreWriter class. We can instantiate this class without db configurations, in which case the class uses S3Config(), which provides default configurations for the AWS S3 service.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
However, we can define the db configurations, such as write mode, file format and S3 bucket, and provide them to HistoricalFeatureStoreWriter.
>>> spark_client = SparkClient()
>>> config = MetastoreConfig(path="my_s3_bucket_name",
...                          mode="overwrite",
...                          format_="parquet")
>>> writer = HistoricalFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
For the settings available on S3Config and their defaults, see the S3Config class.
We can also write in interval mode, in which case HistoricalFeatureStoreWriter uses Dynamic Partition Inserts. The behaviour of the OVERWRITE keyword is controlled by the spark.sql.sources.partitionOverwriteMode configuration property. When dynamic overwrite mode is enabled, Spark only deletes the partitions for which it has data to write. All other partitions remain intact.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter(interval_mode=True)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
We can instantiate the HistoricalFeatureStoreWriter class to validate the dataframe to be written.
>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)
Both methods (write and validate) need the Spark client, feature set and dataframe in order to write or validate, according to the Writer’s arguments.
P.S.: When writing, HistoricalFeatureStoreWriter partitions the data to improve query performance. The data is stored in partition folders in AWS S3 based on time (per year, month and day).
- DEFAULT_VALIDATION_THRESHOLD = 0.01¶
- PARTITION_BY = ['year', 'month', 'day']¶
- check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame ¶
Instantiate the schema check hook to check schema between dataframe and database.
- Parameters:
client – client for Spark or Cassandra connections with external services.
dataframe – Spark dataframe containing data from a feature set.
table_name – table name where the dataframe will be saved.
database – database name where the dataframe will be saved.
- transformations: List[Dict[str, Any]]¶
- validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Count the dataframe rows to validate the data written into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Raises:
AssertionError – if count of written data doesn’t match count in current feature set dataframe.
- write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Loads the data from a feature set into the Historical Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
If the debug_mode is set to True, a temporary table with a name in the format: historical_feature_store__{feature_set.name} will be created instead of writing to the real historical feature store.
- class butterfree.load.writers.OnlineFeatureStoreWriter(db_config: Optional[AbstractWriteConfig] = None, database: Optional[str] = None, debug_mode: bool = False, write_to_entity: bool = False, interval_mode: bool = False, check_schema_hook: Optional[Hook] = None)¶
Bases:
Writer
Enable writing feature sets into the Online Feature Store.
- db_config¶
Spark configuration for connecting to databases. For more information check the module ‘butterfree.db.configs’.
- debug_mode¶
“dry run” mode, write the result to a temporary view.
- write_to_entity¶
option to write the data to the entity table. With this option set to True, the writer will write the feature set to a table with the name equal to the entity name, defined on the pipeline. So, it WILL NOT write to a table with the name of the feature set, as it normally does.
Example
Simple example of instantiating the OnlineFeatureStoreWriter class. We can instantiate this class without db configurations, in which case the class uses CassandraConfig(), which provides default configurations for CassandraDB.
>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
However, we can define the db configurations and provide them to OnlineFeatureStoreWriter.
>>> spark_client = SparkClient()
>>> config = CassandraConfig(mode="overwrite",
...                          format_="parquet",
...                          keyspace="keyspace_name")
>>> writer = OnlineFeatureStoreWriter(db_config=config)
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client)
For the settings available on CassandraConfig and their defaults, see the CassandraConfig class.
We can instantiate the OnlineFeatureStoreWriter class to run the validation, using the default or custom configs.
>>> spark_client = SparkClient()
>>> writer = OnlineFeatureStoreWriter()
>>> writer.validate(feature_set=feature_set,
...                 dataframe=dataframe,
...                 spark_client=spark_client)
Both methods (write and validate) need the Spark client, feature set and dataframe in order to write or validate, according to the OnlineFeatureStoreWriter class arguments.
There’s an important aspect to be highlighted here: if you’re using the incremental mode, we do not check if your data is the newest before writing to the online feature store.
This behavior is known and will be fixed soon.
- check_schema(client: Any, dataframe: DataFrame, table_name: str, database: Optional[str] = None) DataFrame ¶
Instantiate the schema check hook to check schema between dataframe and database.
- Parameters:
client – client for Spark or Cassandra connections with external services.
dataframe – Spark dataframe containing data from a feature set.
table_name – table name where the dataframe will be saved.
database – database name where the dataframe will be saved.
- static filter_latest(dataframe: DataFrame, id_columns: List[Any]) DataFrame ¶
Filters latest data from the dataframe.
- Parameters:
dataframe – spark dataframe containing data from a feature set.
id_columns – unique identifier column set for this feature set.
- Returns:
dataframe containing only the latest data for each unique id in the feature set.
- Return type:
dataframe
- get_db_schema(feature_set: FeatureSet) List[Dict[Any, Any]] ¶
Get desired database schema.
- Parameters:
feature_set – object processed with feature set metadata.
- Returns:
Desired database schema.
- validate(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) None ¶
Count the dataframe rows to validate the data written into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Raises:
AssertionError – if validation fails.
- write(feature_set: FeatureSet, dataframe: DataFrame, spark_client: SparkClient) Optional[StreamingQuery] ¶
Loads the latest data from a feature set into the Feature Store.
- Parameters:
feature_set – object processed with feature set metadata.
dataframe – Spark dataframe containing data from a feature set.
spark_client – client for Spark connections with external services.
- Returns:
Streaming handler if writing streaming df, None otherwise.
If debug_mode is set to True, a temporary table with a name in the format online_feature_store__my_feature_set will be created instead of writing to the real online feature store. If the dataframe is streaming, this temporary table will be updated in real time.