butterfree.transform package

Submodules

AggregatedFeatureSet entity.

class butterfree.transform.aggregated_feature_set.AggregatedFeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])

Bases: FeatureSet

Holds metadata about the aggregated feature set.

This class overrides some methods of the parent FeatureSet class and has specific methods for aggregations.

The AggregatedTransform can only be used on an AggregatedFeatureSet. The construct method is responsible for collecting every feature's transformation definitions so it can run a groupby over the input dataframe, taking into account whether the user wants a rolling window aggregation, a pivot, or just plain aggregation functions.

Example

This is an example of an aggregated feature set definition. All features and their transformations are defined.

>>> from butterfree.transform.aggregated_feature_set import (
...       AggregatedFeatureSet
... )
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
...)
>>> from butterfree.transform.transformations import (
...     AggregatedTransform,
... )
>>> from butterfree.constants import DataType
>>> from butterfree.clients import SparkClient
>>> from butterfree.transform.utils import Function
>>> import pyspark.sql.functions as F
>>> client = SparkClient()
>>> client.conn.conf.set("spark.sql.session.timeZone", "UTC")
>>> dataframe = client.conn.createDataFrame(
...     [
...         (1, "2020-01-01 13:01:00+000", 1000, "publicado"),
...         (2, "2020-01-01 14:01:00+000", 2000, "publicado"),
...         (1, "2020-01-02 13:01:00+000", 2000, "alugado"),
...         (1, "2020-01-03 13:01:00+000", 1000, "despublicado"),
...         (2, "2020-01-09 14:01:00+000", 1000, "despublicado"),
...     ],
...     ("id", "ts", "rent", "status"),
... )
>>> dataframe = dataframe.withColumn("ts", dataframe["ts"].cast("timestamp"))
>>> feature_set = AggregatedFeatureSet(
...    name="aggregated_feature_set",
...    entity="entity",
...    description="description",
...    features=[
...        Feature(
...            name="feature1",
...            description="test",
...            transformation=AggregatedTransform(
...                 functions=[
...                    Function(F.avg, DataType.DOUBLE),
...                    Function(F.stddev_pop, DataType.DOUBLE)],
...             ),
...             from_column="rent",
...        ),
...    ],
...    keys=[KeyFeature(name="id", description="lul")],
...    timestamp=TimestampFeature(from_column="ts"),
...)
>>> result = feature_set.construct(
...     dataframe=dataframe,
...     client=client,
...     end_date="2020-01-15"
... )
>>> result.show()
+---+-------------------+-------------+----------------+
| id|          timestamp|feature1__avg|feature1__stddev|
+---+-------------------+-------------+----------------+
|  1|2020-01-01 13:01:00|       1000.0|            null|
|  1|2020-01-03 13:01:00|       1000.0|            null|
|  1|2020-01-02 13:01:00|       2000.0|            null|
|  2|2020-01-09 14:01:00|       1000.0|            null|
|  2|2020-01-01 14:01:00|       2000.0|            null|
+---+-------------------+-------------+----------------+

Since no window was defined, the AggregatedFeatureSet will always group by the key and timestamp feature columns. In this example the dataframe is left unchanged, since there are no duplicated (id, timestamp) pairs.
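
Conceptually, the no-window case boils down to a plain group by over the keys and the timestamp. A minimal sketch of roughly equivalent Spark code, assuming the example columns above (an illustration, not the actual implementation):

>>> # sketch only: construct() also handles casting, renaming and output schema
>>> dataframe.withColumnRenamed("ts", "timestamp").groupBy(
...     "id", "timestamp"
... ).agg(F.avg("rent"), F.stddev_pop("rent"))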

Let’s run one example with windows:

>>> feature_set.with_windows(definitions=["3 days"])
>>> result = feature_set.construct(
...     dataframe=dataframe,
...     client=client,
...     end_date="2020-01-15"
... )
>>> result.orderBy("timestamp", "id").show()
+---+-------------------+-----------------------------------------+
| id|          timestamp|feature1__avg_over_3_days_rolling_windows|
+---+-------------------+-----------------------------------------+
|  1|2020-01-01 00:00:00|                                     null|
|  2|2020-01-01 00:00:00|                                     null|
|  1|2020-01-02 00:00:00|                                   1000.0|
|  2|2020-01-02 00:00:00|                                   2000.0|
|  1|2020-01-03 00:00:00|                                   1500.0|
|  1|2020-01-04 00:00:00|                       1333.3333333333333|
|  1|2020-01-05 00:00:00|                                   1500.0|
|  2|2020-01-05 00:00:00|                                     null|
|  1|2020-01-06 00:00:00|                                   1000.0|
|  1|2020-01-07 00:00:00|                                     null|
|  2|2020-01-10 00:00:00|                                   1000.0|
|  2|2020-01-13 00:00:00|                                     null|
+---+-------------------+-----------------------------------------+

(The result was split into two tables because it is too wide; the second part, with the stddev column, is shown below.)

+---+-------------------+--------------------------------------------+
| id|          timestamp|feature1__stddev_over_3_days_rolling_windows|
+---+-------------------+--------------------------------------------+
|  1|2020-01-01 00:00:00|                                        null|
|  2|2020-01-01 00:00:00|                                        null|
|  1|2020-01-02 00:00:00|                                        null|
|  2|2020-01-02 00:00:00|                                        null|
|  1|2020-01-03 00:00:00|                           707.1067811865476|
|  1|2020-01-04 00:00:00|                           577.3502691896258|
|  1|2020-01-05 00:00:00|                           707.1067811865476|
|  2|2020-01-05 00:00:00|                                        null|
|  1|2020-01-06 00:00:00|                                        null|
|  1|2020-01-07 00:00:00|                                        null|
|  2|2020-01-10 00:00:00|                                        null|
|  2|2020-01-13 00:00:00|                                        null|
+---+-------------------+--------------------------------------------+

And with pivot:

>>> feature_set.with_pivot(column="status", values=["publicado", "despublicado"])
+---+-------------------+-----------------------+--------------------------+
| id|          timestamp|publicado_feature1__avg|publicado_feature1__stddev|
+---+-------------------+-----------------------+--------------------------+
|  1|2020-01-01 13:01:00|                 1000.0|                      null|
|  2|2020-01-01 14:01:00|                 2000.0|                      null|
|  1|2020-01-02 13:01:00|                   null|                      null|
|  1|2020-01-03 13:01:00|                   null|                      null|
|  2|2020-01-09 14:01:00|                   null|                      null|
+---+-------------------+-----------------------+--------------------------+

(Again the result was split; the second part, with the despublicado columns, is shown below.)

+---+-------------------+--------------------------+-----------------------------+
| id|          timestamp|despublicado_feature1__avg|despublicado_feature1__stddev|
+---+-------------------+--------------------------+-----------------------------+
|  1|2020-01-01 13:01:00|                      null|                         null|
|  2|2020-01-01 14:01:00|                      null|                         null|
|  1|2020-01-02 13:01:00|                      null|                         null|
|  1|2020-01-03 13:01:00|                    1000.0|                         null|
|  2|2020-01-09 14:01:00|                    1000.0|                         null|
+---+-------------------+--------------------------+-----------------------------+

As you can see, you need to pass the values you want to pivot on. This lets Spark optimize the processing and allows values to be ignored when pivoting. If you also wanted the pivot aggregation for “alugado”, just use:

>>> feature_set.with_pivot(
...     column="status", values=["publicado", "despublicado", "alugado"]
... )

You can also run it with pivot AND windows:

>>> feature_set.with_pivot(
...     column="status",
...     values=["publicado", "despublicado", "alugado"]
... ).with_windows(definitions=["1 day", "2 weeks"])

The construct method will execute the feature set, computing all the defined aggregated transformations at once.

Remember: when using an AggregatedFeatureSet without windows, the grouping will use the key and timestamp columns.

construct(dataframe: DataFrame, client: SparkClient, end_date: Optional[str] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) DataFrame

Use all the features to build the feature set dataframe.

After that, the resulting dataframe is cached; since cache() in Spark is lazy, an action is triggered to force persistence. Caching is skipped when the dataframe is a streaming Spark dataframe.
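
A minimal sketch of the cache-then-action pattern described above (an illustration; the actual implementation may differ):

>>> # assumption: only batch dataframes are cached; count() forces materialization
>>> if not dataframe.isStreaming:
...     dataframe.cache().count()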

Parameters:
  • dataframe – input dataframe to be transformed by the features.

  • client – client responsible for connecting to Spark session.

  • end_date – user defined max date for having aggregated data (exclusive).

  • num_processors – cluster total number of processors for repartitioning.

  • start_date – user defined min date for having aggregated data.

Returns:

Spark dataframe with all the feature columns.

define_start_date(start_date: Optional[str] = None) Optional[str]

Get aggregated feature set start date.

Parameters:

start_date – start date regarding source dataframe.

Returns:

start date.
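
A minimal call sketch; for feature sets with window definitions, the returned date may differ from the one passed in so that there is enough history to fill the first windows (an assumption; the exact adjustment is an implementation detail):

>>> # the shift applied for window definitions is an implementation detail
>>> feature_set.define_start_date(start_date="2020-01-01")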

property features: List[Feature]

Features to compose the feature set.

property features_columns: List[str]

Name of the columns of all features in feature set.

get_schema() List[Dict[str, Any]]

Get feature set schema.

Returns:

List of dicts with the feature set schema.

with_distinct(subset: List, keep: str = 'last') AggregatedFeatureSet

Add a distinct configuration for your aggregated feature set.

Parameters:
  • subset – the columns where it will identify duplicates.

  • keep – determines which duplicates to keep. Default ‘last’.
      - first: ascending sorting by timestamp.
      - last: descending sorting by timestamp.

Returns:

An AggregatedFeatureSet configured with distinct.
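
A usage sketch for the example feature set above, deduplicating on id and keeping the last occurrence by timestamp:

>>> # subset and keep follow the signature above
>>> feature_set.with_distinct(subset=["id"], keep="last")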

with_pivot(column: str, values: Optional[List[Union[bool, float, int, str]]]) AggregatedFeatureSet

Add a pivot configuration for your aggregated feature set.

This means the input data will be grouped, pivoted over the column parameter, and each aggregation function will be applied to the columns within each value group of the values parameter. In Spark terms, it is something like: dataframe.groupBy(*group).pivot(column, values).agg(*aggregations).

Parameters:
  • column – the column, containing categorical values, to pivot on.

  • values – the distinct values you want to be pivoted.

Returns:

An AggregatedFeatureSet configured with pivot.
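
In plain Spark, the pivot described above is roughly equivalent to this sketch using the example columns (an illustration, not the actual implementation):

>>> # assumes the timestamp column has already been renamed from "ts"
>>> (
...     dataframe.groupBy("id", "timestamp")
...     .pivot("status", ["publicado", "despublicado"])
...     .agg(F.avg("rent"), F.stddev_pop("rent"))
... )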

with_windows(definitions: List[str], slide: Optional[str] = None) AggregatedFeatureSet

Add a rolling-window configuration to the aggregated feature set, given the window definitions and an optional slide duration.
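
A usage sketch following the signature above; slide is optional (an assumption: it controls how often each rolling window is evaluated):

>>> # definitions follow the rolling-window examples shown earlier
>>> feature_set.with_windows(definitions=["7 days", "2 weeks"], slide="1 day")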

FeatureSet entity.

class butterfree.transform.feature_set.FeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])

Bases: HookableComponent

Holds metadata about the feature set and constructs the final dataframe.

name

name of the feature set.

entity

business context tag for the feature set, an entity for which we are creating all these features.

description

details about the feature set purpose.

keys

key features that define this feature set. Key values (which may be a composition) should be unique at each moment in time (controlled by the TimestampFeature).

timestamp

A single feature that defines the timestamp for each observation in this feature set.

features

features to compose the feature set.

Example

This is an example of a feature set definition. All features and their transformations are defined.

>>> from butterfree.transform import FeatureSet
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
...)
>>> from butterfree.transform.transformations import (
...     SparkFunctionTransform,
...     CustomTransform,
... )
>>> from butterfree.constants import DataType
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.transform.utils import Function
>>> from butterfree.clients import SparkClient
>>> import pyspark.sql.functions as F
>>> client = SparkClient()
>>> def divide(df, fs, column1, column2):
...     name = fs.get_output_columns()[0]
...     df = df.withColumn(name, F.col(column1) / F.col(column2))
...     return df
>>> feature_set = FeatureSet(
...    name="feature_set",
...    entity="entity",
...    description="description",
...    features=[
...        Feature(
...            name="feature1",
...            description="test",
...            transformation=SparkFunctionTransform(
...                 functions=[
...                            Function(F.avg, DataType.DOUBLE),
...                            Function(F.stddev_pop, DataType.DOUBLE)]
...             ).with_window(
...                 partition_by="id",
...                 order_by=TIMESTAMP_COLUMN,
...                 mode="fixed_windows",
...                 window_definition=["2 minutes", "15 minutes"],
...             ),
...        ),
...        Feature(
...            name="divided_feature",
...            description="unit test",
...            transformation=CustomTransform(
...                transformer=divide, column1="feature1", column2="feature2",
...            ),
...        ),
...    ],
...    keys=[KeyFeature(name="id", description="The user's Main ID or device ID")],
...    timestamp=TimestampFeature(),
...)
>>> feature_set.construct(dataframe=dataframe, client=client)

This last method (construct) will execute the feature set, computing all the defined transformations.

There is also a functionality within the scope of the construct method called filter_duplicated_rows. It drops rows that have repeated values over the key columns and the timestamp column, in order to reduce the number of rows in the dataframe. A detailed explanation of this method can be found in the filter_duplicated_rows docstring.
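
The effect is conceptually similar to this sketch over the key and timestamp columns (a simplification; see the filter_duplicated_rows docstring for the precise rule):

>>> # drop rows that repeat the same key + timestamp values
>>> dataframe.dropDuplicates(subset=["id", "timestamp"])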

property columns: List[str]

All data columns within this feature set.

This references all data columns that will be created by the construct method, given keys, timestamp and features of this feature set.

Returns:

List of column names built in this feature set.

construct(dataframe: DataFrame, client: SparkClient, end_date: Optional[str] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) DataFrame

Use all the features to build the feature set dataframe.

After that, the resulting dataframe is cached; since cache() in Spark is lazy, an action is triggered to force persistence.

Parameters:
  • dataframe – input dataframe to be transformed by the features.

  • client – client responsible for connecting to Spark session.

  • start_date – user defined start date.

  • end_date – user defined end date.

  • num_processors – cluster total number of processors for repartitioning.

Returns:

Spark dataframe with all the feature columns.

define_start_date(start_date: Optional[str] = None) Optional[str]

Get feature set start date.

Parameters:

start_date – start date regarding source dataframe.

Returns:

start date.

property description: str

Details about the feature set purpose.

property entity: str

Business context tag for the feature set.

property features: List[Feature]

Features to compose the feature set.

property features_columns: List[str]

Name of the columns of all features in feature set.

get_schema() List[Dict[str, Any]]

Get feature set schema.

Returns:

List of dicts describing the Cassandra feature set schema.

property keys: List[KeyFeature]

Key features to define this feature set.

property keys_columns: List[str]

Name of the columns of all keys in feature set.

property name: str

Name of the feature set.

property timestamp: TimestampFeature

Defines a timestamp for each observation in this feature set.

property timestamp_column: str

Name of the timestamp column in feature set.

Module contents

The Transform Component of a Feature Set.

class butterfree.transform.FeatureSet(name: str, entity: str, description: str, keys: List[KeyFeature], timestamp: TimestampFeature, features: List[Feature])

Bases: HookableComponent

Holds metadata about the feature set and constructs the final dataframe.

name

name of the feature set.

entity

business context tag for the feature set, an entity for which we are creating all these features.

description

details about the feature set purpose.

keys

key features that define this feature set. Key values (which may be a composition) should be unique at each moment in time (controlled by the TimestampFeature).

timestamp

A single feature that defines the timestamp for each observation in this feature set.

features

features to compose the feature set.

Example

This is an example of a feature set definition. All features and their transformations are defined.

>>> from butterfree.transform import FeatureSet
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
...)
>>> from butterfree.transform.transformations import (
...     SparkFunctionTransform,
...     CustomTransform,
... )
>>> from butterfree.constants import DataType
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.transform.utils import Function
>>> from butterfree.clients import SparkClient
>>> import pyspark.sql.functions as F
>>> client = SparkClient()
>>> def divide(df, fs, column1, column2):
...     name = fs.get_output_columns()[0]
...     df = df.withColumn(name, F.col(column1) / F.col(column2))
...     return df
>>> feature_set = FeatureSet(
...    name="feature_set",
...    entity="entity",
...    description="description",
...    features=[
...        Feature(
...            name="feature1",
...            description="test",
...            transformation=SparkFunctionTransform(
...                 functions=[
...                            Function(F.avg, DataType.DOUBLE),
...                            Function(F.stddev_pop, DataType.DOUBLE)]
...             ).with_window(
...                 partition_by="id",
...                 order_by=TIMESTAMP_COLUMN,
...                 mode="fixed_windows",
...                 window_definition=["2 minutes", "15 minutes"],
...             ),
...        ),
...        Feature(
...            name="divided_feature",
...            description="unit test",
...            transformation=CustomTransform(
...                transformer=divide, column1="feature1", column2="feature2",
...            ),
...        ),
...    ],
...    keys=[KeyFeature(name="id", description="The user's Main ID or device ID")],
...    timestamp=TimestampFeature(),
...)
>>> feature_set.construct(dataframe=dataframe, client=client)

This last method (construct) will execute the feature set, computing all the defined transformations.

There is also a functionality within the scope of the construct method called filter_duplicated_rows. It drops rows that have repeated values over the key columns and the timestamp column, in order to reduce the number of rows in the dataframe. A detailed explanation of this method can be found in the filter_duplicated_rows docstring.

property columns: List[str]

All data columns within this feature set.

This references all data columns that will be created by the construct method, given keys, timestamp and features of this feature set.

Returns:

List of column names built in this feature set.

construct(dataframe: DataFrame, client: SparkClient, end_date: Optional[str] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) DataFrame

Use all the features to build the feature set dataframe.

After that, the resulting dataframe is cached; since cache() in Spark is lazy, an action is triggered to force persistence.

Parameters:
  • dataframe – input dataframe to be transformed by the features.

  • client – client responsible for connecting to Spark session.

  • start_date – user defined start date.

  • end_date – user defined end date.

  • num_processors – cluster total number of processors for repartitioning.

Returns:

Spark dataframe with all the feature columns.

define_start_date(start_date: Optional[str] = None) Optional[str]

Get feature set start date.

Parameters:

start_date – start date regarding source dataframe.

Returns:

start date.

property description: str

Details about the feature set purpose.

property entity: str

Business context tag for the feature set.

property features: List[Feature]

Features to compose the feature set.

property features_columns: List[str]

Name of the columns of all features in feature set.

get_schema() List[Dict[str, Any]]

Get feature set schema.

Returns:

List of dicts describing the Cassandra feature set schema.

property keys: List[KeyFeature]

Key features to define this feature set.

property keys_columns: List[str]

Name of the columns of all keys in feature set.

property name: str

Name of the feature set.

property timestamp: TimestampFeature

Defines a timestamp for each observation in this feature set.

property timestamp_column: str

Name of the timestamp column in feature set.