Skip to content

Commit b8922d4

Browse files
committed
getObject、selectObjectContent 支持 stream
1 parent 1a40126 commit b8922d4

File tree

1 file changed

+238
-0
lines changed

1 file changed

+238
-0
lines changed

sdk/select-stream.js

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
var { Transform } = require('stream');
2+
var sysUtil = require('util');
3+
var util = require('./util');
4+
5+
function SelectStream(options) {
6+
if (!(this instanceof SelectStream)) return new SelectStream(options);
7+
Transform.call(this, options);
8+
Object.assign(this, {
9+
totalLength: 0, // current message block's total length
10+
headerLength: 0, // current message block's header length
11+
payloadRestLength: 0, // current message block's rest payload length
12+
header: null, // current message block's header
13+
chunk: Buffer.alloc(0), // the data chunk being parsed
14+
callback: null, // current _transform function's callback
15+
});
16+
}
17+
SelectStream.prototype = {
18+
/**
19+
* process data chunk
20+
* concat the last chunk and current chunk
21+
* try to parse current message block's totalLength and headerLength
22+
* try to parse current message block's header
23+
* try to parse current message block's payload
24+
*/
25+
processChunk(chunk, encoding, callback) {
26+
Object.assign(this, {
27+
chunk: Buffer.concat(
28+
[this.chunk, chunk],
29+
this.chunk.length + chunk.length,
30+
),
31+
encoding,
32+
callback,
33+
});
34+
35+
this.parseLength();
36+
this.parseHeader();
37+
this.parsePayload();
38+
},
39+
40+
/**
41+
* try to parse current message block's totalLength and headerLength
42+
*/
43+
parseLength() {
44+
if (!this.callback) {
45+
return;
46+
}
47+
48+
if (this.totalLength && this.headerLength) {
49+
return;
50+
}
51+
52+
if (this.chunk.length >= 12) {
53+
this.totalLength = this.chunk.readInt32BE(0);
54+
this.headerLength = this.chunk.readInt32BE(4);
55+
this.payloadRestLength = this.totalLength - this.headerLength - 16;
56+
this.chunk = this.chunk.slice(12);
57+
} else {
58+
this.callback();
59+
this.callback = null;
60+
}
61+
},
62+
63+
/**
64+
* try to parse current message block's header
65+
* if header[':message-type'] is error, callback the error, emit error to next stream
66+
*/
67+
parseHeader() {
68+
if (!this.callback) {
69+
return;
70+
}
71+
72+
if (!this.headerLength || this.header) {
73+
return;
74+
}
75+
76+
if (this.chunk.length >= this.headerLength) {
77+
var header = {};
78+
var offset = 0;
79+
while (offset < this.headerLength) {
80+
var headerNameLength = this.chunk[offset] * 1;
81+
var headerName = this.chunk.toString(
82+
'ascii',
83+
offset + 1,
84+
offset + 1 + headerNameLength,
85+
);
86+
var headerValueLength = this.chunk.readInt16BE(offset + headerNameLength + 2);
87+
var headerValue = this.chunk.toString(
88+
'ascii',
89+
offset + headerNameLength + 4,
90+
offset + headerNameLength + 4 + headerValueLength,
91+
);
92+
header[headerName] = headerValue;
93+
offset += headerNameLength + 4 + headerValueLength;
94+
}
95+
this.header = header;
96+
this.chunk = this.chunk.slice(this.headerLength);
97+
this.checkErrorHeader();
98+
} else {
99+
this.callback();
100+
this.callback = null;
101+
}
102+
},
103+
104+
/**
105+
* try to parse current message block's payload
106+
*/
107+
parsePayload() {
108+
var self = this;
109+
if (!this.callback) {
110+
return;
111+
}
112+
113+
if (this.chunk.length <= this.payloadRestLength) {
114+
this.payloadRestLength -= this.chunk.length;
115+
this.pushData(this.chunk);
116+
this.chunk = Buffer.alloc(0);
117+
} else if (this.chunk.length < this.payloadRestLength + 4) {
118+
this.pushData(this.chunk.slice(0, this.payloadRestLength));
119+
this.chunk = this.chunk.slice(this.payloadRestLength);
120+
this.payloadRestLength = 0;
121+
} else {
122+
this.pushData(this.chunk.slice(0, this.payloadRestLength));
123+
this.chunk = this.chunk.slice(this.payloadRestLength + 4);
124+
this.totalLength = 0;
125+
this.headerLength = 0;
126+
this.payloadRestLength = 0;
127+
this.header = null;
128+
}
129+
130+
if (
131+
this.chunk.length
132+
&& !(this.payloadRestLength === 0 && this.chunk.length < 4)
133+
) {
134+
process.nextTick(function () {
135+
self.processChunk(Buffer.alloc(0), self.encoding, self.callback);
136+
});
137+
} else {
138+
this.callback();
139+
this.callback = null;
140+
}
141+
},
142+
143+
/**
144+
* if header[':event-type'] is Records, pipe payload to next stream
145+
*/
146+
pushData(content) {
147+
if (this.header[':event-type'] === 'Records') {
148+
this.push(content);
149+
this.emit('message:records', content);
150+
} else if (this.header[':event-type'] === 'Progress') {
151+
var progress = util.xml2json(content.toString()).Progress;
152+
this.emit('message:progress', progress);
153+
} else if (this.header[':event-type'] === 'Stats') {
154+
var stats = util.xml2json(content.toString()).Stats;
155+
this.emit('message:stats', stats);
156+
} else if (this.header[':event-type'] === 'error') {
157+
var errCode = this.header[':error-code'];
158+
var errMessage = this.header[':error-message'];
159+
var err = new Error(errMessage);
160+
err.message = errMessage;
161+
err.name = err.code = errCode;
162+
this.emit('message:error', err);
163+
} else { // 'Continuation', 'End'
164+
this.emit('message:' + this.header[':event-type'].toLowerCase());
165+
}
166+
},
167+
168+
/**
169+
* if header[':message-type'] is error, callback the error, emit error to next stream
170+
*/
171+
checkErrorHeader() {
172+
if (this.header[':message-type'] === 'error') {
173+
this.callback(this.header);
174+
this.callback = null;
175+
}
176+
},
177+
178+
/**
179+
* Transform Stream's implementations
180+
*/
181+
_transform(chunk, encoding, callback) {
182+
this.processChunk(chunk, encoding, callback);
183+
},
184+
_flush(callback) {
185+
this.processChunk(Buffer.alloc(0), this.encoding, callback);
186+
},
187+
};
188+
sysUtil.inherits(SelectStream, Transform);
189+
190+
SelectStream.parseBody = function (chunk) {
191+
var header = {};
192+
var result = {records:[]};
193+
while (chunk.length) {
194+
var totalLength = chunk.readInt32BE(0);
195+
var headerLength = chunk.readInt32BE(4);
196+
var payloadRestLength = totalLength - headerLength - 16;
197+
var offset = 0;
198+
var content;
199+
chunk = chunk.slice(12);
200+
// 获取 Message 的 header 信息
201+
while (offset < headerLength) {
202+
var headerNameLength = chunk[offset] * 1;
203+
var headerName = chunk.toString(
204+
'ascii',
205+
offset + 1,
206+
offset + 1 + headerNameLength,
207+
);
208+
var headerValueLength = chunk.readInt16BE(offset + headerNameLength + 2);
209+
var headerValue = chunk.toString(
210+
'ascii',
211+
offset + headerNameLength + 4,
212+
offset + headerNameLength + 4 + headerValueLength,
213+
);
214+
header[headerName] = headerValue;
215+
offset += headerNameLength + 4 + headerValueLength;
216+
}
217+
if (header[':event-type'] === 'Records') {
218+
content = chunk.slice(offset, offset + payloadRestLength);
219+
result.records.push(content);
220+
} else if (header[':event-type'] === 'Stats') {
221+
content = chunk.slice(offset, offset + payloadRestLength);
222+
result.stats = util.xml2json(content.toString()).Stats;
223+
} else if (header[':event-type'] === 'error') {
224+
var errCode = header[':error-code'];
225+
var errMessage = header[':error-message'];
226+
var err = new Error(errMessage);
227+
err.message = errMessage;
228+
err.name = err.code = errCode;
229+
result.error = err;
230+
} else if (['Progress', 'Continuation', 'End'].includes(header[':event-type'])) {
231+
// do nothing
232+
}
233+
chunk = chunk.slice(offset + payloadRestLength + 4);
234+
}
235+
return result;
236+
};
237+
238+
module.exports = SelectStream;

0 commit comments

Comments
 (0)