TSK-1705: Add a job to update priority for unfinished tasks
This commit is contained in:
parent
167d47f951
commit
e9540d04e4
|
|
@ -58,6 +58,10 @@ public class TaskanaEngineConfiguration {
|
||||||
private static final String TASKANA_JOB_CLEANUP_MINIMUM_AGE = "taskana.jobs.cleanup.minimumAge";
|
private static final String TASKANA_JOB_CLEANUP_MINIMUM_AGE = "taskana.jobs.cleanup.minimumAge";
|
||||||
private static final String TASKANA_JOB_TASK_CLEANUP_ALL_COMPLETED_SAME_PARENT_BUSINESS =
|
private static final String TASKANA_JOB_TASK_CLEANUP_ALL_COMPLETED_SAME_PARENT_BUSINESS =
|
||||||
"taskana.jobs.cleanup.allCompletedSameParentBusiness";
|
"taskana.jobs.cleanup.allCompletedSameParentBusiness";
|
||||||
|
private static final String TASKANA_JOB_PRIORITY_BATCHSIZE = "taskana.jobs.priority.batchSize";
|
||||||
|
private static final String TASKANA_JOB_PRIORITY_RUN_EVERY = "taskana.jobs.priority.runEvery";
|
||||||
|
private static final String TASKANA_JOB_PRIORITY_FIRST_RUN = "taskana.jobs.priority.firstRunAt";
|
||||||
|
private static final String TASKANA_JOB_PRIORITY_ACTIVE = "taskana.jobs.priority.active";
|
||||||
private static final String TASKANA_DOMAINS_PROPERTY = "taskana.domains";
|
private static final String TASKANA_DOMAINS_PROPERTY = "taskana.domains";
|
||||||
private static final String TASKANA_CLASSIFICATION_TYPES_PROPERTY =
|
private static final String TASKANA_CLASSIFICATION_TYPES_PROPERTY =
|
||||||
"taskana.classification.types";
|
"taskana.classification.types";
|
||||||
|
|
@ -104,6 +108,11 @@ public class TaskanaEngineConfiguration {
|
||||||
private Duration cleanupJobMinimumAge = Duration.parse("P14D");
|
private Duration cleanupJobMinimumAge = Duration.parse("P14D");
|
||||||
private boolean taskCleanupJobAllCompletedSameParentBusiness = true;
|
private boolean taskCleanupJobAllCompletedSameParentBusiness = true;
|
||||||
|
|
||||||
|
private int priorityJobBatchSize = 100;
|
||||||
|
private Instant priorityJobFirstRun = Instant.parse("2018-01-01T00:00:00Z");
|
||||||
|
private Duration priorityJobRunEvery = Duration.parse("P1D");
|
||||||
|
private boolean priorityJobActive = false;
|
||||||
|
|
||||||
public TaskanaEngineConfiguration(
|
public TaskanaEngineConfiguration(
|
||||||
DataSource dataSource, boolean useManagedTransactions, String schemaName) {
|
DataSource dataSource, boolean useManagedTransactions, String schemaName) {
|
||||||
this(dataSource, useManagedTransactions, true, schemaName);
|
this(dataSource, useManagedTransactions, true, schemaName);
|
||||||
|
|
@ -349,6 +358,38 @@ public class TaskanaEngineConfiguration {
|
||||||
taskCleanupJobAllCompletedSameParentBusiness;
|
taskCleanupJobAllCompletedSameParentBusiness;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getPriorityJobBatchSize() {
|
||||||
|
return priorityJobBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPriorityJobBatchSize(int priorityJobBatchSize) {
|
||||||
|
this.priorityJobBatchSize = priorityJobBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getPriorityJobFirstRun() {
|
||||||
|
return priorityJobFirstRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPriorityJobFirstRun(Instant priorityJobFirstRun) {
|
||||||
|
this.priorityJobFirstRun = priorityJobFirstRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Duration getPriorityJobRunEvery() {
|
||||||
|
return priorityJobRunEvery;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPriorityJobRunEvery(Duration priorityJobRunEvery) {
|
||||||
|
this.priorityJobRunEvery = priorityJobRunEvery;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPriorityJobActive() {
|
||||||
|
return priorityJobActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPriorityJobActive(boolean priorityJobActive) {
|
||||||
|
this.priorityJobActive = priorityJobActive;
|
||||||
|
}
|
||||||
|
|
||||||
public String getSchemaName() {
|
public String getSchemaName() {
|
||||||
return schemaName;
|
return schemaName;
|
||||||
}
|
}
|
||||||
|
|
@ -405,6 +446,18 @@ public class TaskanaEngineConfiguration {
|
||||||
parseProperty(props, TASKANA_JOB_CLEANUP_MINIMUM_AGE, Duration::parse)
|
parseProperty(props, TASKANA_JOB_CLEANUP_MINIMUM_AGE, Duration::parse)
|
||||||
.ifPresent(this::setCleanupJobMinimumAge);
|
.ifPresent(this::setCleanupJobMinimumAge);
|
||||||
|
|
||||||
|
parseProperty(props, TASKANA_JOB_PRIORITY_BATCHSIZE, Integer::parseInt)
|
||||||
|
.ifPresent(this::setPriorityJobBatchSize);
|
||||||
|
|
||||||
|
parseProperty(props, TASKANA_JOB_PRIORITY_RUN_EVERY, Duration::parse)
|
||||||
|
.ifPresent(this::setPriorityJobRunEvery);
|
||||||
|
|
||||||
|
parseProperty(props, TASKANA_JOB_PRIORITY_FIRST_RUN, Instant::parse)
|
||||||
|
.ifPresent(this::setPriorityJobFirstRun);
|
||||||
|
|
||||||
|
parseProperty(props, TASKANA_JOB_PRIORITY_ACTIVE, Boolean::parseBoolean)
|
||||||
|
.ifPresent(this::setPriorityJobActive);
|
||||||
|
|
||||||
parseProperty(
|
parseProperty(
|
||||||
props,
|
props,
|
||||||
TASKANA_JOB_TASK_CLEANUP_ALL_COMPLETED_SAME_PARENT_BUSINESS,
|
TASKANA_JOB_TASK_CLEANUP_ALL_COMPLETED_SAME_PARENT_BUSINESS,
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import java.util.Objects;
|
||||||
import pro.taskana.classification.internal.jobs.ClassificationChangedJob;
|
import pro.taskana.classification.internal.jobs.ClassificationChangedJob;
|
||||||
import pro.taskana.task.internal.jobs.TaskCleanupJob;
|
import pro.taskana.task.internal.jobs.TaskCleanupJob;
|
||||||
import pro.taskana.task.internal.jobs.TaskRefreshJob;
|
import pro.taskana.task.internal.jobs.TaskRefreshJob;
|
||||||
|
import pro.taskana.task.internal.jobs.TaskUpdatePriorityJob;
|
||||||
import pro.taskana.workbasket.internal.jobs.WorkbasketCleanupJob;
|
import pro.taskana.workbasket.internal.jobs.WorkbasketCleanupJob;
|
||||||
|
|
||||||
/** This class holds all data that go into the Job table. */
|
/** This class holds all data that go into the Job table. */
|
||||||
|
|
@ -173,6 +174,7 @@ public class ScheduledJob {
|
||||||
CLASSIFICATION_CHANGED_JOB(ClassificationChangedJob.class.getName()),
|
CLASSIFICATION_CHANGED_JOB(ClassificationChangedJob.class.getName()),
|
||||||
TASK_REFRESH_JOB(TaskRefreshJob.class.getName()),
|
TASK_REFRESH_JOB(TaskRefreshJob.class.getName()),
|
||||||
TASK_CLEANUP_JOB(TaskCleanupJob.class.getName()),
|
TASK_CLEANUP_JOB(TaskCleanupJob.class.getName()),
|
||||||
|
TASK_UPDATE_PRIORITY_JOB(TaskUpdatePriorityJob.class.getName()),
|
||||||
WORKBASKET_CLEANUP_JOB(WorkbasketCleanupJob.class.getName()),
|
WORKBASKET_CLEANUP_JOB(WorkbasketCleanupJob.class.getName()),
|
||||||
HISTORY_CLEANUP_JOB("pro.taskana.simplehistory.impl.jobs.HistoryCleanupJob");
|
HISTORY_CLEANUP_JOB("pro.taskana.simplehistory.impl.jobs.HistoryCleanupJob");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,8 @@ import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
|
||||||
/** Abstract base for all background jobs of TASKANA. */
|
/** Abstract base for all background jobs of TASKANA. */
|
||||||
public abstract class AbstractTaskanaJob implements TaskanaJob {
|
public abstract class AbstractTaskanaJob implements TaskanaJob {
|
||||||
|
|
||||||
protected final Instant firstRun;
|
protected Instant firstRun;
|
||||||
protected final Duration runEvery;
|
protected Duration runEvery;
|
||||||
protected final TaskanaEngineImpl taskanaEngineImpl;
|
protected final TaskanaEngineImpl taskanaEngineImpl;
|
||||||
protected final TaskanaTransactionProvider txProvider;
|
protected final TaskanaTransactionProvider txProvider;
|
||||||
protected final ScheduledJob scheduledJob;
|
protected final ScheduledJob scheduledJob;
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,10 @@ public class PriorityServiceManager {
|
||||||
return Objects.nonNull(singleton) && singleton.enabled;
|
return Objects.nonNull(singleton) && singleton.enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long countRegisteredServices() {
|
||||||
|
return StreamSupport.stream(serviceLoader.spliterator(), false).count();
|
||||||
|
}
|
||||||
|
|
||||||
public Optional<Integer> calculatePriorityOfTask(TaskSummary task) {
|
public Optional<Integer> calculatePriorityOfTask(TaskSummary task) {
|
||||||
if (LOGGER.isDebugEnabled()) {
|
if (LOGGER.isDebugEnabled()) {
|
||||||
LOGGER.debug("Sending task to PriorityServiceProviders: {}", task);
|
LOGGER.debug("Sending task to PriorityServiceProviders: {}", task);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,99 @@
|
||||||
|
package pro.taskana.task.internal.jobs;
|
||||||
|
|
||||||
|
import static pro.taskana.common.internal.util.CollectionUtil.partitionBasedOnSize;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.ScheduledJob;
|
||||||
|
import pro.taskana.common.api.ScheduledJob.Type;
|
||||||
|
import pro.taskana.common.api.TaskanaEngine;
|
||||||
|
import pro.taskana.common.api.exceptions.SystemException;
|
||||||
|
import pro.taskana.common.internal.JobServiceImpl;
|
||||||
|
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
|
||||||
|
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
|
||||||
|
import pro.taskana.task.internal.jobs.helper.TaskUpdatePriorityWorker;
|
||||||
|
|
||||||
|
/** Job to recalculate the priority of each task that is not in an endstate. */
|
||||||
|
public class TaskUpdatePriorityJob extends AbstractTaskanaJob {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(TaskUpdatePriorityJob.class);
|
||||||
|
|
||||||
|
private final int batchSize;
|
||||||
|
private final boolean isJobActive;
|
||||||
|
|
||||||
|
public TaskUpdatePriorityJob(TaskanaEngine taskanaEngine) {
|
||||||
|
this(taskanaEngine, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskUpdatePriorityJob(
|
||||||
|
TaskanaEngine taskanaEngine,
|
||||||
|
TaskanaTransactionProvider txProvider,
|
||||||
|
ScheduledJob scheduledJob) {
|
||||||
|
super(taskanaEngine, txProvider, scheduledJob, true);
|
||||||
|
batchSize = taskanaEngine.getConfiguration().getPriorityJobBatchSize();
|
||||||
|
isJobActive = taskanaEngine.getConfiguration().isPriorityJobActive();
|
||||||
|
runEvery = taskanaEngine.getConfiguration().getPriorityJobRunEvery();
|
||||||
|
firstRun = taskanaEngine.getConfiguration().getPriorityJobFirstRun();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute() {
|
||||||
|
if (!isJobActive()) {
|
||||||
|
LOGGER.debug("Job to update task priority is not active.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngineImpl);
|
||||||
|
LOGGER.info("Running job to calculate all non finished task priorities");
|
||||||
|
try {
|
||||||
|
partitionBasedOnSize(worker.getAllRelevantTaskIds(), getBatchSize())
|
||||||
|
.forEach(worker::executeBatch);
|
||||||
|
LOGGER.info("Job to update priority of tasks has finished.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SystemException("Error while processing TaskUpdatePriorityJob.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isJobActive() {
|
||||||
|
return isJobActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBatchSize() {
|
||||||
|
return batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the TaskUpdatePriorityJob schedule. <br>
|
||||||
|
* All scheduled jobs are cancelled/deleted and a new one is scheduled.
|
||||||
|
*
|
||||||
|
* @param taskanaEngine the TASKANA engine.
|
||||||
|
*/
|
||||||
|
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
|
||||||
|
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
|
||||||
|
TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
jobService.deleteJobs(job.getType());
|
||||||
|
job.scheduleNextJob();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Type getType() {
|
||||||
|
return Type.TASK_UPDATE_PRIORITY_JOB;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "TaskUpdatePriorityJob [firstRun="
|
||||||
|
+ firstRun
|
||||||
|
+ ", runEvery="
|
||||||
|
+ runEvery
|
||||||
|
+ ", taskanaEngineImpl="
|
||||||
|
+ taskanaEngineImpl
|
||||||
|
+ ", txProvider="
|
||||||
|
+ txProvider
|
||||||
|
+ ", scheduledJob="
|
||||||
|
+ scheduledJob
|
||||||
|
+ ", batchSize="
|
||||||
|
+ batchSize
|
||||||
|
+ "]";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package pro.taskana.task.internal.jobs.helper;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.Objects;
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.TaskanaEngine;
|
||||||
|
import pro.taskana.common.api.exceptions.SystemException;
|
||||||
|
import pro.taskana.common.internal.util.CheckedConsumer;
|
||||||
|
|
||||||
|
/** Run low level SQL Statements reusing the taskana datasource. */
|
||||||
|
public class SqlConnectionRunner {
|
||||||
|
|
||||||
|
private final DataSource dataSource;
|
||||||
|
|
||||||
|
public SqlConnectionRunner(TaskanaEngine taskanaEngine) {
|
||||||
|
this(
|
||||||
|
Objects.requireNonNull(taskanaEngine, "Taskana engine may not be null")
|
||||||
|
.getConfiguration()
|
||||||
|
.getDatasource());
|
||||||
|
}
|
||||||
|
|
||||||
|
public SqlConnectionRunner(DataSource dataSource) {
|
||||||
|
this.dataSource = Objects.requireNonNull(dataSource, "Datasource may not be null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run custom queries on a given connection. Please check for committing changes.
|
||||||
|
*
|
||||||
|
* @param consumer consumes a connection.
|
||||||
|
* @throws SystemException will pass on any checked SQLException as a runtime SystemException
|
||||||
|
*/
|
||||||
|
public void runWithConnection(CheckedConsumer<Connection, SQLException> consumer) {
|
||||||
|
try (Connection connection = getConnection()) {
|
||||||
|
consumer.accept(connection);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new SystemException("SQL error while running low level SQL", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Connection getConnection() throws SQLException {
|
||||||
|
return dataSource.getConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,30 @@
|
||||||
|
package pro.taskana.task.internal.jobs.helper;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/** Update a lot of priorities for tasks in a performant way without the need to write SQL. */
|
||||||
|
public class TaskUpdatePriorityBatchStatement {
|
||||||
|
|
||||||
|
private static final Logger LOGGER =
|
||||||
|
LoggerFactory.getLogger(TaskUpdatePriorityBatchStatement.class);
|
||||||
|
private final PreparedStatement preparedStatement;
|
||||||
|
|
||||||
|
public TaskUpdatePriorityBatchStatement(Connection connection) throws SQLException {
|
||||||
|
preparedStatement = connection.prepareStatement("update TASK set PRIORITY = ? where ID = ?");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addPriorityUpdate(String taskId, Integer priority) throws SQLException {
|
||||||
|
preparedStatement.setInt(1, priority);
|
||||||
|
preparedStatement.setString(2, taskId);
|
||||||
|
LOGGER.debug("Job update priority to {} for task {}.", priority, taskId);
|
||||||
|
preparedStatement.addBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void executeBatch() throws SQLException {
|
||||||
|
preparedStatement.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,83 @@
|
||||||
|
package pro.taskana.task.internal.jobs.helper;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.BaseQuery.SortDirection;
|
||||||
|
import pro.taskana.common.api.TaskanaEngine;
|
||||||
|
import pro.taskana.spi.priority.internal.PriorityServiceManager;
|
||||||
|
import pro.taskana.task.api.TaskQueryColumnName;
|
||||||
|
import pro.taskana.task.api.TaskState;
|
||||||
|
import pro.taskana.task.api.models.TaskSummary;
|
||||||
|
|
||||||
|
public class TaskUpdatePriorityWorker {
|
||||||
|
|
||||||
|
private final SqlConnectionRunner sqlConnectionRunner;
|
||||||
|
private final TaskanaEngine taskanaEngine;
|
||||||
|
|
||||||
|
public TaskUpdatePriorityWorker(TaskanaEngine taskanaEngine) {
|
||||||
|
this.taskanaEngine = taskanaEngine;
|
||||||
|
this.sqlConnectionRunner = new SqlConnectionRunner(taskanaEngine);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> executeBatch(List<String> taskIds) {
|
||||||
|
|
||||||
|
List<String> updatedTaskIds = new ArrayList<>();
|
||||||
|
sqlConnectionRunner.runWithConnection(
|
||||||
|
connection -> {
|
||||||
|
TaskUpdatePriorityBatchStatement taskUpdateBatch =
|
||||||
|
new TaskUpdatePriorityBatchStatement(connection);
|
||||||
|
|
||||||
|
List<TaskSummary> list = getTaskSummariesByIds(taskIds);
|
||||||
|
for (TaskSummary taskSummary : list) {
|
||||||
|
Optional<Integer> calculatedPriority = getCalculatedPriority(taskSummary);
|
||||||
|
if (calculatedPriority.isPresent()) {
|
||||||
|
final String taskId = taskSummary.getId();
|
||||||
|
updatedTaskIds.add(taskId);
|
||||||
|
taskUpdateBatch.addPriorityUpdate(taskId, calculatedPriority.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taskUpdateBatch.executeBatch();
|
||||||
|
|
||||||
|
// don't forget to save changes
|
||||||
|
if (!connection.getAutoCommit()) {
|
||||||
|
connection.commit();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return updatedTaskIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will return all relevant task ids. This may result in a LOT! of ids
|
||||||
|
*
|
||||||
|
* @return list of task ids.
|
||||||
|
*/
|
||||||
|
public List<String> getAllRelevantTaskIds() {
|
||||||
|
return taskanaEngine
|
||||||
|
.getTaskService()
|
||||||
|
.createTaskQuery()
|
||||||
|
.stateNotIn(TaskState.END_STATES)
|
||||||
|
.listValues(TaskQueryColumnName.ID, SortDirection.ASCENDING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<TaskSummary> getTaskSummariesByIds(List<String> taskIds) {
|
||||||
|
return taskanaEngine
|
||||||
|
.getTaskService()
|
||||||
|
.createTaskQuery()
|
||||||
|
.idIn(taskIds.toArray(new String[0]))
|
||||||
|
.list();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<Integer> getCalculatedPriority(TaskSummary taskSummary) {
|
||||||
|
return PriorityServiceManager.getInstance()
|
||||||
|
.calculatePriorityOfTask(taskSummary)
|
||||||
|
.filter(hasDifferentPriority(taskSummary));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Predicate<Integer> hasDifferentPriority(TaskSummary taskSummary) {
|
||||||
|
return prio -> taskSummary != null && prio != taskSummary.getPriority();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,141 @@
|
||||||
|
package acceptance.jobs;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
|
|
||||||
|
import acceptance.AbstractAccTest;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.List;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.BaseQuery.SortDirection;
|
||||||
|
import pro.taskana.common.api.ScheduledJob;
|
||||||
|
import pro.taskana.common.api.ScheduledJob.Type;
|
||||||
|
import pro.taskana.common.api.exceptions.SystemException;
|
||||||
|
import pro.taskana.common.test.security.JaasExtension;
|
||||||
|
import pro.taskana.common.test.security.WithAccessId;
|
||||||
|
import pro.taskana.task.api.TaskQueryColumnName;
|
||||||
|
import pro.taskana.task.internal.jobs.TaskUpdatePriorityJob;
|
||||||
|
|
||||||
|
/** Acceptance test for all "jobs tasks runner" scenarios. */
|
||||||
|
@ExtendWith(JaasExtension.class)
|
||||||
|
class TaskUpdatePriorityJobAccTest extends AbstractAccTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void before() throws Exception {
|
||||||
|
// required if single tests modify database
|
||||||
|
// TODO split test class into readOnly & modifying tests to improve performance
|
||||||
|
resetDb(true);
|
||||||
|
|
||||||
|
taskanaEngineConfiguration.setPriorityJobActive(true);
|
||||||
|
taskanaEngineConfiguration.setPriorityJobBatchSize(20);
|
||||||
|
taskanaEngineConfiguration.setPriorityJobRunEvery(Duration.ofMinutes(30));
|
||||||
|
taskanaEngineConfiguration.setPriorityJobFirstRun(Instant.parse("2007-12-03T10:15:30.00Z"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_workWithoutException() {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThatCode(job::execute).doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_catchException_When_executedWithWrongSettings() {
|
||||||
|
// given
|
||||||
|
taskanaEngineConfiguration.setPriorityJobBatchSize(0);
|
||||||
|
TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThatThrownBy(job::execute).isInstanceOf(SystemException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_doNothing_When_NotActive() {
|
||||||
|
// given
|
||||||
|
taskanaEngineConfiguration.setPriorityJobActive(false);
|
||||||
|
TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
List<String> priorities =
|
||||||
|
taskanaEngine
|
||||||
|
.getTaskService()
|
||||||
|
.createTaskQuery()
|
||||||
|
.listValues(TaskQueryColumnName.PRIORITY, SortDirection.ASCENDING);
|
||||||
|
|
||||||
|
// when
|
||||||
|
job.execute();
|
||||||
|
|
||||||
|
// then
|
||||||
|
List<String> prioritiesUnchanged =
|
||||||
|
taskanaEngine
|
||||||
|
.getTaskService()
|
||||||
|
.createTaskQuery()
|
||||||
|
.listValues(TaskQueryColumnName.PRIORITY, SortDirection.ASCENDING);
|
||||||
|
|
||||||
|
assertThat(priorities).isEqualTo(prioritiesUnchanged);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_ScheduleNextJob() throws NoSuchFieldException, IllegalAccessException {
|
||||||
|
// given
|
||||||
|
final Instant someTimeInTheFuture = Instant.now().plus(10, ChronoUnit.DAYS);
|
||||||
|
|
||||||
|
// when
|
||||||
|
TaskUpdatePriorityJob.initializeSchedule(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(getJobMapper().findJobsToRun(someTimeInTheFuture))
|
||||||
|
.hasSizeGreaterThanOrEqualTo(1)
|
||||||
|
.extracting(ScheduledJob::getType)
|
||||||
|
.contains(Type.TASK_UPDATE_PRIORITY_JOB);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_readConfigurationForBatchSize() {
|
||||||
|
// given
|
||||||
|
taskanaEngineConfiguration.setPriorityJobBatchSize(20);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(job.getBatchSize()).isEqualTo(20);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_readConfigurationForIsActive() {
|
||||||
|
// given
|
||||||
|
taskanaEngineConfiguration.setPriorityJobActive(false);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(job.isJobActive()).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void should_containInformation_When_convertedToString() {
|
||||||
|
// given
|
||||||
|
taskanaEngineConfiguration.setPriorityJobBatchSize(543);
|
||||||
|
taskanaEngineConfiguration.setPriorityJobRunEvery(Duration.ofMinutes(30));
|
||||||
|
|
||||||
|
// when
|
||||||
|
final TaskUpdatePriorityJob job = new TaskUpdatePriorityJob(taskanaEngine);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(job).asString().contains("543").contains(Duration.ofMinutes(30).toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,62 @@
|
||||||
|
package acceptance.jobs.helper;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
|
|
||||||
|
import acceptance.AbstractAccTest;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.exceptions.SystemException;
|
||||||
|
import pro.taskana.common.test.security.JaasExtension;
|
||||||
|
import pro.taskana.task.internal.jobs.helper.SqlConnectionRunner;
|
||||||
|
|
||||||
|
/** Acceptance test for all "jobs tasks runner" scenarios. */
|
||||||
|
@ExtendWith(JaasExtension.class)
|
||||||
|
class SqlConnectionRunnerAccTest extends AbstractAccTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void before() throws Exception {
|
||||||
|
// required if single tests modify database
|
||||||
|
// TODO split test class into readOnly & modifying tests to improve performance
|
||||||
|
resetDb(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void should_executeSimpleQuery() {
|
||||||
|
// given
|
||||||
|
SqlConnectionRunner runner = new SqlConnectionRunner(taskanaEngine);
|
||||||
|
String taskId = "TKI:000000000000000000000000000000000050";
|
||||||
|
|
||||||
|
// when
|
||||||
|
runner.runWithConnection(
|
||||||
|
connection -> {
|
||||||
|
PreparedStatement preparedStatement =
|
||||||
|
connection.prepareStatement("select * from TASK where ID = ?");
|
||||||
|
preparedStatement.setString(1, taskId);
|
||||||
|
final ResultSet resultSet = preparedStatement.executeQuery();
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(resultSet.next()).isTrue();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void should_catchSqlExceptionAndThrowSystemException() {
|
||||||
|
// given
|
||||||
|
SqlConnectionRunner runner = new SqlConnectionRunner(taskanaEngine);
|
||||||
|
|
||||||
|
// when
|
||||||
|
assertThatThrownBy(
|
||||||
|
() ->
|
||||||
|
runner.runWithConnection(
|
||||||
|
connection -> {
|
||||||
|
throw new SQLException("test");
|
||||||
|
}))
|
||||||
|
.isInstanceOf(SystemException.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
package acceptance.jobs.helper;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import acceptance.AbstractAccTest;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.exceptions.NotAuthorizedException;
|
||||||
|
import pro.taskana.common.test.security.JaasExtension;
|
||||||
|
import pro.taskana.common.test.security.WithAccessId;
|
||||||
|
import pro.taskana.task.api.exceptions.TaskNotFoundException;
|
||||||
|
import pro.taskana.task.api.models.Task;
|
||||||
|
import pro.taskana.task.internal.jobs.helper.SqlConnectionRunner;
|
||||||
|
import pro.taskana.task.internal.jobs.helper.TaskUpdatePriorityBatchStatement;
|
||||||
|
|
||||||
|
/** Acceptance test for all "jobs tasks runner" scenarios. */
|
||||||
|
@ExtendWith(JaasExtension.class)
|
||||||
|
class TaskUpdatePriorityBatchStatementAccTest extends AbstractAccTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void before() throws Exception {
|
||||||
|
// required if single tests modify database
|
||||||
|
// TODO split test class into readOnly & modifying tests to improve performance
|
||||||
|
resetDb(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_updatePriority() throws TaskNotFoundException, NotAuthorizedException {
|
||||||
|
// given
|
||||||
|
SqlConnectionRunner runner = new SqlConnectionRunner(taskanaEngine);
|
||||||
|
String taskId = "TKI:000000000000000000000000000000000050";
|
||||||
|
final int priorityUpdate = 25;
|
||||||
|
|
||||||
|
// when
|
||||||
|
runner.runWithConnection(
|
||||||
|
connection -> {
|
||||||
|
final TaskUpdatePriorityBatchStatement batchStatement =
|
||||||
|
new TaskUpdatePriorityBatchStatement(connection);
|
||||||
|
batchStatement.addPriorityUpdate(taskId, priorityUpdate);
|
||||||
|
batchStatement.executeBatch();
|
||||||
|
connection.commit();
|
||||||
|
});
|
||||||
|
|
||||||
|
// then
|
||||||
|
final Task actual = taskanaEngine.getTaskService().getTask(taskId);
|
||||||
|
assertThat(actual).extracting(Task::getPriority).isEqualTo(priorityUpdate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,134 @@
|
||||||
|
package acceptance.jobs.helper;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assumptions.assumeThat;
|
||||||
|
|
||||||
|
import acceptance.AbstractAccTest;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
||||||
|
import pro.taskana.common.api.exceptions.NotAuthorizedException;
|
||||||
|
import pro.taskana.common.test.security.JaasExtension;
|
||||||
|
import pro.taskana.common.test.security.WithAccessId;
|
||||||
|
import pro.taskana.spi.priority.internal.PriorityServiceManager;
|
||||||
|
import pro.taskana.task.api.exceptions.TaskNotFoundException;
|
||||||
|
import pro.taskana.task.api.models.Task;
|
||||||
|
import pro.taskana.task.api.models.TaskSummary;
|
||||||
|
import pro.taskana.task.internal.jobs.helper.TaskUpdatePriorityWorker;
|
||||||
|
import pro.taskana.task.internal.models.TaskImpl;
|
||||||
|
|
||||||
|
/** Acceptance test for all "jobs tasks runner" scenarios. */
|
||||||
|
@ExtendWith(JaasExtension.class)
|
||||||
|
class TaskUpdatePriorityWorkerAccTest extends AbstractAccTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void before() throws Exception {
|
||||||
|
// required if single tests modify database
|
||||||
|
// TODO split test class into readOnly & modifying tests to improve performance
|
||||||
|
resetDb(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_loadAnyRelevantTaskId() {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngine);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final List<String> allRelevantTaskIds = worker.getAllRelevantTaskIds();
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(allRelevantTaskIds).hasSizeGreaterThan(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_loadExistingTaskIds() {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngine);
|
||||||
|
final List<String> allRelevantTaskIds = worker.getAllRelevantTaskIds();
|
||||||
|
final String foundTaskId = allRelevantTaskIds.get(0);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final List<TaskSummary> taskSummariesByIds = worker.getTaskSummariesByIds(List.of(foundTaskId));
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(taskSummariesByIds)
|
||||||
|
.hasSize(1)
|
||||||
|
.extracting(TaskSummary::getId)
|
||||||
|
.containsExactly(foundTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_notLoadAnyIrrelevantTaskIds() {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngine);
|
||||||
|
String completedTaskId = "TKI:000000000000000000000000000000000038";
|
||||||
|
|
||||||
|
// when
|
||||||
|
final List<String> allRelevantTaskIds = worker.getAllRelevantTaskIds();
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(allRelevantTaskIds).isNotEmpty().doesNotContain(completedTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_loadExistingTaskSummariesById() {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngine);
|
||||||
|
String taskId1 = "TKI:000000000000000000000000000000000050";
|
||||||
|
String taskId2 = "TKI:000000000000000000000000000000000051";
|
||||||
|
|
||||||
|
// when
|
||||||
|
final List<TaskSummary> taskSummariesByIds =
|
||||||
|
worker.getTaskSummariesByIds(List.of(taskId1, taskId2));
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(taskSummariesByIds)
|
||||||
|
.hasSizeGreaterThan(0)
|
||||||
|
.extracting(TaskSummary::getId)
|
||||||
|
.containsExactlyInAnyOrder(taskId1, taskId2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@WithAccessId(user = "admin")
|
||||||
|
void should_executeBatch() throws TaskNotFoundException, NotAuthorizedException {
|
||||||
|
// given
|
||||||
|
TaskUpdatePriorityWorker worker = new TaskUpdatePriorityWorker(taskanaEngine);
|
||||||
|
final List<String> allRelevantTaskIds = worker.getAllRelevantTaskIds();
|
||||||
|
String taskId = "TKI:000000000000000000000000000000000050";
|
||||||
|
Task taskOld = taskanaEngine.getTaskService().getTask(taskId);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final List<String> updatedTaskIds = worker.executeBatch(allRelevantTaskIds);
|
||||||
|
|
||||||
|
// then
|
||||||
|
final Task taskUpdated = taskanaEngine.getTaskService().getTask(taskId);
|
||||||
|
|
||||||
|
assumeThat(PriorityServiceManager.getInstance().countRegisteredServices())
|
||||||
|
.describedAs("SPI should be provided in order to check for modified priorities.")
|
||||||
|
.isPositive();
|
||||||
|
|
||||||
|
assertThat(updatedTaskIds).contains(taskId);
|
||||||
|
assertThat(taskUpdated.getPriority()).isNotEqualTo(taskOld.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void should_noticeDifferentPriority_When_PriorityHasChanged() {
|
||||||
|
// given
|
||||||
|
final TaskImpl task = (TaskImpl) taskanaEngine.getTaskService().newTask();
|
||||||
|
task.setPriority(232);
|
||||||
|
|
||||||
|
// when
|
||||||
|
final Predicate<Integer> differentPriority =
|
||||||
|
TaskUpdatePriorityWorker.hasDifferentPriority(task);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertThat(differentPriority).rejects(232).accepts(2, 3, 4, 5);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue