Batched Jobs

cbq allows for tracking jobs as part of a batch. The main use case for batched jobs is to dispatch additional jobs when a batch has completed successfully, completed with failures, or completed in any fashion. (Think try, catch, finally.)

Setup

To utilize batches, you first need to set up a Batch repository. Batches are tracked separate from jobs. cbq utilizes a database repository to track batches regardless of what provider your Job's connection uses.

A database migration is provided in resources/database/migrations/2000_01_01_000001_create_cbq_batches_table.cfc. You can copy this to your own project to run via cfmigrations or you can reference the migration or SQL scripts below.

You can customize the batch repository using the batchRepositoryProperties of cbq's moduleSettings. This is a struct with two properties you can set:

tableName

This defaults to cbq_batches. You can set this to any unique table name in your datasource.

queryOptions

This is a struct of options that will be passed to queryExecute. The most common use case for this property is to specify a specific datasource to use for the batches table. This defaults to an empty struct ({}).

Defining Batches

To create a batch, you can get an instance of PendingBatch@cbq from WireBox or use the cbq.batch() helper method.

Once you have a PendingBatch instance, you can add jobs to the batch using the add method.

add

Adds a single Job or an array of Jobs to a PendingBatch.

Arguments
Type
Required
Default
Description

job

string | Job | array<Job>

true

The Job WireBox id, Job instance, or array of Job instances to add to the PendingBatch.

properties

Struct

false

{}

A struct of properties for the Job instance. (Only used when providing a Job WireBox id.)

chain

array<Job>

false

[]

An array of Jobs to chain after this Job. (Only used when providing a Job WireBox id.)

queue

string

false

The queue the Job belongs to. (Only used when providing a Job WireBox id.)

connection

string

false

The Connection to dispatch the Job on. (Only used when providing a Job WireBox id.)

backoff

numeric

false

The amount of time, in seconds, to wait between attempting Jobs. (Only used when providing a Job WireBox id.)

timeout

numeric

false

The amount of time, in seconds, to wait before treating a Job as erroring. (Only used when providing a Job WireBox id.)

maxAttempts

numeric

false

The maximum amount of attempts of a Job before treating the Job as failed. (Only used when providing a Job WireBox id.)

Return: The PendingBatch to be dispatched.

batch
    .add( cbq.job( "ImportCsvJob", { "start": 1, "end": 100 } ) )
    .add( "ImportCsvJob", { "start": 101, "end": 200 } )
    .add( [
        cbq.job( "ImportCsvJob", { "start": 201, "end": 300 } ),
	cbq.job( "ImportCsvJob", { "start": 301, "end": 400 } ),
	cbq.job( "ImportCsvJob", { "start": 401, "end": 500 } )
    ] );

The primary use case of batches is to dispatch lifecycle jobs when the batch has completed and if the batch completed successfully or completed with failures. These jobs can be configured using the then, catch, and finally methods.

then

Defines a Job to be dispatched when all the jobs in the batch finishes successfully.

Arguments
Type
Required
Default
Description

job

string | Job

true

The Job WireBox id or Job instance to execute if all the jobs in the Batch complete successfully.

properties

Struct

false

{}

A struct of properties for the Job instance. (Only used when providing a Job WireBox id.)

chain

array<Job>

false

[]

An array of Jobs to chain after this Job. (Only used when providing a Job WireBox id.)

queue

string

false

The queue the Job belongs to. (Only used when providing a Job WireBox id.)

connection

string

false

The Connection to dispatch the Job on. (Only used when providing a Job WireBox id.)

backoff

numeric

false

The amount of time, in seconds, to wait between attempting Jobs. (Only used when providing a Job WireBox id.)

timeout

numeric

false

The amount of time, in seconds, to wait before treating a Job as erroring. (Only used when providing a Job WireBox id.)

maxAttempts

numeric

false

The maximum amount of attempts of a Job before treating the Job as failed. (Only used when providing a Job WireBox id.)

Return: The PendingBatch instance.

