butterfree.extract.readers package

Submodules

FileReader entity.

class butterfree.extract.readers.file_reader.FileReader(id: str, path: str, format: str, schema: Optional[StructType] = None, format_options: Optional[Dict[Any, Any]] = None, stream: bool = False)

Bases: Reader

Responsible for getting data from files.

id

unique string id used to register the reader as a view in the metastore.

path

file location.

format

can be one of the supported formats: json, parquet, orc, or csv.

schema

an optional pyspark.sql.types.StructType for the input schema.

format_options

additional options required by some formats. Check the docs: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options

Example

Simple example regarding FileReader class instantiation.

>>> from butterfree.extract.readers import FileReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="json"
...               )
>>> df = file_reader.consume(spark_client)

We can also define the schema and format_options, such as header, and provide them to FileReader.

>>> from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
>>> spark_client = SparkClient()
>>> schema_csv = StructType([
...            StructField("column_a", LongType()),
...            StructField("column_b", DoubleType()),
...            StructField("column_c", StringType())
...          ])
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="csv",
...                 schema=schema_csv,
...                 format_options={
...                    "header": True
...                 }
...               )
>>> df = file_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired file, loading the data into a dataframe according to the FileReader class arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> file_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, however, a temp view will be created, containing the transformed data.

consume(client: SparkClient) DataFrame

Extract data from files stored in the defined path.

Tries to auto-infer the schema when in stream mode and no schema is manually defined.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with the data from all the files.
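
For reference, a minimal sketch of stream mode (the path below is just a placeholder): with no schema provided, consume tries to infer one as described above and returns a streaming dataframe.

>>> stream_reader = FileReader(
...                 id="stream_file_reader_id",
...                 path="stream_data_path",
...                 format="json",
...                 stream=True
...               )
>>> streaming_df = stream_reader.consume(spark_client)
>>> streaming_df.isStreaming  # True for a streaming dataframe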

KafkaReader entity.

class butterfree.extract.readers.kafka_reader.KafkaReader(id: str, topic: str, value_schema: StructType, connection_string: Optional[str] = None, topic_options: Optional[Dict[Any, Any]] = None, stream: bool = True)

Bases: Reader

Responsible for getting data from a Kafka topic.

id

unique string id used to register the reader as a view in the metastore.

value_schema

expected schema of the default column named “value” from Kafka.

topic

string with the Kafka topic name to subscribe to.

connection_string

string with the hosts and ports to connect to. The string needs to be in the format host1:port,host2:port,…,hostN:portN. The argument is not necessary if it is passed as an environment variable named KAFKA_CONSUMER_CONNECTION_STRING.

topic_options

additional options for consuming from the topic. See the docs: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

stream

flag indicating the reading mode: stream or batch.

The default dataframe schema coming from Spark's Kafka reader is the following:

key:string value:string topic:string partition:integer offset:long timestamp:timestamp timestampType:integer

Using this reader and passing the desired schema as value_schema, we get the following result instead.

With value_schema declared as:

>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )

The output dataframe schema would be:

ts:long id:long type:string kafka_metadata:struct

key:string topic:string value:string partition:integer offset:long timestamp:timestamp timestampType:integer

Instantiation example:

>>> from butterfree.extract.readers import KafkaReader
>>> from butterfree.clients import SparkClient
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> spark_client = SparkClient()
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
>>> kafka_reader = KafkaReader(
...                 id="kafka_reader_id",
...                 topic="topic",
...                 value_schema=value_schema,
...                 connection_string="host1:port,host2:port",
...                )
>>> df = kafka_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired topic, loading the data into a dataframe according to the KafkaReader class arguments.

If build is used instead, a temp view will be created, containing the transformed data.
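
Given the output schema described above, the parsed value fields and the nested Kafka metadata can be selected directly from the dataframe obtained above (the selected columns follow the declared value_schema):

>>> events = df.select("ts", "id", "type", "kafka_metadata.offset")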

KAFKA_COLUMNS = ['key', 'topic', 'value', 'partition', 'offset', 'timestamp', 'timestampType']

consume(client: SparkClient) DataFrame

Extract data from a Kafka topic.

When in stream mode, it gets all the new data arriving at the topic in a streaming dataframe. When not in stream mode, it gets all the data available in the Kafka topic.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with the data from the topic.
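
As a sketch of the batch semantics described above (topic name and hosts are placeholders), setting stream=False makes consume return a static dataframe with all the data currently available in the topic:

>>> batch_reader = KafkaReader(
...                 id="kafka_batch_reader_id",
...                 topic="topic",
...                 value_schema=value_schema,
...                 connection_string="host1:port,host2:port",
...                 stream=False
...                )
>>> batch_df = batch_reader.consume(spark_client)
>>> batch_df.isStreaming  # False: batch mode returns a static dataframe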

Reader entity.

class butterfree.extract.readers.reader.Reader(id: str, incremental_strategy: Optional[IncrementalStrategy] = None)

Bases: ABC, HookableComponent

Abstract base class for Readers.

id

unique string id used to register the reader as a view in the metastore.

transformations

list of methods that will be applied over the dataframe after the raw data is extracted.

build(client: SparkClient, columns: Optional[List[Any]] = None, start_date: Optional[str] = None, end_date: Optional[str] = None) None

Register the data obtained from the reader in the Spark metastore.

Create a temporary view in the Spark metastore referencing the data extracted from the target origin, after applying all the defined pre-processing transformations.

The arguments start_date and end_date are used only when there is an IncrementalStrategy defined on the Reader.

Parameters:
  • client – client responsible for connecting to the Spark session.

  • columns – list of tuples for selecting/renaming columns on the df.

  • start_date – lower bound to use in the filter expression.

  • end_date – upper bound to use in the filter expression.
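
A short sketch of a typical build call, reusing the file_reader from the earlier example. It assumes the temporary view is named after the reader's id, as the id attribute description suggests, and that SparkClient exposes the underlying Spark session through its conn property:

>>> file_reader.build(spark_client)
>>> df = spark_client.conn.sql("SELECT * FROM file_reader_id")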

abstract consume(client: SparkClient) DataFrame

Extract data from target origin.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Spark dataframe with all the data.

with_(transformer: Callable[[...], DataFrame], *args: Any, **kwargs: Any) Any

Define a new transformation for the Reader.

All the transformations are used when the method consume is called.

Parameters:
  • transformer – method that receives a dataframe and outputs a dataframe.

  • *args – args for the transformer.

  • **kwargs – kwargs for the transformer.

Returns:

Reader object with the new transformation.
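
A sketch of chaining transformations with with_. The drop_nulls function below is hypothetical; any callable that receives a dataframe as its first argument and returns a dataframe works:

>>> def drop_nulls(df, subset=None):
...     # hypothetical transformer: receives a dataframe, returns a dataframe
...     return df.dropna(subset=subset)
>>> reader = file_reader.with_(drop_nulls, subset=["column_a"])
>>> reader.with_(filter, condition="year = 2019").build(spark_client)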

with_incremental_strategy(incremental_strategy: IncrementalStrategy) Reader

Define the incremental strategy for the Reader.

Parameters:

incremental_strategy – definition of the incremental strategy.

Returns:

Reader with defined incremental strategy.
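
A sketch of wiring an incremental strategy into a reader, assuming IncrementalStrategy lives in butterfree.extract.readers.incremental_strategy and takes the filter column in its constructor (the column name is illustrative). With the strategy defined, build honors start_date and end_date:

>>> from butterfree.extract.readers.incremental_strategy import IncrementalStrategy
>>> reader = file_reader.with_incremental_strategy(
...     IncrementalStrategy(column="timestamp")
... )
>>> reader.build(spark_client, start_date="2021-01-01", end_date="2021-01-31")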

TableReader entity.

class butterfree.extract.readers.table_reader.TableReader(id: str, table: str, database: Optional[str] = None)

Bases: Reader

Responsible for getting data from tables registered in the metastore.

id

unique string id used to register the reader as a view in the metastore.

database

name of the metastore database/schema.

table

name of the table.

Example

Simple example regarding TableReader class instantiation.

>>> from butterfree.extract.readers import TableReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> table_reader = TableReader(
...                     id="table_reader_id",
...                     database="table_reader_db",
...                     table="table_reader_table"
...                )
>>> df = table_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired table, loading the data into a dataframe according to the TableReader class arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> table_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, however, a temp view will be created, containing the transformed data.

consume(client: SparkClient) DataFrame

Extract data from a table in Spark metastore.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with all the data from the table.

Module contents

The Reader Component of a Source.
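
Readers are the reading piece of a Source. A rough sketch of how they are typically composed, assuming butterfree.extract.Source accepts a list of readers plus a SQL query over their registered views and exposes a construct method (reader ids and join columns are illustrative):

>>> from butterfree.extract import Source
>>> source = Source(
...     readers=[table_reader, file_reader],
...     query="""
...         SELECT t.*, f.column_a
...         FROM table_reader_id t
...         JOIN file_reader_id f
...           ON t.id = f.id
...     """,
... )
>>> source_df = source.construct(spark_client)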

class butterfree.extract.readers.FileReader(id: str, path: str, format: str, schema: Optional[StructType] = None, format_options: Optional[Dict[Any, Any]] = None, stream: bool = False)

Bases: Reader

Responsible for getting data from files.

id

unique string id used to register the reader as a view in the metastore.

path

file location.

format

can be one of the supported formats: json, parquet, orc, or csv.

schema

an optional pyspark.sql.types.StructType for the input schema.

format_options

additional options required by some formats. Check the docs: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options

Example

Simple example regarding FileReader class instantiation.

>>> from butterfree.extract.readers import FileReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="json"
...               )
>>> df = file_reader.consume(spark_client)

We can also define the schema and format_options, such as header, and provide them to FileReader.

>>> from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
>>> spark_client = SparkClient()
>>> schema_csv = StructType([
...            StructField("column_a", LongType()),
...            StructField("column_b", DoubleType()),
...            StructField("column_c", StringType())
...          ])
>>> file_reader = FileReader(
...                 id="file_reader_id",
...                 path="data_path",
...                 format="csv",
...                 schema=schema_csv,
...                 format_options={
...                    "header": True
...                 }
...               )
>>> df = file_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired file, loading the data into a dataframe according to the FileReader class arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> file_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, however, a temp view will be created, containing the transformed data.

consume(client: SparkClient) DataFrame

Extract data from files stored in the defined path.

Tries to auto-infer the schema when in stream mode and no schema is manually defined.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with the data from all the files.

class butterfree.extract.readers.KafkaReader(id: str, topic: str, value_schema: StructType, connection_string: Optional[str] = None, topic_options: Optional[Dict[Any, Any]] = None, stream: bool = True)

Bases: Reader

Responsible for getting data from a Kafka topic.

id

unique string id used to register the reader as a view in the metastore.

value_schema

expected schema of the default column named “value” from Kafka.

topic

string with the Kafka topic name to subscribe to.

connection_string

string with the hosts and ports to connect to. The string needs to be in the format host1:port,host2:port,…,hostN:portN. The argument is not necessary if it is passed as an environment variable named KAFKA_CONSUMER_CONNECTION_STRING.

topic_options

additional options for consuming from the topic. See the docs: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.

stream

flag indicating the reading mode: stream or batch.

The default dataframe schema coming from Spark's Kafka reader is the following:

key:string value:string topic:string partition:integer offset:long timestamp:timestamp timestampType:integer

Using this reader and passing the desired schema as value_schema, we get the following result instead.

With value_schema declared as:

>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )

The output dataframe schema would be:

ts:long id:long type:string kafka_metadata:struct

key:string topic:string value:string partition:integer offset:long timestamp:timestamp timestampType:integer

Instantiation example:

>>> from butterfree.extract.readers import KafkaReader
>>> from butterfree.clients import SparkClient
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> spark_client = SparkClient()
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
>>> kafka_reader = KafkaReader(
...                 id="kafka_reader_id",
...                 topic="topic",
...                 value_schema=value_schema,
...                 connection_string="host1:port,host2:port",
...                )
>>> df = kafka_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired topic, loading the data into a dataframe according to the KafkaReader class arguments.

If build is used instead, a temp view will be created, containing the transformed data.

KAFKA_COLUMNS = ['key', 'topic', 'value', 'partition', 'offset', 'timestamp', 'timestampType']

consume(client: SparkClient) DataFrame

Extract data from a Kafka topic.

When in stream mode, it gets all the new data arriving at the topic in a streaming dataframe. When not in stream mode, it gets all the data available in the Kafka topic.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with the data from the topic.

transformations: List[Dict[str, Any]]

class butterfree.extract.readers.TableReader(id: str, table: str, database: Optional[str] = None)

Bases: Reader

Responsible for getting data from tables registered in the metastore.

id

unique string id used to register the reader as a view in the metastore.

database

name of the metastore database/schema.

table

name of the table.

Example

Simple example regarding TableReader class instantiation.

>>> from butterfree.extract.readers import TableReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> table_reader = TableReader(
...                     id="table_reader_id",
...                     database="table_reader_db",
...                     table="table_reader_table"
...                )
>>> df = table_reader.consume(spark_client)

This last method uses the Spark Client, by default, to read the desired table, loading the data into a dataframe according to the TableReader class arguments.

It’s also possible to define simple transformations within the reader’s scope:

>>> table_reader.with_(filter, condition="year = 2019").build(spark_client)

In this case, however, a temp view will be created, containing the transformed data.

consume(client: SparkClient) DataFrame

Extract data from a table in Spark metastore.

Parameters:

client – client responsible for connecting to the Spark session.

Returns:

Dataframe with all the data from the table.