This example demonstrates how to use D2TS with ElectricSQL to build a real-time data pipeline that processes changes incrementally. The example implements an issue tracking system with users, issues, and comments.
This example showcases:
- Setting up ElectricSQL with PostgreSQL
- Building an incremental data pipeline using D2TS
- Performing joins, aggregations, and transformations on real-time data
- Processing data changes efficiently with differential dataflow
The example consists of:
- A PostgreSQL database paired with ElectricSQL, which syncs changes out in real time
- Schema for users, issues, and comments
- A D2TS pipeline that:
  - Joins issues with their creators (users)
  - Counts comments for each issue
  - Consolidates the data into a unified view
The example implements a simple issue tracking system with:
- Users: People who create issues and comments
- Issues: Tasks or bugs with properties like priority and status
- Comments: Text comments on issues
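The unified view that the pipeline maintains can be pictured as the result of a plain batch computation over the three tables. The sketch below is a non-incremental reference in TypeScript, not code from the example. The row shapes (`UserRow`, `IssueRow`, `CommentRow`), every field other than `priority` and `status`, and the `buildIssueView` helper are assumptions made for illustration.

```typescript
// Hypothetical row shapes: only `priority` and `status` are named in this README.
interface UserRow { id: number; username: string }
interface IssueRow { id: number; title: string; priority: string; status: string; userId: number }
interface CommentRow { id: number; issueId: number; body: string }

// One row of the unified view: an issue, its creator's name, and its comment count.
interface IssueView extends IssueRow { creator: string | null; commentCount: number }

// Batch reference: recomputes the whole view from scratch on every call.
function buildIssueView(users: UserRow[], issues: IssueRow[], comments: CommentRow[]): IssueView[] {
  const userById = new Map<number, UserRow>();
  for (const user of users) userById.set(user.id, user);

  const countByIssue = new Map<number, number>();
  for (const comment of comments) {
    countByIssue.set(comment.issueId, (countByIssue.get(comment.issueId) ?? 0) + 1);
  }

  return issues.map((issue) => ({
    ...issue,
    creator: userById.get(issue.userId)?.username ?? null,
    commentCount: countByIssue.get(issue.id) ?? 0,
  }));
}

const sampleView = buildIssueView(
  [{ id: 1, username: "alice" }],
  [{ id: 10, title: "Crash on login", priority: "high", status: "open", userId: 1 }],
  [
    { id: 100, issueId: 10, body: "Reproduced" },
    { id: 101, issueId: 10, body: "Fixed in main" },
  ],
);
console.log(sampleView);
```

A batch function like this redoes all the work whenever any row changes. The incremental pipeline aims to produce the same view while touching only the rows affected by each change.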
Running the example requires:
- Node.js and pnpm
- Docker and Docker Compose (for running PostgreSQL and ElectricSQL)
To run the example, follow these steps in order:
- Start the backend services with `pnpm backend:up`. This starts the PostgreSQL and ElectricSQL services using Docker Compose.
- Set up the database with `pnpm db:migrate`. This applies the database migrations that create the necessary tables.
- Load sample data with `pnpm db:load-data`. This loads sample users, issues, and comments into the database.
- Run the example with `pnpm start`. This starts the D2TS pipeline, which consumes data from ElectricSQL, processes it incrementally, and prints the processed data to the console.
- Reset everything (optional) with `pnpm reset`. This tears down the services, recreates them, applies the migrations, and loads fresh data.
The pipeline in `src/index.ts` demonstrates:
- Creating a D2TS graph - The foundation for the data processing pipeline
- Setting up inputs - Connecting ElectricSQL shape streams to D2TS inputs
- Building transformations:
  - Calculating comment counts per issue
  - Joining issues with their creators
  - Transforming and consolidating the data
- Consuming the results - Outputting processed data as it changes
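To make the comment-count step concrete, here is a minimal sketch in plain TypeScript of what maintaining a count incrementally involves. It does not use the D2TS API. The `Change` tuple, the `IncrementalCommentCount` class, and the row fields are hypothetical names chosen for illustration. Each batch of comment changes updates only the affected issues, and the output is again a set of changes: the old count row is retracted and the new one is added.

```typescript
// A change is a row plus a multiplicity: +1 for an insert, -1 for a delete.
type Change<T> = [T, number];

// Hypothetical shapes for the input rows and the per-issue output rows.
interface CommentRef { id: number; issueId: number }
interface IssueCount { issueId: number; commentCount: number }

class IncrementalCommentCount {
  // Current number of comments per issue id.
  private counts = new Map<number, number>();

  // Applies one batch of comment changes and returns only the output rows that changed.
  apply(changes: Change<CommentRef>[]): Change<IssueCount>[] {
    // Net effect of the batch on each issue.
    const delta = new Map<number, number>();
    for (const [row, diff] of changes) {
      delta.set(row.issueId, (delta.get(row.issueId) ?? 0) + diff);
    }

    const output: Change<IssueCount>[] = [];
    delta.forEach((d, issueId) => {
      if (d === 0) return; // inserts and deletes cancelled out
      const before = this.counts.get(issueId) ?? 0;
      const after = before + d;
      if (before !== 0) output.push([{ issueId, commentCount: before }, -1]); // retract old row
      if (after !== 0) output.push([{ issueId, commentCount: after }, 1]); // emit new row
      if (after === 0) this.counts.delete(issueId);
      else this.counts.set(issueId, after);
    });
    return output;
  }
}

const counter = new IncrementalCommentCount();
// Two comments are added to issue 10.
const first = counter.apply([
  [{ id: 1, issueId: 10 }, 1],
  [{ id: 2, issueId: 10 }, 1],
]);
// One of them is deleted again.
const second = counter.apply([[{ id: 2, issueId: 10 }, -1]]);
console.log(JSON.stringify(first), JSON.stringify(second));
```

The second call does not rescan the comments. It retracts the row with count 2 and emits a row with count 1, which is the kind of output a downstream join or consumer would then process.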
The ElectricSQL integration uses `MultiShapeStream` to consume multiple shapes (one per table) from the same Electric instance and the `electricStreamToD2Input` helper to connect Electric streams to D2TS inputs.
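The idea behind connecting a change stream to a pipeline input can be sketched without either library. The code below is an illustration under stated assumptions, not the implementation of `electricStreamToD2Input`: the message shapes are simplified and hypothetical, the `PipelineInput` interface is invented for this sketch, and turning an update into a retraction plus an insertion is one possible design choice.

```typescript
// Simplified, hypothetical message shapes. These are not the Electric client's real types.
type RowChange<T> = { operation: "insert" | "update" | "delete"; key: string; value: T; lsn: number };
type UpToDate = { control: "up-to-date"; lsn: number };
type StreamMessage<T> = RowChange<T> | UpToDate;

// Hypothetical input interface for this sketch.
// sendFrontier(v) promises that no further data with a version below v will arrive.
interface PipelineInput<T> {
  sendData(version: number, changes: [T, number][]): void;
  sendFrontier(version: number): void;
}

// Forwards stream messages to the input, using each message's LSN as its version.
// `current` remembers the latest row per key so updates and deletes can retract it.
function forwardMessages<T>(
  messages: StreamMessage<T>[],
  current: Map<string, T>,
  input: PipelineInput<T>,
): void {
  for (const msg of messages) {
    if ("control" in msg) {
      // Everything up to this LSN has been delivered.
      input.sendFrontier(msg.lsn + 1);
      continue;
    }
    const changes: [T, number][] = [];
    const previous = current.get(msg.key);
    if (msg.operation !== "insert" && previous !== undefined) {
      changes.push([previous, -1]); // retract the old version of the row
    }
    if (msg.operation === "delete") {
      current.delete(msg.key);
    } else {
      changes.push([msg.value, 1]); // add the new version of the row
      current.set(msg.key, msg.value);
    }
    input.sendData(msg.lsn, changes);
  }
}

// A recording input, used here only to show what gets sent.
const sent: string[] = [];
const recorder: PipelineInput<{ title: string }> = {
  sendData: (version, changes) => { sent.push(`data@${version} ${JSON.stringify(changes)}`); },
  sendFrontier: (version) => { sent.push(`frontier@${version}`); },
};

forwardMessages<{ title: string }>(
  [
    { operation: "insert", key: "1", value: { title: "A" }, lsn: 5 },
    { operation: "update", key: "1", value: { title: "B" }, lsn: 7 },
    { control: "up-to-date", lsn: 7 },
  ],
  new Map(),
  recorder,
);
console.log(sent);
```

With one shape per table, a forwarder of this kind would run once per table, each feeding its own input of the same graph.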
- Differential Dataflow: D2TS enables incremental computations, only reprocessing what's changed
- ElectricSQL ShapeStreams: Real-time data streams that emit changes to the database
- LSN-based Processing: Changes are processed based on PostgreSQL Log Sequence Numbers (LSNs) for consistency. The LSN is used as the "version" of the data passed to D2TS
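These concepts meet in the notion of a versioned multiset of changes: each row carries a multiplicity (+1 for added, -1 for removed) and a version. The sketch below shows consolidation in plain TypeScript, assuming the version is a numeric LSN. `VersionedChange` and `consolidate` are hypothetical names for this illustration and are not taken from the D2TS API.

```typescript
// A row change tagged with the version (here: a numeric LSN) at which it happened.
type VersionedChange<T> = { version: number; row: T; diff: number };

// Sums the multiplicities of identical rows within the same version and drops
// entries that cancel out. Rows are compared by their JSON text, so this sketch
// assumes plain objects with a consistent property order.
function consolidate<T>(changes: VersionedChange<T>[]): VersionedChange<T>[] {
  const merged = new Map<string, VersionedChange<T>>();
  for (const change of changes) {
    const key = `${change.version}:${JSON.stringify(change.row)}`;
    const existing = merged.get(key);
    if (existing) existing.diff += change.diff;
    else merged.set(key, { ...change });
  }
  const result: VersionedChange<T>[] = [];
  merged.forEach((change) => {
    if (change.diff !== 0) result.push(change);
  });
  // Order by version so consumers see changes in LSN order.
  return result.sort((a, b) => a.version - b.version);
}

const consolidated = consolidate([
  { version: 100, row: { issueId: 1, status: "open" }, diff: 1 },
  // At LSN 101 issue 1 is closed: the old row is retracted and the new one added.
  { version: 101, row: { issueId: 1, status: "open" }, diff: -1 },
  { version: 101, row: { issueId: 1, status: "closed" }, diff: 1 },
  // Issue 2 is created and deleted within the same version, so it cancels out.
  { version: 101, row: { issueId: 2, status: "open" }, diff: 1 },
  { version: 101, row: { issueId: 2, status: "open" }, diff: -1 },
]);
console.log(consolidated);
```

Because only non-zero differences survive, downstream steps see work proportional to what actually changed at each version rather than to the size of the tables.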