butterfree.transform package
Subpackages
- butterfree.transform.features package
- butterfree.transform.transformations package
- Subpackages
- Submodules
- Module contents
- butterfree.transform.utils package
Submodules
AggregatedFeatureSet entity.
- class butterfree.transform.aggregated_feature_set.AggregatedFeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])
Bases:
FeatureSet
Holds metadata about the aggregated feature set.
This class overrides some methods of the parent FeatureSet class and has specific methods for aggregations.
The AggregatedTransform can only be used on AggregatedFeatureSets. The construct method is responsible for collecting every feature’s transformation definitions so it can run a groupby over the input dataframe, taking into account whether the user wants to run a rolling window aggregation, pivot, or just apply aggregation functions.
Example
This is an example of an aggregated feature set definition. All features and their transformations are defined.
>>> from butterfree.transform.aggregated_feature_set import (
...     AggregatedFeatureSet
... )
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
... )
>>> from butterfree.transform.transformations import (
...     AggregatedTransform,
... )
>>> from butterfree.constants import DataType
>>> from butterfree.clients import SparkClient
>>> from butterfree.transform.utils import Function
>>> import pyspark.sql.functions as F
>>> client = SparkClient()
>>> client.conn.conf.set("spark.sql.session.timeZone", "UTC")
>>> dataframe = client.conn.createDataFrame(
...     [
...         (1, "2020-01-01 13:01:00+000", 1000, "publicado"),
...         (2, "2020-01-01 14:01:00+000", 2000, "publicado"),
...         (1, "2020-01-02 13:01:00+000", 2000, "alugado"),
...         (1, "2020-01-03 13:01:00+000", 1000, "despublicado"),
...         (2, "2020-01-09 14:01:00+000", 1000, "despublicado"),
...     ],
...     ("id", "ts", "rent", "status"),
... )
>>> dataframe = dataframe.withColumn("ts", dataframe["ts"].cast("timestamp"))
>>> feature_set = AggregatedFeatureSet(
...     name="aggregated_feature_set",
...     entity="entity",
...     description="description",
...     features=[
...         Feature(
...             name="feature1",
...             description="test",
...             transformation=AggregatedTransform(
...                 functions=[
...                     Function(F.avg, DataType.DOUBLE),
...                     Function(F.stddev_pop, DataType.DOUBLE),
...                 ],
...             ),
...             from_column="rent",
...         ),
...     ],
...     keys=[KeyFeature(name="id", description="lul")],
...     timestamp=TimestampFeature(from_column="ts"),
... )
>>> result = feature_set.construct(
...     dataframe=dataframe,
...     client=client,
...     end_date="2020-01-15"
... )
>>> result.show()
+---+-------------------+-------------+----------------+
| id|          timestamp|feature1__avg|feature1__stddev|
+---+-------------------+-------------+----------------+
|  1|2020-01-01 13:01:00|       1000.0|            null|
|  1|2020-01-03 13:01:00|       1000.0|            null|
|  1|2020-01-02 13:01:00|       2000.0|            null|
|  2|2020-01-09 14:01:00|       1000.0|            null|
|  2|2020-01-01 14:01:00|       2000.0|            null|
+---+-------------------+-------------+----------------+
Since no window was defined, the AggregatedFeatureSet groups by the key and timestamp feature columns. In this example the dataframe is left unchanged, because no two rows share the same id and timestamp.
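To make the grouping concrete, here is a minimal pure-Python sketch of the same idea. It is not Butterfree or Spark code: group_avg is a hypothetical helper, and the rows are copied from the example dataframe with the timezone suffix dropped.

```python
from collections import defaultdict

# Rows from the example above: (id, ts, rent, status).
ROWS = [
    (1, "2020-01-01 13:01:00", 1000, "publicado"),
    (2, "2020-01-01 14:01:00", 2000, "publicado"),
    (1, "2020-01-02 13:01:00", 2000, "alugado"),
    (1, "2020-01-03 13:01:00", 1000, "despublicado"),
    (2, "2020-01-09 14:01:00", 1000, "despublicado"),
]


def group_avg(rows):
    """Hypothetical helper: average rent per (id, ts) group."""
    groups = defaultdict(list)
    for key, ts, rent, _status in rows:
        groups[(key, ts)].append(rent)
    return {group: sum(values) / len(values) for group, values in groups.items()}


averages = group_avg(ROWS)
# Every (id, ts) pair is unique, so there is one group per input row.
print(len(averages) == len(ROWS))
```

Each group holds a single row, which is also why a standard deviation over such a group has nothing to measure.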
Let’s run one example with windows:
>>> feature_set.with_windows(definitions=["3 days"])
>>> result = feature_set.construct(
...     dataframe=dataframe,
...     client=client,
...     end_date="2020-01-15"
... )
>>> result.orderBy("timestamp", "id").show()
+---+-------------------+-----------------------------------------+
| id|          timestamp|feature1__avg_over_3_days_rolling_windows|
+---+-------------------+-----------------------------------------+
|  1|2020-01-01 00:00:00|                                     null|
|  2|2020-01-01 00:00:00|                                     null|
|  1|2020-01-02 00:00:00|                                   1000.0|
|  2|2020-01-02 00:00:00|                                   2000.0|
|  1|2020-01-03 00:00:00|                                   1500.0|
|  1|2020-01-04 00:00:00|                       1333.3333333333333|
|  1|2020-01-05 00:00:00|                                   1500.0|
|  2|2020-01-05 00:00:00|                                     null|
|  1|2020-01-06 00:00:00|                                   1000.0|
|  1|2020-01-07 00:00:00|                                     null|
|  2|2020-01-10 00:00:00|                                   1000.0|
|  2|2020-01-13 00:00:00|                                     null|
+---+-------------------+-----------------------------------------+
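+---+-------------------+--------------------------------------------+
| id|          timestamp|feature1__stddev_over_3_days_rolling_windows|
+---+-------------------+--------------------------------------------+
|  1|2020-01-01 00:00:00|                                        null|
|  2|2020-01-01 00:00:00|                                        null|
|  1|2020-01-02 00:00:00|                                        null|
|  2|2020-01-02 00:00:00|                                        null|
|  1|2020-01-03 00:00:00|                           707.1067811865476|
|  1|2020-01-04 00:00:00|                           577.3502691896258|
|  1|2020-01-05 00:00:00|                           707.1067811865476|
|  2|2020-01-05 00:00:00|                                        null|
|  1|2020-01-06 00:00:00|                                        null|
|  1|2020-01-07 00:00:00|                                        null|
|  2|2020-01-10 00:00:00|                                        null|
|  2|2020-01-13 00:00:00|                                        null|
+---+-------------------+--------------------------------------------+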
(Had to break down the table result.)
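The averages in the windowed output are consistent with each daily timestamp D aggregating the events in the half-open interval [D - 3 days, D). The sketch below checks that reading in plain Python. The interval rule is inferred from the table, not taken from Butterfree's source, and rolling_avg is a hypothetical helper.

```python
from datetime import datetime, timedelta

# (id, ts, rent) rows from the example above.
EVENTS = [
    (1, datetime(2020, 1, 1, 13, 1), 1000),
    (2, datetime(2020, 1, 1, 14, 1), 2000),
    (1, datetime(2020, 1, 2, 13, 1), 2000),
    (1, datetime(2020, 1, 3, 13, 1), 1000),
    (2, datetime(2020, 1, 9, 14, 1), 1000),
]


def rolling_avg(events, key, day, window_days=3):
    """Hypothetical helper: mean rent for `key` over [day - window, day)."""
    start = day - timedelta(days=window_days)
    values = [rent for k, ts, rent in events if k == key and start <= ts < day]
    return sum(values) / len(values) if values else None


# Print the 3-day rolling average for id 1 on each day from Jan 1 to Jan 7.
for day_of_month in range(1, 8):
    day = datetime(2020, 1, day_of_month)
    print(day.date(), rolling_avg(EVENTS, 1, day))
```

A day with no events in its window yields None, which corresponds to the null rows in the table.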
And with pivot:
>>> feature_set.with_pivot(column="status", values=["publicado", "despublicado"])
+---+-------------------+-----------------------+--------------------------+
| id|          timestamp|publicado_feature1__avg|publicado_feature1__stddev|
+---+-------------------+-----------------------+--------------------------+
|  1|2020-01-01 13:01:00|                 1000.0|                      null|
|  2|2020-01-01 14:01:00|                 2000.0|                      null|
|  1|2020-01-02 13:01:00|                   null|                      null|
|  1|2020-01-03 13:01:00|                   null|                      null|
|  2|2020-01-09 14:01:00|                   null|                      null|
+---+-------------------+-----------------------+--------------------------+
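+---+-------------------+--------------------------+-----------------------------+
| id|          timestamp|despublicado_feature1__avg|despublicado_feature1__stddev|
+---+-------------------+--------------------------+-----------------------------+
|  1|2020-01-01 13:01:00|                      null|                         null|
|  2|2020-01-01 14:01:00|                      null|                         null|
|  1|2020-01-02 13:01:00|                      null|                         null|
|  1|2020-01-03 13:01:00|                    1000.0|                         null|
|  2|2020-01-09 14:01:00|                    1000.0|                         null|
+---+-------------------+--------------------------+-----------------------------+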
As you can see, you need to pass the values you want to pivot on. Listing them explicitly lets Spark optimize the pivot and lets you ignore values you do not need. To also get the pivot aggregation for “alugado”, use:
>>> feature_set.with_pivot(
...     column="status", values=["publicado", "despublicado", "alugado"]
... )
You can also run it with pivot AND windows:
>>> feature_set.with_pivot(
...     column="status",
...     values=["publicado", "despublicado", "alugado"]
... ).with_windows(definitions=["1 day", "2 weeks"])
The construct method will execute the feature set, computing all the defined aggregated transformations at once.
Remember: when an AggregatedFeatureSet has no windows defined, the group-by uses the key and timestamp columns.
- construct(dataframe: DataFrame, client: SparkClient, end_date: Optional[str] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) -> DataFrame
Use all the features to build the feature set dataframe.
After that, the dataframe is cached. Since cache() in Spark is lazy, an action is triggered to force persistence. Caching is skipped for streaming Spark dataframes.
- Parameters:
dataframe – input dataframe to be transformed by the features.
client – client responsible for connecting to Spark session.
end_date – user defined max date for having aggregated data (exclusive).
num_processors – cluster total number of processors for repartitioning.
start_date – user defined min date for having aggregated data.
- Returns:
Spark dataframe with all the feature columns.
- define_start_date(start_date: Optional[str] = None) -> Optional[str]
Get aggregated feature set start date.
- Parameters:
start_date – start date regarding source dataframe.
- Returns:
start date.
- property features_columns: List[str]
Name of the columns of all features in feature set.
- get_schema() -> List[Dict[str, Any]]
Get feature set schema.
- Returns:
List of dicts with the feature set schema.
- with_distinct(subset: List, keep: str = 'last') -> AggregatedFeatureSet
Add a distinct configuration for your aggregated feature set.
- Parameters:
subset – the columns used to identify duplicates.
keep – determines which duplicates to keep. Default ‘last’.
- first : Ascending sorting by timestamp.
- last : Descending sorting by timestamp.
- Returns:
An AggregatedFeatureSet configured with distinct.
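One plausible reading of the keep options, sketched in plain Python: sort by timestamp in the stated direction, then retain the first row seen for each distinct combination of the subset columns. This is an interpretation of the parameter description, not Butterfree's implementation, and drop_duplicates is a hypothetical helper.

```python
def drop_duplicates(rows, subset, keep="last", timestamp="timestamp"):
    """Hypothetical helper: keep one row per distinct `subset` value.

    keep="first" sorts ascending by timestamp, keep="last" descending;
    the first row seen for each subset value is retained.
    """
    if keep not in ("first", "last"):
        raise ValueError("keep must be 'first' or 'last'")
    ordered = sorted(rows, key=lambda r: r[timestamp], reverse=(keep == "last"))
    seen = set()
    result = []
    for row in ordered:
        marker = tuple(row[column] for column in subset)
        if marker not in seen:
            seen.add(marker)
            result.append(row)
    return result


rows = [
    {"id": 1, "timestamp": "2020-01-01", "status": "publicado"},
    {"id": 1, "timestamp": "2020-01-02", "status": "publicado"},
    {"id": 1, "timestamp": "2020-01-03", "status": "alugado"},
]
latest = drop_duplicates(rows, subset=["id", "status"], keep="last")
earliest = drop_duplicates(rows, subset=["id", "status"], keep="first")
print(sorted(r["timestamp"] for r in latest))
print(sorted(r["timestamp"] for r in earliest))
```

Under this reading, keep="last" retains the most recent "publicado" row for id 1 and keep="first" retains the oldest one.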
- with_pivot(column: str, values: Optional[List[Union[bool, float, int, str]]]) -> AggregatedFeatureSet
Add a pivot configuration for your aggregated feature set.
This means we will group the input data, pivot over the column parameter, and run each aggregation function over the rows of each value listed in the values parameter. In Spark it will be something like: dataframe.groupBy(*group).pivot(column, values).agg(*aggregations)
- Parameters:
column – the column, containing categorical values, to pivot on.
values – the distinct values you want to be pivoted.
- Returns:
An AggregatedFeatureSet configured with pivot.
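The group, pivot, then aggregate sequence can be imitated in plain Python to show why a value left out of values produces nulls instead of removing the row. This is only an illustrative sketch with an average as the single aggregation; pivot_avg and its parameters are hypothetical and are not part of Butterfree or PySpark.

```python
# Rows from the AggregatedFeatureSet example, as dictionaries.
ROWS = [
    {"id": 1, "ts": "2020-01-01 13:01:00", "rent": 1000, "status": "publicado"},
    {"id": 2, "ts": "2020-01-01 14:01:00", "rent": 2000, "status": "publicado"},
    {"id": 1, "ts": "2020-01-02 13:01:00", "rent": 2000, "status": "alugado"},
    {"id": 1, "ts": "2020-01-03 13:01:00", "rent": 1000, "status": "despublicado"},
    {"id": 2, "ts": "2020-01-09 14:01:00", "rent": 1000, "status": "despublicado"},
]


def pivot_avg(rows, group, column, values, target):
    """Hypothetical helper: average `target` per group and pivoted value."""
    collected = {}
    for row in rows:
        group_key = tuple(row[name] for name in group)
        # Every group gets an entry, even when its pivot value is ignored.
        per_value = collected.setdefault(group_key, {value: [] for value in values})
        if row[column] in per_value:
            per_value[row[column]].append(row[target])
    return {
        group_key: {
            value: (sum(nums) / len(nums) if nums else None)
            for value, nums in per_value.items()
        }
        for group_key, per_value in collected.items()
    }


pivoted = pivot_avg(
    ROWS, group=["id", "ts"], column="status",
    values=["publicado", "despublicado"], target="rent",
)
print(pivoted[(1, "2020-01-02 13:01:00")])
```

The "alugado" row keeps its group but has None in both pivoted columns, matching the null cells in the pivot example output.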
- with_windows(definitions: List[str], slide: Optional[str] = None) -> AggregatedFeatureSet
Create a list with windows defined.
FeatureSet entity.
- class butterfree.transform.feature_set.FeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])
Bases:
HookableComponent
Holds metadata about the feature set and constructs the final dataframe.
- name
name of the feature set.
- entity
business context tag for the feature set, an entity for which we are creating all these features.
- description
details about the feature set purpose.
- keys
key features to define this feature set. Values for keys (may be a composition) should be unique at each moment in time (controlled by the TimestampFeature).
- timestamp
A single feature that defines a timestamp for each observation in this feature set.
- features
features to compose the feature set.
Example
This is an example of a feature set definition. All features and their transformations are defined.
>>> from butterfree.transform import FeatureSet
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
... )
>>> from butterfree.transform.transformations import (
...     SparkFunctionTransform,
...     CustomTransform,
... )
>>> from butterfree.constants import DataType
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.transform.utils import Function
>>> import pyspark.sql.functions as F
>>> def divide(df, fs, column1, column2):
...     name = fs.get_output_columns()[0]
...     df = df.withColumn(name, F.col(column1) / F.col(column2))
...     return df
>>> feature_set = FeatureSet(
...     name="feature_set",
...     entity="entity",
...     description="description",
...     features=[
...         Feature(
...             name="feature1",
...             description="test",
...             transformation=SparkFunctionTransform(
...                 functions=[
...                     Function(F.avg, DataType.DOUBLE),
...                     Function(F.stddev_pop, DataType.DOUBLE),
...                 ]
...             ).with_window(
...                 partition_by="id",
...                 order_by=TIMESTAMP_COLUMN,
...                 mode="fixed_windows",
...                 window_definition=["2 minutes", "15 minutes"],
...             ),
...         ),
...         Feature(
...             name="divided_feature",
...             description="unit test",
...             transformation=CustomTransform(
...                 transformer=divide, column1="feature1", column2="feature2",
...             ),
...         ),
...     ],
...     keys=[KeyFeature(name="id", description="The user's Main ID or device ID")],
...     timestamp=TimestampFeature(),
... )
>>> feature_set.construct(dataframe=dataframe)
This last method (construct) will execute the feature set, computing all the defined transformations.
The construct method of FeatureSet also applies filter_duplicated_rows, which drops rows that have repeated values over the key columns and the timestamp column in order to reduce the number of rows in the dataframe. A detailed explanation of this method can be found in the filter_duplicated_rows docstring.
- property columns: List[str]
All data columns within this feature set.
This references all data columns that will be created by the construct method, given keys, timestamp and features of this feature set.
- Returns:
List of column names built in this feature set.
- construct(dataframe: DataFrame, client: SparkClient, end_date: Optional[str] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) -> DataFrame
Use all the features to build the feature set dataframe.
After that, the dataframe is cached. Since cache() in Spark is lazy, an action is triggered to force persistence.
- Parameters:
dataframe – input dataframe to be transformed by the features.
client – client responsible for connecting to Spark session.
start_date – user defined start date.
end_date – user defined end date.
num_processors – cluster total number of processors for repartitioning.
- Returns:
Spark dataframe with all the feature columns.
- define_start_date(start_date: Optional[str] = None) -> Optional[str]
Get feature set start date.
- Parameters:
start_date – start date regarding source dataframe.
- Returns:
start date.
- property description: str
Details about the feature set purpose.
- property entity: str
Business context tag for the feature set.
- property features_columns: List[str]
Name of the columns of all features in feature set.
- get_schema() -> List[Dict[str, Any]]
Get feature set schema.
- Returns:
List of dicts regarding Cassandra feature set schema.
- property keys: List[KeyFeature]
Key features to define this feature set.
- property keys_columns: List[str]
Name of the columns of all keys in feature set.
- property name: str
Name of the feature set.
- property timestamp: TimestampFeature
Defines a timestamp for each observation in this feature set.
- property timestamp_column: str
Name of the timestamp column in feature set.
Module contents
The Transform Component of a Feature Set.
- class butterfree.transform.FeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])
Bases:
HookableComponent
Same class as butterfree.transform.feature_set.FeatureSet, exposed at package level. Its attributes, example, and members are documented under the FeatureSet entity above.