Runner

The Runner builds the pipeline from the given pipeline configuration file, creating the necessary datasets and orchestrating the nodes.
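
A minimal usage sketch, assuming `Runner` is importable from the `aineko` package as the page title suggests (the config path and pipeline name below are placeholders, not taken from the source):

from aineko import Runner

# Build and run the pipeline described in the config file.
# run() loads the config, creates the Kafka datasets, starts Ray,
# and blocks until the node actors finish.
runner = Runner(
    pipeline_config_file="conf/pipeline.yml",  # hypothetical path
    pipeline_name="my-pipeline",               # optional; defaults to the name in the config
)
runner.run()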

aineko.Runner

Runs the pipeline described in the config.

Parameters:

    pipeline_config_file (str): Path to pipeline config file. Required.
    pipeline_name (str): Name of the pipeline. Default: None.
    kafka_config (dict): Config for the Kafka broker. Default: DEFAULT_KAFKA_CONFIG.get("BROKER_CONFIG").
    metrics_export_port (int): Port on which Ray exports metrics. Default: AINEKO_CONFIG.get("RAY_METRICS_PORT").
    dataset_prefix (Optional[str]): Prefix for dataset names. Kafka topics will be called <prefix>.<pipeline>.<dataset_name>. Default: None.
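
The prefixes compose left to right. An illustrative sketch of the resulting Kafka topic name (all values hypothetical):

dataset_prefix = "dev"          # passed to Runner
pipeline_name = "my-pipeline"   # pipeline name from the config
dataset_name = "metrics"        # a user-defined dataset
topic = f"{dataset_prefix}.{pipeline_name}.{dataset_name}"
print(topic)  # dev.my-pipeline.metrics; without a prefix it would be my-pipeline.metrics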

Attributes:

    pipeline_config_file (str): Path to pipeline config file.
    pipeline_name (str): Name of the pipeline; the constructor argument, if given, overrides the name loaded from the pipeline config.
    kafka_config (dict): Config for the Kafka broker.
    metrics_export_port (int): Port on which Ray exports metrics.
    dataset_prefix (Optional[str]): Prefix for dataset names.

Source code in aineko/core/runner.py
class Runner:
    """Runs the pipeline described in the config.

    Args:
        pipeline_config_file (str): Path to pipeline config file
        pipeline_name (str): Name of the pipeline
        kafka_config (dict): Config for kafka broker
        metrics_export_port (int): Port on which Ray exports metrics
        dataset_prefix (Optional[str]): Prefix for dataset names.
            Kafka topics will be called <prefix>.<pipeline>.<dataset_name>.

    Attributes:
        pipeline_config_file (str): Path to pipeline config file
        pipeline_name (str): Name of the pipeline; the constructor argument,
            if given, overrides the name loaded from the pipeline config
        kafka_config (dict): Config for kafka broker
        metrics_export_port (int): Port on which Ray exports metrics
        dataset_prefix (Optional[str]): Prefix for dataset names
    """

    def __init__(
        self,
        pipeline_config_file: str,
        pipeline_name: Optional[str] = None,
        kafka_config: dict = DEFAULT_KAFKA_CONFIG.get("BROKER_CONFIG"),
        metrics_export_port: int = AINEKO_CONFIG.get("RAY_METRICS_PORT"),
        dataset_prefix: Optional[str] = None,
    ):
        """Initializes the runner class."""
        self.pipeline_config_file = pipeline_config_file
        self.kafka_config = kafka_config
        self.metrics_export_port = metrics_export_port
        self.pipeline_name = pipeline_name
        self.dataset_prefix = dataset_prefix or ""

    def run(self) -> None:
        """Runs the pipeline.

        Step 1: Load config for pipeline

        Step 2: Set up datasets

        Step 3: Set up PoisonPill node that is available to all nodes

        Step 4: Set up nodes (including Node Manager)
        """
        # Load pipeline config
        pipeline_config = self.load_pipeline_config()
        self.pipeline_name = self.pipeline_name or pipeline_config["name"]

        # Create the necessary datasets
        self.prepare_datasets(
            config=pipeline_config["datasets"],
            user_dataset_prefix=self.pipeline_name,
        )

        # Initialize ray cluster
        ray.shutdown()
        ray.init(
            namespace=self.pipeline_name,
            _metrics_export_port=self.metrics_export_port,
        )

        # Create poison pill actor
        poison_pill = ray.remote(PoisonPill).remote()

        # Add Node Manager to pipeline config
        pipeline_config["nodes"][
            NODE_MANAGER_CONFIG.get("NAME")
        ] = NODE_MANAGER_CONFIG.get("NODE_CONFIG")

        # Create each node (actor)
        results = self.prepare_nodes(
            pipeline_config=pipeline_config,
            poison_pill=poison_pill,  # type: ignore
        )

        ray.get(results)

    def load_pipeline_config(self) -> dict:
        """Loads the config for a given pipeline.

        Returns:
            pipeline config
        """
        config = ConfigLoader(
            pipeline_config_file=self.pipeline_config_file,
        ).load_config()

        return config["pipeline"]

    def prepare_datasets(
        self, config: dict, user_dataset_prefix: Optional[str] = None
    ) -> dict:
        """Creates the required datasets for a given pipeline.

        Datasets can be configured using the `params` key, using config keys
        found in: https://kafka.apache.org/documentation.html#topicconfigs

        Args:
            config: dataset configuration found in pipeline config.
                Should follow the schema:
                    {
                        "dataset_name": {
                            "type": str ("kafka_stream"),
                            "params": dict
                        }
                    }
            user_dataset_prefix: prefix only for datasets defined by the user,
                i.e. <prefix>.<user_dataset_prefix>.<dataset_name>

        Returns:
            Dict mapping dataset names to futures that resolve once the
            corresponding Kafka topics have been created

        Raises:
            ValueError: if a reserved dataset name (such as "logging") is
                defined in the catalog
        """
        # Connect to kafka cluster
        kafka_client = AdminClient(self.kafka_config)

        # Add prefix to user defined datasets
        if user_dataset_prefix:
            config = {
                f"{user_dataset_prefix}.{dataset_name}": dataset_config
                for dataset_name, dataset_config in config.items()
            }

        # Fail if reserved dataset names are defined in catalog
        for reserved_dataset in DEFAULT_KAFKA_CONFIG.get("DATASETS"):
            if reserved_dataset in config:
                raise ValueError(
                    f"Unable to create dataset `{reserved_dataset}`. "
                    "Reserved for internal use."
                )

        # Add logging dataset to catalog
        config[DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")] = {
            "type": AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"),
            "params": DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS"),
        }

        # Create all datasets defined in the catalog
        dataset_list = []
        for dataset_name, dataset_config in config.items():
            logger.info(
                "Creating dataset: %s: %s", dataset_name, dataset_config
            )
            # Create dataset for kafka streams
            if dataset_config["type"] == AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"):
                # User defined
                dataset_params = {
                    **DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS"),
                    **dataset_config.get("params", {}),
                }

                # Configure dataset
                if self.dataset_prefix:
                    topic_name = f"{self.dataset_prefix}.{dataset_name}"
                else:
                    topic_name = dataset_name

                new_dataset = NewTopic(
                    topic=topic_name,
                    num_partitions=dataset_params.get("num_partitions"),
                    replication_factor=dataset_params.get("replication_factor"),
                    config=dataset_params.get("config"),
                )

                # Add dataset to appropriate list
                dataset_list.append(new_dataset)

            else:
                raise ValueError(
                    "Unknown dataset type. Expected: "
                    f"{AINEKO_CONFIG.get('STREAM_TYPES')}."
                )

        # Create all configured datasets
        datasets = kafka_client.create_topics(dataset_list)

        # Block until all datasets finish creation
        cur_time = time.time()
        while True:
            if all(future.done() for future in datasets.values()):
                logger.info("All datasets created.")
                break
            if time.time() - cur_time > AINEKO_CONFIG.get(
                "DATASET_CREATION_TIMEOUT"
            ):
                raise TimeoutError(
                    "Timeout while creating Kafka datasets. "
                    "Please check your Kafka cluster."
                )

        return datasets

    def prepare_nodes(
        self, pipeline_config: dict, poison_pill: ray.actor.ActorHandle
    ) -> list:
        """Prepare actor handles for all nodes.

        Args:
            pipeline_config: pipeline configuration
            poison_pill: handle to the shared PoisonPill actor

        Returns:
            list: Ray object references (futures) for each node's
                `execute` method
        """
        # Collect all actor futures
        results = []

        default_node_config = pipeline_config.get("default_node_settings", {})

        for node_name, node_config in pipeline_config["nodes"].items():
            # Initialize actor from specified class in config
            target_class = imports.import_from_string(
                attr=node_config["class"], kind="class"
            )
            actor_params = {
                **default_node_config,
                **node_config.get("node_settings", {}),
                "name": node_name,
                "namespace": self.pipeline_name,
            }

            wrapped_class = ray.remote(target_class)
            # .options() returns a new handle; assign it so the actor
            # settings (name, namespace, node settings) take effect
            wrapped_class = wrapped_class.options(**actor_params)
            actor_handle = wrapped_class.remote(
                node_name=node_name,
                pipeline_name=self.pipeline_name,
                poison_pill=poison_pill,
            )

            # Setup input and output datasets
            outputs = node_config.get("outputs", [])
            actor_handle.setup_datasets.remote(
                inputs=node_config.get("inputs", None),
                outputs=outputs,
                datasets=pipeline_config["datasets"],
                has_pipeline_prefix=True,
            )

            # Setup internal datasets like logging, without pipeline prefix
            actor_handle.setup_datasets.remote(
                outputs=DEFAULT_KAFKA_CONFIG.get("DATASETS"),
                datasets=pipeline_config["datasets"],
            )

            # Create actor future (for execute method)
            results.append(
                actor_handle.execute.remote(
                    params=node_config.get("node_params", None)
                )
            )

        return results

__init__(pipeline_config_file, pipeline_name=None, kafka_config=DEFAULT_KAFKA_CONFIG.get('BROKER_CONFIG'), metrics_export_port=AINEKO_CONFIG.get('RAY_METRICS_PORT'), dataset_prefix=None)

Initializes the runner class.

Source code in aineko/core/runner.py
def __init__(
    self,
    pipeline_config_file: str,
    pipeline_name: Optional[str] = None,
    kafka_config: dict = DEFAULT_KAFKA_CONFIG.get("BROKER_CONFIG"),
    metrics_export_port: int = AINEKO_CONFIG.get("RAY_METRICS_PORT"),
    dataset_prefix: Optional[str] = None,
):
    """Initializes the runner class."""
    self.pipeline_config_file = pipeline_config_file
    self.kafka_config = kafka_config
    self.metrics_export_port = metrics_export_port
    self.pipeline_name = pipeline_name
    self.dataset_prefix = dataset_prefix or ""
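
kafka_config is handed directly to confluent-kafka's AdminClient, so it accepts standard librdkafka settings. A hedged sketch with a custom broker address and dataset prefix (values are placeholders):

from aineko import Runner

runner = Runner(
    pipeline_config_file="conf/pipeline.yml",           # hypothetical path
    kafka_config={"bootstrap.servers": "kafka:9092"},   # hypothetical broker address
    dataset_prefix="staging",                           # topics become staging.<pipeline>.<dataset>
)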

load_pipeline_config()

Loads the config for a given pipeline.

Returns:

    dict: The pipeline config.

Source code in aineko/core/runner.py
def load_pipeline_config(self) -> dict:
    """Loads the config for a given pipeline.

    Returns:
        pipeline config
    """
    config = ConfigLoader(
        pipeline_config_file=self.pipeline_config_file,
    ).load_config()

    return config["pipeline"]
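
load_pipeline_config returns the pipeline section of the config file as a dict. A sketch of the shape the rest of the Runner expects, with hypothetical node and dataset names:

pipeline_config = {
    "name": "my-pipeline",
    "default_node_settings": {"num_cpus": 0.5},  # merged into every node's Ray actor options
    "nodes": {
        "my_node": {
            "class": "my_project.nodes.MyNode",  # import path resolved by prepare_nodes
            "outputs": ["metrics"],
            "node_params": {"interval": 1},
        },
    },
    "datasets": {
        "metrics": {"type": "kafka_stream"},     # params is optional; see prepare_datasets
    },
}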

prepare_datasets(config, user_dataset_prefix=None)

Creates the required datasets for a given pipeline.

Datasets can be configured using the params key, using config keys found in: https://kafka.apache.org/documentation.html#topicconfigs

Parameters:

    config (dict): Dataset configuration found in the pipeline config. Required.
    user_dataset_prefix (Optional[str]): Prefix applied only to datasets defined by the user, i.e. <prefix>.<user_dataset_prefix>.<dataset_name>. Default: None.

Returns:

    dict: Mapping of dataset names to futures that resolve once the corresponding Kafka topics have been created.

Raises:

    ValueError: If a reserved dataset name (such as "logging") is defined in the catalog.

Source code in aineko/core/runner.py
def prepare_datasets(
    self, config: dict, user_dataset_prefix: Optional[str] = None
) -> dict:
    """Creates the required datasets for a given pipeline.

    Datasets can be configured using the `params` key, using config keys
    found in: https://kafka.apache.org/documentation.html#topicconfigs

    Args:
        config: dataset configuration found in pipeline config.
            Should follow the schema:
                {
                    "dataset_name": {
                        "type": str ("kafka_stream"),
                        "params": dict
                    }
                }
        user_dataset_prefix: prefix only for datasets defined by the user,
            i.e. <prefix>.<user_dataset_prefix>.<dataset_name>

    Returns:
        Dict mapping dataset names to futures that resolve once the
        corresponding Kafka topics have been created

    Raises:
        ValueError: if a reserved dataset name (such as "logging") is
            defined in the catalog
    """
    # Connect to kafka cluster
    kafka_client = AdminClient(self.kafka_config)

    # Add prefix to user defined datasets
    if user_dataset_prefix:
        config = {
            f"{user_dataset_prefix}.{dataset_name}": dataset_config
            for dataset_name, dataset_config in config.items()
        }

    # Fail if reserved dataset names are defined in catalog
    for reserved_dataset in DEFAULT_KAFKA_CONFIG.get("DATASETS"):
        if reserved_dataset in config:
            raise ValueError(
                f"Unable to create dataset `{reserved_dataset}`. "
                "Reserved for internal use."
            )

    # Add logging dataset to catalog
    config[DEFAULT_KAFKA_CONFIG.get("LOGGING_DATASET")] = {
        "type": AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"),
        "params": DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS"),
    }

    # Create all datasets defined in the catalog
    dataset_list = []
    for dataset_name, dataset_config in config.items():
        logger.info(
            "Creating dataset: %s: %s", dataset_name, dataset_config
        )
        # Create dataset for kafka streams
        if dataset_config["type"] == AINEKO_CONFIG.get("KAFKA_STREAM_TYPE"):
            # User defined
            dataset_params = {
                **DEFAULT_KAFKA_CONFIG.get("DATASET_PARAMS"),
                **dataset_config.get("params", {}),
            }

            # Configure dataset
            if self.dataset_prefix:
                topic_name = f"{self.dataset_prefix}.{dataset_name}"
            else:
                topic_name = dataset_name

            new_dataset = NewTopic(
                topic=topic_name,
                num_partitions=dataset_params.get("num_partitions"),
                replication_factor=dataset_params.get("replication_factor"),
                config=dataset_params.get("config"),
            )

            # Add dataset to appropriate list
            dataset_list.append(new_dataset)

        else:
            raise ValueError(
                "Unknown dataset type. Expected: "
                f"{AINEKO_CONFIG.get('STREAM_TYPES')}."
            )

    # Create all configured datasets
    datasets = kafka_client.create_topics(dataset_list)

    # Block until all datasets finish creation
    cur_time = time.time()
    while True:
        if all(future.done() for future in datasets.values()):
            logger.info("All datasets created.")
            break
        if time.time() - cur_time > AINEKO_CONFIG.get(
            "DATASET_CREATION_TIMEOUT"
        ):
            raise TimeoutError(
                "Timeout while creating Kafka datasets. "
                "Please check your Kafka cluster."
            )

    return datasets
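
Each dataset's params block is merged over the default dataset params and forwarded to confluent-kafka's NewTopic, so it can set the partition count, replication factor, and any topic config key from the Kafka documentation linked above. A hedged sketch (all values illustrative):

datasets_config = {
    "metrics": {
        "type": "kafka_stream",
        "params": {
            "num_partitions": 3,
            "replication_factor": 1,
            "config": {"retention.ms": "86400000"},  # keep messages for one day
        },
    },
}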

prepare_nodes(pipeline_config, poison_pill)

Prepare actor handles for all nodes.

Parameters:

    pipeline_config (dict): Pipeline configuration. Required.
    poison_pill (ray.actor.ActorHandle): Handle to the shared PoisonPill actor. Required.

Returns:

    list: Ray object references (futures) for each node's execute method.

Source code in aineko/core/runner.py
def prepare_nodes(
    self, pipeline_config: dict, poison_pill: ray.actor.ActorHandle
) -> list:
    """Prepare actor handles for all nodes.

    Args:
        pipeline_config: pipeline configuration
        poison_pill: handle to the shared PoisonPill actor

    Returns:
        list: Ray object references (futures) for each node's
            `execute` method
    """
    # Collect all actor futures
    results = []

    default_node_config = pipeline_config.get("default_node_settings", {})

    for node_name, node_config in pipeline_config["nodes"].items():
        # Initialize actor from specified class in config
        target_class = imports.import_from_string(
            attr=node_config["class"], kind="class"
        )
        actor_params = {
            **default_node_config,
            **node_config.get("node_settings", {}),
            "name": node_name,
            "namespace": self.pipeline_name,
        }

        wrapped_class = ray.remote(target_class)
        # .options() returns a new handle; assign it so the actor
        # settings (name, namespace, node settings) take effect
        wrapped_class = wrapped_class.options(**actor_params)
        actor_handle = wrapped_class.remote(
            node_name=node_name,
            pipeline_name=self.pipeline_name,
            poison_pill=poison_pill,
        )

        # Setup input and output datasets
        outputs = node_config.get("outputs", [])
        actor_handle.setup_datasets.remote(
            inputs=node_config.get("inputs", None),
            outputs=outputs,
            datasets=pipeline_config["datasets"],
            has_pipeline_prefix=True,
        )

        # Setup internal datasets like logging, without pipeline prefix
        actor_handle.setup_datasets.remote(
            outputs=DEFAULT_KAFKA_CONFIG.get("DATASETS"),
            datasets=pipeline_config["datasets"],
        )

        # Create actor future (for execute method)
        results.append(
            actor_handle.execute.remote(
                params=node_config.get("node_params", None)
            )
        )

    return results
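
Each node entry names the class to import, optional Ray actor settings, the datasets it reads and writes, and parameters passed to its execute method. A sketch of a single entry (names and values are hypothetical):

nodes_config = {
    "my_node": {
        "class": "my_project.nodes.MyNode",     # imported via imports.import_from_string
        "node_settings": {"num_cpus": 1},       # merged over default_node_settings, passed to .options()
        "inputs": ["raw_events"],               # datasets this node consumes
        "outputs": ["metrics"],                 # datasets this node produces
        "node_params": {"window_seconds": 60},  # forwarded to execute()
    },
}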

run()

Runs the pipeline.

Step 1: Load config for pipeline

Step 2: Set up datasets

Step 3: Set up PoisonPill node that is available to all nodes

Step 4: Set up nodes (including Node Manager)

Source code in aineko/core/runner.py
def run(self) -> None:
    """Runs the pipeline.

    Step 1: Load config for pipeline

    Step 2: Set up datasets

    Step 3: Set up PoisonPill node that is available to all nodes

    Step 4: Set up nodes (including Node Manager)
    """
    # Load pipeline config
    pipeline_config = self.load_pipeline_config()
    self.pipeline_name = self.pipeline_name or pipeline_config["name"]

    # Create the necessary datasets
    self.prepare_datasets(
        config=pipeline_config["datasets"],
        user_dataset_prefix=self.pipeline_name,
    )

    # Initialize ray cluster
    ray.shutdown()
    ray.init(
        namespace=self.pipeline_name,
        _metrics_export_port=self.metrics_export_port,
    )

    # Create poison pill actor
    poison_pill = ray.remote(PoisonPill).remote()

    # Add Node Manager to pipeline config
    pipeline_config["nodes"][
        NODE_MANAGER_CONFIG.get("NAME")
    ] = NODE_MANAGER_CONFIG.get("NODE_CONFIG")

    # Create each node (actor)
    results = self.prepare_nodes(
        pipeline_config=pipeline_config,
        poison_pill=poison_pill,  # type: ignore
    )

    ray.get(results)