import { AsyncHooksService } from '@lib/__hooks'; import { EngineModel } from '@seed/engine/EngineModel'; import { IEngineSchema } from '@seed/engine/EngineSchema'; import DB from '@seed/services/database/DBService'; import { asyncHook } from '@seed/services/hooks/hooks.decorator'; import { AccountTypeEnum } from '@src/accounts/account.components'; import { ModelCollectionEnum } from '@src/__indexes/__collections'; import { v4 as uuid } from 'uuid'; import { StreamOperationType, PostHookStatus } from './schemas/stream.components'; import { StreamDBInterfaceSchema } from './schemas/stream.schema'; export const createChangeStream = async >(model: T, operation: StreamOperationType): Promise => { let operationFunction; const changeStreamId = uuid(); const base = { _id: changeStreamId, operation, collection: model.collectionName, documentKey: model._id, hookStatus: PostHookStatus.new, createdAt: new Date(), updatedAt: new Date(), }; switch (operation) { case StreamOperationType.insert: (base as any).insertedValues = model.get(); operationFunction = 'afterCreate'; break; case StreamOperationType.update: case StreamOperationType.replace: (base as any).updatedValues = model.get(); operationFunction = 'afterUpdate'; break; case StreamOperationType.delete: (base as any).updatedValues = model.get(); operationFunction = 'afterDelete'; break; default: break; } const dataToSave: StreamDBInterfaceSchema & IEngineSchema = { ...base, permissions: { r: [AccountTypeEnum.admin], w: [AccountTypeEnum.admin], d: [AccountTypeEnum.admin], }, }; await (await DB.getInstance()).db.collection(ModelCollectionEnum['streams']).insertOne(dataToSave); if (operationFunction && model[operationFunction] && (await asyncHook(dataToSave))) { await model[operationFunction](); await (await DB.getInstance()).db .collection(ModelCollectionEnum['streams']) .updateOne({ _id: changeStreamId }, { $set: { hookStatus: PostHookStatus.completed } }); } }; export const createAsyncStream = async >(collection: keyof AsyncHooksService, data: any): Promise => { const changeStreamId = uuid(); const base = { _id: changeStreamId, operation: StreamOperationType.hook, collection, insertedValues: data, hookStatus: PostHookStatus.new, createdAt: new Date(), updatedAt: new Date(), }; const dataToSave: StreamDBInterfaceSchema & IEngineSchema = { ...base, permissions: { r: [AccountTypeEnum.admin], w: [AccountTypeEnum.admin], d: [AccountTypeEnum.admin], }, }; await (await DB.getInstance()).db.collection(ModelCollectionEnum['streams']).insertOne(dataToSave); };