
DatasetProducer

The DatasetProducer class is used to produce messages to Kafka topics.

aineko.DatasetProducer

Wrapper class for Kafka producer object.

See DatasetConsumer for prefix rules.

Parameters:

Name                 Type            Description                                           Default
dataset_name         str             dataset name                                          required
node_name            str             name of the node that is producing the message        required
pipeline_name        str             name of the pipeline                                  required
dataset_config       Dict[str, Any]  dataset config                                        required
prefix               Optional[str]   prefix for topic name (<prefix>.<dataset_name>)       None
has_pipeline_prefix  bool            whether the dataset name has a pipeline name prefix   False

Attributes:

Name      Description
producer  Kafka producer object

Methods:

Name     Description
produce  produce a message to the dataset
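
For orientation, a minimal usage sketch. The dataset, node, and pipeline
names here are hypothetical, and a Kafka broker reachable via
DEFAULT_KAFKA_CONFIG is assumed:

from aineko import DatasetProducer

# All names below are illustrative; in a running pipeline Aineko
# constructs producers from the pipeline configuration for you.
producer = DatasetProducer(
    dataset_name="user_events",
    node_name="event_generator",
    pipeline_name="demo_pipeline",
    dataset_config={},          # no producer overrides
    has_pipeline_prefix=True,   # topic becomes "demo_pipeline.user_events"
)
producer.produce({"user_id": 42, "action": "login"}, key="42")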

Source code in aineko/core/dataset.py
class DatasetProducer:
    """Wrapper class for Kafka producer object.

    See DatasetConsumer for prefix rules.

    Args:
        dataset_name: dataset name
        node_name: name of the node that is producing the message
        pipeline_name: name of the pipeline
        dataset_config: dataset config
        prefix: prefix for topic name (<prefix>.<dataset_name>)
        has_pipeline_prefix: whether the dataset name has pipeline name prefix

    Attributes:
        producer: Kafka producer object

    Methods:
        produce: produce a message to the dataset
    """

    def __init__(
        self,
        dataset_name: str,
        node_name: str,
        pipeline_name: str,
        dataset_config: Dict[str, Any],
        prefix: Optional[str] = None,
        has_pipeline_prefix: bool = False,
    ):
        """Initialize the producer."""
        self.source_pipeline = pipeline_name
        self.dataset = dataset_name
        self.source_node = node_name
        self.prefix = prefix
        self.has_pipeline_prefix = has_pipeline_prefix

        # Create topic name based on prefix rules
        topic_name = dataset_name
        if has_pipeline_prefix:
            topic_name = f"{pipeline_name}.{topic_name}"
        if prefix:
            topic_name = f"{prefix}.{topic_name}"
        self.topic_name = topic_name

        # Assign kafka config
        self.kafka_config = DEFAULT_KAFKA_CONFIG

        # Set producer parameters
        producer_config = self.kafka_config.get("PRODUCER_CONFIG")

        # Override default config with dataset specific config
        if "params" in dataset_config:
            for param in self.kafka_config.get("PRODUCER_OVERRIDABLES"):
                if param in dataset_config["params"]:
                    producer_config[param] = dataset_config["params"][param]

        # Create producer
        self.producer = Producer(producer_config)

    @staticmethod
    def _delivery_report(err: Any, message: Message) -> None:
        """Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush().

        Args:
            err: error message
            message: message object from Kafka
        """
        if err is not None:
            logger.error("Message %s delivery failed: %s", message, err)

    def produce(self, message: dict, key: Optional[str] = None) -> None:
        """Produce a message to the dataset.

        Args:
            message: message to produce to the dataset
            key: key to use for the message
        """
        message = {
            "timestamp": datetime.datetime.now().strftime(
                AINEKO_CONFIG.get("MSG_TIMESTAMP_FORMAT")
            ),
            "dataset": self.dataset,
            "source_pipeline": self.source_pipeline,
            "source_node": self.source_node,
            "message": message,
        }
        self.producer.poll(0)

        key_bytes = str(key).encode("utf-8") if key is not None else None

        self.producer.produce(
            topic=self.topic_name,
            key=key_bytes,
            value=json.dumps(message).encode("utf-8"),
            callback=self._delivery_report,
        )
        self.producer.flush()
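
The constructor starts from the defaults in DEFAULT_KAFKA_CONFIG and lets a
dataset override individual producer settings through
dataset_config["params"], but only for keys listed in PRODUCER_OVERRIDABLES.
A hedged sketch ("linger.ms" is a standard librdkafka property, but whether
it appears in Aineko's whitelist is an assumption):

# Keys not in PRODUCER_OVERRIDABLES are silently ignored.
dataset_config = {"params": {"linger.ms": 5}}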

__init__(dataset_name, node_name, pipeline_name, dataset_config, prefix=None, has_pipeline_prefix=False)

Initialize the producer.
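
The topic name is assembled from the dataset name plus optional pipeline and
prefix components. A worked example with hypothetical names:

# dataset_name="metrics", pipeline_name="demo", prefix="test"
#
# has_pipeline_prefix=False, prefix=None    -> topic "metrics"
# has_pipeline_prefix=True,  prefix=None    -> topic "demo.metrics"
# has_pipeline_prefix=False, prefix="test"  -> topic "test.metrics"
# has_pipeline_prefix=True,  prefix="test"  -> topic "test.demo.metrics"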


produce(message, key=None)

Produce a message to the dataset.

Parameters:

Name     Type           Description                         Default
message  dict           message to produce to the dataset   required
key      Optional[str]  key to use for the message          None
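
Each call wraps the payload in a metadata envelope before JSON-encoding it.
With the hypothetical names from the sketch above, the record value would
look roughly like this (the exact timestamp format is whatever
AINEKO_CONFIG["MSG_TIMESTAMP_FORMAT"] specifies, so the one shown is an
assumption):

{
    "timestamp": "2023-10-01 12:00:00",
    "dataset": "user_events",
    "source_pipeline": "demo_pipeline",
    "source_node": "event_generator",
    "message": {"user_id": 42, "action": "login"},
}

Because produce() ends with flush(), delivery (and the _delivery_report
callback) completes before the call returns.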