
AbstractNode

The AbstractNode class serves as the base class for all user-defined nodes.

aineko.AbstractNode

Bases: ABC

Node base class for all nodes in the pipeline.

Nodes are the building blocks of the pipeline and are responsible for executing it. They are designed to be modular, so they can be combined to create a pipeline. The node base class provides helper methods for setting up a node's consumers and producers. The execute method is a wrapper for the _execute method, which subclasses must implement; _execute is where the user implements the node logic.

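Attributes:

name (str): name of the node; falls back to the class name when node_name is None or empty
pipeline_name (str): name of the pipeline the node belongs to
consumers (dict): dict of DatasetConsumer objects for inputs to the node
producers (dict): dict of DatasetProducer objects for outputs of the node
params (dict): parameters passed to _execute in test mode (set by setup_test)
last_heartbeat (float): timestamp of the last heartbeat
test (bool): True if the node is in test mode, else False
log_levels (tuple): tuple of allowed log levels
poison_pill (Optional[ray.actor.ActorHandle]): handle used to shut down the entire pipeline
local_state (dict): local state shared between nodes, used for intra-pipeline communication without a dataset dependency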

Methods:

setup_datasets: set up the consumers and producers for a node
execute: execute the node; wrapper for the _execute method
_execute: execute the node; to be implemented by subclasses
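The control flow described above can be sketched without Kafka or Ray. MiniNode and Countdown below are hypothetical stand-ins written for illustration, not aineko classes; MiniNode follows the order used in the listed execute method: _pre_loop_hook once, _execute repeatedly until it returns False, then _post_loop_hook once.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class MiniNode(ABC):
    """Hypothetical stand-in that mimics the AbstractNode execute loop."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order in which hooks run

    def execute(self, params: Optional[dict] = None) -> None:
        params = params or {}
        self._pre_loop_hook(params)
        run_loop: Optional[bool] = True
        # Only an explicit False ends the loop; returning None keeps it going.
        while run_loop is not False:
            run_loop = self._execute(params)
        self._post_loop_hook(params)

    def _pre_loop_hook(self, params: dict) -> None:
        """Optional override, runs once before the loop."""

    def _post_loop_hook(self, params: dict) -> None:
        """Optional override, runs once after the loop."""

    @abstractmethod
    def _execute(self, params: dict) -> Optional[bool]:
        """Node logic; subclasses must implement this."""


class Countdown(MiniNode):
    """Example subclass: runs params["n"] iterations, then stops."""

    def _pre_loop_hook(self, params: dict) -> None:
        self.remaining = params.get("n", 3)
        self.calls.append("pre")

    def _execute(self, params: dict) -> Optional[bool]:
        self.calls.append("execute")
        self.remaining -= 1
        return self.remaining > 0  # becomes False when the count hits zero

    def _post_loop_hook(self, params: dict) -> None:
        self.calls.append("post")


node = Countdown()
node.execute({"n": 3})
print(node.calls)  # ['pre', 'execute', 'execute', 'execute', 'post']
```

In aineko itself, the equivalent would be subclassing aineko.AbstractNode and implementing _execute (and, optionally, the two hooks) in the same way; the stand-in omits consumers, producers, logging, and error handling.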

Source code in aineko/core/node.py
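import time
import traceback
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Optional, Tuple

import ray

# Names used below that come from aineko itself (imports not shown here):
# AINEKO_CONFIG, DEFAULT_KAFKA_CONFIG, TESTING_NODE_CONFIG,
# DatasetConsumer, DatasetProducer, FakeDatasetConsumer, FakeDatasetProducer


class AbstractNode(ABC):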
    """Node base class for all nodes in the pipeline.

    Nodes are the building blocks of the pipeline and are responsible for
    executing it. Nodes are designed to be modular and can be combined to
    create a pipeline. The node base class provides helper methods for
    setting up the consumers and producers of a node. The execute method is
    a wrapper for the _execute method, which subclasses must implement.
    The _execute method is where the user implements the node logic.

    Attributes:
        name (str): name of the node; defaults to the class name
        pipeline_name (str): name of the pipeline the node belongs to
        consumers (dict): dict of DatasetConsumer objects for inputs to node
        producers (dict): dict of DatasetProducer objects for outputs of node
        params (dict): parameters passed to _execute in test mode
        last_heartbeat (float): timestamp of the last heartbeat
        test (bool): True if node is in test mode else False
        log_levels (tuple): tuple of allowed log levels
        poison_pill (Optional[ray.actor.ActorHandle]): handle used to shut
            down the entire pipeline
        local_state (dict): shared local state between nodes. Used for
            intra-pipeline communication without dataset dependency.

    Methods:
        setup_datasets: set up the consumers and producers for a node
        execute: execute the node, wrapper for _execute method
        _execute: execute the node, to be implemented by subclasses
    """

    def __init__(
        self,
        node_name: str | None,
        pipeline_name: str,
        poison_pill: Optional[ray.actor.ActorHandle] = None,
        test: bool = False,
    ) -> None:
        """Initialize the node."""
        self.name = node_name or self.__class__.__name__
        self.pipeline_name = pipeline_name
        self.last_heartbeat = time.time()
        self.consumers: Dict = {}
        self.producers: Dict = {}
        self.params: Dict = {}
        self.test = test
        self.log_levels = AINEKO_CONFIG.get("LOG_LEVELS")
        self.poison_pill = poison_pill

    def enable_test_mode(self) -> None:
        """Enable test mode."""
        self.test = True

    def setup_datasets(
        self,
        datasets: Dict[str, dict],
        inputs: Optional[List[str]] = None,
        outputs: Optional[List[str]] = None,
        prefix: Optional[str] = None,
        has_pipeline_prefix: bool = False,
    ) -> None:
        """Set up the consumers and producers for a node.

        Args:
            datasets: dataset configuration
            inputs: list of dataset names for the inputs to the node
            outputs: list of dataset names for the outputs of the node
            prefix: prefix for topic name (`<prefix>.<dataset_name>`)
            has_pipeline_prefix: whether the dataset name has pipeline name
                prefix
        """
        inputs = inputs or []
        self.consumers.update(
            {
                dataset_name: DatasetConsumer(
                    dataset_name=dataset_name,
                    node_name=self.name,
                    pipeline_name=self.pipeline_name,
                    dataset_config=datasets.get(dataset_name, {}),
                    prefix=prefix,
                    has_pipeline_prefix=has_pipeline_prefix,
                )
                for dataset_name in inputs
            }
        )

        outputs = outputs or []
        self.producers.update(
            {
                dataset_name: DatasetProducer(
                    dataset_name=dataset_name,
                    node_name=self.name,
                    pipeline_name=self.pipeline_name,
                    dataset_config=datasets.get(dataset_name, {}),
                    prefix=prefix,
                    has_pipeline_prefix=has_pipeline_prefix,
                )
                for dataset_name in outputs
            }
        )

    def setup_test(
        self,
        inputs: Optional[dict] = None,
        outputs: Optional[list] = None,
        params: Optional[dict] = None,
    ) -> None:
        """Set up the node for testing.

        Args:
            inputs: inputs to the node, format should be {"dataset": [1, 2, 3]}
            outputs: outputs of the node, format should be
                ["dataset_1", "dataset_2", ...]
            params: dictionary of parameters to make accessible to _execute

        Raises:
            RuntimeError: if node is not in test mode
        """
        if self.test is False:
            raise RuntimeError(
                "Node is not in test mode. "
                "Please initialize with `enable_test_mode()`."
            )

        inputs = inputs or {}
        self.consumers = {
            dataset_name: FakeDatasetConsumer(
                dataset_name=dataset_name,
                node_name=self.__class__.__name__,
                values=values,
            )
            for dataset_name, values in inputs.items()
        }
        outputs = outputs or []
        outputs.extend(TESTING_NODE_CONFIG.get("DATASETS"))
        self.producers = {
            dataset_name: FakeDatasetProducer(
                dataset_name=dataset_name,
                node_name=self.__class__.__name__,
            )
            for dataset_name in outputs
        }
        self.params = params or {}

    def log(self, message: str, level: str = "info") -> None:
        """Log a message to the logging dataset.

        Args:
            message: Message to log
            level: Logging level. Defaults to "info". Options are:
                "info", "debug", "warning", "error", "critical"

        Raises:
            ValueError: if invalid logging level is provided
        """
        if level not in self.log_levels:
            raise ValueError(
                f"Invalid logging level {level}. Valid options are: "
                f"{', '.join(self.log_levels)}"
            )
        out_msg = {"log": message, "level": level}
        self.producers[DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")].produce(
            out_msg
        )

    def _log_traceback(self) -> None:
        """Logs the traceback of an exception."""
        exc_info = traceback.format_exc()
        self.log(exc_info, level="debug")

    def execute(self, params: Optional[dict] = None) -> None:
        """Execute the node.

        Wrapper for _execute method to be implemented by subclasses.

        Args:
            params: Parameters to use to execute the node. Defaults to None.
        """
        params = params or {}
        run_loop = True

        try:
            self._pre_loop_hook(params)
        except Exception:  # pylint: disable=broad-except
            self._log_traceback()
            raise

        while run_loop is not False:
            # Monitoring
            try:
                run_loop = self._execute(params)  # type: ignore
            except Exception:  # pylint: disable=broad-except
                self._log_traceback()
                raise

        self.log(f"Execution loop complete for node: {self.__class__.__name__}")
        self._post_loop_hook(params)

    def activate_poison_pill(self) -> None:
        """Activate the poison pill, shutting down the entire pipeline."""
        if self.poison_pill:
            ray.get(self.poison_pill.activate.remote())

    @abstractmethod
    def _execute(self, params: dict) -> Optional[bool]:
        """Execute the node.

        Args:
            params: Parameters to use to execute the node.

        Note:
            Method to be implemented by subclasses

        Raises:
            NotImplementedError: if method is not implemented in subclass
        """
        raise NotImplementedError("_execute method not implemented")

    def run_test(self, runtime: Optional[int] = None) -> dict:
        """Execute the node in testing mode.

        Runs the steps of execute that involve the user-defined methods:
        _pre_loop_hook, _execute, and _post_loop_hook.

        Args:
            runtime: Number of seconds to run the execute loop for.

        Returns:
            dict: dataset names and values produced by the node.

        Raises:
            RuntimeError: if node is not in test mode
        """
        if self.test is False:
            raise RuntimeError(
                "Node is not in test mode. "
                "Please initialize with `enable_test_mode()`."
            )
        run_loop = True
        start_time = time.time()

        self._pre_loop_hook(self.params)
        while run_loop is not False:
            run_loop = self._execute(self.params)  # type: ignore

            # Do not end loop if runtime not exceeded
            if runtime is not None:
                if time.time() - start_time < runtime:
                    continue

            # End loop if all consumers are empty
            if self.consumers and all(
                consumer.empty for consumer in self.consumers.values()
            ):
                run_loop = False

        self._post_loop_hook(self.params)

        return {
            dataset_name: producer.values
            for dataset_name, producer in self.producers.items()
        }

    def run_test_yield(
        self, runtime: Optional[int] = None
    ) -> Generator[Tuple[dict, dict, "AbstractNode"], None, None]:
        """Execute the node in testing mode, yielding at each iteration.

        This method is an alternative to `run_test`. Instead of returning the
        aggregated output, it yields, at each iteration, the most recently
        consumed values, the most recently produced values, and the current
        node instance. This is useful for testing nodes that don't produce
        any output, for inspecting intermediate outputs, and for testing
        state modifications.

        Args:
            runtime: Number of seconds to run the execute loop for.

        Yields:
            A tuple containing the most recent input values, output values and
            the node instance.

        Raises:
            RuntimeError: if node is not in test mode

        Example:
            >>> for inputs, outputs, node_instance in sequencer.run_test_yield():
            ...     print(f"Input: {inputs}, Output: {outputs}")
            ...     print(f"Node Instance: {node_instance}")
        """
        if self.test is False:
            raise RuntimeError(
                "Node is not in test mode. "
                "Please initialize with `enable_test_mode()`."
            )
        run_loop = True
        start_time = time.time()

        self._pre_loop_hook(self.params)
        while run_loop is not False:
            last_produced_values = {}
            last_consumed_values = {}

            run_loop = self._execute(self.params)  # type: ignore

            # Do not end loop if runtime not exceeded
            if runtime is not None:
                if time.time() - start_time < runtime:
                    continue

            # End loop if all consumers are empty
            if self.consumers and all(
                consumer.empty for consumer in self.consumers.values()
            ):
                run_loop = False

            # Capture last consumed values
            for dataset_name, consumer in self.consumers.items():
                if consumer.values:
                    last_value = consumer.values[0]
                    last_consumed_values[dataset_name] = last_value

            # Capture last produced values
            for dataset_name, producer in self.producers.items():
                if producer.values:
                    last_value = producer.values[-1]
                    last_produced_values[dataset_name] = last_value

            yield (last_consumed_values, last_produced_values, self)

        self._post_loop_hook(self.params)

    def _pre_loop_hook(self, params: Optional[dict] = None) -> None:
        """Hook to be called before the node loop. User overrideable.

        Args:
            params: Parameters to use to execute the node.

        Note:
            Method (optional) to be implemented by subclasses.
        """
        pass

    def _post_loop_hook(self, params: Optional[dict] = None) -> None:
        """Hook to be called after the node loop. User overrideable.

        Args:
            params: Parameters to use to execute the node.

        Note:
            Method (optional) to be implemented by subclasses.
        """
        pass

__init__(node_name, pipeline_name, poison_pill=None, test=False)

Initialize the node.
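One detail of the listed __init__: the node name falls back to the class name whenever node_name is falsy, so None and an empty string behave the same. The hypothetical helper below restates just that expression:

```python
from typing import Optional


def resolve_node_name(node_name: Optional[str], cls: type) -> str:
    """Mirror the expression `node_name or self.__class__.__name__`."""
    return node_name or cls.__name__


class Sequencer:  # hypothetical node class, used only for its name
    pass


print(resolve_node_name("seq_1", Sequencer))  # seq_1
print(resolve_node_name(None, Sequencer))  # Sequencer
print(resolve_node_name("", Sequencer))  # Sequencer
```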

Source code in aineko/core/node.py
def __init__(
    self,
    node_name: str | None,
    pipeline_name: str,
    poison_pill: Optional[ray.actor.ActorHandle] = None,
    test: bool = False,
) -> None:
    """Initialize the node."""
    self.name = node_name or self.__class__.__name__
    self.pipeline_name = pipeline_name
    self.last_heartbeat = time.time()
    self.consumers: Dict = {}
    self.producers: Dict = {}
    self.params: Dict = {}
    self.test = test
    self.log_levels = AINEKO_CONFIG.get("LOG_LEVELS")
    self.poison_pill = poison_pill

activate_poison_pill()

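Activate the poison pill, shutting down the entire pipeline. The call blocks until the poison pill actor has handled the request; if the node was created without a poison pill, this method does nothing.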

Source code in aineko/core/node.py
def activate_poison_pill(self) -> None:
    """Activate the poison pill, shutting down the entire pipeline."""
    if self.poison_pill:
        ray.get(self.poison_pill.activate.remote())

enable_test_mode()

Enable test mode.

Source code in aineko/core/node.py
def enable_test_mode(self) -> None:
    """Enable test mode."""
    self.test = True

execute(params=None)

Execute the node.

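Wraps the _execute method, which subclasses must implement. The method runs _pre_loop_hook once, calls _execute repeatedly until it returns False (returning None keeps the loop running), logs a completion message, and then runs _post_loop_hook. If _pre_loop_hook or _execute raises, the traceback is logged at debug level and the exception is re-raised.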

Parameters:

params (Optional[dict]): Parameters to use to execute the node. Default: None.
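A minimal sketch of the loop semantics, assuming a plain callable in place of _execute (run_execute_loop and failing_step are hypothetical names, not aineko API). It shows two points from the listed source: only an explicit False ends the loop, and an exception is recorded and then re-raised rather than swallowed.

```python
import traceback
from typing import Callable, List, Optional


def run_execute_loop(
    step: Callable[[dict], Optional[bool]],
    params: Optional[dict] = None,
    error_log: Optional[List[str]] = None,
) -> int:
    """Hypothetical restatement of the loop in execute.

    Returns the number of iterations. An exception raised by step is
    recorded in error_log (standing in for the debug-level traceback
    that execute logs) and then re-raised.
    """
    params = params or {}
    if error_log is None:
        error_log = []
    iterations = 0
    run_loop: Optional[bool] = True
    while run_loop is not False:
        try:
            run_loop = step(params)
        except Exception:
            error_log.append(traceback.format_exc())
            raise
        iterations += 1
    return iterations


# None does not stop the loop; only an explicit False does.
steps = iter([None, None, False])
count = run_execute_loop(lambda params: next(steps))
print(count)  # 3


def failing_step(params: dict) -> Optional[bool]:
    raise ValueError("bad input")


errors: List[str] = []
try:
    run_execute_loop(failing_step, error_log=errors)
except ValueError:
    print(errors[0].strip().splitlines()[-1])  # ValueError: bad input
```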
Source code in aineko/core/node.py
def execute(self, params: Optional[dict] = None) -> None:
    """Execute the node.

    Wrapper for _execute method to be implemented by subclasses.

    Args:
        params: Parameters to use to execute the node. Defaults to None.
    """
    params = params or {}
    run_loop = True

    try:
        self._pre_loop_hook(params)
    except Exception:  # pylint: disable=broad-except
        self._log_traceback()
        raise

    while run_loop is not False:
        # Monitoring
        try:
            run_loop = self._execute(params)  # type: ignore
        except Exception:  # pylint: disable=broad-except
            self._log_traceback()
            raise

    self.log(f"Execution loop complete for node: {self.__class__.__name__}")
    self._post_loop_hook(params)

log(message, level='info')

Log a message to the logging dataset.

Parameters:

message (str): Message to log. Required.
level (str): Logging level. Options are "info", "debug", "warning", "error", "critical". Default: 'info'.

Raises:

ValueError: if an invalid logging level is provided.
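The validation and message shape can be reproduced with a stand-in producer. In this sketch ListProducer and the module-level log function are hypothetical, and LOG_LEVELS is assumed to match the options listed above rather than being read from AINEKO_CONFIG.

```python
from typing import Dict, List, Tuple

# Assumption: mirrors the options listed above; aineko reads the real
# value via AINEKO_CONFIG.get("LOG_LEVELS").
LOG_LEVELS: Tuple[str, ...] = ("info", "debug", "warning", "error", "critical")


class ListProducer:
    """Hypothetical stand-in for the logging dataset's producer."""

    def __init__(self) -> None:
        self.values: List[Dict[str, str]] = []

    def produce(self, message: Dict[str, str]) -> None:
        self.values.append(message)


def log(producer: ListProducer, message: str, level: str = "info") -> None:
    """Validate the level, then emit {"log": message, "level": level}."""
    if level not in LOG_LEVELS:
        raise ValueError(
            f"Invalid logging level {level}. Valid options are: "
            f"{', '.join(LOG_LEVELS)}"
        )
    producer.produce({"log": message, "level": level})


logs = ListProducer()
log(logs, "starting up")
log(logs, "queue depth 12", level="debug")
print(logs.values)
# [{'log': 'starting up', 'level': 'info'}, {'log': 'queue depth 12', 'level': 'debug'}]
```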

Source code in aineko/core/node.py
def log(self, message: str, level: str = "info") -> None:
    """Log a message to the logging dataset.

    Args:
        message: Message to log
        level: Logging level. Defaults to "info". Options are:
            "info", "debug", "warning", "error", "critical"

    Raises:
        ValueError: if invalid logging level is provided
    """
    if level not in self.log_levels:
        raise ValueError(
            f"Invalid logging level {level}. Valid options are: "
            f"{', '.join(self.log_levels)}"
        )
    out_msg = {"log": message, "level": level}
    self.producers[DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")].produce(
        out_msg
    )

run_test(runtime=None)

Execute the node in testing mode.

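Runs the steps of execute that involve user-defined methods: _pre_loop_hook, _execute, and _post_loop_hook. The loop ends when _execute returns False or when all consumers are empty; if runtime is set, the empty-consumer check only applies once runtime seconds have elapsed.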

Parameters:

runtime (Optional[int]): Number of seconds to run the execute loop for. Default: None.

Returns:

dict: dataset names and values produced by the node.

Raises:

RuntimeError: if the node is not in test mode.
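The termination rules of run_test are easy to misread, so here they are restated with hypothetical in-memory stand-ins (FakeConsumer, run_test_loop, and step are illustrative names; aineko's FakeDatasetConsumer is not used). The step never returns False, so the loop ends only because the consumer is drained.

```python
import time
from typing import Callable, Dict, List, Optional


class FakeConsumer:
    """Hypothetical in-memory consumer exposing an `empty` flag."""

    def __init__(self, values: List[int]) -> None:
        self.values = list(values)

    @property
    def empty(self) -> bool:
        return not self.values

    def consume(self) -> Optional[int]:
        return self.values.pop(0) if self.values else None


def run_test_loop(
    consumers: Dict[str, FakeConsumer],
    step: Callable[[], Optional[bool]],
    runtime: Optional[float] = None,
) -> int:
    """Restate run_test's termination rules; return the iteration count."""
    iterations = 0
    start_time = time.time()
    run_loop: Optional[bool] = True
    while run_loop is not False:
        run_loop = step()
        iterations += 1
        # Keep looping until the runtime budget (if any) is used up.
        if runtime is not None and time.time() - start_time < runtime:
            continue
        # After that, stop as soon as every consumer has been drained.
        if consumers and all(c.empty for c in consumers.values()):
            run_loop = False
    return iterations


doubled: List[int] = []
consumers = {"numbers": FakeConsumer([1, 2, 3])}


def step() -> Optional[bool]:
    value = consumers["numbers"].consume()
    if value is not None:
        doubled.append(value * 2)
    return None  # never asks to stop on its own


iterations = run_test_loop(consumers, step)
print(iterations, doubled)  # 3 [2, 4, 6]
```

Two consequences follow from the listed source: if a node has no consumers and _execute never returns False, run_test with runtime=None loops forever; and with runtime set, the loop keeps calling _execute for at least that many seconds unless _execute returns False first.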

Source code in aineko/core/node.py
def run_test(self, runtime: Optional[int] = None) -> dict:
    """Execute the node in testing mode.

    Runs the steps of execute that involve the user-defined methods:
    _pre_loop_hook, _execute, and _post_loop_hook.

    Args:
        runtime: Number of seconds to run the execute loop for.

    Returns:
        dict: dataset names and values produced by the node.

    Raises:
        RuntimeError: if node is not in test mode
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    run_loop = True
    start_time = time.time()

    self._pre_loop_hook(self.params)
    while run_loop is not False:
        run_loop = self._execute(self.params)  # type: ignore

        # Do not end loop if runtime not exceeded
        if runtime is not None:
            if time.time() - start_time < runtime:
                continue

        # End loop if all consumers are empty
        if self.consumers and all(
            consumer.empty for consumer in self.consumers.values()
        ):
            run_loop = False

    self._post_loop_hook(self.params)

    return {
        dataset_name: producer.values
        for dataset_name, producer in self.producers.items()
    }

run_test_yield(runtime=None)

Execute the node in testing mode, yielding at each iteration.

This method is an alternative to run_test. Instead of returning the aggregated output, it yields, at each iteration, the most recently consumed values, the most recently produced values, and the current node instance. This is useful for testing nodes that don't produce any output, for inspecting intermediate outputs, and for testing state modifications.

Parameters:

runtime (Optional[int]): Number of seconds to run the execute loop for. Default: None.

Yields:

Tuple[dict, dict, AbstractNode]: a tuple containing the most recent input values, the most recent output values, and the node instance.

Raises:

RuntimeError: if the node is not in test mode.

Example:

>>> for inputs, outputs, node_instance in sequencer.run_test_yield():
...     print(f"Input: {inputs}, Output: {outputs}")
...     print(f"Node Instance: {node_instance}")
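Because run_test_yield is a generator, the _post_loop_hook call after its loop only runs once the caller exhausts it; breaking out of a for loop early skips the hook. The hypothetical RunningTotal class below (simplified: it yields a 2-tuple instead of the real 3-tuple and has no consumers) demonstrates that Python behavior.

```python
from typing import Dict, Generator, List, Tuple


class RunningTotal:
    """Hypothetical node-like object that sums its inputs one at a time."""

    def __init__(self, inputs: List[int]) -> None:
        self.inputs = list(inputs)
        self.produced: List[int] = []
        self.total = 0
        self.post_hook_ran = False

    def _execute(self) -> bool:
        self.total += self.inputs.pop(0)
        self.produced.append(self.total)
        return bool(self.inputs)  # False once the inputs are exhausted

    def run_yield(
        self,
    ) -> Generator[Tuple[Dict[str, int], "RunningTotal"], None, None]:
        run_loop = True
        while run_loop is not False:
            run_loop = self._execute()
            # Hand the latest output and the instance back to the caller.
            yield {"totals": self.produced[-1]}, self
        self.post_hook_ran = True  # stands in for _post_loop_hook


node = RunningTotal([1, 2, 3])
seen = [output["totals"] for output, _ in node.run_yield()]
print(seen, node.post_hook_ran)  # [1, 3, 6] True

early = RunningTotal([1, 2, 3])
for output, instance in early.run_yield():
    if instance.total >= 3:
        break  # stop consuming the generator early
print(early.post_hook_ran)  # False
```

Note also that in the listed source, the continue taken while runtime has not yet elapsed jumps past the yield, so when runtime is set no tuples are produced until the runtime budget is used up.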

Source code in aineko/core/node.py
def run_test_yield(
    self, runtime: Optional[int] = None
) -> Generator[Tuple[dict, dict, "AbstractNode"], None, None]:
    """Execute the node in testing mode, yielding at each iteration.

    This method is an alternative to `run_test`. Instead of returning the
    aggregated output, it yields, at each iteration, the most recently
    consumed values, the most recently produced values, and the current
    node instance. This is useful for testing nodes that don't produce
    any output, for inspecting intermediate outputs, and for testing
    state modifications.

    Args:
        runtime: Number of seconds to run the execute loop for.

    Yields:
        A tuple containing the most recent input values, output values and
        the node instance.

    Raises:
        RuntimeError: if node is not in test mode

    Example:
        >>> for inputs, outputs, node_instance in sequencer.run_test_yield():
        ...     print(f"Input: {inputs}, Output: {outputs}")
        ...     print(f"Node Instance: {node_instance}")
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    run_loop = True
    start_time = time.time()

    self._pre_loop_hook(self.params)
    while run_loop is not False:
        last_produced_values = {}
        last_consumed_values = {}

        run_loop = self._execute(self.params)  # type: ignore

        # Do not end loop if runtime not exceeded
        if runtime is not None:
            if time.time() - start_time < runtime:
                continue

        # End loop if all consumers are empty
        if self.consumers and all(
            consumer.empty for consumer in self.consumers.values()
        ):
            run_loop = False

        # Capture last consumed values
        for dataset_name, consumer in self.consumers.items():
            if consumer.values:
                last_value = consumer.values[0]
                last_consumed_values[dataset_name] = last_value

        # Capture last produced values
        for dataset_name, producer in self.producers.items():
            if producer.values:
                last_value = producer.values[-1]
                last_produced_values[dataset_name] = last_value

        yield (last_consumed_values, last_produced_values, self)

    self._post_loop_hook(self.params)

setup_datasets(datasets, inputs=None, outputs=None, prefix=None, has_pipeline_prefix=False)

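Set up the consumers and producers for a node. New consumers and producers are added to any that already exist, and a dataset missing from datasets is given an empty configuration.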

Parameters:

datasets (Dict[str, dict]): dataset configuration. Required.
inputs (Optional[List[str]]): list of dataset names for the inputs to the node. Default: None.
outputs (Optional[List[str]]): list of dataset names for the outputs of the node. Default: None.
prefix (Optional[str]): prefix for the topic name (<prefix>.<dataset_name>). Default: None.
has_pipeline_prefix (bool): whether the dataset name has a pipeline name prefix. Default: False.
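The bookkeeping in setup_datasets (one client per dataset name, an empty config for names missing from datasets, and update() rather than reassignment) can be checked with hypothetical stand-ins. StubNode and StubClient are illustrative only and do not connect to Kafka; prefix handling is omitted.

```python
from typing import Dict, List, Optional


class StubClient:
    """Hypothetical stand-in for DatasetConsumer / DatasetProducer."""

    def __init__(self, dataset_name: str, dataset_config: dict) -> None:
        self.dataset_name = dataset_name
        self.dataset_config = dataset_config


class StubNode:
    """Hypothetical node reproducing setup_datasets' bookkeeping only."""

    def __init__(self) -> None:
        self.consumers: Dict[str, StubClient] = {}
        self.producers: Dict[str, StubClient] = {}

    def setup_datasets(
        self,
        datasets: Dict[str, dict],
        inputs: Optional[List[str]] = None,
        outputs: Optional[List[str]] = None,
    ) -> None:
        # One client per dataset name; names missing from `datasets`
        # get an empty config. update() adds to existing entries.
        self.consumers.update(
            {
                name: StubClient(name, datasets.get(name, {}))
                for name in inputs or []
            }
        )
        self.producers.update(
            {
                name: StubClient(name, datasets.get(name, {}))
                for name in outputs or []
            }
        )


config = {"prices": {"example_setting": 1}}  # illustrative config only
node = StubNode()
node.setup_datasets(config, inputs=["prices"], outputs=["signals"])
node.setup_datasets(config, outputs=["alerts"])  # adds, never clears
print(sorted(node.producers))  # ['alerts', 'signals']
print(node.consumers["prices"].dataset_config)  # {'example_setting': 1}
print(node.producers["signals"].dataset_config)  # {}
```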
Source code in aineko/core/node.py
def setup_datasets(
    self,
    datasets: Dict[str, dict],
    inputs: Optional[List[str]] = None,
    outputs: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None:
    """Set up the consumers and producers for a node.

    Args:
        datasets: dataset configuration
        inputs: list of dataset names for the inputs to the node
        outputs: list of dataset names for the outputs of the node
        prefix: prefix for topic name (`<prefix>.<dataset_name>`)
        has_pipeline_prefix: whether the dataset name has pipeline name
            prefix
    """
    inputs = inputs or []
    self.consumers.update(
        {
            dataset_name: DatasetConsumer(
                dataset_name=dataset_name,
                node_name=self.name,
                pipeline_name=self.pipeline_name,
                dataset_config=datasets.get(dataset_name, {}),
                prefix=prefix,
                has_pipeline_prefix=has_pipeline_prefix,
            )
            for dataset_name in inputs
        }
    )

    outputs = outputs or []
    self.producers.update(
        {
            dataset_name: DatasetProducer(
                dataset_name=dataset_name,
                node_name=self.name,
                pipeline_name=self.pipeline_name,
                dataset_config=datasets.get(dataset_name, {}),
                prefix=prefix,
                has_pipeline_prefix=has_pipeline_prefix,
            )
            for dataset_name in outputs
        }
    )

setup_test(inputs=None, outputs=None, params=None)

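Set up the node for testing. Existing consumers and producers are replaced with in-memory fakes (FakeDatasetConsumer and FakeDatasetProducer), the datasets listed in TESTING_NODE_CONFIG["DATASETS"] are added to the outputs, and params is stored for use by run_test and run_test_yield.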

Parameters:

inputs (Optional[dict]): inputs to the node, in the format {"dataset": [1, 2, 3]}. Default: None.
outputs (Optional[list]): outputs of the node, in the format ["dataset_1", "dataset_2", ...]. Default: None.
params (Optional[dict]): dictionary of parameters to make accessible to _execute. Default: None.

Raises:

RuntimeError: if the node is not in test mode.
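A rough sketch of the layout setup_test builds, using plain lists instead of FakeDatasetConsumer and FakeDatasetProducer. build_test_io is a hypothetical helper, and EXTRA_TEST_DATASETS is an assumed stand-in for TESTING_NODE_CONFIG["DATASETS"] (presumably including the logging dataset that log writes to).

```python
from typing import Dict, List, Optional

# Assumption: stands in for TESTING_NODE_CONFIG.get("DATASETS"); the real
# contents are defined in aineko's configuration.
EXTRA_TEST_DATASETS: List[str] = ["logging"]


def build_test_io(
    inputs: Optional[Dict[str, list]] = None,
    outputs: Optional[List[str]] = None,
) -> Dict[str, Dict[str, list]]:
    """Hypothetical helper showing the fake-dataset layout setup_test builds."""
    inputs = inputs or {}
    # Copy into a new list so the caller's list is not modified.
    output_names = list(outputs or []) + EXTRA_TEST_DATASETS
    return {
        # Each input dataset is pre-loaded with its test values.
        "consumers": {name: list(values) for name, values in inputs.items()},
        # Each output dataset starts empty and collects produced values.
        "producers": {name: [] for name in output_names},
    }


requested = ["doubled"]
layout = build_test_io({"numbers": [1, 2, 3]}, requested)
print(layout["consumers"])  # {'numbers': [1, 2, 3]}
print(layout["producers"])  # {'doubled': [], 'logging': []}
print(requested)  # ['doubled']
```

One difference from the listed source: setup_test calls outputs.extend(...) on the list it receives, so a list passed in by the caller gains the extra dataset names. The sketch copies the list first, which leaves requested unchanged.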

Source code in aineko/core/node.py
def setup_test(
    self,
    inputs: Optional[dict] = None,
    outputs: Optional[list] = None,
    params: Optional[dict] = None,
) -> None:
    """Set up the node for testing.

    Args:
        inputs: inputs to the node, format should be {"dataset": [1, 2, 3]}
        outputs: outputs of the node, format should be
            ["dataset_1", "dataset_2", ...]
        params: dictionary of parameters to make accessible to _execute

    Raises:
        RuntimeError: if node is not in test mode
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )

    inputs = inputs or {}
    self.consumers = {
        dataset_name: FakeDatasetConsumer(
            dataset_name=dataset_name,
            node_name=self.__class__.__name__,
            values=values,
        )
        for dataset_name, values in inputs.items()
    }
    outputs = outputs or []
    outputs.extend(TESTING_NODE_CONFIG.get("DATASETS"))
    self.producers = {
        dataset_name: FakeDatasetProducer(
            dataset_name=dataset_name,
            node_name=self.__class__.__name__,
        )
        for dataset_name in outputs
    }
    self.params = params or {}