Skip to content

Commit 976335e

Browse files
committed
Obtain X-Ray daemon address before app and worker init
1 parent 19af9a3 commit 976335e

File tree

4 files changed

+86
-68
lines changed

4 files changed

+86
-68
lines changed

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ createTerminus(server, {
2929

3030
async function start(): Promise<void> {
3131
try {
32+
console.log('Main.start();');
3233
const namespace = config.services.trace.daemonAddressNamespace;
3334
const name = config.services.trace.daemonAddressName;
3435
if (typeof namespace === 'string' && typeof name === 'string') {
@@ -47,4 +48,5 @@ async function start(): Promise<void> {
4748
}
4849
}
4950

51+
console.log('What the kurwa?!');
5052
start();

src/services/trace/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import config from '../../config';
44

55
const container = {
66
get AWSXRay() {
7+
console.log('Trace.AWSXRay()');
8+
79
if (typeof this._awsXRay === 'undefined') {
810
this._awsXRay = new AWSXRay(config.services.trace.plugins);
911
}
@@ -12,6 +14,7 @@ const container = {
1214
},
1315

1416
get CLSHooked() {
17+
console.log('Trace.CLSHooked()');
1518
if (typeof this._clsHooked === 'undefined') {
1619
this._clsHooked = new CLSHooked();
1720
}

src/services/worker/Consumer.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { Consumer } from 'sqs-consumer';
2+
import services from '../../services';
3+
import config from '../../config';
4+
import logger from '../../logger';
5+
import video from '../../services/job/video';
6+
import repositories from '../../repositories';
7+
8+
function createConsumer() {
9+
const consumer = Consumer.create({
10+
queueUrl: config.services.queue.url,
11+
handleMessage: async (message) => {
12+
const namespace: any = services.Trace.getNamespace();
13+
const segment: any = services.Trace.createSegment('upload-service-worker');
14+
15+
await namespace.runPromise(async () => {
16+
services.Trace.setSegment(segment);
17+
18+
logger.debug('Received message from queue!', { ...message });
19+
20+
const body = JSON.parse(message.Body);
21+
22+
if ('traceId' in body) {
23+
namespace.set('traceId', body.traceId);
24+
} else {
25+
logger.warn('Trace ID is not present in message body!');
26+
}
27+
28+
if ('fileId' in body) {
29+
try {
30+
const file = await repositories.File.get(body.fileId, null);
31+
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
32+
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
33+
} catch (err) {
34+
logger.error('Could not process file!', { error: err.message, stack: err.stack });
35+
}
36+
}
37+
38+
if ('detail' in body && 'jobId' in body.detail) {
39+
if (body.detail.status === 'COMPLETE') {
40+
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
41+
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
42+
}
43+
44+
if (body.detail.status === 'ERROR') {
45+
logger.error('Media convert job ended with error!', body);
46+
}
47+
}
48+
49+
segment.close();
50+
});
51+
},
52+
sqs: services.AWS.sqs,
53+
});
54+
55+
consumer.on('message_processed', (message) => {
56+
logger.debug('Message successfully processed and removed from the queue!', { message });
57+
});
58+
59+
consumer.on('error', (err) => {
60+
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
61+
});
62+
63+
consumer.on('processing_error', (err) => {
64+
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
65+
66+
services.Trace.closeSegment();
67+
});
68+
69+
consumer.on('timeout_error', (err) => {
70+
logger.error('Timeout error!', { message: err.message, stack: err.stack });
71+
});
72+
73+
return consumer;
74+
}
75+
76+
export default {
77+
get: createConsumer,
78+
}

src/worker.ts

Lines changed: 3 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,7 @@
1-
import { Consumer } from 'sqs-consumer';
21
import services from './services';
32
import config from './config';
43
import logger from './logger';
5-
import video from './services/job/video';
6-
import repositories from './repositories';
7-
8-
const app = Consumer.create({
9-
queueUrl: config.services.queue.url,
10-
handleMessage: async (message) => {
11-
const namespace: any = services.Trace.getNamespace();
12-
const segment: any = services.Trace.createSegment('upload-service-worker');
13-
14-
await namespace.runPromise(async () => {
15-
services.Trace.setSegment(segment);
16-
17-
logger.debug('Received message from queue!', { ...message });
18-
19-
const body = JSON.parse(message.Body);
20-
21-
if ('traceId' in body) {
22-
namespace.set('traceId', body.traceId);
23-
} else {
24-
logger.warn('Trace ID is not present in message body!');
25-
}
26-
27-
if ('fileId' in body) {
28-
try {
29-
const file = await repositories.File.get(body.fileId, null);
30-
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
31-
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
32-
} catch (err) {
33-
logger.error('Could not process file!', { error: err.message, stack: err.stack });
34-
}
35-
}
36-
37-
if ('detail' in body && 'jobId' in body.detail) {
38-
if (body.detail.status === 'COMPLETE') {
39-
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
40-
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
41-
}
42-
43-
if (body.detail.status === 'ERROR') {
44-
logger.error('Media convert job ended with error!', body);
45-
}
46-
}
47-
48-
segment.close();
49-
});
50-
},
51-
sqs: services.AWS.sqs,
52-
});
53-
54-
app.on('message_processed', (message) => {
55-
logger.debug('Message successfully processed and removed from the queue!', { message });
56-
});
57-
58-
app.on('error', (err) => {
59-
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
60-
});
61-
62-
app.on('processing_error', (err) => {
63-
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
64-
65-
services.Trace.closeSegment();
66-
});
67-
68-
app.on('timeout_error', (err) => {
69-
logger.error('Timeout error!', { message: err.message, stack: err.stack });
70-
});
4+
import Consumer from './services/worker/Consumer';
715

726
async function start() {
737
const namespace = config.services.trace.daemonAddressNamespace;
@@ -81,7 +15,8 @@ async function start() {
8115

8216
logger.info('Upload worker is running');
8317

84-
app.start();
18+
const consumer = Consumer.get();
19+
consumer.start();
8520
}
8621

8722
start();

0 commit comments

Comments
 (0)