Skip to content

Commit 7847821

Browse files
authored
Merge pull request #10 from sterlp/v1.6
V1.6
2 parents 06bfe8e + 12a32cc commit 7847821

File tree

81 files changed

+1574
-614
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+1574
-614
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Changelog
22

3+
## v1.6
4+
5+
- Running triggers can be canceled now
6+
- Running triggers can be failed now
7+
- https://github.com/sterlp/spring-persistent-tasks/wiki/Cancel-a-task-trigger
8+
- Triggers have now correlationId to collect them
9+
- Added Re-Queue / Re-Run trigger to history page
10+
- Correlation Id is shown in the UI
11+
- ID search includes also Correlation Id
12+
- Moved helper classes to own test jar
13+
314
## v1.5.6 - (2025-03-06)
415

516
- Better ID search

RUN_AND_BUILD.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mvn versions:display-dependency-updates
2-
mvn versions:set -DnewVersion=1.5.5 -DgenerateBackupPoms=false
2+
mvn versions:set -DnewVersion=1.6.0 -DgenerateBackupPoms=false
33
git tag -a v1.5.3 -m "v1.5.3 release"
4-
mvn versions:set -DnewVersion=1.5.4-SNAPSHOT -DgenerateBackupPoms=false
4+
mvn versions:set -DnewVersion=1.6.0-SNAPSHOT -DgenerateBackupPoms=false
55

66
## postgres
77

core/pom.xml

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>org.sterl.spring</groupId>
88
<artifactId>spring-persistent-tasks-root</artifactId>
9-
<version>1.5.7-SNAPSHOT</version>
9+
<version>1.6.0-SNAPSHOT</version>
1010
<relativePath>../pom.xml</relativePath>
1111
</parent>
1212

@@ -33,6 +33,10 @@
3333
<groupId>org.apache.commons</groupId>
3434
<artifactId>commons-lang3</artifactId>
3535
</dependency>
36+
<dependency>
37+
<groupId>org.liquibase</groupId>
38+
<artifactId>liquibase-core</artifactId>
39+
</dependency>
3640

3741
<dependency>
3842
<groupId>org.springframework.boot</groupId>
@@ -73,17 +77,18 @@
7377
<artifactId>spring-boot-starter-test</artifactId>
7478
<scope>test</scope>
7579
</dependency>
76-
<dependency>
77-
<groupId>uk.co.jemos.podam</groupId>
78-
<artifactId>podam</artifactId>
79-
<scope>test</scope>
80-
</dependency>
8180

8281
<dependency>
8382
<groupId>com.microsoft.sqlserver</groupId>
8483
<artifactId>mssql-jdbc</artifactId>
8584
<scope>test</scope>
8685
</dependency>
86+
<dependency>
87+
<groupId>org.sterl.test</groupId>
88+
<artifactId>hibernate-asserts</artifactId>
89+
<version>1.0.0</version>
90+
<scope>test</scope>
91+
</dependency>
8792
<dependency>
8893
<groupId>org.postgresql</groupId>
8994
<artifactId>postgresql</artifactId>

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
import java.io.Serializable;
44
import java.util.ArrayList;
55
import java.util.Collection;
6+
import java.util.Collections;
67
import java.util.List;
78
import java.util.Optional;
8-
import java.util.concurrent.ExecutionException;
9-
import java.util.concurrent.Future;
109

1110
import org.springframework.context.event.EventListener;
1211
import org.springframework.data.domain.Pageable;
@@ -17,13 +16,13 @@
1716
import org.sterl.spring.persistent_tasks.api.TriggerKey;
1817
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
1918
import org.sterl.spring.persistent_tasks.history.HistoryService;
19+
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
2020
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
2121
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
2222
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
2323
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2424

2525
import lombok.RequiredArgsConstructor;
26-
import lombok.SneakyThrows;
2726

