Initial commit: Telegram Management System
Some checks failed
Deploy / deploy (push) Has been cancelled

Full-stack web application for Telegram management
- Frontend: Vue 3 + Vben Admin
- Backend: NestJS
- Features: User management, group broadcast, statistics

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
你的用户名
2025-11-04 15:37:50 +08:00
commit 237c7802e5
3674 changed files with 525172 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
{
"name": "workflow-service",
"version": "1.0.0",
"type": "module",
"description": "Workflow automation service for marketing campaigns",
"main": "src/index.js",
"scripts": {
"start": "node src/index.js",
"dev": "nodemon src/index.js"
},
"dependencies": {
"express": "^4.18.2",
"cors": "^2.8.5",
"mongoose": "^7.6.3",
"socket.io": "^4.6.1",
"axios": "^1.5.1",
"joi": "^17.11.0",
"winston": "^3.11.0",
"node-cron": "^3.0.3"
},
"devDependencies": {
"nodemon": "^3.0.1"
}
}

View File

@@ -0,0 +1,27 @@
export default {
port: process.env.PORT || 3008,
mongodb: {
uri: process.env.MONGODB_URI || 'mongodb://localhost:27017/marketing-workflow',
options: {
useNewUrlParser: true,
useUnifiedTopology: true
}
},
cors: {
origin: process.env.CORS_ORIGIN?.split(',') || ['http://localhost:5173', 'http://localhost:3000'],
credentials: true
},
services: {
messagingService: process.env.MESSAGING_SERVICE_URL || 'http://localhost:3005',
userService: process.env.USER_SERVICE_URL || 'http://localhost:3003',
analyticsService: process.env.ANALYTICS_SERVICE_URL || 'http://localhost:3006',
abTestingService: process.env.AB_TESTING_SERVICE_URL || 'http://localhost:3007'
},
logging: {
level: process.env.LOG_LEVEL || 'info'
}
};

View File

@@ -0,0 +1,94 @@
import express from 'express';
import cors from 'cors';
import mongoose from 'mongoose';
import { createServer } from 'http';
import { Server } from 'socket.io';
import config from './config/index.js';
import routes from './routes/index.js';
import errorHandler from './middleware/errorHandler.js';
import { logger } from './utils/logger.js';
import { WorkflowEngine } from './services/workflowEngine.js';
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: config.cors.origin,
credentials: true
}
});
// Initialize workflow engine
let workflowEngine;
// Middleware
app.use(cors(config.cors));
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
// Health check
app.get('/health', (req, res) => {
res.json({
status: 'ok',
service: 'workflow-service',
timestamp: new Date().toISOString(),
workflowEngine: workflowEngine ? 'initialized' : 'not initialized'
});
});
// Routes
app.use('/api', routes);
// Error handling
app.use(errorHandler);
// Socket.io for real-time updates
io.on('connection', (socket) => {
logger.info('Client connected:', socket.id);
socket.on('join-workflow', (workflowId) => {
socket.join(`workflow:${workflowId}`);
logger.info(`Socket ${socket.id} joined workflow:${workflowId}`);
});
socket.on('disconnect', () => {
logger.info('Client disconnected:', socket.id);
});
});
// Make io and workflowEngine accessible in routes
app.set('io', io);
app.set('workflowEngine', workflowEngine);
// Database connection
mongoose.connect(config.mongodb.uri, config.mongodb.options)
.then(async () => {
logger.info('Connected to MongoDB');
// Initialize workflow engine after DB connection
workflowEngine = new WorkflowEngine(io);
await workflowEngine.initialize();
app.set('workflowEngine', workflowEngine);
logger.info('Workflow engine initialized');
httpServer.listen(config.port, () => {
logger.info(`Workflow service listening on port ${config.port}`);
});
})
.catch((error) => {
logger.error('MongoDB connection error:', error);
process.exit(1);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down gracefully');
if (workflowEngine) {
await workflowEngine.shutdown();
}
await mongoose.connection.close();
httpServer.close(() => {
logger.info('HTTP server closed');
process.exit(0);
});
});

View File

@@ -0,0 +1,41 @@
import { logger } from '../utils/logger.js';
export default function errorHandler(err, req, res, next) {
logger.error('Error:', err);
// Mongoose validation error
if (err.name === 'ValidationError') {
const errors = Object.values(err.errors).map(e => e.message);
return res.status(400).json({
error: 'Validation failed',
details: errors
});
}
// Mongoose duplicate key error
if (err.code === 11000) {
const field = Object.keys(err.keyPattern)[0];
return res.status(409).json({
error: `Duplicate value for field: ${field}`
});
}
// JWT errors
if (err.name === 'JsonWebTokenError') {
return res.status(401).json({
error: 'Invalid token'
});
}
if (err.name === 'TokenExpiredError') {
return res.status(401).json({
error: 'Token expired'
});
}
// Default error
res.status(err.status || 500).json({
error: err.message || 'Internal server error',
...(process.env.NODE_ENV === 'development' && { stack: err.stack })
});
}

View File

@@ -0,0 +1,19 @@
export function validateRequest(schema) {
return (req, res, next) => {
const { error } = schema.validate(req.body, { abortEarly: false });
if (error) {
const errors = error.details.map(detail => ({
field: detail.path.join('.'),
message: detail.message
}));
return res.status(400).json({
error: 'Validation failed',
details: errors
});
}
next();
};
}

View File

