Skip to content

feat: delete older archived notifications #448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ NODE_ENV=development # Use "development" for graphql playground to work. Use "pr
# Notification configuration
MAX_RETRY_COUNT=3 # Max retry count. DEFAULT: 3
ARCHIVE_LIMIT=1000 # Max notifications to archive. DEFAULT: 1000
ARCHIVE_INTERVAL=3600 # Interval (in seconds) for archiving notifications. DEFAULT: 3600 (every 1 hour)
ARCHIVE_INTERVAL_IN_SECONDS=3600 # Interval (in seconds) for archiving notifications. DEFAULT: 3600 (every 1 hour)
SCHEDULE_TIME_IN_SECONDS=5 # Interval (in seconds) for processing PENDING and AWAITING_CONFIRMATION notifications. DEFAULT: 5 (every 5 seconds)
ENABLE_ARCHIVED_NOTIFICATION_DELETION=false # Toggle to enable/disable deletion of archived notifications, VALUES: "true", "false". DEFAULT: false
DELETE_INTERVAL_IN_SECONDS=2592000 # Interval (in seconds) for deletion of archived notifications. DEFAULT: 2592000 (every 30 days)
DELETE_ARCHIVED_NOTIFICATIONS_OLDER_THAN=90d # Use formats from https://github.com/vercel/ms. Use positive values. DEFAULT: 90d (older than 90 days). LIMIT: 10y (not older than 10 years)

# Logger configuration
LOG_LEVEL=info # Minimum log severity level to be recorded, VALUES: "debug", "info", "warning", "error". DEFAULT: info
Expand Down
5 changes: 4 additions & 1 deletion apps/api/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ lerna-debug.log*

# Environment variable file
.env
schema.gpl
schema.gpl

# Archived Notifications Backup Directory
backups
39 changes: 39 additions & 0 deletions apps/api/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@apollo/server": "^4.11.3",
"@aws-sdk/client-ses": "^3.775.0",
"@aws-sdk/client-sns": "^3.775.0",
"@fast-csv/format": "^5.0.2",
"@nestjs/apollo": "^12.2.1",
"@nestjs/axios": "^3.1.1",
"@nestjs/bull": "^10.2.2",
Expand Down
16 changes: 14 additions & 2 deletions apps/api/scheduler.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ SCHEDULE_TIME_IN_SECONDS="${SCHEDULE_TIME_IN_SECONDS:-5}"
BASE_URL="http://localhost:${SERVER_PORT}/notifications"
ARCHIVE_URL="http://localhost:${SERVER_PORT}/archived-notifications"

ARCHIVE_INTERVAL="${ARCHIVE_INTERVAL:-3600}"
ARCHIVE_INTERVAL_IN_SECONDS="${ARCHIVE_INTERVAL_IN_SECONDS:-3600}"
DELETE_INTERVAL_IN_SECONDS="${DELETE_INTERVAL_IN_SECONDS:-2592000}"

add_notifications_to_queue() {
curl -X POST "${BASE_URL}/queue" -H "Content-Type: application/json" -d '{}'
Expand All @@ -21,7 +22,12 @@ move_completed_notifications_to_archive() {
curl -X POST "${ARCHIVE_URL}/archive" -H "Content-Type: application/json" -d '{}'
}

# Ask the API to delete old archived notifications (backed up to CSV server-side).
# Uses ARCHIVE_URL defined above; invoked every DELETE_INTERVAL_IN_SECONDS by the main loop.
delete_archived_notifications() {
curl -X DELETE "${ARCHIVE_URL}/delete"
}

last_archive_run=$(date +%s)
last_delete_run=$(date +%s)

while true; do
add_notifications_to_queue
Expand All @@ -30,10 +36,16 @@ while true; do
current_time=$(date +%s)

# Check if it's time to run the archive function
if (( (current_time - last_archive_run) >= ARCHIVE_INTERVAL )); then
if (( (current_time - last_archive_run) >= ARCHIVE_INTERVAL_IN_SECONDS )); then
move_completed_notifications_to_archive
last_archive_run=$current_time
fi

# Check if it's time to run the delete function
if (( (current_time - last_delete_run) >= DELETE_INTERVAL_IN_SECONDS )); then
delete_archived_notifications
last_delete_run=$current_time
fi

sleep $SCHEDULE_TIME_IN_SECONDS
done
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Controller, HttpException, Logger, Post } from '@nestjs/common';
import { Controller, Delete, HttpException, Logger, Post } from '@nestjs/common';
import { ArchivedNotificationsService } from './archived-notifications.service';