2827
/**
2928
* Abstraction to {@link SchedulerService} or {@link TriggerService}
@@ -34,37 +33,40 @@
3433
public class PersistentTaskService {
3534

3635
private final Optional<SchedulerService> schedulerService;
37-
private final List<SchedulerService> schedulers;
3836
private final TriggerService triggerService;
3937
private final HistoryService historyService;
40-
38+
4139
/**
4240
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
4341
* Maybe out of the history event from a retry execution of the very same id.
44-
*
42+
*
4543
* @param key the {@link TriggerKey} to look for
4644
* @return the {@link TriggerData} to the {@link TriggerKey}
4745
*/
4846
public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
4947
final Optional<TriggerEntity> trigger = triggerService.get(key);
5048
if (trigger.isEmpty()) {
5149
var history = historyService.findLastKnownStatus(key);
52-
if (history.isPresent()) return Optional.ofNullable(history.get().getData());
50+
if (history.isPresent()) {
51+
return Optional.ofNullable(history.get().getData());
52+
}
5353
return Optional.empty();
5454
} else {
5555
return Optional.ofNullable(trigger.get().getData());
5656
}
5757
}
58-
58+
5959
public Optional<TriggerData> getLastDetailData(TriggerKey key) {
6060
var data = historyService.findAllDetailsForKey(key, Pageable.ofSize(1));
61-
if (data.isEmpty()) return Optional.empty();
61+
if (data.isEmpty()) {
62+
return Optional.empty();
63+
}
6264
return Optional.of(data.getContent().get(0).getData());
6365
}
64-
66+
6567
@EventListener
6668
void queue(TriggerTaskCommand<? extends Serializable> event) {
67-
if (event.triggers().size() == 1) {
69+
if (event.size() == 1) {
6870
runOrQueue(event.triggers().iterator().next());
6971
} else {
7072
queue(event.triggers());
@@ -73,22 +75,26 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
7375

7476
/**
7577
* Queues/updates the given triggers, if the {@link TriggerKey} is already present
76-
*
78+
*
7779
* @param <T> the state type
7880
* @param triggers the triggers to add
7981
* @return the {@link TriggerKey}
8082
*/
8183
@Transactional(timeout = 10)
8284
@NonNull
8385
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
86+
if (triggers == null || triggers.isEmpty()) {
87+
return Collections.emptyList();
88+
}
89+
8490
return triggers.stream() //
8591
.map(t -> triggerService.queue(t)) //
8692
.map(TriggerEntity::getKey) //
8793
.toList();
8894
}
8995
/**
9096
* Queues/updates the given trigger, if the {@link TriggerKey} is already present.
91-
*
97+
*
9298
* @param <T> the state type
9399
* @param trigger the trigger to add
94100
* @return the {@link TriggerKey}
@@ -102,7 +108,7 @@ public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
102108
/**
103109
* Runs the given trigger if a free threads are available
104110
* and the runAt time is not in the future.
105-
* @return the reference to the {@link TriggerKey}
111+
* @return the reference to the {@link TriggerKey}
106112
*/
107113
public <T extends Serializable> TriggerKey runOrQueue(
108114
AddTriggerRequest<T> triggerRequest) {
@@ -113,40 +119,27 @@ public <T extends Serializable> TriggerKey runOrQueue(
113119
}
114120
return triggerRequest.key();
115121
}
116-
117-
/**
118-
* Triggers the execution of all pending triggers.
119-
*
120-
* @return the reference to the {@link TriggerKey} of the running tasks
121-
*/
122-
public List<Future<TriggerKey>> executeTriggers() {
123-
var result = new ArrayList<Future<TriggerKey>>();
124-
for (SchedulerService s : schedulers) {
125-
result.addAll(s.triggerNextTasks());
126-
}
127-
return result;
128-
}
129-
122+
130123
/**
131-
* Triggers the execution of all pending triggers and wait for the result.
124+
* Returns all triggers for a correlationId sorted by the creation time.
125+
* @param correlationId the id to search for
126+
* @return the found {@link TriggerData} sorted by create time ASC
132127
*/
133-
@SneakyThrows
134-
public List<TriggerKey> executeTriggersAndWait() {
135-
final var result = new ArrayList<TriggerKey>();
136-
137-
List<Future<TriggerKey>> triggers;
138-
do {
139-
triggers = executeTriggers();
140-
for (Future<TriggerKey> future : triggers) {
141-
try {
142-
result.add(future.get());
143-
} catch (InterruptedException | ExecutionException e) {
144-
final Throwable cause = e.getCause();
145-
throw cause == null ? e : cause;
146-
}
147-
}
148-
} while (!triggers.isEmpty());
128+
@Transactional(readOnly = true, timeout = 5)
129+
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {
130+
131+
var running = triggerService.findTriggerByCorrelationId(correlationId)
132+
.stream().map(TriggerEntity::getData)
133+
.toList();
134+
135+
var done = historyService.findTriggerByCorrelationId(correlationId)
136+
.stream().map(TriggerHistoryLastStateEntity::getData)
137+
.toList();
138+
149139

140+
var result = new ArrayList<TriggerData>(running.size() + done.size());
141+
result.addAll(done);
142+
result.addAll(running);
150143
return result;
151144
}
152145
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/AddTriggerRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.io.Serializable;
44
import java.time.OffsetDateTime;
5+
import java.util.Collection;
6+
import java.util.Collections;
57

68
/**
79
/**
@@ -12,12 +14,17 @@ public record AddTriggerRequest<T extends Serializable>(
1214
TriggerKey key,
1315
T state,
1416
OffsetDateTime runtAt,
15-
int priority) {
17+
int priority,
18+
String correlationId) {
1619

1720
@SuppressWarnings("unchecked")
1821
public TaskId<T> taskId() {
1922
return (TaskId<T>)key.toTaskId();
2023
}
2124

25+
public Collection<AddTriggerRequest<T>> toList() {
26+
return Collections.singleton(this);
27+
}
28+
2229
public static final int DEFAULT_PRIORITY = 4;
2330
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java

Lines changed: 0 additions & 12 deletions
This file was deleted.

core/src/main/java/org/sterl/spring/persistent_tasks/api/TaskId.java

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@
1212
*/
1313
public record TaskId<T extends Serializable>(String name) implements Serializable {
1414

15-
public TaskTriggerBuilder<T> newTrigger() {
16-
return new TaskTriggerBuilder<>(this);
15+
public TriggerBuilder<T> newTrigger() {
16+
return new TriggerBuilder<>(this);
1717
}
1818

19-
public TaskTriggerBuilder<T> newTrigger(T state) {
20-
return new TaskTriggerBuilder<>(this).state(state);
19+
public TriggerBuilder<T> newTrigger(T state) {
20+
return new TriggerBuilder<>(this).state(state);
2121
}
2222

2323
public AddTriggerRequest<T> newUniqueTrigger(T state) {
24-
return new TaskTriggerBuilder<>(this).state(state).build();
24+
return new TriggerBuilder<>(this).state(state).build();
2525
}
2626

2727
public static TaskId<Serializable> of(String taskId) {
@@ -30,46 +30,65 @@ public static TaskId<Serializable> of(String taskId) {
3030
}
3131

3232
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
33-
public static class TaskTriggerBuilder<T extends Serializable> {
33+
public static class TriggerBuilder<T extends Serializable> {
3434
private final TaskId<T> taskId;
3535
private String id;
36+
private String correlationId;
3637
private T state;
3738
private OffsetDateTime when = OffsetDateTime.now();
3839
private int priority = AddTriggerRequest.DEFAULT_PRIORITY;
3940

40-
public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name) {
41-
return new TaskTriggerBuilder<>(new TaskId<T>(name));
41+
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name) {
42+
return new TriggerBuilder<>(new TaskId<T>(name));
4243
}
43-
public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name, T state) {
44-
return new TaskTriggerBuilder<>(new TaskId<T>(name)).state(state);
44+
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name, T state) {
45+
return new TriggerBuilder<>(new TaskId<T>(name)).state(state);
4546
}
4647
public AddTriggerRequest<T> build() {
4748
var key = TriggerKey.of(id, taskId);
48-
return new AddTriggerRequest<>(key, state, when, priority);
49+
return new AddTriggerRequest<>(key, state, when, priority, correlationId);
4950
}
50-
public TaskTriggerBuilder<T> id(String id) {
51+
/**
52+
* The ID of this task, same queued ids are replaced.
53+
*/
54+
public TriggerBuilder<T> id(String id) {
5155
this.id = id;
5256
return this;
5357
}
54-
public TaskTriggerBuilder<T> state(T state) {
58+
/**
59+
* An unique ID which is taken over to a chain/set of tasks.
60+
* If task is triggered it in a task, this ID is taken over.
61+
*/
62+
public TriggerBuilder<T> correlationId(String correlationId) {
63+
this.correlationId = correlationId;
64+
return this;
65+
}
66+
public TriggerBuilder<T> state(T state) {
5567
this.state = state;
5668
return this;
5769
}
58-
public TaskTriggerBuilder<T> priority(int priority) {
70+
/**
71+
* The higher the {@link #priority} the earlier this task is picked.
72+
* Same as JMS priority. Default is also 4, like in JMS.
73+
*
74+
* @param priority custom priority e.g. 0-9, also higher numbers are supported
75+
* @return this {@link TriggerBuilder}
76+
*/
77+
public TriggerBuilder<T> priority(int priority) {
5978
this.priority = priority;
6079
return this;
6180
}
6281
/**
6382
* synonym for {@link #runAt(OffsetDateTime)}
6483
*/
65-
public TaskTriggerBuilder<T> when(OffsetDateTime when) {
84+
public TriggerBuilder<T> when(OffsetDateTime when) {
6685
return runAt(when);
6786
}
68-
public TaskTriggerBuilder<T> runAt(OffsetDateTime when) {
87+
public TriggerBuilder<T> runAt(OffsetDateTime when) {
6988
this.when = when;
7089
return this;
7190
}
72-
public TaskTriggerBuilder<T> runAfter(Duration duration) {
91+
public TriggerBuilder<T> runAfter(Duration duration) {
7392
runAt(OffsetDateTime.now().plus(duration));
7493
return this;
7594
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public class Trigger {
1515
/** the business key which is unique it is combination for triggers but not the history! */
1616
private TriggerKey key;
1717

18+
private String correlationId;
19+
1820
private String runningOn;
1921

2022
private OffsetDateTime createdTime = OffsetDateTime.now();

0 commit comments

Comments
 (0)