# Pipeline configuration
At a high level, building a pipeline requires defining a pipeline configuration and implementing at least one node.
## Defining a pipeline
The simplest possible pipeline consists of a single node and a single dataset, as shown below.
```mermaid
flowchart LR
classDef datasetClass fill:#87CEEB
classDef nodeClass fill:#eba487
N_sequence((sequence)):::nodeClass --> T_test_sequence[test_sequence]:::datasetClass
```
For the sake of simplicity, we reference a truncated version of the pipeline definition.

Example `pipeline.yml` configuration file:
```yaml
pipeline:
  name: test-aineko-pipeline

  default_node_settings:
    num_cpus: 0.5

  nodes:
    sequence:
      class: my_awesome_pipeline.nodes.MySequencerNode
      outputs:
        - test_sequence
      node_params:
        initial_state: 0
        increment: 1

  datasets:
    test_sequence:
      type: kafka_stream
```
A pipeline definition should have the following attributes:
## Keys
### pipeline

This is the top-level key in a pipeline configuration file. It maps to a configuration block that defines the name, default settings, nodes, and datasets for the pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `name` | Y | string | Name of the pipeline. |
| `default_node_settings` | N | map | Defines common default values for node attributes, which can be overridden at the node level. |
| `nodes` | Y | map | Defines the compute nodes for a pipeline, mapping node names to node definitions. |
| `datasets` | Y | map | Defines the datasets for a pipeline, mapping dataset names to dataset definitions. |
### default_node_settings

This optional section sets common default settings for all nodes in the pipeline. These settings are passed to Ray actors as parameters and accept any of the arguments that Ray actors support. The most common one is `num_cpus`.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<setting>` | N | multiple | Any of the parameters that Ray actors accept. |
| `num_cpus` | N | float | Defines the default number of CPUs for a node. Can be less than one. |
These can be overridden at the node level.
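For example, a pipeline-wide default can be overridden for a single node. The following is a minimal sketch based on the example above; the node class and dataset names are illustrative:

```yaml
pipeline:
  name: test-aineko-pipeline

  default_node_settings:
    num_cpus: 0.5        # default applied to every node

  nodes:
    sequence:
      class: my_awesome_pipeline.nodes.MySequencerNode
      num_cpus: 1        # overrides the 0.5 default for this node only
      outputs:
        - test_sequence
```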
### nodes
This section defines the compute nodes for a pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<node_name>` | Y | map | Defines a map of node names to node definitions in the pipeline. |
#### `<node_name>`

A particular node instance in the pipeline, defined by a unique name. Any parameter defined at the individual node level overrides the corresponding default set at the `default_node_settings` level.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `class` | Y | string | Full path of the Python class to run for the node. This class should exist within the Python module in the same repository. |
| `inputs` | N | list of strings | Defines which datasets to consume from, if applicable. |
| `outputs` | N | list of strings | Defines which datasets to produce to, if applicable. |
| `node_params` | N | map | Defines arbitrary parameters relevant to the node's application logic. In the example above, we defined the `initial_state` and `increment` parameters, both integers. Environment variables can be injected into `node_params` by passing strings that match the pattern `{$ENV_VAR}`, where `ENV_VAR` is the environment variable to inject. |
| `num_cpus` | Y | float | Number of CPUs allocated to the node. Required either in each node definition or at the `default_node_settings` level. |
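Putting these keys together, a single node definition with inputs, outputs, and an injected environment variable might look like the sketch below. The `input_sequence` dataset and `MY_API_KEY` variable are illustrative, not part of the example pipeline above:

```yaml
nodes:
  sequence:
    class: my_awesome_pipeline.nodes.MySequencerNode
    num_cpus: 0.5
    inputs:
      - input_sequence            # dataset to consume from (illustrative)
    outputs:
      - test_sequence             # dataset to produce to
    node_params:
      initial_state: 0
      increment: 1
      api_key: "{$MY_API_KEY}"    # replaced with the env var's value at load time
```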
#### How are environment variables injected?

The following method is used to inject environment variables into `node_params`:
`aineko.core.config_loader.ConfigLoader.inject_env_vars`

```python
inject_env_vars(
    node_params: Optional[Union[Dict, List, str, int, float, bool]] = None
) -> Optional[Union[Dict, List, str, int, float, bool]]
```
Inject environment variables into node params.
This function is used to recursively inject environment variables into strings passed through node params via the pipeline config. We only recursively parse strings, dicts, and lists, as these are the only types that can contain environment variables (i.e. excluding ints, floats, and Nones).
Environment variables are identified in strings by the pattern {$ENV_VAR} where ENV_VAR is the name of the environment variable to inject. For example, given the following environment variables:
```shell
$ export SECRET1=secret1
$ export SECRET2=secret2
```
The following node params dict:
```
{
"key1": "A string with a {$SECRET1} and a {$SECRET2}.",
"key2": {
"key3": "A string with a {$SECRET1} and a {$SECRET2}.",
"key4": [
"A string with a {$SECRET1} and a {$SECRET2}.",
"A string with a {$SECRET1} and a {$SECRET2}."
]
}
}
```
Will be transformed to:
```
{
"key1": "A string with a secret1 and a secret2.",
"key2": {
"key3": "A string with a secret1 and a secret2.",
"key4": [
"A string with a secret1 and a secret2.",
"A string with a secret1 and a secret2."
]
}
}
```
Source code in `aineko/core/config_loader.py` (lines 95-170).
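For reference, the documented behavior can be reproduced with a short self-contained sketch. This is an illustrative re-implementation of the recursion described above, not the library's actual source; the variable-name pattern and the handling of unset variables are assumptions:

```python
import os
import re
from typing import Union

ParamType = Union[dict, list, str, int, float, bool, None]

# Matches {$ENV_VAR} placeholders; the exact variable-name rule is an assumption.
ENV_VAR_PATTERN = re.compile(r"\{\$(\w+)\}")

def inject_env_vars_sketch(node_params: ParamType = None) -> ParamType:
    """Recursively inject environment variables into node params."""
    if isinstance(node_params, dict):
        return {key: inject_env_vars_sketch(value) for key, value in node_params.items()}
    if isinstance(node_params, list):
        return [inject_env_vars_sketch(item) for item in node_params]
    if isinstance(node_params, str):
        # Raises KeyError if the variable is unset; the real implementation
        # may handle missing variables differently.
        return ENV_VAR_PATTERN.sub(
            lambda match: os.environ[match.group(1)], node_params
        )
    # ints, floats, bools, and None cannot contain env vars; pass through.
    return node_params
```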
### datasets
This section defines the datasets for a pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<name of dataset>` | Y | map | Defines a map of dataset names to dataset definitions in the pipeline. |
#### `<name of dataset>`

A particular dataset instance in the pipeline, defined by a unique name. Each dataset is defined by its `type`.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `type` | Y | string | Defines which type of dataset to use. Currently, only `kafka_stream` is supported. |
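In the example pipeline above, this reduces to the following minimal `datasets` section:

```yaml
datasets:
  test_sequence:
    type: kafka_stream   # currently the only supported dataset type
```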
**Note:** Aineko is currently in the Beta release stage and is constantly improving. If you have any feedback, questions, or suggestions, please reach out to us.