Aineko project
Note
This is a continuation of the previous section (Quick Start). Before starting, make sure you have already created a template project using aineko create.
Directory contents
.
├── README.md
├── conf
│   └── pipeline.yml
├── my_awesome_pipeline
│   ├── __init__.py
│   ├── config.py
│   └── nodes.py
├── poetry.lock
├── pyproject.toml
└── tests
    ├── __init__.py
    └── test_nodes.py
This is how the template project directory looks. Many of these files are boilerplate needed to make things work.
Let's zoom in on the more interesting files to take note of:
- conf/pipeline.yml: This contains your pipeline definition, written in YAML. You are expected to modify it to define your own pipeline.
- my_awesome_pipeline/nodes.py: Remember how nodes are abstractions for computations? These nodes are implemented in Python. You do not have to define them in this file. You can define them anywhere you like within the directory, as long as you reference them correctly in pipeline.yml.
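To make "reference them correctly" more concrete, here is a minimal sketch of how a dotted reference to a node class could be resolved at runtime. It assumes the reference takes the form package.module.ClassName; the helper resolve_class is hypothetical and is not part of Aineko, whose actual loading logic may differ.

```python
import importlib


def resolve_class(dotted_path: str) -> type:
    """Import `package.module.ClassName` and return the class object.

    Hypothetical helper for illustration only; not an Aineko API.
    """
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Demonstrated with a standard-library class so the snippet runs anywhere.
ordered_dict_cls = resolve_class("collections.OrderedDict")
print(ordered_dict_cls.__name__)  # prints OrderedDict
```

Under this assumption, a node class defined in a different module of the package would simply be referenced by its own dotted path, and a typo in the path would surface as an import or attribute error.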
Defining a pipeline
Pipelines are defined using a .yml file that contains specific keys. In this configuration file, you can assemble a pipeline from nodes and datasets.
Refer to the documentation on pipeline configuration for a detailed breakdown.
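As a rough illustration of what "assembling a pipeline from nodes and datasets" means, the sketch below models a pipeline as a plain Python dictionary and checks that every dataset a node uses is declared. The key names (name, nodes, class, inputs, outputs, datasets), the class name MyIncrementNode, and the helper undeclared_datasets are all assumptions made for this example; they are not taken from Aineko's configuration schema, so consult the pipeline configuration documentation for the real keys.

```python
# Hypothetical structure: the key names below are assumptions for illustration,
# not Aineko's actual pipeline.yml schema.
pipeline = {
    "name": "my_awesome_pipeline",
    "nodes": {
        "increment": {
            "class": "my_awesome_pipeline.nodes.MyIncrementNode",
            "inputs": ["my_input_dataset"],
            "outputs": ["my_output_dataset"],
        },
    },
    "datasets": {
        "my_input_dataset": {},
        "my_output_dataset": {},
    },
}


def undeclared_datasets(config: dict) -> set:
    """Return dataset names that nodes use but `datasets` does not declare."""
    declared = set(config["datasets"])
    used = set()
    for node in config["nodes"].values():
        used.update(node.get("inputs", []))
        used.update(node.get("outputs", []))
    return used - declared


print(undeclared_datasets(pipeline))  # prints set()
```

The point of the sketch is the relationship, not the syntax: nodes name the datasets they read from and write to, and the pipeline definition ties the two together.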
Implementing a node
A node requires:
- Inheriting the base node class aineko.core.node.AbstractNode
- Implementing at least the abstract method _execute, and optionally _pre_loop_hook.
_pre_loop_hook (optional) is used to initialize the node's state before it starts to process data from the dataset.
_execute is the main logic that runs repeatedly. As of writing, users should explicitly consume and produce within this method, like so:
from typing import Optional

from aineko.core.node import AbstractNode


class MyNode(AbstractNode):  # MyNode is a placeholder name
    def _execute(self, params: Optional[dict] = None):
        """This node takes an input number and increments it by 1."""
        input_number = self.consumers["my_input_dataset"].next()
        # if we want the most recent message, we can use .last()
        output_number = input_number + 1
        self.producers["my_output_dataset"].produce(output_number)
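To see how _pre_loop_hook and _execute relate, the following self-contained sketch simulates the lifecycle without importing aineko. StubConsumer, StubProducer, IncrementNode, the run driver, and the increment parameter are all hypothetical stand-ins written for this example. The sketch assumes that the hook runs once before the loop and that _execute is then called repeatedly, as described above; the real framework's scheduling and message format may differ.

```python
from collections import deque
from typing import Optional


class StubConsumer:
    """In-memory stand-in for a dataset consumer (hypothetical)."""

    def __init__(self, values):
        self._queue = deque(values)

    def next(self):
        # Return the oldest unread value.
        return self._queue.popleft()


class StubProducer:
    """In-memory stand-in for a dataset producer (hypothetical)."""

    def __init__(self):
        self.produced = []

    def produce(self, message):
        self.produced.append(message)


class IncrementNode:
    """Mirrors the node shape described above without importing aineko."""

    def __init__(self, consumers, producers):
        self.consumers = consumers
        self.producers = producers

    def _pre_loop_hook(self, params: Optional[dict] = None):
        # Runs once, before any data is processed: set up node state.
        self.increment = (params or {}).get("increment", 1)

    def _execute(self, params: Optional[dict] = None):
        # Runs repeatedly: consume one value, transform it, produce the result.
        input_number = self.consumers["my_input_dataset"].next()
        output_number = input_number + self.increment
        self.producers["my_output_dataset"].produce(output_number)


def run(node, iterations, params=None):
    """Hypothetical driver: call the hook once, then _execute in a loop."""
    node._pre_loop_hook(params)
    for _ in range(iterations):
        node._execute(params)


node = IncrementNode(
    consumers={"my_input_dataset": StubConsumer([1, 2, 3])},
    producers={"my_output_dataset": StubProducer()},
)
run(node, iterations=3)
print(node.producers["my_output_dataset"].produced)  # prints [2, 3, 4]
```

Keeping state setup in _pre_loop_hook means _execute stays a small, repeatable step, which also makes it straightforward to exercise in tests/test_nodes.py with stubs like these.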