Monitoring Engine
General Information
The monitoring engine is responsible for storing, managing and searching executions and execution logs. It uses Elasticsearch to index the execution data, perform search queries and handle advanced aggregations. There are two Elasticsearch indices:
- execution, used to store all execution information for the data check-in and (in the future) analytics pipelines
- execution_log, used to store the log messages published by each step/block of every execution
Elasticsearch indices
Execution
At a high level, the information indexed in Elasticsearch for each execution includes:
- general information, including the execution's id, status and duration, the dates it was created/finished, the user/organisation that created it, information about its related workflow (id, type and job id) and its input/output asset ids
- stats, including information about each block involved in the execution: the block id, task id, block's display name, number of input/output records, number of input/output columns and number of transformed/null values
- error information, including the failed task id, the failed block id and an array with all the errors that the block produced during the execution. Each error contains the block id, the names of the original and mapped fields in which the error occurred, and the emitted error code
- metrics, containing information about each output asset: the asset id, the number of input/output/transformed records and the number of transformed/null/total values
Sample payload of a completed execution:
{
"executionId": "4c6f2905-2f8c-4671-b265-33eb742fca63",
"workflowId": "fa61e575-dbfa-47af-a8d1-d949567e299d",
"workflowType": "data-checkin",
"jobId": "21",
"inputAssetIds": [],
"outputAssetIds": [
16
],
"userId": 1,
"organisationId": 1,
"status": "completed",
"duration": 0.423902,
"createdAt": "2022-05-09T16:36:10.673763",
"finishedAt": "2022-05-09T16:36:11.097665",
"stats": [
{
"blockId": "FilesHarvester",
"taskId": "e2d33848-1a9f-41cd-a7b8-c1de8d8a5f5f",
"displayName": "Harvester",
"inputRecords": 0,
"outputRecords": 0,
"inputColumns": 0,
"outputColumns": 0,
"transformedValues": 0,
"nullValues": 0
},
{
"blockId": "ExportOtherFiles",
"taskId": "cf8ebe17-0ac2-4fd7-a0c3-ff0e183e04d2",
"displayName": "Loader",
"inputRecords": 0,
"outputRecords": 0,
"inputColumns": 0,
"outputColumns": 0,
"transformedValues": 0,
"nullValues": 0
}
],
"errors": [],
"failedTaskId": null,
"failedBlockId": null,
"metrics": [
{
"assetId": 16,
"inputRecords": 0,
"outputRecords": 0,
"transformedRecords": 0,
"transformedValues": 0,
"nullValues": 0,
"totalValues": 0
}
]
}
Sample payload of a failed execution:
{
"executionId": "9911b8dc-01bc-4168-891a-855baaecc798",
"workflowId": "04abe169-bee1-40bc-8a58-71fc137d235d",
"workflowType": "data-checkin",
"jobId": "25",
"inputAssetIds": [],
"outputAssetIds": [
19
],
"userId": 1,
"organisationId": 1,
"status": "failed",
"duration": 0.362542,
"createdAt": "2022-05-12T09:51:19.687373",
"finishedAt": "2022-05-12T09:51:20.049915",
"stats": [],
"errors": [
{
"blockId": "DataMapper",
"originalField": "Departure Time",
"mappedField": "Battery_relatedMeasurement_measuredDateTime",
"errorCode": 2003
}
],
"failedTaskId": "3ede5a2d-c325-441b-8fdb-821458855fe8",
"failedBlockId": "DataMapper",
"metrics": []
}
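Clients that consume these documents may find a typed model useful for making the structure explicit. The following is a minimal Python sketch, assuming the field names and types shown in the two sample payloads; the class names BlockStats, BlockError, AssetMetrics and Execution are hypothetical and not part of the monitoring service. Field names deliberately mirror the JSON keys (camelCase) so that a document can be unpacked directly.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class BlockStats:
    """One entry of the "stats" array: statistics for a single block."""
    blockId: str
    taskId: str
    displayName: str
    inputRecords: int = 0
    outputRecords: int = 0
    inputColumns: int = 0
    outputColumns: int = 0
    transformedValues: int = 0
    nullValues: int = 0


@dataclass
class BlockError:
    """One entry of the "errors" array."""
    blockId: str
    originalField: str
    mappedField: str
    errorCode: int


@dataclass
class AssetMetrics:
    """One entry of the "metrics" array: figures for a single output asset."""
    assetId: int
    inputRecords: int = 0
    outputRecords: int = 0
    transformedRecords: int = 0
    transformedValues: int = 0
    nullValues: int = 0
    totalValues: int = 0


@dataclass
class Execution:
    """An execution document; field names mirror the JSON keys."""
    executionId: str
    workflowId: str
    workflowType: str
    jobId: str
    status: str
    userId: int
    organisationId: Optional[int] = None
    inputAssetIds: list[int] = field(default_factory=list)
    outputAssetIds: list[int] = field(default_factory=list)
    duration: Optional[float] = None  # seconds (finishedAt - createdAt in the samples)
    createdAt: Optional[str] = None
    finishedAt: Optional[str] = None
    stats: list[BlockStats] = field(default_factory=list)
    errors: list[BlockError] = field(default_factory=list)
    failedTaskId: Optional[str] = None
    failedBlockId: Optional[str] = None
    metrics: list[AssetMetrics] = field(default_factory=list)

    @classmethod
    def from_dict(cls, doc: dict[str, Any]) -> "Execution":
        # Work on a shallow copy so the caller's dict is left untouched,
        # then turn the nested arrays into their dataclass counterparts.
        data = dict(doc)
        data["stats"] = [BlockStats(**s) for s in doc.get("stats", [])]
        data["errors"] = [BlockError(**e) for e in doc.get("errors", [])]
        data["metrics"] = [AssetMetrics(**m) for m in doc.get("metrics", [])]
        return cls(**data)
```

The model is strict: a document with a key not listed above makes from_dict raise TypeError, which surfaces schema drift early at the cost of brittleness.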
Execution log
At a high level, the information indexed in Elasticsearch for each execution log includes:
- execution_id, the id of the related execution
- context, the name of the service in which the log entry was created
- level, the log level (e.g. info)
- message, the actual log message
- timestamp, the date and time the log entry was created
Sample payload of an execution log:
{
"execution_id": "4c6f2905-2f8c-4671-b265-33eb742fca63",
"context": "ExecutionService",
"level": "info",
"message": "Workflow submitted for execution",
"timestamp": "2022-05-06T09:32:18.132000+00:00"
}
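The fields above could be mapped in Elasticsearch roughly as follows. This is a minimal sketch, not the service's actual mapping: the field types (keyword for exact-match fields, text for the message, date for the timestamp) and the helper make_log_entry are assumptions. The helper produces the timestamp in the same ISO 8601 format as the sample payload.

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical index-creation body for the execution_log index (assumed field types).
EXECUTION_LOG_MAPPING = {
    "mappings": {
        "properties": {
            "execution_id": {"type": "keyword"},  # exact-match filter by execution
            "context": {"type": "keyword"},       # service name, e.g. "ExecutionService"
            "level": {"type": "keyword"},         # e.g. "info"
            "message": {"type": "text"},          # analysed for full-text search
            "timestamp": {"type": "date"},        # ISO 8601 with UTC offset
        }
    }
}


def make_log_entry(execution_id: str, context: str, level: str, message: str,
                   now: Optional[datetime] = None) -> dict[str, str]:
    """Build an execution log document shaped like the sample payload (hypothetical helper)."""
    moment = now or datetime.now(timezone.utc)
    return {
        "execution_id": execution_id,
        "context": context,
        "level": level,
        "message": message,
        # timespec="microseconds" keeps six fractional digits even when they are zero.
        "timestamp": moment.isoformat(timespec="microseconds"),
    }
```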
Execution indexing
General information
One of the main functionalities of the monitoring service is to store/index executions and execution logs in Elasticsearch, so that search and aggregation queries can be performed on them.
For that reason, the monitoring service is integrated with RabbitMQ and listens to two queues (execution and execution-log) to handle incoming messages from the execution service and the backend.
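A minimal sketch of how a consumer might route messages from the two queues to the two indices, assuming JSON message bodies. The routing table and route_message are hypothetical; keying execution documents by executionId is an assumption inferred from the flow described below, and the actual RabbitMQ client and Elasticsearch write calls are left out.

```python
import json
from typing import Any, Optional

# Hypothetical routing table: queue names from the text above,
# index names from the "General Information" section.
QUEUE_TO_INDEX = {
    "execution": "execution",
    "execution-log": "execution_log",
}


def route_message(queue: str, body: bytes) -> tuple[str, Optional[str], dict[str, Any]]:
    """Return (index, document id, document) for one incoming message.

    Execution messages are keyed by executionId so every status update
    targets the same document; log messages get no id (None) and are
    stored as new documents. Both choices are assumptions.
    """
    if queue not in QUEUE_TO_INDEX:
        raise ValueError(f"unexpected queue: {queue!r}")
    index = QUEUE_TO_INDEX[queue]
    document = json.loads(body)  # json.loads accepts bytes directly
    doc_id = document["executionId"] if index == "execution" else None
    return index, doc_id, document
```

Keeping the routing pure (no I/O) makes it easy to test independently of RabbitMQ and Elasticsearch.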
General flow
When the user finalizes the loader step of a data check-in pipeline, the backend sends the first execution message (with status queued) and the first execution log messages to RabbitMQ. The monitoring service receives these messages and creates the corresponding execution and execution log documents in Elasticsearch.
The execution service sends its first execution message, with status running, when it starts running the execution. If the execution succeeds, it sends a message containing the new status completed, the finish date, the duration, and the metrics and stats of the execution. If the execution fails, it instead sends a message with status failed, the errors and the failed block/task ids.
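The lifecycle above can be summarized as a small state machine: queued, then running, then completed or failed. The following Python sketch shows one way incoming execution messages could be merged into the stored document while rejecting out-of-order transitions; the transition table and apply_update are assumptions, not the service's confirmed behavior.

```python
from typing import Any, Optional

# Allowed status transitions, inferred from the flow described above (assumption).
TRANSITIONS = {
    "queued": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}


def apply_update(current: Optional[dict[str, Any]], update: dict[str, Any]) -> dict[str, Any]:
    """Merge an incoming execution message into the stored document.

    Fields present in the update overwrite stored ones; fields absent from it
    are kept, so a partial "running" message does not erase earlier data.
    """
    if current is None:
        # First message for this execution: store it as-is.
        return dict(update)
    old = current["status"]
    new = update.get("status", old)
    if new != old and new not in TRANSITIONS.get(old, set()):
        raise ValueError(f"invalid status transition: {old} -> {new}")
    merged = dict(current)
    merged.update(update)
    return merged
```

Merging rather than replacing matters here because the completed and failed messages carry only the fields listed above, not the full document.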
During the execution, several execution log messages are sent to the monitoring service, reporting the progress of the pipeline steps.
API documentation
Get all executions
Returns all executions.
Method: GET, Path: /executions
Get an execution
Returns a specific execution.
Method: GET, Path: /executions/<execution_id>
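A minimal sketch of building these two GET requests with Python's standard library. The base URL is hypothetical (host and port depend on the deployment), and the response body of these endpoints is not documented here, so the sketch only constructs the request.

```python
from typing import Optional
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # hypothetical; host and port depend on the deployment


def executions_request(execution_id: Optional[str] = None, base_url: str = BASE_URL) -> Request:
    """Build GET /executions, or GET /executions/<execution_id> when an id is given."""
    path = "/executions"
    if execution_id is not None:
        # Percent-encode the id so it is always a single path segment.
        path += "/" + quote(execution_id, safe="")
    return Request(base_url + path, headers={"Accept": "application/json"}, method="GET")
```

The request can then be sent with urllib.request.urlopen and the body decoded with json.load.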
Search executions
Performs search queries and returns the matching executions. It supports faceted search, pagination and sorting.
Method: POST, Path: /executions/search
Sample payload:
{
"query": {
"text": "*",
"workflowId": null,
"userId": 1,
"organisationId": 1,
"inputAssetIds": [],
"outputAssetIds": [],
"dateRange": {
"start": null,
"end": null
},
"durationRange": {
"min": null,
"max": null
}
},
"facets": {
"status": ["failed"],
"workflowType": [],
"failedBlockId": []
},
"sortBy": {
"field": "created_at",
"asc": true
},
"pagination": {
"index": 0,
"pageSize": 10
}
}
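One plausible way to translate this payload into an Elasticsearch request body is sketched below. It is an assumption about the implementation, not the service's actual code: it assumes the filterable fields are mapped as keyword (so term/terms queries match exactly), treats null or empty values as "no filter", applies dateRange to createdAt, treats pagination.index as a zero-based page number, and applies facet selections through post_filter so that facet counts still cover every value. The sample's sortBy field (created_at) differs from the stored field name (createdAt); the sketch passes it through unchanged.

```python
from typing import Any

FACET_FIELDS = ("status", "workflowType", "failedBlockId")


def _range(low: Any, high: Any) -> dict[str, Any]:
    """Build Elasticsearch range bounds, skipping null ends."""
    return {op: v for op, v in (("gte", low), ("lte", high)) if v is not None}


def build_es_search(payload: dict[str, Any]) -> dict[str, Any]:
    """Translate a /executions/search payload into an Elasticsearch body (hypothetical)."""
    q = payload.get("query", {})
    filters: list[dict[str, Any]] = []
    # Exact-match filters; null means "do not filter on this field".
    for name in ("workflowId", "userId", "organisationId"):
        if q.get(name) is not None:
            filters.append({"term": {name: q[name]}})
    # Asset id lists match if any listed id is present; an empty list means no filter.
    for name in ("inputAssetIds", "outputAssetIds"):
        if q.get(name):
            filters.append({"terms": {name: q[name]}})
    dates = q.get("dateRange") or {}
    if bounds := _range(dates.get("start"), dates.get("end")):
        filters.append({"range": {"createdAt": bounds}})
    durations = q.get("durationRange") or {}
    if bounds := _range(durations.get("min"), durations.get("max")):
        filters.append({"range": {"duration": bounds}})

    page = payload.get("pagination") or {"index": 0, "pageSize": 10}  # assumed default
    body: dict[str, Any] = {
        "query": {
            "bool": {
                "must": [{"query_string": {"query": q.get("text") or "*"}}],
                "filter": filters,
            }
        },
        # One terms aggregation per facet, used to build the "facets" response block.
        "aggs": {name: {"terms": {"field": name}} for name in FACET_FIELDS},
        "from": page["index"] * page["pageSize"],
        "size": page["pageSize"],
    }
    # Facet selections narrow the returned hits but not the aggregation counts.
    selected = [{"terms": {f: v}} for f, v in (payload.get("facets") or {}).items() if v]
    if selected:
        body["post_filter"] = {"bool": {"filter": selected}}
    sort = payload.get("sortBy")
    if sort:
        body["sort"] = [{sort["field"]: {"order": "asc" if sort["asc"] else "desc"}}]
    return body
```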
Example response:
{
"results": [
{
"executionId": "4c6f2905-2f8c-4671-b265-33eb742fca63",
"workflowId": "fa61e575-dbfa-47af-a8d1-d949567e299d",
"workflowType": "data-checkin",
"jobId": "21",
"inputAssetIds": [],
"outputAssetIds": [
16
],
"userId": 1,
"organisationId": null,
"status": "completed",
"duration": 0.423902,
"createdAt": "2022-05-09T16:36:10.673763",
"finishedAt": "2022-05-09T16:36:11.097665",
"stats": [
{
"blockId": "FilesHarvester",
"taskId": "e2d33848-1a9f-41cd-a7b8-c1de8d8a5f5f",
"displayName": "Harvester",
"inputRecords": 0,
"outputRecords": 0,
"inputColumns": 0,
"outputColumns": 0,
"transformedValues": 0,
"nullValues": 0
},
{
"blockId": "ExportOtherFiles",
"taskId": "cf8ebe17-0ac2-4fd7-a0c3-ff0e183e04d2",
"displayName": "Loader",
"inputRecords": 0,
"outputRecords": 0,
"inputColumns": 0,
"outputColumns": 0,
"transformedValues": 0,
"nullValues": 0
}
],
"errors": [],
"failedTaskId": null,
"failedBlockId": null,
"metrics": [
{
"assetId": 16,
"inputRecords": 0,
"outputRecords": 0,
"transformedRecords": 0,
"transformedValues": 0,
"nullValues": 0,
"totalValues": 0
}
]
}
],
"facets": {
"status": [
{
"value": "completed",
"count": 1,
"selected": true
}
],
"workflowType": [
{
"value": "data-checkin",
"count": 1,
"selected": false
}
],
"failedBlockId": []
},
"pagination": {
"index": 0,
"page": 1,
"pageSize": 100,
"total": 1
}
}
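The facets and pagination blocks of this response could be derived from Elasticsearch terms aggregations, whose buckets carry key and doc_count, as in the following sketch. The helper names are hypothetical, and computing page as index + 1 is inferred from the sample response rather than documented.

```python
from typing import Any


def to_facets(aggregations: dict[str, Any],
              selected: dict[str, list[str]]) -> dict[str, list[dict[str, Any]]]:
    """Turn terms-aggregation buckets into the response's "facets" block (hypothetical)."""
    facets = {}
    for name, agg in aggregations.items():
        chosen = set(selected.get(name, []))
        facets[name] = [
            {"value": b["key"], "count": b["doc_count"], "selected": b["key"] in chosen}
            for b in agg.get("buckets", [])
        ]
    return facets


def to_pagination(index: int, page_size: int, total: int) -> dict[str, int]:
    """Build the "pagination" block; page is assumed to be the 1-based index."""
    return {"index": index, "page": index + 1, "pageSize": page_size, "total": total}
```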
Get execution failures
Returns the execution failures.
Method: POST, Path: /executions/failures
Sample payload:
Example response:
Get execution timeliness
Returns the execution timeliness.
Method: POST, Path: /executions/timeliness
Sample payload:
Example response:
Get execution completeness
Returns the execution completeness.
Method: POST, Path: /executions/completeness
Sample payload:
Example response:
Get execution summary
Returns the execution summary.
Method: POST, Path: /executions/summary
Sample payload:
Example response:
Get execution metrics
Returns the overall metrics of the searched executions.
Method: POST, Path: /executions/metrics
Sample payload:
{
"query": {
"text": "*",
"workflowId": null,
"userId": 1,
"organisationId": 1,
"inputAssetIds": [],
"outputAssetIds": [],
"dateRange": {
"start": null,
"end": null
},
"durationRange": {
"min": null,
"max": null
}
},
"facets": {
"status": ["failed"],
"workflowType": ["analytics", "data-checkin"],
"failedBlockId": []
}
}
Example response:
{
"results": {
"totalExecutions": 2,
"successfulExecutions": 2,
"failedExecutions": 0,
"avgExecutionTime": 0.4835669994354248,
"activePipelines": 1,
"failedPipelines": 0,
"resolvedPipelines": 0
},
"facets": {
"status": [
{
"value": "completed",
"count": 2,
"selected": false
}
],
"workflowType": [],
"failedBlockId": []
}
}
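The first four fields of results could be computed from the matched execution documents as sketched below, assuming "successful" means status completed and that avgExecutionTime is the mean of the duration field (in seconds, as in the execution samples). The meaning of activePipelines, failedPipelines and resolvedPipelines is not described here, so the sketch leaves them out. summarize is a hypothetical helper; the service could equally compute these figures with Elasticsearch aggregations instead of loading every document.

```python
from statistics import mean
from typing import Any


def summarize(executions: list[dict[str, Any]]) -> dict[str, Any]:
    """Compute the first four "results" fields from execution documents (hypothetical)."""
    durations = [e["duration"] for e in executions if e.get("duration") is not None]
    return {
        "totalExecutions": len(executions),
        "successfulExecutions": sum(1 for e in executions if e.get("status") == "completed"),
        "failedExecutions": sum(1 for e in executions if e.get("status") == "failed"),
        # Mean duration in seconds; None when no execution has a duration yet (assumption).
        "avgExecutionTime": mean(durations) if durations else None,
    }
```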
Update the last execution's schedule
Updates the next schedule of a workflow's last execution.
Method: PUT, Path: /executions/schedule
Sample payload:
Get all execution logs
Returns all the execution logs.
Method: GET, Path: /execution-logs
Get an execution's logs
Returns all the logs of a specific execution.
Method: GET, Path: /execution-logs/<execution_id>