@@ -0,0 +1,238 @@
import mongoose from 'mongoose';
const nodeSchema = new mongoose.Schema({
// Multi-tenant support
tenantId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Tenant',
required: true,
index: true
},
nodeId: {
type: String,
required: true
},
type: {
type: String,
required: true,
enum: [
'send_message',
'delay',
'condition',
'segment_filter',
'ab_test',
'webhook',
'update_contact',
'add_tag',
'remove_tag',
'wait_for_response',
'send_notification',
'analytics_event'
]
},
name: {
type: String,
required: true
},
description: String,
config: {
type: mongoose.Schema.Types.Mixed,
required: true
},
position: {
x: Number,
y: Number
},
nextNodeId: String,
// For condition nodes
onTrue: String,
onFalse: String,
// For wait nodes
onTimeout: String
});
const triggerSchema = new mongoose.Schema({
type: {
type: String,
required: true,
enum: [
'manual',
'scheduled',
'message_received',
'contact_created',
'contact_updated',
'tag_added',
'tag_removed',
'campaign_completed',
'webhook',
'form_submitted'
]
},
config: {
type: mongoose.Schema.Types.Mixed
},
// For scheduled triggers
schedule: String, // Cron expression
// For event-based triggers
conditions: [{
field: String,
operator: String,
value: mongoose.Schema.Types.Mixed
}]
});
const workflowSchema = new mongoose.Schema({
accountId: {
type: String,
required: true,
index: true
},
name: {
type: String,
required: true
},
description: String,
category: {
type: String,
enum: ['welcome', 'nurture', 'conversion', 'retention', 'custom'],
default: 'custom'
},
status: {
type: String,
enum: ['draft', 'active', 'paused', 'archived'],
default: 'draft'
},
trigger: {
type: triggerSchema,
required: true
},
nodes: [nodeSchema],
variables: {
type: Map,
of: mongoose.Schema.Types.Mixed,
default: {}
},
settings: {
allowConcurrent: {
type: Boolean,
default: true
},
maxConcurrentInstances: {
type: Number,
default: 0 // 0 means unlimited
},
contactFrequencyLimit: {
enabled: {
type: Boolean,
default: false
},
maxPerDay: Number,
maxPerWeek: Number
},
errorHandling: {
retryOnError: {
type: Boolean,
default: true
},
maxRetries: {
type: Number,
default: 3
},
notifyOnError: {
type: Boolean,
default: true
}
}
},
metadata: {
totalExecutions: {
type: Number,
default: 0
},
successfulExecutions: {
type: Number,
default: 0
},
failedExecutions: {
type: Number,
default: 0
},
lastExecutedAt: Date,
averageExecutionTime: Number
},
createdBy: String,
updatedBy: String
}, {
timestamps: true
});
// Indexes
workflowSchema.index({ accountId: 1, status: 1 });
workflowSchema.index({ accountId: 1, category: 1 });
workflowSchema.index({ 'trigger.type': 1, status: 1 });
// Multi-tenant indexes
workflowSchema.index({ tenantId: 1, accountId: 1, status: 1 });
workflowSchema.index({ tenantId: 1, accountId: 1, category: 1 });
workflowSchema.index({ tenantId: 1, 'trigger.type': 1, status: 1 });
// Methods
workflowSchema.methods.activate = function() {
this.status = 'active';
return this.save();
};
workflowSchema.methods.pause = function() {
this.status = 'paused';
return this.save();
};
workflowSchema.methods.archive = function() {
this.status = 'archived';
return this.save();
};
workflowSchema.methods.updateMetadata = function(execution) {
this.metadata.totalExecutions += 1;
if (execution.status === 'completed') {
this.metadata.successfulExecutions += 1;
} else if (execution.status === 'failed') {
this.metadata.failedExecutions += 1;
}
this.metadata.lastExecutedAt = new Date();
// Update average execution time
if (execution.duration) {
const prevAvg = this.metadata.averageExecutionTime || 0;
const prevTotal = this.metadata.totalExecutions - 1;
this.metadata.averageExecutionTime =
(prevAvg * prevTotal + execution.duration) / this.metadata.totalExecutions;
}
};
// Validations
workflowSchema.pre('save', function(next) {
// Validate that nodes are properly connected
const nodeIds = new Set(this.nodes.map(n => n.nodeId));
for (const node of this.nodes) {
// Check next node exists
if (node.nextNodeId && !nodeIds.has(node.nextNodeId)) {
return next(new Error(`Node ${node.nodeId} references non-existent next node: ${node.nextNodeId}`));
}
// Check condition branches
if (node.type === 'condition') {
if (node.onTrue && !nodeIds.has(node.onTrue)) {
return next(new Error(`Condition node ${node.nodeId} references non-existent true branch: ${node.onTrue}`));
}
if (node.onFalse && !nodeIds.has(node.onFalse)) {
return next(new Error(`Condition node ${node.nodeId} references non-existent false branch: ${node.onFalse}`));
}
}
}
next();
});
export const Workflow = mongoose.model('Workflow', workflowSchema);

View File

