Skip to content

Commit f064a49

Browse files
committed
Update worker tracing
1 parent 34d2f6c commit f064a49

File tree

7 files changed

+106
-87
lines changed

7 files changed

+106
-87
lines changed

src/services/aws.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@ export default () => {
2727
}));
2828
},
2929

30+
get sqsConsumer(): SQSClient {
31+
return new SQSClient({
32+
apiVersion: '2012-11-05',
33+
region: config.services.queue.region,
34+
credentials: {
35+
accessKeyId: config.services.queue.accessKeyId,
36+
secretAccessKey: config.services.queue.secretAccessKey,
37+
}
38+
});
39+
},
40+
3041
get mc(): MediaConvertClient {
3142
return services.Trace.captureAWSv3Client(new MediaConvertClient({
3243
apiVersion: '2017-08-29',

src/services/trace/AWSXRay.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ class AWSXRay implements Services.Trace {
66
constructor (plugins: string) {
77
this.setPlugins(plugins);
88

9-
XRay.captureHTTPsGlobal(http, true);
10-
XRay.captureHTTPsGlobal(https, true);
9+
XRay.middleware.disableCentralizedSampling();
1110
}
1211

1312
public openSegment(defaultName: string) {
@@ -18,8 +17,8 @@ class AWSXRay implements Services.Trace {
1817
return XRay.express.closeSegment();
1918
}
2019

21-
public createSegment(name: string) {
22-
return new XRay.Segment(name);
20+
public createSegment(name: string, rootId?: string | null, parentId?: string | null) {
21+
return new XRay.Segment(name, rootId, parentId);
2322
}
2423

2524
public setSegment(segment: unknown): void {
@@ -29,7 +28,7 @@ class AWSXRay implements Services.Trace {
2928
public getTraceId(): string {
3029
const namespace = XRay.getNamespace();
3130

32-
if (namespace.active === null) {
31+
if (namespace.active === null || 'segment' in namespace.active === false) {
3332
return undefined;
3433
}
3534

@@ -46,6 +45,11 @@ class AWSXRay implements Services.Trace {
4645
return XRay.captureAWSv3Client(client as T & { middlewareStack: { remove: any, use: any }, config: any });
4746
}
4847

48+
public captureHTTPRequests(): void {
49+
XRay.captureHTTPsGlobal(http, true);
50+
XRay.captureHTTPsGlobal(https, true);
51+
}
52+
4953
public setDaemonAddress(address: string) {
5054
XRay.setDaemonAddress(address);
5155
}
@@ -63,6 +67,10 @@ class AWSXRay implements Services.Trace {
6367

6468
XRay.config(xrayPlugins);
6569
}
70+
71+
public processTraceData(data: string): { [key: string]: string } {
72+
return XRay.utils.processTraceData(data);
73+
}
6674
}
6775

6876
export default AWSXRay;

src/services/trace/CLSHooked.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,17 @@ class CLSHooked implements Services.Trace {
4444
return client;
4545
}
4646

47+
public captureHTTPRequests(): void {
48+
return;
49+
}
50+
4751
public getNamespace() {
4852
return cls.getNamespace(NAMESPACE) || cls.createNamespace(NAMESPACE);
4953
}
54+
55+
public processTraceData(): { [key: string]: string } {
56+
return {};
57+
}
5058
}
5159

5260
export default CLSHooked;

src/services/worker/Consumer.ts

Lines changed: 0 additions & 78 deletions
This file was deleted.

src/types/services.d.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ declare namespace Services {
5656

5757
closeSegment(): unknown;
5858

59-
createSegment(name: string): unknown;
59+
createSegment(name: string, rootId?: string | null, parentId?: string | null): unknown;
6060

6161
setSegment(segment: unknown): void;
6262

@@ -65,5 +65,9 @@ declare namespace Services {
6565
getNamespace(): unknown;
6666

6767
captureAWSv3Client<T>(client: T): T;
68+
69+
captureHTTPRequests(): void;
70+
71+
processTraceData(data: string): { [key: string]: string };
6872
}
6973
}

src/worker.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import services from './services';
22
import config from './config';
33
import logger from './logger';
4-
import Consumer from './services/worker/Consumer';
54

65
async function start() {
76
const namespace = config.services.trace.daemonAddressNamespace;
@@ -14,8 +13,12 @@ async function start() {
1413

1514
logger.info('Upload worker is running');
1615

17-
const consumer = Consumer.get();
18-
consumer.start();
16+
const consumer = (await import('./worker/Consumer')).default;
17+
const ns = services.Trace.getNamespace();
18+
19+
(ns as any).run(() => {
20+
consumer.start();
21+
});
1922
}
2023

2124
start();

src/worker/Consumer.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { Consumer } from 'sqs-consumer';
2+
import services from '../services';
3+
import config from '../config';
4+
import logger from '../logger';
5+
import video from '../services/job/video';
6+
import repositories from '../repositories';
7+
8+
const consumer = Consumer.create({
9+
queueUrl: config.services.queue.url,
10+
attributeNames: ['AWSTraceHeader', 'X-Request-ID'] as any,
11+
handleMessage: async (message) => {
12+
const traceHeader = message.Attributes.AWSTraceHeader;
13+
const traceData = services.Trace.processTraceData(traceHeader);
14+
const segment: any = services.Trace.createSegment('upload-service-worker', traceData.root, traceData.parent);
15+
services.Trace.setSegment(segment);
16+
17+
logger.debug('Received message from queue!', { ...message });
18+
19+
const body = JSON.parse(message.Body);
20+
21+
if ('fileId' in body) {
22+
try {
23+
const file = await repositories.File.get(body.fileId, null);
24+
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
25+
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
26+
} catch (err) {
27+
logger.error('Could not process file!', { error: err.message, stack: err.stack });
28+
}
29+
}
30+
31+
if ('detail' in body && 'jobId' in body.detail) {
32+
if (body.detail.status === 'COMPLETE') {
33+
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
34+
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
35+
}
36+
37+
if (body.detail.status === 'ERROR') {
38+
logger.error('Media convert job ended with error!', body);
39+
}
40+
}
41+
42+
segment && segment.close();
43+
},
44+
sqs: services.AWS.sqsConsumer,
45+
});
46+
47+
consumer.on('message_processed', (message) => {
48+
logger.debug('Message successfully processed and removed from the queue!', { message });
49+
});
50+
51+
consumer.on('error', (err) => {
52+
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
53+
});
54+
55+
consumer.on('processing_error', (err) => {
56+
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
57+
});
58+
59+
consumer.on('timeout_error', (err) => {
60+
logger.error('Timeout error!', { message: err.message, stack: err.stack });
61+
});
62+
63+
export default consumer;

0 commit comments

Comments
 (0)