
Runner

The Runner builds the pipeline from the given pipeline configuration file, creating the necessary datasets and orchestrating the nodes.
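
A minimal usage sketch (assuming Runner is importable from the top-level aineko package, as the heading below suggests; the config path and pipeline name are hypothetical):

from aineko import Runner

# Build and run the pipeline described in the config file.
# The path and name here are illustrative placeholders.
runner = Runner(
    pipeline_config_file="conf/pipeline.yml",
    pipeline_name="my_pipeline",
)
runner.run()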

aineko.Runner

Runner(
    pipeline_config_file: str,
    pipeline_name: Optional[str] = None,
    metrics_export_port: int = AINEKO_CONFIG.get(
        "RAY_METRICS_PORT"
    ),
    dataset_prefix: Optional[str] = None,
)

Runs the pipeline described in the config.

Parameters:

Name                  Type           Description                         Default
pipeline_config_file  str            Path to pipeline config file        required
pipeline_name         Optional[str]  Name of the pipeline                None
metrics_export_port   int            Port on which Ray exports metrics   AINEKO_CONFIG.get("RAY_METRICS_PORT")
dataset_prefix        Optional[str]  Prefix for dataset names            None

Attributes:

Name                  Type           Description
pipeline_config_file  str            Path to pipeline config file
pipeline_name         Optional[str]  Name of the pipeline; the constructor argument, if given,
                                     overrides the name loaded from the pipeline config
metrics_export_port   int            Port on which Ray exports metrics
dataset_prefix        Optional[str]  Prefix for dataset names

Initializes the runner class.

Source code in aineko/core/runner.py
def __init__(
    self,
    pipeline_config_file: str,
    pipeline_name: Optional[str] = None,
    metrics_export_port: int = AINEKO_CONFIG.get("RAY_METRICS_PORT"),
    dataset_prefix: Optional[str] = None,
):
    """Initializes the runner class."""
    self.pipeline_config_file = pipeline_config_file
    self.metrics_export_port = metrics_export_port
    self.pipeline_name = pipeline_name
    self.dataset_prefix = dataset_prefix or ""

dataset_prefix instance-attribute

dataset_prefix = dataset_prefix or ''

metrics_export_port instance-attribute

metrics_export_port = metrics_export_port

pipeline_config_file instance-attribute

pipeline_config_file = pipeline_config_file

pipeline_name instance-attribute

pipeline_name = pipeline_name

load_pipeline_config

load_pipeline_config() -> Pipeline

Loads the config for a given pipeline.

Returns:

Type      Description
Pipeline  The pipeline config

Source code in aineko/core/runner.py
def load_pipeline_config(self) -> Config.Pipeline:
    """Loads the config for a given pipeline.

    Returns:
        pipeline config
    """
    config = ConfigLoader(
        pipeline_config_file=self.pipeline_config_file,
    ).load_config()

    return config.pipeline
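
The loaded config can be inspected before running the pipeline. A short sketch (the config path is hypothetical; the name and nodes attributes are the same ones run and prepare_nodes use):

from aineko import Runner

runner = Runner(pipeline_config_file="conf/pipeline.yml")  # hypothetical path
pipeline = runner.load_pipeline_config()
print(pipeline.name)         # pipeline name from the config
print(list(pipeline.nodes))  # node names defined in the config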

prepare_datasets

prepare_datasets(
    config: Dict, user_dataset_prefix: Optional[str] = None
) -> List[AbstractDataset]

Creates the required datasets for a given pipeline.

Parameters:

Name                 Type           Description                                         Default
config               Dict           Dataset configuration found in the pipeline        required
                                    config; should follow the schema below
user_dataset_prefix  Optional[str]  Prefix applied only to user-defined datasets,       None
                                    i.e. <prefix>.<user_dataset_prefix>.<dataset_name>

The config should follow this schema:

    {
        "dataset_name": {
            "type": str ("aineko.datasets.kafka.KafkaDataset"),
            "location": str ("localhost:9092"),
            "params": dict
        }
    }

Returns:

Type                   Description
List[AbstractDataset]  List of created dataset objects

Raises:

Type        Description
ValueError  If a dataset name reserved for internal use (e.g. "logging") is defined in the catalog

Source code in aineko/core/runner.py
def prepare_datasets(
    self, config: Dict, user_dataset_prefix: Optional[str] = None
) -> List[AbstractDataset]:
    """Creates the required datasets for a given pipeline.

    Args:
        config: dataset configuration found in pipeline config
            Should follow the schema below:
            ```python
                {
                    "dataset_name": {
                        "type": str ("aineko.datasets.kafka.KafkaDataset"),
                        "location": str ("localhost:9092"),
                        "params": dict
                    }
                }
            ```
        user_dataset_prefix: prefix only for datasets defined by the user.
            i.e. `<prefix>.<user_dataset_prefix>.<dataset_name>`

    Returns:
        List of created dataset objects

    Raises:
        ValueError: if a dataset name reserved for internal use
            (e.g. "logging") is defined in the catalog
    """
    # Create all configured dataset objects
    datasets = []
    if user_dataset_prefix:
        config = {
            f"{user_dataset_prefix}.{dataset_name}": dataset_config
            for dataset_name, dataset_config in config.items()
        }

    internal_dataset_names = [
        dataset["name"]
        for dataset in AINEKO_CONFIG.get("INTERNAL_DATASETS")
    ]
    for reserved_dataset in internal_dataset_names:
        if reserved_dataset in config:
            raise ValueError(
                f"Unable to create dataset `{reserved_dataset}`. "
                "Reserved for internal use."
            )

    for dataset_name, dataset_config in config.items():
        logger.info(
            "Creating dataset: %s: %s", dataset_name, dataset_config
        )
        # Instantiate the dataset object from its config entry
        dataset: AbstractDataset = AbstractDataset.from_config(
            dataset_name, dataset_config
        )
        datasets.append(dataset)

    # Create logging dataset
    logging_dataset_config = AINEKO_CONFIG.get("LOGGING_DATASET")

    logging_dataset_name = logging_dataset_config["name"]
    logging_config: DatasetConfig = logging_dataset_config["params"]

    logger.info(
        "Creating dataset: %s: %s", logging_dataset_name, logging_config
    )
    logging_dataset: AbstractDataset = AbstractDataset.from_config(
        logging_dataset_name,
        logging_config.model_dump(exclude_none=True),
    )
    # Create all datasets
    dataset_creation_statuses = [
        dataset.create(dataset_prefix=self.dataset_prefix)
        for dataset in datasets
    ]
    logging_create_status = logging_dataset.create()
    datasets.append(logging_dataset)

    dataset_creation_statuses.append(logging_create_status)
    cur_time = time.time()
    while True:
        if all(
            dataset_creation.done()
            for dataset_creation in dataset_creation_statuses
        ):
            logger.info("All datasets created.")
            break
        if time.time() - cur_time > AINEKO_CONFIG.get(
            "DATASET_CREATION_TIMEOUT"
        ):
            raise TimeoutError("Timeout while creating datasets.")
    return datasets
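
For reference, a config mapping that satisfies the schema above might look like the following sketch; the dataset name, broker address, and params are hypothetical:

# Hypothetical dataset section of a pipeline config.
config = {
    "user_messages": {  # hypothetical dataset name
        "type": "aineko.datasets.kafka.KafkaDataset",
        "location": "localhost:9092",  # hypothetical Kafka broker
        "params": {},  # passed through to the dataset implementation
    }
}

With user_dataset_prefix="my_pipeline", the dataset above would be created as my_pipeline.user_messages.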

prepare_nodes

prepare_nodes(
    pipeline_config: Pipeline, poison_pill: ActorHandle
) -> list

Prepare actor handles for all nodes.

Parameters:

Name             Type         Description                                          Default
pipeline_config  Pipeline     Pipeline configuration                               required
poison_pill      ActorHandle  Handle to the PoisonPill actor shared by all nodes   required

Returns:

Type  Description
list  List of Ray object references, one per node's execute call

Raises:

Type Description
ValueError

if error occurs while initializing actor from config

Source code in aineko/core/runner.py
def prepare_nodes(
    self,
    pipeline_config: Config.Pipeline,
    poison_pill: ray.actor.ActorHandle,
) -> list:
    """Prepare actor handles for all nodes.

    Args:
        pipeline_config: pipeline configuration
        poison_pill: handle to the PoisonPill actor shared by all nodes

    Returns:
        list: Ray object references, one per node's execute call

    Raises:
        ValueError: if error occurs while initializing actor from config
    """
    # Collect all actor futures
    results = []
    for node_name, node_config in pipeline_config.nodes.items():
        try:
            target_class = imports.import_from_string(
                attr=node_config.class_name, kind="class"
            )
        except AttributeError as exc:
            raise ValueError(
                "Invalid node class name specified in config for node '"
                f"{node_name}'. Please check your config file at: "
                f"{self.pipeline_config_file}\n"
                f"Error: {exc}"
            ) from None

        actor_params = {
            "name": node_name,
            "namespace": self.pipeline_name,
        }
        if pipeline_config.default_node_settings:
            actor_params.update(
                pipeline_config.default_node_settings.model_dump(
                    exclude_none=True
                )
            )
        # Update actor params with node specific settings to override
        # default settings
        if node_config.node_settings:
            actor_params.update(
                node_config.node_settings.model_dump(exclude_none=True)
            )

        wrapped_class = ray.remote(target_class)
        # options() returns a configured copy, so chain it into the
        # actor construction rather than discarding its result.
        actor_handle = wrapped_class.options(**actor_params).remote(
            node_name=node_name,
            pipeline_name=self.pipeline_name,
            poison_pill=poison_pill,
        )

        # Setup input and output datasets
        actor_handle.setup_datasets.remote(
            inputs=node_config.inputs,
            outputs=node_config.outputs,
            datasets=pipeline_config.datasets,
            has_pipeline_prefix=True,
        )

        # Setup internal datasets like logging, without pipeline prefix
        logging_dataset_config = AINEKO_CONFIG.get("LOGGING_DATASET")

        logging_dataset_name = logging_dataset_config["name"]
        logging_config: DatasetConfig = logging_dataset_config["params"]

        actor_handle.setup_datasets.remote(
            outputs=[
                dataset["name"]
                for dataset in AINEKO_CONFIG.get("INTERNAL_DATASETS")
            ],
            datasets={
                logging_dataset_name: logging_config.model_dump(
                    exclude_none=True
                )
            },
        )

        # Create actor future (for execute method)
        results.append(
            actor_handle.execute.remote(params=node_config.node_params)
        )
    return results
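
The wiring above follows Ray's standard actor pattern: wrap a plain class with ray.remote, configure it with options, construct it with .remote(), and collect futures from remote method calls. A self-contained sketch of that pattern (the Greeter class and all names in it are illustrative, not part of aineko):

import ray


class Greeter:
    """Plain class to be wrapped as a Ray actor."""

    def __init__(self, name: str) -> None:
        self.name = name

    def greet(self) -> str:
        return f"hello from {self.name}"


ray.init()

# Wrap the class, apply actor options (as prepare_nodes does with
# actor_params), then construct the actor.
actor = ray.remote(Greeter).options(name="greeter").remote(name="node_a")

# Remote method calls return ObjectRefs (futures), like the execute
# futures collected in `results`; ray.get resolves them.
print(ray.get(actor.greet.remote()))
ray.shutdown()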

run

run() -> None

Runs the pipeline.

Step 1: Load config for pipeline

Step 2: Set up datasets

Step 3: Set up PoisonPill node that is available to all nodes

Step 4: Set up nodes (including Node Manager)

Source code in aineko/core/runner.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def run(self) -> None:
    """Runs the pipeline.

    Step 1: Load config for pipeline

    Step 2: Set up datasets

    Step 3: Set up PoisonPill node that is available to all nodes

    Step 4: Set up nodes (including Node Manager)
    """
    # Load pipeline config
    pipeline_config = self.load_pipeline_config()
    self.pipeline_name = self.pipeline_name or pipeline_config.name

    # Create the necessary datasets
    self.prepare_datasets(
        config=pipeline_config.datasets,
        user_dataset_prefix=self.pipeline_name,
    )

    # Initialize ray cluster
    ray.shutdown()
    ray.init(
        namespace=self.pipeline_name,
        _metrics_export_port=self.metrics_export_port,
    )

    # Create poison pill actor
    poison_pill = ray.remote(PoisonPill).remote()

    # Add Node Manager to pipeline config
    pipeline_config.nodes[
        NODE_MANAGER_CONFIG.get("NAME")
    ] = Config.Pipeline.Node(**NODE_MANAGER_CONFIG.get("NODE_CONFIG"))

    # Create each node (actor)
    results = self.prepare_nodes(
        pipeline_config=pipeline_config,
        poison_pill=poison_pill,  # type: ignore
    )

    ray.get(results)
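
Note that the final ray.get(results) blocks until every node's execute future resolves, so run() does not return while the pipeline is still executing.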