@@ -0,0 +1,228 @@
import mongoose from 'mongoose';
const executedNodeSchema = new mongoose.Schema({
// Multi-tenant support
tenantId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Tenant',
required: true,
index: true
},
nodeId: {
type: String,
required: true
},
executedAt: {
type: Date,
required: true
},
result: {
type: String,
enum: ['pending', 'success', 'failed', 'skipped'],
default: 'pending'
},
output: mongoose.Schema.Types.Mixed,
error: String,
duration: Number // in milliseconds
});
const workflowInstanceSchema = new mongoose.Schema({
workflowId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Workflow',
required: true,
index: true
},
accountId: {
type: String,
required: true,
index: true
},
status: {
type: String,
enum: ['running', 'waiting', 'waiting_for_response', 'paused', 'completed', 'failed', 'cancelled'],
default: 'running'
},
triggerData: {
type: mongoose.Schema.Types.Mixed,
default: {}
},
currentNodeId: String,
executedNodes: [executedNodeSchema],
variables: {
type: Map,
of: mongoose.Schema.Types.Mixed,
default: {}
},
// For waiting states
waitingUntil: Date,
waitingFor: {
type: {
type: String
},
timeout: Date,
nodeId: String
},
// Pause/resume tracking
isPaused: {
type: Boolean,
default: false
},
pausedAt: Date,
resumedAt: Date,
// Execution metadata
startedAt: {
type: Date,
required: true
},
completedAt: Date,
failedAt: Date,
error: String,
retryCount: {
type: Number,
default: 0
},
// Contact tracking (for frequency limiting)
targetContacts: [{
contactId: String,
processedAt: Date
}]
}, {
timestamps: true
});
// Indexes
workflowInstanceSchema.index({ accountId: 1, status: 1 });
workflowInstanceSchema.index({ workflowId: 1, status: 1 });
workflowInstanceSchema.index({ status: 1, waitingUntil: 1 });
workflowInstanceSchema.index({ createdAt: -1 });
// Multi-tenant indexes
workflowInstanceSchema.index({ tenantId: 1, accountId: 1, status: 1 });
workflowInstanceSchema.index({ tenantId: 1, workflowId: 1, status: 1 });
workflowInstanceSchema.index({ tenantId: 1, status: 1, waitingUntil: 1 });
workflowInstanceSchema.index({ tenantId: 1, createdAt: -1 });
// Virtual properties
workflowInstanceSchema.virtual('duration').get(function() {
if (!this.startedAt) return null;
const endTime = this.completedAt || this.failedAt || new Date();
return endTime - this.startedAt;
});
workflowInstanceSchema.virtual('isActive').get(function() {
return ['running', 'waiting', 'waiting_for_response'].includes(this.status);
});
// Methods
workflowInstanceSchema.methods.getVariable = function(key) {
return this.variables.get(key);
};
workflowInstanceSchema.methods.setVariable = function(key, value) {
this.variables.set(key, value);
return this.save();
};
workflowInstanceSchema.methods.updateVariables = function(updates) {
for (const [key, value] of Object.entries(updates)) {
this.variables.set(key, value);
}
return this.save();
};
workflowInstanceSchema.methods.addExecutedNode = function(nodeId, result, output) {
const startTime = Date.now();
const lastNode = this.executedNodes[this.executedNodes.length - 1];
this.executedNodes.push({
nodeId,
executedAt: new Date(),
result: result || 'pending',
output,
duration: lastNode ? startTime - lastNode.executedAt.getTime() : 0
});
return this.save();
};
workflowInstanceSchema.methods.cancel = function() {
this.status = 'cancelled';
this.completedAt = new Date();
return this.save();
};
workflowInstanceSchema.methods.fail = function(error) {
this.status = 'failed';
this.error = error;
this.failedAt = new Date();
return this.save();
};
workflowInstanceSchema.methods.complete = function() {
this.status = 'completed';
this.completedAt = new Date();
return this.save();
};
workflowInstanceSchema.methods.pause = function() {
if (this.status === 'running' || this.status === 'waiting') {
this.status = 'paused';
this.isPaused = true;
this.pausedAt = new Date();
return this.save();
}
throw new Error('Can only pause running or waiting instances');
};
workflowInstanceSchema.methods.resume = function() {
if (this.status === 'paused') {
this.status = 'running';
this.isPaused = false;
this.resumedAt = new Date();
return this.save();
}
throw new Error('Can only resume paused instances');
};
// Static methods
workflowInstanceSchema.statics.findActive = function(accountId) {
return this.find({
accountId,
status: { $in: ['running', 'waiting', 'waiting_for_response'] }
});
};
workflowInstanceSchema.statics.findByWorkflow = function(workflowId, options = {}) {
const query = { workflowId };
if (options.status) {
query.status = options.status;
}
return this.find(query).sort({ createdAt: -1 });
};
workflowInstanceSchema.statics.getStatsByWorkflow = async function(workflowId) {
return this.aggregate([
{ $match: { workflowId: mongoose.Types.ObjectId(workflowId) } },
{
$group: {
_id: '$status',
count: { $sum: 1 },
avgDuration: {
$avg: {
$cond: [
{ $in: ['$status', ['completed', 'failed']] },
{ $subtract: [
{ $ifNull: ['$completedAt', '$failedAt'] },
'$startedAt'
] },
null
]
}
}
}
}
]);
};
export const WorkflowInstance = mongoose.model('WorkflowInstance', workflowInstanceSchema);

View File

@@ -0,0 +1,51 @@
import mongoose from 'mongoose';
const workflowLogSchema = new mongoose.Schema({
// Multi-tenant support
tenantId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Tenant',
required: true,
index: true
},
instanceId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'WorkflowInstance',
required: true,
index: true
},
nodeId: {
type: String,
required: true
},
status: {
type: String,
required: true,
enum: ['started', 'completed', 'failed', 'skipped', 'retrying']
},
details: {
type: mongoose.Schema.Types.Mixed,
default: {}
},
timestamp: {
type: Date,
required: true,
default: Date.now
}
}, {
timestamps: false
});
// Indexes
workflowLogSchema.index({ instanceId: 1, timestamp: -1 });
workflowLogSchema.index({ timestamp: -1 });
// TTL index to automatically remove old logs after 90 days
workflowLogSchema.index({ timestamp: 1 }, { expireAfterSeconds: 90 * 24 * 60 * 60 });
// Multi-tenant indexes
workflowLogSchema.index({ tenantId: 1, instanceId: 1, timestamp: -1 });
workflowLogSchema.index({ tenantId: 1, timestamp: -1 });
workflowLogSchema.index({ tenantId: 1, timestamp: 1 }, { expireAfterSeconds: 90 * 24 * 60 * 60 });
export const WorkflowLog = mongoose.model('WorkflowLog', workflowLogSchema);

View File

@@ -0,0 +1,43 @@
import express from 'express';
import workflowRoutes from './workflows.js';
import instanceRoutes from './instances.js';
const router = express.Router();
router.use('/workflows', workflowRoutes);
router.use('/instances', instanceRoutes);
// Trigger endpoints for external systems
router.post('/trigger/message-received', async (req, res, next) => {
try {
const { accountId, message } = req.body;
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
await workflowEngine.handleMessageReceived(accountId, message);
res.json({ message: 'Trigger processed' });
} catch (error) {
next(error);
}
});
router.post('/trigger/contact-created', async (req, res, next) => {
try {
const { accountId, contact } = req.body;
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
await workflowEngine.handleContactCreated(accountId, contact);
res.json({ message: 'Trigger processed' });
} catch (error) {
next(error);
}
});
export default router;

View File

