butterfree.dataframe_service package

Submodules

IncrementalStrategy entity.

class butterfree.dataframe_service.incremental_strategy.IncrementalStrategy(column: Optional[str] = None)

Bases: object

Define an incremental strategy to be used on data sources.

Entity responsible for defining a column expression that will be used to filter the original data source. The purpose is to get only the data related to a specific pipeline execution time interval.

column

Column expression on which the incremental filter will be applied. The expression needs to resolve to a date or timestamp format so the filter can work properly with the defined upper and lower bounds.

filter_with_incremental_strategy(dataframe: pyspark.sql.dataframe.DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None) → pyspark.sql.dataframe.DataFrame

Filters the dataframe according to the date boundaries.

Parameters
  • dataframe – dataframe that will be filtered.

  • start_date – date lower bound to use in the filter.

  • end_date – date upper bound to use in the filter.

Returns

Filtered dataframe based on defined time boundaries.
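The filtering behavior can be illustrated with a plain-Python analogue (a sketch only; the real method applies the expression through Spark's DataFrame.filter):

```python
# Pure-Python analogue of filter_with_incremental_strategy: keep only rows
# whose date column falls inside the closed [start_date, end_date] interval.
# ISO-formatted date strings compare correctly as plain strings.
def filter_rows(rows, column, start_date=None, end_date=None):
    kept = []
    for row in rows:
        if start_date is not None and row[column] < start_date:
            continue
        if end_date is not None and row[column] > end_date:
            continue
        kept.append(row)
    return kept

rows = [{"ts": "2021-01-01"}, {"ts": "2021-02-15"}, {"ts": "2021-03-10"}]
print(filter_rows(rows, "ts", start_date="2021-01-15", end_date="2021-02-28"))
# [{'ts': '2021-02-15'}]
```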

from_milliseconds(column_name: str) → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from a timestamp column defined in epoch milliseconds.

Parameters

column_name – column name where the filter will be applied.

Returns

IncrementalStrategy with the defined column expression.
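As an illustration, an epoch-milliseconds column can be turned into a Spark SQL date expression roughly like this (the exact expression butterfree builds is an assumption here and may differ):

```python
def from_milliseconds_expression(column_name: str) -> str:
    # Divide epoch milliseconds by 1000 to get seconds, then convert to a
    # date via Spark SQL's from_unixtime (assumed expression shape).
    return f"date(from_unixtime({column_name} / 1000))"

print(from_milliseconds_expression("ts"))
# date(from_unixtime(ts / 1000))
```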

from_string(column_name: str, mask: Optional[str] = None) → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from a timestamp column stored as a simple string.

Parameters
  • column_name – column name where the filter will be applied.

  • mask – mask defining the date/timestamp format on the string.

Returns

IncrementalStrategy with the defined column expression.
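A string-typed column would plausibly be parsed with Spark SQL's to_date, using the mask as the format when one is given (a sketch; the real expression may differ):

```python
def from_string_expression(column_name: str, mask: str = None) -> str:
    # With a mask, parse using to_date(column, format); without one,
    # rely on Spark's default date parsing (assumed expression shape).
    if mask is not None:
        return f"to_date({column_name}, '{mask}')"
    return f"to_date({column_name})"

print(from_string_expression("created_at", "dd/MM/yyyy"))
# to_date(created_at, 'dd/MM/yyyy')
```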

from_year_month_day_partitions(year_column: str = 'year', month_column: str = 'month', day_column: str = 'day') → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from year, month and day partitions.

Parameters
  • year_column – column name from the year partition.

  • month_column – column name from the month partition.

  • day_column – column name from the day partition.

Returns

IncrementalStrategy with the defined column expression.
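The three partition columns would need to be assembled into a single date value, for example by zero-padding month and day and concatenating an ISO string (assumed expression shape, shown for illustration):

```python
def from_partitions_expression(year_column="year", month_column="month",
                               day_column="day") -> str:
    # Zero-pad month and day with lpad, build "YYYY-MM-DD", and cast to
    # date (assumed shape; butterfree's actual expression may differ).
    return (
        f"date(concat({year_column}, '-', "
        f"lpad({month_column}, 2, '0'), '-', "
        f"lpad({day_column}, 2, '0')))"
    )

print(from_partitions_expression())
```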

get_expression(start_date: Optional[str] = None, end_date: Optional[str] = None) → str

Get the incremental filter expression using the defined dates.

Both arguments can be set to define a specific date interval, but setting only one of them is enough for this method to work.

Parameters
  • start_date – date lower bound to use in the filter.

  • end_date – date upper bound to use in the filter.

Returns

Filter expression based on defined column and bounds.

Raises
  • ValueError – If both arguments, start_date and end_date, are None.

  • ValueError – If the column expression was not defined.
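The composition of the expression can be sketched in plain Python; the clause format and error messages below are assumptions for illustration, not butterfree's exact output:

```python
from typing import Optional

def get_expression(column: str, start_date: Optional[str] = None,
                   end_date: Optional[str] = None) -> str:
    # Mirror the documented error cases before building any clause.
    if column is None:
        raise ValueError("The column expression was not defined.")
    if start_date is None and end_date is None:
        raise ValueError("Either start_date or end_date must be set.")
    clauses = []
    if start_date is not None:
        clauses.append(f"{column} >= date('{start_date}')")
    if end_date is not None:
        clauses.append(f"{column} <= date('{end_date}')")
    return " and ".join(clauses)

print(get_expression("date(from_unixtime(ts / 1000))", start_date="2021-01-01"))
# date(from_unixtime(ts / 1000)) >= date('2021-01-01')
```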

Module defining partitioning methods.

butterfree.dataframe_service.partitioning.extract_partition_values(dataframe: pyspark.sql.dataframe.DataFrame, partition_columns: List[str]) → List[Dict[str, Any]]

Extract distinct partition values from a given dataframe.

Parameters
  • dataframe – dataframe from where to extract partition values.

  • partition_columns – names of the partition columns present in the dataframe.

Returns

distinct partition values.
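The output shape can be illustrated with a pure-Python analogue operating on rows as dicts (the real function runs a distinct select on the Spark dataframe):

```python
# Collect the distinct combinations of partition-column values, preserving
# first-seen order, as a list of {column: value} dicts.
def extract_partition_values(rows, partition_columns):
    seen, values = set(), []
    for row in rows:
        key = tuple(row[c] for c in partition_columns)
        if key not in seen:
            seen.add(key)
            values.append({c: row[c] for c in partition_columns})
    return values

rows = [
    {"year": 2021, "month": 1, "day": 1, "value": 10},
    {"year": 2021, "month": 1, "day": 1, "value": 20},
    {"year": 2021, "month": 1, "day": 2, "value": 30},
]
print(extract_partition_values(rows, ["year", "month", "day"]))
# [{'year': 2021, 'month': 1, 'day': 1}, {'year': 2021, 'month': 1, 'day': 2}]
```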

Module with repartition methods.

butterfree.dataframe_service.repartition.repartition_df(dataframe: pyspark.sql.dataframe.DataFrame, partition_by: List[str], num_partitions: Optional[int] = None, num_processors: Optional[int] = None) → pyspark.sql.dataframe.DataFrame

Partition the DataFrame.

Parameters
  • dataframe – Spark DataFrame.

  • partition_by – list of columns to partition by.

  • num_processors – number of processors.

  • num_partitions – number of partitions.

Returns

Partitioned dataframe.
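One plausible way to resolve the final partition count from the two optional arguments is sketched below; the precedence and the scaling factor are assumptions, not butterfree's documented heuristic:

```python
DEFAULT_NUM_PARTITIONS = 200  # assumed fallback, matching Spark's shuffle default

def resolve_num_partitions(num_partitions=None, num_processors=None):
    # An explicit partition count takes precedence; otherwise scale with
    # the number of processors (the factor of 4 is an assumed heuristic).
    if num_partitions is not None:
        return num_partitions
    if num_processors is not None:
        return num_processors * 4
    return DEFAULT_NUM_PARTITIONS

print(resolve_num_partitions(num_processors=8))  # 32
```

The resolved count would then feed a call like dataframe.repartition(n, *partition_by).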

butterfree.dataframe_service.repartition.repartition_sort_df(dataframe: pyspark.sql.dataframe.DataFrame, partition_by: List[str], order_by: List[str], num_processors: Optional[int] = None, num_partitions: Optional[int] = None) → pyspark.sql.dataframe.DataFrame

Partition and Sort the DataFrame.

Parameters
  • dataframe – Spark DataFrame.

  • partition_by – list of columns to partition by.

  • order_by – list of columns to order by.

  • num_processors – number of processors.

  • num_partitions – number of partitions.

Returns

Partitioned and sorted dataframe.
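The partition-then-sort semantics can be mimicked in plain Python (a sketch; the real function uses Spark's repartition followed by sortWithinPartitions):

```python
from itertools import groupby
from operator import itemgetter

def repartition_sort(rows, partition_by, order_by):
    # Group rows by the partition key, then sort the rows inside each
    # group, mimicking repartition + sortWithinPartitions.
    part_key = itemgetter(*partition_by)
    order_key = itemgetter(*order_by)
    rows = sorted(rows, key=part_key)  # bring equal partition keys together
    out = []
    for _, group in groupby(rows, key=part_key):
        out.extend(sorted(group, key=order_key))
    return out

rows = [
    {"id": 2, "ts": 3}, {"id": 1, "ts": 2},
    {"id": 1, "ts": 1}, {"id": 2, "ts": 1},
]
print(repartition_sort(rows, ["id"], ["ts"]))
# [{'id': 1, 'ts': 1}, {'id': 1, 'ts': 2}, {'id': 2, 'ts': 1}, {'id': 2, 'ts': 3}]
```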

Module contents

Dataframe optimization components regarding Butterfree.

class butterfree.dataframe_service.IncrementalStrategy(column: Optional[str] = None)

Bases: object

Define an incremental strategy to be used on data sources.

Entity responsible for defining a column expression that will be used to filter the original data source. The purpose is to get only the data related to a specific pipeline execution time interval.

column

Column expression on which the incremental filter will be applied. The expression needs to resolve to a date or timestamp format so the filter can work properly with the defined upper and lower bounds.

filter_with_incremental_strategy(dataframe: pyspark.sql.dataframe.DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None) → pyspark.sql.dataframe.DataFrame

Filters the dataframe according to the date boundaries.

Parameters
  • dataframe – dataframe that will be filtered.

  • start_date – date lower bound to use in the filter.

  • end_date – date upper bound to use in the filter.

Returns

Filtered dataframe based on defined time boundaries.

from_milliseconds(column_name: str) → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from a timestamp column defined in epoch milliseconds.

Parameters

column_name – column name where the filter will be applied.

Returns

IncrementalStrategy with the defined column expression.

from_string(column_name: str, mask: Optional[str] = None) → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from a timestamp column stored as a simple string.

Parameters
  • column_name – column name where the filter will be applied.

  • mask – mask defining the date/timestamp format on the string.

Returns

IncrementalStrategy with the defined column expression.

from_year_month_day_partitions(year_column: str = 'year', month_column: str = 'month', day_column: str = 'day') → butterfree.dataframe_service.incremental_strategy.IncrementalStrategy

Create a column expression from year, month and day partitions.

Parameters
  • year_column – column name from the year partition.

  • month_column – column name from the month partition.

  • day_column – column name from the day partition.

Returns

IncrementalStrategy with the defined column expression.

get_expression(start_date: Optional[str] = None, end_date: Optional[str] = None) → str

Get the incremental filter expression using the defined dates.

Both arguments can be set to define a specific date interval, but setting only one of them is enough for this method to work.

Parameters
  • start_date – date lower bound to use in the filter.

  • end_date – date upper bound to use in the filter.

Returns

Filter expression based on defined column and bounds.

Raises
  • ValueError – If both arguments, start_date and end_date, are None.

  • ValueError – If the column expression was not defined.

butterfree.dataframe_service.extract_partition_values(dataframe: pyspark.sql.dataframe.DataFrame, partition_columns: List[str]) → List[Dict[str, Any]]

Extract distinct partition values from a given dataframe.

Parameters
  • dataframe – dataframe from where to extract partition values.

  • partition_columns – names of the partition columns present in the dataframe.

Returns

distinct partition values.

butterfree.dataframe_service.repartition_df(dataframe: pyspark.sql.dataframe.DataFrame, partition_by: List[str], num_partitions: Optional[int] = None, num_processors: Optional[int] = None) → pyspark.sql.dataframe.DataFrame

Partition the DataFrame.

Parameters
  • dataframe – Spark DataFrame.

  • partition_by – list of columns to partition by.

  • num_processors – number of processors.

  • num_partitions – number of partitions.

Returns

Partitioned dataframe.

butterfree.dataframe_service.repartition_sort_df(dataframe: pyspark.sql.dataframe.DataFrame, partition_by: List[str], order_by: List[str], num_processors: Optional[int] = None, num_partitions: Optional[int] = None) → pyspark.sql.dataframe.DataFrame

Partition and Sort the DataFrame.

Parameters
  • dataframe – Spark DataFrame.

  • partition_by – list of columns to partition by.

  • order_by – list of columns to order by.

  • num_processors – number of processors.

  • num_partitions – number of partitions.

Returns

Partitioned and sorted dataframe.