# Data Processing Pipeline Example
This example shows how to build a scalable data processing pipeline that can handle both batch and streaming data with advanced transformations.
## Overview

This example demonstrates how to use Inversion to build a comprehensive data processing pipeline that handles both batch and streaming data. The pipeline includes data ingestion, transformation, enrichment, and delivery components, providing a complete solution for processing data at scale.
## Features
- Unified batch and stream processing
- Scalable architecture that can handle terabytes of data
- Advanced data transformations and enrichment
- Support for multiple data sources and sinks
- Error handling and recovery mechanisms
- Monitoring and observability
## Architecture

The pipeline separates concerns into three components, so sources, processing logic, and destinations can be configured independently:
### Data Sources

Connects to various data sources, including databases, files, and streaming platforms.

### Processing Engine

Executes transformations, aggregations, and enrichments on the data.

### Data Sinks

Delivers processed data to various destinations such as databases, APIs, or dashboards.
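To make the mapping from architecture to configuration concrete, here is a minimal sketch of the shape every pipeline definition follows. The placeholder values are illustrative only; the full version is built out in Step 1 below.

```javascript
// Skeleton of a pipeline definition – placeholders only; see Step 1 for the complete configuration.
module.exports = {
  name: 'my-pipeline',
  sources: [ /* where data comes from: files, databases, Kafka topics, ... */ ],
  transformations: [ /* filter, map, join, and aggregate steps applied in order */ ],
  sinks: [ /* where results go: databases, HTTP APIs, dashboards, ... */ ]
};
```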
## Implementation
Let's walk through the implementation of the data processing pipeline:
### Step 1: Define the Pipeline Configuration
First, we'll define the configuration for our pipeline, including data sources, transformations, and sinks.
```javascript
// pipeline-config.js
module.exports = {
  name: 'data-processing-pipeline',
  description: 'A comprehensive data processing pipeline for batch and streaming data',
  sources: [
    {
      id: 'batch-source',
      type: 'file',
      config: {
        path: '/data/input/*.csv',
        format: 'csv',
        options: {
          header: true,
          delimiter: ','
        }
      }
    },
    {
      id: 'stream-source',
      type: 'kafka',
      config: {
        bootstrapServers: 'kafka:9092',
        topics: ['data-stream'],
        groupId: 'pipeline-consumer'
      }
    }
  ],
  transformations: [
    {
      id: 'clean-data',
      type: 'filter',
      config: {
        condition: 'value IS NOT NULL AND value > 0'
      }
    },
    {
      id: 'transform-data',
      type: 'map',
      config: {
        mappings: {
          'timestamp': 'TO_TIMESTAMP(raw_timestamp)',
          'category': 'UPPER(category)',
          'value': 'value * 1.5',
          'status': {
            type: 'case',
            expression: 'value',
            cases: [
              { when: '< 100', then: 'low' },
              { when: '< 500', then: 'medium' },
              { default: 'high' }
            ]
          }
        }
      }
    },
    {
      id: 'enrich-data',
      type: 'join',
      config: {
        source: {
          type: 'database',
          config: {
            url: 'jdbc:postgresql://db:5432/metadata',
            table: 'categories'
          }
        },
        keys: ['category'],
        fields: ['description', 'priority']
      }
    },
    {
      id: 'aggregate-data',
      type: 'aggregate',
      config: {
        groupBy: ['category', 'status'],
        operations: [
          { field: 'value', op: 'sum', as: 'total_value' },
          { field: 'value', op: 'avg', as: 'avg_value' },
          { field: 'value', op: 'count', as: 'count' }
        ],
        window: {
          type: 'tumbling',
          size: '1h'
        }
      }
    }
  ],
  sinks: [
    {
      id: 'database-sink',
      type: 'database',
      config: {
        url: 'jdbc:postgresql://db:5432/analytics',
        table: 'processed_data',
        mode: 'upsert',
        keys: ['category', 'timestamp']
      }
    },
    {
      id: 'api-sink',
      type: 'http',
      config: {
        url: 'https://api.example.com/data',
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': 'Bearer YOUR_API_TOKEN'
        },
        batchSize: 100
      }
    }
  ],
  execution: {
    parallelism: 4,
    checkpointInterval: '1m',
    restartStrategy: {
      attempts: 3,
      delay: '10s'
    }
  },
  monitoring: {
    metrics: true,
    logs: true,
    tracing: true
  }
};
```
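To see what these transformations do, it helps to trace a single record through the configured steps. The values below are made up for illustration; they simply follow the rules defined above (drop null or non-positive values, convert the timestamp, uppercase the category, scale `value` by 1.5, derive `status`, then join in category metadata).

```javascript
// Illustrative only – not pipeline code. How one record evolves through the transformations.

// Raw CSV row read from batch-source:
const raw = { raw_timestamp: '2024-05-01 10:15:00', category: 'sensors', value: 280 };

// After 'clean-data' (kept: value is non-null and > 0) and 'transform-data':
const transformed = {
  timestamp: '2024-05-01T10:15:00Z', // TO_TIMESTAMP(raw_timestamp)
  category: 'SENSORS',               // UPPER(category)
  value: 420,                        // 280 * 1.5
  status: 'medium'                   // falls in the "< 500" bucket
};

// After 'enrich-data' (joined with the categories table on 'category';
// description and priority values are invented for this example):
const enriched = { ...transformed, description: 'Sensor readings', priority: 2 };

// 'aggregate-data' then groups such records by (category, status)
// over 1-hour tumbling windows, producing total_value, avg_value, and count.
```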
### Step 2: Create the Pipeline
Next, we'll create the pipeline using the Inversion API.
```javascript
// create-pipeline.js
const inversion = require('@inversion/core');
const config = require('./pipeline-config');

async function createPipeline() {
  try {
    // Initialize the pipeline
    const pipeline = new inversion.Pipeline(config);

    // Validate the pipeline configuration
    const validationResult = await pipeline.validate();
    if (!validationResult.valid) {
      console.error('Pipeline configuration is invalid:', validationResult.errors);
      process.exit(1);
    }

    // Create the pipeline
    await pipeline.create();
    console.log('Pipeline created successfully!');

    return pipeline;
  } catch (error) {
    console.error('Error creating pipeline:', error);
    process.exit(1);
  }
}

module.exports = createPipeline;
```
## Batch Processing
Let's implement the batch processing component of our pipeline:
```javascript
// batch-processor.js
const createPipeline = require('./create-pipeline');

async function runBatchProcessing() {
  try {
    // Create the pipeline
    const pipeline = await createPipeline();

    // Configure batch processing
    pipeline.setBatchMode({
      source: 'batch-source',
      parallelism: 4,
      checkpointing: true
    });

    // Add event listeners
    pipeline.on('batch:start', (batchId) => {
      console.log(`Batch processing started: ${batchId}`);
    });

    pipeline.on('batch:progress', (batchId, progress) => {
      console.log(`Batch processing progress: ${batchId}, ${progress.percentage}% complete`);
      console.log(`Processed: ${progress.processed}, Failed: ${progress.failed}`);
    });

    pipeline.on('batch:complete', (batchId, stats) => {
      console.log(`Batch processing completed: ${batchId}`);
      console.log(`Total records: ${stats.total}, Processed: ${stats.processed}, Failed: ${stats.failed}`);
      console.log(`Duration: ${stats.duration}ms`);
    });

    pipeline.on('batch:error', (batchId, error) => {
      console.error(`Batch processing error: ${batchId}`, error);
    });

    // Start batch processing
    const batchId = await pipeline.startBatch();
    console.log(`Batch processing initiated with ID: ${batchId}`);

    // Wait for batch completion
    const result = await pipeline.waitForBatchCompletion(batchId);
    console.log('Batch processing result:', result);

    return result;
  } catch (error) {
    console.error('Error in batch processing:', error);
    process.exit(1);
  }
}

// Run the batch processing if this script is executed directly
if (require.main === module) {
  runBatchProcessing();
}

module.exports = runBatchProcessing;
```
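Batch runs are often triggered on a schedule rather than by hand. The sketch below shows one way to do that using the third-party node-cron package (our own choice, not something Inversion requires); it simply reuses the `runBatchProcessing` function exported above.

```javascript
// schedule-batch.js – a minimal sketch; assumes `npm install node-cron` has been run
// and that batch-processor.js sits in the same directory.
const cron = require('node-cron');
const runBatchProcessing = require('./batch-processor');

// Kick off a batch run every day at 02:00.
cron.schedule('0 2 * * *', async () => {
  console.log('Starting scheduled batch run...');
  try {
    await runBatchProcessing();
  } catch (error) {
    console.error('Scheduled batch run failed:', error);
  }
});

console.log('Batch scheduler started');
```

Note that batch-processor.js calls `process.exit(1)` on failure; for a long-running scheduler like this you may prefer to log and continue instead of exiting.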
## Stream Processing
Now, let's implement the stream processing component:
```javascript
// stream-processor.js
const createPipeline = require('./create-pipeline');

async function runStreamProcessing() {
  try {
    // Create the pipeline
    const pipeline = await createPipeline();

    // Configure stream processing
    pipeline.setStreamMode({
      source: 'stream-source',
      checkpointInterval: '1m',
      idleTimeout: '5m'
    });

    // Add event listeners
    pipeline.on('stream:start', () => {
      console.log('Stream processing started');
    });

    pipeline.on('stream:checkpoint', (checkpoint) => {
      console.log(`Stream checkpoint created: ${checkpoint.id}`);
      console.log(`Records processed since last checkpoint: ${checkpoint.recordsProcessed}`);
    });

    pipeline.on('stream:stats', (stats) => {
      console.log('Stream processing stats:');
      console.log(`Records processed: ${stats.recordsProcessed}`);
      console.log(`Processing rate: ${stats.recordsPerSecond} records/sec`);
      console.log(`Latency: ${stats.averageLatency}ms`);
    });

    pipeline.on('stream:error', (error) => {
      console.error('Stream processing error:', error);
    });

    // Start stream processing
    await pipeline.startStream();
    console.log('Stream processing started');

    // Keep the process running
    process.on('SIGINT', async () => {
      console.log('Stopping stream processing...');
      await pipeline.stopStream();
      process.exit(0);
    });

    // Log stats periodically
    setInterval(() => {
      const stats = pipeline.getStreamStats();
      console.log('Current stream stats:', stats);
    }, 60000);

    return pipeline;
  } catch (error) {
    console.error('Error in stream processing:', error);
    process.exit(1);
  }
}

// Run the stream processing if this script is executed directly
if (require.main === module) {
  runStreamProcessing();
}

module.exports = runStreamProcessing;
```
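Because the stream runs indefinitely, it is handy to expose its statistics to an external monitor. The sketch below is an optional add-on of our own (the file name and port are arbitrary); it uses Node's built-in http module together with the `getStreamStats` call shown above, and assumes the stats object is JSON-serializable.

```javascript
// health-server.js – optional sketch; assumes stream-processor.js is in the same directory.
const http = require('http');
const runStreamProcessing = require('./stream-processor');

async function main() {
  const pipeline = await runStreamProcessing();

  // Serve the latest stream statistics on /health for external monitoring.
  http.createServer((req, res) => {
    if (req.url === '/health') {
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify(pipeline.getStreamStats()));
    } else {
      res.writeHead(404);
      res.end();
    }
  }).listen(8080, () => console.log('Health endpoint listening on port 8080'));
}

main();
```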
## Deployment
To deploy the data processing pipeline, you can use Inversion's deployment capabilities:
```bash
inversion deploy --type pipeline --name data-processing-pipeline
```
This will deploy your pipeline to Inversion's cloud platform, making it available for both batch and stream processing.
## Resources
Here are some additional resources to help you build your data processing pipeline:
- Pipeline API Reference
- Data Transformation Tutorial
- Advanced Pipeline Example
- Inversion GitHub Repository
## Next Steps
Ready to take your pipeline to the next level? Check out these related examples: