butterfree.pipelines package

Submodules

butterfree.pipelines.feature_set_pipeline module

FeatureSetPipeline entity.

class butterfree.pipelines.feature_set_pipeline.FeatureSetPipeline(source: Source, feature_set: FeatureSet, sink: Sink, spark_client: Optional[SparkClient] = None)

Bases: object

Defines an ETL pipeline for the construction of a feature set.

source

source of the data, the entry point of the pipeline.

feature_set

feature set composed of features and context metadata.

sink

sink used to write the output dataframe to the desired locations.

spark_client

client used to access the Spark connection.

Example

This is an example of a feature set pipeline definition. All sources, the feature set (and its features), and writers are defined.

>>> import os
>>> from butterfree.pipelines import FeatureSetPipeline
>>> from butterfree.constants.columns import TIMESTAMP_COLUMN
>>> from butterfree.constants import DataType
>>> from butterfree.configs.db import MetastoreConfig
>>> from butterfree.extract import Source
>>> from butterfree.extract.readers import TableReader
>>> from butterfree.transform import FeatureSet
>>> from butterfree.transform.utils import Function
>>> from butterfree.transform.features import (
...     Feature,
...     KeyFeature,
...     TimestampFeature,
... )
>>> from butterfree.transform.transformations import (
...     SparkFunctionTransform,
...     CustomTransform,
... )
>>> from butterfree.load import Sink
>>> from butterfree.load.writers import HistoricalFeatureStoreWriter
>>> from pyspark.sql import functions
>>> def divide(df, fs, column1, column2):
...     name = fs.get_output_columns()[0]
...     df = df.withColumn(name,
...            functions.col(column1) / functions.col(column2))
...     return df
>>> pipeline = FeatureSetPipeline(
...    source=Source(
...        readers=[
...            TableReader(
...                id="table_reader_id",
...                database="table_reader_db",
...                table="table_reader_table",
...            ),
...        ],
...        query=f"select * from table_reader_id ",
...    ),
...    feature_set=FeatureSet(
...        name="feature_set",
...        entity="entity",
...        description="description",
...        features=[
...            Feature(
...                name="feature1",
...                description="test",
...                transformation=SparkFunctionTransform(
...                    functions=[
...                        Function(functions.avg, DataType.DOUBLE),
...                        Function(functions.stddev_pop, DataType.DOUBLE),
...                    ],
...                ).with_window(
...                    partition_by="id",
...                    order_by=TIMESTAMP_COLUMN,
...                    mode="fixed_windows",
...                    window_definition=["2 minutes", "15 minutes"],
...                ),
...            ),
...            Feature(
...                name="divided_feature",
...                description="unit test",
...                transformation=CustomTransform(
...                    transformer=divide,
...                    column1="feature1",
...                    column2="feature2",
...                ),
...            ),
...        ],
...        keys=[
...            KeyFeature(
...                name="id",
...                description="The user's Main ID or device ID"
...            )
...        ],
...        timestamp=TimestampFeature(),
...    ),
...    sink=Sink(
...        writers=[
...            HistoricalFeatureStoreWriter(
...                db_config=MetastoreConfig(
...                    format_="parquet",
...                    path=os.path.join(
...                        os.path.dirname(os.path.abspath(__file__))
...                    ),
...                ),
...            )
...        ],
...    ),
... )
>>> pipeline.run()

This last method (run) executes the pipeline flow: it reads from the defined sources, computes all the transformations, and saves the data to the specified locations.

We can run the pipeline over a range of dates by passing an end_date and a start_date, in which case only data within that date range is processed.

>>> pipeline.run(end_date="2020-08-04", start_date="2020-07-04")

Or run it up to a given date, processing only data up to that date.

>>> pipeline.run(end_date="2020-08-04")

Or run it for a single date, processing only data for that day.

>>> pipeline.run_for_date(execution_date="2020-08-04")

property feature_set: FeatureSet

Feature set composed of features and context metadata.

run(end_date: Optional[str] = None, partition_by: Optional[List[str]] = None, order_by: Optional[List[str]] = None, num_processors: Optional[int] = None, start_date: Optional[str] = None) None

Runs the defined feature set pipeline.

The pipeline consists of the following steps:

  • Construct the input dataframe from the data source.

  • Construct the feature set dataframe using the defined Features.

  • Load the data to the configured sink locations.

Note, however, that the partition_by and num_processors parameters are still a work in progress; we intend to enhance their functionality soon. Use them only if strictly necessary.
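
As an illustration of the signature above, the call below runs the pipeline over a date range and passes the optional tuning parameters explicitly. The column names are placeholders taken from the example above, and, as noted, partition_by and num_processors are still experimental.

>>> # Hedged example: "id" and TIMESTAMP_COLUMN are placeholder columns;
>>> # partition_by, order_by and num_processors are WIP and may change.
>>> pipeline.run(
...     start_date="2020-07-04",
...     end_date="2020-08-04",
...     partition_by=["id"],
...     order_by=[TIMESTAMP_COLUMN],
...     num_processors=4,
... )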

run_for_date(execution_date: Optional[str] = None, partition_by: Optional[List[str]] = None, order_by: Optional[List[str]] = None, num_processors: Optional[int] = None) None

Runs the defined feature set pipeline for a specific date.

The pipeline consists of the following steps:

  • Construct the input dataframe from the data source.

  • Construct the feature set dataframe using the defined Features.

  • Load the data to the configured sink locations.

Note, however, that the partition_by and num_processors parameters are still a work in progress; we intend to enhance their functionality soon. Use them only if strictly necessary.
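
As an illustrative pattern only (not a built-in scheduling feature), a daily backfill can be sketched by looping run_for_date over a range of dates using the standard library:

>>> from datetime import date, timedelta
>>> # Illustrative backfill: run the pipeline once per day for July 2020.
>>> day = date(2020, 7, 1)
>>> while day <= date(2020, 7, 31):
...     pipeline.run_for_date(execution_date=day.isoformat())
...     day += timedelta(days=1)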

property sink: Sink

Sink used to write the output dataframe to the desired locations.

property source: Source

Source of the data, the entry point of the pipeline.

property spark_client: SparkClient

Client used to access the Spark connection.
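
If you need to reuse an existing Spark session or configuration, a pre-built client can be passed to the constructor instead of relying on the default. A minimal sketch, assuming SparkClient is importable from butterfree.clients and can be instantiated with no arguments, and that source, feature_set, and sink objects are defined as in the example above:

>>> from butterfree.clients import SparkClient
>>> # Assumption: SparkClient() lazily creates/wraps a SparkSession.
>>> my_client = SparkClient()
>>> pipeline = FeatureSetPipeline(
...     source=source,            # a previously defined Source
...     feature_set=feature_set,  # a previously defined FeatureSet
...     sink=sink,                # a previously defined Sink
...     spark_client=my_client,
... )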

Module contents

ETL Pipelines.

class butterfree.pipelines.FeatureSetPipeline(source: Source, feature_set: FeatureSet, sink: Sink, spark_client: Optional[SparkClient] = None)

Bases: object

Defines an ETL pipeline for the construction of a feature set. This is the same class documented above under butterfree.pipelines.feature_set_pipeline, re-exported at the package level; see that entry for attributes, methods, and the full usage example.