Skip to content

AbstractNode

The AbstractNode class serves as the base class for all user-defined nodes.

aineko.AbstractNode

AbstractNode(
    pipeline_name: str,
    node_name: Optional[str] = None,
    poison_pill: Optional[ActorHandle] = None,
    test: bool = False,
)

Bases: ABC

Node base class for all nodes in the pipeline.

Nodes are the building blocks of the pipeline and are responsible for executing it. Nodes are designed to be modular and can be combined to create a pipeline. The node base class provides helper methods for setting up the consumers and producers for a node. The execute method is a wrapper around the _execute method, which subclasses must implement: _execute is where the user implements the node logic.

Attributes:

Name Type Description
consumers dict

dict of DatasetConsumer objects for inputs to node

producers dict

dict of DatasetProducer objects for outputs of node

last_heartbeat float

timestamp of the last heartbeat

test bool

True if node is in test mode else False

log_levels tuple

tuple of allowed log levels

local_state dict

shared local state between nodes. Used for intra-pipeline communication without dataset dependency.

Methods:

Name Description
setup_datasets

setup the consumers and producers for a node

execute

execute the node, wrapper for _execute method

_execute

execute the node, to be implemented by subclasses

Initialize the node.

Source code in aineko/core/node.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
    self,
    pipeline_name: str,
    node_name: Optional[str] = None,
    poison_pill: Optional[ray.actor.ActorHandle] = None,
    test: bool = False,
) -> None:
    """Initialize the node.

    Args:
        pipeline_name: name of the pipeline this node belongs to.
        node_name: name of the node. When not given (or empty), the name
            of the node's class is used instead.
        poison_pill: optional Ray actor handle. When set,
            `activate_poison_pill` calls its `activate` method remotely.
        test: whether the node starts in test mode.
    """
    # Identity of the node within its pipeline.
    self.name = node_name if node_name else self.__class__.__name__
    self.pipeline_name = pipeline_name

    # Mode flags and shutdown handle.
    self.test = test
    self.poison_pill = poison_pill

    # Dataset endpoints (populated by `setup_datasets` / `setup_test`) and
    # user parameters (populated by `setup_test`).
    self.consumers: Dict = {}
    self.producers: Dict = {}
    self.params: Dict = {}

    # Construction time is recorded as the initial heartbeat.
    self.last_heartbeat = time.time()
    # Allowed levels that `log` validates against.
    self.log_levels = AINEKO_CONFIG.get("LOG_LEVELS")

consumers instance-attribute

consumers: Dict = {}

last_heartbeat instance-attribute

last_heartbeat = time.time()

log_levels instance-attribute

log_levels = AINEKO_CONFIG.get("LOG_LEVELS")

name instance-attribute

name = node_name or self.__class__.__name__

params instance-attribute

params: Dict = {}

pipeline_name instance-attribute

pipeline_name = pipeline_name

poison_pill instance-attribute

poison_pill = poison_pill

producers instance-attribute

producers: Dict = {}

test instance-attribute

test = test

activate_poison_pill

activate_poison_pill() -> None

Activates poison pill, shutting down entire pipeline.

Source code in aineko/core/node.py
248
249
250
251
def activate_poison_pill(self) -> None:
    """Activate the poison pill, shutting down the entire pipeline.

    Does nothing when the node holds no (or a falsy) poison pill. Otherwise
    invokes the pill's remote `activate` method and waits on the result
    with `ray.get`.
    """
    pill = self.poison_pill
    if not pill:
        return
    ray.get(pill.activate.remote())

enable_test_mode

enable_test_mode() -> None

Enable test mode.

Source code in aineko/core/node.py
100
101
102
def enable_test_mode(self) -> None:
    """Put the node into test mode.

    `setup_test`, `run_test` and `run_test_yield` all raise `RuntimeError`
    unless this flag is set, so call this before using any of them.
    """
    self.test = True

execute

execute(params: Optional[dict] = None) -> None

Execute the node.

Wrapper for _execute method to be implemented by subclasses.

Parameters:

Name Type Description Default
params Optional[dict]

Parameters to use to execute the node. Defaults to None.

