backend/lib/seed/engine/utils/streams/stream.service.ts
2025-05-14 21:45:16 +02:00

85 lines
3.1 KiB
TypeScript

import { AsyncHooksService } from '@lib/__hooks';
import { EngineModel } from '@seed/engine/EngineModel';
import { IEngineSchema } from '@seed/engine/EngineSchema';
import DB from '@seed/services/database/DBService';
import { asyncHook } from '@seed/services/hooks/hooks.decorator';
import { AccountTypeEnum } from '@src/accounts/account.components';
import { ModelCollectionEnum } from '@src/__indexes/__collections';
import { v4 as uuid } from 'uuid';
import { StreamOperationType, PostHookStatus } from './schemas/stream.components';
import { StreamDBInterfaceSchema } from './schemas/stream.schema';
export const createChangeStream = async <T extends EngineModel<any, any, any>>(model: T, operation: StreamOperationType): Promise<void> => {
let operationFunction;
const changeStreamId = uuid();
const base = {
_id: changeStreamId,
operation,
collection: model.collectionName,
documentKey: model._id,
hookStatus: PostHookStatus.new,
createdAt: new Date(),
updatedAt: new Date(),
};
switch (operation) {
case StreamOperationType.insert:
(base as any).insertedValues = model.get();
operationFunction = 'afterCreate';
break;
case StreamOperationType.update:
case StreamOperationType.replace:
(base as any).updatedValues = model.get();
operationFunction = 'afterUpdate';
break;
case StreamOperationType.delete:
(base as any).updatedValues = model.get();
operationFunction = 'afterDelete';
break;
default:
break;
}
const dataToSave: StreamDBInterfaceSchema & IEngineSchema = {
...base,
permissions: {
r: [AccountTypeEnum.admin],
w: [AccountTypeEnum.admin],
d: [AccountTypeEnum.admin],
},
};
await (await DB.getInstance()).db.collection(ModelCollectionEnum['streams']).insertOne(dataToSave);
if (operationFunction && model[operationFunction] && (await asyncHook(dataToSave))) {
await model[operationFunction]();
await (await DB.getInstance()).db
.collection(ModelCollectionEnum['streams'])
.updateOne({ _id: changeStreamId }, { $set: { hookStatus: PostHookStatus.completed } });
}
};
export const createAsyncStream = async <T extends EngineModel<any, any, any>>(collection: keyof AsyncHooksService, data: any): Promise<void> => {
const changeStreamId = uuid();
const base = {
_id: changeStreamId,
operation: StreamOperationType.hook,
collection,
insertedValues: data,
hookStatus: PostHookStatus.new,
createdAt: new Date(),
updatedAt: new Date(),
};
const dataToSave: StreamDBInterfaceSchema & IEngineSchema = {
...base,
permissions: {
r: [AccountTypeEnum.admin],
w: [AccountTypeEnum.admin],
d: [AccountTypeEnum.admin],
},
};
await (await DB.getInstance()).db.collection(ModelCollectionEnum['streams']).insertOne(dataToSave);
};