Skip to content

Commit 87a3caa

Browse files
author
zengfr
committed
add easymqtt4j
1 parent f476e46 commit 87a3caa

29 files changed

+1199
-0
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,8 @@
2121

2222
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
2323
hs_err_pid*
24+
/.idea/
25+
/*/target/
26+
*.imi
27+
*.iml
28+

easymqtt4j-activemq-plugins/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>easymqtt4j</artifactId>
7+
<groupId>com.zengfr.easymqtt4j</groupId>
8+
<version>1.0</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>easymqtt4j-activemq-plugins</artifactId>
13+
14+
15+
</project>

easymqtt4j-artemis-plugins/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>easymqtt4j</artifactId>
7+
<groupId>com.zengfr.easymqtt4j</groupId>
8+
<version>1.0</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
<artifactId>easymqtt4j-artemis-plugins</artifactId>
12+
<packaging>jar</packaging>
13+
<build>
14+
15+
</build>
16+
<dependencies>
17+
<dependency>
18+
<groupId>org.apache.activemq</groupId>
19+
<artifactId>artemis-server</artifactId>
20+
</dependency>
21+
</dependencies>
22+
<dependencyManagement>
23+
<dependencies>
24+
<dependency>
25+
<groupId>org.apache.activemq</groupId>
26+
<artifactId>artemis-server</artifactId>
27+
<version>2.12.0</version>
28+
</dependency>
29+
</dependencies>
30+
</dependencyManagement>
31+
32+
33+
</project>

easymqtt4j-client/pom.xml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>easymqtt4j</artifactId>
7+
<groupId>com.zengfr.easymqtt4j</groupId>
8+
<version>1.0</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>easymqtt4j-client</artifactId>
13+
<dependencies>
14+
<dependency>
15+
<groupId>org.springframework.boot</groupId>
16+
<artifactId>spring-boot-starter</artifactId>
17+
</dependency>
18+
<dependency>
19+
<groupId>org.springframework.boot</groupId>
20+
<artifactId>spring-boot-starter-web</artifactId>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.springframework.integration</groupId>
24+
<artifactId>spring-integration-mqtt</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.eclipse.paho</groupId>
28+
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
29+
</dependency>
30+
</dependencies>
31+
32+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.zengfr.easymqtt4j.client.adapter;
2+
3+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
4+
import org.eclipse.paho.client.mqttv3.MqttCallback;
5+
import org.eclipse.paho.client.mqttv3.MqttMessage;
6+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
7+
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
8+
9+
/**
10+
* Created by zengfr on 2020/5/13.
11+
* https://gitee.com/zengfr/easymqtt4j
12+
* https://github.com/zengfr/easymqtt4j
13+
*/
14+
public class MqttPahoMessageDrivenChannelAdapterAdapter extends MqttPahoMessageDrivenChannelAdapter {
15+
private MqttCallback callback;
16+
17+
public MqttPahoMessageDrivenChannelAdapterAdapter(MqttCallback callback, String url, String clientId, MqttPahoClientFactory clientFactory, String... topic) {
18+
super(url, clientId, clientFactory, topic);
19+
this.callback = callback;
20+
}
21+
22+
public MqttPahoMessageDrivenChannelAdapterAdapter(MqttCallback callback, String clientId, MqttPahoClientFactory clientFactory, String... topic) {
23+
super(clientId, clientFactory, topic);
24+
this.callback = callback;
25+
}
26+
27+
public MqttPahoMessageDrivenChannelAdapterAdapter(MqttCallback callback, String url, String clientId, String... topic) {
28+
super(url, clientId, topic);
29+
this.callback = callback;
30+
}
31+
32+
@Override
33+
public void connectionLost(Throwable cause) {
34+
if (callback != null) {
35+
callback.connectionLost(cause);
36+
}
37+
super.connectionLost(cause);
38+
}
39+
40+
@Override
41+
public void messageArrived(String topic, MqttMessage message) {
42+
if (callback != null) {
43+
try {
44+
callback.messageArrived(topic, message);
45+
} catch (Exception e) {
46+
e.printStackTrace();
47+
}
48+
}
49+
super.messageArrived(topic, message);
50+
}
51+
52+
@Override
53+
public void deliveryComplete(IMqttDeliveryToken token) {
54+
if (callback != null) {
55+
callback.deliveryComplete(token);
56+
}
57+
super.deliveryComplete(token);
58+
}
59+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.zengfr.easymqtt4j.client.adapter;
2+
3+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
4+
import org.eclipse.paho.client.mqttv3.MqttCallback;
5+
import org.eclipse.paho.client.mqttv3.MqttMessage;
6+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
7+
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
8+
9+
/**
10+
* Created by zengfr on 2020/5/13.
11+
* https://gitee.com/zengfr/easymqtt4j
12+
* https://github.com/zengfr/easymqtt4j
13+
*/
14+
public class MqttPahoMessageHandlerAdapter extends MqttPahoMessageHandler {
15+
private MqttCallback callback;
16+
public MqttPahoMessageHandlerAdapter(MqttCallback callback, String url, String clientId, MqttPahoClientFactory clientFactory) {
17+
super(url, clientId, clientFactory);
18+
this.callback = callback;
19+
}
20+
21+
public MqttPahoMessageHandlerAdapter(MqttCallback callback, String clientId, MqttPahoClientFactory clientFactory) {
22+
super(clientId, clientFactory);
23+
this.callback = callback;
24+
}
25+
26+
public MqttPahoMessageHandlerAdapter(MqttCallback callback, String url, String clientId) {
27+
super(url, clientId);
28+
this.callback = callback;
29+
}
30+
31+
@Override
32+
public void connectionLost(Throwable cause) {
33+
if (callback != null) {
34+
callback.connectionLost(cause);
35+
}
36+
super.connectionLost(cause);
37+
}
38+
39+
@Override
40+
public void messageArrived(String topic, MqttMessage message) {
41+
if (callback != null) {
42+
try {
43+
callback.messageArrived(topic, message);
44+
} catch (Exception e) {
45+
e.printStackTrace();
46+
}
47+
}
48+
super.messageArrived(topic, message);
49+
}
50+
51+
@Override
52+
public void deliveryComplete(IMqttDeliveryToken token) {
53+
if (callback != null) {
54+
callback.deliveryComplete(token);
55+
}
56+
super.deliveryComplete(token);
57+
}
58+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.zengfr.easymqtt4j.client.config;
2+
3+
4+
import com.zengfr.easymqtt4j.client.geteway.MqttEventGateway;
5+
import com.zengfr.easymqtt4j.client.listener.MqttEventListener;
6+
import com.zengfr.easymqtt4j.client.util.MqttBuilderUtil;
7+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.beans.factory.annotation.Value;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
import org.springframework.integration.annotation.IntegrationComponentScan;
13+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
14+
15+
/**
16+
* Created by zengfr on 2020/5/12.
17+
* https://gitee.com/zengfr/easymqtt4j
18+
* https://github.com/zengfr/easymqtt4j
19+
*/
20+
@Configuration
21+
@IntegrationComponentScan
22+
public class MqttClientConfig {
23+
@Value("${spring.mqtt.mqttVersion:0}")
24+
private int mqttVersion;
25+
26+
@Value("${spring.mqtt.host.username}")
27+
private String username;
28+
29+
@Value("${spring.mqtt.host.password}")
30+
private String password;
31+
32+
@Value("${spring.mqtt.host.uris}")
33+
private String[] hostUris;
34+
35+
@Value("${spring.mqtt.conn.cleansession:false}")
36+
private boolean cleanSession;
37+
38+
@Value("${spring.mqtt.conn.keepaliveinterval:5}")
39+
private int keepAliveInterval;
40+
41+
@Value("${spring.mqtt.conn.maxInflight:99}")
42+
private int maxInflight;
43+
44+
@Value("${spring.mqtt.conn.lastwilltopic:will}")
45+
private String lastwillTopic;
46+
47+
@Value("${spring.mqtt.conn.lastwillmsg:offline}")
48+
private String lastwillMsg;
49+
50+
@Value("${spring.mqtt.conn.lastwillqos:2}")
51+
private short lastwillQos;
52+
53+
@Value("${spring.mqtt.conn.lastwillretain:true}")
54+
private boolean lastwillRetain;
55+
56+
@Value("${spring.mqtt.conn.usessl:false}")
57+
private boolean isUseSsl;
58+
@Autowired
59+
MqttEventGateway eventGateway;
60+
@Bean
61+
public MqttEventListener eventListener() {
62+
return new MqttEventListener(eventGateway);
63+
}
64+
@Bean
65+
public MqttPahoClientFactory mqttClientFactory() {
66+
MqttPahoClientFactory factory = MqttBuilderUtil.buildClientFactory(getMqttConnectOptions());
67+
return factory;
68+
}
69+
70+
@Bean
71+
public MqttConnectOptions getMqttConnectOptions() {
72+
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
73+
mqttConnectOptions.setUserName(username);
74+
mqttConnectOptions.setPassword(password.toCharArray());
75+
mqttConnectOptions.setServerURIs(hostUris);
76+
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
77+
mqttConnectOptions.setAutomaticReconnect(true);
78+
mqttConnectOptions.setMaxInflight(maxInflight);
79+
mqttConnectOptions.setCleanSession(cleanSession);
80+
mqttConnectOptions.setMqttVersion(mqttVersion);
81+
if (isUseSsl) {
82+
mqttConnectOptions.setSSLProperties(null);
83+
}
84+
mqttConnectOptions.setWill(lastwillTopic, lastwillMsg.getBytes(), lastwillQos, lastwillRetain);
85+
return mqttConnectOptions;
86+
}
87+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.zengfr.easymqtt4j.client.config;
2+
3+
import com.zengfr.easymqtt4j.client.geteway.MqttEventGateway;
4+
import com.zengfr.easymqtt4j.client.interceptor.MqttChannelInterceptor;
5+
import com.zengfr.easymqtt4j.client.listener.MqttEventListener;
6+
import com.zengfr.easymqtt4j.client.util.MqttBuilderUtil;
7+
import com.zengfr.easymqtt4j.client.util.MqttUtil;
8+
import org.eclipse.paho.client.mqttv3.MqttCallback;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
import org.springframework.integration.annotation.IntegrationComponentScan;
14+
import org.springframework.integration.annotation.ServiceActivator;
15+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
16+
import org.springframework.integration.mqtt.support.MqttMessageConverter;
17+
import org.springframework.messaging.MessageChannel;
18+
import org.springframework.messaging.MessageHandler;
19+
import org.springframework.messaging.support.ChannelInterceptor;
20+
21+
/**
22+
* Created by zengfr on 2020/5/12.
23+
* https://gitee.com/zengfr/easymqtt4j
24+
* https://github.com/zengfr/easymqtt4j
25+
*/
26+
@Configuration
27+
@IntegrationComponentScan
28+
public class MqttPublisherConfig {
29+
30+
@Value("${spring.mqtt.publisher.id:publisher}")
31+
private String clientId;
32+
@Value("${spring.mqtt.publisher.defaulttopic:}")
33+
private String defaultTopic;
34+
@Value("${spring.mqtt.publisher.completionTimeout:5000}")
35+
private int completionTimeout;
36+
@Value("${spring.mqtt.publisher.threads:2}")
37+
private int threads;
38+
@Value("${spring.mqtt.publisher.isuserndclientId:false}")
39+
private boolean isUseRndClientId;
40+
@Value("${spring.mqtt.publisher.ispayloadasbytes:false}")
41+
private boolean isPayloadAsBytes;
42+
@Autowired
43+
MqttPahoClientFactory mqttClientFactory;
44+
@Autowired
45+
MqttEventListener eventListener;
46+
@Autowired
47+
MqttEventGateway eventGateway;
48+
@Bean
49+
public ChannelInterceptor mqttOutboundChannelInterceptor() {
50+
51+
return new MqttChannelInterceptor(clientId, eventGateway);
52+
}
53+
@Bean
54+
public MessageChannel mqttOutboundChannel() {
55+
return MqttBuilderUtil.buildMessageChannel(threads, mqttOutboundChannelInterceptor());
56+
}
57+
58+
@Bean
59+
@ServiceActivator(inputChannel = "mqttOutboundChannel")
60+
public MessageHandler mqttOutboundMessageHandler() {
61+
String id = clientId;
62+
if (isUseRndClientId) {
63+
id = clientId + "_" + MqttUtil.getNowString();
64+
}
65+
MqttCallback callBackListener = MqttBuilderUtil.buildMqttCallback(clientId, eventGateway);
66+
MqttMessageConverter messageConverter = MqttBuilderUtil.buildMessageConverter(isPayloadAsBytes);
67+
68+
MessageHandler messageHandler = MqttBuilderUtil.buildMessageHandler(
69+
mqttClientFactory, id, completionTimeout, messageConverter,
70+
defaultTopic, eventListener, callBackListener);
71+
return messageHandler;
72+
}
73+
}

0 commit comments

Comments
 (0)