Skip to content

Commit

Permalink
Feat (Core): PostgresJobQueue Implementation and Job Processing Enhan…
Browse files Browse the repository at this point in the history
…cements (#30175)

This PR introduces the `PostgresJobQueue` implementation, fulfilling the
requirement for a default job queue using PostgreSQL.
In the process of developing this implementation, we've made significant
improvements to the overall job processing system, including
enhancements to the `JobQueueManagerAPIImpl` and related components.

### Key Features of `PostgresJobQueue`:

1. **Efficient Job Management:** Implements all required operations for
job queue management, including creating, retrieving, updating, and
monitoring jobs.
2. **JSONB Field Usage:** Utilizes PostgreSQL's JSONB fields for storing
job parameters and results, allowing for flexible and efficient data
storage.
3. **Optimized Queries:** Implements efficient SQL queries, including
the use of SELECT FOR UPDATE SKIP LOCKED for concurrent job processing.
4. **Pagination Support:** Provides paginated results for job listings,
improving performance for large job sets.
5. **Comprehensive Job Lifecycle Management:** Handles all stages of a
job's lifecycle, from creation to completion or failure.
6. **Transactional Integrity:** Ensures data consistency through proper
transaction management.

### Improvements to Job Processing System:

1. **Enhanced Error Handling:** Implemented a more robust error handling
mechanism, including the use of custom exceptions for different error
scenarios.
2. **Improved Retry Logic:** Refined the retry mechanism to handle
transient failures more gracefully.
3. **Real-Time Job Monitoring:** Enhanced the real-time monitoring
capabilities, allowing for more responsive job status updates.
4. **Progress Tracking:** Improved the progress tracking system,
providing more accurate and timely updates on job completion percentage.
5. **Cancellation Support:** Implemented a more reliable job
cancellation process.

### `JobQueueManagerAPIImpl` Enhancements:

1. **Event-Driven Updates:** Implemented an event-driven system for job
status updates, improving system responsiveness.
2. **More Granular Job State Transitions:** Implemented more detailed
job state transitions, providing better insights into job processing
stages.

### Testing and Documentation:
Developed an extensive set of unit and integration tests for the
PostgresJobQueue implementation.

## Simple class diagram with main operations and interactions
```mermaid
classDiagram
    class JobQueueManagerAPI {
        <<interface>>
        +start()
        +close()
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getJobs(page: int, pageSize: int): JobPaginatedResult
        +cancelJob(jobId: String)
        +watchJob(jobId: String, watcher: Consumer~Job~)
        +setRetryStrategy(queueName: String, retryStrategy: RetryStrategy)
    }

    class JobQueueManagerAPIImpl {
        -jobQueue: JobQueue
        -processors: Map~String, JobProcessor~
        -retryStrategies: Map~String, RetryStrategy~
        -circuitBreaker: CircuitBreaker
        +start()
        +close()
        -processJobs()
        -processJobWithRetry(job: Job)
    }

    class JobQueue {
        <<interface>>
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getActiveJobs(queueName: String, page: int, pageSize: int): JobPaginatedResult
        +getCompletedJobs(queueName: String, startDate: LocalDateTime, endDate: LocalDateTime, page: int, pageSize: int): JobPaginatedResult
        +updateJobStatus(job: Job)
        +nextJob(): Job
        +updateJobProgress(jobId: String, progress: float)
    }

    class PostgresJobQueue {
        -objectMapper: ObjectMapper
        +createJob(queueName: String, parameters: Map): String
        +getJob(jobId: String): Job
        +getActiveJobs(queueName: String, page: int, pageSize: int): JobPaginatedResult
        +getCompletedJobs(queueName: String, startDate: LocalDateTime, endDate: LocalDateTime, page: int, pageSize: int): JobPaginatedResult
        +updateJobStatus(job: Job)
        +nextJob(): Job
        +updateJobProgress(jobId: String, progress: float)
    }

    class Job {
        +id: String
        +queueName: String
        +state: JobState
        +parameters: Map
        +progress: float
        +createdAt: LocalDateTime
        +updatedAt: LocalDateTime
        +startedAt: Optional~LocalDateTime~
        +completedAt: Optional~LocalDateTime~
        +result: Optional~JobResult~
    }

    class JobProcessor {
        <<interface>>
        +process(job: Job)
        +getResultMetadata(job: Job): Map
    }

    class Cancellable {
        <<interface>>
        +cancel(job: Job)
    }

    class RetryStrategy {
        <<interface>>
        +shouldRetry(job: Job, exceptionClass: Class): boolean
        +nextRetryDelay(job: Job): long
    }

    class CircuitBreaker {
        -failureThreshold: int
        -resetTimeout: long
        +allowRequest(): boolean
        +recordFailure()
        +reset()
    }

    class RealTimeJobMonitor {
        -jobWatchers: Map~String, List~Consumer~Job~~~
        +registerWatcher(jobId: String, watcher: Consumer~Job~)
        +updateWatchers(updatedJobs: List~Job~)
    }

    JobQueueManagerAPI <|.. JobQueueManagerAPIImpl
    JobQueueManagerAPIImpl --> JobQueue
    JobQueueManagerAPIImpl --> JobProcessor
    JobQueueManagerAPIImpl --> RetryStrategy
    JobQueueManagerAPIImpl --> CircuitBreaker
    JobQueueManagerAPIImpl --> RealTimeJobMonitor
    JobQueue <|.. PostgresJobQueue
    JobQueueManagerAPIImpl ..> Job
    JobProcessor <|-- Cancellable
```
  • Loading branch information
jgambarios authored Sep 30, 2024
1 parent b680949 commit b9dfc8a
Show file tree
Hide file tree
Showing 45 changed files with 3,754 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ public class JobQueueConfig {
*/
private final int threadPoolSize;

// The interval in milliseconds to poll for job updates
private final int pollJobUpdatesIntervalMilliseconds;

/**
* Constructs a new JobQueueConfig
*
* @param threadPoolSize The number of threads to use for job processing.
* @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds to poll for job updates.
*/
public JobQueueConfig(int threadPoolSize) {
public JobQueueConfig(int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) {
this.threadPoolSize = threadPoolSize;
this.pollJobUpdatesIntervalMilliseconds = pollJobUpdatesIntervalMilliseconds;
}

/**
Expand All @@ -28,4 +33,13 @@ public int getThreadPoolSize() {
return threadPoolSize;
}

/**
* Gets the interval in milliseconds to poll for job updates.
*
* @return The interval in milliseconds to poll for job updates.
*/
public int getPollJobUpdatesIntervalMilliseconds() {
return pollJobUpdatesIntervalMilliseconds;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class JobQueueConfigProducer {
"JOB_QUEUE_THREAD_POOL_SIZE", 10
);

// The interval in milliseconds to poll for job updates.
static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty(
"JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 1000
);

/**
* Produces a JobQueueConfig object. This method is called by the CDI container to create a
* JobQueueConfig instance when it is necessary for dependency injection.
Expand All @@ -24,7 +29,10 @@ public class JobQueueConfigProducer {
*/
@Produces
public JobQueueConfig produceJobQueueConfig() {
return new JobQueueConfig(DEFAULT_THREAD_POOL_SIZE);
return new JobQueueConfig(
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS
);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.ProcessorNotFoundException;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
import java.util.List;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -15,7 +16,7 @@
* Defines the contract for interacting with the job queue system. This interface provides methods
* for managing jobs, processors, and the overall state of the job queue.
*/
public interface JobQueueManagerAPI extends AutoCloseable {
public interface JobQueueManagerAPI {

/**
* Starts the job queue manager, initializing the thread pool for job processing.
Expand Down Expand Up @@ -62,35 +63,38 @@ public interface JobQueueManagerAPI extends AutoCloseable {
* @param queueName The name of the queue
* @param parameters The parameters for the job
* @return The ID of the created job
* @throws ProcessorNotFoundException if no processor is registered for the specified queue
* @throws JobProcessorNotFoundException if no processor is registered for the specified queue
* @throws DotDataException if there's an error creating the job
*/
String createJob(String queueName, Map<String, Object> parameters)
throws ProcessorNotFoundException;
throws JobProcessorNotFoundException, DotDataException;

/**
* Retrieves a job by its ID.
*
* @param jobId The ID of the job
* @return The Job object, or null if not found
* @throws DotDataException if there's an error fetching the job
*/
Job getJob(String jobId);
Job getJob(String jobId) throws DotDataException;

/**
* Retrieves a list of jobs.
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A list of Job objects
* @return A result object containing the list of active jobs and pagination information.
* @throws DotDataException if there's an error fetching the jobs
*/
List<Job> getJobs(int page, int pageSize);
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Cancels a job.
*
* @param jobId The ID of the job to cancel
* @throws JobCancellationException if the job cannot be cancelled
* @throws DotDataException if there's an error cancelling the job
*/
void cancelJob(String jobId) throws JobCancellationException;
void cancelJob(String jobId) throws DotDataException;

/**
* Registers a watcher for a specific job.
Expand All @@ -113,6 +117,11 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
CircuitBreaker getCircuitBreaker();

/**
* @return The JobQueue instance
*/
JobQueue getJobQueue();

/**
* @return The size of the thread pool
*/
Expand Down
Loading

0 comments on commit b9dfc8a

Please sign in to comment.