Skip to content

Commit fb88465

Browse files
authored
[inner-1543/1498] fix:second push and ddl's hang (#3107)
1 parent b1e2485 commit fb88465

File tree

4 files changed

+78
-27
lines changed

4 files changed

+78
-27
lines changed

src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDDLExecuteHandler.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.HashSet;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334

3435
/**
3536
* @author mycat
3637
*/
3738
public class MultiNodeDDLExecuteHandler extends MultiNodeQueryHandler {
3839
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeQueryHandler.class);
3940

41+
private AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
42+
4043
public MultiNodeDDLExecuteHandler(RouteResultset rrs, NonBlockingSession session) {
4144
super(rrs, session, true);
4245
}
@@ -99,7 +102,7 @@ public void errorResponse(byte[] data, @NotNull AbstractService service) {
99102
}
100103
errConnection.add((MySQLResponseService) service);
101104
if (decrementToZero((MySQLResponseService) service)) {
102-
session.handleSpecial(rrs, false, getDDLErrorInfo());
105+
handleSpecial(rrs, false, getDDLErrorInfo());
103106
DDLTraceManager.getInstance().endDDL(session.getShardingService(), getDDLErrorInfo());
104107
LOGGER.warn("DDL execution failed");
105108
if (byteBuffer != null) {
@@ -173,13 +176,13 @@ public void okResponse(byte[] data, @NotNull AbstractService service) {
173176
if (!decrementToZero((MySQLResponseService) service))
174177
return;
175178
if (isFail()) {
176-
session.handleSpecial(rrs, false, null);
179+
handleSpecial(rrs, false, null);
177180
DDLTraceManager.getInstance().endDDL(source, "ddl end with execution failure");
178181
LOGGER.warn("DDL execution failed");
179182
session.resetMultiStatementStatus();
180183
handleEndPacket(err, false);
181184
} else {
182-
boolean metaInitial = session.handleSpecial(rrs, true, null);
185+
boolean metaInitial = handleSpecial(rrs, true, null);
183186
if (!metaInitial) {
184187
DDLTraceManager.getInstance().endDDL(source, "ddl end with meta failure");
185188
executeMetaDataFailed(null);
@@ -247,7 +250,7 @@ private void executeError(MySQLResponseService service) {
247250
}
248251
errConnection.add(service);
249252
if (canResponse()) {
250-
session.handleSpecial(rrs, false, null);
253+
handleSpecial(rrs, false, null);
251254
DDLTraceManager.getInstance().endDDL(session.getShardingService(), new String(err.getMessage()));
252255
LOGGER.warn("DDL execution failed");
253256
if (byteBuffer == null) {
@@ -272,6 +275,13 @@ private String getDDLErrorInfo() {
272275
return s.toString();
273276
}
274277

278+
private boolean handleSpecial(RouteResultset rrs0, boolean isSuccess, String errInfo) {
279+
if (specialHandleFlag.compareAndSet(false, true)) {
280+
return session.handleSpecial(rrs0, isSuccess, errInfo);
281+
}
282+
return true;
283+
}
284+
275285
protected void handleEndPacket(MySQLPacket packet, boolean isSuccess) {
276286
session.clearResources(false);
277287
session.setResponseTime(isSuccess);

src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlPrepareHandler.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class MultiNodeDdlPrepareHandler extends MultiNodeHandler implements Exec
5050
private ErrorPacket err;
5151
private Set<MySQLResponseService> closedConnSet;
5252
private volatile boolean finishedTest = false;
53-
private AtomicBoolean releaseDDLLock = new AtomicBoolean(false);
53+
private AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
5454

5555
public MultiNodeDdlPrepareHandler(RouteResultset rrs, NonBlockingSession session) {
5656
super(session);
@@ -178,9 +178,7 @@ private void executeConnError() {
178178
setFail(new String(err.getMessage()));
179179
}
180180
if (canResponse() && errorResponse.compareAndSet(false, true)) {
181-
if (releaseDDLLock.compareAndSet(false, true)) {
182-
session.handleSpecial(oriRrs, false, null);
183-
}
181+
handleSpecial(oriRrs, false);
184182
handleRollbackPacket(err.toBytes(), "DDL prepared failed");
185183
}
186184
}
@@ -230,7 +228,7 @@ public void errorResponse(byte[] data, @NotNull AbstractService service) {
230228
setFail(new String(errPacket.getMessage()));
231229
}
232230
if (decrementToZero((MySQLResponseService) service) && errorResponse.compareAndSet(false, true)) {
233-
session.handleSpecial(oriRrs, false, null);
231+
handleSpecial(oriRrs, false);
234232
handleRollbackPacket(err.toBytes(), "DDL prepared failed");
235233
}
236234
} finally {
@@ -265,7 +263,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe
265263

266264
if (this.isFail()) {
267265
if (errorResponse.compareAndSet(false, true)) {
268-
session.handleSpecial(oriRrs, false, null);
266+
handleSpecial(oriRrs, false);
269267
handleRollbackPacket(err.toBytes(), "DDL prepared failed");
270268
}
271269
} else {
@@ -277,7 +275,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe
277275
if (!session.isKilled()) {
278276
handler.execute();
279277
} else {
280-
session.handleSpecial(oriRrs, false, null);
278+
handleSpecial(oriRrs, false);
281279
DDLTraceManager.getInstance().endDDL(shardingService, "Query was interrupted");
282280
ErrorPacket errPacket = new ErrorPacket();
283281
errPacket.setPacketId(session.getShardingService().nextPacketId());
@@ -287,7 +285,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe
287285
}
288286
} catch (Exception e) {
289287
LOGGER.warn(String.valueOf(shardingService) + oriRrs, e);
290-
session.handleSpecial(oriRrs, false, null);
288+
handleSpecial(oriRrs, false);
291289
DDLTraceManager.getInstance().endDDL(shardingService, "take Connection error:" + e.getMessage());
292290
shardingService.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
293291
}
@@ -335,16 +333,20 @@ private void handleRollbackPacket(byte[] data, String reason) {
335333
source.write(data, WriteFlags.SESSION_END);
336334
}
337335

336+
private boolean handleSpecial(RouteResultset rrs0, boolean isSuccess) {
337+
if (specialHandleFlag.compareAndSet(false, true)) {
338+
return session.handleSpecial(rrs0, isSuccess, null);
339+
}
340+
return true;
341+
}
338342

339343
public boolean clearIfSessionClosed() {
340344
if (session.closed()) {
341345
if (LOGGER.isDebugEnabled()) {
342346
LOGGER.debug("session closed without execution,clear resources " + session);
343347
}
348+
handleSpecial(oriRrs, false);
344349
session.clearResources(true);
345-
if (releaseDDLLock.compareAndSet(false, true)) {
346-
session.handleSpecial(oriRrs, false, null);
347-
}
348350
this.clearResources();
349351
return true;
350352
} else {

src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeDDLHandler.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,21 @@
1414
import com.actiontech.dble.singleton.DDLTraceManager;
1515
import com.actiontech.dble.util.StringUtil;
1616
import org.jetbrains.annotations.NotNull;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
1719

1820
import javax.annotation.Nullable;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922

2023
/**
2124
* Created by szf on 2019/12/3.
2225
*/
2326
public class SingleNodeDDLHandler extends SingleNodeHandler {
2427

28+
private static final Logger LOGGER = LoggerFactory.getLogger(SingleNodeDDLHandler.class);
29+
30+
private AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
31+
2532
public SingleNodeDDLHandler(RouteResultset rrs, NonBlockingSession session) {
2633
super(rrs, session);
2734
}
@@ -66,7 +73,7 @@ public void okResponse(byte[] data, @NotNull AbstractService service) {
6673
if (executeResponse) {
6774
DDLTraceManager.getInstance().updateConnectionStatus(session.getShardingService(), (MySQLResponseService) service, DDLTraceInfo.DDLConnectionStatus.CONN_EXECUTE_SUCCESS);
6875
// handleSpecial
69-
boolean metaInitial = session.handleSpecial(rrs, true, null);
76+
boolean metaInitial = handleSpecial(rrs, true);
7077
if (!metaInitial) {
7178
DDLTraceManager.getInstance().endDDL(session.getShardingService(), "ddl end with meta failure");
7279
executeMetaDataFailed((MySQLResponseService) service, null);
@@ -110,6 +117,13 @@ public void executeMetaDataFailed(MySQLResponseService service, String errMsg) {
110117

111118
@Override
112119
protected void backConnectionErr(ErrorPacket errPkg, @Nullable MySQLResponseService service, boolean syncFinished) {
120+
ShardingService shardingService = session.getShardingService();
121+
String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
122+
LOGGER.info("execute sql err:{}, con:{}, frontend host:{}/{}/{}", errMsg, service,
123+
shardingService.getConnection().getHost(),
124+
shardingService.getConnection().getLocalPort(),
125+
shardingService.getUser());
126+
113127
if (service != null && !service.isFakeClosed()) {
114128
if (service.getConnection().isClosed()) {
115129
if (service.getAttachment() != null) {
@@ -127,19 +141,33 @@ protected void backConnectionErr(ErrorPacket errPkg, @Nullable MySQLResponseServ
127141
}
128142
}
129143

130-
ShardingService shardingService = session.getShardingService();
131-
String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
132-
LOGGER.info("execute sql err:{}, con:{}, frontend host:{}/{}/{}", errMsg, service,
133-
shardingService.getConnection().getHost(),
134-
shardingService.getConnection().getLocalPort(),
135-
shardingService.getUser());
136-
137144
if (writeToClient.compareAndSet(false, true)) {
138-
session.handleSpecial(rrs, false, null);
145+
handleSpecial(rrs, false);
139146
handleEndPacket(errPkg, false);
140147
}
141148
}
142149

150+
public boolean clearIfSessionClosed() {
151+
if (session.closed()) {
152+
if (LOGGER.isDebugEnabled()) {
153+
LOGGER.debug("session closed without execution,clear resources " + session);
154+
}
155+
handleSpecial(rrs, false);
156+
session.clearResources(true);
157+
recycleBuffer();
158+
return true;
159+
} else {
160+
return false;
161+
}
162+
}
163+
164+
private boolean handleSpecial(RouteResultset rrs0, boolean isSuccess) {
165+
if (specialHandleFlag.compareAndSet(false, true)) {
166+
return session.handleSpecial(rrs0, isSuccess, null);
167+
}
168+
return true;
169+
}
170+
143171
protected void handleEndPacket(MySQLPacket packet, boolean isSuccess) {
144172
session.clearResources(false);
145173
session.setResponseTime(isSuccess);

src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,7 @@ public void execute() throws Exception {
116116
}
117117

118118
protected void execute(BackendConnection conn) {
119-
if (session.closed()) {
120-
session.clearResources(true);
121-
recycleBuffer();
119+
if (clearIfSessionClosed()) {
122120
return;
123121
}
124122
conn.getBackendService().setResponseHandler(this);
@@ -465,4 +463,17 @@ public String toString() {
465463
return "SingleNodeHandler [node=" + node + ", packetId=" + (byte) session.getShardingService().getPacketId().get() + "]";
466464
}
467465

466+
public boolean clearIfSessionClosed() {
467+
if (session.closed()) {
468+
if (LOGGER.isDebugEnabled()) {
469+
LOGGER.debug("session closed without execution,clear resources " + session);
470+
}
471+
session.clearResources(true);
472+
recycleBuffer();
473+
return true;
474+
} else {
475+
return false;
476+
}
477+
}
478+
468479
}

0 commit comments

Comments
 (0)