@@ -0,0 +1,324 @@
import express from 'express';
import { WorkflowInstance } from '../models/WorkflowInstance.js';
import { WorkflowLog } from '../models/WorkflowLog.js';
import { validateRequest } from '../middleware/validateRequest.js';
import { logger } from '../utils/logger.js';
import Joi from 'joi';
const router = express.Router();
// Validation schemas
const updateVariablesSchema = Joi.object({
variables: Joi.object().required()
});
// Get all instances for an account
router.get('/account/:accountId', async (req, res, next) => {
try {
const { accountId } = req.params;
const { status, workflowId, page = 1, limit = 20 } = req.query;
const query = { accountId };
if (status) query.status = status;
if (workflowId) query.workflowId = workflowId;
const skip = (page - 1) * limit;
const [instances, total] = await Promise.all([
WorkflowInstance.find(query)
.populate('workflowId', 'name category')
.skip(skip)
.limit(parseInt(limit))
.sort({ createdAt: -1 }),
WorkflowInstance.countDocuments(query)
]);
res.json({
instances,
pagination: {
page: parseInt(page),
limit: parseInt(limit),
total,
pages: Math.ceil(total / limit)
}
});
} catch (error) {
next(error);
}
});
// Get instance by ID
router.get('/:id', async (req, res, next) => {
try {
const instance = await WorkflowInstance.findById(req.params.id)
.populate('workflowId');
if (!instance) {
return res.status(404).json({ error: 'Instance not found' });
}
// Get execution logs
const logs = await WorkflowLog.find({ instanceId: instance._id })
.sort({ timestamp: -1 })
.limit(100);
res.json({
...instance.toObject(),
logs
});
} catch (error) {
next(error);
}
});
// Pause instance
router.post('/:id/pause', async (req, res, next) => {
try {
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
await workflowEngine.pauseInstance(req.params.id);
const instance = await WorkflowInstance.findById(req.params.id);
logger.info(`Paused workflow instance: ${req.params.id}`);
res.json({ message: 'Instance paused', instance });
} catch (error) {
next(error);
}
});
// Resume instance
router.post('/:id/resume', async (req, res, next) => {
try {
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
await workflowEngine.resumeInstance(req.params.id);
const instance = await WorkflowInstance.findById(req.params.id);
logger.info(`Resumed workflow instance: ${req.params.id}`);
res.json({ message: 'Instance resumed', instance });
} catch (error) {
next(error);
}
});
// Cancel instance
router.post('/:id/cancel', async (req, res, next) => {
try {
const instance = await WorkflowInstance.findById(req.params.id);
if (!instance) {
return res.status(404).json({ error: 'Instance not found' });
}
if (!['running', 'waiting', 'waiting_for_response', 'paused'].includes(instance.status)) {
return res.status(400).json({
error: 'Cannot cancel instance in current status',
currentStatus: instance.status
});
}
await instance.cancel();
logger.info(`Cancelled workflow instance: ${req.params.id}`);
res.json({ message: 'Instance cancelled', instance });
} catch (error) {
next(error);
}
});
// Update instance variables
router.patch('/:id/variables', validateRequest(updateVariablesSchema), async (req, res, next) => {
try {
const instance = await WorkflowInstance.findById(req.params.id);
if (!instance) {
return res.status(404).json({ error: 'Instance not found' });
}
await instance.updateVariables(req.body.variables);
logger.info(`Updated variables for instance: ${req.params.id}`);
res.json({ message: 'Variables updated', variables: instance.variables });
} catch (error) {
next(error);
}
});
// Get instance logs
router.get('/:id/logs', async (req, res, next) => {
try {
const { nodeId, status, limit = 100 } = req.query;
const query = { instanceId: req.params.id };
if (nodeId) query.nodeId = nodeId;
if (status) query.status = status;
const logs = await WorkflowLog.find(query)
.sort({ timestamp: -1 })
.limit(parseInt(limit));
res.json(logs);
} catch (error) {
next(error);
}
});
// Retry failed instance
router.post('/:id/retry', async (req, res, next) => {
try {
const instance = await WorkflowInstance.findById(req.params.id)
.populate('workflowId');
if (!instance) {
return res.status(404).json({ error: 'Instance not found' });
}
if (instance.status !== 'failed') {
return res.status(400).json({
error: 'Can only retry failed instances',
currentStatus: instance.status
});
}
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
// Reset instance for retry
instance.status = 'running';
instance.error = null;
instance.failedAt = null;
instance.retryCount += 1;
await instance.save();
// Continue from last executed node
const lastNode = instance.executedNodes[instance.executedNodes.length - 1];
if (lastNode) {
const workflow = instance.workflowId;
const node = workflow.nodes.find(n => n.nodeId === lastNode.nodeId);
if (node) {
await workflowEngine.executeNode(instance, node);
}
}
logger.info(`Retrying workflow instance: ${req.params.id}`);
res.json({ message: 'Instance retry started', instance });
} catch (error) {
next(error);
}
});
// Get instance statistics
router.get('/stats/overview', async (req, res, next) => {
try {
const { accountId, startDate, endDate } = req.query;
const match = {};
if (accountId) match.accountId = accountId;
if (startDate || endDate) {
match.createdAt = {};
if (startDate) match.createdAt.$gte = new Date(startDate);
if (endDate) match.createdAt.$lte = new Date(endDate);
}
const stats = await WorkflowInstance.aggregate([
{ $match: match },
{
$group: {
_id: '$status',
count: { $sum: 1 },
avgDuration: {
$avg: {
$cond: [
{ $in: ['$status', ['completed', 'failed']] },
{ $subtract: [
{ $ifNull: ['$completedAt', '$failedAt'] },
'$startedAt'
] },
null
]
}
}
}
},
{
$group: {
_id: null,
total: { $sum: '$count' },
byStatus: {
$push: {
status: '$_id',
count: '$count',
avgDuration: '$avgDuration'
}
}
}
}
]);
res.json(stats[0] || { total: 0, byStatus: [] });
} catch (error) {
next(error);
}
});
// Handle message response (for wait_for_response nodes)
router.post('/handle-response', async (req, res, next) => {
try {
const { accountId, contactId, message } = req.body;
// Find instances waiting for response from this contact
const waitingInstances = await WorkflowInstance.find({
accountId,
status: 'waiting_for_response',
'variables.contactId': contactId
});
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
const processed = [];
for (const instance of waitingInstances) {
// Update instance with response
instance.variables.set('lastResponse', message);
instance.status = 'running';
instance.waitingFor = null;
await instance.save();
// Continue execution
const workflow = await instance.populate('workflowId');
const currentNode = workflow.workflowId.nodes.find(
n => n.nodeId === instance.currentNodeId
);
if (currentNode && currentNode.nextNodeId) {
const nextNode = workflow.workflowId.nodes.find(
n => n.nodeId === currentNode.nextNodeId
);
if (nextNode) {
await workflowEngine.executeNode(instance, nextNode);
}
}
processed.push(instance._id);
}
res.json({
message: 'Response handled',
processedInstances: processed
});
} catch (error) {
next(error);
}
});
export default router;

View File

