butterfree.extract package

Subpackages

Submodules

Holds the Source class.

class butterfree.extract.source.Source(readers: List[Reader], query: str)

Bases: HookableComponent

The definition of the entry point data for the ETL pipeline.

A FeatureSet (the next step in the pipeline) expects a single dataframe as input. This dataframe is composed from the data of one or more readers defined in the Source. There is only one Source per pipeline.

TODO: refactor query into multiple query components.
TODO: make it harder to do query injection.

readers

List of readers from which the source gets its data.

query

Spark SQL query to run against the readers.

Example

Simple example regarding Source class instantiation.

>>> from butterfree.extract import Source
>>> from butterfree.extract.readers import TableReader, FileReader
>>> from butterfree.clients import SparkClient
>>> spark_client = SparkClient()
>>> source = Source(
...    readers=[
...        TableReader(
...            id="table_reader_id",
...            database="table_reader_db",
...            table="table_reader_table",
...        ),
...        FileReader(id="file_reader_id", path="data_sample_path", format="json"),
...    ],
...    query="select a.*, b.feature2 "
...    "from table_reader_id a "
...    "inner join file_reader_id b on a.id = b.id ",
...)
>>> df = source.construct(spark_client)

By default, construct uses the Spark client to create a temporary view for each reader, then runs the query against those views and returns a dataframe.
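The view-then-query pattern described above can be tried without a Spark session. The sketch below is only an analogy: it uses Python's built-in sqlite3 module as a stand-in for Spark SQL, and the sample rows and the feature1 column are hypothetical values made up for illustration. It is not how butterfree is implemented.

```python
import sqlite3

# Hypothetical sample rows standing in for what each reader would load.
reader_columns = {
    "table_reader_id": ("id", "feature1"),
    "file_reader_id": ("id", "feature2"),
}
reader_rows = {
    "table_reader_id": [(1, 10.0), (2, 20.0)],
    "file_reader_id": [(1, "x"), (3, "z")],
}

conn = sqlite3.connect(":memory:")

# Step 1: expose each reader's data under a name equal to the reader id,
# mirroring the idea of one temporary view per reader.
for reader_id, rows in reader_rows.items():
    columns = ", ".join(reader_columns[reader_id])
    conn.execute(f"CREATE TEMP TABLE {reader_id} ({columns})")
    conn.executemany(f"INSERT INTO {reader_id} VALUES (?, ?)", rows)

# Step 2: run the user query, which refers to the readers by their ids.
query = (
    "select a.*, b.feature2 "
    "from table_reader_id a "
    "inner join file_reader_id b on a.id = b.id"
)
result = conn.execute(query).fetchall()
print(result)
```

Because the query refers to each reader by its id, the ids must be valid SQL identifiers and unique within a Source.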

construct(client: SparkClient, start_date: Optional[str] = None, end_date: Optional[str] = None) → DataFrame

Construct an entry point dataframe for a feature set.

This method assembles multiple readers by building each one and querying them with Spark SQL. To filter the dataframe by date boundaries, you must define an IncrementalStrategy; otherwise the data will not be filtered. Both the start_date and end_date parameters are optional.

The resulting dataframe is then cached. Because cache() in Spark is lazy, an action is triggered to force persistence.
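Why an action is needed after a lazy cache() can be seen in a small pure-Python toy model. The LazyFrame class below is hypothetical and is not a Spark or butterfree API; it only mimics the two behaviours that matter here: cache() marks the data for caching without computing anything, and an action such as count() forces the computation and fills the cache.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated dataframe (not a Spark API)."""

    def __init__(self, compute):
        self._compute = compute        # zero-argument function producing the rows
        self._cache_requested = False
        self._cached_rows = None
        self.compute_calls = 0         # how many times the rows were computed

    def cache(self):
        # Only marks the frame for caching; nothing is computed yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.compute_calls += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached_rows = rows
        return rows

    def count(self):
        # An action: forces evaluation, which also fills the cache if requested.
        return len(self._rows())


frame = LazyFrame(lambda: [1, 2, 3]).cache()
calls_after_cache = frame.compute_calls    # cache() alone computed nothing

n = frame.count()                          # first action computes and caches
frame.count()                              # second action reuses the cache
calls_after_actions = frame.compute_calls
```

In this model, skipping the first count() would leave the cache empty until some later action happened to run, which is the situation the forced action avoids.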

Parameters:
  • client – client responsible for connecting to the Spark session.

  • start_date – user-defined start date for filtering.

  • end_date – user-defined end date for filtering.

Returns:

DataFrame with the query result against all readers.

Module contents

The Source Component of a Feature Set.

class butterfree.extract.Source(readers: List[Reader], query: str)

Bases: HookableComponent

The definition of the entry point data for the ETL pipeline.

A FeatureSet (the next step in the pipeline) expects a single dataframe as input. This dataframe is composed from the data of one or more readers defined in the Source. There is only one Source per pipeline.

TODO: refactor query into multiple query components.
TODO: make it harder to do query injection.

readers

List of readers from which the source gets its data.

query

Spark SQL query to run against the readers.

Example

Simple example regarding Source class instantiation.

>>> from butterfree.extract import Source
>>> from butterfree.extract.readers import TableReader, FileReader
>>> from butterfree.clients import SparkClient
>>> spark_client = SparkClient()
>>> source = Source(
...    readers=[
...        TableReader(
...            id="table_reader_id",
...            database="table_reader_db",
...            table="table_reader_table",
...        ),
...        FileReader(id="file_reader_id", path="data_sample_path", format="json"),
...    ],
...    query="select a.*, b.feature2 "
...    "from table_reader_id a "
...    "inner join file_reader_id b on a.id = b.id ",
...)
>>> df = source.construct(spark_client)

By default, construct uses the Spark client to create a temporary view for each reader, then runs the query against those views and returns a dataframe.

construct(client: SparkClient, start_date: Optional[str] = None, end_date: Optional[str] = None) → DataFrame

Construct an entry point dataframe for a feature set.

This method assembles multiple readers by building each one and querying them with Spark SQL. To filter the dataframe by date boundaries, you must define an IncrementalStrategy; otherwise the data will not be filtered. Both the start_date and end_date parameters are optional.

The resulting dataframe is then cached. Because cache() in Spark is lazy, an action is triggered to force persistence.

Parameters:
  • client – client responsible for connecting to the Spark session.

  • start_date – user-defined start date for filtering.

  • end_date – user-defined end date for filtering.

Returns:

DataFrame with the query result against all readers.