Skip to content

AbstractNode

The AbstractNode class serves as the base class for all user-defined nodes.

aineko.AbstractNode

AbstractNode(
    pipeline_name: str,
    node_name: Optional[str] = None,
    poison_pill: Optional[ActorHandle] = None,
    test: bool = False,
)

Bases: ABC

Node base class for all nodes in the pipeline.

Nodes are the building blocks of the pipeline and are responsible for executing the pipeline. Nodes are designed to be modular and can be combined to create a pipeline. The node base class provides helper methods for setting up the dataset inputs and outputs for a node. The execute method is a wrapper for the _execute method which is to be implemented by subclasses. The _execute method is where the node logic is implemented by the user.

Attributes:

Name Type Description
name str

name of the node

pipeline_name str

name of the pipeline

params dict

dict of parameters to be used by the node

inputs dict

dict of AbstractDataset objects for inputs that node can read / consume.

outputs dict

dict of AbstractDataset objects for outputs that node can write / produce.

last_heartbeat float

timestamp of the last heartbeat

test bool

True if node is in test mode else False

log_levels tuple

tuple of allowed log levels

logging_dataset str

name of the logging dataset

local_state dict

shared local state between nodes. Used for intra-pipeline communication without dataset dependency.

Methods:

Name Description
setup_datasets

setup the dataset query layer for a node

execute

execute the node, wrapper for _execute method

_execute

execute the node, to be implemented by subclasses

Initialize the node.

Source code in aineko/core/node.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    pipeline_name: str,
    node_name: Optional[str] = None,
    poison_pill: Optional[ray.actor.ActorHandle] = None,
    test: bool = False,
) -> None:
    """Create a node and set its default attribute values.

    Args:
        pipeline_name: name of the pipeline this node belongs to.
        node_name: name of the node. When omitted (or otherwise falsy),
            the name of the node's class is used instead.
        poison_pill: optional actor handle stored on the node; it is the
            object `activate_poison_pill` calls when set.
        test: initial value of the `test` flag.
    """
    # Identity of the node.
    self.pipeline_name = pipeline_name
    self.name = node_name if node_name else type(self).__name__

    # Runtime flags and handles.
    self.test = test
    self.poison_pill = poison_pill
    self.last_heartbeat = time.time()

    # Datasets and parameters start out empty; `setup_datasets` and
    # `setup_test` are the methods that fill them in.
    self.inputs: dict = {}
    self.outputs: dict = {}
    self.params: Dict = {}

    # Logging settings are read from AINEKO_CONFIG.
    self.log_levels = AINEKO_CONFIG.get("LOG_LEVELS")
    logging_dataset_config = AINEKO_CONFIG.get("LOGGING_DATASET")
    self.logging_dataset: str = logging_dataset_config["name"]

inputs instance-attribute

inputs: dict = {}

last_heartbeat instance-attribute

last_heartbeat = time()

log_levels instance-attribute

log_levels = get('LOG_LEVELS')

logging_dataset instance-attribute

logging_dataset: str = get('LOGGING_DATASET')['name']

name instance-attribute

name = node_name or __name__

outputs instance-attribute

outputs: dict = {}

params instance-attribute

params: Dict = {}

pipeline_name instance-attribute

pipeline_name = pipeline_name

poison_pill instance-attribute

poison_pill = poison_pill

test instance-attribute

test = test

activate_poison_pill

activate_poison_pill() -> None

Activates poison pill, shutting down entire pipeline.

Source code in aineko/core/node.py
275
276
277
278
def activate_poison_pill(self) -> None:
    """Activate the poison pill, shutting down the entire pipeline.

    Does nothing when the node holds no poison pill. Otherwise the pill's
    remote `activate` method is invoked and the call waits on its result
    through `ray.get`.
    """
    pill = self.poison_pill
    if not pill:
        return
    ray.get(pill.activate.remote())

enable_test_mode

enable_test_mode() -> None

Enable test mode.

Source code in aineko/core/node.py
 99
100
101
def enable_test_mode(self) -> None:
    """Switch the node into test mode by setting its `test` flag to True.

    The testing helpers (`setup_test`, `run_test`, `run_test_yield`) raise
    a RuntimeError while this flag is False.
    """
    self.test = True

