butterfree.transform.transformations package

Submodules

Aggregated Transform entity.

class butterfree.transform.transformations.aggregated_transform.AggregatedTransform(functions: List[Function], filter_expression: str = None)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Specifies an aggregation.

This transformation needs to be used within an AggregatedFeatureSet. Unlike the other transformations, this class does not implement a transform method.

The idea behind aggregating is that, in Spark, we should execute all aggregation functions after a single groupby. An AggregatedFeatureSet will have many Features with AggregatedTransform, and if each one of them applied its own groupby.agg(), we would have to join all the results at the end, making the computation extremely slow.

Instead, the AggregatedFeatureSet collects all Features’ AggregatedTransform definitions and runs, at once, a single groupby.agg(*aggregations).

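For intuition, here is a minimal plain-PySpark sketch of what such a single grouped aggregation looks like; the dataframe df, the "id" key and the "somenumber" column are hypothetical and not part of the butterfree API:

>>> import pyspark.sql.functions as F
>>> # one aggregation entry per Feature's AggregatedTransform
>>> aggregations = [F.avg("somenumber"), F.stddev_pop("somenumber")]
>>> grouped_df = df.groupBy("id").agg(*aggregations)
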
This class helps define, for a feature, which aggregation functions will be applied to build new aggregated columns. Allowed aggregations are registered under the allowed_aggregations property.

functions

list of Function namedtuples, each with an aggregation function and a data type.

filter_expression

SQL boolean expression to be used inside the agg function. The filter expression can be used to aggregate a column using only the records that satisfy a given condition. It has the same behaviour as the SQL expression: agg(case when filter_expression then col end).

Example

>>> from butterfree.transform.transformations import AggregatedTransform
>>> from butterfree.transform.features import Feature
>>> from butterfree.constants import DataType
>>> from butterfree.transform.utils import Function
>>> import pyspark.sql.functions as F
>>> feature = Feature(
...     name="feature",
...     description="aggregated transform",
...     transformation=AggregatedTransform(
...         functions=[
...                    Function(F.avg, DataType.DOUBLE),
...                    Function(F.stddev_pop, DataType.DOUBLE)],
...     ),
...     from_column="somenumber",
...)
>>> feature.get_output_columns()
['feature__avg', 'feature__stddev_pop']
>>> feature.transform(anydf)
NotImplementedError: ...
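
As a sketch, filter_expression can restrict the aggregation to matching records only; the "order_status" column below is hypothetical:

>>> filtered_feature = Feature(
...     name="feature",
...     description="aggregated transform with filter",
...     transformation=AggregatedTransform(
...         functions=[Function(F.avg, DataType.DOUBLE)],
...         filter_expression="order_status = 'COMPLETED'",
...     ),
...     from_column="somenumber",
...)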
property aggregations

Aggregated spark columns.

property output_columns

Columns names generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

(NotImplemented) Performs a transformation to the feature pipeline.

For the AggregatedTransform, the transformation won’t be applied without using an AggregatedFeatureSet.

Parameters

dataframe – input dataframe.

Raises

NotImplementedError.

CustomTransform entity.

class butterfree.transform.transformations.custom_transform.CustomTransform(transformer: Callable, **kwargs)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Custom Transform.

transformer

function to use for transforming the dataframe

**kwargs

kwargs for the transformer

Example

The CustomTransform class must be instantiated with a custom function that always receives a dataframe and the parent feature as arguments; any custom arguments must be passed to the builder through **kwargs.

>>> from butterfree.transform.transformations import CustomTransform
>>> from butterfree.transform.features import Feature
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> import pyspark.sql.functions as F
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200, 200),
...                             (1, "2016-04-11 11:44:12", 300, 300),
...                             (1, "2016-04-11 11:46:24", 400, 400),
...                             (1, "2016-04-11 12:03:21", 500, 500)]
...                           ).toDF("id", "timestamp", "feature1", "feature2")
>>> def divide(df, parent_feature, column1, column2):
...     name = parent_feature.get_output_columns()[0]
...     df = df.withColumn(name, F.col(column1) / F.col(column2))
...     return df
>>> feature = Feature(
...    name="feature",
...    description="custom transform usage example",
...    transformation=CustomTransform(
...        transformer=divide, column1="feature1", column2="feature2",
...    )
...)
>>> feature.transform(df).orderBy("timestamp").show()
+--------+--------+---+-------------------+-------+
|feature1|feature2| id|          timestamp|feature|
+--------+--------+---+-------------------+-------+
|     200|     200|  1|2016-04-11 11:31:11|    1.0|
|     300|     300|  1|2016-04-11 11:44:12|    1.0|
|     400|     400|  1|2016-04-11 11:46:24|    1.0|
|     500|     500|  1|2016-04-11 12:03:21|    1.0|
+--------+--------+---+-------------------+-------+
property output_columns

Columns generated by transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe to be transformed.

Returns

Transformed dataframe.

property transformer

Function to use for transforming the dataframe.

H3 Transform entity.

class butterfree.transform.transformations.h3_transform.H3HashTransform(h3_resolutions: List[int], lat_column: str, lng_column: str)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines an H3 hash transformation.

h3_resolutions

h3 resolutions from 6 to 12.

lat_column

latitude column.

lng_column

longitude column.

Example

It’s necessary to declare the desired h3 resolutions and latitude and longitude columns.

>>> from butterfree.transform.features import Feature
>>> from butterfree.transform.transformations import (
... H3HashTransform
...)
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, 200, -23.554190, -46.670723),
...                             (1, 300, -23.554190, -46.670723),
...                             (1, 400, -23.554190, -46.670723),
...                             (1, 500, -23.554190, -46.670723)]
...                           ).toDF("id", "feature", "lat", "lng")
>>> feature = Feature(
...    name="feature",
...    description="h3 hash transform usage example",
...    transformation=H3HashTransform(
...        h3_resolutions=[6, 7, 8, 9, 10, 11, 12],
...        lat_column="lat",
...        lng_column="lng",
...    )
...)
>>> feature.transform(df).show()
+-------+---+---------+----------+-------------------+-------------------+
|feature| id|      lat|       lng|lat_lng__h3_hash__6|lat_lng__h3_hash__7|
+-------+---+---------+----------+-------------------+-------------------+
|    200|  1|-23.55419|-46.670723|    86a8100efffffff|    87a8100eaffffff|
|    300|  1|-23.55419|-46.670723|    86a8100efffffff|    87a8100eaffffff|
|    400|  1|-23.55419|-46.670723|    86a8100efffffff|    87a8100eaffffff|
|    500|  1|-23.55419|-46.670723|    86a8100efffffff|    87a8100eaffffff|
+-------+---+---------+----------+-------------------+-------------------+
property output_columns

Columns generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

with_stack()

Add a final Stack step to the transformation.

A new column will be created by stacking all the resolution columns generated by H3. The name of this column will be the name of the parent of the H3HashTransform, for example, the name of the KeyFeature that has the H3 hash as its transformation.
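
A minimal sketch of with_stack usage, assuming it returns the transformation itself for chaining (as with_window does in SparkFunctionTransform):

>>> stacked_feature = Feature(
...    name="lat_lng",
...    description="h3 hash transform with stack",
...    transformation=H3HashTransform(
...        h3_resolutions=[6, 7],
...        lat_column="lat",
...        lng_column="lng",
...    ).with_stack()
...)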

butterfree.transform.transformations.h3_transform.define_h3(*args)

UDF for h3 hash retrieval.

Parameters

lat – latitude column.

lng – longitude column.

resolution – desired h3 resolution.

Spark Function Transform entity.

class butterfree.transform.transformations.spark_function_transform.SparkFunctionTransform(functions: List[Function])

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Spark Function transformation.

function

namedtuple with spark function and data type.

Example

It’s necessary to declare the functions to be used. Any Spark or user-defined function is supported.

>>> from butterfree.transform.transformations import SparkFunctionTransform
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.transform.features import Feature
>>> from butterfree.transform.utils import Function
>>> from butterfree.constants import DataType
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from pyspark.sql.types import TimestampType
>>> from pyspark.sql import functions
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200),
...                             (1, "2016-04-11 11:44:12", 300),
...                             (1, "2016-04-11 11:46:24", 400),
...                             (1, "2016-04-11 12:03:21", 500)]
...                           ).toDF("id", "timestamp", "feature")
>>> df = df.withColumn("timestamp", df.timestamp.cast(TimestampType()))
>>> feature = Feature(
...    name="feature",
...    description="spark function transform",
...    transformation=SparkFunctionTransform(
...       functions=[Function(functions.cos, DataType.DOUBLE)],)
...)
>>> feature.transform(df).orderBy("timestamp").show()
+---+-------------------+-------+--------------------+
| id|          timestamp|feature|        feature__cos|
+---+-------------------+-------+--------------------+
|  1|2016-04-11 11:31:11|    200|  0.4871876750070059|
|  1|2016-04-11 11:44:12|    300|-0.02209661927868...|
|  1|2016-04-11 11:46:24|    400|  -0.525296338642536|
|  1|2016-04-11 12:03:21|    500|  -0.883849273431478|
+---+-------------------+-------+--------------------+

We can use this transformation with windows.

>>> feature_row_windows = Feature(
...    name="feature",
...    description="spark function transform with windows",
...    transformation=SparkFunctionTransform(
...       functions=[Function(functions.avg, DataType.DOUBLE)],)
...                    .with_window(partition_by="id",
...                                 mode="row_windows",
...                                 window_definition=["2 events"],
...   )
...)
>>> feature_row_windows.transform(df).orderBy("timestamp").show()
+--------+---+-------------------+--------------------------------------+
| feature| id|          timestamp| feature_avg_over_2_events_row_windows|
+--------+---+-------------------+--------------------------------------+
|     200|  1|2016-04-11 11:31:11|                                 200.0|
|     300|  1|2016-04-11 11:44:12|                                 250.0|
|     400|  1|2016-04-11 11:46:24|                                 350.0|
|     500|  1|2016-04-11 12:03:21|                                 450.0|
+--------+---+-------------------+--------------------------------------+

It’s important to notice that this transformation doesn’t affect the dataframe’s granularity.

property output_columns

Columns generated by the transformation.

transform(dataframe)

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

with_window(partition_by, order_by=None, mode=None, window_definition=None)

Create a list with the defined windows.
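
Besides the row-windows mode shown above, a time-based window can be sketched as below; the "fixed_windows" mode name and the "7 days" definition are assumptions about the supported values, not taken from this page:

>>> feature_time_windows = Feature(
...    name="feature",
...    description="spark function transform with time windows",
...    transformation=SparkFunctionTransform(
...       functions=[Function(functions.avg, DataType.DOUBLE)],)
...                    .with_window(partition_by="id",
...                                 order_by=TIMESTAMP_COLUMN,
...                                 mode="fixed_windows",
...                                 window_definition=["7 days"],
...   )
...)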

SQL Expression Transform entity.

class butterfree.transform.transformations.sql_expression_transform.SQLExpressionTransform(expression: str)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines an SQL Expression Transformation.

expression

SQL expression defined by the user.

Example

It’s necessary to declare a custom SQL expression that operates on existing columns of the dataframe; the usage of Spark SQL functions is also allowed. A complete select statement, such as “select col_a * col_b from my_table”, is not necessary: only the operation itself is required, for instance “col_a / col_b” or “col_a * col_b”.

>>> from butterfree.transform.features import Feature
>>> from butterfree.transform.transformations import SQLExpressionTransform
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> import pyspark.sql.functions as F
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200, 200),
...                             (1, "2016-04-11 11:44:12", 300, 300),
...                             (1, "2016-04-11 11:46:24", 400, 400),
...                             (1, "2016-04-11 12:03:21", 500, 500)]
...                           ).toDF("id", "timestamp", "feature1", "feature2")
>>> feature = Feature(
...    name="feature",
...    description="SQL expression transform usage example",
...    transformation=SQLExpressionTransform(expression="feature1/feature2"),
...)
>>> feature.transform(df).orderBy("timestamp").show()
+--------+--------+---+-------------------+----------------------+
|feature1|feature2| id|          timestamp|feature1_over_feature2|
+--------+--------+---+-------------------+----------------------+
|     200|     200|  1|2016-04-11 11:31:11|                   1.0|
|     300|     300|  1|2016-04-11 11:44:12|                   1.0|
|     400|     400|  1|2016-04-11 11:46:24|                   1.0|
|     500|     500|  1|2016-04-11 12:03:21|                   1.0|
+--------+--------+---+-------------------+----------------------+
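
Since Spark SQL functions are allowed in the expression, a hedged sketch using one of them could look like this (the feature name below is illustrative):

>>> squared_feature = Feature(
...    name="feature1_squared",
...    description="SQL expression transform with a Spark SQL function",
...    transformation=SQLExpressionTransform(expression="POW(feature1, 2)"),
...)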
property output_columns

Columns generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

Stack Transform entity.

class butterfree.transform.transformations.stack_transform.StackTransform(*columns_names: str, is_regex: bool = False)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Stack transformation.

Instantiation requires the names of the columns, or a pattern used to find the columns, that need to be stacked. This transform generates just one column as output.

columns_names

full names or patterns used to search for target columns on the dataframe. By default, a single * character is considered a wildcard and can appear anywhere in the string; multiple wildcards are not supported. A string can also start with an ! (exclamation mark) to indicate a negation, whether it is a plain string or a simple pattern. When the is_regex parameter is True, simple-pattern wildcards and negation are disabled and all strings are interpreted as regular expressions.

is_regex

boolean flag indicating whether the columns_names passed are Python regex string patterns.

Example

>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.transform.transformations import StackTransform
>>> from butterfree.transform.features import Feature
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [
...    {"feature": 100, "id_a": 1, "id_b": 2},
...    {"feature": 120, "id_a": 3, "id_b": 4},
... ]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()
[Row(feature=100, id_a=1, id_b=2), Row(feature=120, id_a=3, id_b=4)]
>>> feature = Feature(
...     name="stack_ids",
...     description="id_a and id_b stacked in a single column.",
...     transformation=StackTransform("id_a", "id_b"),
... )
>>> feature.transform(df).collect()
[
    Row(feature=100, id_a=1, id_b=2, stack_ids=1),
    Row(feature=100, id_a=1, id_b=2, stack_ids=2),
    Row(feature=120, id_a=3, id_b=4, stack_ids=3),
    Row(feature=120, id_a=3, id_b=4, stack_ids=4)
]

The StackTransform can also be instantiated with a column pattern instead of the columns’ full names, like this:

>>> feature = Feature(
...     name="stack_ids",
...     description="id_a and id_b stacked in a single column.",
...     transformation=StackTransform("id_*"),
... )
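
Regular-expression and negation patterns, described in the attributes above, can be sketched along the same lines:

>>> regex_feature = Feature(
...     name="stack_ids",
...     description="columns matched by a Python regex.",
...     transformation=StackTransform("id_.+", is_regex=True),
... )
>>> negated_feature = Feature(
...     name="stack_ids",
...     description="every column except the feature column.",
...     transformation=StackTransform("!feature"),
... )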
property output_columns

Columns generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

Transform Abstract Class.

class butterfree.transform.transformations.transform_component.TransformComponent

Bases: abc.ABC

Defines an abstract class for Transform entities.

parent

parent transform component.

abstract property output_columns

Columns generated by the transformation.

property parent

Parent transform component.

abstract transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.
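
A minimal sketch of a concrete subclass, writing its result into the parent feature’s column; this is an illustrative example only and not part of butterfree:

>>> import pyspark.sql.functions as F
>>> from butterfree.transform.transformations import TransformComponent
>>> class AbsoluteValueTransform(TransformComponent):
...     @property
...     def output_columns(self):
...         return [self.parent.name]
...     def transform(self, dataframe):
...         # assumes the parent Feature has already renamed its input column to parent.name
...         return dataframe.withColumn(self.parent.name, F.abs(F.col(self.parent.name)))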

Module contents

Holds all transformations to be used by Features.

A transformation must inherit from TransformComponent and handle data modification, renaming and type casting using its parent’s (a Feature’s) information.

class butterfree.transform.transformations.AggregatedTransform(functions: List[Function], filter_expression: str = None)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Specifies an aggregation.

This transformation needs to be used within an AggregatedFeatureSet. Unlike the other transformations, this class does not implement a transform method.

The idea behind aggregating is that, in Spark, we should execute all aggregation functions after a single groupby. An AggregatedFeatureSet will have many Features with AggregatedTransform, and if each one of them applied its own groupby.agg(), we would have to join all the results at the end, making the computation extremely slow.

Instead, the AggregatedFeatureSet collects all Features’ AggregatedTransform definitions and runs, at once, a single groupby.agg(*aggregations).

This class helps define, for a feature, which aggregation functions will be applied to build new aggregated columns. Allowed aggregations are registered under the allowed_aggregations property.

functions

list of Function namedtuples, each with an aggregation function and a data type.

filter_expression

SQL boolean expression to be used inside the agg function. The filter expression can be used to aggregate a column using only the records that satisfy a given condition. It has the same behaviour as the SQL expression: agg(case when filter_expression then col end).

Example

>>> from butterfree.transform.transformations import AggregatedTransform
>>> from butterfree.transform.features import Feature
>>> from butterfree.constants import DataType
>>> from butterfree.transform.utils import Function
>>> import pyspark.sql.functions as F
>>> feature = Feature(
...     name="feature",
...     description="aggregated transform",
...     transformation=AggregatedTransform(
...         functions=[
...                    Function(F.avg, DataType.DOUBLE),
...                    Function(F.stddev_pop, DataType.DOUBLE)],
...     ),
...     from_column="somenumber",
...)
>>> feature.get_output_columns()
['feature__avg', 'feature__stddev_pop']
>>> feature.transform(anydf)
NotImplementedError: ...
property aggregations

Aggregated spark columns.

property output_columns

Columns names generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

(NotImplemented) Performs a transformation to the feature pipeline.

For the AggregatedTransform, the transformation won’t be applied without using an AggregatedFeatureSet.

Parameters

dataframe – input dataframe.

Raises

NotImplementedError.

class butterfree.transform.transformations.CustomTransform(transformer: Callable, **kwargs)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Custom Transform.

transformer

function to use for transforming the dataframe

**kwargs

kwargs for the transformer

Example

The CustomTransform class must be instantiated with a custom function that always receives a dataframe and the parent feature as arguments; any custom arguments must be passed to the builder through **kwargs.

>>> from butterfree.transform.transformations import CustomTransform
>>> from butterfree.transform.features import Feature
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> import pyspark.sql.functions as F
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200, 200),
...                             (1, "2016-04-11 11:44:12", 300, 300),
...                             (1, "2016-04-11 11:46:24", 400, 400),
...                             (1, "2016-04-11 12:03:21", 500, 500)]
...                           ).toDF("id", "timestamp", "feature1", "feature2")
>>> def divide(df, parent_feature, column1, column2):
...     name = parent_feature.get_output_columns()[0]
...     df = df.withColumn(name, F.col(column1) / F.col(column2))
...     return df
>>> feature = Feature(
...    name="feature",
...    description="custom transform usage example",
...    transformation=CustomTransform(
...        transformer=divide, column1="feature1", column2="feature2",
...    )
...)
>>> feature.transform(df).orderBy("timestamp").show()
+--------+--------+---+-------------------+-------+
|feature1|feature2| id|          timestamp|feature|
+--------+--------+---+-------------------+-------+
|     200|     200|  1|2016-04-11 11:31:11|    1.0|
|     300|     300|  1|2016-04-11 11:44:12|    1.0|
|     400|     400|  1|2016-04-11 11:46:24|    1.0|
|     500|     500|  1|2016-04-11 12:03:21|    1.0|
+--------+--------+---+-------------------+-------+
property output_columns

Columns generated by transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe to be transformed.

Returns

Transformed dataframe.

property transformer

Function to use for transforming the dataframe.

class butterfree.transform.transformations.SQLExpressionTransform(expression: str)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines an SQL Expression Transformation.

expression

SQL expression defined by the user.

Example

It’s necessary to declare a custom SQL expression that operates on existing columns of the dataframe; the usage of Spark SQL functions is also allowed. A complete select statement, such as “select col_a * col_b from my_table”, is not necessary: only the operation itself is required, for instance “col_a / col_b” or “col_a * col_b”.

>>> from butterfree.transform.features import Feature
>>> from butterfree.transform.transformations import SQLExpressionTransform
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> import pyspark.sql.functions as F
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200, 200),
...                             (1, "2016-04-11 11:44:12", 300, 300),
...                             (1, "2016-04-11 11:46:24", 400, 400),
...                             (1, "2016-04-11 12:03:21", 500, 500)]
...                           ).toDF("id", "timestamp", "feature1", "feature2")
>>> feature = Feature(
...    name="feature",
...    description="SQL expression transform usage example",
...    transformation=SQLExpressionTransform(expression="feature1/feature2"),
...)
>>> feature.transform(df).orderBy("timestamp").show()
+--------+--------+---+-------------------+----------------------+
|feature1|feature2| id|          timestamp|feature1_over_feature2|
+--------+--------+---+-------------------+----------------------+
|     200|     200|  1|2016-04-11 11:31:11|                   1.0|
|     300|     300|  1|2016-04-11 11:44:12|                   1.0|
|     400|     400|  1|2016-04-11 11:46:24|                   1.0|
|     500|     500|  1|2016-04-11 12:03:21|                   1.0|
+--------+--------+---+-------------------+----------------------+
property output_columns

Columns generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

class butterfree.transform.transformations.SparkFunctionTransform(functions: List[Function])

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Spark Function transformation.

function

namedtuple with spark function and data type.

Example

It’s necessary to declare the functions to be used. Any Spark or user-defined function is supported.

>>> from butterfree.transform.transformations import SparkFunctionTransform
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.transform.features import Feature
>>> from butterfree.transform.utils import Function
>>> from butterfree.constants import DataType
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from pyspark.sql.types import TimestampType
>>> from pyspark.sql import functions
>>> sc = SparkContext.getOrCreate()
>>> spark = session.SparkSession(sc)
>>> df = spark.createDataFrame([(1, "2016-04-11 11:31:11", 200),
...                             (1, "2016-04-11 11:44:12", 300),
...                             (1, "2016-04-11 11:46:24", 400),
...                             (1, "2016-04-11 12:03:21", 500)]
...                           ).toDF("id", "timestamp", "feature")
>>> df = df.withColumn("timestamp", df.timestamp.cast(TimestampType()))
>>> feature = Feature(
...    name="feature",
...    description="spark function transform",
...    transformation=SparkFunctionTransform(
...       functions=[Function(functions.cos, DataType.DOUBLE)],)
...)
>>> feature.transform(df).orderBy("timestamp").show()
+---+-------------------+-------+--------------------+
| id|          timestamp|feature|        feature__cos|
+---+-------------------+-------+--------------------+
|  1|2016-04-11 11:31:11|    200|  0.4871876750070059|
|  1|2016-04-11 11:44:12|    300|-0.02209661927868...|
|  1|2016-04-11 11:46:24|    400|  -0.525296338642536|
|  1|2016-04-11 12:03:21|    500|  -0.883849273431478|
+---+-------------------+-------+--------------------+

We can use this transformation with windows.

>>> feature_row_windows = Feature(
...    name="feature",
...    description="spark function transform with windows",
...    transformation=SparkFunctionTransform(
...       functions=[Function(functions.avg, DataType.DOUBLE)],)
...                    .with_window(partition_by="id",
...                                 mode="row_windows",
...                                 window_definition=["2 events"],
...   )
...)
>>> feature_row_windows.transform(df).orderBy("timestamp").show()
+--------+---+-------------------+--------------------------------------+
| feature| id|          timestamp| feature_avg_over_2_events_row_windows|
+--------+---+-------------------+--------------------------------------+
|     200|  1|2016-04-11 11:31:11|                                 200.0|
|     300|  1|2016-04-11 11:44:12|                                 250.0|
|     400|  1|2016-04-11 11:46:24|                                 350.0|
|     500|  1|2016-04-11 12:03:21|                                 450.0|
+--------+---+-------------------+--------------------------------------+

It’s important to notice that this transformation doesn’t affect the dataframe’s granularity.

property output_columns

Columns generated by the transformation.

transform(dataframe)

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

with_window(partition_by, order_by=None, mode=None, window_definition=None)

Create a list with the defined windows.

class butterfree.transform.transformations.StackTransform(*columns_names: str, is_regex: bool = False)

Bases: butterfree.transform.transformations.transform_component.TransformComponent

Defines a Stack transformation.

Instantiation requires the names of the columns, or a pattern used to find the columns, that need to be stacked. This transform generates just one column as output.

columns_names

full names or patterns used to search for target columns on the dataframe. By default, a single * character is considered a wildcard and can appear anywhere in the string; multiple wildcards are not supported. A string can also start with an ! (exclamation mark) to indicate a negation, whether it is a plain string or a simple pattern. When the is_regex parameter is True, simple-pattern wildcards and negation are disabled and all strings are interpreted as regular expressions.

is_regex

boolean flag indicating whether the columns_names passed are Python regex string patterns.

Example

>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.transform.transformations import StackTransform
>>> from butterfree.transform.features import Feature
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [
...    {"feature": 100, "id_a": 1, "id_b": 2},
...    {"feature": 120, "id_a": 3, "id_b": 4},
... ]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()
[Row(feature=100, id_a=1, id_b=2), Row(feature=120, id_a=3, id_b=4)]
>>> feature = Feature(
...     name="stack_ids",
...     description="id_a and id_b stacked in a single column.",
...     transformation=StackTransform("id_a", "id_b"),
... )
>>> feature.transform(df).collect()
[
    Row(feature=100, id_a=1, id_b=2, stack_ids=1),
    Row(feature=100, id_a=1, id_b=2, stack_ids=2),
    Row(feature=120, id_a=3, id_b=4, stack_ids=3),
    Row(feature=120, id_a=3, id_b=4, stack_ids=4)
]

The StackTransform can also be instantiated with a column pattern instead of the columns’ full names, like this:

>>> feature = Feature(
...     name="stack_ids",
...     description="id_a and id_b stacked in a single column.",
...     transformation=StackTransform("id_*"),
... )
property output_columns

Columns generated by the transformation.

transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.

class butterfree.transform.transformations.TransformComponent

Bases: abc.ABC

Defines an abstract class for Transform entities.

parent

parent transform component.

abstract property output_columns

Columns generated by the transformation.

property parent

Parent transform component.

abstract transform(dataframe: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Performs a transformation to the feature pipeline.

Parameters

dataframe – input dataframe.

Returns

Transformed dataframe.