
Data Processing Pipeline Example

Advanced

This example shows how to build a scalable data processing pipeline that can handle both batch and streaming data with advanced transformations.

[Diagram: Data processing pipeline visualization]

Overview

This example walks through building a complete data processing pipeline with Inversion. The pipeline covers data ingestion, transformation, enrichment, and delivery, and the same configuration can run in either batch or streaming mode, so a single definition handles processing data at scale.

Features

  • Unified batch and stream processing
  • Scalable architecture that can handle terabytes of data
  • Advanced data transformations and enrichment
  • Support for multiple data sources and sinks
  • Error handling and recovery mechanisms
  • Monitoring and observability

Architecture

The pipeline separates concerns into three layers, each with a single responsibility:

Data Sources

Connects to various data sources including databases, files, and streaming platforms.

Processing Engine

Executes transformations, aggregations, and enrichments on the data.

Data Sinks

Delivers processed data to various destinations like databases, APIs, or dashboards.
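
Before diving into the full implementation, here is a stripped-down sketch of how these three layers map onto a single pipeline definition. The field names mirror the full configuration shown in the Implementation section below; the specific values are illustrative only.

// architecture-sketch.js
// A minimal, illustrative pipeline definition: one source, one transformation, one sink.
// The shape mirrors pipeline-config.js below; all values are placeholders.
module.exports = {
  name: 'minimal-pipeline',
  sources: [
    { id: 'input', type: 'file', config: { path: '/data/input/*.csv', format: 'csv' } }
  ],
  transformations: [
    { id: 'clean', type: 'filter', config: { condition: 'value IS NOT NULL' } }
  ],
  sinks: [
    { id: 'output', type: 'database', config: { url: 'jdbc:postgresql://db:5432/analytics', table: 'clean_data' } }
  ]
};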

Implementation

Let's walk through the implementation of the data processing pipeline:

Step 1: Define the Pipeline Configuration

First, we'll define the configuration for our pipeline, including data sources, transformations, and sinks.

// pipeline-config.js
module.exports = {
  name: 'data-processing-pipeline',
  description: 'A comprehensive data processing pipeline for batch and streaming data',
  
  sources: [
    {
      id: 'batch-source',
      type: 'file',
      config: {
        path: '/data/input/*.csv',
        format: 'csv',
        options: {
          header: true,
          delimiter: ','
        }
      }
    },
    {
      id: 'stream-source',
      type: 'kafka',
      config: {
        bootstrapServers: 'kafka:9092',
        topics: ['data-stream'],
        groupId: 'pipeline-consumer'
      }
    }
  ],
  
  transformations: [
    {
      id: 'clean-data',
      type: 'filter',
      config: {
        condition: 'value IS NOT NULL AND value > 0'
      }
    },
    {
      id: 'transform-data',
      type: 'map',
      config: {
        mappings: {
          'timestamp': 'TO_TIMESTAMP(raw_timestamp)',
          'category': 'UPPER(category)',
          'value': 'value * 1.5',
          'status': {
            type: 'case',
            expression: 'value',
            cases: [
              { when: '< 100', then: 'low' },
              { when: '< 500', then: 'medium' },
              { default: 'high' }
            ]
          }
        }
      }
    },
    {
      id: 'enrich-data',
      type: 'join',
      config: {
        source: {
          type: 'database',
          config: {
            url: 'jdbc:postgresql://db:5432/metadata',
            table: 'categories'
          }
        },
        keys: ['category'],
        fields: ['description', 'priority']
      }
    },
    {
      id: 'aggregate-data',
      type: 'aggregate',
      config: {
        groupBy: ['category', 'status'],
        operations: [
          { field: 'value', op: 'sum', as: 'total_value' },
          { field: 'value', op: 'avg', as: 'avg_value' },
          { field: 'value', op: 'count', as: 'count' }
        ],
        window: {
          type: 'tumbling',
          size: '1h'
        }
      }
    }
  ],
  
  sinks: [
    {
      id: 'database-sink',
      type: 'database',
      config: {
        url: 'jdbc:postgresql://db:5432/analytics',
        table: 'processed_data',
        mode: 'upsert',
        keys: ['category', 'timestamp']
      }
    },
    {
      id: 'api-sink',
      type: 'http',
      config: {
        url: 'https://api.example.com/data',
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': 'Bearer YOUR_API_TOKEN'
        },
        batchSize: 100
      }
    }
  ],
  
  execution: {
    parallelism: 4,
    checkpointInterval: '1m',
    restartStrategy: {
      attempts: 3,
      delay: '10s'
    }
  },
  
  monitoring: {
    metrics: true,
    logs: true,
    tracing: true
  }
};
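
One note on the api-sink: in practice you would typically not hard-code the bearer token in a configuration file. A minimal sketch of reading it from an environment variable instead (PIPELINE_API_TOKEN is a hypothetical variable name, not part of Inversion's API):

// In pipeline-config.js, the Authorization header can pull the token from the
// environment rather than embedding it in source control:
'Authorization': `Bearer ${process.env.PIPELINE_API_TOKEN}`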

Step 2: Create the Pipeline

Next, we'll create the pipeline using the Inversion API.

// create-pipeline.js
const inversion = require('@inversion/core');
const config = require('./pipeline-config');

async function createPipeline() {
  try {
    // Initialize the pipeline
    const pipeline = new inversion.Pipeline(config);
    
    // Validate the pipeline configuration
    const validationResult = await pipeline.validate();
    if (!validationResult.valid) {
      console.error('Pipeline configuration is invalid:', validationResult.errors);
      process.exit(1);
    }
    
    // Create the pipeline
    await pipeline.create();
    console.log('Pipeline created successfully!');
    
    return pipeline;
  } catch (error) {
    console.error('Error creating pipeline:', error);
    process.exit(1);
  }
}

module.exports = createPipeline;
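
The batch and stream processors below both reuse this module. If you just want to create the pipeline without running it, a minimal usage sketch (the file name is hypothetical) looks like this:

// setup.js
const createPipeline = require('./create-pipeline');

createPipeline().then(() => {
  console.log('Pipeline created and ready to run in batch or stream mode.');
});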

Batch Processing

Let's implement the batch processing component of our pipeline:

// batch-processor.js
const createPipeline = require('./create-pipeline');

async function runBatchProcessing() {
  try {
    // Create the pipeline
    const pipeline = await createPipeline();
    
    // Configure batch processing
    pipeline.setBatchMode({
      source: 'batch-source',
      parallelism: 4,
      checkpointing: true
    });
    
    // Add event listeners
    pipeline.on('batch:start', (batchId) => {
      console.log(`Batch processing started: ${batchId}`);
    });
    
    pipeline.on('batch:progress', (batchId, progress) => {
      console.log(`Batch processing progress: ${batchId}, ${progress.percentage}% complete`);
      console.log(`Processed: ${progress.processed}, Failed: ${progress.failed}`);
    });
    
    pipeline.on('batch:complete', (batchId, stats) => {
      console.log(`Batch processing completed: ${batchId}`);
      console.log(`Total records: ${stats.total}, Processed: ${stats.processed}, Failed: ${stats.failed}`);
      console.log(`Duration: ${stats.duration}ms`);
    });
    
    pipeline.on('batch:error', (batchId, error) => {
      console.error(`Batch processing error: ${batchId}`, error);
    });
    
    // Start batch processing
    const batchId = await pipeline.startBatch();
    console.log(`Batch processing initiated with ID: ${batchId}`);
    
    // Wait for batch completion
    const result = await pipeline.waitForBatchCompletion(batchId);
    console.log('Batch processing result:', result);
    
    return result;
  } catch (error) {
    console.error('Error in batch processing:', error);
    process.exit(1);
  }
}

// Run the batch processing if this script is executed directly
if (require.main === module) {
  runBatchProcessing();
}

module.exports = runBatchProcessing;

Stream Processing

Now, let's implement the stream processing component:

// stream-processor.js
const createPipeline = require('./create-pipeline');

async function runStreamProcessing() {
  try {
    // Create the pipeline
    const pipeline = await createPipeline();
    
    // Configure stream processing
    pipeline.setStreamMode({
      source: 'stream-source',
      checkpointInterval: '1m',
      idleTimeout: '5m'
    });
    
    // Add event listeners
    pipeline.on('stream:start', () => {
      console.log('Stream processing started');
    });
    
    pipeline.on('stream:checkpoint', (checkpoint) => {
      console.log(`Stream checkpoint created: ${checkpoint.id}`);
      console.log(`Records processed since last checkpoint: ${checkpoint.recordsProcessed}`);
    });
    
    pipeline.on('stream:stats', (stats) => {
      console.log('Stream processing stats:');
      console.log(`Records processed: ${stats.recordsProcessed}`);
      console.log(`Processing rate: ${stats.recordsPerSecond} records/sec`);
      console.log(`Latency: ${stats.averageLatency}ms`);
    });
    
    pipeline.on('stream:error', (error) => {
      console.error('Stream processing error:', error);
    });
    
    // Start stream processing
    await pipeline.startStream();
    console.log('Stream processing is running; press Ctrl+C to stop.');
    
    // Keep the process running
    process.on('SIGINT', async () => {
      console.log('Stopping stream processing...');
      await pipeline.stopStream();
      process.exit(0);
    });
    
    // Log stats periodically
    setInterval(() => {
      const stats = pipeline.getStreamStats();
      console.log('Current stream stats:', stats);
    }, 60000);
    
    return pipeline;
  } catch (error) {
    console.error('Error in stream processing:', error);
    process.exit(1);
  }
}

// Run the stream processing if this script is executed directly
if (require.main === module) {
  runStreamProcessing();
}

module.exports = runStreamProcessing;
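
Running Batch and Stream Together

If you want to exercise both modes locally before deploying, you can combine the two modules above. This is a minimal sketch (the file name is hypothetical): it runs one batch pass over the existing input files, then leaves the stream processor running until interrupted.

// run-all.js
const runBatchProcessing = require('./batch-processor');
const runStreamProcessing = require('./stream-processor');

async function main() {
  // Process existing files first, then switch to continuous processing.
  await runBatchProcessing();
  await runStreamProcessing();
}

main();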

Deployment

To deploy the data processing pipeline, you can use Inversion's deployment capabilities:

inversion deploy --type pipeline --name data-processing-pipeline

This will deploy your pipeline to Inversion's cloud platform, making it available for both batch and stream processing.

Next Steps

Ready to take your pipeline to the next level? Check out these related examples: