Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 9dbb39f

Browse files
authored
Add deploy props param to "create stream" REST API (#5143)
Fixes #5043
1 parent 00aaf2a commit 9dbb39f

File tree

5 files changed

+153
-38
lines changed

5 files changed

+153
-38
lines changed

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/StreamDefinitionController.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.LinkedList;
2222
import java.util.List;
23+
import java.util.Map;
2324

2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
@@ -44,8 +45,10 @@
4445
import org.springframework.hateoas.server.ExposesResourceFor;
4546
import org.springframework.hateoas.server.RepresentationModelAssembler;
4647
import org.springframework.http.HttpStatus;
48+
import org.springframework.http.MediaType;
4749
import org.springframework.util.Assert;
4850
import org.springframework.web.bind.annotation.PathVariable;
51+
import org.springframework.web.bind.annotation.RequestBody;
4952
import org.springframework.web.bind.annotation.RequestMapping;
5053
import org.springframework.web.bind.annotation.RequestMethod;
5154
import org.springframework.web.bind.annotation.RequestParam;
@@ -129,7 +132,10 @@ public PagedModel<? extends StreamDefinitionResource> list(Pageable pageable,
129132
}
130133

131134
/**
132-
* Create a new stream.
135+
* Create a new stream and optionally deploy it.
136+
* <p>
137+
* Differs from {@link #saveWithDeployProps} by accepting deployment properties and consuming
138+
* {@link MediaType#APPLICATION_FORM_URLENCODED} request content (required by the Dataflow Shell).
133139
*
134140
* @param name stream name
135141
* @param dsl DSL definition for stream
@@ -139,15 +145,46 @@ public PagedModel<? extends StreamDefinitionResource> list(Pageable pageable,
139145
* @return the created stream definition
140146
* @throws DuplicateStreamDefinitionException if a stream definition with the same name
141147
* already exists
142-
* @throws InvalidStreamDefinitionException if there errors in parsing the stream DSL,
148+
* @throws InvalidStreamDefinitionException if there are errors parsing the stream DSL,
143149
* resolving the name, or type of applications in the stream
144150
*/
145-
@RequestMapping(value = "", method = RequestMethod.POST)
151+
@RequestMapping(value = "", method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
146152
@ResponseStatus(HttpStatus.CREATED)
147-
public StreamDefinitionResource save(@RequestParam("name") String name, @RequestParam("definition") String dsl,
148-
@RequestParam(value = "description", defaultValue = "") String description,
149-
@RequestParam(value = "deploy", defaultValue = "false") boolean deploy) {
150-
StreamDefinition streamDefinition = this.streamService.createStream(name, dsl, description, deploy);
153+
public StreamDefinitionResource save(@RequestParam("name") String name,
154+
@RequestParam("definition") String dsl,
155+
@RequestParam(value = "description", defaultValue = "") String description,
156+
@RequestParam(value = "deploy", defaultValue = "false") boolean deploy) {
157+
StreamDefinition streamDefinition = this.streamService.createStream(name, dsl, description, deploy, null);
158+
return ((RepresentationModelAssembler<StreamDefinition, ? extends StreamDefinitionResource>)
159+
this.streamDefinitionAssemblerProvider.getStreamDefinitionAssembler(Collections.singletonList(streamDefinition))).toModel(streamDefinition);
160+
}
161+
162+
/**
163+
* Create a new stream and optionally deploy it.
164+
* <p>
165+
* Differs from {@link #save} by accepting deployment properties and consuming
166+
* {@link MediaType#APPLICATION_JSON} request content.
167+
*
168+
* @param name stream name
169+
* @param dsl DSL definition for stream
170+
* @param deploy if {@code true}, the stream is deployed upon creation (default is
171+
* {@code false})
172+
* @param deploymentProperties the optional deployment properties to use when the stream is deployed upon creation
173+
* @param description description of the stream definition
174+
* @return the created stream definition
175+
* @throws DuplicateStreamDefinitionException if a stream definition with the same name
176+
* already exists
177+
* @throws InvalidStreamDefinitionException if there are errors parsing the stream DSL,
178+
* resolving the name, or type of applications in the stream
179+
*/
180+
@RequestMapping(value = "", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
181+
@ResponseStatus(HttpStatus.CREATED)
182+
public StreamDefinitionResource saveWithDeployProps(@RequestParam("name") String name,
183+
@RequestParam("definition") String dsl,
184+
@RequestParam(value = "description", defaultValue = "") String description,
185+
@RequestParam(value = "deploy", defaultValue = "false") boolean deploy,
186+
@RequestBody(required = false) Map<String, String> deploymentProperties) {
187+
StreamDefinition streamDefinition = this.streamService.createStream(name, dsl, description, deploy, deploymentProperties);
151188
return ((RepresentationModelAssembler<StreamDefinition, ? extends StreamDefinitionResource>)
152189
this.streamDefinitionAssemblerProvider.getStreamDefinitionAssembler(Collections.singletonList(streamDefinition))).toModel(streamDefinition);
153190
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/StreamService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,13 @@ public interface StreamService {
9696
* @param description description of the stream definition
9797
* @param deploy if {@code true}, the stream is deployed upon creation (default is
9898
* {@code false})
99+
* @param deploymentProperties the optional deployment properties to use when the stream is deployed upon creation
99100
* @return the created stream definition already exists
100101
* @throws InvalidStreamDefinitionException if there are errors in parsing the stream DSL,
101102
* resolving the name, or type of applications in the stream
102103
*/
103-
StreamDefinition createStream(String streamName, String dsl, String description, boolean deploy);
104+
StreamDefinition createStream(String streamName, String dsl, String description, boolean deploy,
105+
Map<String, String> deploymentProperties);
104106

105107
/**
106108
* Deploys the stream with the user provided deployment properties.

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultStreamService.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -393,19 +393,9 @@ public StreamDeployment info(String streamName) {
393393
return this.skipperStreamDeployer.getStreamInfo(streamName);
394394
}
395395

396-
/**
397-
* Create a new stream.
398-
*
399-
* @param streamName stream name
400-
* @param dsl DSL definition for stream
401-
* @param description description of the stream definition
402-
* @param deploy if {@code true}, the stream is deployed upon creation (default is
403-
* {@code false})
404-
* @return the created stream definition already exists
405-
* @throws InvalidStreamDefinitionException if there are errors in parsing the stream DSL,
406-
* resolving the name, or type of applications in the stream
407-
*/
408-
public StreamDefinition createStream(String streamName, String dsl, String description, boolean deploy) {
396+
@Override
397+
public StreamDefinition createStream(String streamName, String dsl, String description, boolean deploy,
398+
Map<String, String> deploymentProperties) {
409399
StreamDefinition streamDefinition = createStreamDefinition(streamName, dsl, description);
410400
List<String> errorMessages = new ArrayList<>();
411401

@@ -432,7 +422,7 @@ public StreamDefinition createStream(String streamName, String dsl, String descr
432422
final StreamDefinition savedStreamDefinition = this.streamDefinitionRepository.save(streamDefinition);
433423

434424
if (deploy) {
435-
this.deployStream(streamName, new HashMap<>());
425+
this.deployStream(streamName, deploymentProperties);
436426
}
437427

438428
auditRecordService.populateAndSaveAuditRecord(

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/StreamControllerTests.java

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.stream.Stream;
2728

2829
import com.fasterxml.jackson.databind.DeserializationFeature;
2930
import com.fasterxml.jackson.databind.MappingIterator;
3031
import com.fasterxml.jackson.databind.ObjectMapper;
3132
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
32-
import org.junit.After;
33-
import org.junit.Before;
34-
import org.junit.Test;
35-
import org.junit.runner.RunWith;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.Arguments;
38+
import org.junit.jupiter.params.provider.MethodSource;
3639
import org.mockito.ArgumentCaptor;
3740

3841
import org.springframework.beans.factory.annotation.Autowired;
@@ -74,13 +77,16 @@
7477
import org.springframework.http.MediaType;
7578
import org.springframework.test.annotation.DirtiesContext;
7679
import org.springframework.test.annotation.DirtiesContext.ClassMode;
77-
import org.springframework.test.context.junit4.SpringRunner;
7880
import org.springframework.test.web.servlet.MockMvc;
81+
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
7982
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
83+
import org.springframework.util.LinkedMultiValueMap;
84+
import org.springframework.util.MultiValueMap;
8085
import org.springframework.web.client.RestClientException;
8186
import org.springframework.web.context.WebApplicationContext;
8287

8388
import static org.assertj.core.api.Assertions.assertThat;
89+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
8490
import static org.hamcrest.Matchers.hasSize;
8591
import static org.hamcrest.Matchers.is;
8692
import static org.hamcrest.Matchers.startsWith;
@@ -110,7 +116,6 @@
110116
* @author Christian Tzolov
111117
* @author Daniel Serleg
112118
*/
113-
@RunWith(SpringRunner.class)
114119
@SpringBootTest(classes = TestDependencies.class)
115120
@DirtiesContext(classMode = ClassMode.BEFORE_EACH_TEST_METHOD)
116121
@AutoConfigureTestDatabase(replace = Replace.ANY)
@@ -138,7 +143,7 @@ public class StreamControllerTests {
138143
@Autowired
139144
private StreamDefinitionService streamDefinitionService;
140145

141-
@Before
146+
@BeforeEach
142147
public void setupMocks() {
143148
this.mockMvc = MockMvcBuilders.webAppContextSetup(wac)
144149
.defaultRequest(get("/").accept(MediaType.APPLICATION_JSON)).build();
@@ -156,24 +161,51 @@ public void setupMocks() {
156161
when(skipperClient.search(anyString(), eq(false))).thenReturn(new ArrayList<PackageMetadata>());
157162
}
158163

159-
@After
164+
@AfterEach
160165
public void tearDown() {
161166
repository.deleteAll();
162167
auditRecordRepository.deleteAll();
163168
assertThat(repository.count()).isZero();
164169
assertThat(auditRecordRepository.count()).isZero();
165170
}
166171

167-
@Test(expected = IllegalArgumentException.class)
172+
@Test
168173
public void testConstructorMissingStreamService() {
169-
new StreamDefinitionController(null, null, null, null, null);
174+
assertThatIllegalArgumentException()
175+
.isThrownBy(() -> {
176+
new StreamDefinitionController(null, null, null, null, null);
177+
});
170178
}
171179

172180
@Test
173-
public void testSave() throws Exception {
181+
public void testSaveNoDeployJsonEncoded() throws Exception {
174182
assertThat(repository.count()).isZero();
175-
mockMvc.perform(post("/streams/definitions/").param("name", "myStream").param("definition", "time | log")
176-
.accept(MediaType.APPLICATION_JSON)).andDo(print()).andExpect(status().isCreated());
183+
mockMvc.perform(post("/streams/definitions/")
184+
.param("name", "myStream")
185+
.param("definition", "time | log")
186+
.contentType(MediaType.APPLICATION_JSON)
187+
.accept(MediaType.APPLICATION_JSON))
188+
.andDo(print())
189+
.andExpect(status().isCreated());
190+
assertThatStreamSavedWithoutDeploy();
191+
}
192+
193+
@Test
194+
public void testSaveNoDeployFormEncoded() throws Exception {
195+
assertThat(repository.count()).isZero();
196+
MultiValueMap<String, String> values = new LinkedMultiValueMap<>();
197+
values.add("name", "myStream");
198+
values.add("definition", "time | log");
199+
mockMvc.perform(post("/streams/definitions/")
200+
.params(values)
201+
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
202+
.accept(MediaType.APPLICATION_JSON))
203+
.andDo(print())
204+
.andExpect(status().isCreated());
205+
assertThatStreamSavedWithoutDeploy();
206+
}
207+
208+
private void assertThatStreamSavedWithoutDeploy() {
177209
assertThat(repository.count()).isEqualTo(1);
178210
StreamDefinition myStream = repository.findById("myStream").get();
179211
assertThat(myStream.getDslText()).isEqualTo("time | log");
@@ -189,6 +221,60 @@ public void testSave() throws Exception {
189221
assertThat(logDefinition.getProperties().get(BindingPropertyKeys.INPUT_GROUP)).isEqualTo("myStream");
190222
}
191223

224+
@ParameterizedTest
225+
@MethodSource("testSaveAndDeployWithDeployPropsProvider")
226+
public void testSaveAndDeploy(Map<String, String> deploymentProps, Map<String, String> expectedPropsOnApps) throws Exception {
227+
assertThat(repository.count()).isZero();
228+
String definition = "time | log";
229+
String streamName = "testSaveAndDeploy-stream";
230+
MockHttpServletRequestBuilder requestBuilder = post("/streams/definitions/")
231+
.param("name", streamName)
232+
.param("definition", definition)
233+
.param("deploy", "true")
234+
.accept(MediaType.APPLICATION_JSON);
235+
if (deploymentProps != null) {
236+
requestBuilder
237+
.content(new ObjectMapper().writeValueAsBytes(deploymentProps))
238+
.contentType(MediaType.APPLICATION_JSON);
239+
}
240+
mockMvc.perform(requestBuilder)
241+
.andDo(print())
242+
.andExpect(status().isCreated());
243+
244+
assertThat(repository.count()).isEqualTo(1);
245+
assertThat(repository.findById(streamName)).isPresent()
246+
.hasValueSatisfying((sd) -> {
247+
assertThat(sd.getName()).isEqualTo(streamName);
248+
assertThat(sd.getDslText()).isEqualTo(definition);
249+
})
250+
.map(this.streamDefinitionService::getAppDefinitions).get().asList()
251+
.extracting("name").containsExactly("time", "log");
252+
253+
ArgumentCaptor<UploadRequest> uploadRequestCaptor = ArgumentCaptor.forClass(UploadRequest.class);
254+
verify(skipperClient, times(1)).upload(uploadRequestCaptor.capture());
255+
256+
List<UploadRequest> uploadRequests = uploadRequestCaptor.getAllValues();
257+
assertThat(uploadRequests).hasSize(1);
258+
Package pkg = SkipperPackageUtils.loadPackageFromBytes(uploadRequestCaptor);
259+
260+
Package timePackage = findChildPackageByName(pkg, "time");
261+
assertThat(timePackage).isNotNull();
262+
SpringCloudDeployerApplicationSpec timeSpec = parseSpec(timePackage.getConfigValues().getRaw());
263+
assertThat(timeSpec.getDeploymentProperties()).containsAllEntriesOf(expectedPropsOnApps);
264+
265+
Package logPackage = findChildPackageByName(pkg, "log");
266+
assertThat(logPackage).isNotNull();
267+
SpringCloudDeployerApplicationSpec logAppSpec = parseSpec(logPackage.getConfigValues().getRaw());
268+
assertThat(logAppSpec.getDeploymentProperties()).containsAllEntriesOf(expectedPropsOnApps);
269+
}
270+
271+
private static Stream<Arguments> testSaveAndDeployWithDeployPropsProvider() {
272+
return Stream.of(
273+
Arguments.of(Collections.singletonMap("deployer.*.count", "2"), Collections.singletonMap("spring.cloud.deployer.count", "2")),
274+
Arguments.of(Collections.emptyMap(), Collections.emptyMap()),
275+
Arguments.of(null, Collections.emptyMap()));
276+
}
277+
192278
@Test
193279
public void testSaveWithSensitiveProperties() throws Exception {
194280
assertThat(repository.count()).isZero();

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultStreamServiceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void createStream() {
127127
final StreamDefinition expectedStreamDefinition = new StreamDefinition("testStream", "time | log");
128128
when(streamDefinitionRepository.save(expectedStreamDefinition)).thenReturn(expectedStreamDefinition);
129129

130-
this.defaultStreamService.createStream("testStream", "time | log", "demo stream", false);
130+
this.defaultStreamService.createStream("testStream", "time | log", "demo stream", false, null);
131131

132132
verify(this.streamValidationService).isRegistered("time", ApplicationType.source);
133133
verify(this.streamValidationService).isRegistered("log", ApplicationType.sink);
@@ -150,7 +150,7 @@ public void createStreamWithMissingApps() {
150150
thrown.expectMessage("Application name 'time' with type 'source' does not exist in the app registry.\n" +
151151
"Application name 'log' with type 'sink' does not exist in the app registry.");
152152

153-
this.defaultStreamService.createStream("testStream", "time | log", "demo stream", false);
153+
this.defaultStreamService.createStream("testStream", "time | log", "demo stream", false, null);
154154
}
155155

156156
@Test
@@ -161,7 +161,7 @@ public void createStreamInvalidDsl() {
161161
thrown.expect(InvalidStreamDefinitionException.class);
162162
thrown.expectMessage("Application name 'koza' with type 'app' does not exist in the app registry.");
163163

164-
this.defaultStreamService.createStream("testStream", "koza", "demo stream", false);
164+
this.defaultStreamService.createStream("testStream", "koza", "demo stream", false, null);
165165
}
166166

167167
@Test

0 commit comments

Comments
 (0)