@@ -8,36 +8,45 @@ import repositories from './repositories';
8
8
const app = Consumer . create ( {
9
9
queueUrl : config . services . queue . url ,
10
10
handleMessage : async ( message ) => {
11
- logger . debug ( 'Received message from queue!' , { ...message } ) ;
11
+ const namespace : any = services . Trace . getNamespace ( ) ;
12
+ const segment : any = services . Trace . createSegment ( 'upload-service-worker' ) ;
12
13
13
- const body = JSON . parse ( message . Body ) ;
14
+ await namespace . runPromise ( async ( ) => {
15
+ services . Trace . setSegment ( segment ) ;
14
16
15
- if ( 'traceId' in body ) {
16
- namespace . set ( 'traceId' , body . traceId ) ;
17
- } else {
18
- logger . warn ( 'Trace ID is not present in message body!' ) ;
19
- }
17
+ logger . debug ( 'Received message from queue!' , { ...message } ) ;
20
18
21
- if ( 'fileId' in body ) {
22
- try {
23
- const file = await repositories . File . get ( body . fileId , null ) ;
24
- const values = await services . Job . process ( body . mimetype , file ) as Record < string , string | number | boolean > ;
25
- await repositories . File . update ( { ...values , cacheable : ! body . mimetype . startsWith ( 'video' ) } , { id : file . id } ) ;
26
- } catch ( err ) {
27
- logger . error ( 'Could not process file!' , { error : err . message , stack : err . stack } ) ;
28
- }
29
- }
19
+ const body = JSON . parse ( message . Body ) ;
30
20
31
- if ( 'detail ' in body && 'jobId' in body . detail ) {
32
- if ( body . detail . status === 'COMPLETE' ) {
33
- const updatedFile = await video . AWSVideo . complete ( body ) as Record < string , string | number | boolean > ;
34
- await repositories . File . update ( { ... updatedFile , cacheable : true } , { id : updatedFile [ 'id' ] as string } ) ;
21
+ if ( 'traceId ' in body ) {
22
+ namespace . set ( 'traceId' , body . traceId ) ;
23
+ } else {
24
+ logger . warn ( 'Trace ID is not present in message body!' ) ;
35
25
}
36
-
37
- if ( body . detail . status === 'ERROR' ) {
38
- logger . error ( 'Media convert job ended with error!' , body ) ;
26
+
27
+ if ( 'fileId' in body ) {
28
+ try {
29
+ const file = await repositories . File . get ( body . fileId , null ) ;
30
+ const values = await services . Job . process ( body . mimetype , file ) as Record < string , string | number | boolean > ;
31
+ await repositories . File . update ( { ...values , cacheable : ! body . mimetype . startsWith ( 'video' ) } , { id : file . id } ) ;
32
+ } catch ( err ) {
33
+ logger . error ( 'Could not process file!' , { error : err . message , stack : err . stack } ) ;
34
+ }
35
+ }
36
+
37
+ if ( 'detail' in body && 'jobId' in body . detail ) {
38
+ if ( body . detail . status === 'COMPLETE' ) {
39
+ const updatedFile = await video . AWSVideo . complete ( body ) as Record < string , string | number | boolean > ;
40
+ await repositories . File . update ( { ...updatedFile , cacheable : true } , { id : updatedFile [ 'id' ] as string } ) ;
41
+ }
42
+
43
+ if ( body . detail . status === 'ERROR' ) {
44
+ logger . error ( 'Media convert job ended with error!' , body ) ;
45
+ }
39
46
}
40
- }
47
+
48
+ segment . close ( ) ;
49
+ } ) ;
41
50
} ,
42
51
sqs : services . AWS . sqs ,
43
52
} ) ;
@@ -52,21 +61,14 @@ app.on('error', (err) => {
52
61
53
62
app . on ( 'processing_error' , ( err ) => {
54
63
logger . error ( 'Error while processing message from queue!' , { message : err . message , stack : err . stack } ) ;
64
+
65
+ services . Trace . closeSegment ( ) ;
55
66
} ) ;
56
67
57
68
app . on ( 'timeout_error' , ( err ) => {
58
69
logger . error ( 'Timeout error!' , { message : err . message , stack : err . stack } ) ;
59
70
} ) ;
60
71
61
- const namespace : any = services . Trace . getNamespace ( ) ;
62
- const segment : any = services . Trace . createSegment ( 'upload-service-worker' ) ;
72
+ logger . info ( 'Upload worker is running' ) ;
63
73
64
- namespace . run ( ( ) => {
65
- services . Trace . setSegment ( segment ) ;
66
-
67
- logger . info ( 'Upload worker is running' ) ;
68
-
69
- app . start ( ) ;
70
-
71
- segment . close ( ) ;
72
- } ) ;
74
+ app . start ( ) ;
0 commit comments