
Runner

The Runner builds the pipeline from the given pipeline configuration file, creating the necessary datasets and orchestrating the nodes.

aineko.Runner

Runner(
    pipeline_config_file: str,
    pipeline_name: Optional[str] = None,
    kafka_config: dict = DEFAULT_KAFKA_CONFIG.get(
        "BROKER_CONFIG"
    ),
    metrics_export_port: int = AINEKO_CONFIG.get(
        "RAY_METRICS_PORT"
    ),
    dataset_prefix: Optional[str] = None,
)

Runs the pipeline described in the config.

Parameters:

pipeline_config_file (str): Path to the pipeline config file. Required.
pipeline_name (Optional[str]): Name of the pipeline; if provided, overrides the name in the pipeline config. Default: None.
kafka_config (dict): Config for the Kafka broker. Default: DEFAULT_KAFKA_CONFIG.get("BROKER_CONFIG").
metrics_export_port (int): Port on which Ray exports metrics. Default: AINEKO_CONFIG.get("RAY_METRICS_PORT").
dataset_prefix (Optional[str]): Prefix for dataset names. Kafka topics will be called <prefix>.<pipeline>.<dataset_name>. Default: None.
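
For example, a minimal sketch of constructing and starting a runner. The config path, broker address, and prefix below are illustrative only; Runner is imported under the aineko.Runner path documented above.

    from aineko import Runner

    runner = Runner(
        pipeline_config_file="conf/pipeline.yml",               # hypothetical path
        kafka_config={"bootstrap.servers": "localhost:9092"},   # hypothetical broker
        dataset_prefix="team_a",  # topics become team_a.<pipeline>.<dataset_name>
    )
    runner.run()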

Attributes:

pipeline_config_file (str): Path to the pipeline config file.
pipeline_name (str): Name of the pipeline; taken from the argument if provided, otherwise loaded from the config.
kafka_config (dict): Config for the Kafka broker.
metrics_export_port (int): Port on which Ray exports metrics.
dataset_prefix (Optional[str]): Prefix for dataset names.

Initializes the runner class.

Source code in aineko/core/runner.py
def __init__(
    self,
    pipeline_config_file: str,
    pipeline_name: Optional[str] = None,
    kafka_config: dict = DEFAULT_KAFKA_CONFIG.get("BROKER_CONFIG"),
    metrics_export_port: int = AINEKO_CONFIG.get("RAY_METRICS_PORT"),
    dataset_prefix: Optional[str] = None,
):
    """Initializes the runner class."""
    self.pipeline_config_file = pipeline_config_file
    self.kafka_config = kafka_config
    self.metrics_export_port = metrics_export_port
    self.pipeline_name = pipeline_name
    self.dataset_prefix = dataset_prefix or ""

dataset_prefix instance-attribute

dataset_prefix = dataset_prefix or ''

kafka_config instance-attribute

kafka_config = kafka_config

metrics_export_port instance-attribute

metrics_export_port = metrics_export_port

pipeline_config_file instance-attribute

pipeline_config_file = pipeline_config_file

pipeline_name instance-attribute

pipeline_name = pipeline_name

load_pipeline_config

load_pipeline_config() -> Pipeline

Loads the config for a given pipeline.

Returns:

Pipeline: The pipeline config.

Source code in aineko/core/runner.py
def load_pipeline_config(self) -> Config.Pipeline:
    """Loads the config for a given pipeline.

    Returns:
        pipeline config
    """
    config = ConfigLoader(
        pipeline_config_file=self.pipeline_config_file,
    ).load_config()

    return config.pipeline
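
As a short sketch, the loaded config can be inspected directly; the config path below is hypothetical:

    runner = Runner(pipeline_config_file="conf/pipeline.yml")
    pipeline = runner.load_pipeline_config()
    print(pipeline.name)            # pipeline name declared in the config
    print(list(pipeline.nodes))     # node names defined in the config
    print(list(pipeline.datasets))  # dataset names defined in the config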

prepare_datasets

prepare_datasets(
    config: Dict[str, Dataset],
    user_dataset_prefix: Optional[str] = None,
) -> bool

Creates the required datasets for a given pipeline.

Datasets can be configured via the params key, using the config keys listed at: https://kafka.apache.org/documentation.html#topicconfigs

Parameters:

config (Dict[str, Dataset]): Dataset configuration found in the pipeline config. Required. Should follow the schema below:

    {
        "dataset_name": {
            "type": str ("kafka_stream"),
            "params": dict
        }
    }

user_dataset_prefix (Optional[str]): Prefix applied only to datasets defined by the user, i.e. <prefix>.<user_dataset_prefix>.<dataset_name>. Default: None.

Returns:

bool: True if successful.

Raises:

ValueError: If a reserved dataset name such as "logging" is defined in the catalog.

Source code in aineko/core/runner.py
def prepare_datasets(
    self,
    config: Dict[str, Config.Pipeline.Dataset],
    user_dataset_prefix: Optional[str] = None,
) -> bool:
    """Creates the required datasets for a given pipeline.

    Datasets can be configured using the `params` key, using config keys
    found in: https://kafka.apache.org/documentation.html#topicconfigs

    Args:
        config: dataset configuration found in pipeline config
            Should follow the schema below:
            ```python
                {
                    "dataset_name": {
                        "type": str ("kafka_stream"),
                        "params": dict
                    }
                }
            ```
        user_dataset_prefix: prefix only for datasets defined by the user.
            i.e. `<prefix>.<user_dataset_prefix>.<dataset_name>`

    Returns:
        True if successful

    Raises:
        ValueError: if dataset "logging" is defined in the catalog
    """
    # Connect to kafka cluster
    kafka_client = AdminClient(self.kafka_config)

    # Add prefix to user defined datasets
    if user_dataset_prefix:
        config = {
            f"{user_dataset_prefix}.{dataset_name}": dataset_config
            for dataset_name, dataset_config in config.items()
        }

    # Fail if reserved dataset names are defined in catalog
    for reserved_dataset in DEFAULT_KAFKA_CONFIG.get("DATASETS"):
        if reserved_dataset in config:
            raise ValueError(
                f"Unable to create dataset `{reserved_dataset}`. "
                "Reserved for internal use."
            )

    # Add logging dataset to catalog
    config[
        DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")
    ] = Config.Pipeline.Dataset(
        type=AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"),
        params=DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS"),
    )

    # Create all datasets defined in the catalog
    dataset_list = []
    for dataset_name, dataset_config in config.items():
        logger.info(
            "Creating dataset: %s: %s", dataset_name, dataset_config
        )
        # Create dataset for kafka streams
        if dataset_config.type == AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"):
            # User defined
            if not dataset_config.params:
                dataset_config.params = {}

            dataset_params = dict(
                DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS")
            )
            dataset_params["config"].update(dataset_config.params)

            # Configure dataset
            if self.dataset_prefix:
                topic_name = f"{self.dataset_prefix}.{dataset_name}"
            else:
                topic_name = dataset_name

            new_dataset = NewTopic(
                topic=topic_name,
                num_partitions=dataset_params.get("num_partitions"),
                replication_factor=dataset_params.get("replication_factor"),
                config=dataset_params.get("config"),
            )

            # Add dataset to appropriate list
            dataset_list.append(new_dataset)

        else:
            raise ValueError(
                "Unknown dataset type. Expected: "
                f"{AINEKO_CONFIG.get('STREAM_TYPES')}."
            )

    # Create all configured datasets
    datasets = kafka_client.create_topics(dataset_list)

    # Block until all datasets finish creation
    cur_time = time.time()
    while True:
        if all(future.done() for future in datasets.values()):
            logger.info("All datasets created.")
            break
        if time.time() - cur_time > AINEKO_CONFIG.get(
            "DATASET_CREATION_TIMEOUT"
        ):
            raise TimeoutError(
                "Timeout while creating Kafka datasets. "
                "Please check your Kafka cluster."
            )

    return datasets
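
A short sketch of creating datasets from a loaded pipeline config, mirroring what run() does; per-topic settings such as retention.ms would live under each dataset's params key in the pipeline config:

    pipeline = runner.load_pipeline_config()
    runner.prepare_datasets(
        config=pipeline.datasets,           # Dict[str, Config.Pipeline.Dataset]
        user_dataset_prefix=pipeline.name,  # prefixes user-defined datasets
    )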

prepare_nodes

prepare_nodes(
    pipeline_config: Pipeline, poison_pill: ActorHandle
) -> list

Prepare actor handles for all nodes.

Parameters:

Name Type Description Default
pipeline_config Pipeline

pipeline configuration

required

Returns:

list: List of Ray objects (futures) for each node's execute call.

Raises:

ValueError: If an error occurs while initializing an actor from the config.

Source code in aineko/core/runner.py
def prepare_nodes(
    self,
    pipeline_config: Config.Pipeline,
    poison_pill: ray.actor.ActorHandle,
) -> list:
    """Prepare actor handles for all nodes.

    Args:
        pipeline_config: pipeline configuration
        poison_pill: handle to the PoisonPill actor made available to all nodes

    Returns:
        list: list of ray objects (futures) for each node's execute call

    Raises:
        ValueError: if error occurs while initializing actor from config
    """
    # Collect all actor futures
    results = []
    if pipeline_config.default_node_settings:
        default_node_config = pipeline_config.default_node_settings
    else:
        default_node_config = {}

    for node_name, node_config in pipeline_config.nodes.items():
        if node_config.node_settings:
            node_settings = node_config.node_settings
        else:
            node_settings = {}
        # Initialize actor from specified class in config
        try:
            target_class = imports.import_from_string(
                attr=node_config.class_name, kind="class"
            )
        except AttributeError as exc:
            raise ValueError(
                "Invalid node class name specified in config for node '"
                f"{node_name}'. Please check your config file at: "
                f"{self.pipeline_config_file}\n"
                f"Error: {exc}"
            ) from None

        actor_params = {
            **default_node_config,
            **node_settings,
            "name": node_name,
            "namespace": self.pipeline_name,
        }

        wrapped_class = ray.remote(target_class)
        # Apply actor options (name, namespace, node settings) when creating the actor
        actor_handle = wrapped_class.options(**actor_params).remote(
            node_name=node_name,
            pipeline_name=self.pipeline_name,
            poison_pill=poison_pill,
        )

        if node_config.outputs:
            outputs = node_config.outputs
        else:
            outputs = []

        if node_config.inputs:
            inputs = node_config.inputs
        else:
            inputs = None

        datasets = {}
        for (
            dataset_name,
            dataset_config,
        ) in pipeline_config.datasets.items():
            datasets[dataset_name] = dataset_config.model_dump(
                exclude_none=True
            )

        # Setup input and output datasets
        actor_handle.setup_datasets.remote(
            inputs=inputs,
            outputs=outputs,
            datasets=datasets,
            has_pipeline_prefix=True,
        )

        # Setup internal datasets like logging, without pipeline prefix
        actor_handle.setup_datasets.remote(
            outputs=DEFAULT_KAFKA_CONFIG.get("DATASETS"),
            datasets=datasets,
        )

        # Create actor future (for execute method)
        results.append(
            actor_handle.execute.remote(params=node_config.node_params)
        )

    return results
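
A sketch of how prepare_nodes is typically driven, mirroring run(): Ray is initialized, a PoisonPill actor is created (PoisonPill as referenced in run() below; its import path is not shown here), and the returned futures are awaited:

    import ray

    pipeline = runner.load_pipeline_config()  # runner as constructed above
    ray.init(namespace=pipeline.name)
    poison_pill = ray.remote(PoisonPill).remote()
    futures = runner.prepare_nodes(
        pipeline_config=pipeline,
        poison_pill=poison_pill,
    )
    ray.get(futures)  # block until all node actors finish executing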

run

run() -> None

Runs the pipeline.

Step 1: Load config for pipeline

Step 2: Set up datasets

Step 3: Set up PoisonPill node that is available to all nodes

Step 4: Set up nodes (including Node Manager)

Source code in aineko/core/runner.py
def run(self) -> None:
    """Runs the pipeline.

    Step 1: Load config for pipeline

    Step 2: Set up datasets

    Step 3: Set up PoisonPill node that is available to all nodes

    Step 4: Set up nodes (including Node Manager)
    """
    # Load pipeline config
    pipeline_config = self.load_pipeline_config()
    self.pipeline_name = self.pipeline_name or pipeline_config.name

    # Create the necessary datasets
    self.prepare_datasets(
        config=pipeline_config.datasets,
        user_dataset_prefix=self.pipeline_name,
    )

    # Initialize ray cluster
    ray.shutdown()
    ray.init(
        namespace=self.pipeline_name,
        _metrics_export_port=self.metrics_export_port,
    )

    # Create poison pill actor
    poison_pill = ray.remote(PoisonPill).remote()

    # Add Node Manager to pipeline config
    pipeline_config.nodes[
        NODE_MANAGER_CONFIG.get("NAME")
    ] = Config.Pipeline.Node(**NODE_MANAGER_CONFIG.get("NODE_CONFIG"))

    # Create each node (actor)
    results = self.prepare_nodes(
        pipeline_config=pipeline_config,
        poison_pill=poison_pill,  # type: ignore
    )

    ray.get(results)