-
-
Notifications
You must be signed in to change notification settings - Fork 639
Description
This issue was written for mysql
, but the problem also exists in mysql2
. The mysql
repository no longer accepts issues from non-contributors... The patch attached is for mysql
, but I believe the same changes can be made to query.js
in mysql2
.
I'll look into making a PR with tests when I have the time.
Running this script will hang the process when pool.end
is called:
test.mjs
import { createPool } from 'mysql'; // mysql or mysql2
import { pipeline } from 'node:stream/promises';
// Reproduction script: abandoning a streamed multi-statement query part-way
// through leaves the connection paused, so later work on it (another query,
// or pool.end()) never completes.
const pool = createPool({
  host: process.env.MYSQL_HOST,
  port: Number(process.env.MYSQL_PORT),
  user: process.env.MYSQL_USER,
  password: process.env.MYSQL_PASSWORD,
  database: process.env.MYSQL_DATABASE,
  // The pool-size option is `connectionLimit`; the previous key
  // `poolConnectionLimit` is not a recognised option, so the limit of one
  // connection was never actually applied.
  connectionLimit: 1,
  // Needed because the query below sends two statements in a single call.
  multipleStatements: true,
});

/**
 * Adapts a Node-style callback API to a Promise.
 *
 * @param {(cb: (exn: unknown, value: unknown) => void) => void} fn
 *   Function that receives a Node-style `(error, value)` callback.
 * @returns {Promise<unknown>} Resolves with `value`, or rejects with `exn`
 *   when it is truthy.
 */
const promisify = (fn) =>
  new Promise((resolve, reject) =>
    fn((exn, conn) => (exn ? reject(exn) : resolve(conn)))
  );

const conn = await promisify((cb) => pool.getConnection(cb));

try {
  const stream = conn.query('select 1; select sleep(1);').stream({});

  // Throw after the first chunk so the pipeline tears the stream down while
  // the second statement is still outstanding.
  await pipeline(stream, async function* (source) {
    for await (const chunk of source) {
      console.log('chunk:', chunk);
      throw new Error();
    }
  }).catch((err) => console.log('caught:', err));
} finally {
  conn.release();
  console.log('released');
}

// also hangs here
// console.log(await promisify((cb) => conn.query('select 1', cb)));

// Never resolves while the connection stays paused: 'done' is not printed.
await promisify((cb) => pool.end(cb));
console.log('done');
This was noticed during test cleanup from pool.end
, but it will also prevent additional queries after release
is called.
The issue is that the connection gets paused, but is never resumed. This prevents the protocol sequence from ever processing Quit
and the necessary callbacks from end
.
Here is a yarn patch I've made for mysql to fix the problem:
Details
diff --git a/lib/protocol/sequences/Query.js b/lib/protocol/sequences/Query.js
index b7632959b6b18eff0a239456c67b2696a5b931bd..d085e63dde4d320ac34a2b39d74386f661f66e7a 100644
--- a/lib/protocol/sequences/Query.js
+++ b/lib/protocol/sequences/Query.js
@@ -190,37 +190,50 @@ Query.prototype._sendLocalDataFile = function(path) {
};
Query.prototype.stream = function(options) {
- var self = this;
-
- options = options || {};
- options.objectMode = true;
-
- var stream = new Readable(options);
-
- stream._read = function() {
- self._connection && self._connection.resume();
- };
+ const self = this;
+ const stream = new Readable({
+ ...options,
+ objectMode: true,
+ destroy() {
+ // Must resume connection or else it will block
+ self._connection?.resume();
+
+ self.removeListener('result', on_result);
+ self.removeListener('error', on_error);
+ self.removeListener('end', on_end);
+ self.removeListener('fields', on_fields);
+
+ stream.removeListener('end', once_end);
+ },
+ read() {
+ self._connection?.resume();
+ },
+ });
- stream.once('end', function() {
+ stream.once('end', function once_end() {
process.nextTick(function () {
stream.emit('close');
});
});
- this.on('result', function(row, i) {
+ this.on('result', function on_result(row, i) {
+ if (stream.destroyed) return;
if (!stream.push(row)) self._connection.pause();
stream.emit('result', row, i); // replicate old emitter
});
- this.on('error', function(err) {
+ this.on('error', function on_error(err) {
+ if (stream.destroyed) return;
stream.emit('error', err); // Pass on any errors
});
- this.on('end', function() {
+ this.on('end', function on_end() {
+ if (stream.destroyed) return;
stream.push(null); // pushing null, indicating EOF
});
- this.on('fields', function(fields, i) {
+ this.on('fields', function on_fields(fields, i) {
+ if (stream.destroyed) return;
stream.emit('fields', fields, i); // replicate old emitter
});
However, this patch may not be sufficient to fix the problem in all situations. I think resume
should be called in release
and end
functions.
This problem also persists in mysql2.