None
Source code in aineko/core/node.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def execute(self, params: Optional[dict] = None) -> None:
    """Execute the node.

    Wrapper for _execute method to be implemented by subclasses. Runs
    `_pre_loop_hook` once, then calls `_execute` repeatedly until it
    returns `False`, logs a completion message, and finally runs
    `_post_loop_hook`. An exception raised by any of the three user-defined
    steps has its traceback logged via `_log_traceback` and is then
    re-raised.

    Args:
        params: Parameters to use to execute the node. Defaults to None,
            which is treated as an empty dict.

    Raises:
        Exception: whatever `_pre_loop_hook`, `_execute` or
            `_post_loop_hook` raises, re-raised unchanged after logging.
    """
    params = params or {}
    run_loop = True

    try:
        self._pre_loop_hook(params)
    except Exception:  # pylint: disable=broad-except
        self._log_traceback()
        raise

    # Only an explicit `False` stops the loop; None or any other value
    # keeps it running.
    while run_loop is not False:
        try:
            run_loop = self._execute(params)  # type: ignore
        except Exception:  # pylint: disable=broad-except
            self._log_traceback()
            raise

    self.log(f"Execution loop complete for node: {self.__class__.__name__}")

    # Report post-loop failures the same way as pre-loop and loop failures,
    # so every user-defined step is handled consistently.
    try:
        self._post_loop_hook(params)
    except Exception:  # pylint: disable=broad-except
        self._log_traceback()
        raise

log

log(message: str, level: str = 'info') -> None

Log a message to the logging dataset.

Parameters:

Name Type Description Default
message str

Message to log

required
level str

Logging level. Defaults to "info". Options are: "info", "debug", "warning", "error", "critical"

'info'

Raises: ValueError: if invalid logging level is provided

Source code in aineko/core/node.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def log(self, message: str, level: str = "info") -> None:
    """Log a message to the logging dataset.

    The message is wrapped as ``{"log": message, "level": level}`` and
    passed to ``produce`` on the producer registered under
    ``DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")``.

    Args:
        message: Message to log
        level: Logging level. Defaults to "info". Must be one of
            ``self.log_levels`` (documented options: "info", "debug",
            "warning", "error", "critical").

    Raises:
        ValueError: if invalid logging level is provided
    """
    allowed = self.log_levels
    if level not in allowed:
        options = ", ".join(allowed)
        raise ValueError(
            f"Invalid logging level {level}. Valid options are: {options}"
        )

    logging_dataset = DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")
    payload = {"log": message, "level": level}
    self.producers[logging_dataset].produce(payload)

run_test

run_test(runtime: Optional[int] = None) -> dict

Execute the node in testing mode.

Runs the steps in execute that involve the user-defined methods: pre_loop_hook, _execute, and post_loop_hook.

Parameters:

Name Type Description Default
runtime Optional[int]

Number of seconds to run the execute loop for.

None

Returns:

Name Type Description
dict dict

dataset names and values produced by the node.

