Description

Type-safe DAG execution engine for AI workflows

README

@dagengine/core

🚀 Type-Safe DAG Engine for AI Workflows

Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.


🚀 Quick Start • 📖 Documentation • 💬 Discussions • 🐛 Issues • 📦 Examples


🎯 What is dagengine?

dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.

The Problem

// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
  const sentiment = await ai.analyze(item);  // Wait...
  const topics = await ai.extract(item);     // Wait...
  const summary = await ai.summarize(item);  // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15

The Solution

// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5

10x faster. 67% cheaper. Zero orchestration code.

Define dependencies → get automatic parallelization.
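
For comparison, here is a minimal sketch of what the same parallelization looks like when written by hand with `Promise.all`. It is not dagengine's implementation. The three `async` functions are hypothetical stand-ins for real AI calls and resolve immediately.

```typescript
// Hypothetical stand-ins for real AI calls (assumptions, not part of dagengine).
interface ItemAnalysis {
  sentiment: string;
  topics: string[];
  summary: string;
}

async function analyzeSentiment(text: string): Promise<string> {
  return text.indexOf("Great") >= 0 ? "positive" : "neutral";
}

async function extractTopics(text: string): Promise<string[]> {
  return text.toLowerCase().split(/\W+/).filter((word) => word.length > 4);
}

async function summarize(text: string, sentiment: string, topics: string[]): Promise<string> {
  return `${sentiment}: ${topics.join(", ")} (${text.length} chars)`;
}

// Per item: run the two independent calls together, then the dependent one.
async function analyzeItem(text: string): Promise<ItemAnalysis> {
  const [sentiment, topics] = await Promise.all([
    analyzeSentiment(text),
    extractTopics(text),
  ]);
  const summary = await summarize(text, sentiment, topics);
  return { sentiment, topics, summary };
}

// Across items: start every item at once instead of awaiting inside a loop.
async function analyzeAll(items: string[]): Promise<ItemAnalysis[]> {
  return Promise.all(items.map((item) => analyzeItem(item)));
}
```

Even this small version has to encode the ordering by hand, and it has no concurrency limit, retries, or cost tracking.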


🚀 5-Minute Quick Start

Install

npm install @dagengine/core

Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)

Example: Analyze Customer Reviews

import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';

// Define result types (optional but helps with TypeScript)
interface SentimentResult {
	sentiment: "positive" | "negative" | "neutral";
	score: number;
}

interface TopicsResult {
	topics: string[];
}

// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
	constructor() {
		super('analyzer', 'Review Analyzer', 'Analyzes reviews');
		this.dimensions = ['sentiment', 'topics', 'summary'];
	}

	defineDependencies(): Record<string, string[]> {
		return {
			summary: ['sentiment', 'topics']
		};
	}

	createPrompt(context: PromptContext): string {
		const content = context.sections[0]?.content || '';

		if (context.dimension === 'sentiment') {
			return `Analyze sentiment: "${content}"
      Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
		}

		if (context.dimension === 'topics') {
			return `Extract topics: "${content}"
      Return JSON: {"topics": ["topic1", "topic2"]}`;
		}

		if (context.dimension === 'summary') {
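			// Dependency results may be missing (e.g. a failed or skipped dimension), so guard before use
			const sentiment = context.dependencies.sentiment?.data as SentimentResult | undefined;
			const topics = context.dependencies.topics?.data as TopicsResult | undefined;
			const tone = sentiment?.sentiment ?? 'neutral';
			const covered = topics?.topics.join(', ') ?? 'the main points';

			return `Create a ${tone} summary covering ${covered}: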
      "${content}"
      Return JSON: {"summary": "summary text"}`;
		}

		throw new Error(`Unknown dimension: ${context.dimension}`);
	}

	selectProvider(): ProviderSelection {
		return {
			provider: 'anthropic',
			options: { model: 'claude-3-5-haiku-20241022' }
		};
	}
}

// 2. Process your data
async function main(): Promise<void> {
	// Validate API key
	if (!process.env.ANTHROPIC_API_KEY) {
		console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
		console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
		process.exit(1);
	}

	// Create engine
	const engine = new DagEngine({
		plugin: new ReviewAnalyzer(),
		providers: {
			anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
		}
	});

	// Prepare reviews
	const reviews = [
		{ content: 'Great product!', metadata: { id: 1 } },
		{ content: 'Not good.', metadata: { id: 2 } }
	];

	// Process
	const result = await engine.process(reviews);

	// Display results
	console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}

// 3. Run with error handling
main().catch((error: Error) => {
	console.error('❌ Processing failed:', error.message);
	process.exit(1);
});

What just happened?

  • sentiment and topics ran in parallel (both have no dependencies)
  • summary waited for both to complete
  • ✅ All sections processed in parallel
  • ✅ 2 reviews × 3 dimensions = 6 AI calls, all optimized automatically

Next: Full Documentation • Examples • Production Guide


📊 Why Choose dagengine?

| Feature | DIY Code | LangChain | dagengine |
|---|---|---|---|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ⚠️ Partial | ✅ Full |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |

Use dagengine when:

  • ✅ Processing 100+ items with multiple AI analyses
  • ✅ Want automatic parallelization without complexity
  • ✅ Need built-in cost tracking
  • ✅ TypeScript projects

Skip dagengine when:

  • ❌ Single AI calls (overkill)
  • ❌ Need RAG/agents (use LangChain)
  • ❌ Python projects (we're TypeScript-only)

⚡ Key Features

🎯 Zero Infrastructure

Define task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.

💰 Cost Optimized

Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.

🔄 Production Ready

Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.
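
To make "exponential backoff" concrete, the sketch below shows one common retry scheme. It is an illustration only: the doubling factor, the function names, and the injectable `sleep` parameter are assumptions, and dagengine's actual retry schedule may differ.

```typescript
// Delay before retry number `attempt` (0-based): base, 2x base, 4x base, ...
// The doubling factor is an assumption made for this sketch.
function delayFor(baseMs: number, attempt: number): number {
  return baseMs * Math.pow(2, attempt);
}

// Full delay schedule for a given retry budget.
function backoffDelays(baseMs: number, maxRetries: number): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(delayFor(baseMs, attempt));
  }
  return delays;
}

type Sleep = (ms: number) => Promise<void>;
const realSleep: Sleep = (ms) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `task`, retrying after each failure until the retry budget is spent.
// The last error is rethrown once every attempt has failed.
async function withRetry<T>(
  task: () => Promise<T>,
  baseMs: number,
  maxRetries: number,
  sleep: Sleep = realSleep
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxRetries) {
        await sleep(delayFor(baseMs, attempt));
      }
    }
  }
  throw lastError;
}
```

With a base delay of 1000 ms and 3 retries, `backoffDelays(1000, 3)` yields `[1000, 2000, 4000]`. Passing `sleep` as a parameter keeps the helper testable without real waiting.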

🌍 Multi-Provider Support

Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.

🪝 18 Lifecycle Hooks

Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.

📊 Real-Time Tracking

Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.


💡 Core Concepts

1️⃣ Sections (Your Data)

const sections = [
  { 
    content: 'Customer review text here',
    metadata: { id: 1, userId: 123, productId: 'SKU-789' }
  }
];

Sections are the pieces of data you analyze (reviews, emails, documents, etc.).

2️⃣ Dimensions (Your Tasks)

this.dimensions = ['sentiment', 'topics', 'summary'];

Dimensions are the analyses you run. Each dimension processes all sections.

3️⃣ Dependencies (Execution Order)

defineDependencies() {
  return {
    sentiment: [],           // No dependencies (runs first)
    topics: [],              // No dependencies (runs first)
    summary: ['sentiment', 'topics']  // Waits for both
  };
}

Dependencies control execution order. Engine automatically parallelizes independent tasks.

Execution Plan:
sentiment ──┐
            ├─→ Both run in parallel → summary
topics ─────┘
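
The execution plan above can be derived mechanically from the dependency map. The following is a minimal sketch of that idea, grouping dimensions into "waves" that can run concurrently. The function name `executionWaves` is hypothetical and this is not the engine's actual scheduler.

```typescript
type Dependencies = Record<string, string[]>;

// Group dimensions into waves: every dimension in a wave depends only on
// dimensions from earlier waves, so a wave's members can run concurrently.
function executionWaves(dimensions: string[], deps: Dependencies): string[][] {
  const done: string[] = [];
  const waves: string[][] = [];
  let remaining = dimensions.slice();

  while (remaining.length > 0) {
    const ready = remaining.filter((dim) =>
      (deps[dim] || []).every((d) => done.indexOf(d) >= 0)
    );
    if (ready.length === 0) {
      // Nothing can start: a cycle or a dependency on an unknown dimension.
      throw new Error(`Unresolvable dependencies: ${remaining.join(", ")}`);
    }
    waves.push(ready);
    ready.forEach((dim) => done.push(dim));
    remaining = remaining.filter((dim) => ready.indexOf(dim) < 0);
  }
  return waves;
}

const waves = executionWaves(
  ["sentiment", "topics", "summary"],
  { summary: ["sentiment", "topics"] }
);
// waves is [["sentiment", "topics"], ["summary"]]
```

A wave-based plan is the simplest form of this scheduling. A real engine could also start a dimension as soon as its own dependencies finish rather than waiting for the whole wave.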

4️⃣ Two Dimension Types

Section Dimensions (default) - Analyze each item independently:

this.dimensions = ['sentiment'];  // Runs once per section

Global Dimensions - Analyze all items together:

this.dimensions = [
  { name: 'categorize', scope: 'global' }  // Runs once for all sections
];
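
The difference between the two scopes shows up directly in the number of AI calls. As a rough sketch (assuming one call per section for a section dimension and one call in total for a global dimension, ignoring skips, retries, and transformations), the count can be estimated like this. `countCalls` and the `"section"` scope value are hypothetical names used only for illustration.

```typescript
// A dimension is either a plain name (section scope) or an object with a scope.
// The "section" literal is an assumption; the source only shows "global".
type DimensionSpec = string | { name: string; scope: "global" | "section" };

// Estimate AI calls: section dimensions run once per section,
// global dimensions run once for the whole batch.
function countCalls(dimensions: DimensionSpec[], sectionCount: number): number {
  let total = 0;
  for (const dim of dimensions) {
    const isGlobal = typeof dim !== "string" && dim.scope === "global";
    total += isGlobal ? 1 : sectionCount;
  }
  return total;
}

// Quick Start example: 3 section dimensions over 2 reviews.
const quickStartCalls = countCalls(["sentiment", "topics", "summary"], 2);
// quickStartCalls is 6
```

Under the same assumptions, adding `{ name: 'categorize', scope: 'global' }` to a 100-section run adds a single call rather than 100.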

🎨 Advanced Features

Cost Optimization with Skip Logic

class SmartAnalyzer extends Plugin {
  dimensions = ['quality_check', 'deep_analysis'];
  
  defineDependencies() {
    return { deep_analysis: ['quality_check'] };
  }

  shouldSkipSectionDimension(context) {
    if (context.dimension === 'deep_analysis') {
      const quality = context.dependencies.quality_check.data;
      return quality.score < 0.7;  // Skip low-quality items
    }
    return false;
  }

  selectProvider(dimension) {
    if (dimension === 'quality_check') {
      return {
        provider: 'anthropic',
        options: { model: 'claude-3-5-haiku-20241022' }  // Cheap model
      };
    }
    
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-7-sonnet-20250219' }  // Expensive model
    };
  }
}

Result: 100 items → 40 high-quality → 60% fewer expensive API calls
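
The savings figure follows from how many items fall below the threshold. A small sketch of that arithmetic, using the same `score < 0.7` rule as the plugin above (the helper names are hypothetical):

```typescript
// Mirrors the rule above: skip deep analysis when the score is below the threshold.
function shouldSkip(score: number, threshold: number = 0.7): boolean {
  return score < threshold;
}

// Count how many expensive calls the skip rule avoids for a batch of scores.
function skipSavings(scores: number[], threshold: number = 0.7) {
  const skipped = scores.filter((s) => shouldSkip(s, threshold)).length;
  return {
    analyzed: scores.length - skipped,
    skipped,
    savedFraction: scores.length === 0 ? 0 : skipped / scores.length,
  };
}
```

If 60 of 100 quality scores are below 0.7, `skipSavings` reports 40 analyzed and a saved fraction of 0.6, which matches the 60% figure above.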

Provider Fallback Chains

selectProvider() {
  return {
    provider: 'anthropic',
    options: { model: 'claude-sonnet-4-5-20250929' },
    fallbacks: [
      { provider: 'openai', options: { model: 'gpt-4o' } },
      { provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
    ]
  };
}

Automatic failover: If Anthropic fails, automatically tries OpenAI, then Gemini.
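
Conceptually, a fallback chain tries each provider in order and returns the first success. The sketch below illustrates that pattern in isolation. The types, the `callWithFallbacks` helper, and the `call` parameter are all hypothetical and are not part of the dagengine API.

```typescript
interface ProviderChoice {
  provider: string;
  options: { model: string };
}

interface SelectionSketch extends ProviderChoice {
  fallbacks?: ProviderChoice[];
}

// `call` is a hypothetical function that sends the request to one provider.
async function callWithFallbacks<T>(
  selection: SelectionSketch,
  call: (choice: ProviderChoice) => Promise<T>
): Promise<{ provider: string; value: T }> {
  // Primary provider first, then fallbacks in the order they were listed.
  const primary: ProviderChoice = {
    provider: selection.provider,
    options: selection.options,
  };
  const chain: ProviderChoice[] = [primary].concat(selection.fallbacks || []);

  const failures: string[] = [];
  for (const choice of chain) {
    try {
      return { provider: choice.provider, value: await call(choice) };
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      failures.push(`${choice.provider}: ${reason}`);
    }
  }
  // Every provider failed: surface all reasons in one error.
  throw new Error(`All providers failed (${failures.join("; ")})`);
}
```

Returning the provider name alongside the value makes it possible to record which provider actually served each request.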

Data Transformations

class CategoryAnalyzer extends Plugin {
  dimensions = [
    'classify',
    { name: 'group_by_category', scope: 'global' },
    'analyze_category'
  ];

  transformSections(context) {
    if (context.dimension === 'group_by_category') {
      const categories = context.result.data.categories;
      
      // Transform: 100 sections → 5 category groups
      return categories.map(cat => ({
        content: cat.items.join('\n\n'),
        metadata: { category: cat.name, count: cat.items.length }
      }));
    }
  }
}

Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
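
The reshaping step itself is plain data manipulation. Here is a standalone sketch of it, assuming the global dimension returns category groups shaped like `{ name, items }` as in the example above. `groupByCategory` is a hypothetical helper showing how such groups could be built from labeled items.

```typescript
interface Section {
  content: string;
  metadata: Record<string, unknown>;
}

interface CategoryGroup {
  name: string;
  items: string[];
}

// Hypothetical helper: bucket labeled items by category, keeping first-seen order.
function groupByCategory(
  labeled: Array<{ text: string; category: string }>
): CategoryGroup[] {
  const groups: CategoryGroup[] = [];
  for (const item of labeled) {
    let group = groups.filter((g) => g.name === item.category)[0];
    if (!group) {
      group = { name: item.category, items: [] };
      groups.push(group);
    }
    group.items.push(item.text);
  }
  return groups;
}

// Turn category groups into one merged section per category.
function sectionsFromCategories(categories: CategoryGroup[]): Section[] {
  return categories.map((cat) => ({
    content: cat.items.join("\n\n"),
    metadata: { category: cat.name, count: cat.items.length },
  }));
}
```

Three labeled reviews in two categories become two sections, so any later section dimension runs twice instead of three times.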

Async Integration Hooks

class DatabaseIntegratedPlugin extends Plugin {
  async beforeProcessStart(context) {
    // Initialize connections
    await this.db.connect();
  }

  async shouldSkipSectionDimension(context) {
    // Check cache before processing
    const cached = await this.redis.get(`${context.section.id}:${context.dimension}`);
    if (cached) return true;
    return false;
  }

  async afterDimensionExecute(context) {
    // Save results to database
    await this.db.results.insert({
      section: context.section.id,
      dimension: context.dimension,
      data: context.result.data
    });
  }

  async afterProcessComplete(context) {
    // Cleanup
    await this.db.disconnect();
  }
}

All 18 hooks support async/await for seamless external service integration.


📚 Documentation

🎓 Learn

📖 Fundamentals (Step-by-Step Guides)

  1. Hello World - Your first plugin
  2. Dependencies - Control execution order
  3. Section vs Global - Two dimension types
  4. Transformations - Reshape data mid-pipeline
  5. Skip Logic - Optimize costs
  6. Multi-Provider - Route to different models
  7. Async Hooks - Database integration
  8. Error Handling - Graceful recovery

🚀 Advanced

🔧 API Reference


🌐 Supported Providers

| Provider | Description | Best For | Docs |
|---|---|---|---|
| Anthropic | Claude models for reasoning and analysis | Complex tasks, deep reasoning | Docs |
| OpenAI | GPT models for general-purpose tasks | Fast responses, versatile workflows | Docs |
| Google Gemini | Gemini models for high-speed processing | High throughput, multimodal inputs | Docs |

Mix and match: Route different dimensions to different providers in the same workflow.

selectProvider(dimension) {
  if (dimension === 'quality_check') {
    return { provider: 'gemini', options: { model: 'gemini-1.5-flash' } };
  }
  if (dimension === 'deep_analysis') {
    return { provider: 'anthropic', options: { model: 'claude-sonnet-4-5-20250929' } };
  }
}

🔄 Gateway Support

dagengine supports Portkey as a unified AI gateway for advanced features:

| Feature | Direct Mode | With Portkey Gateway |
|---|---|---|
| Automatic Retries | ✅ Engine-level | ✅ Gateway-level with smart backoff |
| Rate Limit Handling | ⚠️ Manual | ✅ Automatic with queuing |
| Semantic Caching | | ✅ Reduce costs and latency |
| Load Balancing | | ✅ Multi-provider routing |
| Observability | ✅ Basic | ✅ Full dashboard & analytics |

Enable Portkey:

providers: {
  anthropic: {
    apiKey: process.env.ANTHROPIC_API_KEY,
    gateway: 'portkey',
    gatewayApiKey: process.env.PORTKEY_API_KEY,
    gatewayConfig: 'pc-my-config-id'  // Optional: retry/cache config
  }
}

Learn more: Portkey Integration Guide • Portkey Docs


📦 Configuration

const engine = new DagEngine({
  plugin: new MyPlugin(),
  
  // Provider credentials
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
    openai: { apiKey: process.env.OPENAI_API_KEY }
  },
  
  // Execution settings
  execution: {
    concurrency: 10,        // Max parallel operations
    maxRetries: 3,          // Retry attempts
    retryDelay: 1000,       // Base delay (ms)
    timeout: 60000,         // Default timeout
    continueOnError: true   // Process partial results
  },
  
  // Cost tracking
  pricing: {
    models: {
      'claude-sonnet-4-5-20250929': {
        inputPer1M: 3.00,
        outputPer1M: 15.00
      }
    }
  },
  
  // Progress display
  progressDisplay: {
    display: 'bar',         // 'simple' | 'bar' | 'multi' | 'none'
    showDimensions: true
  }
});
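
The `pricing` entries are per-million-token rates, so a call's cost is a simple weighted sum of its token counts. The sketch below shows that arithmetic. It assumes the rates are in US dollars, and the `TokenUsage` field names and both helper functions are hypothetical rather than the engine's own types.

```typescript
interface ModelPricing {
  inputPer1M: number;  // price per one million input tokens (assumed USD)
  outputPer1M: number; // price per one million output tokens (assumed USD)
}

// Hypothetical shape for the token counts of a single call.
interface TokenUsage {
  inputTokens: number;
  outputTokens: number;
}

// Cost of one call: tokens times the per-million rate, divided by one million.
function callCost(usage: TokenUsage, pricing: ModelPricing): number {
  return (
    (usage.inputTokens * pricing.inputPer1M +
      usage.outputTokens * pricing.outputPer1M) /
    1_000_000
  );
}

// Sum the cost of many calls per dimension.
function costByDimension(
  calls: Array<{ dimension: string; usage: TokenUsage }>,
  pricing: ModelPricing
): Record<string, number> {
  const totals: Record<string, number> = {};
  for (const c of calls) {
    totals[c.dimension] = (totals[c.dimension] || 0) + callCost(c.usage, pricing);
  }
  return totals;
}
```

At the rates shown above (3.00 input, 15.00 output), a call with 1,000,000 input tokens and 200,000 output tokens costs 3.00 + 3.00 = 6.00.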

🛠️ Development

# Clone repository
git clone https://github.com/dagengine/dagengine.git
cd dagengine

# Install dependencies
npm install

# Run tests
npm test

# Run tests with coverage
npm run test:coverage

# Type check
npm run type-check

# Build
npm run build

# Run all checks
npm run validate

🤝 Contributing

We welcome contributions! Here's how to get started:

Quick Start

  1. Fork the repository on GitHub
  2. Clone your fork: git clone https://github.com/YOUR_USERNAME/dagengine.git
  3. Create a branch: git checkout -b feature/your-feature-name
  4. Make your changes and add tests
  5. Run validation: npm run validate
  6. Commit: git commit -m "feat: add your feature"
  7. Push: git push origin feature/your-feature-name
  8. Open a Pull Request on GitHub

Development Guidelines

  • Code Style: We use Prettier and ESLint (run npm run format && npm run lint:fix)
  • Tests: Add tests for new features (run npm test)
  • Types: Maintain full TypeScript coverage (run npm run type-check)
  • Commits: Use Conventional Commits format

Need Help?

See CONTRIBUTING.md for detailed guidelines.


💬 Community & Support

🙋 Need Help?

  1. 📖 Check the docs - Documentation
  2. 🔍 Search existing Q&A - GitHub Discussions
  3. 💬 Ask a question - Start a discussion
  4. 🐛 Found a bug? - Open an issue

🚀 Stay Updated


🔒 Security

We take security seriously. See SECURITY.md for our security policy.
