Monitoring Engine

General Information

The monitoring engine is responsible for storing, managing and searching executions and execution logs. It uses Elasticsearch to index the execution data, perform search queries and handle advanced aggregations. There are two Elasticsearch indices:

  • execution, used to store all execution information for the data check-in pipelines and (in the future) the analytics pipelines
  • execution_log, used to store the execution logs/messages published by each step/block of each execution

Elasticsearch indices

Execution

At a high level, the information indexed in Elasticsearch for each execution includes:

  • general information, including the execution's id, status, duration, the dates it was created/finished, the user/organisation that created it, information about its related workflow (id, type and job id) and its input/output asset ids
  • stats, containing information about each block involved in the execution: the block id, task id, the block's display name, the number of input/output records, the number of input/output columns and the number of transformed/null values
  • error information, including the failed task id, the failed block id and an array with all the errors that the failed block produced during the execution. Each error contains the block id, the original/mapped field name where the error occurred and the emitted error code
  • metrics, containing information about each output asset: the asset id, the number of input/output/transformed records and the number of transformed/null/total values
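The execution document described above can be modelled as a set of nested records. The following is a minimal sketch, not the service's actual model: the class names are hypothetical, attribute names are copied from the sample payloads (hence camelCase) so that `asdict()` yields matching keys, and the types (for example `jobId` as a string and asset ids as integers) are inferred from those samples and are assumptions.

```python
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass
class BlockStats:
    # One entry of "stats": per-block figures for the execution.
    blockId: str
    taskId: str
    displayName: str
    inputRecords: int = 0
    outputRecords: int = 0
    inputColumns: int = 0
    outputColumns: int = 0
    transformedValues: int = 0
    nullValues: int = 0


@dataclass
class ExecutionError:
    # One entry of "errors": where the error happened and its code.
    blockId: str
    originalField: str
    mappedField: str
    errorCode: int


@dataclass
class AssetMetrics:
    # One entry of "metrics": figures for a single output asset.
    assetId: int
    inputRecords: int = 0
    outputRecords: int = 0
    transformedRecords: int = 0
    transformedValues: int = 0
    nullValues: int = 0
    totalValues: int = 0


@dataclass
class Execution:
    executionId: str
    workflowId: str
    workflowType: str
    jobId: str
    userId: int
    organisationId: Optional[int]
    status: str
    createdAt: str
    inputAssetIds: List[int] = field(default_factory=list)
    outputAssetIds: List[int] = field(default_factory=list)
    duration: Optional[float] = None
    finishedAt: Optional[str] = None
    stats: List[BlockStats] = field(default_factory=list)
    errors: List[ExecutionError] = field(default_factory=list)
    failedTaskId: Optional[str] = None
    failedBlockId: Optional[str] = None
    metrics: List[AssetMetrics] = field(default_factory=list)

    def to_document(self) -> dict:
        # asdict() recurses into the nested dataclasses and lists.
        return asdict(self)
```

Calling `to_document()` on an instance built from the failed-execution sample produces a dictionary with the same 17 top-level keys as that sample.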

Sample payload of a completed execution:

{
  "executionId": "4c6f2905-2f8c-4671-b265-33eb742fca63",
  "workflowId": "fa61e575-dbfa-47af-a8d1-d949567e299d",
  "workflowType": "data-checkin",
  "jobId": "21",
  "inputAssetIds": [],
  "outputAssetIds": [
    16
  ],
  "userId": 1,
  "organisationId": 1,
  "status": "completed",
  "duration": 0.423902,
  "createdAt": "2022-05-09T16:36:10.673763",
  "finishedAt": "2022-05-09T16:36:11.097665",
  "stats": [
    {
      "blockId": "FilesHarvester",
      "taskId": "e2d33848-1a9f-41cd-a7b8-c1de8d8a5f5f",
      "displayName": "Harvester",
      "inputRecords": 0,
      "outputRecords": 0,
      "inputColumns": 0,
      "outputColumns": 0,
      "transformedValues": 0,
      "nullValues": 0
    },
    {
      "blockId": "ExportOtherFiles",
      "taskId": "cf8ebe17-0ac2-4fd7-a0c3-ff0e183e04d2",
      "displayName": "Loader",
      "inputRecords": 0,
      "outputRecords": 0,
      "inputColumns": 0,
      "outputColumns": 0,
      "transformedValues": 0,
      "nullValues": 0
    }
  ],
  "errors": [],
  "failedTaskId": null,
  "failedBlockId": null,
  "metrics": [
    {
      "assetId": 16,
      "inputRecords": 0,
      "outputRecords": 0,
      "transformedRecords": 0,
      "transformedValues": 0,
      "nullValues": 0,
      "totalValues": 0
    }
  ]
}

Sample payload of a failed execution:

{
  "executionId": "9911b8dc-01bc-4168-891a-855baaecc798",
  "workflowId": "04abe169-bee1-40bc-8a58-71fc137d235d",
  "workflowType": "data-checkin",
  "jobId": "25",
  "inputAssetIds": [],
  "outputAssetIds": [
    19
  ],
  "userId": 1,
  "organisationId": 1,
  "status": "failed",
  "duration": 0.362542,
  "createdAt": "2022-05-12T09:51:19.687373",
  "finishedAt": "2022-05-12T09:51:20.049915",
  "stats": [],
  "errors": [
    {
      "blockId": "DataMapper",
      "originalField": "Departure Time",
      "mappedField": "Battery_relatedMeasurement_measuredDateTime",
      "errorCode": 2003
    }
  ],
  "failedTaskId": "3ede5a2d-c325-441b-8fdb-821458855fe8",
  "failedBlockId": "DataMapper",
  "metrics": []
}

Execution log

At a high level, the information indexed in Elasticsearch for each execution log includes:

  • execution_id, the id of the related execution
  • context, the name of the service in which the execution log was created
  • level, the log level (e.g. info)
  • message, the actual log message
  • timestamp, the date and time the execution log was created
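An execution log document can be built with a few lines of standard-library Python. This is a minimal sketch under stated assumptions: `ExecutionLog` and `make_log_document` are hypothetical names, and the timestamp format (ISO 8601 with microseconds and a `+00:00` offset) is inferred from the sample payload rather than from a documented contract.

```python
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ExecutionLog:
    execution_id: str
    context: str    # service that emitted the log, e.g. "ExecutionService"
    level: str      # log level, e.g. "info"
    message: str
    timestamp: str  # ISO 8601 string with a UTC offset


def make_log_document(execution_id: str, context: str, level: str, message: str,
                      when: Optional[datetime] = None) -> dict:
    """Build an execution_log document. `when` should be timezone-aware;
    a naive datetime would be interpreted as local time by astimezone()."""
    when = when if when is not None else datetime.now(timezone.utc)
    stamp = when.astimezone(timezone.utc).isoformat(timespec="microseconds")
    return asdict(ExecutionLog(execution_id, context, level, message, stamp))
```

For example, passing `datetime(2022, 5, 6, 9, 32, 18, 132000, tzinfo=timezone.utc)` reproduces the timestamp of the sample log above.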

Sample payload of an execution log:

{
  "execution_id": "4c6f2905-2f8c-4671-b265-33eb742fca63",
  "context": "ExecutionService",
  "level": "info",
  "message": "Workflow submitted for execution",
  "timestamp": "2022-05-06T09:32:18.132000+00:00"
}

Execution indexing

General information

One of the main functionalities of the monitoring service is to store/index executions and execution logs in Elasticsearch, so that search and aggregation queries can be performed on them.

For that reason, the monitoring service is integrated with RabbitMQ and listens to two queues (execution and execution-log) for incoming messages from the execution service and the backend.
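The routing of the two queues to their indices can be sketched as a small dispatch table. This is an illustration only, assuming that message bodies are JSON objects whose fields match the index documents; the RabbitMQ client wiring is omitted, and two in-memory dictionaries (`INDICES`, hypothetical) stand in for the Elasticsearch indices.

```python
import json
from typing import Callable, Dict

# Hypothetical in-memory stand-ins for the two Elasticsearch indices.
INDICES: Dict[str, Dict[str, dict]] = {"execution": {}, "execution_log": {}}


def index_execution(message: dict) -> None:
    # Upsert: later messages for the same execution overwrite earlier fields.
    doc = INDICES["execution"].setdefault(message["executionId"], {})
    doc.update(message)


def index_execution_log(message: dict) -> None:
    # Log messages are append-only, so each one becomes a new document.
    logs = INDICES["execution_log"]
    logs[str(len(logs))] = message


# Queue name -> handler for the decoded message body.
HANDLERS: Dict[str, Callable[[dict], None]] = {
    "execution": index_execution,
    "execution-log": index_execution_log,
}


def on_message(queue: str, body: bytes) -> None:
    handler = HANDLERS.get(queue)
    if handler is None:
        raise ValueError(f"unexpected queue: {queue}")
    handler(json.loads(body))
```

Keeping execution messages as upserts keyed by `executionId` lets several messages for the same execution (queued, running, completed) end up in one document, while logs accumulate as separate documents.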

General flow

When the user finalizes the loader step of a data check-in pipeline, the backend sends the first execution message (with status queued) and the related execution log messages to RabbitMQ. The monitoring service receives these messages and creates the corresponding execution and execution log documents in Elasticsearch.

The execution service sends its first execution message, with status running, when it starts running the execution. If the execution succeeds, it sends a message containing the new status completed, the finished date, the duration, and the metrics and stats of the execution. If the execution fails, it instead sends a message with status failed, the errors and the failed block/task ids.

During the execution, several execution log messages are sent to the monitoring service, reporting the progress of the pipeline steps.
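The status lifecycle described in this flow (queued, then running, then completed or failed) can be sketched as merging each incoming execution message into the stored document while rejecting impossible status changes. This is a minimal illustration that assumes messages arrive in order and that these four statuses are the only ones; `TRANSITIONS` and `apply_update` are hypothetical names, not part of the service.

```python
from typing import Dict, Set

# Allowed status changes, as described in the general flow (assumed exhaustive).
TRANSITIONS: Dict[str, Set[str]] = {
    "queued": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
}


def apply_update(current: dict, update: dict) -> dict:
    """Merge an execution message into the stored document.

    Fields in `update` overwrite those in `current`; a status change that
    is not listed in TRANSITIONS raises ValueError.
    """
    old, new = current.get("status"), update.get("status")
    if old is not None and new is not None and new != old:
        if new not in TRANSITIONS.get(old, set()):
            raise ValueError(f"illegal status transition {old} -> {new}")
    merged = dict(current)
    merged.update(update)
    return merged
```

A failed run would go through `queued`, `running` and finally a `failed` message carrying `errors`, `failedTaskId` and `failedBlockId`; a later `running` message for the same execution would be rejected.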

API documentation

Get all executions

Returns all executions.

Method: GET, Path: /executions

Get an execution

Returns a specific execution.

Method: GET, Path: /executions/<execution_id>
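These endpoints can be called with any HTTP client. The sketch below uses only the Python standard library and builds the requests without sending them; `BASE_URL` is a hypothetical placeholder, since the service's host and port are not documented here, and the JSON content type for POST bodies is an assumption.

```python
import json
import urllib.parse
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000"  # hypothetical; replace with the real service address


def build_request(method: str, path: str, payload: Optional[dict] = None) -> urllib.request.Request:
    # POST endpoints take a JSON body; GET endpoints take none.
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def get_all_executions_request() -> urllib.request.Request:
    return build_request("GET", "/executions")


def get_execution_request(execution_id: str) -> urllib.request.Request:
    # Quote the id so that unexpected characters cannot alter the path.
    return build_request("GET", "/executions/" + urllib.parse.quote(execution_id))
```

To actually send a request, pass it to `urllib.request.urlopen` and decode the response body with `json.load`.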

Search executions

Performs a search query and returns the matching executions. It supports faceted search, pagination and sorting.

Method: POST, Path: /executions/search

Sample payload:

{
  "query": {
    "text": "*",
    "workflowId": null,
    "userId": 1,
    "organisationId": 1,
    "inputAssetIds": [],
    "outputAssetIds": [],
    "dateRange": {
      "start": null,
      "end": null
    },
    "durationRange": {
      "min": null,
      "max": null
    }
  },
  "facets": {
    "status": ["failed"],
    "workflowType": [],
    "failedBlockId": []
  },
  "sortBy": {
    "field": "created_at",
    "asc": true
  },
  "pagination": {
    "index": 0,
    "pageSize": 10
  }
}
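One plausible way to translate this payload into an Elasticsearch request body is a `bool` query with a `query_string` clause for the free text, exact and range filters for the other query fields, one `terms` aggregation per facet, and `from`/`size` for paging. The sketch below is not the service's implementation: it assumes the index field names equal the payload keys (note that `sortBy.field` above is `created_at`, so the real mapping may use snake_case), that facet fields are keyword-mapped, and that `pagination.index` is a zero-based page index. A full faceted-search implementation often moves the facet filters into `post_filter` so that each facet's counts ignore its own selection; this sketch keeps them in the main query for simplicity.

```python
from typing import Any, Dict, List


def build_search_body(request: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a /executions/search payload into an Elasticsearch request body (sketch)."""
    query = request.get("query") or {}
    facets = request.get("facets") or {}
    filters: List[Dict[str, Any]] = []
    # Exact-match filters: null values and empty lists mean "do not filter".
    for name in ("workflowId", "userId", "organisationId"):
        if query.get(name) is not None:
            filters.append({"term": {name: query[name]}})
    for name in ("inputAssetIds", "outputAssetIds"):
        if query.get(name):
            filters.append({"terms": {name: query[name]}})
    # Range filters: only the bounds that are set are sent.
    for field_name, bounds, low, high in (
        ("createdAt", query.get("dateRange") or {}, "start", "end"),
        ("duration", query.get("durationRange") or {}, "min", "max"),
    ):
        rng = {op: bounds[key] for op, key in (("gte", low), ("lte", high))
               if bounds.get(key) is not None}
        if rng:
            filters.append({"range": {field_name: rng}})
    # Selected facet values narrow the results as well.
    for name, values in facets.items():
        if values:
            filters.append({"terms": {name: values}})
    page = request.get("pagination") or {"index": 0, "pageSize": 10}
    body: Dict[str, Any] = {
        "query": {"bool": {
            "must": [{"query_string": {"query": query.get("text") or "*"}}],
            "filter": filters,
        }},
        # One terms aggregation per facet supplies the value/count pairs.
        "aggs": {name: {"terms": {"field": name}} for name in facets},
        "from": page["index"] * page["pageSize"],
        "size": page["pageSize"],
    }
    sort_by = request.get("sortBy")
    if sort_by:
        body["sort"] = [{sort_by["field"]: {"order": "asc" if sort_by["asc"] else "desc"}}]
    return body
```

With the sample payload, the resulting filters are the `userId` and `organisationId` terms plus `status: ["failed"]`; the null and empty fields are skipped.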

Example response:

{
  "results": [
    {
      "executionId": "4c6f2905-2f8c-4671-b265-33eb742fca63",
      "workflowId": "fa61e575-dbfa-47af-a8d1-d949567e299d",
      "workflowType": "data-checkin",
      "jobId": "21",
      "inputAssetIds": [],
      "outputAssetIds": [
        16
      ],
      "userId": 1,
      "organisationId": null,
      "status": "completed",
      "duration": 0.423902,
      "createdAt": "2022-05-09T16:36:10.673763",
      "finishedAt": "2022-05-09T16:36:11.097665",
      "stats": [
        {
          "blockId": "FilesHarvester",
          "taskId": "e2d33848-1a9f-41cd-a7b8-c1de8d8a5f5f",
          "displayName": "Harvester",
          "inputRecords": 0,
          "outputRecords": 0,
          "inputColumns": 0,
          "outputColumns": 0,
          "transformedValues": 0,
          "nullValues": 0
        },
        {
          "blockId": "ExportOtherFiles",
          "taskId": "cf8ebe17-0ac2-4fd7-a0c3-ff0e183e04d2",
          "displayName": "Loader",
          "inputRecords": 0,
          "outputRecords": 0,
          "inputColumns": 0,
          "outputColumns": 0,
          "transformedValues": 0,
          "nullValues": 0
        }
      ],
      "errors": [],
      "failedTaskId": null,
      "failedBlockId": null,
      "metrics": [
        {
          "assetId": 16,
          "inputRecords": 0,
          "outputRecords": 0,
          "transformedRecords": 0,
          "transformedValues": 0,
          "nullValues": 0,
          "totalValues": 0
        }
      ]
    }
  ],
  "facets": {
    "status": [
      {
        "value": "completed",
        "count": 1,
        "selected": true
      }
    ],
    "workflowType": [
      {
        "value": "data-checkin",
        "count": 1,
        "selected": false
      }
    ],
    "failedBlockId": []
  },
  "pagination": {
    "index": 0,
    "page": 1,
    "pageSize": 100,
    "total": 1
  }
}
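The `facets` and `pagination` parts of this response can be derived from an Elasticsearch terms-aggregation result, whose buckets carry `key` and `doc_count`. In this minimal sketch (hypothetical function names), `selected` is assumed to mean "this value was requested in the payload's facets", and `page` is assumed to be `index + 1`, which matches the example above but is not otherwise documented.

```python
from typing import Any, Dict, List


def build_facets(aggregations: Dict[str, Any],
                 selected: Dict[str, List[str]]) -> Dict[str, List[Dict[str, Any]]]:
    """Turn terms-aggregation buckets into the value/count/selected facet lists."""
    facets: Dict[str, List[Dict[str, Any]]] = {}
    for name, agg in aggregations.items():
        chosen = set(selected.get(name, []))
        facets[name] = [
            {"value": b["key"], "count": b["doc_count"], "selected": b["key"] in chosen}
            for b in agg.get("buckets", [])
        ]
    return facets


def build_pagination(index: int, page_size: int, total: int) -> Dict[str, int]:
    # index is zero-based; page is the one-based page number shown to users.
    return {"index": index, "page": index + 1, "pageSize": page_size, "total": total}
```

Feeding it buckets for `completed` and `data-checkin` with `status: ["completed"]` selected yields the same `facets` object as the example response.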

Get execution failures

Returns the execution failures.

Method: POST, Path: /executions/failures

Sample payload:

Example response:

Get execution timeliness

Returns the execution timeliness.

Method: POST, Path: /executions/timeliness

Sample payload:

Example response:

Get execution completeness

Returns the execution completeness.

Method: POST, Path: /executions/completeness

Sample payload:

Example response:

Get execution summary

Returns the execution summary.

Method: POST, Path: /executions/summary

Sample payload:

Example response:

Get execution metrics

Returns the overall metrics (e.g. the total, successful and failed executions and the average execution time) of the executions matching the search query.

Method: POST, Path: /executions/metrics

Sample payload:

{
  "query": {
    "text": "*",
    "workflowId": null,
    "userId": 1,
    "organisationId": 1,
    "inputAssetIds": [],
    "outputAssetIds": [],
    "dateRange": {
      "start": null,
      "end": null
    },
    "durationRange": {
      "min": null,
      "max": null
    }
  },
  "facets": {
    "status": ["failed"],
    "workflowType": ["analytics", "data-checkin"],
    "failedBlockId": []
  }
}

Example response:

{
  "results": {
    "totalExecutions": 2,
    "successfulExecutions": 2,
    "failedExecutions": 0,
    "avgExecutionTime": 0.4835669994354248,
    "activePipelines": 1,
    "failedPipelines": 0,
    "resolvedPipelines": 0
  },
  "facets": {
    "status": [
      {
        "value": "completed",
        "count": 2,
        "selected": false
      }
    ],
    "workflowType": [],
    "failedBlockId": []
  }
}
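The first four fields of `results` are counts and an average over the matching executions. The in-memory sketch below shows what they represent, assuming that "successful" means status `completed` and that the average is taken over the executions that have a duration; in Elasticsearch these values would more likely come from aggregations (for example a `terms` aggregation on status and an `avg` aggregation on duration). `activePipelines`, `failedPipelines` and `resolvedPipelines` are not computed because their definitions are not documented here; `summarize` is a hypothetical name.

```python
from typing import Any, Dict, List


def summarize(executions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute the count/average metrics over a list of execution documents."""
    completed = sum(1 for e in executions if e.get("status") == "completed")
    failed = sum(1 for e in executions if e.get("status") == "failed")
    # Queued or running executions have no duration yet, so they are skipped.
    durations = [e["duration"] for e in executions if e.get("duration") is not None]
    avg = sum(durations) / len(durations) if durations else 0.0
    return {
        "totalExecutions": len(executions),
        "successfulExecutions": completed,
        "failedExecutions": failed,
        "avgExecutionTime": avg,
    }
```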

Update the last execution's schedule

Updates the next schedule of a workflow's last execution.

Method: PUT, Path: /executions/schedule

Sample payload:

Get all execution logs

Returns all the execution logs.

Method: GET, Path: /execution-logs

Get an execution's logs

Returns all the logs of a specific execution.

Method: GET, Path: /execution-logs/<execution_id>
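Fetching one execution's logs maps naturally to a `term` query on `execution_id`, sorted by `timestamp`. This is a sketch, not the service's query: it assumes `execution_id` is mapped as a keyword field, and the default `size` of 1000 is a hypothetical choice. Setting `size` matters because Elasticsearch returns only 10 hits when it is omitted.

```python
from typing import Any, Dict


def execution_logs_query(execution_id: str, size: int = 1000) -> Dict[str, Any]:
    """Elasticsearch request body for all logs of one execution, oldest first."""
    return {
        "query": {"term": {"execution_id": execution_id}},
        "sort": [{"timestamp": {"order": "asc"}}],
        "size": size,  # without this, Elasticsearch returns only 10 hits
    }
```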