@Controller('archived-notifications')
Expand All @@ -24,4 +24,21 @@ export class ArchivedNotificationsController {
throw error;
}
}

/**
 * DELETE /archived-notifications/delete
 *
 * Triggers the archived-notification deletion cron task on the service.
 * HttpExceptions are re-thrown untouched so their status codes reach the
 * client; any other failure is logged (message + stack only) and re-thrown.
 */
@Delete('delete')
async deleteArchivedNotifications(): Promise<void> {
  try {
    this.logger.debug('Deleting archived notifications...');
    await this.archivedNotificationService.deleteArchivedNotificationsCron();
    this.logger.log('End of delete archived notifications Cron');
  } catch (err) {
    // HttpExceptions already carry the intended status/payload — pass through silently.
    if (!(err instanceof HttpException)) {
      this.logger.error('Error while deleting notifications');
      this.logger.error(JSON.stringify(err, ['message', 'stack'], 2));
    }

    throw err;
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable, Logger } from '@nestjs/common';
import { ArchivedNotification } from './entities/archived-notification.entity';
import { DataSource, In, QueryRunner, Repository } from 'typeorm';
import { DataSource, In, LessThan, QueryRunner, Repository } from 'typeorm';
import { Notification } from 'src/modules/notifications/entities/notification.entity';
import { ConfigService } from '@nestjs/config';
import { DeliveryStatus } from 'src/common/constants/notifications';
Expand All @@ -9,6 +9,10 @@ import { QueryOptionsDto } from 'src/common/graphql/dtos/query-options.dto';
import { ArchivedNotificationResponse } from './dtos/archived-notification-response.dto';
import { InjectRepository } from '@nestjs/typeorm';
import { CoreService } from 'src/common/graphql/services/core.service';
import ms = require('ms');
import * as path from 'path';
import * as fs from 'fs';
import { format } from '@fast-csv/format';

@Injectable()
export class ArchivedNotificationsService extends CoreService<ArchivedNotification> {
Expand Down Expand Up @@ -147,4 +151,151 @@ export class ArchivedNotificationsService extends CoreService<ArchivedNotificati
});
return archivedNotification;
}

async deleteArchivedNotificationsCron(): Promise<void> {
try {
const enableArchivedNotificationDeletion = this.configService.get<string>(
'ENABLE_ARCHIVED_NOTIFICATION_DELETION',
'false',
);

if (enableArchivedNotificationDeletion.toLowerCase() === 'true') {
this.logger.log('Running archived notification deletion cron task');
await this.deleteArchivedEntriesAndGenerateCsvBackup();
this.logger.log(`Archive notification deletion cron task completed`);
} else {
this.logger.log('Archived Notification Deletion Cron is disabled');
}
} catch (error) {
this.logger.error(`Cron job failed: ${error.message}`, error.stack);
throw error;
}
}