@@ -0,0 +1,432 @@
import express from 'express';
import { Workflow } from '../models/Workflow.js';
import { WorkflowInstance } from '../models/WorkflowInstance.js';
import { WorkflowLog } from '../models/WorkflowLog.js';
import { validateRequest } from '../middleware/validateRequest.js';
import { logger } from '../utils/logger.js';
import Joi from 'joi';
const router = express.Router();
// Validation schemas
const createWorkflowSchema = Joi.object({
accountId: Joi.string().required(),
name: Joi.string().required(),
description: Joi.string().optional(),
category: Joi.string().valid('welcome', 'nurture', 'conversion', 'retention', 'custom').default('custom'),
trigger: Joi.object({
type: Joi.string().required(),
config: Joi.object().optional(),
schedule: Joi.string().optional(),
conditions: Joi.array().items(Joi.object({
field: Joi.string().required(),
operator: Joi.string().required(),
value: Joi.any().required()
})).optional()
}).required(),
nodes: Joi.array().items(Joi.object({
nodeId: Joi.string().required(),
type: Joi.string().required(),
name: Joi.string().required(),
description: Joi.string().optional(),
config: Joi.object().required(),
position: Joi.object({
x: Joi.number(),
y: Joi.number()
}).optional(),
nextNodeId: Joi.string().optional(),
onTrue: Joi.string().optional(),
onFalse: Joi.string().optional(),
onTimeout: Joi.string().optional()
})).required(),
variables: Joi.object().optional(),
settings: Joi.object().optional()
});
const updateWorkflowSchema = Joi.object({
name: Joi.string().optional(),
description: Joi.string().optional(),
category: Joi.string().valid('welcome', 'nurture', 'conversion', 'retention', 'custom').optional(),
trigger: Joi.object().optional(),
nodes: Joi.array().optional(),
variables: Joi.object().optional(),
settings: Joi.object().optional()
});
const executeWorkflowSchema = Joi.object({
triggerData: Joi.object().optional()
});
// Get all workflows for an account
router.get('/account/:accountId', async (req, res, next) => {
try {
const { accountId } = req.params;
const { status, category, page = 1, limit = 20 } = req.query;
const query = { accountId };
if (status) query.status = status;
if (category) query.category = category;
const skip = (page - 1) * limit;
const [workflows, total] = await Promise.all([
Workflow.find(query)
.skip(skip)
.limit(parseInt(limit))
.sort({ createdAt: -1 }),
Workflow.countDocuments(query)
]);
res.json({
workflows,
pagination: {
page: parseInt(page),
limit: parseInt(limit),
total,
pages: Math.ceil(total / limit)
}
});
} catch (error) {
next(error);
}
});
// Get workflow by ID
router.get('/:id', async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
// Get instance statistics
const stats = await WorkflowInstance.getStatsByWorkflow(workflow._id);
res.json({
...workflow.toObject(),
stats
});
} catch (error) {
next(error);
}
});
// Create new workflow
router.post('/', validateRequest(createWorkflowSchema), async (req, res, next) => {
try {
const workflow = new Workflow(req.body);
await workflow.save();
logger.info(`Created workflow: ${workflow.name} (${workflow._id})`);
res.status(201).json(workflow);
} catch (error) {
next(error);
}
});
// Update workflow
router.patch('/:id', validateRequest(updateWorkflowSchema), async (req, res, next) => {
try {
const workflow = await Workflow.findByIdAndUpdate(
req.params.id,
{ $set: req.body },
{ new: true, runValidators: true }
);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
logger.info(`Updated workflow: ${workflow.name} (${workflow._id})`);
res.json(workflow);
} catch (error) {
next(error);
}
});
// Delete workflow
router.delete('/:id', async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
// Check for active instances
const activeInstances = await WorkflowInstance.countDocuments({
workflowId: workflow._id,
status: { $in: ['running', 'waiting', 'waiting_for_response'] }
});
if (activeInstances > 0) {
return res.status(400).json({
error: 'Cannot delete workflow with active instances',
activeInstances
});
}
await workflow.remove();
logger.info(`Deleted workflow: ${workflow.name} (${workflow._id})`);
res.json({ message: 'Workflow deleted successfully' });
} catch (error) {
next(error);
}
});
// Activate workflow
router.post('/:id/activate', async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
await workflow.activate();
// Schedule if it's a scheduled workflow
const workflowEngine = req.app.get('workflowEngine');
if (workflow.trigger.type === 'scheduled' && workflowEngine) {
workflowEngine.scheduleWorkflow(workflow);
}
logger.info(`Activated workflow: ${workflow.name} (${workflow._id})`);
res.json({ message: 'Workflow activated', workflow });
} catch (error) {
next(error);
}
});
// Pause workflow
router.post('/:id/pause', async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
await workflow.pause();
// Unschedule if it's a scheduled workflow
const workflowEngine = req.app.get('workflowEngine');
if (workflow.trigger.type === 'scheduled' && workflowEngine) {
workflowEngine.unscheduleWorkflow(workflow._id.toString());
}
logger.info(`Paused workflow: ${workflow.name} (${workflow._id})`);
res.json({ message: 'Workflow paused', workflow });
} catch (error) {
next(error);
}
});
// Archive workflow
router.post('/:id/archive', async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
await workflow.archive();
// Unschedule if it's a scheduled workflow
const workflowEngine = req.app.get('workflowEngine');
if (workflow.trigger.type === 'scheduled' && workflowEngine) {
workflowEngine.unscheduleWorkflow(workflow._id.toString());
}
logger.info(`Archived workflow: ${workflow.name} (${workflow._id})`);
res.json({ message: 'Workflow archived', workflow });
} catch (error) {
next(error);
}
});
// Execute workflow manually
router.post('/:id/execute', validateRequest(executeWorkflowSchema), async (req, res, next) => {
try {
const workflow = await Workflow.findById(req.params.id);
if (!workflow) {
return res.status(404).json({ error: 'Workflow not found' });
}
if (workflow.status !== 'active' && workflow.trigger.type !== 'manual') {
return res.status(400).json({
error: 'Workflow must be active or have manual trigger to execute'
});
}
const workflowEngine = req.app.get('workflowEngine');
if (!workflowEngine) {
return res.status(503).json({ error: 'Workflow engine not available' });
}
const instance = await workflowEngine.executeWorkflow(
workflow._id,
req.body.triggerData
);
logger.info(`Executed workflow: ${workflow.name} (${workflow._id}), instance: ${instance._id}`);
res.json({
message: 'Workflow execution started',
instanceId: instance._id
});
} catch (error) {
next(error);
}
});
// Get workflow instances
router.get('/:id/instances', async (req, res, next) => {
try {
const { status, page = 1, limit = 20 } = req.query;
const skip = (page - 1) * limit;
const options = {};
if (status) options.status = status;
const [instances, total] = await Promise.all([
WorkflowInstance.findByWorkflow(req.params.id, options)
.skip(skip)
.limit(parseInt(limit)),
WorkflowInstance.countDocuments({
workflowId: req.params.id,
...options
})
]);
res.json({
instances,
pagination: {
page: parseInt(page),
limit: parseInt(limit),
total,
pages: Math.ceil(total / limit)
}
});
} catch (error) {
next(error);
}
});
// Duplicate workflow
router.post('/:id/duplicate', async (req, res, next) => {
try {
const original = await Workflow.findById(req.params.id);
if (!original) {
return res.status(404).json({ error: 'Workflow not found' });
}
const duplicate = new Workflow({
...original.toObject(),
_id: undefined,
name: `${original.name} (Copy)`,
status: 'draft',
metadata: {
totalExecutions: 0,
successfulExecutions: 0,
failedExecutions: 0
},
createdAt: undefined,
updatedAt: undefined
});
await duplicate.save();
logger.info(`Duplicated workflow: ${original.name} -> ${duplicate.name}`);
res.status(201).json(duplicate);
} catch (error) {
next(error);
}
});
// Get workflow templates
router.get('/templates/list', async (req, res) => {
const templates = [
{
id: 'welcome-series',
name: 'Welcome Series',
description: 'Automated welcome messages for new contacts',
category: 'welcome',
trigger: {
type: 'contact_created'
},
nodes: [
{
nodeId: 'welcome-1',
type: 'send_message',
name: 'Send Welcome Message',
config: {
templateId: 'welcome-template',
personalization: {}
},
nextNodeId: 'wait-1'
},
{
nodeId: 'wait-1',
type: 'delay',
name: 'Wait 1 Day',
config: {
duration: 1,
unit: 'days'
},
nextNodeId: 'welcome-2'
},
{
nodeId: 'welcome-2',
type: 'send_message',
name: 'Send Follow-up',
config: {
templateId: 'welcome-followup-template',
personalization: {}
}
}
]
},
{
id: 'abandoned-cart',
name: 'Abandoned Cart Recovery',
description: 'Recover abandoned shopping carts',
category: 'conversion',
trigger: {
type: 'webhook',
conditions: [
{
field: 'event',
operator: 'equals',
value: 'cart_abandoned'
}
]
},
nodes: [
{
nodeId: 'wait-1',
type: 'delay',
name: 'Wait 1 Hour',
config: {
duration: 1,
unit: 'hours'
},
nextNodeId: 'reminder-1'
},
{
nodeId: 'reminder-1',
type: 'send_message',
name: 'Send Cart Reminder',
config: {
templateId: 'cart-reminder-template',
personalization: {
cartItems: '{{cart.items}}',
cartTotal: '{{cart.total}}'
}
}
}
]
}
];
res.json(templates);
});
export default router;