execute

execute(params: Optional[dict] = None) -> None

Execute the node.

Wrapper for _execute method to be implemented by subclasses.

Parameters:

Name Type Description Default
params Optional[dict]

Parameters to use to execute the node. Defaults to None.

None
Source code in aineko/core/node.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def execute(self, params: Optional[dict] = None) -> None:
    """Run the node: pre-loop hook, execute loop, then post-loop hook.

    `_execute` (implemented by subclasses) is called repeatedly until it
    returns `False`; any other return value, including `None`, keeps the
    loop going. If the pre-loop hook or `_execute` raises, `_log_traceback`
    is called and the exception is re-raised. Exceptions raised by the
    completion log call or the post-loop hook propagate without that step.

    Args:
        params: Parameters passed to the hooks and to `_execute`. A falsy
            value (such as None) is replaced by an empty dict.
    """
    if not params:
        params = {}

    try:
        self._pre_loop_hook(params)
        # Only an explicit False ends the loop.
        while self._execute(params) is not False:  # type: ignore
            pass
    except Exception:  # pylint: disable=broad-except
        self._log_traceback()
        raise

    self.log(f"Execution loop complete for node: {self.__class__.__name__}")
    self._post_loop_hook(params)

log

log(message: str, level: str = 'info') -> None

Log a message to the logging dataset.

Parameters:

Name Type Description Default
message str

Message to log

required
level str

Logging level. Defaults to "info". Options are: "info", "debug", "warning", "error", "critical"

'info'

Raises: ValueError: if invalid logging level is provided

Source code in aineko/core/node.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def log(self, message: str, level: str = "info") -> None:
    """Write a log record to the node's logging dataset.

    The record has the form `{"log": message, "level": level}` and is
    written to the output registered under `self.logging_dataset`.

    Args:
        message: Message to log
        level: Logging level. Defaults to "info". Options are:
            "info", "debug", "warning", "error", "critical"
    Raises:
        ValueError: if `level` is not one of `self.log_levels`
    """
    if level in self.log_levels:
        record = {"log": message, "level": level}
        self.outputs[self.logging_dataset].write(record)
        return

    valid_options = ", ".join(self.log_levels)
    raise ValueError(
        f"Invalid logging level {level}. Valid options are: "
        f"{valid_options}"
    )

run_test

run_test(runtime: Optional[int] = None) -> dict

Execute the node in testing mode.

Runs the steps in execute that involve the user-defined methods. Includes pre_loop_hook, _execute, and post_loop_hook.

Parameters:

Name Type Description Default
runtime Optional[int]

Number of seconds to run the execute loop for.

None

Returns:

Name Type Description
dict dict

dataset names and values produced by the node.

Source code in aineko/core/node.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def run_test(self, runtime: Optional[int] = None) -> dict:
    """Execute the node in testing mode and return what it produced.

    Runs the user-defined steps of `execute`: the pre-loop hook, repeated
    calls to `_execute`, and the post-loop hook, all with `self.params`.

    The loop stops when `_execute` returns `False`, or when the node has
    inputs and every one of them reports `test_is_empty()`. The empty-input
    check is skipped while fewer than `runtime` seconds have elapsed.

    Args:
        runtime: Number of seconds to run the execute loop for.

    Returns:
        dict: dataset names and values produced by the node, taken from
        each output's `get_test_output_values()`.

    Raises:
        RuntimeError: if the node is not in test mode.
    """
    # pylint: disable=protected-access
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    started_at = time.time()
    keep_running = True

    self._pre_loop_hook(self.params)
    while keep_running is not False:
        keep_running = self._execute(self.params)  # type: ignore

        # Inside the runtime window the empty-input check is skipped; a
        # False returned by `_execute` still ends the loop via the
        # `while` condition.
        if runtime is not None and time.time() - started_at < runtime:
            continue

        # A node without inputs never stops here; it relies on `_execute`
        # returning False.
        if self.inputs and all(
            dataset.test_is_empty() for dataset in self.inputs.values()
        ):
            keep_running = False
    self._post_loop_hook(self.params)

    produced = {}
    for dataset_name, output in self.outputs.items():
        produced[dataset_name] = output.get_test_output_values()
    return produced

