DatasetConsumer

The DatasetConsumer class is used to consume messages from Kafka topics.

aineko.DatasetConsumer

DatasetConsumer(
    dataset_name: str,
    node_name: str,
    pipeline_name: str,
    dataset_config: Union[Dict[str, Any], Dataset],
    bootstrap_servers: Optional[str] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
)

Wrapper class for Kafka consumer object.

DatasetConsumer objects are designed to consume messages from a single dataset and will consume the next unconsumed message in the queue.

When accessing Kafka topics, prefixes are automatically added to the dataset name as part of namespacing. For datasets defined in the pipeline config, has_pipeline_prefix is set to True, so a dataset named my_dataset will point to a topic named my_pipeline.my_dataset.

Optionally, a custom prefix can be provided that will apply to all datasets. In the above example, if the prefix is set to test, the topic name will be test.my_pipeline.my_dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_name` | `str` | Name of the dataset | *required* |
| `node_name` | `str` | Name of the node that is consuming the dataset | *required* |
| `pipeline_name` | `str` | Name of the pipeline | *required* |
| `dataset_config` | `Union[Dict[str, Any], Dataset]` | Dataset config | *required* |
| `bootstrap_servers` | `Optional[str]` | Bootstrap servers to connect to (e.g. `"1.2.3.4:9092"`) | `None` |
| `prefix` | `Optional[str]` | Prefix for the topic name | `None` |
| `has_pipeline_prefix` | `bool` | Whether the dataset name has a pipeline name prefix | `False` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `consumer` | | Kafka consumer object |
| `cached` | | Whether the high watermark offset has been cached (updated when a message is consumed) |

Methods:

| Name | Description |
| --- | --- |
| `consume` | Reads a message from the dataset |

Initialize the consumer.

Source code in aineko/core/dataset.py
def __init__(
    self,
    dataset_name: str,
    node_name: str,
    pipeline_name: str,
    dataset_config: Union[Dict[str, Any], Config.Pipeline.Dataset],
    bootstrap_servers: Optional[str] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
):
    """Initialize the consumer."""
    self.pipeline_name = pipeline_name
    self.kafka_config = DEFAULT_KAFKA_CONFIG
    self.prefix = prefix
    self.has_pipeline_prefix = has_pipeline_prefix
    self.cached = False

    if isinstance(dataset_config, Config.Pipeline.Dataset):
        dataset_config = dataset_config.model_dump()

    consumer_config = self.kafka_config.get("CONSUMER_CONFIG")
    # Overwrite bootstrap server with broker if provided
    if bootstrap_servers:
        consumer_config["bootstrap.servers"] = bootstrap_servers

    # Override default config with dataset specific config
    for param, value in dataset_config.get("params", {}).items():
        # max.message.bytes is not a valid consumer parameter
        if param != "max.message.bytes":
            consumer_config[param] = value

    topic_name = dataset_name
    if has_pipeline_prefix:
        topic_name = f"{pipeline_name}.{dataset_name}"

    if self.prefix:
        self.name = f"{prefix}.{pipeline_name}.{node_name}"
        consumer_config["group.id"] = self.name
        self.consumer = Consumer(consumer_config)
        self.consumer.subscribe([f"{prefix}.{topic_name}"])

    else:
        self.name = f"{pipeline_name}.{node_name}"
        consumer_config["group.id"] = f"{pipeline_name}.{node_name}"
        self.consumer = Consumer(consumer_config)
        self.consumer.subscribe([topic_name])

    self.topic_name = topic_name

cached instance-attribute

cached = False

consumer instance-attribute

consumer = Consumer(consumer_config)

has_pipeline_prefix instance-attribute

has_pipeline_prefix = has_pipeline_prefix

kafka_config instance-attribute

kafka_config = DEFAULT_KAFKA_CONFIG

name instance-attribute

name = f'{prefix}.{pipeline_name}.{node_name}'

pipeline_name instance-attribute

pipeline_name = pipeline_name

prefix instance-attribute

prefix = prefix

topic_name instance-attribute

topic_name = topic_name

consume

consume(
    how: Literal["next", "last"] = "next",
    timeout: Optional[float] = None,
) -> Optional[dict]

Polls a message from the dataset.

If how is "last" but the method encounters an error trying to update the offset to the latest, it will poll and return None.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `how` | `Literal['next', 'last']` | How to read the message. `"next"`: read the next message in the queue. `"last"`: read the last message in the queue. | `'next'` |
| `timeout` | `Optional[float]` | Seconds to poll for a response from the Kafka broker. If using `how="last"`, set to a value greater than 0. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Optional[dict]` | Message from the dataset |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `how` is not `"next"` or `"last"` |

Source code in aineko/core/dataset.py
def consume(
    self,
    how: Literal["next", "last"] = "next",
    timeout: Optional[float] = None,
) -> Optional[dict]:
    """Polls a message from the dataset.

    If the consume method is last but the method encounters
    an error trying to update the offset to latest, it will
    poll and return None.

    Args:
        how: how to read the message.
            "next": read the next message in the queue
            "last": read the last message in the queue
        timeout: seconds to poll for a response from kafka broker.
            If using how="last", set to bigger than 0.

    Returns:
        message from the dataset

    Raises:
        ValueError: if how is not "next" or "last"
    """
    if how not in ["next", "last"]:
        raise ValueError(f"Invalid how: {how}. Expected `next` or `last`.")

    timeout = timeout or self.kafka_config.get("CONSUMER_TIMEOUT")
    if how == "next":
        # next unread message from queue
        message = self.consumer.poll(timeout=timeout)

    if how == "last":
        # last message from queue
        try:
            self._update_offset_to_latest()
        except KafkaError as e:
            logger.error(
                "Error updating offset to latest for consumer %s: %s",
                self.name,
                e,
            )
            return None
        message = self.consumer.poll(timeout=timeout)

    self.cached = True

    return self._validate_message(message)
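The two modes can be illustrated with a stand-in consumer (hypothetical `FakeConsumer` and `consume` helper; a real DatasetConsumer needs a running Kafka broker):

```python
class FakeConsumer:
    """Stand-in that yields queued messages in order, mimicking poll()."""

    def __init__(self, messages):
        self.messages = list(messages)

    def poll(self, timeout=None):
        return self.messages.pop(0) if self.messages else None


def consume(consumer, how="next"):
    """Sketch of the how="next" / how="last" dispatch."""
    if how not in ("next", "last"):
        raise ValueError(f"Invalid how: {how}. Expected `next` or `last`.")
    if how == "last":
        # Crude stand-in for updating the offset to the high watermark
        consumer.messages = consumer.messages[-1:]
    return consumer.poll()


c = FakeConsumer([{"message": 1}, {"message": 2}, {"message": 3}])
first = consume(c)               # next unread message in the queue
latest = consume(c, how="last")  # jump past the queue to the latest message
```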

consume_all

consume_all(end_message: Union[str, bool] = False) -> list

Reads all messages from the dataset until a specific one is found.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `end_message` | `Union[str, bool]` | Message to trigger the completion of consumption | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of messages from the dataset |

Source code in aineko/core/dataset.py
def consume_all(self, end_message: Union[str, bool] = False) -> list:
    """Reads all messages from the dataset until a specific one is found.

    Args:
        end_message: Message to trigger the completion of consumption

    Returns:
        list of messages from the dataset
    """
    messages = []
    while True:
        message = self.consume()
        if message is None:
            continue
        if message["message"] == end_message:
            break
        messages.append(message)
    return messages
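The sentinel loop above can be exercised with a stub in place of self.consume (hypothetical; None simulates a poll timeout):

```python
def consume_all(consume, end_message=False):
    """Collect messages until one whose payload equals end_message."""
    messages = []
    while True:
        message = consume()
        if message is None:
            continue  # poll timed out; keep waiting
        if message["message"] == end_message:
            break  # sentinel reached; it is not included in the result
        messages.append(message)
    return messages


queue = iter([{"message": "a"}, None, {"message": "b"}, {"message": "STOP"}])
result = consume_all(lambda: next(queue), end_message="STOP")
# result == [{"message": "a"}, {"message": "b"}]
```

Note that the loop blocks indefinitely if the sentinel never arrives, so the producer must be guaranteed to emit `end_message`.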

last

last(timeout: int = 1) -> dict

Consumes the last message from the dataset.

Wraps the consume(how="last") method. It implements a block that waits until a message is received before returning it. This method ensures that the consumed message is always the most recent message. If the consumer is slower than the producer, messages might be skipped. If the consumer is faster than the producer, messages might be repeated.

This is useful when the timeout is short and you expect the consumer to often return None.

Note: The timeout must be greater than 0 to prevent overwhelming the broker with requests to update the offset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `int` | Seconds to poll for a response from the Kafka broker. Must be > 0. | `1` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | Message from the dataset |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `timeout` is <= 0 |

Source code in aineko/core/dataset.py
def last(self, timeout: int = 1) -> dict:
    """Consumes the last message from the dataset.

    Wraps the `consume(how="last")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that the consumed message is always the most
    recent message. If the consumer is slower than the producer, messages
    might be skipped. If the consumer is faster than the producer,
    messages might be repeated.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Note: The timeout must be greater than 0 to prevent
    overwhelming the broker with requests to update the offset.

    Args:
        timeout: seconds to poll for a response from kafka broker.
            Must be >0.

    Returns:
        message from the dataset

    Raises:
        ValueError: if timeout is <= 0
    """
    if timeout <= 0:
        raise ValueError(
            "Timeout must be > 0 when consuming the last message."
        )
    return self._consume_message(how="last", timeout=timeout)

next

next() -> dict

Consumes the next message from the dataset.

Wraps the consume(how="next") method. It implements a block that waits until a message is received before returning it. This method ensures that every message is consumed, but the consumed message may not be the most recent message if the consumer is slower than the producer.

This is useful when the timeout is short and you expect the consumer to often return None.

Returns:

| Type | Description |
| --- | --- |
| `dict` | Message from the dataset |

Source code in aineko/core/dataset.py
def next(self) -> dict:
    """Consumes the next message from the dataset.

    Wraps the `consume(how="next")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that every message is consumed, but the consumed
    message may not be the most recent message if the consumer is slower
    than the producer.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Returns:
        message from the dataset
    """
    return self._consume_message(how="next")
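The block-until-ready behaviour shared by next and last can be sketched as a retry loop around consume (hypothetical `_consume_message` stand-in; the real implementation lives in aineko/core/dataset.py):

```python
def _consume_message(consume, how="next", timeout=0.1):
    """Keep polling until consume returns a message (block until ready)."""
    while True:
        message = consume(how=how, timeout=timeout)
        if message is not None:
            return message  # first non-None poll result wins


# Simulate two timed-out polls followed by a successful one
attempts = iter([None, None, {"message": "hello"}])
msg = _consume_message(lambda how, timeout: next(attempts))
# msg == {"message": "hello"}
```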