Queues Domain

The queues domain provides background job processing with configurable handlers, retry logic, and status tracking for TypeScript backends. It is structured in strict Clean Architecture layers with zero npm dependencies in the domain and application layers.

npx @backcap/cli add queues

The Job entity is the aggregate root. It tracks the job lifecycle through guarded state transitions: pending → processing → completed | failed.

import { Job } from "./domains/queues/domain/entities/job.entity";

const result = Job.create({
  id: crypto.randomUUID(),
  type: "emails",
  payload: { to: "user@example.com", subject: "Welcome!" },
});

if (result.isOk()) {
  const job = result.unwrap();
  console.log(job.status); // "pending"
  console.log(job.attempts); // 0

  job.start(3); // maxAttempts = 3
  console.log(job.status); // "processing"

  job.complete();
  console.log(job.status); // "completed"
}

Job properties:

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique identifier (UUID) |
| type | string | Job type (e.g. "emails", "reports") |
| payload | JobPayload | Validated payload value object |
| status | "pending" \| "processing" \| "completed" \| "failed" | Current job status |
| attempts | number | Number of processing attempts |
| failureReason | string \| undefined | Reason for failure (set on fail()) |
| scheduledAt | Date | When the job is scheduled to run |
| createdAt | Date | Timestamp of creation |

Job.create() returns Result<Job, InvalidJobPayload>.

State transitions:

  • start(maxAttempts): moves to processing and increments attempts. Only works from pending or failed. Returns MaxAttemptsExceeded if the attempt limit has been reached.
  • complete(): moves to completed. Only works from processing.
  • fail(reason): moves to failed and stores the failure reason. Only works from processing.
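
The guarded transitions above can be sketched as a small state machine. This is an illustrative stand-in, not the domain's actual Job entity: the `SketchJob` class, the inline `Result` type, the generic error objects, and the exact limit check (`attempts >= maxAttempts`) are all assumptions made for the example.

```typescript
// Minimal stand-in for the domain's Result type (assumption, not the real shared/result.ts).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

type JobStatus = "pending" | "processing" | "completed" | "failed";

// Hypothetical class that mirrors the documented transition rules.
class SketchJob {
  id: string;
  status: JobStatus = "pending";
  attempts = 0;
  failureReason: string | undefined = undefined;

  constructor(id: string) {
    this.id = id;
  }

  // pending | failed -> processing; every successful start counts as one attempt.
  start(maxAttempts: number): Result<void, Error> {
    if (this.status !== "pending" && this.status !== "failed") {
      return this.reject(`Cannot start a job that is "${this.status}"`);
    }
    // Assumption: the limit is reached once attempts equals maxAttempts.
    if (this.attempts >= maxAttempts) {
      return this.reject(`Job "${this.id}" exceeded max attempts (${maxAttempts})`);
    }
    this.status = "processing";
    this.attempts += 1;
    return { ok: true, value: undefined };
  }

  // processing -> completed
  complete(): Result<void, Error> {
    if (this.status !== "processing") {
      return this.reject(`Cannot complete a job that is "${this.status}"`);
    }
    this.status = "completed";
    return { ok: true, value: undefined };
  }

  // processing -> failed, remembering why
  fail(reason: string): Result<void, Error> {
    if (this.status !== "processing") {
      return this.reject(`Cannot fail a job that is "${this.status}"`);
    }
    this.status = "failed";
    this.failureReason = reason;
    return { ok: true, value: undefined };
  }

  private reject(message: string): Result<void, Error> {
    return { ok: false, error: new Error(message) };
  }
}

// Two failed attempts, then a third start that is rejected by the limit.
const retried = new SketchJob("job-1");
retried.start(2);
retried.fail("SMTP timeout");
retried.start(2);
retried.fail("SMTP timeout");
const thirdStart = retried.start(2);
```

Returning a Result from each transition, rather than throwing, keeps invalid transitions visible in the type signature, which matches how Job.create() reports failures.
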

JobPayload is the value object that wraps and validates a job's payload:

import { JobPayload } from "./domains/queues/domain/value-objects/job-payload.vo";

const result = JobPayload.create({ to: "user@example.com" });
// Result<JobPayload, InvalidJobPayload>

if (result.isOk()) {
  const payload = result.unwrap();
  console.log(payload.value); // { to: "user@example.com" }
}

Validation: the payload must be a non-null plain object.
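
One common way to test for a "non-null plain object" is to inspect the value's prototype. The sketch below is an assumption about what such a check could look like; the `isPlainObject` helper is hypothetical, and the domain's own rule may be stricter or looser (for example, in how it treats class instances).

```typescript
// Hypothetical helper: true only for object literals and Object.create(null) objects.
function isPlainObject(value: unknown): value is Record<string, unknown> {
  // Rejects null, undefined, and every primitive.
  if (value === null || typeof value !== "object") return false;
  // Arrays, Dates, Maps and class instances have a different prototype.
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

const accepted = isPlainObject({ to: "user@example.com" });
const rejectedArray = isPlainObject(["user@example.com"]);
const rejectedNull = isPlainObject(null);
```
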

Domain errors:

| Error Class | Condition | Message |
| --- | --- | --- |
| JobNotFound | No job found for the given ID | Job not found with id: "<id>" |
| InvalidJobPayload | Payload is null, undefined, or not a plain object | Invalid job payload |
| MaxAttemptsExceeded | Job has reached the maximum retry count | Job "<id>" exceeded max attempts (<n>) |

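
To make the message formats concrete, here is a sketch of error classes that produce the messages listed above. The constructor signatures, the `Sketch` suffix, and the choice to extend the built-in Error are assumptions for illustration; in particular, `<n>` is assumed to be the configured maximum. The domain's real classes may be shaped differently.

```typescript
// Hypothetical error classes; only the message strings are taken from the documentation.
class JobNotFoundSketch extends Error {
  constructor(id: string) {
    super(`Job not found with id: "${id}"`);
    this.name = "JobNotFound";
  }
}

class InvalidJobPayloadSketch extends Error {
  constructor() {
    super("Invalid job payload");
    this.name = "InvalidJobPayload";
  }
}

class MaxAttemptsExceededSketch extends Error {
  // Assumption: <n> in the message is the configured maximum, not the current attempt count.
  constructor(id: string, maxAttempts: number) {
    super(`Job "${id}" exceeded max attempts (${maxAttempts})`);
    this.name = "MaxAttemptsExceeded";
  }
}

const notFound = new JobNotFoundSketch("job-123");
const exceeded = new MaxAttemptsExceededSketch("job-123", 3);
```
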
Domain events:

| Event | Emitted By | Payload |
| --- | --- | --- |
| JobCompleted | ProcessJob use case | jobId, type, completedAt, occurredAt |
| JobFailed | ProcessJob use case | jobId, type, reason, attempts, occurredAt |

The EnqueueJob use case creates a new job entity, validates it, and persists it.

import { EnqueueJob } from "./domains/queues/application/use-cases/enqueue-job.use-case";

const enqueueJob = new EnqueueJob(jobRepository);

const result = await enqueueJob.execute({
  type: "emails",
  payload: { to: "user@example.com", subject: "Welcome!" },
  scheduledAt: new Date("2025-01-01"),
});
// Result<{ jobId: string; scheduledAt: Date }, Error>

Possible failures: InvalidJobPayload

The ProcessJob use case processes a job by invoking the configured handler. It covers three outcomes: the handler succeeds, the handler returns a failure, or the handler throws.

import { ProcessJob } from "./domains/queues/application/use-cases/process-job.use-case";
// Assumption: Result is exported from the domain's shared/result.ts (path inferred from the file layout).
import { Result } from "./domains/queues/shared/result";

const processJob = new ProcessJob(
  jobRepository,
  async (job) => {
    // Your processing logic here (sendEmail stands in for your own function)
    await sendEmail(job.payload);
    return Result.ok(undefined);
  },
  3, // maxAttempts (optional, default: 3)
);

const result = await processJob.execute({ jobId: "job-123" });
// Result<{ status: "completed" | "failed"; completedAt: Date | null; event: JobCompleted | JobFailed }, Error>

Possible failures: JobNotFound, MaxAttemptsExceeded

The handler is wrapped in a try/catch — if it throws, the job is marked as failed with the error message.
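
The try/catch behaviour described above can be sketched as a wrapper that turns a thrown exception into a failure value, so the caller handles "returned a failure" and "threw" in the same way. The `runHandlerSafely` function, the `Handler` type, and the inline `Result` type are hypothetical names for this example, not the use case's actual internals.

```typescript
// Minimal stand-in for the domain's Result type (assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// A handler receives the job and reports success or failure.
type Handler<J> = (job: J) => Promise<Result<void, Error>>;

// Runs the handler and converts anything it throws into a failure Result.
async function runHandlerSafely<J>(job: J, handler: Handler<J>): Promise<Result<void, Error>> {
  try {
    return await handler(job);
  } catch (thrown) {
    // Non-Error values (e.g. a thrown string) are normalised to an Error.
    const error = thrown instanceof Error ? thrown : new Error(String(thrown));
    return { ok: false, error };
  }
}
```

With a wrapper like this, the failure branch could pass `error.message` to `job.fail(reason)`, which is how a thrown error would end up as the job's failureReason.
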

The GetJobStatus use case retrieves a job's current status by ID.

import { GetJobStatus } from "./domains/queues/application/use-cases/get-job-status.use-case";

const getJobStatus = new GetJobStatus(jobRepository);

const result = await getJobStatus.execute({ jobId: "job-123" });
// Result<{ id, type, payload, status, attempts, scheduledAt, createdAt, failureReason? }, Error>

Possible failures: JobNotFound

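Persistence is abstracted behind the IJobRepository port (application/ports/job-repository.port.ts):

export interface IJobRepository {
  save(job: Job): Promise<void>;
  findById(id: string): Promise<Job | undefined>;
  findPending(type: string): Promise<Job[]>;
}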
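
For tests or local development, a port of this shape can be backed by a plain Map. The sketch below is self-contained, so it uses a hypothetical `JobLike` interface in place of the real Job entity and a hypothetical `InMemoryJobRepository` class. How the real findPending treats scheduledAt is not stated, so this version filters by type and status only.

```typescript
// Hypothetical minimal shape; the real port works with the Job entity.
interface JobLike {
  id: string;
  type: string;
  status: "pending" | "processing" | "completed" | "failed";
}

// Same three methods as the port above, typed against JobLike.
class InMemoryJobRepository {
  private readonly jobs = new Map<string, JobLike>();

  // Inserts or overwrites the job under its id.
  async save(job: JobLike): Promise<void> {
    this.jobs.set(job.id, job);
  }

  async findById(id: string): Promise<JobLike | undefined> {
    return this.jobs.get(id);
  }

  // Assumption: "pending" means status === "pending"; scheduledAt is ignored here.
  async findPending(type: string): Promise<JobLike[]> {
    return Array.from(this.jobs.values()).filter(
      (job) => job.type === type && job.status === "pending",
    );
  }
}
```

Because the use cases depend only on the port, an implementation like this could be swapped for a database-backed one without touching the application layer.
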

The contracts module exposes a factory and a service interface that wrap the use cases:

import {
  createQueuesDomain,
  IQueuesService,
} from "./domains/queues/contracts";
// Assumption: Result is exported from the domain's shared/result.ts (path inferred from the file layout).
import { Result } from "./domains/queues/shared/result";

const queuesService: IQueuesService = createQueuesDomain({
  jobRepository,
  processHandler: async (job) => {
    await sendEmail(job.payload);
    return Result.ok(undefined);
  },
  maxAttempts: 5, // optional, default: 3
});

// IQueuesService interface:
//   enqueue(input): Promise<Result<QueuesEnqueueOutput, Error>>
//   process(input): Promise<Result<QueuesProcessOutput, Error>>
//   getStatus(input): Promise<Result<QueuesGetStatusOutput, Error>>

This is the only import consumers need. The internal use case classes are implementation details.

domains/queues/
  domain/
    entities/job.entity.ts
    value-objects/job-payload.vo.ts
    errors/job-not-found.error.ts
    errors/invalid-job-payload.error.ts
    errors/max-attempts-exceeded.error.ts
    events/job-completed.event.ts
    events/job-failed.event.ts
  application/
    use-cases/enqueue-job.use-case.ts
    use-cases/process-job.use-case.ts
    use-cases/get-job-status.use-case.ts
    ports/job-repository.port.ts
    dto/enqueue-job.dto.ts
    dto/process-job.dto.ts
    dto/get-job-status.dto.ts
  contracts/
    queues.contract.ts
    queues.factory.ts
    index.ts
  shared/
    result.ts