run_test_yield

run_test_yield(
    runtime: Optional[int] = None,
) -> Generator[Tuple[dict, dict, AbstractNode], None, None]

Execute the node in testing mode, yielding at each iteration.

This method is an alternative to run_test. Instead of returning the aggregated output, it yields the most recently read value, the written value and the current node instance at each iteration. This is useful for testing nodes that either don't produce any output or if you need to test intermediate outputs. Testing state modifications is also possible using this method.

Parameters:

Name Type Description Default
runtime Optional[int]

Number of seconds to run the execute loop for.

None

Yields:

Type Description
Tuple[dict, dict, AbstractNode]

A tuple containing the most recent input value, output value and the node instance.

Example

for input, output, node_instance in sequencer.run_test_yield():
    print(f"Input: {input}, Output: {output}")
    print(f"Node Instance: {node_instance}")

Source code in aineko/core/node.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def run_test_yield(
    self, runtime: Optional[int] = None
) -> Generator[Tuple[dict, dict, "AbstractNode"], None, None]:
    """Execute the node in testing mode, yielding as the loop runs.

    This method is an alternative to `run_test`. Instead of returning the
    aggregated output, it yields, per iteration, the input values captured
    just before `_execute` ran, the latest value written to each output,
    and the node instance itself. This is useful for testing nodes that
    either don't produce any output or if you need to test intermediate
    outputs. Testing state modifications is also possible using this
    method.

    Because this is a generator, nothing runs (including the test-mode
    check) until the first item is requested, and the post-loop hook only
    runs once the generator is iterated to exhaustion.

    Args:
        runtime: Number of seconds to run the execute loop for.

    Yields:
        A tuple containing the most recent input value, output value and
        the node instance.

    Raises:
        RuntimeError: if the node is not in test mode.

    Example:
        >>> for input, output, node_instance in sequencer.run_test_yield():
        >>>     print(f"Input: {input}, Output: {output}")
        >>>     print(f"Node Instance: {node_instance}")
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    started_at = time.time()
    keep_running = True

    self._pre_loop_hook(self.params)
    while keep_running is not False:
        # pylint: disable=protected-access
        # For each input that still holds test values, record the first
        # one before `_execute` gets a chance to read it.
        consumed = {}
        for dataset_name, input_dataset in self.inputs.items():
            pending_values = input_dataset.get_test_input_values()
            if pending_values:
                consumed[dataset_name] = pending_values[0]

        keep_running = self._execute(self.params)  # type: ignore

        # Inside the runtime window the empty-input check is skipped.
        # NOTE(review): `continue` also skips the yield below, so no
        # tuple is yielded for these iterations - confirm this is intended.
        if runtime is not None and time.time() - started_at < runtime:
            continue

        # Stop once the node has inputs and all of them are drained.
        if self.inputs and all(
            dataset.test_is_empty() for dataset in self.inputs.values()
        ):
            keep_running = False

        # Record the newest value written to each non-empty output.
        produced = {}
        for dataset_name, output in self.outputs.items():
            written_values = output.get_test_output_values()
            if written_values:
                produced[dataset_name] = written_values[-1]

        yield (consumed, produced, self)
    # pylint: enable=protected-access

    self._post_loop_hook(self.params)

setup_datasets

setup_datasets(
    datasets: Dict[str, Dict],
    inputs: Optional[List[str]] = None,
    outputs: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None

Setup the dataset inputs and outputs for a node.

Parameters:

Name Type Description Default
datasets Dict[str, Dict]

dataset configuration

required
inputs Optional[List[str]]

list of dataset names for the inputs to the node

None
outputs Optional[List[str]]

list of dataset names for the outputs of the node

None
prefix Optional[str]

prefix for topic name (<prefix>.<dataset_name>)

None
has_pipeline_prefix bool

whether the dataset name has pipeline name prefix

False
Source code in aineko/core/node.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def setup_datasets(
    self,
    datasets: Dict[str, Dict],
    inputs: Optional[List[str]] = None,
    outputs: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None:
    """Create and initialize the input and output datasets of the node.

    Dataset objects are built with `AbstractDataset.from_config` and added
    to `self.inputs` / `self.outputs` (existing entries under other names
    are kept). Once all objects exist, every input is initialized with
    `create="consumer"` and every output with `create="producer"`.

    Args:
        datasets: dataset configuration, keyed by dataset name. A name
            missing from it is configured with an empty dict.
        inputs: list of dataset names for the inputs to the node
        outputs: list of dataset names for the outputs of the node
        prefix: prefix for topic name (`<prefix>.<dataset_name>`)
        has_pipeline_prefix: whether the dataset name has pipeline name
            prefix
    """

    def _build(names: List[str]) -> dict:
        """Create one dataset object per name from its configuration."""
        built = {}
        for name in names:
            built[name] = AbstractDataset.from_config(
                name=name, config=datasets.get(name, {})
            )
        return built

    def _initialize(registry: dict, names: List[str], role: str) -> None:
        """Initialize the named datasets of `registry` in the given role."""
        for name in names:
            registry[name].initialize(
                create=role,
                node_name=self.name,
                pipeline_name=self.pipeline_name,
                prefix=prefix,
                has_pipeline_prefix=has_pipeline_prefix,
            )

    # Build every dataset object first; initialization happens only after
    # both registries have been updated.
    input_names = inputs or []
    self.inputs.update(_build(input_names))

    output_names = outputs or []
    self.outputs.update(_build(output_names))

    _initialize(self.inputs, input_names, "consumer")
    _initialize(self.outputs, output_names, "producer")

setup_test

setup_test(
    dataset_type: str,
    inputs: Optional[dict] = None,
    outputs: Optional[List[str]] = None,
    params: Optional[dict] = None,
) -> None

Setup the node for testing.

Parameters:

Name Type Description Default
dataset_type str

type of dataset to use for testing (e.g. aineko.datasets.kafka.KafkaDataset)

required
inputs Optional[dict]

inputs to the node, format should be {"dataset": [1, 2, 3]}

None
outputs Optional[List[str]]

outputs of the node, format should be ["dataset_1", "dataset_2", ...]

None
params Optional[dict]

dictionary of parameters to make accessible to _execute

None

Raises:

Type Description
RuntimeError

if node is not in test mode

Source code in aineko/core/node.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def setup_test(
    self,
    dataset_type: str,
    inputs: Optional[dict] = None,
    outputs: Optional[List[str]] = None,
    params: Optional[dict] = None,
) -> None:
    """Setup the node for testing.

    Replaces `self.inputs` and `self.outputs` with test-mode datasets of
    the given type and stores `params` on the node. In addition to the
    requested outputs, one output is created for every dataset listed
    under "DATASETS" in `TESTING_NODE_CONFIG`.

    The `outputs` list passed by the caller is not modified.

    Args:
        dataset_type: type of dataset to use for testing (e.g.
            aineko.datasets.kafka.KafkaDataset)
        inputs: inputs to the node, format should be {"dataset": [1, 2, 3]}
        outputs: outputs of the node, format should be ["dataset_1",
            "dataset_2", ...]
        params: dictionary of parameters to make accessible to _execute

    Raises:
        RuntimeError: if node is not in test mode
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )

    inputs = inputs or {}

    self.inputs = {
        dataset_name: AbstractDataset.from_config(
            name=dataset_name,
            config={"type": dataset_type},
            test=True,
        )
        for dataset_name in inputs
    }
    for dataset_name, input_values in inputs.items():
        self.inputs[dataset_name].setup_test_mode(
            source_node=self.name,
            source_pipeline=self.pipeline_name,
            input_values=input_values,
        )

    internal_dataset_names = [
        dataset["name"] for dataset in TESTING_NODE_CONFIG.get("DATASETS")
    ]
    # Build a fresh list instead of extending `outputs` in place: extending
    # would append the internal dataset names to the caller's own list.
    output_names = [*(outputs or []), *internal_dataset_names]

    self.outputs = {
        dataset_name: AbstractDataset.from_config(
            name=dataset_name,
            config={"type": dataset_type},
            test=True,
        )
        for dataset_name in output_names
    }
    for dataset_name in output_names:
        self.outputs[dataset_name].setup_test_mode(
            source_node=self.name,
            source_pipeline=self.pipeline_name,
        )
    self.params = params or {}