🚀 Type-Safe DAG Engine for AI Workflows
Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.
dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.
// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
const sentiment = await ai.analyze(item); // Wait...
const topics = await ai.extract(item); // Wait...
const summary = await ai.summarize(item); // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15
// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5
10x faster. 67% cheaper. Zero orchestration code.
Define dependencies → get automatic parallelization.
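As a rough illustration of the kind of hand-written orchestration this replaces, the sketch below runs independent calls concurrently with a cap on in-flight work. It does not use dagengine. `mapWithConcurrency` and `fakeAnalyze` are hypothetical names, and the engine's own scheduler may work differently.

```typescript
// Run `fn` over `items` with at most `limit` calls in flight, preserving input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index] as T, index);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) workers.push(worker());
  await Promise.all(workers);
  return results;
}

// Hypothetical stand-in for an AI call; it also records peak concurrency.
let inFlight = 0;
let peak = 0;
async function fakeAnalyze(item: string): Promise<string> {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  inFlight--;
  return item.toUpperCase();
}

const demo = mapWithConcurrency(["a", "b", "c", "d", "e"], 2, fakeAnalyze);
demo.then((out) => console.log(out, "peak concurrency:", peak));
```

With a limit of 2, no more than two simulated calls overlap, and results come back in input order regardless of completion order.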
npm install @dagengine/core
Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';
// Define result types (optional but helps with TypeScript)
interface SentimentResult {
sentiment: "positive" | "negative" | "neutral";
score: number;
}
interface TopicsResult {
topics: string[];
}
// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record<string, string[]> {
return {
summary: ['sentiment', 'topics']
};
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
}
if (context.dimension === 'topics') {
return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
}
if (context.dimension === 'summary') {
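const sentiment = context.dependencies.sentiment?.data as SentimentResult | undefined;
const topics = context.dependencies.topics?.data as TopicsResult | undefined;
// Fall back to neutral wording if a dependency returned no data
const tone = sentiment?.sentiment ?? 'neutral';
const topicList = topics?.topics?.join(', ') || 'the main points';
return `Create a ${tone} summary covering ${topicList}: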
"${content}"
Return JSON: {"summary": "summary text"}`;
}
throw new Error(`Unknown dimension: ${context.dimension}`);
}
selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
// 2. Process your data
async function main(): Promise<void> {
// Validate API key
if (!process.env.ANTHROPIC_API_KEY) {
console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
process.exit(1);
}
// Create engine
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
}
});
// Prepare reviews
const reviews = [
{ content: 'Great product!', metadata: { id: 1 } },
{ content: 'Not good.', metadata: { id: 2 } }
];
// Process
const result = await engine.process(reviews);
// Display results
console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}
// 3. Run with error handling
main().catch((error: Error) => {
console.error('❌ Processing failed:', error.message);
process.exit(1);
});
What just happened?
- ✅ sentiment and topics ran in parallel (both have no dependencies)
- ✅ summary waited for both to complete
- ✅ All sections processed in parallel
- ✅ 2 reviews × 3 dimensions = 6 AI calls, all optimized automatically
Next: Full Documentation • Examples • Production Guide
| Feature | DIY Code | LangChain | dagengine |
|---|---|---|---|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ✅ Full | |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |
Use dagengine when:
- ✅ Processing 100+ items with multiple AI analyses
- ✅ Want automatic parallelization without complexity
- ✅ Need built-in cost tracking
- ✅ TypeScript projects
Skip dagengine when:
- ❌ Single AI calls (overkill)
- ❌ Need RAG/agents (use LangChain)
- ❌ Python projects (we're TypeScript-only)
- Define task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.
- Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.
- Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.
- Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.
- Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.
- Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.
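To make the per-dimension cost tracking concrete, here is a minimal sketch of the arithmetic, assuming token counts are available per call. The `inputPer1M` and `outputPer1M` field names follow the pricing config shown in this README. `TokenUsage`, `costOf`, and `costByDimension` are hypothetical helpers, not part of the dagengine API, and the token counts are made up.

```typescript
interface ModelPricing {
  inputPer1M: number;  // USD per 1M input tokens
  outputPer1M: number; // USD per 1M output tokens
}

// Hypothetical usage record for one AI call.
interface TokenUsage {
  dimension: string;
  model: string;
  inputTokens: number;
  outputTokens: number;
}

// Cost of a single call; models without a pricing entry count as 0.
function costOf(usage: TokenUsage, pricing: Record<string, ModelPricing>): number {
  const price = pricing[usage.model];
  if (!price) return 0;
  return (
    (usage.inputTokens / 1e6) * price.inputPer1M +
    (usage.outputTokens / 1e6) * price.outputPer1M
  );
}

// Sum call costs per dimension.
function costByDimension(
  usages: TokenUsage[],
  pricing: Record<string, ModelPricing>
): Record<string, number> {
  const totals: Record<string, number> = {};
  for (const usage of usages) {
    totals[usage.dimension] = (totals[usage.dimension] ?? 0) + costOf(usage, pricing);
  }
  return totals;
}

const pricing: Record<string, ModelPricing> = {
  "claude-sonnet-4-5-20250929": { inputPer1M: 3.0, outputPer1M: 15.0 },
};

// Made-up token counts for illustration only.
const usages: TokenUsage[] = [
  { dimension: "sentiment", model: "claude-sonnet-4-5-20250929", inputTokens: 500000, outputTokens: 100000 },
  { dimension: "summary", model: "claude-sonnet-4-5-20250929", inputTokens: 1000000, outputTokens: 200000 },
];

const totals = costByDimension(usages, pricing);
console.log(totals); // about $3 for sentiment and $6 for summary
```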
const sections = [
{
content: 'Customer review text here',
metadata: { id: 1, userId: 123, productId: 'SKU-789' }
}
];
Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
this.dimensions = ['sentiment', 'topics', 'summary'];
Dimensions are the analyses you run. Each dimension processes all sections.
defineDependencies() {
return {
sentiment: [], // No dependencies (runs first)
topics: [], // No dependencies (runs first)
summary: ['sentiment', 'topics'] // Waits for both
};
}
Dependencies control execution order. Engine automatically parallelizes independent tasks.
Execution Plan:
sentiment ──┐
├─→ Both run in parallel → summary
topics ─────┘
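The execution plan above can be derived from the dependency map by grouping dimensions into "waves", where every dimension in a wave has all of its dependencies satisfied by earlier waves. The following is a minimal sketch of that idea. `planWaves` is a hypothetical helper, not the engine's actual scheduler.

```typescript
type DependencyMap = Record<string, string[]>;

// Group dimensions into waves; everything within one wave can run in parallel.
function planWaves(dimensions: string[], deps: DependencyMap): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = [...dimensions];

  while (remaining.length > 0) {
    // A dimension is ready once all of its dependencies have completed.
    const ready = remaining.filter((dim) =>
      (deps[dim] ?? []).every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      throw new Error(`Circular or unknown dependency among: ${remaining.join(", ")}`);
    }
    waves.push(ready);
    ready.forEach((dim) => done.add(dim));
    remaining = remaining.filter((dim) => !done.has(dim));
  }
  return waves;
}

const waves = planWaves(["sentiment", "topics", "summary"], {
  summary: ["sentiment", "topics"],
});
console.log(waves); // [["sentiment", "topics"], ["summary"]]
```

Dimensions missing from the map are treated as having no dependencies, and a cycle is reported as an error instead of looping forever.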
Section Dimensions (default) - Analyze each item independently:
this.dimensions = ['sentiment']; // Runs once per section
Global Dimensions - Analyze all items together:
this.dimensions = [
{ name: 'categorize', scope: 'global' } // Runs once for all sections
];
class SmartAnalyzer extends Plugin {
dimensions = ['quality_check', 'deep_analysis'];
defineDependencies() {
return { deep_analysis: ['quality_check'] };
}
shouldSkipSectionDimension(context) {
if (context.dimension === 'deep_analysis') {
const quality = context.dependencies.quality_check?.data;
// Skip low-quality items, and items whose quality check produced no data
return !quality || quality.score < 0.7;
}
return false;
}
selectProvider(dimension) {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}
}
Result: 100 items → 40 high-quality → 60% fewer expensive API calls
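The arithmetic behind that result can be sketched as follows, assuming the cheap quality check runs on every item and the expensive analysis runs only for scores of at least 0.7. `estimateSkipSavings` is a hypothetical helper and the scores are made up for illustration.

```typescript
interface SkipStats {
  cheapCalls: number;     // quality_check runs on every item
  expensiveCalls: number; // deep_analysis runs only when the score passes
  savedFraction: number;  // share of expensive calls avoided
}

function estimateSkipSavings(scores: number[], threshold: number): SkipStats {
  // Mirrors the skip rule above: skip when score < threshold.
  const expensiveCalls = scores.filter((score) => score >= threshold).length;
  const savedFraction =
    scores.length === 0 ? 0 : 1 - expensiveCalls / scores.length;
  return { cheapCalls: scores.length, expensiveCalls, savedFraction };
}

// 100 hypothetical quality scores: 40 pass the threshold, 60 do not.
const scores: number[] = [
  ...new Array<number>(40).fill(0.9),
  ...new Array<number>(60).fill(0.3),
];

const stats = estimateSkipSavings(scores, 0.7);
console.log(stats.expensiveCalls, stats.savedFraction);
```

The actual saving depends on the score distribution of your data; the 40/60 split here simply reproduces the numbers quoted above.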
selectProvider() {
return {
provider: 'anthropic',
options: { model: 'claude-sonnet-4-5-20250929' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}
Automatic failover: If Anthropic fails, automatically tries OpenAI, then Gemini.
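Conceptually, a fallback chain tries the primary provider first and moves down the list on failure. The sketch below shows that control flow in isolation. `callWithFallbacks`, `ProviderChoice`, and `flakyCall` are hypothetical names, the outage is simulated, and the engine's real failover logic may differ (for example, by retrying each provider before moving on).

```typescript
interface ProviderChoice {
  provider: string;
  options: { model: string };
}

interface ProviderSelectionSketch extends ProviderChoice {
  fallbacks?: ProviderChoice[];
}

type Caller = (choice: ProviderChoice) => Promise<string>;

// Try the primary provider, then each fallback in order; fail only if all fail.
async function callWithFallbacks(
  selection: ProviderSelectionSketch,
  call: Caller
): Promise<{ provider: string; output: string }> {
  const chain: ProviderChoice[] = [selection, ...(selection.fallbacks ?? [])];
  const errors: string[] = [];
  for (const choice of chain) {
    try {
      const output = await call(choice);
      return { provider: choice.provider, output };
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      errors.push(`${choice.provider}: ${message}`);
    }
  }
  throw new Error(`All providers failed: ${errors.join("; ")}`);
}

const primary: ProviderSelectionSketch = {
  provider: "anthropic",
  options: { model: "claude-sonnet-4-5-20250929" },
  fallbacks: [
    { provider: "openai", options: { model: "gpt-4o" } },
    { provider: "gemini", options: { model: "gemini-2.5-pro" } },
  ],
};

// Simulated provider call: the primary is "down", the others respond.
const flakyCall: Caller = async (choice) => {
  if (choice.provider === "anthropic") throw new Error("simulated outage");
  return `ok from ${choice.provider}`;
};

const failoverDemo = callWithFallbacks(primary, flakyCall);
failoverDemo.then((result) => console.log(result.provider, result.output));
```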
class CategoryAnalyzer extends Plugin {
dimensions = [
'classify',
{ name: 'group_by_category', scope: 'global' },
'analyze_category'
];
transformSections(context) {
if (context.dimension === 'group_by_category') {
const categories = context.result.data.categories;
// Transform: 100 sections → 5 category groups
return categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}
}
Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
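The grouping step itself is ordinary data reshaping. As a standalone sketch, assuming each item has already been classified, the following merges items into one section per category. `groupByCategory`, `ClassifiedItem`, and `GroupedSection` are hypothetical names; the output shape mirrors the `content` and `metadata` fields used in the transform above.

```typescript
interface GroupedSection {
  content: string;
  metadata: { category: string; count: number };
}

// Hypothetical input: one entry per original section, already classified.
interface ClassifiedItem {
  content: string;
  category: string;
}

// Merge items that share a category into a single section.
function groupByCategory(items: ClassifiedItem[]): GroupedSection[] {
  const groups = new Map<string, string[]>();
  for (const item of items) {
    const bucket = groups.get(item.category) ?? [];
    bucket.push(item.content);
    groups.set(item.category, bucket);
  }
  // Map iteration follows insertion order, so categories appear as first seen.
  return Array.from(groups.entries()).map(([category, contents]) => ({
    content: contents.join("\n\n"),
    metadata: { category, count: contents.length },
  }));
}

const grouped = groupByCategory([
  { content: "Late delivery", category: "shipping" },
  { content: "Great battery life", category: "quality" },
  { content: "Box arrived damaged", category: "shipping" },
  { content: "Screen scratches easily", category: "quality" },
]);
console.log(grouped.length); // 2 sections instead of 4
```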
class DatabaseIntegratedPlugin extends Plugin {
async beforeProcessStart(context) {
// Initialize connections
await this.db.connect();
}
async shouldSkipSectionDimension(context) {
// Check cache before processing
const cached = await this.redis.get(`${context.section.id}:${context.dimension}`);
if (cached) return true;
return false;
}
async afterDimensionExecute(context) {
// Save results to database
await this.db.results.insert({
section: context.section.id,
dimension: context.dimension,
data: context.result.data
});
}
async afterProcessComplete(context) {
// Cleanup
await this.db.disconnect();
}
}
All 18 hooks support async/await for seamless external service integration.
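For local experiments without Redis, the cache check in the skip hook can be approximated with an in-memory map keyed the same way (`sectionId:dimension`). This is a hedged sketch: `ResultCache` and `shouldSkip` are hypothetical and are not provided by dagengine.

```typescript
type SectionId = string | number;

// Minimal in-memory stand-in for the Redis lookup in the example above.
class ResultCache {
  private readonly store = new Map<string, unknown>();

  // Same key layout as the Redis example: "<section id>:<dimension>".
  private static keyFor(sectionId: SectionId, dimension: string): string {
    return `${sectionId}:${dimension}`;
  }

  has(sectionId: SectionId, dimension: string): boolean {
    return this.store.has(ResultCache.keyFor(sectionId, dimension));
  }

  get(sectionId: SectionId, dimension: string): unknown {
    return this.store.get(ResultCache.keyFor(sectionId, dimension));
  }

  set(sectionId: SectionId, dimension: string, data: unknown): void {
    this.store.set(ResultCache.keyFor(sectionId, dimension), data);
  }
}

// Skip a (section, dimension) pair when a result is already cached.
function shouldSkip(cache: ResultCache, sectionId: SectionId, dimension: string): boolean {
  return cache.has(sectionId, dimension);
}

const cache = new ResultCache();
cache.set(1, "sentiment", { sentiment: "positive", score: 0.9 });

console.log(shouldSkip(cache, 1, "sentiment")); // true: already cached
console.log(shouldSkip(cache, 1, "topics"));    // false: still needs processing
```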
- Quick Start - Get started in 5 minutes
- Core Concepts - Understand sections, dimensions, dependencies
- Examples - Complete working examples
- Hello World - Your first plugin
- Dependencies - Control execution order
- Section vs Global - Two dimension types
- Transformations - Reshape data mid-pipeline
- Skip Logic - Optimize costs
- Multi-Provider - Route to different models
- Async Hooks - Database integration
- Error Handling - Graceful recovery
- Portkey Gateway - Rate limit protection & caching
- Production Quickstart - Complete production workflow
- Configuration - Engine config options
- Lifecycle Hooks - All 18 hooks explained
- Type Definitions - TypeScript interfaces
| Provider | Description | Best For | Docs |
|---|---|---|---|
| Anthropic | Claude models for reasoning and analysis | Complex tasks, deep reasoning | Docs |
| OpenAI | GPT models for general-purpose tasks | Fast responses, versatile workflows | Docs |
| Google Gemini | Gemini models for high-speed processing | High throughput, multimodal inputs | Docs |
Mix and match: Route different dimensions to different providers in the same workflow.
selectProvider(dimension) {
if (dimension === 'quality_check') {
return { provider: 'gemini', options: { model: 'gemini-1.5-flash' } };
}
if (dimension === 'deep_analysis') {
return { provider: 'anthropic', options: { model: 'claude-sonnet-4-5-20250929' } };
}
}
dagengine supports Portkey as a unified AI gateway for advanced features:
| Feature | Direct Mode | With Portkey Gateway |
|---|---|---|
| Automatic Retries | ✅ Engine-level | ✅ Gateway-level with smart backoff |
| Rate Limit Handling | ✅ Automatic with queuing | |
| Semantic Caching | ❌ | ✅ Reduce costs and latency |
| Load Balancing | ❌ | ✅ Multi-provider routing |
| Observability | ✅ Basic | ✅ Full dashboard & analytics |
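For intuition about engine-level retries, the sketch below shows one common form of exponential backoff, using the `maxRetries: 3` and `retryDelay: 1000` values from the configuration example. It assumes the delay doubles on each retry; this README does not state the engine's exact schedule, and `backoffDelays` and `withRetry` are hypothetical helpers.

```typescript
type Sleep = (ms: number) => Promise<void>;

const realSleep: Sleep = (ms) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Delay before each retry, assuming the base delay doubles every time.
function backoffDelays(maxRetries: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(baseDelayMs * Math.pow(2, attempt));
  }
  return delays;
}

// Run `task` once, then retry up to `maxRetries` times with backoff in between.
async function withRetry<T>(
  task: () => Promise<T>,
  maxRetries: number,
  baseDelayMs: number,
  sleep: Sleep = realSleep
): Promise<T> {
  const delays = backoffDelays(maxRetries, baseDelayMs);
  let lastError: unknown = new Error("task was never attempted");
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (err) {
      lastError = err;
      const delay = delays[attempt];
      if (delay !== undefined) await sleep(delay); // no wait after the final attempt
    }
  }
  throw lastError;
}

// Usage: a task that fails twice, with a sleep that records delays instead of waiting.
const waited: number[] = [];
let calls = 0;
const retryDemo = withRetry(
  async () => {
    calls++;
    if (calls < 3) throw new Error("simulated rate limit");
    return "done";
  },
  3,
  1000,
  async (ms) => {
    waited.push(ms);
  }
);
retryDemo.then((value) => console.log(value, waited));
```

Injecting the `sleep` function keeps the example fast to run and makes the schedule easy to inspect.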
Enable Portkey:
providers: {
anthropic: {
apiKey: process.env.ANTHROPIC_API_KEY,
gateway: 'portkey',
gatewayApiKey: process.env.PORTKEY_API_KEY,
gatewayConfig: 'pc-my-config-id' // Optional: retry/cache config
}
}
Learn more: Portkey Integration Guide • Portkey Docs
const engine = new DagEngine({
plugin: new MyPlugin(),
// Provider credentials
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
openai: { apiKey: process.env.OPENAI_API_KEY }
},
// Execution settings
execution: {
concurrency: 10, // Max parallel operations
maxRetries: 3, // Retry attempts
retryDelay: 1000, // Base delay (ms)
timeout: 60000, // Default timeout
continueOnError: true // Process partial results
},
// Cost tracking
pricing: {
models: {
'claude-sonnet-4-5-20250929': {
inputPer1M: 3.00,
outputPer1M: 15.00
}
}
},
// Progress display
progressDisplay: {
display: 'bar', // 'simple' | 'bar' | 'multi' | 'none'
showDimensions: true
}
});
# Clone repository
git clone https://github.com/dagengine/dagengine.git
cd dagengine
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Type check
npm run type-check
# Build
npm run build
# Run all checks
npm run validate
We welcome contributions! Here's how to get started:
- Fork the repository on GitHub
- Clone your fork: git clone https://github.com/YOUR_USERNAME/dagengine.git
- Create a branch: git checkout -b feature/your-feature-name
- Make your changes and add tests
- Run validation: npm run validate
- Commit: git commit -m "feat: add your feature"
- Push: git push origin feature/your-feature-name
- Open a Pull Request on GitHub
- Code Style: We use Prettier and ESLint (run npm run format && npm run lint:fix)
- Tests: Add tests for new features (run npm test)
- Types: Maintain full TypeScript coverage (run npm run type-check)
- Commits: Use Conventional Commits format
See CONTRIBUTING.md for detailed guidelines.
- 📖 Check the docs - Documentation
- 🔍 Search existing Q&A - GitHub Discussions
- 💬 Ask a question - Start a discussion
- 🐛 Found a bug? - Open an issue
- 📦 npm Package - Install and updates
- ⭐ GitHub - Star for updates
- 💬 Discussions - Feature requests, Q&A
We take security seriously. See SECURITY.md for our security policy.