Source code in aineko/core/node.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def run_test(self, runtime: Optional[int] = None) -> dict:
    """Execute the node in testing mode.

    Runs the steps in execute that involve the user-defined methods:
    `_pre_loop_hook`, `_execute` and `_post_loop_hook`, each called with
    ``self.params``.

    The loop ends when `_execute` returns `False`. In addition, once
    `runtime` seconds have elapsed since the call started (immediately
    when `runtime` is None), the loop ends as soon as every consumer
    reports `empty`. A node without consumers therefore stops only when
    `_execute` returns `False`.

    Args:
        runtime: Number of seconds during which empty consumers do not
            end the execute loop. This is a lower bound on the run time,
            not a cap, unless `_execute` returns `False` first.

    Returns:
        dict: dataset names and values produced by the node.

    Raises:
        RuntimeError: if the node is not in test mode.
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    started_at = time.time()
    self._pre_loop_hook(self.params)

    keep_going = True
    while keep_going is not False:
        keep_going = self._execute(self.params)  # type: ignore

        window_open = (
            runtime is not None and time.time() - started_at < runtime
        )
        # Short-circuit order matters: consumers are only inspected once
        # the runtime window (if any) has closed.
        if (
            not window_open
            and self.consumers
            and all(consumer.empty for consumer in self.consumers.values())
        ):
            keep_going = False

    self._post_loop_hook(self.params)

    produced = {}
    for dataset_name, producer in self.producers.items():
        produced[dataset_name] = producer.values
    return produced

run_test_yield

run_test_yield(
    runtime: Optional[int] = None,
) -> Generator[Tuple[dict, dict, AbstractNode], None, None]

Execute the node in testing mode, yielding at each iteration.

This method is an alternative to run_test. Instead of returning the aggregated output, it yields the most recently consumed value, the produced value and the current node instance at each iteration. This is useful for testing nodes that either don't produce any output or if you need to test intermediate outputs. Testing state modifications is also possible using this method.

Parameters:

Name Type Description Default
runtime Optional[int]

Number of seconds to run the execute loop for.

None

Yields:

Type Description
Tuple[dict, dict, AbstractNode]

A tuple containing the most recent input value, output value and the node instance.

Example

for input, output, node_instance in sequencer.run_test_yield():
    print(f"Input: {input}, Output: {output}")
    print(f"Node Instance: {node_instance}")

Source code in aineko/core/node.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def run_test_yield(
    self, runtime: Optional[int] = None
) -> Generator[Tuple[dict, dict, "AbstractNode"], None, None]:
    """Execute the node in testing mode, yielding at each iteration.

    This method is an alternative to `run_test`. Instead of returning the
    aggregated output, it yields the most recently consumed value, the
    produced value and the current node instance at each iteration. This is
    useful for testing nodes that either don't produce any output or if you
    need to test intermediate outputs. Testing state modifications is also
    possible using this method.

    The loop ends when `_execute` returns `False`. In addition, once
    `runtime` seconds have elapsed (immediately when `runtime` is None),
    it ends as soon as every consumer reports `empty`. A tuple is yielded
    after every `_execute` call, including calls made while the runtime
    window is still open.

    Args:
        runtime: Number of seconds during which empty consumers do not
            end the execute loop.

    Yields:
        A tuple ``(consumed, produced, node)``. ``consumed`` maps each
        consumer's dataset name to the first entry of its ``values`` taken
        just before `_execute` ran. ``produced`` maps each producer's dataset
        name to the last entry of its ``values`` after `_execute` ran.
        Datasets with no values are omitted. ``node`` is this node instance.

    Raises:
        RuntimeError: if the node is not in test mode. Because this is a
            generator, the error surfaces when iteration starts.

    Example:
        >>> for consumed, produced, node_instance in sequencer.run_test_yield():
        >>>     print(f"Input: {consumed}, Output: {produced}")
        >>>     print(f"Node Instance: {node_instance}")
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )
    run_loop = True
    start_time = time.time()

    self._pre_loop_hook(self.params)
    while run_loop is not False:
        # Capture the head of each consumer's values before `_execute` runs.
        last_consumed_values = {
            dataset_name: consumer.values[0]
            for dataset_name, consumer in self.consumers.items()
            if consumer.values
        }

        run_loop = self._execute(self.params)  # type: ignore

        # Empty consumers only end the loop once the runtime window (if any)
        # has elapsed. This check must not skip the yield below, otherwise
        # iterations inside the window would never be reported.
        runtime_elapsed = (
            runtime is None or time.time() - start_time >= runtime
        )
        if (
            runtime_elapsed
            and self.consumers
            and all(consumer.empty for consumer in self.consumers.values())
        ):
            run_loop = False

        # Capture last produced values
        last_produced_values = {
            dataset_name: producer.values[-1]
            for dataset_name, producer in self.producers.items()
            if producer.values
        }

        yield (last_consumed_values, last_produced_values, self)

    self._post_loop_hook(self.params)

setup_datasets

