DatasetConsumer

The DatasetConsumer class is used to consume messages from Kafka topics.

aineko.DatasetConsumer

Wrapper class for Kafka consumer object.

DatasetConsumer objects are designed to consume messages from a single dataset and will consume the next unconsumed message in the queue.

When accessing Kafka topics, prefixes will automatically be added to the dataset name as part of namespacing. For datasets defined in the pipeline config, `has_pipeline_prefix` will be set to `True`, so a dataset named `my_dataset` will point to a topic named `my_pipeline.my_dataset`.

Optionally, a custom prefix can be provided that will apply to all datasets. In the above example, if the prefix is set to `test`, the topic name will be `test.my_pipeline.my_dataset`.
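
To make the namespacing concrete, here is a minimal sketch of the resolution rules; `resolve_topic_name` is a hypothetical helper written for illustration, not part of the aineko API:

```python
# Hypothetical helper that mirrors the namespacing rules described above;
# not part of the aineko API.
from typing import Optional


def resolve_topic_name(
    dataset_name: str,
    pipeline_name: str,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> str:
    topic_name = dataset_name
    if has_pipeline_prefix:
        topic_name = f"{pipeline_name}.{dataset_name}"
    if prefix:
        topic_name = f"{prefix}.{topic_name}"
    return topic_name


assert (
    resolve_topic_name("my_dataset", "my_pipeline", has_pipeline_prefix=True)
    == "my_pipeline.my_dataset"
)
assert (
    resolve_topic_name(
        "my_dataset", "my_pipeline", prefix="test", has_pipeline_prefix=True
    )
    == "test.my_pipeline.my_dataset"
)
```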

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_name` | `str` | Name of the dataset. | *required* |
| `node_name` | `str` | Name of the node that is consuming the dataset. | *required* |
| `pipeline_name` | `str` | Name of the pipeline. | *required* |
| `dataset_config` | `Dict[str, Any]` | Dataset config. | *required* |
| `bootstrap_servers` | `Optional[str]` | Bootstrap servers to connect to (e.g. `"1.2.3.4:9092"`). | `None` |
| `prefix` | `Optional[str]` | Prefix for the topic name (`<prefix>.<dataset_name>`). | `None` |
| `has_pipeline_prefix` | `bool` | Whether the dataset name has the pipeline name prefix. | `False` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `consumer` | | Kafka consumer object. |
| `cached` | | Whether the high watermark offset has been cached (updated when a message is consumed). |

Methods:

| Name | Description |
| --- | --- |
| `consume` | Reads a message from the dataset. |

Source code in aineko/core/dataset.py
```python
class DatasetConsumer:
    """Wrapper class for Kafka consumer object.

    DatasetConsumer objects are designed to consume messages from a single
    dataset and will consume the next unconsumed message in the queue.

    When accessing kafka topics, prefixes will automatically be added to the
    dataset name as part of namespacing. For datasets defined in the pipeline
    config, `has_pipeline_prefix` will be set to `True`, so a dataset named
    `my_dataset` will point to a topic named `my_pipeline.my_dataset`.

    Optionally, a custom prefix can be provided that will apply to all datasets.
    In the above example, if the prefix is set to `test`, the topic name will
    be `test.my_pipeline.my_dataset`.

    Args:
        dataset_name: name of the dataset
        node_name: name of the node that is consuming the dataset
        pipeline_name: name of the pipeline
        dataset_config: dataset config
        bootstrap_servers: bootstrap_servers to connect to (e.g. "1.2.3.4:9092")
        prefix: prefix for topic name (<prefix>.<dataset_name>)
        has_pipeline_prefix: whether the dataset name has pipeline name prefix

    Attributes:
        consumer: Kafka consumer object
        cached: if the high watermark offset has been cached
            (updated when message consumed)

    Methods:
        consume: reads a message from the dataset
    """

    def __init__(
        self,
        dataset_name: str,
        node_name: str,
        pipeline_name: str,
        dataset_config: Dict[str, Any],
        bootstrap_servers: Optional[str] = None,
        prefix: Optional[str] = None,
        has_pipeline_prefix: bool = False,
    ):
        """Initialize the consumer."""
        self.pipeline_name = pipeline_name
        self.kafka_config = DEFAULT_KAFKA_CONFIG
        self.prefix = prefix
        self.has_pipeline_prefix = has_pipeline_prefix
        self.cached = False

        consumer_config = self.kafka_config.get("CONSUMER_CONFIG")
        # Overwrite bootstrap server with broker if provided
        if bootstrap_servers:
            consumer_config["bootstrap.servers"] = bootstrap_servers

        # Override default config with dataset specific config
        for param, value in dataset_config.get("params", {}).items():
            consumer_config[param] = value

        topic_name = dataset_name
        if has_pipeline_prefix:
            topic_name = f"{pipeline_name}.{dataset_name}"

        if self.prefix:
            self.name = f"{prefix}.{pipeline_name}.{node_name}"
            consumer_config["group.id"] = self.name
            self.consumer = Consumer(consumer_config)
            self.consumer.subscribe([f"{prefix}.{topic_name}"])

        else:
            self.name = f"{pipeline_name}.{node_name}"
            consumer_config["group.id"] = f"{pipeline_name}.{node_name}"
            self.consumer = Consumer(consumer_config)
            self.consumer.subscribe([topic_name])

        self.topic_name = topic_name

    @staticmethod
    def _validate_message(
        message: Optional[Message] = None,
    ) -> Optional[dict]:
        """Checks if a message is valid and converts it to appropriate format.

        Args:
            message: message to check

        Returns:
            message if valid, None if not
        """
        # Check if message is valid
        if message is None or message.value() is None:
            return None

        # Check if message is an error
        if message.error():
            logger.error(str(message.error()))
            return None

        # Convert message to dict
        message = message.value()
        if isinstance(message, bytes):
            message = message.decode("utf-8")
        return json.loads(message)

    def _update_offset_to_latest(self) -> None:
        """Updates offset to latest.

        Note that the initial call to this method might take
        a while due to consumer initialization.
        """
        partitions = self.consumer.assignment()
        # Initialize consumers if not already initialized by polling
        while not partitions:
            self.consumer.poll(timeout=0)
            partitions = self.consumer.assignment()

        for partition in partitions:
            high_offset = self.consumer.get_watermark_offsets(
                partition, cached=self.cached
            )[1]

            # Invalid high offset can be caused by various reasons,
            # including rebalancing and empty topic. Default to -1.
            if high_offset == OFFSET_INVALID:
                logger.error(
                    "Invalid offset received for consumer: %s", self.name
                )
                partition.offset = -1
            else:
                partition.offset = high_offset - 1

        self.consumer.assign(partitions)

    def consume(
        self,
        how: Literal["next", "last"] = "next",
        timeout: Optional[float] = None,
    ) -> Optional[dict]:
        """Polls a message from the dataset.

        If `how` is "last" but the method encounters an error
        while updating the offset to the latest, it will log the
        error and return None without polling.

        Args:
            how: how to read the message.
                "next": read the next message in the queue
                "last": read the last message in the queue
            timeout: seconds to poll for a response from kafka broker.
                If using how="last", set to a value greater than 0.

        Returns:
            message from the dataset

        Raises:
            ValueError: if how is not "next" or "last"
        """
        if how not in ["next", "last"]:
            raise ValueError(f"Invalid how: {how}. Expected `next` or `last`.")

        timeout = timeout or self.kafka_config.get("CONSUMER_TIMEOUT")
        if how == "next":
            # next unread message from queue
            message = self.consumer.poll(timeout=timeout)

        if how == "last":
            # last message from queue
            try:
                self._update_offset_to_latest()
            except KafkaError as e:
                logger.error(
                    "Error updating offset to latest for consumer %s: %s",
                    self.name,
                    e,
                )
                return None
            message = self.consumer.poll(timeout=timeout)

        self.cached = True

        return self._validate_message(message)

    def _consume_message(
        self, how: Literal["next", "last"], timeout: Optional[float] = None
    ) -> dict:
        """Calls the consume method and blocks until a message is returned.

        Args:
            how: See `consume` method for available options.
            timeout: See `consume` method.

        Returns:
            message from dataset
        """
        while True:
            try:
                message = self.consume(how=how, timeout=timeout)
                if message is not None:
                    return message
            except KafkaError as e:
                if e.code() == "_MAX_POLL_EXCEEDED":
                    continue
                raise e

    def next(self) -> dict:
        """Consumes the next message from the dataset.

        Wraps the `consume(how="next")` method. It implements a
        block that waits until a message is received before returning it.
        This method ensures that every message is consumed, but the consumed
        message may not be the most recent message if the consumer is slower
        than the producer.

        This is useful when the timeout is short and you expect the consumer
        to often return `None`.

        Returns:
            message from the dataset
        """
        return self._consume_message(how="next")

    def last(self, timeout: int = 1) -> dict:
        """Consumes the last message from the dataset.

        Wraps the `consume(how="last")` method. It implements a
        block that waits until a message is received before returning it.
        This method ensures that the consumed message is always the most
        recent message. If the consumer is slower than the producer, messages
        might be skipped. If the consumer is faster than the producer,
        messages might be repeated.

        This is useful when the timeout is short and you expect the consumer
        to often return `None`.

        Note: The timeout must be greater than 0 to prevent
        overwhelming the broker with requests to update the offset.

        Args:
            timeout: seconds to poll for a response from kafka broker.
                Must be >0.

        Returns:
            message from the dataset

        Raises:
            ValueError: if timeout is <= 0
        """
        if timeout <= 0:
            raise ValueError(
                "Timeout must be > 0 when consuming the last message."
            )
        return self._consume_message(how="last", timeout=timeout)

    def consume_all(self, end_message: str | bool = False) -> list:
        """Reads all messages from the dataset until a specific one is found.

        Args:
            end_message: Message to trigger the completion of consumption

        Returns:
            list of messages from the dataset
        """
        messages = []
        while True:
            message = self.consume()
            if message is None:
                continue
            if message["message"] == end_message:
                break
            messages.append(message)
        return messages
```
__init__(dataset_name, node_name, pipeline_name, dataset_config, bootstrap_servers=None, prefix=None, has_pipeline_prefix=False)

Initialize the consumer.

Source code in aineko/core/dataset.py
```python
def __init__(
    self,
    dataset_name: str,
    node_name: str,
    pipeline_name: str,
    dataset_config: Dict[str, Any],
    bootstrap_servers: Optional[str] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
):
    """Initialize the consumer."""
    self.pipeline_name = pipeline_name
    self.kafka_config = DEFAULT_KAFKA_CONFIG
    self.prefix = prefix
    self.has_pipeline_prefix = has_pipeline_prefix
    self.cached = False

    consumer_config = self.kafka_config.get("CONSUMER_CONFIG")
    # Overwrite bootstrap server with broker if provided
    if bootstrap_servers:
        consumer_config["bootstrap.servers"] = bootstrap_servers

    # Override default config with dataset specific config
    for param, value in dataset_config.get("params", {}).items():
        consumer_config[param] = value

    topic_name = dataset_name
    if has_pipeline_prefix:
        topic_name = f"{pipeline_name}.{dataset_name}"

    if self.prefix:
        self.name = f"{prefix}.{pipeline_name}.{node_name}"
        consumer_config["group.id"] = self.name
        self.consumer = Consumer(consumer_config)
        self.consumer.subscribe([f"{prefix}.{topic_name}"])

    else:
        self.name = f"{pipeline_name}.{node_name}"
        consumer_config["group.id"] = f"{pipeline_name}.{node_name}"
        self.consumer = Consumer(consumer_config)
        self.consumer.subscribe([topic_name])

    self.topic_name = topic_name
```
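
A rough construction sketch. The `dataset_config` params, node name, and server address below are assumptions for illustration; in practice these values come from the pipeline config:

```python
from aineko.core.dataset import DatasetConsumer

consumer = DatasetConsumer(
    dataset_name="my_dataset",
    node_name="my_node",  # hypothetical node name
    pipeline_name="my_pipeline",
    dataset_config={"params": {"auto.offset.reset": "earliest"}},  # assumed params
    bootstrap_servers="localhost:9092",  # assumed; defaults apply when None
    has_pipeline_prefix=True,  # subscribes to "my_pipeline.my_dataset"
)
```
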
consume(how='next', timeout=None)

Polls a message from the dataset.

If `how` is `"last"` but the method encounters an error while updating the offset to the latest, it will log the error and return `None` without polling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `how` | `Literal['next', 'last']` | How to read the message. `"next"`: read the next message in the queue. `"last"`: read the last message in the queue. | `'next'` |
| `timeout` | `Optional[float]` | Seconds to poll for a response from the Kafka broker. If using `how="last"`, set to a value greater than 0. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Optional[dict]` | Message from the dataset. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `how` is not `"next"` or `"last"`. |

Source code in aineko/core/dataset.py
```python
def consume(
    self,
    how: Literal["next", "last"] = "next",
    timeout: Optional[float] = None,
) -> Optional[dict]:
    """Polls a message from the dataset.

    If `how` is "last" but the method encounters an error
    while updating the offset to the latest, it will log the
    error and return None without polling.

    Args:
        how: how to read the message.
            "next": read the next message in the queue
            "last": read the last message in the queue
        timeout: seconds to poll for a response from kafka broker.
            If using how="last", set to a value greater than 0.

    Returns:
        message from the dataset

    Raises:
        ValueError: if how is not "next" or "last"
    """
    if how not in ["next", "last"]:
        raise ValueError(f"Invalid how: {how}. Expected `next` or `last`.")

    timeout = timeout or self.kafka_config.get("CONSUMER_TIMEOUT")
    if how == "next":
        # next unread message from queue
        message = self.consumer.poll(timeout=timeout)

    if how == "last":
        # last message from queue
        try:
            self._update_offset_to_latest()
        except KafkaError as e:
            logger.error(
                "Error updating offset to latest for consumer %s: %s",
                self.name,
                e,
            )
            return None
        message = self.consumer.poll(timeout=timeout)

    self.cached = True

    return self._validate_message(message)
```
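
A usage sketch, assuming a `consumer` constructed as in the `__init__` example above:

```python
# Next unconsumed message; None if nothing arrives within the timeout.
message = consumer.consume(how="next", timeout=0.5)
if message is not None:
    print(message)

# Most recent message only; use a timeout greater than 0 with how="last".
latest = consumer.consume(how="last", timeout=1.0)
```
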
consume_all(end_message=False)

Reads all messages from the dataset until a specific one is found.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `end_message` | `str \| bool` | Value of the message's `message` field that signals consumption should stop. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of messages from the dataset. |

Source code in aineko/core/dataset.py
```python
def consume_all(self, end_message: str | bool = False) -> list:
    """Reads all messages from the dataset until a specific one is found.

    Args:
        end_message: Message to trigger the completion of consumption

    Returns:
        list of messages from the dataset
    """
    messages = []
    while True:
        message = self.consume()
        if message is None:
            continue
        if message["message"] == end_message:
            break
        messages.append(message)
    return messages
```
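
A usage sketch. Note that `consume_all` looks up the `message` key of each payload, so it assumes every message carries that field; the sentinel value below is hypothetical:

```python
# Blocks until a message whose "message" field equals the sentinel arrives.
messages = consumer.consume_all(end_message="DONE")
print(f"Collected {len(messages)} messages before the sentinel.")
```
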
last(timeout=1)

Consumes the last message from the dataset.

Wraps the `consume(how="last")` method. It blocks until a message is received and then returns it. This method ensures that the consumed message is always the most recent message. If the consumer is slower than the producer, messages might be skipped. If the consumer is faster than the producer, messages might be repeated.

This is useful when the timeout is short and you expect the consumer to often return `None`.

Note: The timeout must be greater than 0 to prevent overwhelming the broker with requests to update the offset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `int` | Seconds to poll for a response from the Kafka broker. Must be > 0. | `1` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | Message from the dataset. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `timeout` is <= 0. |

Source code in aineko/core/dataset.py
```python
def last(self, timeout: int = 1) -> dict:
    """Consumes the last message from the dataset.

    Wraps the `consume(how="last")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that the consumed message is always the most
    recent message. If the consumer is slower than the producer, messages
    might be skipped. If the consumer is faster than the producer,
    messages might be repeated.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Note: The timeout must be greater than 0 to prevent
    overwhelming the broker with requests to update the offset.

    Args:
        timeout: seconds to poll for a response from kafka broker.
            Must be >0.

    Returns:
        message from the dataset

    Raises:
        ValueError: if timeout is <= 0
    """
    if timeout <= 0:
        raise ValueError(
            "Timeout must be > 0 when consuming the last message."
        )
    return self._consume_message(how="last", timeout=timeout)
```
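
A usage sketch; `last` blocks until a message is returned, so it suits consumers that only need the freshest value:

```python
# Blocks until the most recent message is available. Messages may be
# skipped or repeated depending on producer/consumer speed (see above).
latest_state = consumer.last(timeout=1)
```
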
next()

Consumes the next message from the dataset.

Wraps the `consume(how="next")` method. It blocks until a message is received and then returns it. This method ensures that every message is consumed, but the consumed message may not be the most recent message if the consumer is slower than the producer.

This is useful when the timeout is short and you expect the consumer to often return `None`.

Returns:

| Type | Description |
| --- | --- |
| `dict` | Message from the dataset. |

Source code in aineko/core/dataset.py
```python
def next(self) -> dict:
    """Consumes the next message from the dataset.

    Wraps the `consume(how="next")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that every message is consumed, but the consumed
    message may not be the most recent message if the consumer is slower
    than the producer.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Returns:
        message from the dataset
    """
    return self._consume_message(how="next")
```
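
A usage sketch; `next` blocks until a message arrives and never skips messages:

```python
# Guarantees every message is seen, at the cost of potentially lagging
# behind the producer. `handle` is a hypothetical callback.
while True:
    message = consumer.next()
    handle(message)
```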