Workflow
While working with Data Analytics we came across the need to execute workflows that are not predefined, but can instead be defined and configured by the user. This library provides the user with a toolset to design, configure, execute and monitor a Workflow. Scheduling of a Workflow is also available, but through a different library.
Workflow Types
- Data-checkin workflows are used to load data to the platform, with a limited, pre-defined set of Blocks available to the user.
- Analytics workflows are used for more advanced, fine-grained data processing, model training, evaluation and application.
- Internal workflows are used by the platform for workflows that need to take place in the background, without involving the end user.
Workflow Configuration
A Workflow is a Directed Acyclic Graph (DAG) made of Tasks, which, in turn, are instances of Blocks.
Blocks
A Block can be perceived as a function of code (which might call other functions), with specific input variables and a predefined output. These functions are written in Python, grouped by the execution framework they support and bundled accordingly.
Once a block is added to the appropriate bundle, it must be registered in the platform so that it becomes available for use in a Workflow. To register a block, the user supplies the platform with information on how a task based on this block should be configured and validated before it is used in a workflow.
This information includes presentation (name, description, block category and type) and configuration (input parameters, output structure) options.
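To make this concrete, here is a minimal sketch of what a block and its registration information might look like. The function, field names and values below are purely illustrative assumptions, not the platform's actual registration schema.

```python
# Hypothetical block: a plain Python function with specific input
# parameters and a predefined output.
def missing_value_filler(data: list[dict], column: str, fill_value: float = 0.0) -> list[dict]:
    """Replace missing values in `column` with `fill_value`."""
    return [
        {**row, column: row.get(column) if row.get(column) is not None else fill_value}
        for row in data
    ]

# Hypothetical registration payload: presentation options (name, description,
# block category and type) plus configuration options (input parameters,
# output structure).
BLOCK_DEFINITION = {
    "name": "Missing Value Filler",
    "description": "Replaces missing values in a column with a constant.",
    "category": "preparation",   # block category shown to the user
    "type": "analytics",         # workflow type the block belongs to
    "framework": "python3",      # execution framework bundle it ships with
    "parameters": [
        {"name": "column", "type": "string", "required": True},
        {"name": "fill_value", "type": "float", "required": False, "default": 0.0},
    ],
    "output": {"structure": "same as input, with `column` guaranteed non-null"},
}
```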
Tasks
A Task is an instance of a block. The user adds a task in a workflow, and provides its configuration based on the block definition. Tasks are the interconnected nodes of the workflow DAG. A workflow might have multiple tasks based on the same block, while each task can be added to a workflow only once.
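The following hypothetical configuration illustrates a workflow as a DAG of tasks; the field names (`tasks`, `edges`) and block identifiers are assumptions for illustration only.

```python
# Illustrative workflow configuration: a DAG whose nodes are tasks, each task
# being a configured instance of a registered block.
workflow = {
    "name": "Customer churn preparation",
    "type": "analytics",
    "tasks": [
        {"id": "read_orders", "block": "analytics.Reader",
         "config": {"dataset": "orders"}},
        {"id": "fill_amounts", "block": "analytics.MissingValueFiller",
         "config": {"column": "amount", "fill_value": 0.0}},
        {"id": "train_model", "block": "analytics.Trainer",
         "config": {"target": "churned"}},
    ],
    # Edges of the DAG: each task may feed one or more downstream tasks.
    "edges": [
        {"from": "read_orders", "to": "fill_amounts"},
        {"from": "fill_amounts", "to": "train_model"},
    ],
}
```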
Workflow Execution
Methods
As the configuration of a Workflow is a complex process, in addition to the normal, end-to-end execution of a workflow, we provide the user with the ability to execute dry and test runs of a workflow, up to a specific Task.
- dry-run is used to get the structure of the data after a specific task has been executed. Since many blocks add, remove or modify the data structure in ways that might not be directly visible to the user, the dry-run execution method is available to assist the user (or to verify the configuration).
- test-run is used to get both the structure of the data and a small sample after a specific task has been executed, to help the user verify that the configuration works as intended.
- sample-run (data-checkin only) is run on a small sample provided by the user after each data-checkin step, to help the user verify that the configuration works as intended.
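The snippet below sketches how these execution methods might be triggered through an HTTP client; the endpoint, parameter names and client code are assumptions, not the platform's actual API.

```python
# Hypothetical client call for submitting a workflow execution with a
# specific execution method.
import requests

API = "https://platform.example.com/api/workflows"  # assumed base URL

def execute(workflow_id: str, method: str = "normal", up_to_task: str | None = None) -> dict:
    """Submit a workflow execution.

    method: "normal" for end-to-end execution,
            "dry-run" to return only the resulting data structure,
            "test-run" to return the structure plus a small sample,
            "sample-run" (data-checkin only) to run each step on a user-provided sample.
    up_to_task: stop a dry/test run after this task.
    """
    payload = {"method": method}
    if up_to_task is not None:
        payload["up_to_task"] = up_to_task
    response = requests.post(f"{API}/{workflow_id}/executions", json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# e.g. inspect the data structure after the "fill_amounts" task:
# execute("wf-123", method="dry-run", up_to_task="fill_amounts")
```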
Execution Frameworks
While Spark is the de-facto execution engine for Big Data, we will add support for different execution "frameworks":
- Spark v3 - Available
- Python v3 - Available
- Generic Kubernetes - Under Investigation
- Dask - TBC
Execution Location
- Cloud - Using platform resources, on Kubernetes
- On-Premise - Using the user's on-premise resources, executed with a registered runner
Execution configuration pre-processing
In order for a workflow to be submitted for execution, it needs to conform to a set of rules. These rules depend on the Workflow Type, the Execution Method, the Execution Framework and the Execution Location.
Execution Rule Engine
The execution rule engine is responsible for validating the workflow configuration, as well as handling any additional prerequisites that must be satisfied before execution, depending on the factors mentioned above. Some examples of rules are listed below (a sketch of such a rule engine follows the list):
- Analytics Test Run on Cloud: Clears all intermediate task results from MongoDB
- Analytics Normal Run on Cloud: Calculates the workflow's ingested data size in order to estimate the resources required for execution
- Data-checkin Sample Run: Checks that a sample file has been provided and adds it to the execution configuration
- Data-checkin Normal Run: Checks that there is a harvester task in the workflow and that it is configured appropriately
- Data-checkin On-Premise Run: Adds a pre-signed MinIO URL used to store the generated parquet file after execution
- Data-checkin Spark Run: Checks whether the ingestion method is CSV files and whether the files are larger than the pre-defined Spark threshold; if so, it changes the execution framework to Spark
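As a rough illustration, such a rule engine could be modelled as a list of (predicate, action) pairs evaluated against an execution context. The classes, rule names and the 512 MB threshold below are assumptions, not the actual implementation.

```python
# Minimal sketch of a rule engine keyed on the execution context.
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class ExecutionContext:
    workflow_type: str   # "data-checkin" | "analytics" | "internal"
    method: str          # "normal" | "dry-run" | "test-run" | "sample-run"
    framework: str       # "spark3" | "python3" | ...
    location: str        # "cloud" | "on-premise"
    config: dict = field(default_factory=dict)

def needs_sample_file(ctx: ExecutionContext) -> bool:
    return ctx.workflow_type == "data-checkin" and ctx.method == "sample-run"

def attach_sample_file(ctx: ExecutionContext) -> None:
    # The sample must have been provided by the user beforehand.
    if "sample_file" not in ctx.config:
        raise ValueError("A sample file must be provided for a sample-run")

def large_csv_ingestion(ctx: ExecutionContext) -> bool:
    # Assumed threshold of 512 MB; the real value is platform-defined.
    return (ctx.workflow_type == "data-checkin"
            and ctx.config.get("ingestion") == "csv"
            and ctx.config.get("file_size_mb", 0) > 512)

def switch_to_spark(ctx: ExecutionContext) -> None:
    ctx.framework = "spark3"

RULES: list[tuple[Callable[[ExecutionContext], bool], Callable[[ExecutionContext], None]]] = [
    (needs_sample_file, attach_sample_file),
    (large_csv_ingestion, switch_to_spark),
]

def apply_rules(ctx: ExecutionContext) -> ExecutionContext:
    """Run every matching rule before the workflow is submitted for execution."""
    for predicate, action in RULES:
        if predicate(ctx):
            action(ctx)
    return ctx
```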
Task Transformers/Builder
After the workflow goes through the rule engine validation, and before it is finally sent for execution, several of its blocks (usually generic blocks) are transformed into specific blocks (e.g. dc.Harvester is transformed into dc.FileHarvester if it involves files, or dc.ApiHarvester if it involves APIs). Along with the block transformation, we also adjust various parameters of the execution, such as required resources, type of execution, and credentials for reading/writing (Vault token, MinIO credentials, etc.). This is not visible to the user in any way; it is done internally.
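A simplified sketch of such a transformation step is shown below; the task structure, the `source_type` field and the injected execution parameters are illustrative assumptions.

```python
# Hypothetical transformer: swap a generic block for a concrete one and
# inject internal execution parameters (resources, credentials) that the
# user never sees.
def transform_task(task: dict, secrets: dict) -> dict:
    transformed = dict(task)
    if task["block"] == "dc.Harvester":
        source = task.get("config", {}).get("source_type")
        if source == "file":
            transformed["block"] = "dc.FileHarvester"
        elif source == "api":
            transformed["block"] = "dc.ApiHarvester"
    # Internal execution parameters, added transparently.
    transformed.setdefault("execution", {}).update({
        "resources": {"cpu": "1", "memory": "2Gi"},
        "vault_token": secrets.get("vault_token"),
        "minio_credentials": secrets.get("minio"),
    })
    return transformed
```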
Logging
Given the potential complexity of Workflows, the workflow module provides a dedicated logging mechanism to store all log messages produced by the execution of all tasks. Log messages can then be filtered by type (debug, info, warn, error), workflow execution, task, datetime, and even by the executor/worker (for cases where the execution was performed in a distributed way).
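As an illustration, retrieving filtered logs might look like the following; the endpoint and filter parameter names are assumptions rather than the module's actual API.

```python
# Hypothetical log query: filter by level, task, datetime range and worker.
import requests

LOGS_API = "https://platform.example.com/api/workflow-logs"  # assumed URL

def fetch_logs(execution_id: str, level: str | None = None,
               task_id: str | None = None, worker: str | None = None,
               since: str | None = None, until: str | None = None) -> list[dict]:
    """Retrieve log messages for one workflow execution, optionally filtered
    by level (debug/info/warn/error), task, datetime range and executor/worker."""
    params = {"execution": execution_id}
    for key, value in {"level": level, "task": task_id, "worker": worker,
                       "since": since, "until": until}.items():
        if value is not None:
            params[key] = value
    response = requests.get(LOGS_API, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

# e.g. only errors produced by the "fill_amounts" task of one execution:
# fetch_logs("exec-42", level="error", task_id="fill_amounts")
```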
Workflow Execution v2 (Product Only)
To address several issues and shortcomings observed in the initial version of the execution, the way a workflow is executed has changed in the Product. The changes include:
- Adding more advanced read blocks, where users can filter the data their pipelines use, reducing the initial memory requirements and allowing more advanced cases (e.g. executing a pipeline only on new data)
- Running a test-run on any Task will produce result samples for all tasks between that task and the read block
- Moving Task result samples to MongoDB and providing a set of appropriate endpoints for retrieving them. This will reduce the initial loading times of the Workflow Designer, and can provide further optimisations on the display of the test results (filtering, pagination, etc.)
- Re-writing the transformation engine to support more complex scenarios and make it easily extensible for new cases that will arise in the future. While the old approach was a set of if conditions, the current solution uses a builder pattern: we start from the Workflow along with its Tasks, and we build the correct configuration to be sent to the execution engine (see the sketch below).
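A minimal sketch of such a builder follows, assuming hypothetical class and method names; the resource estimation and framework selection logic are placeholders, not the real implementation.

```python
# Builder-style assembly of the execution configuration from a Workflow.
class ExecutionConfigBuilder:
    def __init__(self, workflow: dict):
        self.workflow = workflow
        self.config: dict = {"tasks": [], "framework": None, "resources": {}}

    def resolve_blocks(self) -> "ExecutionConfigBuilder":
        # Swap generic blocks for their concrete counterparts (cf. the
        # transformer sketch above); here the tasks are simply copied through.
        self.config["tasks"] = list(self.workflow.get("tasks", []))
        return self

    def choose_framework(self) -> "ExecutionConfigBuilder":
        self.config["framework"] = self.workflow.get("framework", "python3")
        return self

    def allocate_resources(self) -> "ExecutionConfigBuilder":
        # Placeholder estimate derived from the ingested data size.
        size_mb = self.workflow.get("data_size_mb", 0)
        self.config["resources"] = {"memory": f"{max(2, size_mb // 256)}Gi"}
        return self

    def build(self) -> dict:
        return self.config

# execution_config = (ExecutionConfigBuilder(workflow)
#                     .resolve_blocks()
#                     .choose_framework()
#                     .allocate_resources()
#                     .build())
```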