butterfree.extract.pre_processing package¶
Submodules¶
Explode json column for dataframes.
- butterfree.extract.pre_processing.explode_json_column_transform.explode_json_column(df: DataFrame, column: str, json_schema: StructType) DataFrame ¶
Create new columns extracting properties from a JSON column.
Example:
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.extract.pre_processing import explode_json_column
>>> from pyspark.sql.types import (
...     ArrayType,
...     IntegerType,
...     StringType,
...     StructField,
...     StructType,
... )
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [{"json_column": '{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}'}]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}')]
>>> json_column_schema = StructType(
...     [
...         StructField("a", IntegerType()),
...         StructField("b", StringType()),
...         StructField("c", IntegerType()),
...         StructField("d", ArrayType(IntegerType())),
...     ]
... )
>>> explode_json_column(
...     df, column='json_column', json_schema=json_column_schema
... ).collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}', a=123, b='abc', c=123, d=[1, 2, 3])]
- Parameters:
df – input dataframe with the target JSON column.
column – column name that is going to be exploded.
json_schema – expected schema of the JSON column. Not all "first layer" fields need to be mapped in the json_schema, only the desired columns. If a JSON field needs to be cast to a struct, the declared StructType must match the record's schema exactly; otherwise the value in the resulting column will be null.
- Returns:
dataframe with the new extracted columns from the JSON column.
Module for filtering DataFrames coming from readers.
- butterfree.extract.pre_processing.filter_transform.filter(dataframe: DataFrame, condition: str) DataFrame ¶
Filters a DataFrame's rows using the given SQL condition.
- Parameters:
dataframe – Spark DataFrame.
condition – SQL expression with column, operation and value to filter the dataframe.
- Returns:
Filtered dataframe.
Forward Fill Transform for dataframes.
- butterfree.extract.pre_processing.forward_fill_transform.forward_fill(dataframe: DataFrame, partition_by: Union[str, List[str]], order_by: Union[str, List[str]], fill_column: str, filled_column: Optional[str] = None) DataFrame ¶
Applies a forward fill to a single column.
Fills null values with the last known non-null value, leaving leading nulls untouched.
- butterfree.extract.pre_processing.forward_fill_transform.dataframe¶
dataframe to be transformed.
- butterfree.extract.pre_processing.forward_fill_transform.partition_by¶
list of columns’ names to be used as partition for the operation.
- butterfree.extract.pre_processing.forward_fill_transform.order_by¶
list of columns’ names to be used when sorting column values.
- butterfree.extract.pre_processing.forward_fill_transform.fill_column¶
column to be forward filled.
- butterfree.extract.pre_processing.forward_fill_transform.filled_column¶
new column name. Optional; when None, the operation is done in place on fill_column.
Example
>>> dataframe.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|
|          2|2017-09-09 13:00:00|     sun|       null|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|
+-----------+-------------------+--------+-----------+
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature",
...     "temperature_filled"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+------------------+
|sensor_type|                 ts|location|temperature|temperature_filled|
+-----------+-------------------+--------+-----------+------------------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|          18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|              null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|           25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|          18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|          25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|          18.61258|
|          2|2017-09-09 13:00:00|     sun|       null|           25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|          17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|          24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|           25.4986|
+-----------+-------------------+--------+-----------+------------------+
>>> # inplace forward fill
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|   18.61258|
|          2|2017-09-09 13:00:00|     sun|    25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|    25.4986|
+-----------+-------------------+--------+-----------+
Pivot Transform for dataframes.
- butterfree.extract.pre_processing.pivot_transform.pivot(dataframe: DataFrame, group_by_columns: List[str], pivot_column: str, agg_column: str, aggregation: Callable, mock_value: Optional[Union[float, str]] = None, mock_type: Optional[Union[DataType, str]] = None, with_forward_fill: bool = False) DataFrame ¶
Defines a pivot transformation.
- butterfree.extract.pre_processing.pivot_transform.dataframe¶
dataframe to be pivoted.
- butterfree.extract.pre_processing.pivot_transform.group_by_columns¶
list of columns’ names to be grouped.
- butterfree.extract.pre_processing.pivot_transform.pivot_column¶
column to be pivoted.
- butterfree.extract.pre_processing.pivot_transform.agg_column¶
column to be aggregated by pivoted category.
- butterfree.extract.pre_processing.pivot_transform.aggregation¶
desired Spark aggregation function to be performed, e.g. spark_agg(col_name). See the docs for all available aggregation functions: https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/sql/functions.html
- butterfree.extract.pre_processing.pivot_transform.mock_value¶
value used to distinguish true nulls resulting from the aggregation from empty values introduced by the pivot transformation.
- butterfree.extract.pre_processing.pivot_transform.mock_type¶
mock_value data type (compatible with spark).
- butterfree.extract.pre_processing.pivot_transform.with_forward_fill¶
applies a forward fill to null values after the pivot operation.
Example
>>> dataframe.orderBy("ts", "id", "amenity").show()
+---+---+-------+-----+
| id| ts|amenity|  has|
+---+---+-------+-----+
|  1|  1| fridge|false|
|  1|  1|   oven| true|
|  1|  1|   pool|false|
|  2|  2|balcony|false|
|  1|  3|balcony| null|
|  1|  4|   oven| null|
|  1|  4|   pool| true|
|  1|  5|balcony| true|
+---+---+-------+-----+
>>> pivoted = pivot(dataframe, ["id", "ts"], "amenity", "has", functions.first)
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null|  null|null| null|
|  1|  4|   null|  null|null| true|
|  1|  5|   true|  null|null| null|
+---+---+-------+------+----+-----+
But sometimes you would like to keep the last values a feature has assumed from previous modifications. In this example, amenity "oven" for id=1 was set to null and "pool" was set to true at ts=4. All other amenities should keep the state they had as of that ts. To do that, we use a technique called forward fill:
>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|true| true|
|  1|  5|   true| false|true| true|
+---+---+-------+------+----+-----+
Great! Now every amenity that wasn't changed kept its state. But the forced change to null for amenity "oven" on id=1 at ts=4 was ignored during forward fill. If users want this change to be respected, they must provide a mock value and type to be used as a signal for "true nulls". In other words, we want to forward fill only the nulls that were created by the pivot transformation.
In this example, amenities only assume boolean values, so no boolean is left over to serve as a mock value; it is only true or false. Users can instead provide a mock value of another type, to which the column can be cast. Check this out:
>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True,
...     mock_value=-1,
...     mock_type="int"
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|null| true|
|  1|  5|   true| false|null| true|
+---+---+-------+------+----+-----+
During the transformation, this method casts agg_column to the mock_type data type and fills all "true nulls" with the mock_value. After pivot and forward fill are applied, the new pivoted columns are cast back to the original type, with all mock values replaced by null.
Replace transformer for dataframes.
- butterfree.extract.pre_processing.replace_transform.replace(dataframe: DataFrame, column: str, replace_dict: Dict[str, str]) DataFrame ¶
Replace values of a string column in the dataframe using a dict.
Example:
>>> from butterfree.extract.pre_processing import replace
>>> from butterfree.testing.dataframe import (
...     assert_dataframe_equality,
...     create_df_from_collection,
... )
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> input_data = [
...     {"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}
... ]
>>> input_df = create_df_from_collection(input_data, spark_context, spark_session)
>>> input_df.collect()
[Row(id=1, type='a'), Row(id=2, type='b'), Row(id=3, type='c')]
>>> replace_dict = {"a": "type_a", "b": "type_b"}
>>> replace(input_df, "type", replace_dict).collect()
[Row(id=1, type='type_a'), Row(id=2, type='type_b'), Row(id=3, type='c')]
- Parameters:
dataframe – data to be transformed.
column – string column on the dataframe where to apply the replace.
replace_dict – dict with values to be replaced. All mapped values must be strings.
- Returns:
Dataframe with column values replaced.
Module contents¶
Pre Processing Components regarding Readers.
- butterfree.extract.pre_processing.explode_json_column(df: DataFrame, column: str, json_schema: StructType) DataFrame ¶
Create new columns extracting properties from a JSON column.
Example:
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.extract.pre_processing import explode_json_column
>>> from pyspark.sql.types import (
...     ArrayType,
...     IntegerType,
...     StringType,
...     StructField,
...     StructType,
... )
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [{"json_column": '{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}'}]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}')]
>>> json_column_schema = StructType(
...     [
...         StructField("a", IntegerType()),
...         StructField("b", StringType()),
...         StructField("c", IntegerType()),
...         StructField("d", ArrayType(IntegerType())),
...     ]
... )
>>> explode_json_column(
...     df, column='json_column', json_schema=json_column_schema
... ).collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}', a=123, b='abc', c=123, d=[1, 2, 3])]
- Parameters:
df – input dataframe with the target JSON column.
column – column name that is going to be exploded.
json_schema – expected schema of the JSON column. Not all "first layer" fields need to be mapped in the json_schema, only the desired columns. If a JSON field needs to be cast to a struct, the declared StructType must match the record's schema exactly; otherwise the value in the resulting column will be null.
- Returns:
dataframe with the new extracted columns from the JSON column.
- butterfree.extract.pre_processing.filter(dataframe: DataFrame, condition: str) DataFrame ¶
Filters a DataFrame's rows using the given SQL condition.
- Parameters:
dataframe – Spark DataFrame.
condition – SQL expression with column, operation and value to filter the dataframe.
- Returns:
Filtered dataframe.
- butterfree.extract.pre_processing.forward_fill(dataframe: DataFrame, partition_by: Union[str, List[str]], order_by: Union[str, List[str]], fill_column: str, filled_column: Optional[str] = None) DataFrame ¶
Applies a forward fill to a single column.
Fills null values with the last known non-null value, leaving leading nulls untouched.
- butterfree.extract.pre_processing.dataframe¶
dataframe to be transformed.
- butterfree.extract.pre_processing.partition_by¶
list of columns’ names to be used as partition for the operation.
- butterfree.extract.pre_processing.order_by¶
list of columns’ names to be used when sorting column values.
- butterfree.extract.pre_processing.fill_column¶
column to be forward filled.
- butterfree.extract.pre_processing.filled_column¶
new column name. Optional; when None, the operation is done in place on fill_column.
Example
>>> dataframe.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|
|          2|2017-09-09 13:00:00|     sun|       null|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|
+-----------+-------------------+--------+-----------+
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature",
...     "temperature_filled"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+------------------+
|sensor_type|                 ts|location|temperature|temperature_filled|
+-----------+-------------------+--------+-----------+------------------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|          18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|              null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|           25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|          18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|          25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|          18.61258|
|          2|2017-09-09 13:00:00|     sun|       null|           25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|          17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|          24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|           25.4986|
+-----------+-------------------+--------+-----------+------------------+
>>> # inplace forward fill
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|   18.61258|
|          2|2017-09-09 13:00:00|     sun|    25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|    25.4986|
+-----------+-------------------+--------+-----------+
- butterfree.extract.pre_processing.pivot(dataframe: DataFrame, group_by_columns: List[str], pivot_column: str, agg_column: str, aggregation: Callable, mock_value: Optional[Union[float, str]] = None, mock_type: Optional[Union[DataType, str]] = None, with_forward_fill: bool = False) DataFrame ¶
Defines a pivot transformation.
- butterfree.extract.pre_processing.dataframe¶
dataframe to be pivoted.
- butterfree.extract.pre_processing.group_by_columns¶
list of columns’ names to be grouped.
- butterfree.extract.pre_processing.pivot_column¶
column to be pivoted.
- butterfree.extract.pre_processing.agg_column¶
column to be aggregated by pivoted category.
- butterfree.extract.pre_processing.aggregation¶
desired Spark aggregation function to be performed, e.g. spark_agg(col_name). See the docs for all available aggregation functions: https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/sql/functions.html
- butterfree.extract.pre_processing.mock_value¶
value used to distinguish true nulls resulting from the aggregation from empty values introduced by the pivot transformation.
- butterfree.extract.pre_processing.mock_type¶
mock_value data type (compatible with spark).
- butterfree.extract.pre_processing.with_forward_fill¶
applies a forward fill to null values after the pivot operation.
Example
>>> dataframe.orderBy("ts", "id", "amenity").show()
+---+---+-------+-----+
| id| ts|amenity|  has|
+---+---+-------+-----+
|  1|  1| fridge|false|
|  1|  1|   oven| true|
|  1|  1|   pool|false|
|  2|  2|balcony|false|
|  1|  3|balcony| null|
|  1|  4|   oven| null|
|  1|  4|   pool| true|
|  1|  5|balcony| true|
+---+---+-------+-----+
>>> pivoted = pivot(dataframe, ["id", "ts"], "amenity", "has", functions.first)
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null|  null|null| null|
|  1|  4|   null|  null|null| true|
|  1|  5|   true|  null|null| null|
+---+---+-------+------+----+-----+
But sometimes you would like to keep the last values a feature has assumed from previous modifications. In this example, amenity "oven" for id=1 was set to null and "pool" was set to true at ts=4. All other amenities should keep the state they had as of that ts. To do that, we use a technique called forward fill:
>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|true| true|
|  1|  5|   true| false|true| true|
+---+---+-------+------+----+-----+
Great! Now every amenity that wasn't changed kept its state. But the forced change to null for amenity "oven" on id=1 at ts=4 was ignored during forward fill. If users want this change to be respected, they must provide a mock value and type to be used as a signal for "true nulls". In other words, we want to forward fill only the nulls that were created by the pivot transformation.
In this example, amenities only assume boolean values, so no boolean is left over to serve as a mock value; it is only true or false. Users can instead provide a mock value of another type, to which the column can be cast. Check this out:
>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True,
...     mock_value=-1,
...     mock_type="int"
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|null| true|
|  1|  5|   true| false|null| true|
+---+---+-------+------+----+-----+
During the transformation, this method casts agg_column to the mock_type data type and fills all "true nulls" with the mock_value. After pivot and forward fill are applied, the new pivoted columns are cast back to the original type, with all mock values replaced by null.
- butterfree.extract.pre_processing.replace(dataframe: DataFrame, column: str, replace_dict: Dict[str, str]) DataFrame ¶
Replace values of a string column in the dataframe using a dict.
Example:
>>> from butterfree.extract.pre_processing import replace
>>> from butterfree.testing.dataframe import (
...     assert_dataframe_equality,
...     create_df_from_collection,
... )
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> input_data = [
...     {"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}
... ]
>>> input_df = create_df_from_collection(input_data, spark_context, spark_session)
>>> input_df.collect()
[Row(id=1, type='a'), Row(id=2, type='b'), Row(id=3, type='c')]
>>> replace_dict = {"a": "type_a", "b": "type_b"}
>>> replace(input_df, "type", replace_dict).collect()
[Row(id=1, type='type_a'), Row(id=2, type='type_b'), Row(id=3, type='c')]
- Parameters:
dataframe – data to be transformed.
column – string column on the dataframe where to apply the replace.
replace_dict – dict with values to be replaced. All mapped values must be strings.
- Returns:
Dataframe with column values replaced.