cbq.batch( [
    cbq.job( "ImportCsvJob", { "start": 1, "end": 100 } ),
    cbq.job( "ImportCsvJob", { "start": 101, "end": 200 } ),
    cbq.job( "ImportCsvJob", { "start": 201, "end": 300 } ),
    cbq.job( "ImportCsvJob", { "start": 301, "end": 400 } ),
    cbq.job( "ImportCsvJob", { "start": 401, "end": 500 } )
] )
.then( cbq.job( "ImportCsvSuccessfulJob" ) );

catch

Defines a Job to be dispatched the first time a job in the Batch fails.

Arguments
Type
Required
Default
Description

job

string | Job

true

The Job WireBox id or Job instance to execute the first time a job in the Batch fails.

properties

Struct

false

{}

A struct of properties for the Job instance. (Only used when providing a Job WireBox id.)

chain

array<Job>

false

[]

An array of Jobs to chain after this Job. (Only used when providing a Job WireBox id.)

queue

string

false

The queue the Job belongs to. (Only used when providing a Job WireBox id.)

connection

string

false

The Connection to dispatch the Job on. (Only used when providing a Job WireBox id.)

backoff

numeric

false

The amount of time, in seconds, to wait between attempting Jobs. (Only used when providing a Job WireBox id.)

timeout

numeric

false

The amount of time, in seconds, to wait before treating a Job as erroring. (Only used when providing a Job WireBox id.)

maxAttempts

numeric

false

The maximum amount of attempts of a Job before treating the Job as failed. (Only used when providing a Job WireBox id.)

Return: The PendingBatch instance.

cbq.batch( [
    cbq.job( "ImportCsvJob", { "start": 1, "end": 100 } ),
    cbq.job( "ImportCsvJob", { "start": 101, "end": 200 } ),
    cbq.job( "ImportCsvJob", { "start": 201, "end": 300 } ),
    cbq.job( "ImportCsvJob", { "start": 301, "end": 400 } ),
    cbq.job( "ImportCsvJob", { "start": 401, "end": 500 } )
] )
.catch( cbq.job( "ImportCsvFailedJob" ) );

finally

Defines a Job to be dispatched after all the jobs in the Batch have executed successfully or failed.

Arguments
Type
Required
Default
Description

job

string | Job

true

The Job WireBox id or Job instance to execute after all the jobs in the Batch have executed successfully or failed.

properties

Struct

false

{}

A struct of properties for the Job instance. (Only used when providing a Job WireBox id.)

chain

array<Job>

false

[]

An array of Jobs to chain after this Job. (Only used when providing a Job WireBox id.)

queue

string

false

The queue the Job belongs to. (Only used when providing a Job WireBox id.)

connection

string

false

The Connection to dispatch the Job on. (Only used when providing a Job WireBox id.)

backoff

numeric

false

The amount of time, in seconds, to wait between attempting Jobs. (Only used when providing a Job WireBox id.)

timeout

numeric

false

The amount of time, in seconds, to wait before treating a Job as erroring. (Only used when providing a Job WireBox id.)

maxAttempts

numeric

false

The maximum amount of attempts of a Job before treating the Job as failed. (Only used when providing a Job WireBox id.)

Return: The PendingBatch instance.

cbq.batch( [
    cbq.job( "ImportCsvJob", { "start": 1, "end": 100 } ),
    cbq.job( "ImportCsvJob", { "start": 101, "end": 200 } ),
    cbq.job( "ImportCsvJob", { "start": 201, "end": 300 } ),
    cbq.job( "ImportCsvJob", { "start": 301, "end": 400 } ),
    cbq.job( "ImportCsvJob", { "start": 401, "end": 500 } )
] )
.finally( cbq.job( "ImportCsvCompletedJob" ) );

Dispatching Batches

A PendingBatch must be dispatched before any of the jobs contained in it are dispatched. This is done using the dispatch method on the PendingBatch.

dispatch

Dispatches a PendingBatch and all the Jobs it contains.

Arguments
Type
Required
Default
Description

