Skip to content

Commit 2ec0520

Browse files
Merge branch 'next' into nv-6982-refactor-the-subscription-routes-from-subscriptionidoridentifier-to
2 parents 100d010 + 03686c4 commit 2ec0520

File tree

123 files changed

+1762
-764
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

123 files changed

+1762
-764
lines changed

.cursor/rules/dashboard.mdc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ description:
33
globs: apps/dashboard/**/*
44
alwaysApply: false
55
---
6-
- Do not attempt to to build or run the dashboard as the user will be already running it, to check types you should be able to access the eslint results in cursor.
6+
- Do not attempt to build or run the dashboard as the user will be already running it, to check types you should be able to access the eslint results in cursor.
77
- Use lowercase with dashes for directories and files (e.g., components/auth-wizard).
88
- Favor named exports for components.

apps/api/migrations/add-default-identifier-to-topic-subscribers/add-default-identifier-to-topic-subscribers-migration.spec.ts renamed to apps/api/migrations/001-add-default-identifier-to-topic-subscribers/add-default-identifier-to-topic-subscribers-migration.spec.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import { buildDefaultSubscriptionIdentifier } from '@novu/application-generic';
33
import { expect } from 'chai';
44
import { afterEach, beforeEach, describe, it } from 'mocha';
55
import * as sinon from 'sinon';
6-
7-
import { addDefaultIdentifierToTopicSubscribersMigration } from './add-default-identifier-to-topic-subscribers-migration';
6+
import { run as addDefaultIdentifierToTopicSubscribersMigration } from './add-default-identifier-to-topic-subscribers-migration';
87

