butterfree.clients package¶
Submodules¶
Abstract class for database clients.
- class butterfree.clients.abstract_client.AbstractClient¶
Bases:
ABC
Abstract base class for database clients.
- abstract property conn: Any¶
Returns a connection object.
- abstract get_schema(table: str, database: Optional[str] = None) → Any¶
Returns the desired table schema.
- Parameters:
table – desired table.
database – optional database name.
- Returns:
A list of dictionaries in the format [{"column_name": "example1", "type": "Spark_type"}, …]
- abstract sql(query: str) → Any¶
Runs a query.
- Parameters:
query – client query.
- Returns:
Set of records.
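A minimal sketch of a concrete subclass, for illustration only; the SQLite-backed client below is not part of butterfree and exists just to show how the abstract interface is meant to be filled in.

import sqlite3
from typing import Any, Dict, List, Optional

from butterfree.clients.abstract_client import AbstractClient


class SQLiteClient(AbstractClient):
    """Illustrative client backed by SQLite (not part of butterfree)."""

    def __init__(self, path: str = ":memory:") -> None:
        self._path = path
        self._connection: Optional[sqlite3.Connection] = None

    @property
    def conn(self) -> sqlite3.Connection:
        # Lazily create and cache the connection object.
        if self._connection is None:
            self._connection = sqlite3.connect(self._path)
        return self._connection

    def sql(self, query: str) -> List[Any]:
        # Run the query and return the set of records.
        return self.conn.execute(query).fetchall()

    def get_schema(self, table: str, database: Optional[str] = None) -> List[Dict[str, str]]:
        # Return [{"column_name": ..., "type": ...}, ...] as described above.
        rows = self.conn.execute(f"PRAGMA table_info({table})").fetchall()
        return [{"column_name": row[1], "type": row[2]} for row in rows]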
CassandraClient entity.
- class butterfree.clients.cassandra_client.CassandraClient(host: List[str], keyspace: str, user: Optional[str] = None, password: Optional[str] = None)¶
Bases:
AbstractClient
Cassandra Client.
- user¶
username to use in connection.
- password¶
password to use in connection.
- keyspace¶
key space used in connection.
- host¶
cassandra endpoint used in connection.
- property conn: Session¶
Establishes a Cassandra connection.
- create_table(columns: List[CassandraColumn], table: str) → None¶
Creates a table.
- Parameters:
columns – a list of dictionaries in the format [{"column_name": "example1", "type": "cql_type", "primary_key": True}, …]
table – name of the table to create.
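A hedged usage sketch based on the signatures above; the hostnames, keyspace, credentials, and table layout are placeholders, not butterfree defaults.

from butterfree.clients.cassandra_client import CassandraClient

# All connection details below are placeholders.
client = CassandraClient(
    host=["127.0.0.1"],
    keyspace="feature_store",
    user="cassandra",      # optional
    password="cassandra",  # optional
)

# Create a table from a list of column dicts in the documented format.
client.create_table(
    columns=[
        {"column_name": "id", "type": "int", "primary_key": True},
        {"column_name": "feature__avg", "type": "float", "primary_key": False},
    ],
    table="my_feature_set",
)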
- class butterfree.clients.cassandra_client.CassandraColumn(**kwargs)¶
Bases:
dict
Type for Cassandra columns.
It is just a type abstraction; either it or a plain dict can be used.
>>> def function(column: CassandraColumn) -> CassandraColumn:
...     return column
>>> # The following two lines will pass type checking
>>> function({'column_name': 'test', 'type': 'integer', 'primary_key': False})
>>> function(CassandraColumn(column_name='test', type='integer', primary_key=False))
- column_name: str¶
- primary_key: bool¶
- type: str¶
SparkClient entity.
- class butterfree.clients.spark_client.SparkClient¶
Bases:
AbstractClient
Handles the Spark session connection.
Gets query results with SQL and reads from and writes to external systems.
- add_table_partitions(partitions: List[Dict[str, Any]], table: str, database: Optional[str] = None) → None¶
Add partitions to an existing table.
- Parameters:
partitions – partitions to add to the table. Expects a list of partition dicts, e.g. [{"year": 2020, "month": 8, "day": 14}, …]
table – table to add the partitions.
database – name of the database where the table is saved.
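For illustration, adding the docstring's example partition (plus one more) to a hypothetical table; the table and database names are assumptions.

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

# Table and database names here are illustrative only.
spark_client.add_table_partitions(
    partitions=[
        {"year": 2020, "month": 8, "day": 14},
        {"year": 2020, "month": 8, "day": 15},
    ],
    table="my_feature_set",
    database="feature_store",
)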
- property conn: SparkSession¶
Gets or creates a SparkSession.
- Returns:
Spark session
- static create_temporary_view(dataframe: DataFrame, name: str) → Any¶
Create a temporary view from a given dataframe.
- Parameters:
dataframe – dataframe to be queried by the view.
name – name of the temporary view.
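A short sketch pairing create_temporary_view with sql; the dataframe built here is a stand-in for real feature data.

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

# Build a toy dataframe on the managed SparkSession just for the example.
df = spark_client.conn.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Register it as a temporary view, then query it with Spark SQL.
SparkClient.create_temporary_view(dataframe=df, name="example_view")
result = spark_client.sql("SELECT id FROM example_view WHERE label = 'a'")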
- get_schema(table: str, database: Optional[str] = None) → List[Dict[str, str]]¶
Returns the desired table schema.
- Parameters:
table – desired table.
database – optional database name.
- Returns:
A list of dictionaries in the format [{"column_name": "example1", "type": "Spark_type"}, …]
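For example, assuming a metastore table named my_feature_set exists in a feature_store database (both names hypothetical):

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

schema = spark_client.get_schema(table="my_feature_set", database="feature_store")
# e.g. [{"column_name": "id", "type": "IntegerType()"}, ...]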
- read(format: str, path: Optional[Union[str, List[str]]] = None, schema: Optional[StructType] = None, stream: bool = False, **options: Any) → DataFrame¶
Use the SparkSession.read interface to load data into a dataframe.
- See the Spark documentation on DataFrameReader for more information.
- Parameters:
format – string with the format to be used by the DataFrameReader.
path – optional string or list of strings of file-system paths.
stream – flag to indicate if data must be read in stream mode.
schema – an optional pyspark.sql.types.StructType for the input schema.
options – options to set up the DataFrameReader.
- Returns:
Dataframe
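A sketch of both batch and stream reads; the S3 paths and schema are assumptions, and reading from s3a additionally requires the appropriate Hadoop/AWS packages on the cluster.

from pyspark.sql.types import StringType, StructField, StructType

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

# Batch read with an explicit input schema (path and schema are illustrative).
schema = StructType([StructField("id", StringType())])
df = spark_client.read(format="json", path="s3a://bucket/data/", schema=schema)

# The same interface in stream mode returns a streaming dataframe.
stream_df = spark_client.read(format="json", path="s3a://bucket/data/", stream=True)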
- read_table(table: str, database: Optional[str] = None) → DataFrame¶
Use the SparkSession.read interface to read a metastore table.
- Parameters:
database – name of the metastore database/schema.
table – name of the table in the metastore.
- Returns:
Dataframe
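For example, with placeholder metastore names:

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

df = spark_client.read_table(table="my_feature_set", database="feature_store")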
- sql(query: str) → DataFrame¶
Run a query using Spark SQL.
- Parameters:
query – Spark SQL query.
- Returns:
Dataframe
- static write_dataframe(dataframe: DataFrame, format_: str, mode: str, **options: Any) → None¶
Receive a spark DataFrame and write it.
- Parameters:
dataframe – dataframe containing data from a feature set.
format_ – format used to save the dataframe.
mode – writing mode; can be "error", "append", "overwrite" or "ignore". For more information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).
**options – all other options that can be used in a DataFrameWriter.
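A hedged sketch; it assumes the extra path option is forwarded to the underlying DataFrameWriter, and the output location is a placeholder.

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()
df = spark_client.sql("SELECT 1 AS id")

# Note the trailing underscore in format_.
SparkClient.write_dataframe(
    dataframe=df,
    format_="parquet",
    mode="overwrite",
    path="s3a://bucket/output/",  # illustrative DataFrameWriter option
)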
- write_stream(dataframe: DataFrame, processing_time: str, output_mode: str, checkpoint_path: Optional[str], format_: str, mode: str, **options: Any) → StreamingQuery¶
Starts streaming data writing job.
- Parameters:
dataframe – Spark dataframe containing data from a feature set.
processing_time – a processing time interval as a string, e.g. '5 seconds', '1 minute'. Sets a trigger that runs the mini-batch periodically based on the processing time. To process data as soon as it arrives, without waiting for the time frame, set the value to '0 seconds'.
output_mode – specifies how data of a streaming DataFrame/Dataset is written to a streaming sink destination.
checkpoint_path – path on S3 to save checkpoints for the stream job. These checkpoints can be used on job restart to resume from where it stopped.
format_ – format used to save the dataframe.
mode – writing mode; can be "error", "append", "overwrite" or "ignore". For more information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).
**options – all other options that can be used in a DataFrameWriter.
More information about processing_time, output_mode and checkpoint_path can be found in Spark documentation: [here](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- Returns:
Streaming handler.
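A sketch of a full streaming write, under the assumption that path is a valid DataFrameWriter option for the chosen format; all S3 locations are placeholders.

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()

# A streaming dataframe, e.g. from read(..., stream=True); the path is illustrative.
stream_df = spark_client.read(format="json", path="s3a://bucket/incoming/", stream=True)

handler = spark_client.write_stream(
    dataframe=stream_df,
    processing_time="10 seconds",
    output_mode="append",
    checkpoint_path="s3a://bucket/checkpoints/my_job/",  # illustrative
    format_="parquet",
    mode="append",
    path="s3a://bucket/output/",  # illustrative DataFrameWriter option
)
handler.awaitTermination()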
- static write_table(dataframe: DataFrame, database: Optional[str], table_name: str, path: str, format_: Optional[str] = None, mode: Optional[str] = None, partition_by: Optional[List[str]] = None, **options: Any) → None¶
Receive a spark DataFrame and write it as a table in metastore.
- Parameters:
dataframe – spark dataframe containing data from a feature set.
database – specified database name.
table_name – specified table name.
path – string with the location to save the table.
format_ – string with the format used to save.
mode – writing mode, it can be: “error”, “append”, “overwrite” or “ignore”. More information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).
partition_by – names of partitioning columns.
options – all other options that can be used in a DataFrameWriter.
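Finally, a hedged example of writing a managed metastore table; every name and location below is a placeholder.

from butterfree.clients.spark_client import SparkClient

spark_client = SparkClient()
df = spark_client.sql("SELECT 1 AS id, 2020 AS year")

SparkClient.write_table(
    dataframe=df,
    database="feature_store",
    table_name="my_feature_set",
    path="s3a://bucket/feature_store/my_feature_set",
    format_="parquet",
    mode="overwrite",
    partition_by=["year"],
)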
Module contents¶
Holds connection clients.
The package exposes AbstractClient, CassandraClient, and SparkClient at the top level; their interfaces are documented in the submodules above.