Skip to content

Commit 644d7e6

Browse files
KILL9-NO-MERCYfmbenhassine
authored andcommitted
Fix optimistic locking and add interruption check in ChunkOrientedStep
When JobOperator.stop() is called, the StepExecution version could become out of sync between the stopping thread and the executing step thread, causing OptimisticLockingFailureException. This commit synchronizes the StepExecution version by fetching the latest state from the batch metadata before update. If the recent StepExecution is in STOPPED status, it upgrades the current StepExecution's status and version to match the database state, preventing version conflicts. Additionally, added JobRepository.update() call in ChunkOrientedStep after each chunk to match TaskletStep behavior. This ensures both step implementations properly check for job interruption signals by synchronizing with the latest StepExecution status from the database. Move stepExecution.update() outside chunk transaction to prevent version mismatch when transaction rolls back. Problem: - Chunk transaction rollback reverts DB version but leaves in-memory version incremented - Next update() causes OptimisticLockingFailureException Solution: - Call getJobRepository().update(stepExecution) after transaction completes, outside transactionTemplate.executeWithoutResult() - On success: update() is called with synchronized versions - On failure: update() is skipped, preventing version conflict Fixes #5120 Fixes #5114 Signed-off-by: kill9-no-mercy <kill9.no.mercy@gmail.com>
1 parent 98c10cd commit 644d7e6

File tree

4 files changed

+24
-18
lines changed

4 files changed

+24
-18
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/job/JobExecution.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ public boolean isStopping() {
213213
return status == BatchStatus.STOPPING;
214214
}
215215

216+
/**
217+
* Test if this {@link StepExecution} indicates that it has been stopped.
218+
* @return {@code true} if the status is {@link BatchStatus#STOPPED}.
219+
*/
220+
public boolean isStopped() {
221+
return status == BatchStatus.STOPPED;
222+
}
223+
216224
/**
217225
* Sets the {@link ExecutionContext} for this execution.
218226
* @param executionContext The context.

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/ResourcelessJobRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public long getStepExecutionCount(JobInstance jobInstance, String stepName) {
328328
@Override
329329
public void update(StepExecution stepExecution) {
330330
stepExecution.setLastUpdated(LocalDateTime.now());
331-
if (this.jobExecution != null && this.jobExecution.isStopping()) {
331+
if (this.jobExecution != null && this.jobExecution.isStopped()) {
332332
stepExecution.setTerminateOnly();
333333
}
334334
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,20 @@ public void update(StepExecution stepExecution) {
158158
Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
159159

160160
stepExecution.setLastUpdated(LocalDateTime.now());
161+
162+
StepExecution latestStepExecution = getStepExecution(stepExecution.getId());
163+
Assert.state(latestStepExecution != null,
164+
"StepExecution with id " + stepExecution.getId() + "not found. Batch metadata state may be corrupted.");
165+
166+
if (latestStepExecution.getJobExecution().isStopped()) {
167+
Integer version = latestStepExecution.getVersion();
168+
if (version != null) {
169+
stepExecution.setVersion(version);
170+
}
171+
stepExecution.setTerminateOnly();
172+
}
173+
161174
stepExecutionDao.updateStepExecution(stepExecution);
162-
checkForInterruption(stepExecution);
163175
}
164176

165177
private void validateStepExecution(StepExecution stepExecution) {
@@ -180,22 +192,6 @@ public void updateExecutionContext(JobExecution jobExecution) {
180192
ecDao.updateExecutionContext(jobExecution);
181193
}
182194

183-
/**
184-
* Check to determine whether or not the JobExecution that is the parent of the
185-
* provided StepExecution has been interrupted. If, after synchronizing the status
186-
* with the database, the status has been updated to STOPPING, then the job has been
187-
* interrupted.
188-
* @param stepExecution the step execution
189-
*/
190-
private void checkForInterruption(StepExecution stepExecution) {
191-
JobExecution jobExecution = stepExecution.getJobExecution();
192-
jobExecutionDao.synchronizeStatus(jobExecution);
193-
if (jobExecution.isStopping()) {
194-
logger.info("Parent JobExecution is stopped, so passing message on to StepExecution");
195-
stepExecution.setTerminateOnly();
196-
}
197-
}
198-
199195
@Override
200196
public void deleteStepExecution(StepExecution stepExecution) {
201197
this.ecDao.deleteExecutionContext(stepExecution);

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
373373
? BatchMetrics.STATUS_ROLLED_BACK : BatchMetrics.STATUS_COMMITTED;
374374
chunkTransactionEvent.commit();
375375
});
376+
377+
getJobRepository().update(stepExecution);
376378
}
377379
}
378380

0 commit comments

Comments
 (0)