Orchestrator

Overview

This library is responsible for orchestrating asynchronous tasks. It provides consumers that handle events raised within the application, as well as a consumer that subscribes to a RabbitMQ queue for messages published by external execution services.

We have two consumers: EventConsumer and FeedbackConsumer. The first handles "internal" events (events originating from one of our libraries) and the second "external" events (messages received from RabbitMQ). Their only job is to receive the message and call the correct function from the appropriate service, as sketched below.
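
As a rough illustration, each consumer is essentially a dispatch table from event name to service call. The following is only a sketch: the event names are real, but the handler signatures and the dataCheckinService stub are assumptions.

type EventHandler = (payload: unknown) => Promise<void>;

// Hypothetical service stub; the real functions live in other libraries.
const dataCheckinService = {
    logJobCreation: async (payload: unknown) => { /* add a log entry */ },
    cancelJobSteps: async (payload: unknown) => { /* send cancel messages */ },
};

class EventConsumer {
    constructor(private readonly handlers: Record<string, EventHandler>) {}

    // Receive an event and forward it to the matching service function.
    async onEvent(name: string, payload: unknown): Promise<void> {
        const handler = this.handlers[name];
        if (handler) {
            await handler(payload);
        }
    }
}

// Wiring: one entry per registered event.
const consumer = new EventConsumer({
    DCJ_CREATED: (payload) => dataCheckinService.logJobCreation(payload),
    DCJ_DELETED: (payload) => dataCheckinService.cancelJobSteps(payload),
});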

Event Consumers

Internal (Redis)

The following events are registered and handled:

  • DCJ_CREATED: Raised when a data check-in job is created. Currently it only adds a log entry for the creation of the job.
  • DCJ_DELETED: Raised when a data check-in job is deleted. It sends a 'cancel' message to RabbitMQ for each of the deleted job's steps, to kill them and free resources (see the sketch after this list).
  • DCJ_CONFIGURED: Raised when a data check-in job step is finalised. It parses the step's configuration and processes it accordingly (see Process Configuration below).
  • DCJ_STEP_UPDATED: Raised when a data check-in job step is updated. It parses the step's updated configuration and processes it accordingly (see Process Configuration below).
  • DCJ_STEP_RESCHEDULE: Raised when a data check-in job step should start (scheduled event). If the step is ready to be executed (i.e. it is configured and not already Queued or Running), its configuration is prepared and submitted to the execution director for further execution.
  • USER_CREATED: Raised when a new user is created/registered. It creates a new bucket in MinIO for the user's files.
  • ORGANISATION_CREATED: Raised when a new organisation is created. It creates a new bucket in MinIO for the organisation's files.
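
For example, the DCJ_DELETED handler boils down to publishing one 'cancel' message per step of the deleted job. The sketch below uses the amqplib channel API; the queue name and payload shape are assumptions.

import type { Channel } from 'amqplib';

interface DeletedJobEvent {
    jobId: number;
    stepIds: number[];
}

// Publish a 'cancel' message for each step of the deleted job, so that any
// running containers are killed and their resources freed.
function onDcjDeleted(channel: Channel, event: DeletedJobEvent): void {
    for (const stepId of event.stepIds) {
        channel.sendToQueue(
            'execution', // hypothetical queue name
            Buffer.from(JSON.stringify({ jobId: event.jobId, stepId, command: 'cancel' })),
        );
    }
}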

External (RabbitMQ)

The FeedbackConsumer listens for messages on a feedback topic in RabbitMQ. These messages are expected to have the following structure:

export interface IFeedback {
    jobId: number;        // the data check-in job this feedback refers to
    stepId: number;       // the specific step within that job
    serviceType: string;  // the execution service reporting the feedback
    reason: FeedbackType; // why the feedback was sent (see the supported reasons below)
    message?: {
        progress?: Record<string, any>; // free-form progress information
        error?: string;                 // human-readable error description (on failure)
        errorCode?: number;             // machine-readable error code (on failure)
    };
    stats?: Record<string, any>; // execution statistics, merged into the step's stats
    files: { name: string; size: number; mimeType: string }[]; // files written to MinIO
    processed?: string[];
    unprocessed?: string[];
}

The supported reasons (the values of FeedbackType) are:

  • cancelled: Step execution was cancelled by user intervention.
  • completed: Step execution completed successfully.
  • continue: The step is still running but has saved data to MinIO. It is used by the Kafka and API polling harvesters to notify the backend that a file was saved and, if configured to do so, to start the next step.
  • progress: The step is still running and has made some kind of "progress". Used similarly to continue, BUT it never triggers the execution of the next step, as these messages are raised by short-lived services that will eventually complete.
  • failed: Step execution failed. The message is expected to contain an errorCode and an error description.
  • started: The service container for this step has been created and execution has started.
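
Taken together, the reasons above suggest that FeedbackType is a union of six string literals. The following is only a sketch; the actual definition may differ:

// Sketch of FeedbackType based on the supported reasons listed above.
export type FeedbackType =
    | 'cancelled'
    | 'completed'
    | 'continue'
    | 'progress'
    | 'failed'
    | 'started';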

For details on how we handle external events, see the Process Message section below.

Orchestrator

Process Configuration

Whenever a step is finalised, and every time it is updated afterwards, its configuration goes through the following configuration flow:

[Figure: Process Configuration flow diagram]

Note: Scheduling either the next or the current step is done by setting the schedule property of the step and calling the scheduleStep function of the Task Scheduler service, as sketched below.
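
A minimal sketch of that hand-off, assuming hypothetical shapes for the step and the Task Scheduler service:

interface TaskScheduler {
    scheduleStep(stepId: number): Promise<void>;
}

async function scheduleForLater(
    step: { id: number; schedule?: { startDate: string } },
    schedule: { startDate: string },
    scheduler: TaskScheduler,
): Promise<void> {
    step.schedule = schedule;              // set the schedule property on the step
    await scheduler.scheduleStep(step.id); // the scheduler queues a DCJ_STEP_RESCHEDULE event
}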

Run Step

If a step has schedules, it is run by the scheduler using the "DCJ_STEP_RESCHEDULE" event. Otherwise, the execution is initiated by:

  1. The user, by finalising (or updating) the step's configuration
  2. The previous step, when its execution is completed OR a continue message arrives.

Running a step goes through the following flow:

[Figure: Run Step flow diagram]

Note: Preparing the configuration includes: (a) generating a presigned MinIO URL for each file to be processed, (b) generating a presigned URL for uploading files back to MinIO, and (c) appending a token for retrieving private information from Vault (generated using the VaultService from the vault library).
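
For illustration, (a) and (b) map onto the MinIO JavaScript client roughly as follows; the endpoint, credentials, bucket, object keys, and one-hour expiry are assumptions.

import { Client } from 'minio';

const minio = new Client({
    endPoint: 'minio.example.com',
    accessKey: 'ACCESS_KEY',
    secretKey: 'SECRET_KEY',
});

async function presignStepUrls(bucket: string, inputKey: string, outputKey: string) {
    // (a) presigned GET URL so the execution service can download an input file
    const downloadUrl = await minio.presignedGetObject(bucket, inputKey, 60 * 60);
    // (b) presigned PUT URL so the execution service can upload results back to MinIO
    const uploadUrl = await minio.presignedPutObject(bucket, outputKey, 60 * 60);
    return { downloadUrl, uploadUrl };
}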

Process Message

Whenever a message arrives in the RabbitMQ queue, it goes through the following flow:

[Figure: Process Message flow diagram]

Notes:

  • Adding/unlocking/changing the status of files is done using the FileService in the minio library.
  • Updating and indexing asset metadata and file metadata is done using the AssetService in the asset library.
  • Merging stats is done using the StepService in the data check-in library.
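
Putting the notes together, handling a feedback message is essentially a switch on reason (using the IFeedback interface above). The comments describe the intent of each branch; the actual service calls are elided.

async function processMessage(feedback: IFeedback): Promise<void> {
    switch (feedback.reason) {
        case 'started':
            // the service container was created; mark the step as running
            break;
        case 'continue':
            // register the saved file(s) and, if configured, start the next step
            break;
        case 'progress':
            // record the progress; never triggers the next step
            break;
        case 'completed':
            // unlock files, merge stats, index metadata, run the next step
            break;
        case 'failed':
            // persist feedback.message?.errorCode and error, mark the step as failed
            break;
        case 'cancelled':
            // mark the step as cancelled and stop here
            break;
    }
}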

Task Scheduler

Schedule Daily Tasks

Even though we allow users to set any date for their tasks (steps) to be executed, we only schedule tasks on a daily basis, every day at midnight (UTC). This is done by retrieving all tasks (steps) that have defined schedules and, if they are configured, adding them to the scheduler. In parallel, any expired schedules are cleared.
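
A minimal sketch of the midnight job, assuming node-cron as the trigger (the actual mechanism may differ) and a hypothetical clearExpired flag that the restart path below reuses:

import cron from 'node-cron';

// Hypothetical daily routine: fetch the steps with defined schedules, add the
// configured ones to the in-memory scheduler, optionally clear expired ones.
async function scheduleDailyTasks(options: { clearExpired: boolean }): Promise<void> {
    // ...retrieval and scheduling logic elided...
}

// Run every day at midnight UTC.
cron.schedule('0 0 * * *', () => scheduleDailyTasks({ clearExpired: true }), {
    timezone: 'UTC',
});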

(Re)Start Scheduler

As scheduling is done using an in-memory queue, whenever the backend is (re)started we need to re-schedule everything. To do so, we re-schedule the daily tasks (as described above) WITHOUT clearing expired schedules, as this will be performed automatically at midnight.
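
With the hypothetical flag from the sketch above, the restart path is a single call:

// On backend (re)start: rebuild the in-memory queue, but leave expired
// schedules alone; the regular midnight run clears them.
scheduleDailyTasks({ clearExpired: false }).catch(console.error);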

Schedule Step

Scheduling a step goes through a set of checks and, eventually, a DCJ_STEP_RESCHEDULE event is scheduled to be triggered according to the user's configuration, which (as seen above) results in the execution of the step.

[Figure: Schedule Step flow diagram]
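
A rough sketch of that final hand-off, assuming an in-memory timer and a hypothetical event emitter carrying the DCJ_STEP_RESCHEDULE event:

import { EventEmitter } from 'node:events';

// After the checks pass, queue a DCJ_STEP_RESCHEDULE event for the configured
// start time; the emitter and payload shape are assumptions.
function scheduleStep(events: EventEmitter, stepId: number, startAt: Date): NodeJS.Timeout {
    const delayMs = Math.max(0, startAt.getTime() - Date.now());
    return setTimeout(() => events.emit('DCJ_STEP_RESCHEDULE', { stepId }), delayMs);
}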