setup_datasets(
    datasets: Dict[str, Dict],
    inputs: Optional[List[str]] = None,
    outputs: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None

Setup the consumer and producer for a node.

Parameters:

Name Type Description Default
datasets Dict[str, Dict]

dataset configuration

required
inputs Optional[List[str]]

list of dataset names for the inputs to the node

None
outputs Optional[List[str]]

list of dataset names for the outputs of the node

None
prefix Optional[str]

prefix for topic name (<prefix>.<dataset_name>)

None
has_pipeline_prefix bool

whether the dataset name has pipeline name prefix

False
Source code in aineko/core/node.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def setup_datasets(
    self,
    datasets: Dict[str, Dict],
    inputs: Optional[List[str]] = None,
    outputs: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None:
    """Setup the consumer and producer for a node.

    Builds one `DatasetConsumer` per input name and one `DatasetProducer`
    per output name, then merges them into `self.consumers` and
    `self.producers`. Existing entries under other names are kept, and
    entries with the same name are replaced.

    Args:
        datasets: dataset configuration, keyed by dataset name. A dataset
            missing from it gets an empty config.
        inputs: list of dataset names for the inputs to the node
        outputs: list of dataset names for the outputs of the node
        prefix: prefix for topic name (`<prefix>.<dataset_name>`)
        has_pipeline_prefix: whether the dataset name has pipeline name
            prefix
    """
    shared_kwargs = {
        "node_name": self.name,
        "pipeline_name": self.pipeline_name,
        "prefix": prefix,
        "has_pipeline_prefix": has_pipeline_prefix,
    }

    new_consumers = {
        name: DatasetConsumer(
            dataset_name=name,
            dataset_config=datasets.get(name, {}),
            **shared_kwargs,
        )
        for name in inputs or []
    }
    # Consumers are registered before any producer is built.
    self.consumers.update(new_consumers)

    new_producers = {
        name: DatasetProducer(
            dataset_name=name,
            dataset_config=datasets.get(name, {}),
            **shared_kwargs,
        )
        for name in outputs or []
    }
    self.producers.update(new_producers)

setup_test

setup_test(
    inputs: Optional[dict] = None,
    outputs: Optional[list] = None,
    params: Optional[dict] = None,
) -> None

Setup the node for testing.

Parameters:

Name Type Description Default
inputs Optional[dict]

inputs to the node, format should be {"dataset": [1, 2, 3]}

None
outputs Optional[list]

outputs of the node, format should be ["dataset_1", "dataset_2", ...]

None
params Optional[dict]

dictionary of parameters to make accessible to _execute

None

Raises:

Type Description
RuntimeError

if node is not in test mode

Source code in aineko/core/node.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def setup_test(
    self,
    inputs: Optional[dict] = None,
    outputs: Optional[list] = None,
    params: Optional[dict] = None,
) -> None:
    """Setup the node for testing.

    Replaces `self.consumers` with `FakeDatasetConsumer` instances built
    from `inputs`. Replaces `self.producers` with `FakeDatasetProducer`
    instances for `outputs`, plus the datasets listed in
    ``TESTING_NODE_CONFIG.get("DATASETS")``. Stores `params` on
    `self.params`, which `run_test` and `run_test_yield` pass to the
    node's hooks.

    Args:
        inputs: inputs to the node, format should be {"dataset": [1, 2, 3]}
        outputs: outputs of the node, format should be ["dataset_1",
            "dataset_2", ...]. The caller's list is not modified.
        params: dictionary of parameters to make accessible to _execute

    Raises:
        RuntimeError: if node is not in test mode
    """
    if self.test is False:
        raise RuntimeError(
            "Node is not in test mode. "
            "Please initialize with `enable_test_mode()`."
        )

    inputs = inputs or {}
    self.consumers = {
        dataset_name: FakeDatasetConsumer(
            dataset_name=dataset_name,
            node_name=self.__class__.__name__,
            values=values,
        )
        for dataset_name, values in inputs.items()
    }

    # Build a fresh list instead of extending `outputs` in place, so the
    # caller's list is never mutated. Reusing one list across calls would
    # otherwise keep appending the testing datasets to it.
    output_names = list(outputs or [])
    output_names.extend(TESTING_NODE_CONFIG.get("DATASETS"))
    self.producers = {
        dataset_name: FakeDatasetProducer(
            dataset_name=dataset_name,
            node_name=self.__class__.__name__,
        )
        for dataset_name in output_names
    }
    self.params = params or {}