@@ -949,24 +949,29 @@ function sliceCopyFile(params, callback) {
     var Region = params.Region;
     var Key = params.Key;
     var CopySource = params.CopySource;
-    var m = CopySource.match(/^([^.]+-\d+)\.cos(v6)?\.([^.]+)\.[^/]+\/(.+)$/);
+    var m = util.getSourceParams.call(this, CopySource);
     if (!m) {
         callback(util.error(new Error('CopySource format error')));
         return;
     }
 
-    var SourceBucket = m[1];
-    var SourceRegion = m[3];
-    var SourceKey = decodeURIComponent(m[4]);
+    var SourceBucket = m.Bucket;
+    var SourceRegion = m.Region;
+    var SourceKey = decodeURIComponent(m.Key);
     var CopySliceSize = params.CopySliceSize === undefined ? self.options.CopySliceSize : params.CopySliceSize;
     CopySliceSize = Math.max(0, CopySliceSize);
 
     var ChunkSize = params.CopyChunkSize || this.options.CopyChunkSize;
     var ChunkParallel = this.options.CopyChunkParallelLimit;
+    var ChunkRetryTimes = this.options.ChunkRetryTimes + 1;
 
+    var ChunkCount = 0;
     var FinishSize = 0;
     var FileSize;
     var onProgress;
+    var SourceResHeaders = {};
+    var SourceHeaders = {};
+    var TargetHeader = {};
 
     // All slice copies are finished; start the multipartComplete step
     ep.on('copy_slice_complete', function (UploadData) {
@@ -980,54 +985,133 @@ function sliceCopyFile(params, callback) {
                 ETag: item.ETag,
             };
         });
-        self.multipartComplete({
-            Bucket: Bucket,
-            Region: Region,
-            Key: Key,
-            UploadId: UploadData.UploadId,
-            Parts: Parts,
-        }, function (err, data) {
-            if (err) {
-                onProgress(null, true);
-                return callback(err);
-            }
-            onProgress({ loaded: FileSize, total: FileSize }, true);
-            callback(null, data);
-        });
+        // Retry the complete-upload request as well
+        Async.retry(ChunkRetryTimes, function (tryCallback) {
+            self.multipartComplete({
+                Bucket: Bucket,
+                Region: Region,
+                Key: Key,
+                UploadId: UploadData.UploadId,
+                Parts: Parts,
+            }, tryCallback);
+        }, function (err, data) {
+            session.removeUsing(UploadData.UploadId); // Mark the UploadId as no longer in use; copy does not offer a resume path, so any error means this UploadId is retired.
+            if (err) {
+                onProgress(null, true);
+                return callback(err);
+            }
+            session.removeUploadId.call(self, UploadData.UploadId);
+            onProgress({ loaded: FileSize, total: FileSize }, true);
+            callback(null, data);
+        });
     });
 
     ep.on('get_copy_data_finish', function (UploadData) {
-        Async.eachLimit(UploadData.PartList, ChunkParallel, function (SliceItem, asyncCallback) {
+        // Handle the UploadId cache
+        var uuid = session.getCopyFileId(CopySource, SourceResHeaders, ChunkSize, Bucket, Key);
+        uuid && session.saveUploadId.call(self, uuid, UploadData.UploadId, self.options.UploadIdCacheLimit); // Cache the UploadId
+        session.setUsing(UploadData.UploadId); // Mark the UploadId as in use
+
+        var needCopySlices = util.filter(UploadData.PartList, function (SliceItem) {
+            if (SliceItem['Uploaded']) {
+                FinishSize += SliceItem['PartNumber'] >= ChunkCount ? (FileSize % ChunkSize || ChunkSize) : ChunkSize;
+            }
+            return !SliceItem['Uploaded'];
+        });
+        Async.eachLimit(needCopySlices, ChunkParallel, function (SliceItem, asyncCallback) {
             var PartNumber = SliceItem.PartNumber;
             var CopySourceRange = SliceItem.CopySourceRange;
             var currentSize = SliceItem.end - SliceItem.start;
-
-            copySliceItem.call(self, {
-                Bucket: Bucket,
-                Region: Region,
-                Key: Key,
-                CopySource: CopySource,
-                UploadId: UploadData.UploadId,
-                PartNumber: PartNumber,
-                CopySourceRange: CopySourceRange,
-            }, function (err, data) {
-                if (err) return asyncCallback(err);
-                FinishSize += currentSize;
-                onProgress({ loaded: FinishSize, total: FileSize });
-                SliceItem.ETag = data.ETag;
-                asyncCallback(err || null, data);
-            });
+            Async.retry(ChunkRetryTimes, function (tryCallback) {
+                copySliceItem.call(self, {
+                    Bucket: Bucket,
+                    Region: Region,
+                    Key: Key,
+                    CopySource: CopySource,
+                    UploadId: UploadData.UploadId,
+                    PartNumber: PartNumber,
+                    CopySourceRange: CopySourceRange,
+                }, tryCallback);
+            }, function (err, data) {
+                if (err) return asyncCallback(err);
+                FinishSize += currentSize;
+                onProgress({ loaded: FinishSize, total: FileSize });
+                SliceItem.ETag = data.ETag;
+                asyncCallback(err || null, data);
+            });
         }, function (err) {
             if (err) {
+                session.removeUsing(UploadData.UploadId); // Mark the UploadId as no longer in use; copy does not offer a resume path, so any error means this UploadId is retired.
                 onProgress(null, true);
                 return callback(err);
             }
-
             ep.emit('copy_slice_complete', UploadData);
         });
     });
 
-    ep.on('get_file_size_finish', function (SourceHeaders) {
+    ep.on('get_chunk_size_finish', function () {
+        var createNewUploadId = function () {
+            self.multipartInit({
+                Bucket: Bucket,
+                Region: Region,
+                Key: Key,
+                Headers: TargetHeader,
+            }, function (err, data) {
+                if (err) return callback(err);
+                params.UploadId = data.UploadId;
+                ep.emit('get_copy_data_finish', { UploadId: params.UploadId, PartList: params.PartList });
+            });
+        };
+
+        // Look for a usable UploadId locally
+        var uuid = session.getCopyFileId(CopySource, SourceResHeaders, ChunkSize, Bucket, Key);
+        var LocalUploadIdList = session.getUploadIdList.call(self, uuid);
+        if (!uuid || !LocalUploadIdList) return createNewUploadId();
+
+        var next = function (index) {
+            // If no usable UploadId is found locally, verify the candidates against the remote end one by one
+            if (index >= LocalUploadIdList.length) return createNewUploadId();
+            var UploadId = LocalUploadIdList[index];
+            // Skip it if it is currently in use
+            if (session.using[UploadId]) return next(index + 1);
+            // Check whether the UploadId still exists on the server
+            wholeMultipartListPart.call(self, {
+                Bucket: Bucket,
+                Region: Region,
+                Key: Key,
+                UploadId: UploadId,
+            }, function (err, PartListData) {
+                if (err) {
+                    // If listing this UploadId errors, skip it and remove it
+                    session.removeUploadId.call(self, UploadId);
+                    next(index + 1);
+                } else {
+                    // If the UploadId was taken while the async call was in flight, skip it too
+                    if (session.using[UploadId]) return next(index + 1);
+                    // Found a usable UploadId
+                    var finishETagMap = {};
+                    var offset = 0;
+                    util.each(PartListData.PartList, function (PartItem) {
+                        var size = parseInt(PartItem.Size);
+                        var end = offset + size - 1;
+                        finishETagMap[PartItem.PartNumber + '|' + offset + '|' + end] = PartItem.ETag;
+                        offset += size;
+                    });
+                    util.each(params.PartList, function (PartItem) {
+                        var ETag = finishETagMap[PartItem.PartNumber + '|' + PartItem.start + '|' + PartItem.end];
+                        if (ETag) {
+                            PartItem.ETag = ETag;
+                            PartItem.Uploaded = true;
+                        }
+                    });
+                    ep.emit('get_copy_data_finish', { UploadId: UploadId, PartList: params.PartList });
+                }
+            });
+        };
+        next(0);
+    });
+
+    ep.on('get_file_size_finish', function () {
         // Control the chunk size
         (function () {
             var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
@@ -1037,8 +1121,7 @@ function sliceCopyFile(params, callback) {
                 if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
             }
             params.ChunkSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
-
-            var ChunkCount = Math.ceil(FileSize / ChunkSize);
+            ChunkCount = Math.ceil(FileSize / ChunkSize);
 
             var list = [];
             for (var partNumber = 1; partNumber <= ChunkCount; partNumber++) {
@@ -1083,16 +1166,7 @@ function sliceCopyFile(params, callback) {
         delete TargetHeader['x-cos-copy-source-If-Unmodified-Since'];
         delete TargetHeader['x-cos-copy-source-If-Match'];
         delete TargetHeader['x-cos-copy-source-If-None-Match'];
-        self.multipartInit({
-            Bucket: Bucket,
-            Region: Region,
-            Key: Key,
-            Headers: TargetHeader,
-        }, function (err, data) {
-            if (err) return callback(err);
-            params.UploadId = data.UploadId;
-            ep.emit('get_copy_data_finish', params);
-        });
+        ep.emit('get_chunk_size_finish');
     });
 
     // Get the size of the remote copy source file
@@ -1133,7 +1207,8 @@ function sliceCopyFile(params, callback) {
             });
         } else {
             var resHeaders = data.headers;
-            var SourceHeaders = {
+            SourceResHeaders = resHeaders;
+            SourceHeaders = {
                 'Cache-Control': resHeaders['cache-control'],
                 'Content-Disposition': resHeaders['content-disposition'],
                 'Content-Encoding': resHeaders['content-encoding'],
@@ -1147,7 +1222,7 @@ function sliceCopyFile(params, callback) {
                     SourceHeaders[k] = v;
                 }
             });
-            ep.emit('get_file_size_finish', SourceHeaders);
+            ep.emit('get_file_size_finish');
         }
     });
 }