butterfree.extract.readers package


FileReader entity.

class butterfree.extract.readers.file_reader.FileReader(id: str, path: str, format: str, schema: Optional[StructType] = None, format_options: Optional[Dict[Any, Any]] = None, stream: bool = False)

Bases: Reader

Responsible for getting data from files.


  • id – unique string id for registering the reader as a view on the metastore.

  • path – file location.

  • format – can be one of the keys: json, parquet, orc, or csv.

  • schema – an optional pyspark.sql.types.StructType for the input schema.

  • format_options – additional options required by some formats. Check docs: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options

  • stream – flag to indicate the reading mode: stream or batch.
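The format and format_options arguments can be sanity-checked before Spark is involved. The sketch below is illustrative only and is not Butterfree's code: validate_file_reader_args is a hypothetical helper, and converting booleans such as header=True into the strings "true"/"false" is an assumption modeled on how Spark data source options are usually written.

```python
from typing import Any, Dict, Optional

# Formats listed in the FileReader documentation.
SUPPORTED_FORMATS = {"json", "parquet", "orc", "csv"}


def validate_file_reader_args(
    file_format: str, format_options: Optional[Dict[Any, Any]] = None
) -> Dict[str, str]:
    """Hypothetical helper: check the format and stringify the options."""
    if file_format not in SUPPORTED_FORMATS:
        raise ValueError(
            f"format must be one of {sorted(SUPPORTED_FORMATS)}, got {file_format!r}"
        )
    normalized: Dict[str, str] = {}
    for key, value in (format_options or {}).items():
        if isinstance(value, bool):
            # Booleans become lowercase strings, e.g. True -> "true".
            normalized[str(key)] = "true" if value else "false"
        else:
            normalized[str(key)] = str(value)
    return normalized
```

For example, validate_file_reader_args("csv", {"header": True}) returns {"header": "true"}, while an unsupported format such as "xml" raises ValueError.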


A simple example of FileReader instantiation:

>>> from butterfree.extract.readers import FileReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="json"
...               )
>>> df = file_reader.consume(spark_client)

We can also define a schema and format_options, such as header, and pass them to FileReader:

>>> from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
>>> spark_client = SparkClient()
>>> schema_csv = StructType([
...            StructField("column_a", LongType()),
...            StructField("column_b", DoubleType()),
...            StructField("column_c", StringType())
...          ])
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="csv",
...                 schema=schema_csv,
...                 format_options={
...                    "header": True
...                 }
...               )
>>> df = file_reader.consume(spark_client)

The consume method uses the Spark client to read the desired file, loading the data into a DataFrame according to the FileReader arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> file_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, build does not return a DataFrame; instead, a temporary view containing the transformed data is created.

consume(client: SparkClient) → DataFrame

Extract data from files stored in the defined path.

If in stream mode and no schema is manually defined, try to auto-infer the schema.


client – client responsible for connecting to Spark session.


DataFrame with the data from all the files.
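The schema rule of consume can be read as a small decision table. In this hypothetical sketch, choose_schema_source and its return labels are invented for illustration, and the idea that a streaming schema is inferred from a one-off batch read of the same path is an assumption, not something stated here. The underlying constraint is Spark's: streaming file sources require a schema up front unless schema inference is explicitly enabled.

```python
from typing import Optional


def choose_schema_source(stream: bool, schema: Optional[object]) -> str:
    """Hypothetical helper describing where a FileReader schema comes from."""
    if schema is not None:
        # A manually defined schema is used as-is, in batch or stream mode.
        return "provided"
    if stream:
        # Streaming file sources need a schema before the query starts, so it
        # has to be inferred first (assumed here: via a batch read of the path).
        return "inferred-before-streaming"
    # A plain batch read lets Spark determine the schema while loading.
    return "determined-by-spark"
```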

KafkaReader entity.

class butterfree.extract.readers.kafka_reader.KafkaReader(id: str, topic: str, value_schema: StructType, connection_string: Optional[str] = None, topic_options: Optional[Dict[Any, Any]] = None, stream: bool = True)

Bases: Reader

Responsible for getting data from a Kafka topic.


  • id – unique string id for registering the reader as a view on the metastore.

  • value_schema – expected schema of the default column named “value” from Kafka.

  • topic – string with the Kafka topic name to subscribe to.

  • connection_string – string with hosts and ports to connect to. The string needs to be in the format: host1:port,host2:port,…,hostN:portN. This argument is not necessary if the connection string is provided through an environment variable named KAFKA_CONSUMER_CONNECTION_STRING.

  • topic_options – additional options for consuming from the topic. See docs: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

  • stream – flag to indicate the reading mode: stream or batch.
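The connection_string fallback can be sketched in plain Python. This is a minimal sketch, assuming the explicit argument takes precedence over KAFKA_CONSUMER_CONNECTION_STRING (the precedence is not stated above); resolve_connection_string is a hypothetical name, and its check expects numeric ports, so a placeholder such as host1:port is rejected.

```python
import os
import re
from typing import Optional

# Environment variable named in the KafkaReader documentation.
ENV_VAR = "KAFKA_CONSUMER_CONNECTION_STRING"

# One or more host:port pairs separated by commas, e.g. "h1:9092,h2:9092".
_CONNECTION_RE = re.compile(r"^[^,:\s]+:\d+(,[^,:\s]+:\d+)*$")


def resolve_connection_string(connection_string: Optional[str] = None) -> str:
    """Hypothetical helper: prefer the argument, fall back to the env var."""
    value = connection_string or os.environ.get(ENV_VAR)
    if not value:
        raise ValueError(f"pass connection_string or set {ENV_VAR}")
    if not _CONNECTION_RE.match(value):
        raise ValueError(f"expected host1:port,...,hostN:portN, got {value!r}")
    return value
```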

The default DataFrame schema produced by Spark's Kafka reader is the following:

key:string
value:string
topic:string
partition:integer
offset:long
timestamp:timestamp
timestampType:integer

By using this reader and passing the desired schema as value_schema, we get the following result.

With value_schema declared as:

>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )

The output DataFrame schema would be:

ts:long
id:long
type:string
kafka_metadata:struct
    key:string
    topic:string
    value:string
    partition:integer
    offset:long
    timestamp:timestamp
    timestampType:integer
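To make the nesting concrete, here is a plain-Python sketch of what happens to a single row. It assumes the Kafka value holds a JSON document matching value_schema; flatten_kafka_record is hypothetical, and a Spark implementation would use column expressions rather than per-row Python. Fields missing from the JSON come out as None, mirroring nullable=True.

```python
import json
from typing import Any, Dict, List

# Column list taken from KafkaReader.KAFKA_COLUMNS.
KAFKA_COLUMNS = [
    "key", "topic", "value", "partition", "offset", "timestamp", "timestampType",
]


def flatten_kafka_record(record: Dict[str, Any], value_fields: List[str]) -> Dict[str, Any]:
    """Hypothetical helper reproducing the documented output shape for one row."""
    # Assumption: "value" is a JSON document whose keys match value_schema.
    parsed = json.loads(record["value"])
    row = {field: parsed.get(field) for field in value_fields}
    # Every raw Kafka column is kept, nested under kafka_metadata.
    row["kafka_metadata"] = {col: record[col] for col in KAFKA_COLUMNS}
    return row
```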

Instantiation example:

>>> from butterfree.extract.readers import KafkaReader
>>> from butterfree.clients import SparkClient
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> spark_client = SparkClient()
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
>>> kafka_reader = KafkaReader(
...                 id="kafka_reader_id",
...                 topic="topic",
...                 value_schema=value_schema,
...                 connection_string="host1:port,host2:port",
...                )
>>> df = kafka_reader.consume(spark_client)

The consume method uses the Spark client to read the desired topic, loading the data into a DataFrame according to the KafkaReader arguments.

If build is called instead of consume, a temporary view containing the transformed data is created.

KAFKA_COLUMNS = ['key', 'topic', 'value', 'partition', 'offset', 'timestamp', 'timestampType']
consume(client: SparkClient) → DataFrame

Extract data from a Kafka topic.

In stream mode, it gets all new data arriving at the topic as a streaming DataFrame. In batch mode, it gets all data currently available in the topic.


client – client responsible for connecting to Spark session.


DataFrame with data from the topic.
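The difference between stream and batch mode follows from the defaults of Spark's Kafka source: streaming queries start from the latest offsets, while batch queries read from the earliest to the latest offsets. The hypothetical helper below spells those defaults out next to the standard kafka.bootstrap.servers and subscribe options; whether Butterfree sets them explicitly is an assumption, as is topic_options overriding them.

```python
from typing import Any, Dict, Optional


def kafka_source_options(
    connection_string: str,
    topic: str,
    stream: bool,
    topic_options: Optional[Dict[Any, Any]] = None,
) -> Dict[Any, Any]:
    """Hypothetical helper: options a Spark Kafka read could receive."""
    options: Dict[Any, Any] = {
        "kafka.bootstrap.servers": connection_string,
        "subscribe": topic,
    }
    if stream:
        # Spark's streaming default: only records arriving after the start.
        options["startingOffsets"] = "latest"
    else:
        # Spark's batch defaults: everything currently stored in the topic.
        options["startingOffsets"] = "earliest"
        options["endingOffsets"] = "latest"
    # User-supplied topic_options override the defaults above.
    options.update(topic_options or {})
    return options
```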

Reader entity.

class butterfree.extract.readers.reader.Reader(id: str, incremental_strategy: Optional[IncrementalStrategy] = None)

Bases: ABC, HookableComponent

Abstract base class for Readers.


  • id – unique string id for registering the reader as a view on the metastore.

  • transformations – list of methods that will be applied over the dataframe after the raw data is extracted.

build(client: SparkClient, columns: Optional[List[Any]] = None, start_date: Optional[str] = None, end_date: Optional[str] = None) → None

Register the data obtained by the reader in the Spark metastore.

Create a temporary view in Spark metastore referencing the data extracted from the target origin after the application of all the defined pre-processing transformations.

The arguments start_date and end_date are used only when an IncrementalStrategy is defined on the Reader.

  • client – client responsible for connecting to Spark session.

  • columns – list of tuples for selecting/renaming columns on the df.

  • start_date – lower bound to use in the filter expression.

  • end_date – upper bound to use in the filter expression.
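The columns and date arguments of build can be illustrated with string-building helpers. This is a sketch under stated assumptions: each tuple in columns is taken to be (source_name, alias), the date bounds are treated as inclusive, and select_expressions and date_filter_expression are hypothetical names; the expression an IncrementalStrategy actually produces may differ. Values are interpolated directly, so this is not safe for untrusted input.

```python
from typing import List, Optional, Tuple


def select_expressions(columns: List[Tuple[str, str]]) -> List[str]:
    """Hypothetical helper: turn (source, alias) tuples into SQL select items."""
    return [f"{source} as {alias}" for source, alias in columns]


def date_filter_expression(
    column: str, start_date: Optional[str] = None, end_date: Optional[str] = None
) -> Optional[str]:
    """Hypothetical helper: build an inclusive filter from optional bounds."""
    conditions = []
    if start_date is not None:
        conditions.append(f"date({column}) >= date('{start_date}')")
    if end_date is not None:
        conditions.append(f"date({column}) <= date('{end_date}')")
    # Without any bound, no incremental filter is applied.
    return " and ".join(conditions) if conditions else None
```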

abstract consume(client: SparkClient) → DataFrame

Extract data from target origin.


client – client responsible for connecting to Spark session.


Dataframe with all the data.


Spark dataframe

with_(transformer: Callable[[...], DataFrame], *args: Any, **kwargs: Any) → Any

Define a new transformation for the Reader.

All the transformations are applied when the consume method is called.

  • transformer – method that receives a dataframe and outputs a dataframe.

  • *args – args for the transformer.

  • **kwargs – kwargs for the transformer.


Reader object with new transformation
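The with_ pattern stores each transformer together with its arguments and returns the reader so calls can be chained. The toy sketch below uses lists of dicts instead of Spark DataFrames and a plain dict instead of the metastore; ToyReader and filter_rows are hypothetical. It applies the stored transformations in build, where the temporary view is produced, which is one reasonable reading of the description of build.

```python
from typing import Any, Callable, Dict, List

Rows = List[Dict[str, Any]]  # toy stand-in for a Spark DataFrame


class ToyReader:
    """Hypothetical reader illustrating with_ chaining and view registration."""

    def __init__(self, reader_id: str, rows: Rows) -> None:
        self.id = reader_id
        self._rows = rows
        self.transformations: List[Dict[str, Any]] = []

    def with_(self, transformer: Callable[..., Rows], *args: Any, **kwargs: Any) -> "ToyReader":
        # Only record the call; nothing runs until build.
        self.transformations.append(
            {"transformer": transformer, "args": args, "kwargs": kwargs}
        )
        return self  # returning self allows chaining

    def consume(self) -> Rows:
        return list(self._rows)

    def build(self, metastore: Dict[str, Rows]) -> None:
        df = self.consume()
        for t in self.transformations:
            df = t["transformer"](df, *t["args"], **t["kwargs"])
        # Stand-in for a temporary view named after the reader id.
        metastore[self.id] = df


def filter_rows(df: Rows, column: str, value: Any) -> Rows:
    """Hypothetical transformer keeping rows where column == value."""
    return [row for row in df if row[column] == value]
```

Calling reader.with_(filter_rows, column="year", value=2019).build(metastore) leaves only the 2019 rows under metastore[reader.id].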

with_incremental_strategy(incremental_strategy: IncrementalStrategy) → Reader

Define the incremental strategy for the Reader.


incremental_strategy – definition of the incremental strategy.


Reader with defined incremental strategy.

TableReader entity.

class butterfree.extract.readers.table_reader.TableReader(id: str, table: str, database: Optional[str] = None)

Bases: Reader

Responsible for getting data from tables registered in the metastore.


  • id – unique string id for registering the reader as a view on the metastore.

  • database – name of the metastore database/schema.

  • table – name of the table.


A simple example of TableReader instantiation:

>>> from butterfree.extract.readers import TableReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> table_reader = TableReader(
...                     id="table_reader_id",
...                     database="table_reader_db",
...                     table="table_reader_table"
...                )
>>> df = table_reader.consume(spark_client)

The consume method uses the Spark client to read the desired table, loading the data into a DataFrame according to the TableReader arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> table_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, build does not return a DataFrame; instead, a temporary view containing the transformed data is created.

consume(client: SparkClient) → DataFrame

Extract data from a table in Spark metastore.


client – client responsible for connecting to Spark session.


Dataframe with all the data from the table.
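Reading a metastore table amounts to selecting everything from its qualified name. The sketch below builds that SQL string; table_query and quote_identifier are hypothetical helpers, and Butterfree may read the table through a different Spark call. Backticks are Spark SQL's identifier quotes, and an embedded backtick is escaped by doubling it.

```python
from typing import Optional


def quote_identifier(name: str) -> str:
    """Hypothetical helper: quote a Spark SQL identifier with backticks."""
    return "`" + name.replace("`", "``") + "`"


def table_query(table: str, database: Optional[str] = None) -> str:
    """Hypothetical helper: SQL equivalent of reading a whole metastore table."""
    # Without a database, Spark resolves the table in the current database.
    parts = [database, table] if database else [table]
    return "SELECT * FROM " + ".".join(quote_identifier(p) for p in parts)
```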

Module contents

The Reader Component of a Source.

