Setup Configuration

To run an ETL pipeline, some configuration is needed, such as credentials to connect to data sources and default paths. This chapter covers the main setup configurations in Butterfree.

Some configurations can be set through environment variables, through the API, or both. Parameters passed at instantiation take priority, with environment variables as a fallback. Check the following examples:
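To illustrate the precedence, here is a minimal sketch, assuming a local run where the environment variable can be set from Python and that CassandraConfig (documented below) exposes the resolved values as attributes:

import os

from butterfree.configs.db import CassandraConfig

# With no host argument, the environment variable is used as a fallback.
os.environ["CASSANDRA_HOST"] = "fallback-host"
config = CassandraConfig(username="u", password="p", keyspace="k")
print(config.host)  # fallback-host

# An explicit argument takes priority over the environment variable.
config = CassandraConfig(
    username="u", password="p", keyspace="k", host="explicit-host"
)
print(config.host)  # explicit-host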

Kafka Reader

Configurations:

  • Kafka Consumer Connection String: connection string used to connect to Kafka.

Setup by instantiation:

from butterfree.extract.readers import KafkaReader

kafka_reader = KafkaReader(
    id="reader_id",
    topic="topic_name",
    value_schema=schema,
    connection_string="host1:port,host2:port"
)

Setup by environment variables:

Set the following environment variable in the Spark cluster or locally: KAFKA_CONSUMER_CONNECTION_STRING
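For a local run, one way to set it is from Python before creating the reader (a sketch with a placeholder value; on a real cluster the variable would be set in the machines' environment instead):

import os

os.environ["KAFKA_CONSUMER_CONNECTION_STRING"] = "host1:port,host2:port"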

After setting this variable, you can instantiate KafkaReader more simply:

from butterfree.extract.readers import KafkaReader

kafka_reader = KafkaReader(
    id="reader_id",
    topic="topic_name",
    value_schema=schema
)

Online Feature Store

Cassandra Configuration

Configurations:

  • Cassandra Username: username to connect to CassandraDB.

  • Cassandra Password: password to connect to CassandraDB.

  • Cassandra Host: host to connect to CassandraDB.

  • Cassandra Keyspace: keyspace to connect to CassandraDB.

  • Stream Checkpoint Path: path on S3 where the checkpoint for the streaming query is saved. Only needed when performing streaming writes.

Setup by instantiation:

from butterfree.configs.db import CassandraConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config = CassandraConfig(
    username="username", 
    password="password",
    host="host",
    keyspace="keyspace",
    stream_checkpoint_path="path"
)
writer = OnlineFeatureStoreWriter(db_config=db_config)

Setup by environment variables:

Set the following environment variables in the Spark cluster or locally: CASSANDRA_USERNAME, CASSANDRA_PASSWORD, CASSANDRA_HOST, CASSANDRA_KEYSPACE, STREAM_CHECKPOINT_PATH
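The same sketch applies for a local run (placeholder values):

import os

os.environ["CASSANDRA_USERNAME"] = "username"
os.environ["CASSANDRA_PASSWORD"] = "password"
os.environ["CASSANDRA_HOST"] = "host"
os.environ["CASSANDRA_KEYSPACE"] = "keyspace"
os.environ["STREAM_CHECKPOINT_PATH"] = "s3://bucket/checkpoints"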

After setting these variables, you can instantiate CassandraConfig and OnlineFeatureStoreWriter more simply:

from butterfree.configs.db import CassandraConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config = CassandraConfig()
writer = OnlineFeatureStoreWriter(db_config=db_config)

# or just
writer = OnlineFeatureStoreWriter()

Kafka Configuration

Configurations:

  • Kafka topic: Kafka topic name.

  • Kafka connection string: string with the hosts and ports to connect to.

  • Stream Checkpoint Path: path on S3 where the checkpoint for the streaming query is saved. Only needed when performing streaming writes.

Setup by instantiation:

from butterfree.configs.db import KafkaConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

custom_kafka_config = KafkaConfig(
    kafka_topic="custom_topic",
    kafka_connection_string="kafka_connection_string", 
    stream_checkpoint_path="path"
)
writer = OnlineFeatureStoreWriter(db_config=custom_kafka_config)

or

from butterfree.configs.db import KafkaConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

kafka_config = KafkaConfig(
    kafka_connection_string="kafka_connection_string", 
    stream_checkpoint_path="path"
)
writer = OnlineFeatureStoreWriter(db_config=kafka_config)

Setup by environment variables:

Set the following environment variables in the Spark cluster or locally: KAFKA_CONSUMER_CONNECTION_STRING, STREAM_CHECKPOINT_PATH
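Again, for a local run (placeholder values):

import os

os.environ["KAFKA_CONSUMER_CONNECTION_STRING"] = "host1:port,host2:port"
os.environ["STREAM_CHECKPOINT_PATH"] = "s3://bucket/checkpoints"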

After setting these variables, you can instantiate KafkaConfig and OnlineFeatureStoreWriter more simply:

from butterfree.configs.db import KafkaConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

kafka_config = KafkaConfig()
writer = OnlineFeatureStoreWriter(db_config=kafka_config)

Historical Feature Store (Spark Metastore and S3)

Configurations:

  • Feature Store S3 Bucket: Bucket on S3 to use for the Historical Feature Store.

  • Feature Store Historical Database: database on the Spark metastore where the Historical Feature Store tables are written.

Setup by instantiation:

from butterfree.configs.db import S3Config
from butterfree.load.writers import HistoricalFeatureStoreWriter

db_config = S3Config(bucket="bucket")
writer = HistoricalFeatureStoreWriter(
    db_config=db_config,
    database="database"
)

Setup by environment variables:

Set the following environment variables in the Spark cluster or locally: FEATURE_STORE_S3_BUCKET, FEATURE_STORE_HISTORICAL_DATABASE
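For a local run (placeholder values):

import os

os.environ["FEATURE_STORE_S3_BUCKET"] = "bucket"
os.environ["FEATURE_STORE_HISTORICAL_DATABASE"] = "database"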

After setting these variables, you can instantiate S3Config and HistoricalFeatureStoreWriter more simply:

from butterfree.configs.db import S3Config
from butterfree.load.writers import HistoricalFeatureStoreWriter

db_config = S3Config()
writer = HistoricalFeatureStoreWriter(db_config=db_config)

# or just
writer = HistoricalFeatureStoreWriter()

Important Considerations

API information

For more detailed information on the API arguments, check the docstrings of each class.
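For example, from a Python shell:

from butterfree.configs.db import CassandraConfig

help(CassandraConfig)  # prints the class and constructor docstrings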

Troubleshooting checks

  • Check if the Spark cluster machines have permission to read, write, and modify the Historical Feature Store bucket

  • Check if the Spark cluster machines can reach all the configured hosts (see the sketch below)

  • Check if the configured credentials have all the needed permissions on CassandraDB
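As a minimal sketch of a reachability check you could run from a cluster machine (host names and ports below are placeholders; this only verifies the TCP connection, not authentication or permissions):

import socket

def can_reach(host, port, timeout=5):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(can_reach("cassandra-host", 9042))  # 9042: Cassandra's default CQL port
print(can_reach("kafka-host", 9092))      # 9092: Kafka's default broker port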