butterfree.extract.pre_processing package

Submodules

Explode json column for dataframes.

butterfree.extract.pre_processing.explode_json_column_transform.explode_json_column(df: pyspark.sql.dataframe.DataFrame, column: str, json_schema: pyspark.sql.types.StructType) → pyspark.sql.dataframe.DataFrame

Create new columns extracting properties from a JSON column.

Example:

>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.extract.pre_processing import explode_json_column
>>> from pyspark.sql.types import (
...     ArrayType,
...     IntegerType,
...     StringType,
...     StructField,
...     StructType,
... )
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [{"json_column": '{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}'}]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()

[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}')]

>>> json_column_schema = StructType(
...     [
...         StructField("a", IntegerType()),
...         StructField("b", StringType()),
...         StructField("c", IntegerType()),
...         StructField("d", ArrayType(IntegerType())),
...     ]
... )
>>> explode_json_column(
...     df, column='json_column', json_schema=json_column_schema
... ).collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}', a=123, b='abc', c=123, d=[1, 2, 3])]

Parameters
  • df – input dataframe with the target JSON column.

  • column – column name that is going to be exploded.

  • json_schema – expected schema of the JSON column. Not all “first layer” fields need to be mapped in the json_schema, only the desired columns. If any JSON field needs to be cast to a struct, the declared schema (a StructType) must match the record’s schema exactly; otherwise, the value in the resulting column will be null.

Returns

dataframe with the new extracted columns from the JSON column.
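Since running a Spark session is out of scope here, the per-row semantics can be sketched in plain Python. This is an illustration only, not the library implementation, and unlike the Spark version it does not cast values to the declared schema types:

```python
import json

def explode_json_row(row: dict, column: str, fields: list) -> dict:
    # Pure-Python analogue of explode_json_column for a single row:
    # parse the JSON string and promote the requested fields to new
    # columns. Fields missing from the JSON come out as None, mirroring
    # Spark's null behavior for unmatched schema fields.
    parsed = json.loads(row[column])
    return {**row, **{f: parsed.get(f) for f in fields}}

row = {"json_column": '{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}'}
exploded = explode_json_row(row, "json_column", ["a", "b", "d"])
```

Note that the original JSON column is kept alongside the extracted ones, just as in the `collect()` output above.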

Module where filter DataFrames coming from readers.

butterfree.extract.pre_processing.filter_transform.filter(dataframe: pyspark.sql.dataframe.DataFrame, condition)

Filters a DataFrame’s rows using the given SQL condition.

Parameters
  • dataframe – Spark DataFrame.

  • condition – SQL expression with column, operation and value to filter the dataframe.

Returns

Filtered dataframe
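This section has no doctest, so here is a plain-Python sketch of the filtering semantics. It is an illustration only; the Spark version instead evaluates a SQL expression string, e.g. filter(df, "id > 1"):

```python
def filter_rows(rows, predicate):
    # Plain-Python analogue of the filter transform: keep only the rows
    # for which the condition holds.
    return [r for r in rows if predicate(r)]

data = [{"id": 1}, {"id": 2}, {"id": 3}]
kept = filter_rows(data, lambda r: r["id"] > 1)
```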

Forward Fill Transform for dataframes.

butterfree.extract.pre_processing.forward_fill_transform.forward_fill(dataframe: pyspark.sql.dataframe.DataFrame, partition_by, order_by, fill_column, filled_column=None)

Applies a forward fill to a single column.

Fills null values with the last known non-null value, leaving leading nulls untouched.

butterfree.extract.pre_processing.forward_fill_transform.dataframe

dataframe to be transformed.

butterfree.extract.pre_processing.forward_fill_transform.partition_by

list of column names used to partition the operation.

butterfree.extract.pre_processing.forward_fill_transform.order_by

list of column names used to sort the column values.

butterfree.extract.pre_processing.forward_fill_transform.fill_column

column to be forward filled.

butterfree.extract.pre_processing.forward_fill_transform.filled_column

new column name. Optional; when None, the operation is done in place.

Example

>>> dataframe.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|
|          2|2017-09-09 13:00:00|     sun|       null|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|
+-----------+-------------------+--------+-----------+
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature",
...     "temperature_filled"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+------------------+
|sensor_type|                 ts|location|temperature|temperature_filled|
+-----------+-------------------+--------+-----------+------------------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|          18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|              null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|           25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|          18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|          25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|          18.61258|
|          2|2017-09-09 13:00:00|     sun|       null|           25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|          17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|          24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|           25.4986|
+-----------+-------------------+--------+-----------+------------------+
>>> # inplace forward fill
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|   18.61258|
|          2|2017-09-09 13:00:00|     sun|    25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|    25.4986|
+-----------+-------------------+--------+-----------+
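The forward-fill semantics shown above can be sketched in plain Python, without a Spark session. This is an illustration only; a Spark implementation would typically use last(col, ignorenulls=True) over a Window partitioned by partition_by and ordered by order_by:

```python
from itertools import groupby

def forward_fill_py(rows, partition_by, order_by, fill_column):
    # Within each partition, ordered by `order_by`, each None takes the
    # last non-None value seen; leading Nones are left alone.
    part_key = lambda r: tuple(r[c] for c in partition_by)
    ordered = sorted(rows, key=lambda r: (part_key(r), r[order_by]))
    out = []
    for _, group in groupby(ordered, key=part_key):
        last_seen = None
        for r in group:
            if r[fill_column] is None:
                r = {**r, fill_column: last_seen}
            else:
                last_seen = r[fill_column]
            out.append(r)
    return out

# Mirrors the (sensor_type=2, location=sun) partition from the example.
data = [
    {"sensor_type": 2, "ts": 12, "temperature": 25.4986},
    {"sensor_type": 2, "ts": 13, "temperature": None},
    {"sensor_type": 2, "ts": 14, "temperature": None},
]
filled = forward_fill_py(data, ["sensor_type"], "ts", "temperature")
```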

Pivot Transform for dataframes.

butterfree.extract.pre_processing.pivot_transform.pivot(dataframe: pyspark.sql.dataframe.DataFrame, group_by_columns, pivot_column, agg_column, aggregation, mock_value=None, mock_type=None, with_forward_fill=False)

Defines a pivot transformation.

butterfree.extract.pre_processing.pivot_transform.dataframe

dataframe to be pivoted.

butterfree.extract.pre_processing.pivot_transform.group_by_columns

list of column names to group by.

butterfree.extract.pre_processing.pivot_transform.pivot_column

column to be pivoted.

butterfree.extract.pre_processing.pivot_transform.agg_column

column to be aggregated by pivoted category.

butterfree.extract.pre_processing.pivot_transform.aggregation

desired Spark aggregation function to be performed, e.g. spark_agg(col_name). See the Spark functions documentation for all available aggregations: https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/sql/functions.html

butterfree.extract.pre_processing.pivot_transform.mock_value

value used to distinguish “true nulls” resulting from the aggregation from empty values created by the pivot transformation.

butterfree.extract.pre_processing.pivot_transform.mock_type

mock_value data type (compatible with spark).

butterfree.extract.pre_processing.pivot_transform.with_forward_fill

if True, applies a forward fill to null values after the pivot operation.

Example

>>> dataframe.orderBy("ts", "id", "amenity").show()
+---+---+-------+-----+
| id| ts|amenity|  has|
+---+---+-------+-----+
|  1|  1| fridge|false|
|  1|  1|   oven| true|
|  1|  1|   pool|false|
|  2|  2|balcony|false|
|  1|  3|balcony| null|
|  1|  4|   oven| null|
|  1|  4|   pool| true|
|  1|  5|balcony| true|
+---+---+-------+-----+
>>> pivoted = pivot(dataframe, ["id", "ts"], "amenity", "has", functions.first)
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null|  null|null| null|
|  1|  4|   null|  null|null| true|
|  1|  5|   true|  null|null| null|
+---+---+-------+------+----+-----+

Sometimes, however, you may want to keep the last value each feature assumed in previous records. In this example, amenity “oven” for id=1 was set to null and “pool” was set to true at ts=4; all other amenities should keep their state as of that ts. To achieve that, we use a technique called forward fill:

>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|true| true|
|  1|  5|   true| false|true| true|
+---+---+-------+------+----+-----+

Great! Now every amenity that wasn’t changed kept its state. However, the forced change to null for amenity “oven” on id=1 at ts=4 was ignored during forward fill. If the user wants to respect this change, they must provide a mock value and type to serve as a marker for “true nulls”. In other words, we want to forward fill only the nulls created by the pivot transformation.

In this example, amenities only assume boolean values, so there is no spare boolean left to use as a mock; it is only true or false. Users can therefore provide a mock value of another type, one the column can be cast to. Check this out:

>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True,
...     mock_value=-1,
...     mock_type="int"
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|null| true|
|  1|  5|   true| false|null| true|
+---+---+-------+------+----+-----+

During the transformation, this method casts the agg_column to the mock_type data type and fills all “true nulls” with the mock_value. After the pivot and forward fill are applied, every new pivoted column is cast back to the original type, with all mock values replaced by null.
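The mock-value trick can be sketched on a single pivoted column in plain Python. This is an illustration of the mechanic only, under the assumption that “true nulls” were already replaced by the mock before pivoting:

```python
def mark_fill_restore(values, mock=-1):
    # 1) "true nulls" were pre-filled with the mock before the pivot,
    #    so only pivot-created gaps are None here;
    # 2) forward-fill the Nones;
    # 3) turn surviving mocks back into None.
    filled, last = [], None
    for v in values:
        if v is None:
            v = last
        else:
            last = v
        filled.append(v)
    return [None if v == mock else v for v in filled]

# "oven" column for id=1 across ts=1,3,4,5: true, pivot gap,
# forced null (already marked with the mock), pivot gap.
result = mark_fill_restore([True, None, -1, None])
```

The forward fill propagates the mock just like any other value, which is exactly what lets the forced null at ts=4 survive into ts=5 instead of being overwritten.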

Replace transformer for dataframes.

butterfree.extract.pre_processing.replace_transform.replace(dataframe: pyspark.sql.dataframe.DataFrame, column, replace_dict)

Replace values of a string column in the dataframe using a dict.

Example:

>>> from butterfree.extract.pre_processing import replace
>>> from butterfree.testing.dataframe import (
...     assert_dataframe_equality,
...     create_df_from_collection,
... )
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> input_data = [
...     {"id":1, "type": "a"}, {"id":2, "type": "b"}, {"id":3, "type": "c"}
... ]
>>> input_df = create_df_from_collection(input_data, spark_context, spark_session)
>>> input_df.collect()

[Row(id=1, type='a'), Row(id=2, type='b'), Row(id=3, type='c')]

>>> replace_dict = {"a": "type_a", "b": "type_b"}
>>> replace(input_df, "type", replace_dict).collect()

[Row(id=1, type='type_a'), Row(id=2, type='type_b'), Row(id=3, type='c')]

Parameters
  • dataframe – data to be transformed.

  • column – string column on the dataframe where to apply the replace.

  • replace_dict – dict with values to be replaced. All mapped values must be strings.

Returns

Dataframe with column values replaced.
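The replace semantics can be sketched in plain Python, without a Spark session. This is an illustration only; the Spark version operates on a DataFrame column instead:

```python
def replace_py(rows, column, replace_dict):
    # Map matching string values through the dict; values with no
    # mapping (like "c" below) are left untouched.
    return [{**r, column: replace_dict.get(r[column], r[column])} for r in rows]

input_data = [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}]
out = replace_py(input_data, "type", {"a": "type_a", "b": "type_b"})
```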

Module contents

Pre Processing Components regarding Readers.

butterfree.extract.pre_processing.explode_json_column(df: pyspark.sql.dataframe.DataFrame, column: str, json_schema: pyspark.sql.types.StructType) → pyspark.sql.dataframe.DataFrame

Create new columns extracting properties from a JSON column.

Example:

>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> from butterfree.testing.dataframe import create_df_from_collection
>>> from butterfree.extract.pre_processing import explode_json_column
>>> from pyspark.sql.types import (
...     ArrayType,
...     IntegerType,
...     StringType,
...     StructField,
...     StructType,
... )
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> data = [{"json_column": '{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}'}]
>>> df = create_df_from_collection(data, spark_context, spark_session)
>>> df.collect()

[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}')]

>>> json_column_schema = StructType(
...     [
...         StructField("a", IntegerType()),
...         StructField("b", StringType()),
...         StructField("c", IntegerType()),
...         StructField("d", ArrayType(IntegerType())),
...     ]
... )
>>> explode_json_column(
...     df, column='json_column', json_schema=json_column_schema
... ).collect()
[Row(json_column='{"a": 123, "b": "abc", "c": "123", "d": [1, 2, 3]}', a=123, b='abc', c=123, d=[1, 2, 3])]

Parameters
  • df – input dataframe with the target JSON column.

  • column – column name that is going to be exploded.

  • json_schema – expected schema of the JSON column. Not all “first layer” fields need to be mapped in the json_schema, only the desired columns. If any JSON field needs to be cast to a struct, the declared schema (a StructType) must match the record’s schema exactly; otherwise, the value in the resulting column will be null.

Returns

dataframe with the new extracted columns from the JSON column.

butterfree.extract.pre_processing.filter(dataframe: pyspark.sql.dataframe.DataFrame, condition)

Filters a DataFrame’s rows using the given SQL condition.

Parameters
  • dataframe – Spark DataFrame.

  • condition – SQL expression with column, operation and value to filter the dataframe.

Returns

Filtered dataframe

butterfree.extract.pre_processing.forward_fill(dataframe: pyspark.sql.dataframe.DataFrame, partition_by, order_by, fill_column, filled_column=None)

Applies a forward fill to a single column.

Fills null values with the last known non-null value, leaving leading nulls untouched.

butterfree.extract.pre_processing.dataframe

dataframe to be transformed.

butterfree.extract.pre_processing.partition_by

list of column names used to partition the operation.

butterfree.extract.pre_processing.order_by

list of column names used to sort the column values.

butterfree.extract.pre_processing.fill_column

column to be forward filled.

butterfree.extract.pre_processing.filled_column

new column name. Optional; when None, the operation is done in place.

Example

>>> dataframe.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|
|          2|2017-09-09 13:00:00|     sun|       null|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|
+-----------+-------------------+--------+-----------+
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature",
...     "temperature_filled"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+------------------+
|sensor_type|                 ts|location|temperature|temperature_filled|
+-----------+-------------------+--------+-----------+------------------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|          18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|              null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|           25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|          18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|          25.68457|
|          2|2017-09-09 13:00:00|   shade|       null|          18.61258|
|          2|2017-09-09 13:00:00|     sun|       null|           25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|          17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|          24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|          18.61258|
|          2|2017-09-09 14:00:00|     sun|       null|           25.4986|
+-----------+-------------------+--------+-----------+------------------+
>>> # inplace forward fill
>>> filled_df = forward_fill(
...     dataframe,
...     ["sensor_type", "location"],
...     "ts",
...     "temperature"
... )
>>> filled_df.orderBy("ts", "sensor_type", "location").show()
+-----------+-------------------+--------+-----------+
|sensor_type|                 ts|location|temperature|
+-----------+-------------------+--------+-----------+
|          1|2017-09-09 12:00:00|   shade|   18.83018|
|          1|2017-09-09 12:00:00|     sun|       null|
|          2|2017-09-09 12:00:00|   shade|   18.61258|
|          2|2017-09-09 12:00:00|     sun|    25.4986|
|          1|2017-09-09 13:00:00|   shade|   18.78458|
|          1|2017-09-09 13:00:00|     sun|   25.68457|
|          2|2017-09-09 13:00:00|   shade|   18.61258|
|          2|2017-09-09 13:00:00|     sun|    25.4986|
|          1|2017-09-09 14:00:00|   shade|   17.98115|
|          1|2017-09-09 14:00:00|     sun|   24.15754|
|          2|2017-09-09 14:00:00|   shade|   18.61258|
|          2|2017-09-09 14:00:00|     sun|    25.4986|
+-----------+-------------------+--------+-----------+
butterfree.extract.pre_processing.pivot(dataframe: pyspark.sql.dataframe.DataFrame, group_by_columns, pivot_column, agg_column, aggregation, mock_value=None, mock_type=None, with_forward_fill=False)

Defines a pivot transformation.

butterfree.extract.pre_processing.dataframe

dataframe to be pivoted.

butterfree.extract.pre_processing.group_by_columns

list of column names to group by.

butterfree.extract.pre_processing.pivot_column

column to be pivoted.

butterfree.extract.pre_processing.agg_column

column to be aggregated by pivoted category.

butterfree.extract.pre_processing.aggregation

desired Spark aggregation function to be performed, e.g. spark_agg(col_name). See the Spark functions documentation for all available aggregations: https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/sql/functions.html

butterfree.extract.pre_processing.mock_value

value used to distinguish “true nulls” resulting from the aggregation from empty values created by the pivot transformation.

butterfree.extract.pre_processing.mock_type

mock_value data type (compatible with spark).

butterfree.extract.pre_processing.with_forward_fill

if True, applies a forward fill to null values after the pivot operation.

Example

>>> dataframe.orderBy("ts", "id", "amenity").show()
+---+---+-------+-----+
| id| ts|amenity|  has|
+---+---+-------+-----+
|  1|  1| fridge|false|
|  1|  1|   oven| true|
|  1|  1|   pool|false|
|  2|  2|balcony|false|
|  1|  3|balcony| null|
|  1|  4|   oven| null|
|  1|  4|   pool| true|
|  1|  5|balcony| true|
+---+---+-------+-----+
>>> pivoted = pivot(dataframe, ["id", "ts"], "amenity", "has", functions.first)
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null|  null|null| null|
|  1|  4|   null|  null|null| true|
|  1|  5|   true|  null|null| null|
+---+---+-------+------+----+-----+

Sometimes, however, you may want to keep the last value each feature assumed in previous records. In this example, amenity “oven” for id=1 was set to null and “pool” was set to true at ts=4; all other amenities should keep their state as of that ts. To achieve that, we use a technique called forward fill:

>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|true| true|
|  1|  5|   true| false|true| true|
+---+---+-------+------+----+-----+

Great! Now every amenity that wasn’t changed kept its state. However, the forced change to null for amenity “oven” on id=1 at ts=4 was ignored during forward fill. If the user wants to respect this change, they must provide a mock value and type to serve as a marker for “true nulls”. In other words, we want to forward fill only the nulls created by the pivot transformation.

In this example, amenities only assume boolean values, so there is no spare boolean left to use as a mock; it is only true or false. Users can therefore provide a mock value of another type, one the column can be cast to. Check this out:

>>> pivoted = pivot(
...     dataframe,
...     ["id", "ts"],
...     "amenity",
...     "has",
...     functions.first,
...     with_forward_fill=True,
...     mock_value=-1,
...     mock_type="int"
... )
>>> pivoted.orderBy("ts", "id").show()
+---+---+-------+------+----+-----+
| id| ts|balcony|fridge|oven| pool|
+---+---+-------+------+----+-----+
|  1|  1|   null| false|true|false|
|  2|  2|  false|  null|null| null|
|  1|  3|   null| false|true|false|
|  1|  4|   null| false|null| true|
|  1|  5|   true| false|null| true|
+---+---+-------+------+----+-----+

During the transformation, this method casts the agg_column to the mock_type data type and fills all “true nulls” with the mock_value. After the pivot and forward fill are applied, every new pivoted column is cast back to the original type, with all mock values replaced by null.

butterfree.extract.pre_processing.replace(dataframe: pyspark.sql.dataframe.DataFrame, column, replace_dict)

Replace values of a string column in the dataframe using a dict.

Example:

>>> from butterfree.extract.pre_processing import replace
>>> from butterfree.testing.dataframe import (
...     assert_dataframe_equality,
...     create_df_from_collection,
... )
>>> from pyspark import SparkContext
>>> from pyspark.sql import session
>>> spark_context = SparkContext.getOrCreate()
>>> spark_session = session.SparkSession(spark_context)
>>> input_data = [
...     {"id":1, "type": "a"}, {"id":2, "type": "b"}, {"id":3, "type": "c"}
... ]
>>> input_df = create_df_from_collection(input_data, spark_context, spark_session)
>>> input_df.collect()

[Row(id=1, type='a'), Row(id=2, type='b'), Row(id=3, type='c')]

>>> replace_dict = {"a": "type_a", "b": "type_b"}
>>> replace(input_df, "type", replace_dict).collect()

[Row(id=1, type='type_a'), Row(id=2, type='type_b'), Row(id=3, type='c')]

Parameters
  • dataframe – data to be transformed.

  • column – string column on the dataframe where to apply the replace.

  • replace_dict – dict with values to be replaced. All mapped values must be strings.

Returns

Dataframe with column values replaced.