Skip to content

Commit 34d2f6c

Browse files
authored
Fix missing context issue (#1)
* Obtain X-Ray daemon address before app and worker init * Refactor tracing initialization
1 parent 19af9a3 commit 34d2f6c

File tree

4 files changed

+96
-86
lines changed

4 files changed

+96
-86
lines changed

src/index.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import http from 'http';
22
import { createTerminus } from '@godaddy/terminus';
3-
import app from './app';
43
import config from './config';
54
import logger from './logger';
65
import services from './services';
76

8-
const server = http.createServer(app);
9-
107
async function onSignal(): Promise<void> {
118
logger.warn('Server is going to shut down! Starting cleanup...');
129
}
@@ -19,14 +16,6 @@ async function onHealthCheck(): Promise<void> {
1916
return;
2017
}
2118

22-
createTerminus(server, {
23-
healthChecks: {
24-
'/health/liveness': onHealthCheck,
25-
},
26-
onSignal,
27-
onShutdown,
28-
});
29-
3019
async function start(): Promise<void> {
3120
try {
3221
const namespace = config.services.trace.daemonAddressNamespace;
@@ -35,9 +24,20 @@ async function start(): Promise<void> {
3524
const address = await services.ServiceDiscovery.discoverInstance(namespace, name);
3625

3726
(services.Trace as any).setDaemonAddress(address);
38-
(services.Trace as any).captureHTTPRequests();
3927
}
4028

29+
const app = (await import('./app')).default;
30+
31+
const server = http.createServer(app);
32+
33+
createTerminus(server, {
34+
healthChecks: {
35+
'/health/liveness': onHealthCheck,
36+
},
37+
onSignal,
38+
onShutdown,
39+
});
40+
4141
server.listen(config.port, () => {
4242
logger.info(`Server started at http://localhost:${ config.port }`);
4343
});

src/services/trace/AWSXRay.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import https from 'https';
55
class AWSXRay implements Services.Trace {
66
constructor (plugins: string) {
77
this.setPlugins(plugins);
8+
9+
XRay.captureHTTPsGlobal(http, true);
10+
XRay.captureHTTPsGlobal(https, true);
811
}
912

1013
public openSegment(defaultName: string) {
@@ -60,11 +63,6 @@ class AWSXRay implements Services.Trace {
6063

6164
XRay.config(xrayPlugins);
6265
}
63-
64-
public captureHTTPRequests(): void {
65-
XRay.captureHTTPsGlobal(http, true);
66-
XRay.captureHTTPsGlobal(https, true);
67-
}
6866
}
6967

7068
export default AWSXRay;

src/services/worker/Consumer.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { Consumer } from 'sqs-consumer';
2+
import services from '../../services';
3+
import config from '../../config';
4+
import logger from '../../logger';
5+
import video from '../../services/job/video';
6+
import repositories from '../../repositories';
7+
8+
function createConsumer() {
9+
const consumer = Consumer.create({
10+
queueUrl: config.services.queue.url,
11+
handleMessage: async (message) => {
12+
const namespace: any = services.Trace.getNamespace();
13+
const segment: any = services.Trace.createSegment('upload-service-worker');
14+
15+
await namespace.runPromise(async () => {
16+
services.Trace.setSegment(segment);
17+
18+
logger.debug('Received message from queue!', { ...message });
19+
20+
const body = JSON.parse(message.Body);
21+
22+
if ('traceId' in body) {
23+
namespace.set('traceId', body.traceId);
24+
} else {
25+
logger.warn('Trace ID is not present in message body!');
26+
}
27+
28+
if ('fileId' in body) {
29+
try {
30+
const file = await repositories.File.get(body.fileId, null);
31+
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
32+
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
33+
} catch (err) {
34+
logger.error('Could not process file!', { error: err.message, stack: err.stack });
35+
}
36+
}
37+
38+
if ('detail' in body && 'jobId' in body.detail) {
39+
if (body.detail.status === 'COMPLETE') {
40+
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
41+
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
42+
}
43+
44+
if (body.detail.status === 'ERROR') {
45+
logger.error('Media convert job ended with error!', body);
46+
}
47+
}
48+
49+
segment.close();
50+
});
51+
},
52+
sqs: services.AWS.sqs,
53+
});
54+
55+
consumer.on('message_processed', (message) => {
56+
logger.debug('Message successfully processed and removed from the queue!', { message });
57+
});
58+
59+
consumer.on('error', (err) => {
60+
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
61+
});
62+
63+
consumer.on('processing_error', (err) => {
64+
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
65+
66+
services.Trace.closeSegment();
67+
});
68+
69+
consumer.on('timeout_error', (err) => {
70+
logger.error('Timeout error!', { message: err.message, stack: err.stack });
71+
});
72+
73+
return consumer;
74+
}
75+
76+
export default {
77+
get: createConsumer,
78+
}

src/worker.ts

Lines changed: 3 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,7 @@
1-
import { Consumer } from 'sqs-consumer';
21
import services from './services';
32
import config from './config';
43
import logger from './logger';
5-
import video from './services/job/video';
6-
import repositories from './repositories';
7-
8-
const app = Consumer.create({
9-
queueUrl: config.services.queue.url,
10-
handleMessage: async (message) => {
11-
const namespace: any = services.Trace.getNamespace();
12-
const segment: any = services.Trace.createSegment('upload-service-worker');
13-
14-
await namespace.runPromise(async () => {
15-
services.Trace.setSegment(segment);
16-
17-
logger.debug('Received message from queue!', { ...message });
18-
19-
const body = JSON.parse(message.Body);
20-
21-
if ('traceId' in body) {
22-
namespace.set('traceId', body.traceId);
23-
} else {
24-
logger.warn('Trace ID is not present in message body!');
25-
}
26-
27-
if ('fileId' in body) {
28-
try {
29-
const file = await repositories.File.get(body.fileId, null);
30-
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
31-
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
32-
} catch (err) {
33-
logger.error('Could not process file!', { error: err.message, stack: err.stack });
34-
}
35-
}
36-
37-
if ('detail' in body && 'jobId' in body.detail) {
38-
if (body.detail.status === 'COMPLETE') {
39-
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
40-
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
41-
}
42-
43-
if (body.detail.status === 'ERROR') {
44-
logger.error('Media convert job ended with error!', body);
45-
}
46-
}
47-
48-
segment.close();
49-
});
50-
},
51-
sqs: services.AWS.sqs,
52-
});
53-
54-
app.on('message_processed', (message) => {
55-
logger.debug('Message successfully processed and removed from the queue!', { message });
56-
});
57-
58-
app.on('error', (err) => {
59-
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
60-
});
61-
62-
app.on('processing_error', (err) => {
63-
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
64-
65-
services.Trace.closeSegment();
66-
});
67-
68-
app.on('timeout_error', (err) => {
69-
logger.error('Timeout error!', { message: err.message, stack: err.stack });
70-
});
4+
import Consumer from './services/worker/Consumer';
715

726
async function start() {
737
const namespace = config.services.trace.daemonAddressNamespace;
@@ -76,12 +10,12 @@ async function start() {
7610
const address = await services.ServiceDiscovery.discoverInstance(namespace, name);
7711

7812
(services.Trace as any).setDaemonAddress(address);
79-
(services.Trace as any).captureHTTPRequests();
8013
}
8114

8215
logger.info('Upload worker is running');
8316

84-
app.start();
17+
const consumer = Consumer.get();
18+
consumer.start();
8519
}
8620

8721
start();

0 commit comments

Comments
 (0)