Skip to content

modify to throw MqttException if MqttAsyncClient.connect() is called when there is no message broker #1022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.util.Enumeration;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import org.eclipse.paho.client.mqttv3.BufferedMessage;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
Expand Down Expand Up @@ -271,9 +270,8 @@ public void close(boolean force) throws MqttException {
* @param token The {@link MqttToken} to track the connection
* @throws MqttException if an error occurs when connecting
*/
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
public synchronized void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
//@TRACE 214=state=CONNECTING
log.fine(CLASS_NAME,methodName,"214");
Expand All @@ -297,7 +295,11 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce

tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
conbg.start();
try {
conbg.start();
} catch (MqttException e){
throw e;
}
}
else {
// @TRACE 207=connect failed: not disconnected {0}
Expand All @@ -312,7 +314,6 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
}
}
}

public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException {
Expand Down Expand Up @@ -400,7 +401,8 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
// Clean session handling and tidy up
clientState.disconnected(reason);
if (clientState.getCleanSession())
callback.removeMessageListeners();

callback.removeMessageListeners();
}catch(Exception ex) {
// Ignore as we are shutting down
}
Expand All @@ -417,8 +419,10 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
}

}catch(Exception ex) {

// Ignore as we are shutting down
}

// All disconnect logic has been completed allowing the
// client to be marked as disconnected.
synchronized(conLock) {
Expand All @@ -427,6 +431,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) {

conState = DISCONNECTED;
stoppingComms = false;

}

// Internal disconnect processing has completed. If there
Expand All @@ -436,6 +441,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
// any outstanding tokens and unblock any waiters
if (endToken != null && callback != null) {
callback.asyncOperationComplete(endToken);

}

if (wasConnected && callback != null) {
Expand All @@ -449,6 +455,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
try {
close(true);
} catch (Exception e) { // ignore any errors as closing

}
}
}
Expand Down Expand Up @@ -692,11 +699,17 @@ private class ConnectBG implements Runnable {
threadName = "MQTT Con: "+getClient().getClientId();
}

void start() {
void start() throws MqttException {
Future<?> exceptionFuture = null;
if (executorService == null) {
new Thread(this).start();
exceptionFuture = Executors.newSingleThreadExecutor().submit(this);
} else {
executorService.execute(this);
exceptionFuture = executorService.submit(this);
}
try {
Object g = exceptionFuture.get();;
} catch (Exception e) {
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR);
}
}

Expand Down Expand Up @@ -734,13 +747,16 @@ public void run() {
//@TRACE 212=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "212", null, ex);
mqttEx = ex;

} catch (Exception ex) {
//@TRACE 209=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "209", null, ex);
mqttEx = ExceptionHelper.createMqttException(ex);
}

if (mqttEx != null) {
if(mqttEx.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR)
throw new RuntimeException(mqttEx);
shutdownConnection(conToken, mqttEx);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,11 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA
}

comms.setNetworkModuleIndex(0);
connectActionListener.connect();
try {
connectActionListener.connect();
} catch (MqttException e) {
throw e;
}

return userToken;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onFailure(IMqttToken token, Throwable exception) {
try {
connect();
}
catch (MqttPersistenceException e) {
catch (MqttException e) {
onFailure(token, e); // try the next URI in the list
}
}
Expand Down Expand Up @@ -166,7 +166,7 @@ public void onFailure(IMqttToken token, Throwable exception) {
* Start the connect processing
* @throws MqttPersistenceException if an error is thrown whilst setting up persistence
*/
public void connect() throws MqttPersistenceException {
public void connect() throws MqttException {
MqttToken token = new MqttToken(client.getClientId());
token.setActionCallback(this);
token.setUserContext(this);
Expand All @@ -185,6 +185,8 @@ public void connect() throws MqttPersistenceException {
comms.connect(options, token);
}
catch (MqttException e) {
if(e.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR)
throw e;
onFailure(token, e);
}
}
Expand Down