Path: blob/master/node_modules/@adiwajshing/baileys/lib/Utils/event-buffer.js
1129 views
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.makeEventBuffer = void 0;
const events_1 = __importDefault(require("events"));
const Types_1 = require("../Types");
const messages_1 = require("./messages");
const process_message_1 = require("./process-message");
/** Names of the events that are held back and merged while buffering is active. */
const BUFFERABLE_EVENT = [
    'chats.upsert',
    'chats.update',
    'chats.delete',
    'contacts.upsert',
    'contacts.update',
    'messages.upsert',
    'messages.update',
    'messages.delete',
    'messages.reaction',
    'message-receipt.update',
    'groups.update',
];
const BUFFERABLE_EVENT_SET = new Set(BUFFERABLE_EVENT);
/**
 * The event buffer logically consolidates different events into a single event
 * making the data processing more efficient.
 *
 * Every event is delivered twice on the internal emitter: once as an
 * `'event'` map (`{ [eventName]: payload }`) and once under its own name.
 *
 * @param logger logger used for `trace` and `debug` output
 * @returns an object exposing `process`, `emit`, `processInBuffer`, `buffer`,
 * `flush`, `createBufferedFunction`, `on`, `off` and `removeAllListeners`
 */
const makeEventBuffer = (logger) => {
    const ev = new events_1.default();
    let data = makeBufferData();
    let isBuffering = false;
    let preBufferTask = Promise.resolve();
    // take the generic event and fire it as a baileys event
    ev.on('event', (map) => {
        for (const event of Object.keys(map)) {
            ev.emit(event, map[event]);
        }
    });
    /**
     * Starts buffering if it is not already active.
     * @returns true when this call turned buffering on, false if it was already on
     */
    function buffer() {
        if (isBuffering) {
            return false;
        }
        logger.trace('buffering events');
        isBuffering = true;
        return true;
    }
    /**
     * Stops buffering and emits everything collected so far as one
     * consolidated `'event'` map. Does nothing when buffering is not active.
     * Tasks registered through `processInBuffer` are awaited first.
     */
    async function flush() {
        if (!isBuffering) {
            return;
        }
        logger.trace('releasing buffered events...');
        await preBufferTask;
        isBuffering = false;
        // Swap in a fresh buffer BEFORE notifying listeners. Listeners run
        // synchronously inside emit(); one that restarts buffering and emits a
        // bufferable event must write into the new buffer, otherwise its data
        // would be thrown away together with the batch being released. It also
        // guarantees a throwing listener cannot cause this batch to be emitted
        // a second time by the next flush.
        const pending = data;
        data = makeBufferData();
        const consolidatedData = consolidateEvents(pending);
        if (Object.keys(consolidatedData).length) {
            ev.emit('event', consolidatedData);
        }
        logger.trace('released buffered events');
    }
    return {
        /**
         * Registers a handler that receives every `'event'` map.
         * @returns a function that unregisters the handler
         */
        process(handler) {
            const listener = (map) => {
                handler(map);
            };
            ev.on('event', listener);
            return () => {
                ev.off('event', listener);
            };
        },
        /**
         * Emits an event. While buffering, bufferable events are merged into
         * the buffer (and `true` is returned); everything else is emitted
         * immediately and the emitter's own return value is passed through.
         */
        emit(event, evData) {
            if (isBuffering && BUFFERABLE_EVENT_SET.has(event)) {
                append(data, event, evData, logger);
                return true;
            }
            return ev.emit('event', { [event]: evData });
        },
        /**
         * While buffering, makes the next flush wait for `task` to settle.
         * `Promise.allSettled` is used, so a rejected task does not reject the flush.
         */
        processInBuffer(task) {
            if (isBuffering) {
                preBufferTask = Promise.allSettled([preBufferTask, task]);
            }
        },
        buffer,
        flush,
        /**
         * Wraps `work` so that events emitted while it runs are buffered.
         * The buffer is flushed afterwards only if this call was the one
         * that started buffering.
         */
        createBufferedFunction(work) {
            return async (...args) => {
                const started = buffer();
                try {
                    const result = await work(...args);
                    return result;
                }
                finally {
                    if (started) {
                        await flush();
                    }
                }
            };
        },
        on: (...args) => ev.on(...args),
        off: (...args) => ev.off(...args),
        removeAllListeners: (...args) => ev.removeAllListeners(...args),
    };
};
exports.makeEventBuffer = makeEventBuffer;
/** Creates an empty buffer; one container per kind of buffered change. */
const makeBufferData = () => {
    return {
        chatUpserts: {},
        chatUpdates: {},
        chatDeletes: new Set(),
        contactUpserts: {},
        contactUpdates: {},
        messageUpserts: {},
        messageUpdates: {},
        messageReactions: {},
        messageDeletes: {},
        messageReceipts: {},
        groupUpdates: {}
    };
};
/**
 * Merges one bufferable event into `data`, folding it into any related
 * entries that are already buffered.
 *
 * @param data buffer created by `makeBufferData` (mutated in place)
 * @param event name of the event being buffered
 * @param eventData payload of the event
 * @param logger logger used for `debug` output
 * @throws {Error} when `event` is not one of the handled event names
 */
