Skip to content

Commit 9cf74d0

Browse files
authored
refactor: add memory use logs, cleanup queue, streams (#44)
1 parent 2235547 commit 9cf74d0

File tree

5 files changed

+116
-16
lines changed

5 files changed

+116
-16
lines changed

apps/api/src/jobs/consumers/summary/summary.consumer.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { Logger } from '@nestjs/common';
2-
import { Process, Processor } from '@nestjs/bull';
3-
import { Job } from 'bull';
1+
import { Logger, OnModuleDestroy } from '@nestjs/common';
2+
import { InjectQueue, Process, Processor } from '@nestjs/bull';
3+
import { Job, Queue } from 'bull';
44
import { InjectRepository } from '@nestjs/typeorm';
55
import { Repository } from 'typeorm';
66
import { Summary } from 'src/modules/summary/entities/summary.entity';
@@ -13,14 +13,15 @@ import { SUMMARY_QUEUE } from 'src/modules/summary/queues/summary.queue';
1313
import { uploadDir } from 'src/main';
1414

1515
@Processor(SUMMARY_QUEUE)
16-
export class SummaryConsumer {
16+
export class SummaryConsumer implements OnModuleDestroy {
1717
private readonly logger = new Logger(SummaryConsumer.name);
1818

1919
constructor(
2020
@InjectRepository(Summary)
2121
private readonly summaryRepository: Repository<Summary>,
2222
private readonly summaryService: SummaryService,
2323
private readonly meetingSummarizerService: MeetingSummaryService,
24+
@InjectQueue(SUMMARY_QUEUE) private readonly summaryQueue: Queue,
2425
) {}
2526

2627
@Process()
@@ -49,8 +50,7 @@ export class SummaryConsumer {
4950
const summaryText = await this.meetingSummarizerService.generateMeetingSummary(fileContent);
5051
summary.outputText =
5152
summaryText ||
52-
'Summary could not be generated, please check if its a valid teams transcript file';
53-
// Update job status to SUCCESS
53+
"Summary could not be generated, please check if it's a valid teams transcript file";
5454
summary.jobStatus = JobStatus.SUCCESS;
5555
this.logger.log(`Summary generated successfully for job with ID: ${jobId}`);
5656
} catch (error) {
@@ -62,4 +62,10 @@ export class SummaryConsumer {
6262
await this.summaryRepository.save(summary);
6363
}
6464
}
65+
66+
async onModuleDestroy(): Promise<void> {
67+
if (this.summaryQueue) {
68+
await this.summaryQueue.close();
69+
}
70+
}
6571
}

apps/api/src/jobs/producers/summary/summary.producer.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
import { Injectable, Logger, Optional } from '@nestjs/common';
1+
import { Injectable, Logger, Optional, OnModuleDestroy } from '@nestjs/common';
22
import { InjectQueue } from '@nestjs/bull';
33
import { Queue } from 'bull';
44
import { Summary } from 'src/modules/summary/entities/summary.entity';
55
import { SUMMARY_QUEUE } from 'src/modules/summary/queues/summary.queue';
66

77
@Injectable()
8-
export class SummaryQueueProducer {
8+
export class SummaryQueueProducer implements OnModuleDestroy {
99
private readonly logger = new Logger(SummaryQueueProducer.name);
1010

1111
constructor(@Optional() @InjectQueue(SUMMARY_QUEUE) private readonly summaryQueue: Queue) {}
@@ -27,7 +27,16 @@ export class SummaryQueueProducer {
2727
this.logger.log(`Adding job with ID ${summary.jobId}`);
2828

2929
if (this.summaryQueue) {
30-
await this.summaryQueue.add(summary.jobId);
30+
await this.summaryQueue.add(summary.jobId, {
31+
removeOnComplete: true,
32+
removeOnFail: true,
33+
});
34+
}
35+
}
36+
37+
async onModuleDestroy(): Promise<void> {
38+
if (this.summaryQueue) {
39+
await this.summaryQueue.close();
3140
}
3241
}
3342
}
Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,89 @@
1-
import { Injectable } from '@nestjs/common';
1+
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
22
import { Cron, CronExpression } from '@nestjs/schedule';
33
import { SummaryService } from '../summary.service';
4+
import { ConfigService } from '@nestjs/config';
5+
import Redis from 'ioredis';
46

57
@Injectable()
6-
export class ScheduleService {
7-
constructor(private readonly summaryService: SummaryService) {}
8+
export class ScheduleService implements OnModuleInit, OnModuleDestroy {
9+
private readonly logger = new Logger(ScheduleService.name);
10+
private isProcessing: boolean = false;
11+
private redisClient: Redis;
12+
13+
constructor(
14+
private readonly summaryService: SummaryService,
15+
private readonly configService: ConfigService,
16+
) {}
17+
18+
onModuleInit(): void {
19+
this.redisClient = new Redis({
20+
host: this.configService.get<string>('REDIS_HOST'),
21+
port: this.configService.get<number>('REDIS_PORT'),
22+
});
23+
}
824

925
@Cron(CronExpression.EVERY_SECOND)
1026
async addSummaryToQueue(): Promise<void> {
11-
this.summaryService.addSummaryToQueue();
27+
if (this.isProcessing) {
28+
return;
29+
}
30+
31+
this.isProcessing = true;
32+
33+
try {
34+
this.logMemoryUsage();
35+
await this.summaryService.addSummaryToQueue();
36+
} catch (error) {
37+
this.logger.error('Error in addSummaryToQueue:', error);
38+
} finally {
39+
this.isProcessing = false;
40+
}
41+
}
42+
43+
private logMemoryUsage(): void {
44+
// Node.js memory usage
45+
const memoryUsage = process.memoryUsage();
46+
this.logger.log(
47+
`Memory Usage: RSS = ${memoryUsage.rss / 1024 / 1024} MB, Heap Total = ${memoryUsage.heapTotal / 1024 / 1024} MB, Heap Used = ${memoryUsage.heapUsed / 1024 / 1024} MB`,
48+
);
49+
50+
// Redis memory usage
51+
this.redisClient.info('memory', (err, res) => {
52+
if (err) {
53+
this.logger.error('Error fetching Redis memory info:', err);
54+
} else {
55+
const memoryInfo = this.parseRedisMemoryInfo(res);
56+
this.logger.log(
57+
`Redis Memory Usage: Used = ${memoryInfo.used_memory_human}, RSS = ${memoryInfo.used_memory_rss_human}, Peak = ${memoryInfo.used_memory_peak_human}, System Memory = ${memoryInfo.total_system_memory_human}, Fragmentation Ratio = ${memoryInfo.mem_fragmentation_ratio}`,
58+
);
59+
}
60+
});
61+
}
62+
63+
// eslint-disable-next-line
64+
private parseRedisMemoryInfo(info: string): any {
65+
const memoryInfo = {};
66+
const lines = info.split('\r\n');
67+
const relevantKeys = [
68+
'used_memory_human',
69+
'used_memory_rss_human',
70+
'used_memory_peak_human',
71+
'total_system_memory_human',
72+
'mem_fragmentation_ratio',
73+
];
74+
lines.forEach((line) => {
75+
const [key, value] = line.split(':');
76+
77+
if (relevantKeys.includes(key)) {
78+
memoryInfo[key] = value;
79+
}
80+
});
81+
return memoryInfo;
82+
}
83+
84+
async onModuleDestroy(): Promise<void> {
85+
if (this.redisClient) {
86+
await this.redisClient.quit();
87+
}
1288
}
1389
}

apps/api/src/modules/summary/summarizer/summarizer.service.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ export class MeetingSummaryService {
3232
.split('\n')
3333
.map((line) => line.trim())
3434
.filter((line) => line);
35-
const timePattern =
35+
const timePattern1 =
3636
/(\d{1,2}:\d{2}:\d{2}(?:\.\d{1,3})?) --> (\d{1,2}:\d{2}:\d{2}(?:\.\d{1,3})?)/; // Matches time format "HH:MM:SS" or "HH:MM:SS.mmm"
37+
const timePattern2 =
38+
/(\d{1,2}:\d{1,2}:\d{1,2}(?:\.\d{1,3})?) --> (\d{1,2}:\d{1,2}:\d{1,2}(?:\.\d{1,3})?)/; // Matches time format "H:M:S" or "H:M:S.mmm"
3739
const chunks: Chunk[] = [];
3840
let i = 0;
3941

4042
while (i < lines.length) {
41-
const match = lines[i].match(timePattern);
43+
const match = lines[i].match(timePattern1) || lines[i].match(timePattern2);
4244

4345
if (match) {
4446
const [startTime, endTime] = match.slice(1, 3);

apps/api/src/modules/summary/summary.service.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ export class SummaryService extends CoreService<Summary> {
4646
const fileLocation = join(uploadDir, modifiedFilename);
4747

4848
return new Promise((resolve, reject) => {
49+
const writeStream = createWriteStream(fileLocation);
50+
4951
createReadStream()
50-
.pipe(createWriteStream(fileLocation))
52+
.pipe(writeStream)
5153
.on('finish', async () => {
5254
const summary = this.summaryRepository.create({ inputFile: modifiedFilename });
5355

@@ -65,9 +67,12 @@ export class SummaryService extends CoreService<Summary> {
6567
resolve(savedSummary);
6668
} catch (error) {
6769
reject(error);
70+
} finally {
71+
writeStream.close();
6872
}
6973
})
7074
.on('error', (error) => {
75+
writeStream.close();
7176
reject(error);
7277
});
7378
});
@@ -98,6 +103,7 @@ export class SummaryService extends CoreService<Summary> {
98103
},
99104
});
100105
}
106+
101107
getPendingSummary(): Promise<Summary[]> {
102108
this.logger.log('Getting all active pending summaries');
103109
return this.summaryRepository.find({
@@ -145,5 +151,6 @@ export class SummaryService extends CoreService<Summary> {
145151
}
146152

147153
this.isProcessingQueue = false;
154+
this.logger.log('Finished processing cron');
148155
}
149156
}

0 commit comments

Comments
 (0)