No arguments

Return: A Batch instance created from this PendingBatch.

Interacting with Batches

Migrations and SQL Scripts

cfmigrations

schema.create( "cbq_batches", function ( t ) {
    t.string( "id" ).primaryKey();
    t.string( "name" );
    t.unsignedInteger( "totalJobs" );
    t.unsignedInteger( "pendingJobs" );
    t.unsignedInteger( "failedJobs" );
    t.text( "failedJobIds" ); // JSON column
    t.text( "options" ).nullable(); // JSON column
    t.datetime( "createdDate" );
    t.datetime( "cancelledDate" ).nullable();
    t.datetime( "completedDate" ).nullable();
} );

MySQL

CREATE TABLE `cbq_batches` (
    `id` VARCHAR(255) NOT NULL,
    `name` VARCHAR(255) NOT NULL,
    `totalJobs` INTEGER UNSIGNED NOT NULL,
    `pendingJobs` INTEGER UNSIGNED NOT NULL,
    `failedJobs` INTEGER UNSIGNED NOT NULL,
    `failedJobIds` TEXT NOT NULL,
    `options` TEXT,
    `createdDate` DATETIME NOT NULL,
    `cancelledDate` DATETIME,
    `completedDate` DATETIME,
    CONSTRAINT `pk_cbq_batches_id` PRIMARY KEY (`id`)
)

SQL Server

CREATE TABLE [cbq_batches] (
    [id] VARCHAR(255) NOT NULL,
    [name] VARCHAR(255) NOT NULL,
    [totalJobs] INTEGER NOT NULL,
    [pendingJobs] INTEGER NOT NULL,
    [failedJobs] INTEGER NOT NULL,
    [failedJobIds] VARCHAR(MAX) NOT NULL,
    [options] VARCHAR(MAX),
    [createdDate] DATETIME2 NOT NULL,
    [cancelledDate] DATETIME2,
    [completedDate] DATETIME2,
    CONSTRAINT [pk_cbq_batches_id] PRIMARY KEY ([id])
)

Postgres

CREATE TABLE "cbq_batches" (
    "id" VARCHAR(255) NOT NULL,
    "name" VARCHAR(255) NOT NULL,
    "totalJobs" INTEGER NOT NULL,
    "pendingJobs" INTEGER NOT NULL,
    "failedJobs" INTEGER NOT NULL,
    "failedJobIds" TEXT NOT NULL,
    "options" TEXT,
    "createdDate" TIMESTAMP NOT NULL,
    "cancelledDate" TIMESTAMP,
    "completedDate" TIMESTAMP,
    CONSTRAINT "pk_cbq_batches_id" PRIMARY KEY ("id")
)

Oracle

CREATE TABLE "CBQ_BATCHES" (
    "ID" VARCHAR2(255) NOT NULL,
    "NAME" VARCHAR2(255) NOT NULL,
    "TOTALJOBS" NUMBER(10, 0) NOT NULL,
    "PENDINGJOBS" NUMBER(10, 0) NOT NULL,
    "FAILEDJOBS" NUMBER(10, 0) NOT NULL,
    "FAILEDJOBIDS" CLOB NOT NULL,
    "OPTIONS" CLOB,
    "CREATEDDATE" DATE NOT NULL,
    "CANCELLEDDATE" DATE,
    "COMPLETEDDATE" DATE,
    CONSTRAINT "PK_CBQ_BATCHES_ID" PRIMARY KEY ("ID")
)

SQLite

CREATE TABLE "cbq_batches" (
    "id" TEXT NOT NULL,
    "name" TEXT NOT NULL,
    "totalJobs" INTEGER NOT NULL,
    "pendingJobs" INTEGER NOT NULL,
    "failedJobs" INTEGER NOT NULL,
    "failedJobIds" TEXT NOT NULL,
    "options" TEXT,
    "createdDate" DATETIME NOT NULL,
    "cancelledDate" DATETIME,
    "completedDate" DATETIME,
    PRIMARY KEY ("id")
)

Last updated