butterfree.extract.readers package¶
Submodules¶
FileReader entity.
- class butterfree.extract.readers.file_reader.FileReader(id: str, path: str, format: str, schema: Optional[StructType] = None, format_options: Optional[Dict[Any, Any]] = None, stream: bool = False)¶
Bases:
Reader
Responsible for getting data from files.
- id¶
unique string id used to register the reader as a view on the metastore.
- path¶
file location.
- format¶
format of the file; one of: json, parquet, orc, or csv.
- schema¶
an optional pyspark.sql.types.StructType for the input schema.
- format_options¶
additional options required by some formats. Check docs: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options
Example
Simple example regarding FileReader class instantiation.
>>> from butterfree.extract.readers import FileReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> file_reader = FileReader(
...     id="file_reader_id",
...     path="data_path",
...     format="json"
... )
>>> df = file_reader.consume(spark_client)
We can also define a schema and format_options, such as header, and provide them to FileReader.
>>> from pyspark.sql.types import (
...     StructType, StructField, LongType, DoubleType, StringType
... )
>>> spark_client = SparkClient()
>>> schema_csv = StructType([
...     StructField("column_a", LongType()),
...     StructField("column_b", DoubleType()),
...     StructField("column_c", StringType())
... ])
>>> file_reader = FileReader(
...     id="file_reader_id",
...     path="data_path",
...     format="csv",
...     schema=schema_csv,
...     format_options={
...         "header": True
...     }
... )
>>> df = file_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired file into a dataframe, according to the FileReader class arguments.
It’s also possible to define simple transformations within the reader’s scope:
>>> file_reader.with_(filter, condition="year = 2019").build(spark_client)
In this case, however, a temp view will be created, containing the transformed data.
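Since build registers the result as a temporary view named after the reader's id, the transformed data can then be queried through Spark. A minimal sketch, assuming SparkClient exposes a sql helper and that the view name matches the reader id:
>>> # hypothetical follow-up: query the temp view registered by build()
>>> transformed_df = spark_client.sql("SELECT * FROM file_reader_id")
>>> transformed_df.show()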
- consume(client: SparkClient) DataFrame ¶
Extract data from files stored in the defined path.
In stream mode, if no schema is manually defined, the reader tries to auto-infer one.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with the data from all files.
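For stream mode, a hedged sketch of a reader consuming files as a stream; per the note above, the schema is auto-inferred when none is given (the path is illustrative):
>>> stream_reader = FileReader(
...     id="stream_reader_id",
...     path="data_path",
...     format="json",
...     stream=True
... )
>>> streaming_df = stream_reader.consume(spark_client)
>>> streaming_df.isStreaming  # standard pyspark property on streaming dataframes
True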
KafkaSource entity.
- class butterfree.extract.readers.kafka_reader.KafkaReader(id: str, topic: str, value_schema: StructType, connection_string: Optional[str] = None, topic_options: Optional[Dict[Any, Any]] = None, stream: bool = True)¶
Bases:
Reader
Responsible for getting data from a Kafka topic.
- id¶
unique string id used to register the reader as a view on the metastore.
- value_schema¶
expected schema of the default column named “value” from Kafka.
- topic¶
string with the Kafka topic name to subscribe to.
- connection_string¶
string with hosts and ports to connect to. The string must be in the format host1:port,host2:port,…,hostN:portN. This argument is not required if it is instead provided through an environment variable named KAFKA_CONSUMER_CONNECTION_STRING (see the sketch after this attribute list).
- topic_options¶
additional options for consuming from the topic. See the docs: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.
- stream¶
flag to indicate the reading mode: stream or batch.
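As noted under connection_string, the broker addresses may come from the environment instead of the constructor. A minimal sketch (the host values and schema are illustrative):
>>> import os
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> os.environ["KAFKA_CONSUMER_CONNECTION_STRING"] = "host1:9092,host2:9092"
>>> kafka_reader = KafkaReader(
...     id="kafka_reader_id",
...     topic="topic",
...     value_schema=StructType([StructField("type", StringType())])
... )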
- The default dataframe schema coming from Spark's Kafka reader is the following:
key:string
value:string
topic:string
partition:integer
offset:long
timestamp:timestamp
timestampType:integer
Using this reader and passing the desired schema under value_schema, we get the following result instead.
With value_schema declared as:
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
- The output dataframe schema would be:
ts:long
id:long
type:string
kafka_metadata:struct
    key:string
    topic:string
    value:string
    partition:integer
    offset:long
    timestamp:timestamp
    timestampType:integer
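To make this concrete, here is a rough plain-PySpark sketch of how a raw value column can be unpacked with a value schema; this illustrates the idea only and is not the library's exact implementation (raw_df is a hypothetical dataframe with the default Kafka columns):
>>> from pyspark.sql.functions import col, from_json
>>> parsed_df = raw_df.select(
...     from_json(col("value").cast("string"), value_schema).alias("value")
... ).select("value.*")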
Instantiation example:
>>> from butterfree.extract.readers import KafkaReader
>>> from butterfree.clients import SparkClient
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> spark_client = SparkClient()
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
>>> kafka_reader = KafkaReader(
...     id="kafka_reader_id",
...     topic="topic",
...     value_schema=value_schema,
...     connection_string="host1:port,host2:port",
... )
>>> df = kafka_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired topic into a dataframe, according to the KafkaReader class arguments.
If transformations are defined and build is called instead, a temp view will be created, containing the transformed data.
- KAFKA_COLUMNS = ['key', 'topic', 'value', 'partition', 'offset', 'timestamp', 'timestampType']¶
- consume(client: SparkClient) DataFrame ¶
Extract data from a Kafka topic.
In stream mode, it will get all the new data arriving at the topic in a streaming dataframe. When not in stream mode, it will get all the data available in the Kafka topic.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with data from the topic.
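Because stream defaults to True, the returned dataframe is typically a streaming one, so writing it out requires a streaming query. A minimal plain-PySpark sketch (the console sink is illustrative):
>>> streaming_df = kafka_reader.consume(spark_client)
>>> query = (
...     streaming_df.writeStream
...     .format("console")  # illustrative sink for local inspection
...     .start()
... )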
Reader entity.
- class butterfree.extract.readers.reader.Reader(id: str, incremental_strategy: Optional[IncrementalStrategy] = None)¶
Bases:
ABC, HookableComponent
Abstract base class for Readers.
- id¶
unique string id used to register the reader as a view on the metastore.
- transformations¶
list of methods that will be applied to the dataframe after the raw data is extracted.
- build(client: SparkClient, columns: Optional[List[Any]] = None, start_date: Optional[str] = None, end_date: Optional[str] = None) None ¶
Register the data obtained from the reader in the Spark metastore.
Create a temporary view in the Spark metastore referencing the data extracted from the target origin, after applying all the defined pre-processing transformations.
The arguments start_date and end_date are used only when an IncrementalStrategy is defined on the Reader.
- Parameters:
client – client responsible for connecting to the Spark session.
columns – list of tuples for selecting/renaming columns on the df.
start_date – lower bound to use in the filter expression.
end_date – upper bound to use in the filter expression.
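A hedged usage sketch of build on a concrete reader instance (reader here is hypothetical, e.g. a TableReader), assuming an incremental strategy has been defined so the date arguments take effect; the column names, the (original, renamed) tuple shape, and the dates are illustrative assumptions:
>>> reader.build(
...     spark_client,
...     columns=[("column_a", "renamed_a")],  # assumed (original, renamed) tuples
...     start_date="2020-01-01",
...     end_date="2020-01-31",
... )
>>> # the result is now queryable as a temp view named after the reader id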
- abstract consume(client: SparkClient) DataFrame ¶
Extract data from target origin.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Spark dataframe with all the data.
- with_(transformer: Callable[[...], DataFrame], *args: Any, **kwargs: Any) Any ¶
Define a new transformation for the Reader.
All the transformations are used when the method consume is called.
- Parameters:
transformer – method that receives a dataframe and outputs a dataframe.
*args – args for the transformer.
**kwargs – kwargs for the transformer.
- Returns:
Reader object with the new transformation.
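Any callable that takes a dataframe (plus optional args) and returns a dataframe can be used. A minimal sketch with a hypothetical transformer:
>>> from pyspark.sql import DataFrame
>>> def drop_nulls(df: DataFrame) -> DataFrame:
...     # hypothetical transformer: remove rows containing null values
...     return df.dropna()
>>> reader = reader.with_(drop_nulls)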
- with_incremental_strategy(incremental_strategy: IncrementalStrategy) Reader ¶
Define the incremental strategy for the Reader.
- Parameters:
incremental_strategy – definition of the incremental strategy.
- Returns:
Reader with defined incremental strategy.
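A hedged sketch, assuming IncrementalStrategy is importable from butterfree.dataframe_service and receives the name of the column used for filtering (both are assumptions, not confirmed by this page):
>>> from butterfree.dataframe_service import IncrementalStrategy  # assumed import path
>>> reader = reader.with_incremental_strategy(
...     IncrementalStrategy(column="timestamp")  # assumed constructor argument
... )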
TableSource entity.
- class butterfree.extract.readers.table_reader.TableReader(id: str, table: str, database: Optional[str] = None)¶
Bases:
Reader
Responsible for getting data from tables registered in the metastore.
- id¶
unique string id used to register the reader as a view on the metastore.
- database¶
name of the metastore database/schema.
- table¶
name of the table.
Example
Simple example regarding TableReader class instantiation.
>>> from butterfree.extract.readers import TableReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> table_reader = TableReader(
...     id="table_reader_id",
...     database="table_reader_db",
...     table="table_reader_table"
... )
>>> df = table_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired table into a dataframe, according to the TableReader class arguments.
It’s also possible to define simple transformations within the reader’s scope:
>>> table_reader.with_(filter, condition="year = 2019").build(spark_client)
In this case, however, a temp view will be created, containing the transformed data.
- consume(client: SparkClient) DataFrame ¶
Extract data from a table in Spark metastore.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with all the data from the table.
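For intuition, consume here corresponds roughly to a plain Spark metastore read; a sketch of an equivalent call, assuming an active SparkSession named spark (not the library's exact code):
>>> df = spark.table("table_reader_db.table_reader_table")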
Module contents¶
The Reader Component of a Source.
- class butterfree.extract.readers.FileReader(id: str, path: str, format: str, schema: Optional[StructType] = None, format_options: Optional[Dict[Any, Any]] = None, stream: bool = False)¶
Bases:
Reader
Responsible for getting data from files.
- id¶
unique string id used to register the reader as a view on the metastore.
- path¶
file location.
- format¶
format of the file; one of: json, parquet, orc, or csv.
- schema¶
an optional pyspark.sql.types.StructType for the input schema.
- format_options¶
additional options required by some formats. Check docs: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options
Example
Simple example regarding FileReader class instantiation.
>>> from butterfree.extract.readers import FileReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> file_reader = FileReader(
...     id="file_reader_id",
...     path="data_path",
...     format="json"
... )
>>> df = file_reader.consume(spark_client)
We can also define a schema and format_options, such as header, and provide them to FileReader.
>>> from pyspark.sql.types import (
...     StructType, StructField, LongType, DoubleType, StringType
... )
>>> spark_client = SparkClient()
>>> schema_csv = StructType([
...     StructField("column_a", LongType()),
...     StructField("column_b", DoubleType()),
...     StructField("column_c", StringType())
... ])
>>> file_reader = FileReader(
...     id="file_reader_id",
...     path="data_path",
...     format="csv",
...     schema=schema_csv,
...     format_options={
...         "header": True
...     }
... )
>>> df = file_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired file into a dataframe, according to the FileReader class arguments.
It’s also possible to define simple transformations within the reader’s scope:
>>> file_reader.with_(filter, condition="year = 2019").build(spark_client)
In this case, however, a temp view will be created, containing the transformed data.
- consume(client: SparkClient) DataFrame ¶
Extract data from files stored in the defined path.
In stream mode, if no schema is manually defined, the reader tries to auto-infer one.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with the data from all files.
- class butterfree.extract.readers.KafkaReader(id: str, topic: str, value_schema: StructType, connection_string: Optional[str] = None, topic_options: Optional[Dict[Any, Any]] = None, stream: bool = True)¶
Bases:
Reader
Responsible for getting data from a Kafka topic.
- id¶
unique string id used to register the reader as a view on the metastore.
- value_schema¶
expected schema of the default column named “value” from Kafka.
- topic¶
string with the Kafka topic name to subscribe to.
- connection_string¶
string with hosts and ports to connect to. The string must be in the format host1:port,host2:port,…,hostN:portN. This argument is not required if it is instead provided through an environment variable named KAFKA_CONSUMER_CONNECTION_STRING.
- topic_options¶
additional options for consuming from the topic. See the docs: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.
- stream¶
flag to indicate the reading mode: stream or batch.
- The default dataframe schema coming from Spark's Kafka reader is the following:
key:string
value:string
topic:string
partition:integer
offset:long
timestamp:timestamp
timestampType:integer
Using this reader and passing the desired schema under value_schema, we get the following result instead.
With value_schema declared as:
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
- The output dataframe schema would be:
ts:long
id:long
type:string
kafka_metadata:struct
    key:string
    topic:string
    value:string
    partition:integer
    offset:long
    timestamp:timestamp
    timestampType:integer
Instantiation example:
>>> from butterfree.extract.readers import KafkaReader
>>> from butterfree.clients import SparkClient
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> spark_client = SparkClient()
>>> value_schema = StructType(
...     [
...         StructField("ts", LongType(), nullable=True),
...         StructField("id", LongType(), nullable=True),
...         StructField("type", StringType(), nullable=True),
...     ]
... )
>>> kafka_reader = KafkaReader(
...     id="kafka_reader_id",
...     topic="topic",
...     value_schema=value_schema,
...     connection_string="host1:port,host2:port",
... )
>>> df = kafka_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired topic into a dataframe, according to the KafkaReader class arguments.
If transformations are defined and build is called instead, a temp view will be created, containing the transformed data.
- KAFKA_COLUMNS = ['key', 'topic', 'value', 'partition', 'offset', 'timestamp', 'timestampType']¶
- consume(client: SparkClient) DataFrame ¶
Extract data from a Kafka topic.
In stream mode, it will get all the new data arriving at the topic in a streaming dataframe. When not in stream mode, it will get all the data available in the Kafka topic.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with data from the topic.
- transformations: List[Dict[str, Any]]¶
- class butterfree.extract.readers.TableReader(id: str, table: str, database: Optional[str] = None)¶
Bases:
Reader
Responsible for getting data from tables registered in the metastore.
- id¶
unique string id used to register the reader as a view on the metastore.
- database¶
name of the metastore database/schema.
- table¶
name of the table.
Example
Simple example regarding TableReader class instantiation.
>>> from butterfree.extract.readers import TableReader
>>> from butterfree.clients import SparkClient
>>> from butterfree.extract.pre_processing import filter
>>> spark_client = SparkClient()
>>> table_reader = TableReader(
...     id="table_reader_id",
...     database="table_reader_db",
...     table="table_reader_table"
... )
>>> df = table_reader.consume(spark_client)
This last method uses the Spark Client, by default, to read the desired table into a dataframe, according to the TableReader class arguments.
It’s also possible to define simple transformations within the reader’s scope:
>>> table_reader.with_(filter, condition="year = 2019").build(spark_client)
In this case, however, a temp view will be created, containing the transformed data.
- consume(client: SparkClient) DataFrame ¶
Extract data from a table in Spark metastore.
- Parameters:
client – client responsible for connecting to the Spark session.
- Returns:
Dataframe with all the data from the table.