butterfree.clients package

Submodules

Abstract class for database clients.

class butterfree.clients.abstract_client.AbstractClient

Bases: abc.ABC

Abstract base class for database clients.

abstract property conn

Returns a connection object.

abstract get_schema(table: str, database: str = None) → Any

Returns desired table schema.

table

desired table.

Returns

A list of dictionaries in the format [{“column_name”: “example1”, “type”: “Spark_type”}, …]

abstract sql(query: str) → Any

Runs a query.

Parameters

query – client query.

Returns

Set of records.
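
To make the contract concrete, here is a minimal client that satisfies the AbstractClient interface, backed by an in-memory SQLite database. The abstract base is re-declared locally so the snippet is self-contained; the real contract lives in butterfree.clients.abstract_client.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class AbstractClient(ABC):
    """Local re-declaration of the butterfree contract, for a self-contained example."""

    @property
    @abstractmethod
    def conn(self) -> Any:
        """Returns a connection object."""

    @abstractmethod
    def sql(self, query: str) -> Any:
        """Runs a query."""

    @abstractmethod
    def get_schema(self, table: str, database: Optional[str] = None) -> Any:
        """Returns desired table schema."""


class SQLiteClient(AbstractClient):
    """Minimal concrete client backed by an in-memory SQLite database."""

    def __init__(self) -> None:
        self._conn: Optional[sqlite3.Connection] = None

    @property
    def conn(self) -> sqlite3.Connection:
        # Get-or-create: the connection is built lazily on first access.
        if self._conn is None:
            self._conn = sqlite3.connect(":memory:")
        return self._conn

    def sql(self, query: str) -> List[tuple]:
        return self.conn.execute(query).fetchall()

    def get_schema(self, table: str, database: Optional[str] = None) -> List[Dict[str, str]]:
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        rows = self.conn.execute(f"PRAGMA table_info({table})").fetchall()
        return [{"column_name": r[1], "type": r[2]} for r in rows]
```

get_schema returns the same list-of-dicts shape the butterfree clients document, which is what makes implementations interchangeable behind the abstract interface.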

CassandraClient entity.

class butterfree.clients.cassandra_client.CassandraClient(host: List[str], keyspace: str, user: Optional[str] = None, password: Optional[str] = None)

Bases: butterfree.clients.abstract_client.AbstractClient

Cassandra Client.

user

username to use in connection.

password

password to use in connection.

keyspace

keyspace used in connection.

host

Cassandra endpoint used in connection.

property conn

Establishes a Cassandra connection.

create_table(columns: List[butterfree.clients.cassandra_client.CassandraColumn], table: str) → None

Creates a table.

columns

a list of dictionaries in the format [{“column_name”: “example1”, “type”: “cql_type”, “primary_key”: True}, …]
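
For illustration, that columns format folds naturally into a CQL CREATE TABLE statement. The sketch below shows one plausible rendering; the exact CQL the client generates is an assumption, and build_create_table_cql is a hypothetical helper, not part of the client.

```python
from typing import Dict, List, Union

# Shape of one entry in the columns list: column_name, type, primary_key.
CassandraColumn = Dict[str, Union[str, bool]]


def build_create_table_cql(columns: List[CassandraColumn], keyspace: str, table: str) -> str:
    """Render a CQL CREATE TABLE statement from the columns format (illustrative)."""
    fields = ", ".join(f"{c['column_name']} {c['type']}" for c in columns)
    pks = ", ".join(c["column_name"] for c in columns if c["primary_key"])
    return f"CREATE TABLE {keyspace}.{table} ({fields}, PRIMARY KEY ({pks}));"
```

For example, an id/feature pair with id as the primary key renders as CREATE TABLE ks.t (id int, feature float, PRIMARY KEY (id));.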

get_schema(table: str, database: str = None) → List[Dict[str, str]]

Returns desired table schema.

table

desired table.

Returns

A list of dictionaries in the format [{“column_name”: “example1”, “type”: “cql_type”}, …]

sql(query: str) → cassandra.cluster.ResponseFuture

Executes desired query.

query

desired query.

class butterfree.clients.cassandra_client.CassandraColumn(**kwargs)

Bases: dict

Type for Cassandra columns.

It’s just a type abstraction; either it or a plain dict can be used.

>>> def function(column: CassandraColumn) -> CassandraColumn:
...     return column
>>> # The following two lines will pass type checking
>>> function({'column_name': 'test', 'type': 'integer', 'primary_key': False})
>>> function(CassandraColumn(column_name='test', type='integer', primary_key=False))
column_name: str
primary_key: bool
type: str

SparkClient entity.

class butterfree.clients.spark_client.SparkClient

Bases: butterfree.clients.abstract_client.AbstractClient

Handle Spark session connection.

Get query results with SQL, reads and writes data on external systems.

add_table_partitions(partitions: List[Dict[str, Any]], table: str, database: str = None) → None

Add partitions to an existing table.

Parameters
  • partitions – partitions to add to the table. It’s expected a list of partition dicts to add to the table. Example: [{“year”: 2020, “month”: 8, “day”: 14}, …]

  • table – table to add the partitions.

  • database – name of the database where the table is saved.
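
The partition dicts translate naturally into a Hive-style ALTER TABLE … ADD IF NOT EXISTS PARTITION statement. The sketch below shows one plausible rendering; the exact SQL emitted by add_table_partitions is an assumption, and build_add_partitions_sql is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


def build_add_partitions_sql(
    partitions: List[Dict[str, Any]], table: str, database: Optional[str] = None
) -> str:
    """Render an ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement (illustrative)."""
    name = f"{database}.{table}" if database else table
    specs = " ".join(
        "PARTITION ( {} )".format(", ".join(f"{k} = {v!r}" for k, v in p.items()))
        for p in partitions
    )
    return f"ALTER TABLE {name} ADD IF NOT EXISTS {specs}"
```

With the example partitions above, this produces one PARTITION ( year = 2020, month = 8, day = 14 ) clause per dict.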

property conn

Gets or creates a SparkSession.

Returns

Spark session
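
Stripped of Spark specifics, the get-or-create behavior of conn is the standard lazy-singleton property pattern. A sketch, with the SparkSession replaced by a stand-in class so the snippet is self-contained:

```python
from typing import Optional


class Session:
    """Stand-in for pyspark.sql.SparkSession, so the sketch runs anywhere."""


class LazySessionHolder:
    """Holds a session that is built on first access and reused afterwards."""

    def __init__(self) -> None:
        self._session: Optional[Session] = None

    @property
    def conn(self) -> Session:
        # Create the session only when first needed, then always return the same one.
        if self._session is None:
            self._session = Session()
        return self._session
```

Repeated accesses to conn return the identical session object, so callers never create duplicate sessions by accident.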

static create_temporary_view(dataframe: pyspark.sql.dataframe.DataFrame, name: str) → Any

Create a temporary view from a given dataframe.

Parameters
  • dataframe – dataframe to be queried by the view.

  • name – name of the temporary view.

get_schema(table: str, database: str = None) → List[Dict[str, str]]

Returns desired table schema.

table

desired table.

Returns

A list of dictionaries in the format [{“column_name”: “example1”, “type”: “Spark_type”}, …]

read(format: str, path: Optional[Union[str, List[str]]] = None, schema: Optional[pyspark.sql.types.StructType] = None, stream: bool = False, **options: Any) → pyspark.sql.dataframe.DataFrame

Use the SparkSession.read interface to load data into a dataframe.

Check docs for more information:

https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#generic-loadsave-functions

Parameters
  • format – string with the format to be used by the DataframeReader.

  • path – optional string or list of strings with the file-system path(s) to read from.

  • stream – flag to indicate if data must be read in stream mode.

  • schema – an optional pyspark.sql.types.StructType for the input schema.

  • options – options to set up the DataframeReader.

Returns

Dataframe

read_table(table: str, database: str = None) → pyspark.sql.dataframe.DataFrame

Use the SparkSession.read interface to read a metastore table.

Parameters
  • database – name of the metastore database/schema

  • table – name of the table in metastore

Returns

Dataframe

sql(query: str) → pyspark.sql.dataframe.DataFrame

Run a query using Spark SQL.

Parameters

query – Spark SQL query.

Returns

Dataframe

static write_dataframe(dataframe: pyspark.sql.dataframe.DataFrame, format_: str, mode: str, **options: Any) → None

Receive a spark DataFrame and write it.

Parameters
  • dataframe – dataframe containing data from a feature set.

  • format – format used to save the dataframe.

  • mode – writing mode, can be “error”, “append”, “overwrite” or “ignore”. For more information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).

  • **options – all other options that can be used in a DataFrameWriter.
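
Since format_ and mode are plain strings handed to the DataFrameWriter, invalid values only surface once Spark runs the write. A small up-front guard like the following can fail fast instead; this is an illustrative helper under the assumption of Spark's standard save modes, not part of the client.

```python
# Save modes accepted by Spark's DataFrameWriter.
VALID_MODES = {"error", "errorifexists", "append", "overwrite", "ignore"}


def validate_write_args(format_: str, mode: str) -> None:
    """Fail fast on bad writer arguments before Spark is involved (illustrative)."""
    if not isinstance(format_, str) or not format_:
        raise ValueError("format_ must be a non-empty string, e.g. 'parquet'")
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
```

Calling it with an unknown mode such as "upsert" raises a ValueError immediately, before any Spark job is launched.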

write_stream(dataframe: pyspark.sql.dataframe.DataFrame, processing_time: str, output_mode: str, checkpoint_path: Optional[str], format_: str, mode: str, **options: Any) → pyspark.sql.streaming.StreamingQuery

Starts streaming data writing job.

Parameters
  • dataframe – Spark dataframe containing data from a feature set.

  • processing_time – a processing time interval as a string, e.g. ‘5 seconds’, ‘1 minute’. Sets a trigger that runs the mini-batch periodically based on the processing time. To process data as soon as it arrives, without waiting for the interval, set the value to ‘0 seconds’.

  • output_mode – specifies how data of a streaming DataFrame/Dataset is written to a streaming sink destination.

  • checkpoint_path – path on S3 to save checkpoints for the stream job. These checkpoints can be used to resume the job from where it stopped after a restart.

  • format – format used to save the dataframe.

  • mode – writing mode, can be “error”, “append”, “overwrite” or “ignore”. For more information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).

  • **options – all other options that can be used in a DataFrameWriter.

More information about processing_time, output_mode and checkpoint_path can be found in Spark documentation: [here](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

Returns

Streaming handler.

static write_table(dataframe: pyspark.sql.dataframe.DataFrame, database: Optional[str], table_name: str, path: str, format_: str = None, mode: str = None, partition_by: List[str] = None, **options: Any) → None

Receive a spark DataFrame and write it as a table in metastore.

Parameters
  • dataframe – spark dataframe containing data from a feature set.

  • database – specified database name.

  • table_name – specified table name.

  • path – string with the location where the table will be saved.

  • format – string with the format used to save.

  • mode – writing mode, it can be: “error”, “append”, “overwrite” or “ignore”. More information: [here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes).

  • partition_by – names of partitioning columns.

  • options – all other options that can be used in a DataFrameWriter.

Module contents

Holds connection clients.

AbstractClient, CassandraClient, and SparkClient are re-exported at the package level; their documentation is identical to the submodule entries above.