Skip to content

Commit 4afd5d8

Browse files
committed
Allow reusing CompositeItemReader
Before this commit, CompositeItemReader cannot be reused for open and close. Fix GH-4926 Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 644d7e6 commit 4afd5d8

File tree

2 files changed

+82
-11
lines changed

2 files changed

+82
-11
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/support/CompositeItemReader.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,15 @@
3131
*
3232
* @author Mahmoud Ben Hassine
3333
* @author Elimelec Burghelea
34+
* @author Yanming Zhou
3435
* @param <T> type of objects to read
3536
* @since 5.2
3637
*/
3738
public class CompositeItemReader<T> implements ItemStreamReader<T> {
3839

3940
private final List<ItemStreamReader<? extends T>> delegates;
4041

41-
private final Iterator<ItemStreamReader<? extends T>> delegatesIterator;
42+
private @Nullable Iterator<ItemStreamReader<? extends T>> delegatesIterator;
4243

4344
private @Nullable ItemStreamReader<? extends T> currentDelegate;
4445

@@ -48,27 +49,29 @@ public class CompositeItemReader<T> implements ItemStreamReader<T> {
4849
*/
4950
public CompositeItemReader(List<ItemStreamReader<? extends T>> delegates) {
5051
this.delegates = delegates;
51-
this.delegatesIterator = this.delegates.iterator();
52-
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
5352
}
5453

5554
// TODO: check if we need to open/close delegates on the fly in read() to avoid
5655
// opening resources early for a long time
5756
@Override
5857
public void open(ExecutionContext executionContext) throws ItemStreamException {
59-
for (ItemStreamReader<? extends T> delegate : delegates) {
58+
59+
this.delegatesIterator = this.delegates.iterator();
60+
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
61+
62+
for (ItemStreamReader<? extends T> delegate : this.delegates) {
6063
delegate.open(executionContext);
6164
}
6265
}
6366

6467
@Override
6568
public @Nullable T read() throws Exception {
66-
if (this.currentDelegate == null) {
69+
if (this.currentDelegate == null || this.delegatesIterator == null) {
6770
return null;
6871
}
69-
T item = currentDelegate.read();
72+
T item = this.currentDelegate.read();
7073
if (item == null) {
71-
currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
74+
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
7275
return read();
7376
}
7477
return item;
@@ -89,9 +92,13 @@ public void update(ExecutionContext executionContext) throws ItemStreamException
8992
*/
9093
@Override
9194
public void close() throws ItemStreamException {
95+
96+
this.delegatesIterator = null;
97+
this.currentDelegate = null;
98+
9299
List<Exception> exceptions = new ArrayList<>();
93100

94-
for (ItemStreamReader<? extends T> delegate : delegates) {
101+
for (ItemStreamReader<? extends T> delegate : this.delegates) {
95102
try {
96103
delegate.close();
97104
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/support/CompositeItemReaderTests.java

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,21 @@
2323
import org.springframework.batch.infrastructure.item.ExecutionContext;
2424
import org.springframework.batch.infrastructure.item.ItemStreamException;
2525
import org.springframework.batch.infrastructure.item.ItemStreamReader;
26-
import org.springframework.batch.infrastructure.item.support.CompositeItemReader;
2726

27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertNull;
2829
import static org.mockito.Mockito.doThrow;
2930
import static org.mockito.Mockito.mock;
3031
import static org.mockito.Mockito.times;
3132
import static org.mockito.Mockito.verify;
32-
import static org.mockito.Mockito.verifyNoInteractions;
3333
import static org.mockito.Mockito.when;
3434

3535
/**
3636
* Test class for {@link CompositeItemReader}.
3737
*
3838
* @author Mahmoud Ben Hassine
3939
* @author Elimelec Burghelea
40+
* @author Yanming Zhou
4041
*/
4142
public class CompositeItemReaderTests {
4243

@@ -62,6 +63,7 @@ void testCompositeItemReaderRead() throws Exception {
6263
ItemStreamReader<String> reader1 = mock();
6364
ItemStreamReader<String> reader2 = mock();
6465
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
66+
compositeItemReader.open(new ExecutionContext());
6567
when(reader1.read()).thenReturn("foo1", "foo2", null);
6668
when(reader2.read()).thenReturn("bar1", "bar2", null);
6769

@@ -88,13 +90,15 @@ void testCompositeItemReaderUpdate() {
8890
ItemStreamReader<String> reader2 = mock();
8991
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
9092
ExecutionContext executionContext = new ExecutionContext();
93+
compositeItemReader.open(executionContext);
9194

9295
// when
9396
compositeItemReader.update(executionContext);
9497

9598
// then
9699
verify(reader1).update(executionContext);
97-
verifyNoInteractions(reader2); // reader1 is the current delegate in this setup
100+
verify(reader2, times(0)).update(executionContext); // reader1 is the current
101+
// delegate in this setup
98102
}
99103

100104
@Test
@@ -103,6 +107,7 @@ void testCompositeItemReaderClose() {
103107
ItemStreamReader<String> reader1 = mock();
104108
ItemStreamReader<String> reader2 = mock();
105109
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
110+
compositeItemReader.open(new ExecutionContext());
106111

107112
// when
108113
compositeItemReader.close();
@@ -118,6 +123,7 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() {
118123
ItemStreamReader<String> reader1 = mock();
119124
ItemStreamReader<String> reader2 = mock();
120125
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
126+
compositeItemReader.open(new ExecutionContext());
121127

122128
doThrow(new ItemStreamException("A failure")).when(reader1).close();
123129

@@ -135,4 +141,62 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() {
135141
verify(reader2).close();
136142
}
137143

144+
@Test
145+
void testCompositeItemReaderRepeatableRead() throws Exception {
146+
// given
147+
ItemStreamReader<String> reader1 = new ItemStreamReader<>() {
148+
int counter = 0;
149+
150+
@Override
151+
public String read() {
152+
return switch (this.counter++) {
153+
case 0 -> "a";
154+
case 1 -> "b";
155+
default -> null;
156+
};
157+
}
158+
159+
@Override
160+
public void close() {
161+
this.counter = 0;
162+
}
163+
};
164+
ItemStreamReader<String> reader2 = new ItemStreamReader<>() {
165+
int counter = 0;
166+
167+
@Override
168+
public String read() {
169+
return switch (this.counter++) {
170+
case 0 -> "c";
171+
case 1 -> "d";
172+
default -> null;
173+
};
174+
}
175+
176+
@Override
177+
public void close() {
178+
this.counter = 0;
179+
}
180+
};
181+
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
182+
183+
for (int i = 0; i < 5; i++) {
184+
verifyRead(compositeItemReader);
185+
}
186+
}
187+
188+
private void verifyRead(CompositeItemReader<String> compositeItemReader) throws Exception {
189+
// when
190+
compositeItemReader.open(new ExecutionContext());
191+
192+
// then
193+
assertEquals("a", compositeItemReader.read());
194+
assertEquals("b", compositeItemReader.read());
195+
assertEquals("c", compositeItemReader.read());
196+
assertEquals("d", compositeItemReader.read());
197+
assertNull(compositeItemReader.read());
198+
199+
compositeItemReader.close();
200+
}
201+
138202
}

0 commit comments

Comments
 (0)