Streaming pipeline errors block connections #3596

@CarlOlson

This issue was written for mysql, but the problem also exists in mysql2. The mysql repository no longer accepts issues from non-contributors... The attached patch is for mysql, but I believe the same changes can be made to query.js in mysql2.

I'll look into making a PR with tests when I have the time.


Running this script will hang the process when pool.end is called:

test.mjs
import { createPool } from 'mysql'; // mysql or mysql2
import { pipeline } from 'node:stream/promises';

const pool = createPool({
  host: process.env.MYSQL_HOST,
  port: Number(process.env.MYSQL_PORT),
  user: process.env.MYSQL_USER,
  password: process.env.MYSQL_PASSWORD,
  database: process.env.MYSQL_DATABASE,
  connectionLimit: 1, // single pooled connection
  multipleStatements: true,
});

const promisify = (fn) =>
  new Promise((resolve, reject) =>
    fn((exn, conn) => (exn ? reject(exn) : resolve(conn)))
  );

const conn = await promisify((cb) => pool.getConnection(cb));

try {
  const stream = conn.query('select 1; select sleep(1);').stream({});
  await pipeline(stream, async function* (source) {
    for await (const chunk of source) {
      console.log('chunk:', chunk);
      throw new Error();
    }
  }).catch((err) => console.log('caught:', err));
} finally {
  conn.release();
  console.log('released');
}

// also hangs here
// console.log(await promisify((cb) => conn.query('select 1', cb)));

await promisify((cb) => pool.end(cb));

console.log('done');

I noticed this when pool.end hung during test cleanup, but the same condition also blocks any further query on the connection after release is called.

The issue is that the connection gets paused but is never resumed. A paused connection never processes the queued Quit sequence, so the callback passed to end is never invoked.
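The sequence can be reproduced without a database. What follows is a minimal sketch under stated assumptions: `FakeConnection`, `simulate`, and `onResult` are hypothetical stand-ins, not part of mysql or mysql2, and the handler only mirrors the `push`/`pause` logic of the driver's `stream()` method shown in the patch below. It relies on Node's `Readable.push()` returning `false` once the stream has been destroyed, so a row that arrives after the consumer has thrown pauses the connection, and no later `read()` call resumes it. The `patched` flag toggles the two changes the patch makes.

```javascript
// Hypothetical stand-in for a driver connection; it only tracks paused state.
class FakeConnection {
  constructor() {
    this.paused = false;
  }
  pause() {
    this.paused = true;
  }
  resume() {
    this.paused = false;
  }
}

// Resolves to true if the fake connection is still paused after a consumer error.
async function simulate(patched) {
  // Dynamic import keeps the sketch runnable as an ES module or a CommonJS script.
  const { Readable } = await import('node:stream');

  const conn = new FakeConnection();
  const stream = new Readable({
    objectMode: true,
    read() {
      conn.resume();
    },
    destroy(err, callback) {
      if (patched) conn.resume(); // patched behavior: resume on destroy
      callback(err); // lets the stream finish closing
    },
  });

  // Mirrors the driver's 'result' handler: pause when push() returns false.
  const onResult = (row) => {
    if (patched && stream.destroyed) return; // patched behavior: drop late rows
    if (!stream.push(row)) conn.pause();
  };

  onResult({ id: 1 }); // first row, delivered while the stream is healthy

  try {
    for await (const row of stream) {
      throw new Error(`consumer failed on row ${row.id}`);
    }
  } catch {
    // Leaving the loop by throwing destroys the stream.
  }

  onResult({ id: 2 }); // late row, e.g. from the second statement
  return conn.paused;
}

simulate(false).then((paused) => console.log('unpatched, still paused:', paused));
simulate(true).then((paused) => console.log('patched, still paused:', paused));
```

On a recent Node.js release the unpatched variant should report that the connection stays paused, and the patched variant should not. The sketch iterates the stream directly instead of going through `pipeline`, but the stream ends up destroyed either way.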

Here is a yarn patch I've made for mysql to fix the problem:

diff --git a/lib/protocol/sequences/Query.js b/lib/protocol/sequences/Query.js
index b7632959b6b18eff0a239456c67b2696a5b931bd..d085e63dde4d320ac34a2b39d74386f661f66e7a 100644
--- a/lib/protocol/sequences/Query.js
+++ b/lib/protocol/sequences/Query.js
@@ -190,37 +190,50 @@ Query.prototype._sendLocalDataFile = function(path) {
 };
 
 Query.prototype.stream = function(options) {
-  var self = this;
-
-  options = options || {};
-  options.objectMode = true;
-
-  var stream = new Readable(options);
-
-  stream._read = function() {
-    self._connection && self._connection.resume();
-  };
+  const self = this;
+  const stream = new Readable({
+    ...options,
+    objectMode: true,
+    destroy() {
+      // Must resume connection or else it will block
+      self._connection?.resume();
+
+      self.removeListener('result', on_result);
+      self.removeListener('error', on_error);
+      self.removeListener('end', on_end);
+      self.removeListener('fields', on_fields);
+
+      stream.removeListener('end', once_end);
+    },
+    read() {
+      self._connection?.resume();
+    },
+  });
 
-  stream.once('end', function() {
+  stream.once('end', function once_end() {
     process.nextTick(function () {
       stream.emit('close');
     });
   });
 
-  this.on('result', function(row, i) {
+  this.on('result', function on_result(row, i) {
+    if (stream.destroyed) return;
     if (!stream.push(row)) self._connection.pause();
     stream.emit('result', row, i);  // replicate old emitter
   });
 
-  this.on('error', function(err) {
+  this.on('error', function on_error(err) {
+    if (stream.destroyed) return;
     stream.emit('error', err);  // Pass on any errors
   });
 
-  this.on('end', function() {
+  this.on('end', function on_end() {
+    if (stream.destroyed) return;
     stream.push(null);  // pushing null, indicating EOF
   });
 
-  this.on('fields', function(fields, i) {
+  this.on('fields', function on_fields(fields, i) {
+    if (stream.destroyed) return;
     stream.emit('fields', fields, i);  // replicate old emitter
   });

However, this patch may not be sufficient to fix the problem in all situations. I think resume should also be called in the release and end functions.
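Until the library does this itself, the same idea can be approximated on the caller's side. The helper below is a hedged sketch, not a tested fix: `releaseResumed` is a hypothetical name, and it assumes only that the connection object exposes the `resume()` and `release()` methods used elsewhere in this issue. It does not cover rows that arrive after the call and pause the connection again, which is one reason a guard on destroyed streams is still needed inside the driver.

```javascript
// Hypothetical caller-side helper, not part of mysql or mysql2.
// Resumes the connection before handing it back to the pool so queued
// protocol sequences (such as Quit) are not left waiting behind a pause.
function releaseResumed(conn) {
  // The guard keeps the helper safe for objects that lack resume().
  if (typeof conn.resume === 'function') conn.resume();
  conn.release();
}

// Usage with a fake connection that records the order of calls.
const calls = [];
const fakeConn = {
  resume() {
    calls.push('resume');
  },
  release() {
    calls.push('release');
  },
};

releaseResumed(fakeConn);
console.log(calls.join(' -> ')); // prints "resume -> release"
```

In the reproduction script this would replace the `conn.release()` call in the `finally` block.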

This problem is also present in mysql2.
