Orchestrator
Overview
This library is responsible for orchestrating asynchronous tasks. It provides consumers that either handle events raised within the application or subscribe to a RabbitMQ queue for messages published by external execution services.
We have two consumers: EventConsumer and FeedbackConsumer. The first handles "internal" events (events originating from one of our libraries) and the second handles "external" events (messages received from RabbitMQ). Their only job is to receive a message and call the correct function of the appropriate service.
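A minimal sketch of that dispatching role, assuming each event name maps to exactly one service method. The AppEvent envelope, the handler map and the jobLogService/storageService objects are hypothetical stand-ins, not the library's actual API, and the handlers return a string only so the routing is easy to check.

```typescript
// Hypothetical event envelope: an event name plus an arbitrary payload.
interface AppEvent {
  name: string;
  payload: Record<string, unknown>;
}

type EventHandler = (payload: Record<string, unknown>) => string;

// Hypothetical services standing in for the library's real ones.
const jobLogService = {
  logCreation: (p: Record<string, unknown>): string => `logged creation of job ${String(p.jobId)}`,
};
const storageService = {
  createUserBucket: (p: Record<string, unknown>): string => `created bucket for user ${String(p.userId)}`,
};

// The consumer holds no business logic: it only maps an event name to a service call.
const eventHandlers = new Map<string, EventHandler>([
  ["DCJ_CREATED", (p) => jobLogService.logCreation(p)],
  ["USER_CREATED", (p) => storageService.createUserBucket(p)],
]);

function dispatchEvent(event: AppEvent): string {
  const handler = eventHandlers.get(event.name);
  if (handler === undefined) {
    throw new Error(`No handler registered for event ${event.name}`);
  }
  return handler(event.payload);
}
```

Keeping the consumer as a plain lookup means adding an event only touches the map, while the behaviour stays in the services.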
Event Consumers
Internal (Redis)
The following events are registered and handled:
- DCJ_CREATED: Raised when a data check-in job is created. Currently it only adds a log entry for the creation of the job.
- DCJ_DELETED: Raised when a data check-in job is deleted. It sends a 'cancel' message to RabbitMQ for each of the deleted job's steps, to stop them and free resources.
- DCJ_CONFIGURED: Raised when a data check-in job step is finalised. It parses the step's configuration and processes it accordingly (see Process Configuration below).
- DCJ_STEP_UPDATED: Raised when a data check-in job step is updated. It parses the step's updated configuration and processes it accordingly (see Process Configuration below).
- DCJ_STEP_RESCHEDULE: Raised when a data check-in job step should start (scheduled event). If the step is ready to be executed (it is configured and not already Queued or Running), its configuration is prepared and submitted to the execution director for execution.
- USER_CREATED: Raised when a new user is created/registered. It creates a new bucket in MinIO for the user's files.
- ORGANISATION_CREATED: Raised when a new organisation is created. It creates a new bucket in MinIO for the organisation's files.
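The readiness rule for DCJ_STEP_RESCHEDULE can be sketched as a small predicate. This assumes a hypothetical StepState with a configured flag and a free-form status string; only the Queued and Running statuses come from the description above, and the submit callback stands in for preparing the configuration and handing it to the execution director.

```typescript
// Hypothetical step shape; only the Queued and Running statuses are named in the text.
interface StepState {
  id: number;
  configured: boolean;
  status: string;
}

// Statuses meaning an execution is already in progress.
const IN_PROGRESS_STATUSES = new Set<string>(["Queued", "Running"]);

// Ready = configured and not already Queued or Running.
function isReadyToRun(step: StepState): boolean {
  return step.configured && !IN_PROGRESS_STATUSES.has(step.status);
}

// DCJ_STEP_RESCHEDULE handler sketch: submit the step only when it is ready.
// `submit` stands in for preparing the configuration and sending it to the execution director.
function onStepReschedule(step: StepState, submit: (step: StepState) => void): boolean {
  if (!isReadyToRun(step)) {
    return false; // not configured yet, or already queued/running: skip this trigger
  }
  submit(step);
  return true;
}
```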
External (RabbitMQ)
The FeedbackConsumer listens for messages on the feedback topic in RabbitMQ. These messages are expected to have the following structure:
```typescript
// reason: one of the supported reasons listed below
export interface IFeedback {
  jobId: number;
  stepId: number;
  serviceType: string;
  reason: FeedbackType;
  message?: {
    progress?: Record<string, any>;
    error?: string;
    errorCode?: number;
  };
  stats?: Record<string, any>;
  files: { name: string; size: number; mimeType: string }[];
  processed?: string[];
  unprocessed?: string[];
}
```
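Because the payload comes from external services, a runtime check before acting on it can help. The sketch below assumes FeedbackType is a union of the reason strings listed in this section (the library may define it differently, for example as an enum) and validates only the required fields; isFeedback and parseFeedback are hypothetical helpers, not part of the FeedbackConsumer.

```typescript
// Assumption: FeedbackType's values are the reason strings listed in this section.
const FEEDBACK_REASONS = ["cancelled", "completed", "continue", "progress", "failed", "started"] as const;
type FeedbackType = (typeof FEEDBACK_REASONS)[number];

interface IFeedback {
  jobId: number;
  stepId: number;
  serviceType: string;
  reason: FeedbackType;
  message?: { progress?: Record<string, any>; error?: string; errorCode?: number };
  stats?: Record<string, any>;
  files: { name: string; size: number; mimeType: string }[];
  processed?: string[];
  unprocessed?: string[];
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Checks the required fields only; optional fields are passed through unchecked.
function isFeedback(value: unknown): value is IFeedback {
  if (!isRecord(value)) return false;
  const { jobId, stepId, serviceType, reason, files } = value;
  return (
    typeof jobId === "number" &&
    typeof stepId === "number" &&
    typeof serviceType === "string" &&
    typeof reason === "string" &&
    (FEEDBACK_REASONS as readonly string[]).includes(reason) &&
    Array.isArray(files) &&
    files.every(
      (f) => isRecord(f) && typeof f.name === "string" && typeof f.size === "number" && typeof f.mimeType === "string",
    )
  );
}

// Parses a raw message body; returns null for malformed JSON or an unexpected shape.
function parseFeedback(raw: string): IFeedback | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isFeedback(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```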
The supported reasons are:
- cancelled: Step execution was cancelled by user intervention.
- completed: Step execution completed successfully.
- continue: Step is still running but has saved data to MinIO. It is used by the Kafka and API polling harvesters to notify the backend that a file was saved and, if configured to do so, to start the next step.
- progress: Step is still running and has made some kind of "progress". Used in a similar way to continue, BUT it never triggers the execution of the next step, as these messages are raised by short-lived services that will eventually report completed.
- failed: Step execution failed. Expected to include an errorCode and an error message.
- started: The service container for this step has been created and execution has started.
For details on how we handle external events, see the Process Message section below.
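One reading of the supported reasons, expressed as a hypothetical outcomeFor function: completed always starts the next step, continue does so only when the step is configured to, and progress never does. The continueStartsNext flag is an assumed stand-in for that per-step setting, and stepFinished is an illustrative field, not part of IFeedback.

```typescript
type FeedbackReason = "cancelled" | "completed" | "continue" | "progress" | "failed" | "started";

interface ReasonOutcome {
  stepFinished: boolean; // no further messages are expected for this execution
  triggersNextStep: boolean;
}

function outcomeFor(reason: FeedbackReason, continueStartsNext: boolean): ReasonOutcome {
  switch (reason) {
    case "completed":
      return { stepFinished: true, triggersNextStep: true };
    case "continue":
      // Harvesters keep running; the next step starts only if configured to do so.
      return { stepFinished: false, triggersNextStep: continueStartsNext };
    // Short-lived services: their later `completed` message is what starts the next step.
    case "progress":
    case "started":
      return { stepFinished: false, triggersNextStep: false };
    case "cancelled":
    case "failed":
      return { stepFinished: true, triggersNextStep: false };
    default: {
      const unreachable: never = reason;
      throw new Error(`Unsupported reason: ${String(unreachable)}`);
    }
  }
}
```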
Orchestrator
Process Configuration
Whenever a step is finalised and every time it is updated afterwards, its configuration goes through the configuration flow:
Note: Scheduling either the next or the current step is done by setting the schedule property of the step and calling the scheduleStep function of the Task Scheduler service.
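A sketch of the scheduling call described in the note, assuming a hypothetical PipelineStep whose schedule holds a single runAt date and a TaskSchedulerLike interface exposing scheduleStep. The real schedule format is not described here, and these helpers return an updated copy instead of persisting the step.

```typescript
// Hypothetical shapes: the text only says steps carry a `schedule` property
// and that the Task Scheduler service exposes `scheduleStep`.
interface StepSchedule {
  runAt: Date; // assumption: a single execution time
}

interface PipelineStep {
  id: number;
  schedule?: StepSchedule;
}

interface TaskSchedulerLike {
  scheduleStep(step: PipelineStep): void;
}

// Sets the schedule on a copy of the step and hands it to the Task Scheduler.
function scheduleStepAt(step: PipelineStep, runAt: Date, scheduler: TaskSchedulerLike): PipelineStep {
  const scheduled: PipelineStep = { ...step, schedule: { runAt } };
  scheduler.scheduleStep(scheduled);
  return scheduled;
}

// The same call schedules the next step; only the target differs. Returns undefined for the last step.
function scheduleNextStep(
  steps: PipelineStep[],
  currentIndex: number,
  runAt: Date,
  scheduler: TaskSchedulerLike,
): PipelineStep | undefined {
  const next = steps[currentIndex + 1];
  return next === undefined ? undefined : scheduleStepAt(next, runAt, scheduler);
}
```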
Run Step
If a step has schedules, it is run by the scheduler using the "DCJ_STEP_RESCHEDULE" event. Otherwise, the execution is initiated by:
- The user, by finalising (or updating) the step's configuration
- The previous step, when its execution is completed OR a continue message arrives.
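Who starts a step can be sketched as a predicate over the trigger. RunTrigger, RunnableStep and shouldRunNow are hypothetical names, and schedules are treated as opaque entries since their format is not described; the rule encoded is one reading of the text: scheduled steps run only on DCJ_STEP_RESCHEDULE, unscheduled ones only on a user or previous-step trigger.

```typescript
// Hypothetical trigger kinds matching the sources described above.
type RunTrigger =
  | { kind: "scheduler" } // DCJ_STEP_RESCHEDULE fired
  | { kind: "user" } // configuration finalised or updated
  | { kind: "previousStep"; reason: "completed" | "continue" };

// Hypothetical step shape: schedules are opaque because their format is not described.
interface RunnableStep {
  id: number;
  schedules: unknown[];
}

// Scheduled steps run only when the scheduler fires; the others only on a user or previous-step trigger.
function shouldRunNow(step: RunnableStep, trigger: RunTrigger): boolean {
  const hasSchedules = step.schedules.length > 0;
  return hasSchedules ? trigger.kind === "scheduler" : trigger.kind !== "scheduler";
}
```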
Running a step goes through the following flow:
Note: Preparing the configuration includes (a) generating a presigned MinIO URL for each file to be processed, (b) generating a presigned URL for uploading files back to MinIO and (c) appending a token for retrieving private information from Vault (generated using the VaultService from the vault library).
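The three preparation steps (a), (b) and (c) might look like this, assuming hypothetical ObjectStorage and TokenIssuer ports in place of the MinIO client and the VaultService. With the MinIO JavaScript client, the two presigning methods would typically wrap presignedGetObject and presignedPutObject; the one-hour expiry is an arbitrary assumption.

```typescript
// Hypothetical ports: a real implementation would wrap the MinIO client and the VaultService.
interface ObjectStorage {
  presignedGet(bucket: string, objectName: string, expirySeconds: number): Promise<string>;
  presignedPut(bucket: string, objectName: string, expirySeconds: number): Promise<string>;
}

interface TokenIssuer {
  createToken(): Promise<string>;
}

// Only the fields added during preparation; the rest of the step configuration is omitted.
interface PreparedConfig {
  downloadUrls: string[];
  uploadUrl: string;
  vaultToken: string;
}

async function prepareConfiguration(
  bucket: string,
  files: string[],
  uploadObject: string,
  storage: ObjectStorage,
  vault: TokenIssuer,
  expirySeconds = 3600, // hypothetical expiry for the presigned URLs
): Promise<PreparedConfig> {
  // (a) one presigned download URL per file to be processed, in the same order as `files`
  const downloadUrls = await Promise.all(files.map((f) => storage.presignedGet(bucket, f, expirySeconds)));
  // (b) a presigned URL for uploading results back to MinIO
  const uploadUrl = await storage.presignedPut(bucket, uploadObject, expirySeconds);
  // (c) a token the execution service can use to read private information from Vault
  const vaultToken = await vault.createToken();
  return { downloadUrls, uploadUrl, vaultToken };
}
```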
Process Message
Whenever a message arrives in the RabbitMQ queue, it goes through the following flow:
Notes:
- Adding/unlocking/changing the status of files is done using the FileService in the minio library.
- Updating and indexing metadata and file metadata is done using the AssetService in the asset library.
- Merging stats is done using the StepService in the data check-in library.
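Stats can arrive with every message, so they have to be combined with what the step already holds. The merge rule below is purely an assumption for illustration (numbers summed, nested objects merged recursively, anything else replaced by the newest value), and mergeStats is a hypothetical helper; the StepService may well merge differently.

```typescript
type Stats = { [key: string]: unknown };

function isPlainObject(value: unknown): value is Stats {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Assumed merge rule: numbers are summed, nested objects merged recursively,
// and any other value is replaced by the incoming one. Inputs are not mutated.
function mergeStats(current: Stats, incoming: Stats): Stats {
  const merged: Stats = { ...current };
  for (const key of Object.keys(incoming)) {
    const existing = merged[key];
    const value = incoming[key];
    if (typeof existing === "number" && typeof value === "number") {
      merged[key] = existing + value;
    } else if (isPlainObject(existing) && isPlainObject(value)) {
      merged[key] = mergeStats(existing, value);
    } else {
      merged[key] = value;
    }
  }
  return merged;
}
```

Summing only makes sense if services report per-message increments; if they report running totals, replacing the value would be the correct rule.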
Task Scheduler
Schedule Daily Tasks
Even though users can set any date for their tasks (steps) to be executed, tasks are only scheduled on a daily basis, every day at midnight (UTC). At that point, all tasks (steps) with defined schedules are retrieved and, if configured, added to the scheduler. In parallel, any expired schedules are cleared.
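The midnight run can be sketched with two pure helpers: one computing the delay until the next midnight UTC, one splitting schedules into those to schedule and those to clear. DailySchedule, its configured flag and the endDate used to decide expiry are hypothetical.

```typescript
// Hypothetical schedule record used by the daily job.
interface DailySchedule {
  stepId: number;
  configured: boolean; // only configured steps are handed to the scheduler
  endDate: Date; // assumption: a schedule is expired once this date has passed
}

// Milliseconds from `now` until the next midnight UTC (Date.UTC handles month/year overflow).
function msUntilNextMidnightUtc(now: Date): number {
  const nextMidnight = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1);
  return nextMidnight - now.getTime();
}

// Splits schedules into those to schedule for this run and those to clear.
function planDailyRun(
  schedules: DailySchedule[],
  now: Date,
): { toSchedule: DailySchedule[]; expired: DailySchedule[] } {
  const nowMs = now.getTime();
  const expired = schedules.filter((s) => s.endDate.getTime() < nowMs);
  const toSchedule = schedules.filter((s) => s.configured && s.endDate.getTime() >= nowMs);
  return { toSchedule, expired };
}
```

A timer would call planDailyRun after msUntilNextMidnightUtc(new Date()) milliseconds and re-arm itself after each run; a cron expression such as 0 0 * * * evaluated in UTC is an equivalent alternative.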
(Re)Start Scheduler
As scheduling uses an in-memory queue, everything must be re-scheduled whenever the backend is (re)started. To do so, we re-schedule the daily tasks (as described above) WITHOUT clearing expired schedules, since that is done automatically at midnight.
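The only difference between the midnight run and a restart is whether expired schedules are cleared, so one job with a flag can cover both. DailyTaskDeps and the function names below are hypothetical, and the cleanup runs after the scheduling loop here rather than in parallel.

```typescript
// Hypothetical dependencies of the daily scheduling job.
interface DailyTaskDeps {
  loadScheduledStepIds(): number[]; // steps that have schedules defined
  scheduleStep(stepId: number): void;
  clearExpiredSchedules(): void;
}

function scheduleDailyTasks(deps: DailyTaskDeps, clearExpired: boolean): void {
  for (const stepId of deps.loadScheduledStepIds()) {
    deps.scheduleStep(stepId);
  }
  // Runs after the scheduling loop for simplicity; the text describes both happening in parallel.
  if (clearExpired) {
    deps.clearExpiredSchedules();
  }
}

// Midnight (UTC) run: rebuild the schedule and clean up expired schedules.
function onMidnightUtc(deps: DailyTaskDeps): void {
  scheduleDailyTasks(deps, true);
}

// Backend (re)start: the in-memory queue is empty, so rebuild it but leave cleanup to the next midnight run.
function onBackendStart(deps: DailyTaskDeps): void {
  scheduleDailyTasks(deps, false);
}
```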
Schedule Step
Scheduling a step goes through a set of checks and, eventually, a DCJ_STEP_RESCHEDULE event is scheduled to be triggered according to the user's configuration, which (as seen above) results in the execution of the step.
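A sketch of what the scheduler side could look like with one in-memory timer per step. The checks (step configured, schedule present, run time not in the past), the SchedulableStep shape and the StepTimerQueue class are assumptions, since the actual checks are not listed; the clock is injectable so the behaviour can be tested.

```typescript
type RescheduleEmitter = (event: "DCJ_STEP_RESCHEDULE", stepId: number) => void;

// Hypothetical step shape: the checks and the schedule format are assumptions.
interface SchedulableStep {
  id: number;
  configured: boolean;
  schedule?: { runAt: Date };
}

class StepTimerQueue {
  private readonly timers = new Map<number, ReturnType<typeof setTimeout>>();
  private readonly emit: RescheduleEmitter;
  private readonly now: () => Date;

  constructor(emit: RescheduleEmitter, now: () => Date = () => new Date()) {
    this.emit = emit;
    this.now = now;
  }

  // Returns false when a check fails and no event is scheduled.
  scheduleStep(step: SchedulableStep): boolean {
    this.cancel(step.id); // re-scheduling a step replaces any pending timer for it
    if (!step.configured || step.schedule === undefined) {
      return false;
    }
    const delayMs = step.schedule.runAt.getTime() - this.now().getTime();
    if (delayMs < 0) {
      return false; // the requested time has already passed
    }
    const timer = setTimeout(() => {
      this.timers.delete(step.id);
      this.emit("DCJ_STEP_RESCHEDULE", step.id);
    }, delayMs);
    this.timers.set(step.id, timer);
    return true;
  }

  cancel(stepId: number): void {
    const timer = this.timers.get(stepId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.timers.delete(stepId);
    }
  }

  pendingCount(): number {
    return this.timers.size;
  }
}
```

Node.js sets any timer delay above 2147483647 ms (about 24.8 days) to 1 ms, so a timer-per-step design only works for runs that are near enough; that fits well with re-scheduling tasks every day at midnight.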