butterfree.dataframe_service package¶
Submodules¶
IncrementalStrategy entity.
- class butterfree.dataframe_service.incremental_strategy.IncrementalStrategy(column: Optional[str] = None)¶
Bases:
object
Define an incremental strategy to be used on data sources.
Entity responsible for defining a column expression that will be used to filter the original data source. The purpose is to get only the data related to a specific pipeline execution time interval.
- column¶
column expression on which the incremental filter will be applied. The expression needs to result in a date or timestamp format, so the filter can properly work with the defined upper and lower bounds.
- filter_with_incremental_strategy(dataframe: DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None) DataFrame ¶
Filter the dataframe according to the date boundaries.
- Parameters:
dataframe – dataframe that will be filtered.
start_date – date lower bound to use in the filter.
end_date – date upper bound to use in the filter.
- Returns:
Filtered dataframe based on defined time boundaries.
- from_milliseconds(column_name: str) IncrementalStrategy ¶
Create a column expression from a ts column defined in milliseconds.
- Parameters:
column_name – column name where the filter will be applied.
- Returns:
IncrementalStrategy with the defined column expression.
- from_string(column_name: str, mask: Optional[str] = None) IncrementalStrategy ¶
Create a column expression from a ts column defined as a simple string.
- Parameters:
column_name – column name where the filter will be applied.
mask – mask defining the date/timestamp format on the string.
- Returns:
IncrementalStrategy with the defined column expression.
- from_year_month_day_partitions(year_column: str = 'year', month_column: str = 'month', day_column: str = 'day') IncrementalStrategy ¶
Create a column expression from year, month and day partitions.
- Parameters:
year_column – name of the year partition column.
month_column – name of the month partition column.
day_column – name of the day partition column.
- Returns:
IncrementalStrategy with the defined column expression.
- get_expression(start_date: Optional[str] = None, end_date: Optional[str] = None) str ¶
Get the incremental filter expression using the defined dates.
Both arguments can be set to define a specific date interval, but setting only one of them is enough for this method to work.
- Parameters:
start_date – date lower bound to use in the filter.
end_date – date upper bound to use in the filter.
- Returns:
Filter expression based on defined column and bounds.
- Raises:
ValueError – If both arguments, start_date and end_date, are None.
ValueError – If the column expression was not defined.
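To make the behavior of get_expression concrete, here is a minimal, hedged sketch in plain Python of how such a filter expression might be assembled from a column expression and optional date bounds. The helper name build_filter_expression and the exact SQL string format are assumptions for illustration; the real IncrementalStrategy may differ in detail.

```python
# Hypothetical sketch of how an incremental filter expression could be
# built; NOT the actual butterfree implementation.
from typing import Optional


def build_filter_expression(
    column: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> str:
    """Build a SQL filter string over `column` using the given bounds."""
    if column is None:
        raise ValueError("The column expression was not defined.")
    if start_date is None and end_date is None:
        raise ValueError("Both start_date and end_date can't be None.")
    if start_date and end_date:
        return (
            f"date({column}) >= date('{start_date}') "
            f"and date({column}) <= date('{end_date}')"
        )
    if start_date:
        return f"date({column}) >= date('{start_date}')"
    return f"date({column}) <= date('{end_date}')"


expr = build_filter_expression("ts", start_date="2024-01-01")
```

A string like this can then be handed to Spark's DataFrame.filter, which accepts SQL expression strings, to restrict the source data to the pipeline's execution interval.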
Module defining partitioning methods.
- butterfree.dataframe_service.partitioning.extract_partition_values(dataframe: DataFrame, partition_columns: List[str]) List[Dict[str, Any]] ¶
Extract distinct partition values from a given dataframe.
- Parameters:
dataframe – dataframe from which to extract partition values.
partition_columns – names of the partition columns present in the dataframe.
- Returns:
distinct partition values.
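The shape of the result is easiest to see with a small sketch. The plain-Python function below mimics what the real function does with Spark (select the partition columns, deduplicate, return each distinct combination as a dict); it is an illustration over a list of row dicts, not the actual implementation.

```python
# Hedged sketch of extract_partition_values over plain row dicts;
# the real function operates on a Spark DataFrame.
from typing import Any, Dict, List


def extract_partition_values(
    rows: List[Dict[str, Any]], partition_columns: List[str]
) -> List[Dict[str, Any]]:
    """Return the distinct partition-column value combinations."""
    seen = set()
    values = []
    for row in rows:
        key = tuple(row[col] for col in partition_columns)
        if key not in seen:
            seen.add(key)
            values.append({col: row[col] for col in partition_columns})
    return values


rows = [
    {"year": 2024, "month": 1, "day": 1, "value": 10},
    {"year": 2024, "month": 1, "day": 1, "value": 20},
    {"year": 2024, "month": 1, "day": 2, "value": 30},
]
partitions = extract_partition_values(rows, ["year", "month", "day"])
# → [{'year': 2024, 'month': 1, 'day': 1}, {'year': 2024, 'month': 1, 'day': 2}]
```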
Module defining repartition methods.
- butterfree.dataframe_service.repartition.repartition_df(dataframe: DataFrame, partition_by: List[str], num_partitions: Optional[int] = None, num_processors: Optional[int] = None) DataFrame ¶
Partition the DataFrame.
- Parameters:
dataframe – Spark DataFrame.
partition_by – list of partitions.
num_processors – number of processors.
num_partitions – number of partitions.
- Returns:
Partitioned dataframe.
- butterfree.dataframe_service.repartition.repartition_sort_df(dataframe: DataFrame, partition_by: List[str], order_by: List[str], num_processors: Optional[int] = None, num_partitions: Optional[int] = None) DataFrame ¶
Partition and Sort the DataFrame.
- Parameters:
dataframe – Spark DataFrame.
partition_by – list of columns to partition by.
order_by – list of columns to order by.
num_processors – number of processors.
num_partitions – number of partitions.
- Returns:
Partitioned and sorted dataframe.
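Both repartition helpers must decide how many output partitions to use from the optional num_partitions and num_processors arguments. The sketch below shows one plausible resolution rule; the multiplier and the default of 200 (Spark's default shuffle partition count) are assumptions, and the real functions may use different factors.

```python
# Hypothetical partition-count resolution; NOT the actual butterfree logic.
from typing import Optional

DEFAULT_NUM_PARTITIONS = 200  # assumption: Spark's default shuffle partitions


def resolve_num_partitions(
    num_partitions: Optional[int] = None,
    num_processors: Optional[int] = None,
    multiplier: int = 4,  # hypothetical oversubscription factor per core
) -> int:
    """Prefer an explicit count, else derive one from the processor count."""
    if num_partitions is not None:
        return num_partitions
    if num_processors is not None:
        return num_processors * multiplier
    return DEFAULT_NUM_PARTITIONS
```

With the count resolved, repartition_df would amount to something like df.repartition(n, *partition_by), and repartition_sort_df would additionally call sortWithinPartitions(*order_by) so rows are ordered inside each partition without a global sort.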
Module contents¶
Dataframe optimization components regarding Butterfree.
- class butterfree.dataframe_service.IncrementalStrategy(column: Optional[str] = None)¶
Bases:
object
Define an incremental strategy to be used on data sources.
Entity responsible for defining a column expression that will be used to filter the original data source. The purpose is to get only the data related to a specific pipeline execution time interval.
- column¶
column expression on which the incremental filter will be applied. The expression needs to result in a date or timestamp format, so the filter can properly work with the defined upper and lower bounds.
- filter_with_incremental_strategy(dataframe: DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None) DataFrame ¶
Filter the dataframe according to the date boundaries.
- Parameters:
dataframe – dataframe that will be filtered.
start_date – date lower bound to use in the filter.
end_date – date upper bound to use in the filter.
- Returns:
Filtered dataframe based on defined time boundaries.
- from_milliseconds(column_name: str) IncrementalStrategy ¶
Create a column expression from a ts column defined in milliseconds.
- Parameters:
column_name – column name where the filter will be applied.
- Returns:
IncrementalStrategy with the defined column expression.
- from_string(column_name: str, mask: Optional[str] = None) IncrementalStrategy ¶
Create a column expression from a ts column defined as a simple string.
- Parameters:
column_name – column name where the filter will be applied.
mask – mask defining the date/timestamp format on the string.
- Returns:
IncrementalStrategy with the defined column expression.
- from_year_month_day_partitions(year_column: str = 'year', month_column: str = 'month', day_column: str = 'day') IncrementalStrategy ¶
Create a column expression from year, month and day partitions.
- Parameters:
year_column – name of the year partition column.
month_column – name of the month partition column.
day_column – name of the day partition column.
- Returns:
IncrementalStrategy with the defined column expression.
- get_expression(start_date: Optional[str] = None, end_date: Optional[str] = None) str ¶
Get the incremental filter expression using the defined dates.
Both arguments can be set to define a specific date interval, but setting only one of them is enough for this method to work.
- Parameters:
start_date – date lower bound to use in the filter.
end_date – date upper bound to use in the filter.
- Returns:
Filter expression based on defined column and bounds.
- Raises:
ValueError – If both arguments, start_date and end_date, are None.
ValueError – If the column expression was not defined.
- butterfree.dataframe_service.extract_partition_values(dataframe: DataFrame, partition_columns: List[str]) List[Dict[str, Any]] ¶
Extract distinct partition values from a given dataframe.
- Parameters:
dataframe – dataframe from which to extract partition values.
partition_columns – names of the partition columns present in the dataframe.
- Returns:
distinct partition values.
- butterfree.dataframe_service.repartition_df(dataframe: DataFrame, partition_by: List[str], num_partitions: Optional[int] = None, num_processors: Optional[int] = None) DataFrame ¶
Partition the DataFrame.
- Parameters:
dataframe – Spark DataFrame.
partition_by – list of partitions.
num_processors – number of processors.
num_partitions – number of partitions.
- Returns:
Partitioned dataframe.
- butterfree.dataframe_service.repartition_sort_df(dataframe: DataFrame, partition_by: List[str], order_by: List[str], num_processors: Optional[int] = None, num_partitions: Optional[int] = None) DataFrame ¶
Partition and Sort the DataFrame.
- Parameters:
dataframe – Spark DataFrame.
partition_by – list of columns to partition by.
order_by – list of columns to order by.
num_processors – number of processors.
num_partitions – number of partitions.
- Returns:
Partitioned and sorted dataframe.