@@ -10,11 +10,11 @@ import {
10
10
S3Client ,
11
11
} from '@aws-sdk/client-s3'
12
12
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
13
- import * as R from 'remeda'
14
13
14
+ import * as R from 'remeda'
15
15
import { z } from 'zod'
16
16
import { parseEnv , StorageDriver } from '~/lib/storage/defineStorageDriver'
17
- import { createTempDir } from '~/lib/utils'
17
+ import { createTempDir , streamToBuffer } from '~/lib/utils'
18
18
19
19
export class S3StorageDriver extends StorageDriver {
20
20
s3
@@ -95,20 +95,15 @@ export class S3StorageDriver extends StorageDriver {
95
95
)
96
96
}
97
97
98
- async uploadPart ( opts : {
99
- objectName : string
100
- uploadId : string
101
- partNumber : number
102
- data : ReadableStream
103
- } ) {
98
+ async uploadPart ( opts : { uploadId : string ; partNumber : number ; data : ReadableStream } ) {
104
99
await this . s3 . send (
105
100
new PutObjectCommand ( {
106
101
Bucket : this . bucket ,
107
102
Key : this . getUploadPartObjectName ( {
108
103
uploadId : opts . uploadId ,
109
104
partNumber : opts . partNumber ,
110
105
} ) ,
111
- Body : opts . data ,
106
+ Body : await streamToBuffer ( opts . data ) ,
112
107
} ) ,
113
108
)
114
109
}
@@ -121,41 +116,40 @@ export class S3StorageDriver extends StorageDriver {
121
116
const tempDir = await createTempDir ( )
122
117
const outputTempFilePath = path . join ( tempDir , 'output' )
123
118
119
+ await fs . writeFile ( outputTempFilePath , '' )
124
120
const outputTempFile = await fs . open ( outputTempFilePath , 'r+' )
125
121
126
122
let currentChunk = 0
127
123
for ( const partNumber of opts . partNumbers ) {
128
124
const part = await this . s3 . send (
129
125
new GetObjectCommand ( {
130
126
Bucket : this . bucket ,
131
- Key : this . addUploadFolderPrefix ( {
127
+ Key : this . getUploadPartObjectName ( {
128
+ partNumber,
132
129
uploadId : opts . uploadId ,
133
- objectName : this . getUploadPartObjectName ( {
134
- partNumber,
135
- uploadId : opts . uploadId ,
136
- } ) ,
137
130
} ) ,
138
131
} ) ,
139
132
)
140
133
141
134
if ( ! part . Body ) throw new Error ( `Part ${ partNumber } is missing` )
142
135
143
- const partStream = part . Body as ReadableStream
144
-
136
+ const partStream = part . Body . transformToWebStream ( )
145
137
const bufferWriteStream = new WritableStream < Buffer > ( {
146
138
async write ( chunk ) {
139
+ const start = currentChunk
147
140
currentChunk += chunk . length
148
- await outputTempFile . write ( chunk , 0 , chunk . length , currentChunk )
141
+ await outputTempFile . write ( chunk , 0 , chunk . length , start )
149
142
} ,
150
143
} )
151
144
await partStream . pipeTo ( bufferWriteStream )
152
145
}
153
146
147
+ const readStream = createReadStream ( outputTempFilePath )
154
148
await this . s3 . send (
155
149
new PutObjectCommand ( {
156
150
Bucket : this . bucket ,
157
151
Key : this . addBaseFolderPrefix ( opts . finalOutputObjectName ) ,
158
- Body : await createReadStream ( outputTempFilePath ) ,
152
+ Body : readStream ,
159
153
} ) ,
160
154
)
161
155
0 commit comments