async deleteArchivedEntriesAndGenerateCsvBackup(): Promise<void> {
try {
const deleteArchivedNotificationsOlderThan = this.configService.get<string>(
'DELETE_ARCHIVED_NOTIFICATIONS_OLDER_THAN',
'90d',
);

const maxRetentionMs = ms('10y');
const retentionDurationMs = ms(deleteArchivedNotificationsOlderThan);

// Guard rails
if (
typeof retentionDurationMs !== 'number' ||
retentionDurationMs <= 0 ||
retentionDurationMs > maxRetentionMs
) {
throw new Error(
'Invalid retention period. It must be a positive duration not exceeding 10 years.',
);
}

const cutoffTimestamp = new Date(Date.now() - retentionDurationMs);

const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();

try {
// Fetch entries to delete
const archivedEntries = await queryRunner.manager.find(ArchivedNotification, {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a good idea to hold a lot of records here. This will consume a lot of RAM if we do it on OQSHA database where we have lakhs lakhs of records.

We need to do this process in a pagination way. If we simply fetch all records at once it will consume a lot of RAM to load the data. We need to do in pagination fashion how we do it in portal cannot pull so much data at once.

You need to also write to the excel or csv in a streaming fashion by not holding a lot of data in memory to dump in csv.

Just assume if there are 30 lakh records. 3 milion then we will consume a lot of RAM and also the query might timeout.

Kindly make sure

  1. You fetch limited records as pagination (say max 500 at a time)
  2. You need to write to the CSV file and not wait for all the records to be in memory and writing it just once in the end will cause issues by confusing a lot of RAM.
  3. You can delete thecsv file if something goes bad during this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limit number of deleted entries to 500 447842d
Delete the csv file if transaction fails 270b980

where: {
createdOn: LessThan(cutoffTimestamp),
status: Status.ACTIVE,
},
order: {
createdOn: 'ASC',
},
take: 500,
});

if (archivedEntries.length === 0) {
this.logger.log('No entries to archive.');
await queryRunner.commitTransaction();
return;
}

// Create directory, filename and path for backups
const backupsDir = 'backups';
const backupFileName = `archived_notifications_backup_${this.getFormattedTimestamp()}.csv`;
const backupFilePath = path.join(backupsDir, backupFileName);

// Ensure the backups directory exists
try {
if (!fs.existsSync(backupsDir)) {
fs.mkdirSync(backupsDir, { recursive: true });
}
} catch (error) {
throw new Error(`Failed to create backup directory at "${backupsDir}": ${error}`);
}

// Export to CSV before deletion
await this.writeToCsv(archivedEntries, backupFilePath);

// Perform deletion
const idsToDelete = archivedEntries.map((entry) => entry.id);
await queryRunner.manager.delete(ArchivedNotification, idsToDelete);

await queryRunner.commitTransaction();
this.logger.log(
`Transaction successful. Deleted ${archivedEntries.length} entries. Backup at ${backupFilePath}`,
);
} catch (error) {
await queryRunner.rollbackTransaction();
this.logger.error('Error during deletion. Rolled back.', error.stack);
throw error;
} finally {
await queryRunner.release();
}
} catch (error) {
this.logger.error(`Failed to delete archived notifications: ${error.message}`, error.stack);
throw error;
}
}

/**
 * Streams the given archived notifications into a CSV file at filePath.
 *
 * Rows are written one at a time through @fast-csv's format() stream, so no
 * large string is held in memory. Object-valued nested fields (listed in
 * nestedFields, matching the entity's JSON-like columns) are stringified so
 * each CSV cell stays a flat scalar.
 *
 * Bug fix: the original `return new Promise(...)` inside try/catch meant
 * stream rejections bypassed the catch entirely — the error was never logged
 * here. Awaiting the promise makes the catch observe those failures.
 *
 * @param data     entities to export (already fetched, bounded batch)
 * @param filePath destination CSV path (directory must already exist)
 * @throws whatever the write stream or csv formatter emits on 'error'
 */
private async writeToCsv(data: ArchivedNotification[], filePath: string): Promise<void> {
  // Update with json like nested fields as per archived-notification.entity file
  const nestedFields = ['data', 'result'];

  try {
    await new Promise<void>((resolve, reject) => {
      const ws = fs.createWriteStream(filePath);
      const csvStream = format({ headers: true });

      csvStream.pipe(ws).on('finish', resolve).on('error', reject);

      for (const row of data) {
        // Ensure data of nested fields is stringified before being added to csv
        const safeRow = Object.fromEntries(
          Object.entries(row).map(([key, value]) => [
            key,
            nestedFields.includes(key) && value !== null && typeof value === 'object'
              ? JSON.stringify(value)
              : value,
          ]),
        );

        csvStream.write(safeRow);
      }

      csvStream.end();
    });
  } catch (error) {
    this.logger.error(`Could not create backup file: ${error.message}`, error.stack);
    throw error;
  }
}

/**
 * Builds a local-time timestamp string "yyyyMMddHHmm" (minute precision),
 * used to name backup CSV files.
 */
private getFormattedTimestamp(): string {
  const now = new Date();
  const pad = (value: number): string => value.toString().padStart(2, '0');

  return [
    now.getFullYear().toString(),
    pad(now.getMonth() + 1),
    pad(now.getDate()),
    pad(now.getHours()),
    pad(now.getMinutes()),
  ].join('');
}
}
Loading