154 lines
7.2 KiB
TypeScript
154 lines
7.2 KiB
TypeScript
/* eslint-disable @typescript-eslint/no-use-before-define */
|
|
/* eslint-disable @typescript-eslint/explicit-function-return-type */
|
|
import 'reflect-metadata';
|
|
|
|
import { modelsLoaders } from '@src/__indexes/__loaders';
|
|
import ChangeStreamModel from '@lib/seed/services/change-stream/change-stream.model';
|
|
import { StreamOperationType, PostHookStatus } from '@lib/seed/services/change-stream/change-stream.components';
|
|
import { AsyncHooksService } from '@lib/__hooks';
|
|
import { createApolloContext } from '@seed/graphql/Middleware';
|
|
import { SettingsCache } from '@seed/graphql/Settings';
|
|
|
|
export const hookHandler = async (event: any): Promise<void> => {
|
|
const settingsI = SettingsCache.getInstance();
|
|
await settingsI.refreshCache();
|
|
|
|
process.env.isHook = 'true';
|
|
|
|
const { streamId, notificationRessourceId } = event;
|
|
|
|
if (streamId) {
|
|
await streamHandler(streamId);
|
|
} else if (notificationRessourceId) {
|
|
await notificationHandler(notificationRessourceId);
|
|
} else {
|
|
console.log('[ASYNC - ERROR]', 'was called with no info');
|
|
}
|
|
|
|
return;
|
|
};
|
|
|
|
async function notificationHandler(streamId: string) {
|
|
const stream = await (await new ChangeStreamModel().db()).findOneAndUpdate(
|
|
{ _id: streamId, hookStatus: PostHookStatus.new },
|
|
{ $set: { hookStatus: PostHookStatus.inProcess } },
|
|
);
|
|
|
|
if (stream && stream.value) {
|
|
const streamData = stream.value;
|
|
console.log('[ASYNC - STREAM] Stream', stream.value.collection, stream.value.operation);
|
|
|
|
try {
|
|
if (streamData.operation == StreamOperationType.hook) {
|
|
const HookService = new AsyncHooksService();
|
|
|
|
if (!HookService[streamData.collection])
|
|
throw `[ASYNC - STREAM] ERROR - No AsyncHooksService for this ${streamData.collection}. Are you sure you put it in the AsyncHooksService ?`;
|
|
|
|
if (streamData.insertedValues && streamData.insertedValues.length > 0) {
|
|
const paramsArray = streamData.insertedValues as any[];
|
|
|
|
for (let index = 0; index < paramsArray.length; index++) {
|
|
// Deal with CTX
|
|
if (paramsArray[index].ctx) {
|
|
paramsArray[index] = await createApolloContext(paramsArray[index].ctx.user._id, paramsArray[index].ctx.organisationId);
|
|
}
|
|
}
|
|
await HookService[streamData.collection](...paramsArray);
|
|
} else await HookService[streamData.collection]();
|
|
} else {
|
|
if (!modelsLoaders[streamData.collection])
|
|
throw `[ASYNC - STREAM] ERROR - No ModelLoaders for this ${streamData.collection}. Are you sure you put it in the ModelLoaders ?`;
|
|
|
|
console.log('streamData', streamData);
|
|
const model = modelsLoaders[streamData.collection];
|
|
const modelData = await model.getOne({ _id: streamData.documentKey }, null);
|
|
console.log('model', model);
|
|
|
|
switch (streamData.operation) {
|
|
case StreamOperationType.insert:
|
|
await model.afterCreate();
|
|
break;
|
|
case StreamOperationType.update:
|
|
await model.afterUpdate();
|
|
break;
|
|
case StreamOperationType.delete:
|
|
await model.afterDelete();
|
|
break;
|
|
default:
|
|
throw `[ASYNC - STREAM] ERROR - No operation ${streamData.operation} for this ${streamData.collection}`;
|
|
}
|
|
}
|
|
await (await new ChangeStreamModel().db()).updateOne({ _id: streamId }, { $set: { hookStatus: PostHookStatus.completed } });
|
|
console.log('[ASYNC - STREAM] COMPLETE', { _id: streamId });
|
|
} catch (error) {
|
|
console.error('[ASYNC - STREAM] ERROR', error);
|
|
await (await new ChangeStreamModel().db()).updateOne({ _id: streamId }, { $set: { hookStatus: PostHookStatus.error } });
|
|
}
|
|
} else {
|
|
console.error('[ASYNC - STREAM] ERROR', 'No new stream found', streamId);
|
|
}
|
|
}
|
|
|
|
async function streamHandler(streamId: string) {
|
|
const stream = await (await new ChangeStreamModel().db()).findOneAndUpdate(
|
|
{ _id: streamId, hookStatus: PostHookStatus.new },
|
|
{ $set: { hookStatus: PostHookStatus.inProcess } },
|
|
);
|
|
|
|
if (stream && stream.value) {
|
|
const streamData = stream.value;
|
|
console.log('[ASYNC - STREAM] Stream', stream.value.collection, stream.value.operation);
|
|
|
|
try {
|
|
if (streamData.operation == StreamOperationType.hook) {
|
|
const HookService = new AsyncHooksService();
|
|
|
|
if (!HookService[streamData.collection])
|
|
throw `[ASYNC - STREAM] ERROR - No AsyncHooksService for this ${streamData.collection}. Are you sure you put it in the AsyncHooksService ?`;
|
|
|
|
if (streamData.insertedValues && streamData.insertedValues.length > 0) {
|
|
const paramsArray = streamData.insertedValues as any[];
|
|
|
|
for (let index = 0; index < paramsArray.length; index++) {
|
|
// Deal with CTX
|
|
if (paramsArray[index].ctx) {
|
|
paramsArray[index] = await createApolloContext(paramsArray[index].ctx.user._id, paramsArray[index].ctx.organisationId);
|
|
}
|
|
}
|
|
await HookService[streamData.collection](...paramsArray);
|
|
} else await HookService[streamData.collection]();
|
|
} else {
|
|
if (!modelsLoaders[streamData.collection])
|
|
throw `[ASYNC - STREAM] ERROR - No ModelLoaders for this ${streamData.collection}. Are you sure you put it in the ModelLoaders ?`;
|
|
|
|
console.log('streamData', streamData);
|
|
const model = modelsLoaders[streamData.collection];
|
|
const modelData = await model.getOne({ _id: streamData.documentKey }, null);
|
|
console.log('model', model);
|
|
|
|
switch (streamData.operation) {
|
|
case StreamOperationType.insert:
|
|
await model.afterCreate();
|
|
break;
|
|
case StreamOperationType.update:
|
|
await model.afterUpdate();
|
|
break;
|
|
case StreamOperationType.delete:
|
|
await model.afterDelete();
|
|
break;
|
|
default:
|
|
throw `[ASYNC - STREAM] ERROR - No operation ${streamData.operation} for this ${streamData.collection}`;
|
|
}
|
|
}
|
|
await (await new ChangeStreamModel().db()).updateOne({ _id: streamId }, { $set: { hookStatus: PostHookStatus.completed } });
|
|
console.error('[ASYNC - STREAM] COMPLETE', { _id: streamId });
|
|
} catch (error) {
|
|
console.error('[ASYNC - STREAM] ERROR', error);
|
|
await (await new ChangeStreamModel().db()).updateOne({ _id: streamId }, { $set: { hookStatus: PostHookStatus.error } });
|
|
}
|
|
} else {
|
|
console.error('[ASYNC - STREAM] ERROR', 'No new stream found', streamId);
|
|
}
|
|
}
|