DatasetProducer

The DatasetProducer class is used to produce messages to Kafka topics.

aineko.DatasetProducer

DatasetProducer(
    dataset_name: str,
    node_name: str,
    pipeline_name: str,
    dataset_config: Dict[str, Any],
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
)

Wrapper class for Kafka producer object.

See DatasetConsumer for the prefix rules. In short, the topic name is the dataset name, prefixed with the pipeline name when has_pipeline_prefix is set, and with prefix when one is given (e.g. test.my_pipeline.my_dataset).

Parameters:

Name                 Type            Description                                            Default
dataset_name         str             dataset name                                           required
node_name            str             name of the node that is producing the message         required
pipeline_name        str             name of the pipeline                                   required
dataset_config       Dict[str, Any]  dataset config; the optional "params" mapping          required
                                     overrides the default producer config
prefix               Optional[str]   prefix for the topic name (prepended as "<prefix>.")   None
has_pipeline_prefix  bool            whether the dataset name has a pipeline name prefix    False
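
The only key DatasetProducer reads from dataset_config is the optional "params" mapping; its entries override the default Kafka producer configuration (see the source below). A minimal sketch of such a config; the keys and values here are hypothetical:

dataset_config = {
    "params": {
        # Special case: rewritten to "message.max.bytes" in the
        # producer config (see __init__ below).
        "max.message.bytes": 10485760,
        # Any other entry is passed to the producer config unchanged.
        "linger.ms": 5,
    },
}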

Attributes:

Name      Type  Description
producer        Kafka producer object

Methods:

Name     Description
produce  produce a message to the dataset

Initialize the producer.

Source code in aineko/core/dataset.py
def __init__(
    self,
    dataset_name: str,
    node_name: str,
    pipeline_name: str,
    dataset_config: Dict[str, Any],
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
):
    """Initialize the producer."""
    self.source_pipeline = pipeline_name
    self.dataset = dataset_name
    self.source_node = node_name
    self.prefix = prefix
    self.has_pipeline_prefix = has_pipeline_prefix

    # Create topic name based on prefix rules
    topic_name = dataset_name
    if has_pipeline_prefix:
        topic_name = f"{pipeline_name}.{topic_name}"
    if prefix:
        topic_name = f"{prefix}.{topic_name}"
    self.topic_name = topic_name

    # Assign kafka config
    self.kafka_config = DEFAULT_KAFKA_CONFIG

    # Set producer parameters
    producer_config = self.kafka_config.get("PRODUCER_CONFIG")

    # Override default config with dataset specific config
    if "params" in dataset_config:
        for key, value in dataset_config["params"].items():
            if key == "max.message.bytes":
                # max.message.bytes is a special case and should be set
                # as message.max.bytes in the producer config
                producer_config["message.max.bytes"] = value
            else:
                producer_config[key] = value

    # Create producer
    self.producer = Producer(producer_config)
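
Putting the prefix rules together, a minimal sketch of how the constructor derives topic_name; the pipeline, node, and dataset names here are made up:

producer = DatasetProducer(
    dataset_name="my_dataset",
    node_name="my_node",
    pipeline_name="my_pipeline",
    dataset_config={},
    prefix="test",
    has_pipeline_prefix=True,
)
# Both prefixes applied: "<prefix>.<pipeline>.<dataset>"
assert producer.topic_name == "test.my_pipeline.my_dataset"

# With no prefix and no pipeline prefix, the topic name is just the
# dataset name:
plain = DatasetProducer(
    dataset_name="my_dataset",
    node_name="my_node",
    pipeline_name="my_pipeline",
    dataset_config={},
)
assert plain.topic_name == "my_dataset"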

Instance attributes (assigned in __init__):

dataset             = dataset_name
has_pipeline_prefix = has_pipeline_prefix
kafka_config        = DEFAULT_KAFKA_CONFIG
prefix              = prefix
producer            = Producer(producer_config)
source_node         = node_name
source_pipeline     = pipeline_name
topic_name          = topic_name

produce

produce(message: dict, key: Optional[str] = None) -> None

Produce a message to the dataset.

Parameters:

Name     Type           Description                        Default
message  dict           message to produce to the dataset  required
key      Optional[str]  key to use for the message         None
Source code in aineko/core/dataset.py
def produce(self, message: dict, key: Optional[str] = None) -> None:
    """Produce a message to the dataset.

    Args:
        message: message to produce to the dataset
        key: key to use for the message
    """
    message = {
        "timestamp": datetime.datetime.now().strftime(
            AINEKO_CONFIG.get("MSG_TIMESTAMP_FORMAT")
        ),
        "dataset": self.dataset,
        "source_pipeline": self.source_pipeline,
        "source_node": self.source_node,
        "message": message,
    }
    self.producer.poll(0)

    key_bytes = str(key).encode("utf-8") if key is not None else None

    self.producer.produce(
        topic=self.topic_name,
        key=key_bytes,
        value=json.dumps(message).encode("utf-8"),
        callback=self._delivery_report,
    )
    self.producer.flush()
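
A minimal usage sketch, reusing the hypothetical producer from above. produce wraps the payload in an envelope (timestamp, dataset, source pipeline, source node) before JSON-encoding it, and the trailing flush() blocks until the broker has acknowledged delivery:

producer.produce({"foo": "bar"}, key="user-123")

# The value written to the Kafka topic is the JSON-encoded envelope:
# {
#     "timestamp": "<now, in MSG_TIMESTAMP_FORMAT>",
#     "dataset": "my_dataset",
#     "source_pipeline": "my_pipeline",
#     "source_node": "my_node",
#     "message": {"foo": "bar"},
# }

Because produce calls flush() after every message, each call is effectively synchronous.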