View File

@@ -0,0 +1,683 @@
import { EventEmitter } from 'events';
import cron from 'node-cron';
import { Workflow } from '../models/Workflow.js';
import { WorkflowInstance } from '../models/WorkflowInstance.js';
import { WorkflowLog } from '../models/WorkflowLog.js';
import { logger } from '../utils/logger.js';
import axios from 'axios';
import config from '../config/index.js';
export class WorkflowEngine extends EventEmitter {
constructor(io) {
super();
this.io = io;
this.activeInstances = new Map();
this.scheduledJobs = new Map();
this.nodeHandlers = this.initializeNodeHandlers();
}
async initialize() {
// Load active workflows
const activeWorkflows = await Workflow.find({ status: 'active' });
for (const workflow of activeWorkflows) {
if (workflow.trigger.type === 'scheduled') {
this.scheduleWorkflow(workflow);
}
}
// Resume paused instances
const pausedInstances = await WorkflowInstance.find({
status: 'running',
isPaused: true
});
logger.info(`Initialized workflow engine with ${activeWorkflows.length} active workflows`);
}
async shutdown() {
// Stop all scheduled jobs
for (const [workflowId, job] of this.scheduledJobs) {
job.stop();
}
this.scheduledJobs.clear();
// Pause all running instances
for (const [instanceId, instance] of this.activeInstances) {
await this.pauseInstance(instanceId);
}
logger.info('Workflow engine shut down');
}
initializeNodeHandlers() {
return {
'send_message': this.handleSendMessage.bind(this),
'delay': this.handleDelay.bind(this),
'condition': this.handleCondition.bind(this),
'segment_filter': this.handleSegmentFilter.bind(this),
'ab_test': this.handleABTest.bind(this),
'webhook': this.handleWebhook.bind(this),
'update_contact': this.handleUpdateContact.bind(this),
'add_tag': this.handleAddTag.bind(this),
'remove_tag': this.handleRemoveTag.bind(this),
'wait_for_response': this.handleWaitForResponse.bind(this),
'send_notification': this.handleSendNotification.bind(this),
'analytics_event': this.handleAnalyticsEvent.bind(this)
};
}
// Workflow execution methods
async executeWorkflow(workflowId, triggerData = {}) {
try {
const workflow = await Workflow.findById(workflowId);
if (!workflow || workflow.status !== 'active') {
throw new Error('Workflow not found or not active');
}
// Create new instance
const instance = new WorkflowInstance({
workflowId: workflow._id,
accountId: workflow.accountId,
status: 'running',
triggerData,
currentNodeId: workflow.nodes[0].nodeId,
variables: { ...workflow.variables, ...triggerData },
startedAt: new Date()
});
await instance.save();
this.activeInstances.set(instance._id.toString(), instance);
// Emit instance started event
this.emitInstanceUpdate(instance._id, 'started', {
workflowName: workflow.name
});
// Start execution
await this.executeNode(instance, workflow.nodes[0]);
return instance;
} catch (error) {
logger.error('Error executing workflow:', error);
throw error;
}
}
async executeNode(instance, node) {
try {
// Log node execution
await this.logNodeExecution(instance._id, node.nodeId, 'started');
// Update instance current node
instance.currentNodeId = node.nodeId;
instance.executedNodes.push({
nodeId: node.nodeId,
executedAt: new Date(),
result: 'pending'
});
await instance.save();
// Execute node based on type
const handler = this.nodeHandlers[node.type];
if (!handler) {
throw new Error(`Unknown node type: ${node.type}`);
}
const result = await handler(instance, node);
// Update node execution result
const executedNode = instance.executedNodes.find(n => n.nodeId === node.nodeId);
executedNode.result = result.success ? 'success' : 'failed';
executedNode.output = result.output;
await instance.save();
// Log completion
await this.logNodeExecution(instance._id, node.nodeId, 'completed', result);
// Determine next node
const nextNodeId = this.determineNextNode(node, result);
if (nextNodeId) {
const workflow = await Workflow.findById(instance.workflowId);
const nextNode = workflow.nodes.find(n => n.nodeId === nextNodeId);
if (nextNode) {
await this.executeNode(instance, nextNode);
}
} else {
// Workflow completed
await this.completeInstance(instance._id);
}
} catch (error) {
logger.error(`Error executing node ${node.nodeId}:`, error);
await this.failInstance(instance._id, error.message);
}
}
determineNextNode(node, result) {
if (node.type === 'condition') {
// For condition nodes, use the result to determine next node
return result.conditionMet ? node.onTrue : node.onFalse;
}
// For other nodes, use the default next node
return node.nextNodeId;
}
// Node handlers
async handleSendMessage(instance, node) {
try {
const { templateId, recipientQuery, personalization } = node.config;
// Get recipients based on query
const recipients = await this.getRecipients(instance.accountId, recipientQuery);
// Send messages
const response = await axios.post(
`${config.services.messagingService}/api/messages/batch`,
{
accountId: instance.accountId,
templateId,
recipients: recipients.map(r => r.telegramId),
variables: this.processVariables(personalization, instance.variables),
metadata: {
workflowId: instance.workflowId,
instanceId: instance._id,
nodeId: node.nodeId
}
}
);
return {
success: true,
output: {
messagesSent: response.data.sent,
messagesFailed: response.data.failed
}
};
} catch (error) {
logger.error('Error in handleSendMessage:', error);
return { success: false, output: { error: error.message } };
}
}
async handleDelay(instance, node) {
const { duration, unit } = node.config;
const delayMs = this.calculateDelay(duration, unit);
// Update instance to waiting state
instance.status = 'waiting';
instance.waitingUntil = new Date(Date.now() + delayMs);
await instance.save();
// Schedule resume
setTimeout(async () => {
instance.status = 'running';
instance.waitingUntil = null;
await instance.save();
}, delayMs);
return { success: true, output: { delayMs } };
}
async handleCondition(instance, node) {
const { field, operator, value } = node.config;
const fieldValue = this.getFieldValue(instance.variables, field);
const conditionMet = this.evaluateCondition(fieldValue, operator, value);
return {
success: true,
conditionMet,
output: { field, operator, value, fieldValue, conditionMet }
};
}
async handleSegmentFilter(instance, node) {
try {
const { segmentId, includeTags, excludeTags } = node.config;
// Get contacts matching criteria
const response = await axios.post(
`${config.services.userService}/api/contacts/filter`,
{
accountId: instance.accountId,
segmentId,
includeTags,
excludeTags
}
);
instance.variables.filteredContacts = response.data.contacts;
return {
success: true,
output: {
contactsFound: response.data.contacts.length
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleABTest(instance, node) {
try {
const { experimentId, allocation } = node.config;
// Allocate user to variant
const response = await axios.post(
`${config.services.abTestingService}/api/tracking/allocate`,
{
experimentId,
userId: instance.variables.userId || instance._id,
userContext: instance.variables
}
);
instance.variables.abTestVariant = response.data.variantId;
return {
success: true,
output: {
variantId: response.data.variantId,
allocation: response.data.allocation
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleWebhook(instance, node) {
try {
const { url, method, headers, body } = node.config;
const response = await axios({
method,
url,
headers: this.processVariables(headers, instance.variables),
data: this.processVariables(body, instance.variables)
});
return {
success: true,
output: {
status: response.status,
data: response.data
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleUpdateContact(instance, node) {
try {
const { contactId, updates } = node.config;
const processedUpdates = this.processVariables(updates, instance.variables);
const response = await axios.patch(
`${config.services.userService}/api/contacts/${contactId}`,
processedUpdates
);
return {
success: true,
output: {
updated: true,
contact: response.data
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleAddTag(instance, node) {
try {
const { contactIds, tags } = node.config;
const processedContactIds = this.processVariables(contactIds, instance.variables);
const response = await axios.post(
`${config.services.userService}/api/contacts/tags/add`,
{
contactIds: processedContactIds,
tags
}
);
return {
success: true,
output: {
updated: response.data.updated
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleRemoveTag(instance, node) {
try {
const { contactIds, tags } = node.config;
const processedContactIds = this.processVariables(contactIds, instance.variables);
const response = await axios.post(
`${config.services.userService}/api/contacts/tags/remove`,
{
contactIds: processedContactIds,
tags
}
);
return {
success: true,
output: {
updated: response.data.updated
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleWaitForResponse(instance, node) {
const { timeout, responseType } = node.config;
// Set instance to waiting for response
instance.status = 'waiting_for_response';
instance.waitingFor = {
type: responseType,
timeout: new Date(Date.now() + this.calculateDelay(timeout.duration, timeout.unit)),
nodeId: node.nodeId
};
await instance.save();
// Set timeout for auto-continue
setTimeout(async () => {
if (instance.status === 'waiting_for_response') {
instance.status = 'running';
instance.waitingFor = null;
await instance.save();
// Continue with timeout branch
const nextNodeId = node.onTimeout || node.nextNodeId;
if (nextNodeId) {
const workflow = await Workflow.findById(instance.workflowId);
const nextNode = workflow.nodes.find(n => n.nodeId === nextNodeId);
if (nextNode) {
await this.executeNode(instance, nextNode);
}
}
}
}, this.calculateDelay(timeout.duration, timeout.unit));
return { success: true, output: { waitingForResponse: true } };
}
async handleSendNotification(instance, node) {
try {
const { type, recipient, content } = node.config;
const processedContent = this.processVariables(content, instance.variables);
// Send internal notification
this.io.to(`user:${recipient}`).emit('notification', {
type,
content: processedContent,
timestamp: new Date(),
source: {
workflowId: instance.workflowId,
instanceId: instance._id
}
});
return {
success: true,
output: {
notificationSent: true,
recipient
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
async handleAnalyticsEvent(instance, node) {
try {
const { eventName, properties } = node.config;
const processedProperties = this.processVariables(properties, instance.variables);
const response = await axios.post(
`${config.services.analyticsService}/api/events`,
{
accountId: instance.accountId,
eventName,
properties: processedProperties,
metadata: {
workflowId: instance.workflowId,
instanceId: instance._id,
nodeId: node.nodeId
}
}
);
return {
success: true,
output: {
eventRecorded: true,
eventId: response.data.eventId
}
};
} catch (error) {
return { success: false, output: { error: error.message } };
}
}
// Helper methods
async getRecipients(accountId, query) {
const response = await axios.post(
`${config.services.userService}/api/contacts/search`,
{
accountId,
...query
}
);
return response.data.contacts;
}
processVariables(template, variables) {
if (typeof template === 'string') {
return template.replace(/\{\{(\w+)\}\}/g, (match, key) => {
return variables[key] || match;
});
}
if (Array.isArray(template)) {
return template.map(item => this.processVariables(item, variables));
}
if (typeof template === 'object' && template !== null) {
const result = {};
for (const [key, value] of Object.entries(template)) {
result[key] = this.processVariables(value, variables);
}
return result;
}
return template;
}
calculateDelay(duration, unit) {
const multipliers = {
seconds: 1000,
minutes: 60 * 1000,
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000
};
return duration * (multipliers[unit] || 1000);
}
getFieldValue(object, field) {
const parts = field.split('.');
let value = object;
for (const part of parts) {
value = value?.[part];
}
return value;
}
evaluateCondition(fieldValue, operator, value) {
switch (operator) {
case 'equals':
return fieldValue === value;
case 'not_equals':
return fieldValue !== value;
case 'contains':
return String(fieldValue).includes(value);
case 'not_contains':
return !String(fieldValue).includes(value);
case 'greater_than':
return Number(fieldValue) > Number(value);
case 'less_than':
return Number(fieldValue) < Number(value);
case 'is_empty':
return !fieldValue || fieldValue.length === 0;
case 'is_not_empty':
return fieldValue && fieldValue.length > 0;
default:
return false;
}
}
// Scheduling methods
scheduleWorkflow(workflow) {
if (workflow.trigger.type !== 'scheduled') return;
const { schedule } = workflow.trigger;
const job = cron.schedule(schedule, async () => {
logger.info(`Executing scheduled workflow: ${workflow.name}`);
await this.executeWorkflow(workflow._id);
});
this.scheduledJobs.set(workflow._id.toString(), job);
}
unscheduleWorkflow(workflowId) {
const job = this.scheduledJobs.get(workflowId);
if (job) {
job.stop();
this.scheduledJobs.delete(workflowId);
}
}
// Instance management
async pauseInstance(instanceId) {
const instance = await WorkflowInstance.findById(instanceId);
if (instance && instance.status === 'running') {
instance.status = 'paused';
instance.isPaused = true;
instance.pausedAt = new Date();
await instance.save();
this.emitInstanceUpdate(instanceId, 'paused');
}
}
async resumeInstance(instanceId) {
const instance = await WorkflowInstance.findById(instanceId);
if (instance && instance.status === 'paused') {
instance.status = 'running';
instance.isPaused = false;
instance.resumedAt = new Date();
await instance.save();
this.emitInstanceUpdate(instanceId, 'resumed');
// Continue execution from current node
const workflow = await Workflow.findById(instance.workflowId);
const currentNode = workflow.nodes.find(n => n.nodeId === instance.currentNodeId);
if (currentNode) {
await this.executeNode(instance, currentNode);
}
}
}
async completeInstance(instanceId) {
const instance = await WorkflowInstance.findById(instanceId);
instance.status = 'completed';
instance.completedAt = new Date();
await instance.save();
this.activeInstances.delete(instanceId);
this.emitInstanceUpdate(instanceId, 'completed');
}
async failInstance(instanceId, error) {
const instance = await WorkflowInstance.findById(instanceId);
instance.status = 'failed';
instance.error = error;
instance.failedAt = new Date();
await instance.save();
this.activeInstances.delete(instanceId);
this.emitInstanceUpdate(instanceId, 'failed', { error });
}
// Logging and monitoring
async logNodeExecution(instanceId, nodeId, status, details = {}) {
const log = new WorkflowLog({
instanceId,
nodeId,
status,
details,
timestamp: new Date()
});
await log.save();
}
emitInstanceUpdate(instanceId, event, data = {}) {
this.io.to(`workflow:${instanceId}`).emit('instance-update', {
instanceId,
event,
data,
timestamp: new Date()
});
}
// Event handling for external triggers
async handleMessageReceived(accountId, message) {
// Find workflows with message trigger
const workflows = await Workflow.find({
accountId,
status: 'active',
'trigger.type': 'message_received'
});
for (const workflow of workflows) {
const { conditions } = workflow.trigger;
if (this.matchesTriggerConditions(message, conditions)) {
await this.executeWorkflow(workflow._id, { message });
}
}
}
async handleContactCreated(accountId, contact) {
const workflows = await Workflow.find({
accountId,
status: 'active',
'trigger.type': 'contact_created'
});
for (const workflow of workflows) {
await this.executeWorkflow(workflow._id, { contact });
}
}
matchesTriggerConditions(data, conditions) {
if (!conditions || conditions.length === 0) return true;
return conditions.every(condition => {
const fieldValue = this.getFieldValue(data, condition.field);
return this.evaluateCondition(fieldValue, condition.operator, condition.value);
});
}
}

View File

@@ -0,0 +1,33 @@
import winston from 'winston';
import config from '../config/index.js';
const logFormat = winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
);
export const logger = winston.createLogger({
level: config.logging.level,
format: logFormat,
transports: [
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
)
})
]
});
// Add file transport in production
if (process.env.NODE_ENV === 'production') {
logger.add(new winston.transports.File({
filename: 'logs/error.log',
level: 'error'
}));
logger.add(new winston.transports.File({
filename: 'logs/combined.log'
}));
}