function append(data, event, eventData, logger) {
    switch (event) {
        case 'chats.upsert': {
            for (const chat of eventData) {
                let upsert = data.chatUpserts[chat.id] || {};
                upsert = concatChats(upsert, chat);
                if (data.chatUpdates[chat.id]) {
                    logger.debug({ chatId: chat.id }, 'absorbed chat update in chat upsert');
                    upsert = concatChats(data.chatUpdates[chat.id], upsert);
                    delete data.chatUpdates[chat.id];
                }
                // an upsert supersedes a buffered delete of the same chat
                data.chatDeletes.delete(chat.id);
                data.chatUpserts[chat.id] = upsert;
            }
            break;
        }
        case 'chats.update': {
            for (const update of eventData) {
                const chatId = update.id;
                // if there is an existing upsert, merge the update into it
                const upsert = data.chatUpserts[chatId];
                if (upsert) {
                    concatChats(upsert, update);
                }
                else {
                    // merge the update into the existing update
                    const chatUpdate = data.chatUpdates[chatId] || {};
                    data.chatUpdates[chatId] = concatChats(chatUpdate, update);
                }
                // if the chat has been updated
                // ignore any existing chat delete
                data.chatDeletes.delete(chatId);
            }
            break;
        }
        case 'chats.delete': {
            for (const chatId of eventData) {
                data.chatDeletes.add(chatId);
                // remove any prior updates & upserts
                delete data.chatUpdates[chatId];
                delete data.chatUpserts[chatId];
            }
            break;
        }
        case 'contacts.upsert': {
            for (const contact of eventData) {
                let upsert = data.contactUpserts[contact.id] || {};
                upsert = Object.assign(upsert, contact);
                if (data.contactUpdates[contact.id]) {
                    // absorb the buffered update; fields of the upsert win
                    upsert = Object.assign(data.contactUpdates[contact.id], upsert);
                    delete data.contactUpdates[contact.id];
                }
                data.contactUpserts[contact.id] = upsert;
            }
            break;
        }
        case 'contacts.update': {
            for (const update of eventData) {
                const id = update.id;
                // merge into prior upsert
                const upsert = data.contactUpserts[id];
                if (upsert) {
                    Object.assign(upsert, update);
                }
                else {
                    // merge into prior update
                    const contactUpdate = data.contactUpdates[id] || {};
                    data.contactUpdates[id] = Object.assign(contactUpdate, update);
                }
            }
            break;
        }
        case 'messages.upsert': {
            const { messages, type } = eventData;
            for (const message of messages) {
                const key = stringifyMessageKey(message.key);
                const existing = data.messageUpserts[key];
                if (existing) {
                    // keep the timestamp of the upsert that was buffered first
                    message.messageTimestamp = existing.message.messageTimestamp;
                }
                if (data.messageUpdates[key]) {
                    logger.debug('absorbed prior message update in message upsert');
                    Object.assign(message, data.messageUpdates[key].update);
                    delete data.messageUpdates[key];
                }
                // once any upsert of this message was 'notify', it stays 'notify'
                const isNotify = type === 'notify'
                    || (existing != null && existing.type === 'notify');
                data.messageUpserts[key] = {
                    message,
                    type: isNotify ? 'notify' : type
                };
            }
            break;
        }
        case 'messages.update': {
            for (const { key, update } of eventData) {
                const keyStr = stringifyMessageKey(key);
                const existing = data.messageUpserts[keyStr];
                if (existing) {
                    Object.assign(existing.message, update);
                    // if the message was received & read by us
                    // the chat counter must have been incremented
                    // so we need to decrement it
                    if (update.status === Types_1.WAMessageStatus.READ && !key.fromMe) {
                        decrementChatReadCounterIfMsgDidUnread(existing.message);
                    }
                }
                else {
                    const msgUpdate = data.messageUpdates[keyStr] || { key, update: {} };
                    Object.assign(msgUpdate.update, update);
                    data.messageUpdates[keyStr] = msgUpdate;
                }
            }
            break;
        }
        case 'messages.delete': {
            if ('keys' in eventData) {
                for (const key of eventData.keys) {
                    const keyStr = stringifyMessageKey(key);
                    data.messageDeletes[keyStr] = key;
                    // a deleted message needs no buffered upsert or update
                    delete data.messageUpserts[keyStr];
                    delete data.messageUpdates[keyStr];
                }
            }
            else {
                // TODO: add support
                // NOTE: payloads without a `keys` property are currently dropped while buffering
            }
            break;
        }
        case 'messages.reaction': {
            for (const { key, reaction } of eventData) {
                const keyStr = stringifyMessageKey(key);
                const existing = data.messageUpserts[keyStr];
                if (existing) {
                    (0, messages_1.updateMessageWithReaction)(existing.message, reaction);
                }
                else {
                    data.messageReactions[keyStr] = data.messageReactions[keyStr]
                        || { key, reactions: [] };
                    (0, messages_1.updateMessageWithReaction)(data.messageReactions[keyStr], reaction);
                }
            }
            break;
        }
        case 'message-receipt.update': {
            for (const { key, receipt } of eventData) {
                const keyStr = stringifyMessageKey(key);
                const existing = data.messageUpserts[keyStr];
                if (existing) {
                    (0, messages_1.updateMessageWithReceipt)(existing.message, receipt);
                }
                else {
                    data.messageReceipts[keyStr] = data.messageReceipts[keyStr]
                        || { key, userReceipt: [] };
                    (0, messages_1.updateMessageWithReceipt)(data.messageReceipts[keyStr], receipt);
                }
            }
            break;
        }
        case 'groups.update': {
            for (const update of eventData) {
                const id = update.id;
                const groupUpdate = data.groupUpdates[id] || {};
                data.groupUpdates[id] = Object.assign(groupUpdate, update);
            }
            break;
        }
        default:
            throw new Error(`"${event}" cannot be buffered`);
    }
    /**
     * Lowers the buffered unread counter of the message's chat by one; the
     * `unreadCount` property is removed once it reaches zero.
     */
    function decrementChatReadCounterIfMsgDidUnread(message) {
        // decrement chat unread counter
        // if the message has already been marked read by us
        const chatId = message.key.remoteJid;
        const chat = data.chatUpdates[chatId] || data.chatUpserts[chatId];
        if ((0, process_message_1.isRealMessage)(message)
            && (0, process_message_1.shouldIncrementChatUnread)(message)
            && chat != null
            && typeof chat.unreadCount === 'number'
            && chat.unreadCount > 0) {
            logger.debug({ chatId: chat.id }, 'decrementing chat counter');
            chat.unreadCount -= 1;
            if (chat.unreadCount === 0) {
                delete chat.unreadCount;
            }
        }
    }
}
/**
 * Converts a buffer into an `'event'` map that contains one entry for every
 * kind of change that has at least one buffered item.
 *
 * @param data buffer created by `makeBufferData`
 * @returns map of event name to payload; empty when nothing was buffered
 */