98
describe('Add Default Identifier To Topic Subscribers Migration', () => {
109
let mockApp: any;

apps/api/migrations/add-default-identifier-to-topic-subscribers/add-default-identifier-to-topic-subscribers-migration.ts renamed to apps/api/migrations/001-add-default-identifier-to-topic-subscribers/add-default-identifier-to-topic-subscribers-migration.ts

File renamed without changes.
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
import { NestFactory } from '@nestjs/core';
2+
import { expect } from 'chai';
3+
import { afterEach, beforeEach, describe, it } from 'mocha';
4+
import * as sinon from 'sinon';
5+
import { run as removeDuplicateIdentifiersMigration } from './remove-duplicate-identifiers';
6+
7+
describe('Remove Duplicate Identifiers Migration', () => {
8+
let mockApp: any;
9+
let mockLogger: any;
10+
let mockTopicSubscribersRepository: any;
11+
let mockCursor: any;
12+
let bulkWriteStub: sinon.SinonStub;
13+
let loggerInfoStub: sinon.SinonStub;
14+
let loggerErrorStub: sinon.SinonStub;
15+
let appCloseStub: sinon.SinonStub;
16+
17+
beforeEach(() => {
18+
mockCursor = {
19+
[Symbol.asyncIterator]: async function* () {},
20+
};
21+
22+
bulkWriteStub = sinon.stub().resolves({ deletedCount: 0 });
23+
loggerInfoStub = sinon.stub();
24+
loggerErrorStub = sinon.stub();
25+
appCloseStub = sinon.stub().resolves();
26+
27+
mockLogger = {
28+
setContext: sinon.stub(),
29+
info: loggerInfoStub,
30+
error: loggerErrorStub,
31+
};
32+
33+
mockTopicSubscribersRepository = {
34+
_model: {
35+
aggregate: sinon.stub().returns({
36+
cursor: sinon.stub().returns(mockCursor),
37+
}),
38+
},
39+
bulkWrite: bulkWriteStub,
40+
};
41+
42+
mockApp = {
43+
resolve: sinon.stub().resolves(mockLogger),
44+
get: sinon.stub().returns(mockTopicSubscribersRepository),
45+
close: appCloseStub,
46+
};
47+
48+
sinon.stub(NestFactory, 'create').resolves(mockApp);
49+
});
50+
51+
afterEach(() => {
52+
sinon.restore();
53+
});
54+
55+
it('should keep oldest document and delete newer duplicates', async () => {
56+
const duplicateGroups = [
57+
{
58+
_id: {
59+
_environmentId: 'env1',
60+
identifier: 'tk_topic-1:si_subscriber-1',
61+
},
62+
count: 3,
63+
documentIds: ['oldest-doc', 'middle-doc', 'newest-doc'],
64+
},
65+
];
66+
67+
mockCursor[Symbol.asyncIterator] = async function* () {
68+
for (const group of duplicateGroups) {
69+
yield group;
70+
}
71+
};
72+
73+
bulkWriteStub.resolves({ deletedCount: 2 });
74+
75+
await removeDuplicateIdentifiersMigration();
76+
77+
expect(bulkWriteStub.calledOnce).to.be.true;
78+
const deleteOps = bulkWriteStub.firstCall.args[0];
79+
expect(deleteOps).to.have.length(2);
80+
expect(deleteOps[0].deleteOne.filter._id).to.equal('middle-doc');
81+
expect(deleteOps[1].deleteOne.filter._id).to.equal('newest-doc');
82+
});
83+
84+
it('should log kept and deleted document IDs for each duplicate group', async () => {
85+
const duplicateGroups = [
86+
{
87+
_id: {
88+
_environmentId: 'env1',
89+
identifier: 'tk_topic-1:si_subscriber-1',
90+
},
91+
count: 3,
92+
documentIds: ['doc1', 'doc2', 'doc3'],
93+
},
94+
];
95+
96+
mockCursor[Symbol.asyncIterator] = async function* () {
97+
for (const group of duplicateGroups) {
98+
yield group;
99+
}
100+
};
101+
102+
bulkWriteStub.resolves({ deletedCount: 2 });
103+
104+
await removeDuplicateIdentifiersMigration();
105+
106+
expect(
107+
loggerInfoStub.calledWith(
108+
sinon.match({
109+
message: 'Processing duplicate group',
110+
environmentId: 'env1',
111+
identifier: 'tk_topic-1:si_subscriber-1',
112+
keptDocumentId: 'doc1',
113+
deletingDocumentIds: ['doc2', 'doc3'],
114+
})
115+
)
116+
).to.be.true;
117+
});
118+
119+
it('should process multiple duplicate groups and delete from each', async () => {
120+
const duplicateGroups = [
121+
{
122+
_id: {
123+
_environmentId: 'env1',
124+
identifier: 'identifier-1',
125+
},
126+
count: 2,
127+
documentIds: ['doc1', 'doc2'],
128+
},
129+
{
130+
_id: {
131+
_environmentId: 'env2',
132+
identifier: 'identifier-2',
133+
},
134+
count: 3,
135+
documentIds: ['doc3', 'doc4', 'doc5'],
136+
},
137+
];
138+
139+
mockCursor[Symbol.asyncIterator] = async function* () {
140+
for (const group of duplicateGroups) {
141+
yield group;
142+
}
143+
};
144+
145+
bulkWriteStub.resolves({ deletedCount: 3 });
146+
147+
await removeDuplicateIdentifiersMigration();
148+
149+
expect(bulkWriteStub.calledOnce).to.be.true;
150+
const deleteOps = bulkWriteStub.firstCall.args[0];
151+
expect(deleteOps).to.have.length(3);
152+
});
153+
154+
it('should handle empty cursor when no duplicates exist', async () => {
155+
mockCursor[Symbol.asyncIterator] = async function* () {};
156+
157+
await removeDuplicateIdentifiersMigration();
158+
159+
expect(loggerInfoStub.calledWith('start migration - remove duplicate identifiers in topic subscribers')).to.be.true;
160+
expect(loggerInfoStub.calledWith(sinon.match(/processed 0 duplicate groups, deleted 0 documents/))).to.be.true;
161+
expect(bulkWriteStub.called).to.be.false;
162+
expect(appCloseStub.calledOnce).to.be.true;
163+
});
164+
165+
it('should handle migration errors gracefully', async () => {
166+
const error = new Error('Migration failed');
167+
mockCursor[Symbol.asyncIterator] = async function* () {
168+
throw error;
169+
};
170+
171+
await removeDuplicateIdentifiersMigration();
172+
173+
expect(loggerErrorStub.calledWith('Error during migration: Error: Migration failed')).to.be.true;
174+
expect(appCloseStub.calledOnce).to.be.true;
175+
});
176+
177+
it('should handle bulk delete errors gracefully', async () => {
178+
const duplicateGroups = [
179+
{
180+
_id: {
181+
_environmentId: 'env1',
182+
identifier: 'identifier-1',
183+
},
184+
count: 2,
185+
documentIds: ['doc1', 'doc2'],
186+
},
187+
];
188+
189+
mockCursor[Symbol.asyncIterator] = async function* () {
190+
for (const group of duplicateGroups) {
191+
yield group;
192+
}
193+
};
194+
195+
bulkWriteStub.rejects(new Error('Bulk delete failed'));
196+
197+
await removeDuplicateIdentifiersMigration();
198+
199+
expect(loggerErrorStub.calledWith('Error in final bulk delete: Error: Bulk delete failed')).to.be.true;
200+
expect(appCloseStub.calledOnce).to.be.true;
201+
});
202+
203+
it('should use correct aggregation pipeline with sort before group', async () => {
204+
mockCursor[Symbol.asyncIterator] = async function* () {};
205+
206+
await removeDuplicateIdentifiersMigration();
207+
208+
const aggregateCall = mockTopicSubscribersRepository._model.aggregate;
209+
expect(aggregateCall.calledOnce).to.be.true;
210+
211+
const pipeline = aggregateCall.firstCall.args[0];
212+
expect(pipeline).to.have.length(4);
213+
214+
expect(pipeline[0].$match).to.deep.equal({
215+
identifier: { $exists: true },
216+
});
217+
218+
expect(pipeline[1].$sort).to.deep.equal({ _id: 1 });
219+
220+
expect(pipeline[2].$group).to.deep.equal({
221+
_id: {
222+
_environmentId: '$_environmentId',
223+
identifier: '$identifier',
224+
},
225+
count: { $sum: 1 },
226+
documentIds: { $push: '$_id' },
227+
});
228+
229+
expect(pipeline[3].$match).to.deep.equal({
230+
count: { $gt: 1 },
231+
});
232+
});
233+
234+
it('should use cursor with batch size of 500 for memory efficiency', async () => {
235+
mockCursor[Symbol.asyncIterator] = async function* () {};
236+
237+
await removeDuplicateIdentifiersMigration();
238+
239+
const cursorCall = mockTopicSubscribersRepository._model.aggregate().cursor;
240+
expect(cursorCall.calledWith({ batchSize: 500 })).to.be.true;
241+
});
242+
243+
it('should batch delete operations when exceeding batch size', async () => {
244+
const manyDuplicates = Array.from({ length: 300 }, (_, i) => ({
245+
_id: {
246+
_environmentId: 'env1',
247+
identifier: `identifier-${i}`,
248+
},
249+
count: 3,
250+
documentIds: [`doc-${i}-1`, `doc-${i}-2`, `doc-${i}-3`],
251+
}));
252+
253+
mockCursor[Symbol.asyncIterator] = async function* () {
254+
for (const group of manyDuplicates) {
255+
yield group;
256+
}
257+
};
258+
259+
bulkWriteStub.resolves({ deletedCount: 500 });
260+
261+
await removeDuplicateIdentifiersMigration();
262+
263+
expect(bulkWriteStub.calledTwice).to.be.true;
264+
});
265+
266+
it('should log document IDs as strings when ObjectIds are returned', async () => {
267+
const duplicateGroups = [
268+
{
269+
_id: {
270+
_environmentId: { toString: () => 'env-obj-id' },
271+
identifier: 'test-identifier',
272+
},
273+
count: 2,
274+
documentIds: [{ toString: () => 'kept-id' }, { toString: () => 'deleted-id' }],
275+
},
276+
];
277+
278+
mockCursor[Symbol.asyncIterator] = async function* () {
279+
for (const group of duplicateGroups) {
280+
yield group;
281+
}
282+
};
283+
284+
bulkWriteStub.resolves({ deletedCount: 1 });
285+
286+
await removeDuplicateIdentifiersMigration();
287+
288+
expect(
289+
loggerInfoStub.calledWith(
290+
sinon.match({
291+
message: 'Processing duplicate group',
292+
environmentId: 'env-obj-id',
293+
keptDocumentId: 'kept-id',
294+
deletingDocumentIds: ['deleted-id'],
295+
})
296+
)
297+
).to.be.true;
298+
});
299+
300+
it('should report correct total deleted count in final log', async () => {
301+
const duplicateGroups = [
302+
{
303+
_id: {
304+
_environmentId: 'env1',
305+
identifier: 'identifier-1',
306+
},
307+
count: 3,
308+
documentIds: ['doc1', 'doc2', 'doc3'],
309+
},
310+
];
311+
312+
mockCursor[Symbol.asyncIterator] = async function* () {
313+
for (const group of duplicateGroups) {
314+
yield group;
315+
}
316+
};
317+
318+
bulkWriteStub.resolves({ deletedCount: 2 });
319+
320+
await removeDuplicateIdentifiersMigration();
321+
322+
expect(loggerInfoStub.calledWith(sinon.match(/processed 1 duplicate groups, deleted 2 documents/))).to.be.true;
323+
});
324+
});

0 commit comments

Comments
 (0)