Pipeline configuration

At a high level, building a pipeline requires defining a pipeline configuration and implementing at least one node.

Defining a pipeline

The simplest possible pipeline consists of a single node and a single dataset, as shown below.

```mermaid
flowchart LR
classDef datasetClass fill:#87CEEB
classDef nodeClass fill:#eba487
N_sequence((sequence)):::nodeClass --> T_test_sequence[test_sequence]:::datasetClass
```
For the sake of simplicity, we reference a truncated version of the pipeline definition:

Example pipeline.yml configuration file:

```yaml
pipeline:
  name: test-aineko-pipeline

  default_node_settings:
    num_cpus: 0.5

  nodes:
    sequence:
      class: my_awesome_pipeline.nodes.MySequencerNode
      outputs:
        - test_sequence
      node_params:
        initial_state: 0
        increment: 1

  datasets:
    test_sequence:
      type: kafka_stream
```

A pipeline definition should have the following attributes:

Keys

pipeline

This is the top-level key in a pipeline configuration file: a configuration map that defines the name, default settings, nodes, and datasets for a pipeline.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `name` | Y | string | Name of the pipeline. |
| `default_node_settings` | N | map | Defines common default values for node attributes, which can be overridden at the node level. |
| `nodes` | Y | map | Defines the compute nodes for a pipeline, mapping node names to node configurations. |
| `datasets` | Y | map | Defines the datasets for a pipeline, mapping dataset names to dataset configurations. |

default_node_settings

This optional section sets common default settings for all nodes in the pipeline. These settings are passed to Ray actors as parameters and accept any of the arguments that Ray actors support. The most common one we usually use is num_cpus.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<setting>` | N | multiple | Any of the parameters supported by Ray actors. |
| `num_cpus` | N | float | Defines the default number of CPUs for a node. Can be less than one. |

These can be overridden at the node level.
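For example, here is a minimal sketch of such an override; the node classes and dataset name below are hypothetical:

```yaml
pipeline:
  name: example-pipeline

  default_node_settings:
    num_cpus: 0.5          # applies to every node unless overridden

  nodes:
    light_node:
      class: my_awesome_pipeline.nodes.LightNode   # hypothetical node class
      outputs:
        - some_dataset
    heavy_node:
      class: my_awesome_pipeline.nodes.HeavyNode   # hypothetical node class
      inputs:
        - some_dataset
      num_cpus: 2.0        # overrides the 0.5 default for this node only

  datasets:
    some_dataset:
      type: kafka_stream
```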

nodes

This section defines the compute nodes for a pipeline.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<node_name>` | Y | map | Defines a map of node names to node configurations in the pipeline. |
<node_name>

A particular node instance in the pipeline, defined by a unique name. Any parameters defined at the individual node level locally override any defaults defined under default_node_settings.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `class` | Y | string | Python class to run for the node. This should exist within the Python module in the same repository. |
| `inputs` | N | list of strings | Defines which datasets to consume from, if applicable. |
| `outputs` | N | list of strings | Defines which datasets to produce to, if applicable. |
| `node_params` | N | map | Defines any arbitrary parameters relevant to the node's application logic. In the example above, we defined the initial_state and increment parameters, both integers. Environment variables can be injected into node_params by passing in strings that match the pattern {$ENV_VAR}, where ENV_VAR is the environment variable to inject (see the sketch after this table). |
| `num_cpus` | Y | float | Number of CPUs allocated to the node. Required either in each node definition or at the default_node_settings level. |
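Putting these keys together, here is a sketch of a single node definition; the Enricher class, the raw_events and enriched_events datasets, and the API_KEY environment variable are hypothetical names used for illustration:

```yaml
nodes:
  enricher:
    class: my_awesome_pipeline.nodes.Enricher   # hypothetical node class
    inputs:
      - raw_events                              # hypothetical input dataset
    outputs:
      - enriched_events                         # hypothetical output dataset
    num_cpus: 1.0
    node_params:
      batch_size: 100                           # arbitrary application parameter
      api_key: "{$API_KEY}"                     # injected from the API_KEY environment variable
```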
How are environment variables injected?

The following method is used to inject environment variables into node_params:

aineko.core.config_loader.ConfigLoader.inject_env_vars

```python
inject_env_vars(
    node_params: Optional[
        Union[Dict, List, str, int, float, bool]
    ] = None
) -> Optional[Union[Dict, List, str, int, float, bool]]
```

Inject environment variables into node params.

This function is used to recursively inject environment variables into strings passed through node params via the pipeline config. We only recursively parse strings, dicts, and lists, as these are the only types that can contain environment variables (i.e. excluding ints, floats, and Nones).

Environment variables are identified in strings by the pattern {$ENV_VAR} where ENV_VAR is the name of the environment variable to inject. For example, given the following environment variables:

```
$ export SECRET1=secret1
$ export SECRET2=secret2
```

The following node params dict:

```
{
    "key1": "A string with a {$SECRET1} and a {$SECRET2}.",
    "key2": {
        "key3": "A string with a {$SECRET1} and a {$SECRET2}.",
        "key4": [
            "A string with a {$SECRET1} and a {$SECRET2}.",
            "A string with a {$SECRET1} and a {$SECRET2}."
        ]
    }
}
```

Will be transformed to:

```
{
    "key1": "A string with a secret1 and a secret2.",
    "key2": {
        "key3": "A string with a secret1 and a secret2.",
        "key4": [
            "A string with a secret1 and a secret2.",
            "A string with a secret1 and a secret2."
        ]
    }
}
```
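In a pipeline configuration, the same substitution applies to any string under node_params. A minimal sketch, assuming a hypothetical node class and a hypothetical DB_PASSWORD environment variable:

```yaml
nodes:
  writer:
    class: my_awesome_pipeline.nodes.Writer   # hypothetical node class
    node_params:
      # Resolved at config load time from the DB_PASSWORD environment variable;
      # loading fails with a ValueError if the variable is not set.
      connection_string: "postgresql://user:{$DB_PASSWORD}@localhost:5432/db"
```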
Source code in aineko/core/config_loader.py
```python
def inject_env_vars(
    self,
    node_params: Optional[Union[Dict, List, str, int, float, bool]] = None,
) -> Optional[Union[Dict, List, str, int, float, bool]]:
    """Inject environment variables into node params.

    This function is used to recursively inject environment variables
    into strings passed through node params via the pipeline config.
    We only recursively parse strings, dicts, and lists, as these are
    the only types that can contain environment variables (i.e.
    excluding ints, floats, and Nones).

    Environment variables are identified in strings by the pattern
    {$ENV_VAR} where ENV_VAR is the name of the environment variable
    to inject. For example, given the following environment variables:

    ```
    $ export SECRET1=secret1
    $ export SECRET2=secret2
    ```

    The following node params dict:

        ```
        {
            "key1": "A string with a {$SECRET1} and a {$SECRET2}.",
            "key2": {
                "key3": "A string with a {$SECRET1} and a {$SECRET2}.",
                "key4": [
                    "A string with a {$SECRET1} and a {$SECRET2}.",
                    "A string with a {$SECRET1} and a {$SECRET2}."
                ]
            }
        }
        ```

    Will be transformed to:

            ```
            {
                "key1": "A string with a secret1 and a secret2.",
                "key2": {
                    "key3": "A string with a secret1 and a secret2.",
                    "key4": [
                        "A string with a secret1 and a secret2.",
                        "A string with a secret1 and a secret2."
                    ]
                }
            }
            ```
    """
    if isinstance(node_params, dict):
        for k, v in list(node_params.items()):
            node_params[k] = self.inject_env_vars(v)
    elif isinstance(node_params, list):
        for i, v in enumerate(node_params):
            node_params[i] = self.inject_env_vars(v)
    elif isinstance(node_params, str):
        env_var_pattern = r"\{\$.*?\}"
        env_var_match = re.search(env_var_pattern, node_params, re.DOTALL)
        if env_var_match:
            env_var_env_str = env_var_match.group()
            env_var_value = os.getenv(
                env_var_env_str[2:][:-1], default=None
            )
            if env_var_value is None:
                raise ValueError(
                    "Failed to inject environment variable. "
                    f"{env_var_env_str[2:][:-1]} was not found."
                )
            node_params = node_params.replace(
                env_var_env_str, env_var_value
            )
            return self.inject_env_vars(node_params)

    return node_params
```

datasets

This section defines the datasets for a pipeline.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<name of dataset>` | Y | map | Defines a map of dataset names to dataset configurations in the pipeline. |
<name of dataset>

A particular dataset instance in the pipeline, defined by a unique name. Each dataset is defined by a type.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `type` | Y | string | Defines which type of dataset to use. Currently, only kafka_stream is supported. |
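For example, the datasets section from the configuration above declares one Kafka-backed dataset; declaring several follows the same shape (the second dataset name here is hypothetical):

```yaml
datasets:
  test_sequence:
    type: kafka_stream
  metrics:              # hypothetical additional dataset
    type: kafka_stream
```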

Note

Aineko is currently in the Beta release stage and is constantly improving.

If you have any feedback, questions, or suggestions, please reach out to us.