function consolidateEvents(data) {
    // NOTE: keys are inserted in a fixed order; listeners receive the
    // individual events in this same order.
    const map = {};
    const chatUpsertList = Object.values(data.chatUpserts);
    if (chatUpsertList.length) {
        map['chats.upsert'] = chatUpsertList;
    }
    const chatUpdateList = Object.values(data.chatUpdates);
    if (chatUpdateList.length) {
        map['chats.update'] = chatUpdateList;
    }
    const chatDeleteList = Array.from(data.chatDeletes);
    if (chatDeleteList.length) {
        map['chats.delete'] = chatDeleteList;
    }
    const messageUpsertList = Object.values(data.messageUpserts);
    if (messageUpsertList.length) {
        // the type of the first buffered upsert is used for the whole batch
        const type = messageUpsertList[0].type;
        map['messages.upsert'] = {
            messages: messageUpsertList.map(m => m.message),
            type
        };
    }
    const messageUpdateList = Object.values(data.messageUpdates);
    if (messageUpdateList.length) {
        map['messages.update'] = messageUpdateList;
    }
    const messageDeleteList = Object.values(data.messageDeletes);
    if (messageDeleteList.length) {
        map['messages.delete'] = { keys: messageDeleteList };
    }
    // expand each per-message entry into one item per reaction
    const messageReactionList = Object.values(data.messageReactions)
        .flatMap(({ key, reactions }) => reactions.map(reaction => ({ key, reaction })));
    if (messageReactionList.length) {
        map['messages.reaction'] = messageReactionList;
    }
    // expand each per-message entry into one item per receipt
    const messageReceiptList = Object.values(data.messageReceipts)
        .flatMap(({ key, userReceipt }) => userReceipt.map(receipt => ({ key, receipt })));
    if (messageReceiptList.length) {
        map['message-receipt.update'] = messageReceiptList;
    }
    const contactUpsertList = Object.values(data.contactUpserts);
    if (contactUpsertList.length) {
        map['contacts.upsert'] = contactUpsertList;
    }
    const contactUpdateList = Object.values(data.contactUpdates);
    if (contactUpdateList.length) {
        map['contacts.update'] = contactUpdateList;
    }
    const groupUpdateList = Object.values(data.groupUpdates);
    if (groupUpdateList.length) {
        map['groups.update'] = groupUpdateList;
    }
    return map;
}
/**
 * Merges chat `b` into chat `a` and returns `a`.
 *
 * Unread counters are combined rather than overwritten: when both are
 * numbers and `b.unreadCount` is non-negative, the result is the sum of the
 * two counters, each clamped to a minimum of zero.
 *
 * NOTE: `a` is always mutated; `b` is mutated only in the "neutralize" case below.
 */
function concatChats(a, b) {
    if (b.unreadCount === null) {
        // neutralize unread counter
        if (a.unreadCount < 0) {
            a.unreadCount = undefined;
            b.unreadCount = undefined;
        }
    }
    if (typeof a.unreadCount === 'number' && typeof b.unreadCount === 'number') {
        // work on a copy so the caller's object keeps its own counter
        b = { ...b };
        if (b.unreadCount >= 0) {
            b.unreadCount = Math.max(b.unreadCount, 0) + Math.max(a.unreadCount, 0);
        }
    }
    return Object.assign(a, b);
}
/** Builds the string under which a message is indexed in the buffer. */
const stringifyMessageKey = (key) => `${key.remoteJid},${key.id},${key.fromMe ? '1' : '0'}`;