Configuring Kafka

Aineko uses Kafka under the hood to send messages between nodes. When running Aineko locally, it is recommended to start a local Kafka and ZooKeeper server with:
poetry run aineko service start

To use a different Kafka cluster, such as in a deployment setting, Aineko lets you configure Kafka parameters through environment variables. Typically, you would modify the consumer and producer configuration to point to the desired cluster.

See below for the default Kafka configuration that ships with Aineko and how to override it.

aineko.config

Configuration file for Aineko modules.

Kafka configuration can be set using the following environment variables:

KAFKA_CONFIG: JSON string with kafka configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for all options)
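
As a minimal sketch, KAFKA_CONFIG can be built from a Python dict with json.dumps so that the value is guaranteed to be valid JSON. The broker address and security settings below are placeholder assumptions, not values that ship with Aineko; the keys are standard librdkafka option names. Because the class body of DEFAULT_KAFKA_CONFIG reads the environment when the class is defined, the variable most likely needs to be set before aineko.config is imported.

```python
import json
import os

# Placeholder values for illustration only; replace with your cluster's settings.
kafka_config = {
    "bootstrap.servers": "broker-1.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
}

# KAFKA_CONFIG must be a JSON string, so serialize the dict.
os.environ["KAFKA_CONFIG"] = json.dumps(kafka_config)

# Round-trip check: json.loads is how the value is parsed back into a dict.
parsed = json.loads(os.environ["KAFKA_CONFIG"])
print(parsed["bootstrap.servers"])
```

In a deployment, the same JSON string would normally be exported by the shell or the orchestrator rather than set from Python.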

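Additionally, the following environment variables can be used to set individual configuration values. Each corresponds to a configuration key from the above link, written in uppercase with dots replaced by underscores and prefixed with KAFKA_CONFIG_. For example, KAFKA_CONFIG_BOOTSTRAP_SERVERS corresponds to bootstrap.servers. When set to a non-empty value, these variables take precedence over the same keys in KAFKA_CONFIG.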

  • KAFKA_CONFIG_BOOTSTRAP_SERVERS (e.g. localhost:9092,localhost:9093)
  • KAFKA_CONFIG_SASL_USERNAME
  • KAFKA_CONFIG_SASL_PASSWORD
  • KAFKA_CONFIG_SECURITY_PROTOCOL
  • KAFKA_CONFIG_SASL_MECHANISM
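
The precedence between the built-in default, KAFKA_CONFIG, and the individual variables can be sketched as follows. resolve_broker_config is a hypothetical helper written for illustration and is not part of Aineko; it takes the environment as a plain mapping so the behavior can be tried without touching os.environ. The host names are placeholders.

```python
import json
from typing import Dict, Mapping

# Same env-var-to-key mapping as the OVERRIDABLES table in aineko/config.py.
OVERRIDABLES = {
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
    "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
    "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
}


def resolve_broker_config(env: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper mirroring the documented resolution order."""
    # 1. Start from the built-in default.
    config = {"bootstrap.servers": "localhost:9092"}
    # 2. Merge the JSON blob from KAFKA_CONFIG, if present.
    config.update(json.loads(env.get("KAFKA_CONFIG", "{}")))
    # 3. Individual variables win; unset or empty values are skipped.
    for env_name, key in OVERRIDABLES.items():
        value = env.get(env_name)
        if value:
            config[key] = value
    return config


example_env = {
    "KAFKA_CONFIG": '{"bootstrap.servers": "a:9092", "client.id": "demo"}',
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "b:9092,c:9092",
}
resolved = resolve_broker_config(example_env)
print(resolved)
```

Here bootstrap.servers resolves to the value from KAFKA_CONFIG_BOOTSTRAP_SERVERS, while client.id, which has no dedicated variable, is taken from KAFKA_CONFIG.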

DEFAULT_KAFKA_CONFIG

Bases: BaseConfig

Kafka configuration.

Source code in aineko/config.py
class DEFAULT_KAFKA_CONFIG(BaseConfig):
    """Kafka configuration."""

    # Default Kafka broker settings
    BROKER_CONFIG = {
        "bootstrap.servers": "localhost:9092",
    }

    kafka_config = os.environ.get("KAFKA_CONFIG", "{}")
    BROKER_CONFIG.update(json.loads(kafka_config))

    # Override these fields if set
    OVERRIDABLES = {
        "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
        "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
        "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
        "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
        "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
    }
    for env, config in OVERRIDABLES.items():
        value = os.environ.get(env)
        if value:
            BROKER_CONFIG[config] = value

    # Config for default kafka consumer
    CONSUMER_CONFIG: Dict[str, str] = {
        **BROKER_CONFIG,
        "auto.offset.reset": "earliest",
    }

    # Config for default kafka producer
    PRODUCER_CONFIG: Dict[str, str] = {**BROKER_CONFIG}

    # Default dataset config
    DATASET_PARAMS = {
        # One single partition for each dataset
        "num_partitions": 1,
        # No replication
        "replication_factor": 1,
        "config": {
            # Keep messages for 7 days
            "retention.ms": 1000
            * 60
            * 60
            * 24
            * 7,
        },
    }

    # Default Kafka consumer settings
    # Timeout for kafka consumer polling (seconds)
    CONSUMER_TIMEOUT = 0
    # Max number of messages to retrieve when getting the last message
    CONSUMER_MAX_MESSAGES = 1000000

    # Default Kafka producer settings
    # Producer overridables
    # See: https://kafka.apache.org/documentation/#producerconfigs
    # Empty list means no overridable settings
    PRODUCER_OVERRIDABLES = []  # type: ignore

    # Default datasets to create for every pipeline
    LOGGING_DATASET = "logging"
    DATASETS = [LOGGING_DATASET]
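
As a quick check of two details in the listing above, the sketch below reproduces how the consumer and producer settings are derived from the broker settings, and confirms the 7-day retention value in milliseconds. The variable names are local to this example and the broker address is the documented default.

```python
from datetime import timedelta

# Broker settings shared by consumers and producers (documented default).
broker_config = {"bootstrap.servers": "localhost:9092"}

# The consumer adds one key on top of the broker settings;
# the producer uses a plain copy of them.
consumer_config = {**broker_config, "auto.offset.reset": "earliest"}
producer_config = {**broker_config}

# 7 days expressed in milliseconds, computed with exact integer arithmetic.
retention_ms = timedelta(days=7) // timedelta(milliseconds=1)
print(retention_ms)  # 604800000
```

Because the dicts are built with unpacking, later changes to consumer_config do not leak back into broker_config or producer_config.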