Building a Real-Time Crypto Data Pipeline and Alert System
A trading data aggregation pipeline processing CEX and DEX feeds, with sub-second alert delivery to Telegram and Slack.
0
Exchange feeds
+4 from MVP0ms
Alert latency (median)
P95: ~420ms~0K
Events/minute
Peak load0.0%
Uptime
Excluding exchange-sideThe Problem
The trader’s workflow had reached its breaking point. He was following dozens of markets across Binance, Coinbase, Kraken, and OKX, and keeping up meant constantly jumping between exchange dashboards. It was manageable when he only cared about a handful of pairs. At scale, it became unreliable. Price levels were missed overnight. Signals were discovered too late to act on. Alerts configured on one platform did nothing to help when the same asset moved somewhere else.
What he needed was clear. Every market he cared about had to be monitored in real time. Price and volume data from multiple exchanges had to be evaluated against his alert conditions continuously, and when something triggered, a Telegram notification had to arrive immediately. There was no need for charts or dashboards. The system only needed to watch, decide, and notify.
The first version was functional but flawed. Within the first week, I had connected to eight exchange WebSocket feeds and implemented alert evaluation backed by Redis. Alerts were detected correctly, but speed was the problem. End to end latency ranged from two to four seconds, and sometimes longer under load. The alert evaluation ran on a fixed interval, and each cycle processed conditions sequentially. When markets moved quickly, notifications arrived after the opportunity was already gone. The system worked in principle, but it was not fast enough to be useful.
The Approach
The polling model had to go. The problem wasn't the polling interval itself; it was the architecture. Polling means the alert engine wakes up on a timer and asks "did anything change?" A push model means the pipeline tells the alert engine "something changed, evaluate now." The difference in latency is the polling interval itself, plus the evaluation time that was previously serialized across all conditions on each tick.
Redis Streams with consumer groups gave me the push model I needed. Each exchange feed writes price events to a stream. The alert engine runs as a consumer group with multiple workers, each pulling from the stream as events arrive. No polling. No timer. An event lands in the stream and a worker picks it up within milliseconds.
The other problem was exchange normalization. Every exchange has a different WebSocket message format. Binance sends
{
"e":"trade",
"s":"BTCUSDT",
"p":"67234.50",
"q":"0.012"
}
Coinbase sends
{
"type":"ticker",
"product_id":"BTC-USD",
"price":"67234.50",
"last_size":"0.012"
}
Kraken sends an array. OKX sends a nested object with a data array inside. Before any alert logic could run, every event needed to pass through a normalization layer that produced a consistent internal format: { exchange, pair, price, volume, timestamp }.
Architecture
WebSocket Manager
The WebSocket manager maintains one persistent connection per exchange feed. Each connection runs in its own class with three responsibilities: connect, normalize incoming messages, and write normalized events to the Redis Stream.
Reconnect logic was the first thing I got wrong. The initial version reconnected immediately on close, which meant a flapping exchange connection could generate thousands of reconnect attempts per minute and flood the logs. Exponential backoff with jitter solved it: start at 1 second, double on each failure, cap at 60 seconds, add ±20% random jitter to prevent thundering herd when multiple connections drop simultaneously (which happens during exchange maintenance windows).
const delay = Math.min(BASE_DELAY * 2 ** attempt, MAX_DELAY);
const jitter = delay * 0.2 * (Math.random() * 2 - 1);
await sleep(delay + jitter);
Each exchange class implements a normalize(raw: unknown): NormalizedEvent | null method. If the message doesn't match the expected shape, it returns null and the manager skips the write. This matters for Binance, which sends subscription confirmation messages and heartbeat pings on the same connection as trade data.
Redis Streams Pipeline
Normalized events go into a single Redis Stream: market:events. The key is the stream name; the value is the normalized event fields. I set a MAXLEN ~100000 cap with the approximate trimming flag to keep memory bounded without the overhead of exact trimming on every write.
Events are stored in a TimescaleDB hypertable partitioned by day, with a 90-day retention policy. Continuous aggregates provide pre-computed OHLCV data for the client's charting needs without querying raw events.
The alert engine runs as a consumer group named alert-workers with four workers. Each worker calls XREADGROUP in a loop, processes the batch, and acknowledges each message after evaluation. If a worker crashes mid-batch, the unacknowledged messages stay in the pending entries list and get reclaimed by another worker after a timeout. No events are lost.
const messages = await redis.xreadgroup(
'GROUP',
'alert-workers',
workerId,
'COUNT',
50,
'BLOCK',
100,
'STREAMS',
'market:events',
'>',
);
The BLOCK 100 argument means each worker blocks for up to 100ms waiting for new messages before returning an empty result. In practice, at peak load the workers are never blocking: events arrive faster than the 100ms window. At low load, the blocking prevents busy-waiting.
Alert Engine
Each alert condition is stored in Redis as a hash: the pair, the condition type (price above/below, volume spike, percent change), the threshold, and the delivery targets. When a normalized event arrives, the engine fetches all active conditions for that pair and evaluates each one.
Alert deduplication was the second thing I got wrong. Without it, a price that sits at a threshold for 30 seconds generates 30 seconds worth of identical alerts. I added a sliding window in Redis: when an alert fires, write a key alert:fired:{alertId} with a TTL equal to the alert's cooldown period. Before firing, check if the key exists. If it does, skip. If it doesn't, fire and set the key.
const dedupKey = `alert:fired:${alert.id}`;
const alreadyFired = await redis.set(dedupKey, '1', 'NX', 'EX', alert.cooldownSeconds);
if (!alreadyFired) return; // already fired within cooldown window
The NX flag makes the set conditional: it only succeeds if the key doesn't exist. This is atomic, so two workers evaluating the same alert simultaneously can't both fire it.
Delivery Layer
Telegram and Slack delivery run as separate worker processes that consume from a second stream: alert:delivery. The alert engine writes to this stream after deduplication passes. The delivery workers pull from it and call the respective APIs.
Alert delivery jobs are queued via BullMQ to handle retry logic and rate limiting for Telegram and Slack APIs.
Telegram's Bot API is straightforward. Slack's incoming webhooks are equally simple. The complexity is retry handling: both APIs have rate limits, and Slack's webhook rate limit is per-channel, not per-workspace. A burst of alerts to the same channel can hit the limit and start returning 429s. The delivery worker backs off on 429 responses using the Retry-After header value, then requeues the message.
Key Technical Decisions
Redis Streams vs Kafka
Kafka was the obvious choice for a high-throughput event pipeline. Consumer groups, durable storage, replay capability. I chose Redis Streams instead, and the reason was operational: the client was running this on a single VPS with no existing Kafka infrastructure. Standing up a Kafka cluster for a single-node deployment would have meant running ZooKeeper or KRaft, managing broker configuration, and dealing with Kafka's operational complexity for a workload that Redis Streams handles without any of that overhead.
Redis Streams gives you consumer groups, at-least-once delivery, and a pending entries list for crash recovery. That's 90% of what Kafka provides for this use case. The 10% I gave up was multi-partition parallelism and long-term log retention. Neither mattered here: the pipeline processes events fast enough that a single stream partition isn't a bottleneck, and events older than a few minutes have no value for real-time alerting.
If this pipeline needed to scale to hundreds of exchanges or retain events for backtesting, Kafka would be the right call. At 12 exchanges and ~12K events per minute, Redis Streams is the right call.
Exchange-Specific Normalization
Every exchange has its own WebSocket protocol quirks. Binance requires a subscription message after connecting and sends a result: null confirmation that looks like an error if you're not expecting it. Coinbase's WebSocket disconnects clients that don't send a heartbeat every 30 seconds. Kraken sends trade data as an array where the first element is the channel ID, not a named field. OKX requires a login message for private channels and sends a pong response to keep-alive pings that needs to be filtered before normalization.
I wrote a separate adapter class for each exchange rather than trying to handle all the quirks in a single normalization function. Each adapter knows its exchange's protocol, handles its specific keep-alive requirements, and exposes a single normalize method. Adding a new exchange means writing a new adapter, not modifying existing ones.
ccxt handles REST API fallback for CEX historical data (backfilling gaps when WebSocket reconnects). DEX feeds use Alchemy's WebSocket subscriptions directly. ccxt doesn't support DEX protocols.
The DEX feeds (Uniswap, dYdX) were a different problem entirely. There's no WebSocket API: you subscribe to on-chain events via a node provider (Alchemy, in this case) and parse the swap event logs. The normalization layer treats these the same as CEX events once they're parsed, but the ingestion path is completely different.
WebSocket Connection Pooling
At 12 exchanges, some with multiple pairs per connection and some with one connection per pair, the total WebSocket connection count reached ~30. Node.js handles this fine, but I needed visibility into connection health. Each connection reports its state to a central registry: connected, reconnecting, or failed. A health check endpoint exposes this registry so the client can see at a glance which feeds are live.
The registry also tracks message rates per connection. If a connection is open but the message rate drops to zero for more than 30 seconds, it's treated as a silent failure and reconnected. Some exchanges stop sending data without closing the connection, and without this check those feeds would silently go stale.
Results
The latency improvement was the metric that mattered. Before the Redis Streams rewrite, end-to-end alert latency ranged from 847ms at best to 4,891ms at P99. After, detection through delivery runs in 38ms to 198ms across all phases.
Build-Phase Throughput (events/min)
Pipeline throughput grew from 5,247 events per minute at launch to 12,200 by the end of the 8-week build. The plateau at weeks 4 and 5 (7,100 to 8,600 events/min) was a period of feed consolidation: I removed several low-signal pairs and added higher-volume ones, which temporarily reduced raw event count while improving signal quality. The jump at week 6 (to 10,400) came from adding four new exchange feeds, including two DEX token pairs via Alchemy.
Alert Latency by Phase (ms)
The chart shows latency before and after the Redis Streams rewrite across each phase of the alert pipeline. Detection dropped from 847ms to 52ms. End-to-end P99 dropped from 4,891ms to 198ms. The delivery phase (71ms after) is now the dominant contributor to total latency, which is expected: it's a network call to an external API.
Uptime over the 8-week build was 98.6%, measured as time the internal pipeline was operational; exchange-side maintenance windows excluded. The three outages were all exchange-side: Binance had two maintenance windows and Coinbase had one unplanned outage. The pipeline handled all three correctly, reconnecting automatically when the feeds came back online.
Failure and Recovery
During week 5, while running the pipeline against live Binance feeds in staging, Binance changed their WebSocket message format without warning. The s field for symbol moved to a nested object in a subset of streams. The adapter silently dropped ~40% of Binance events for 6 hours before the message rate monitor caught it. Fixing it was straightforward (update the normalization path), but the incident exposed a gap: we had health checks for connection state but not for schema validity. I added a schema assertion in the normalization layer that rejects and logs unexpected shapes rather than silently returning null.
What Worked
Redis Streams consumer groups. The move from polling to push-based event consumption was the single biggest improvement. The latency drop was immediate and the architecture became easier to reason about: events flow in one direction, workers process them as they arrive, nothing is polling anything.
Per-exchange adapter classes. Isolating each exchange's protocol quirks into its own class meant that adding exchange 17 didn't require touching the code for exchanges 1 through 16. When Binance changed their WebSocket message format in month 3, the fix was contained to the Binance adapter.
Atomic deduplication with Redis SET NX. The sliding window deduplication pattern was simple to implement and impossible to race. Two workers can't both fire the same alert because the SET NX is atomic. The cooldown TTL handles the time dimension without any cron jobs or cleanup logic.
Silent failure detection. Checking message rates per connection caught three cases where an exchange feed had gone stale without closing the WebSocket. Without that check, the client would have been missing data from those feeds without knowing it.
Collaborator. Built for a solo trader who defined the alert conditions and exchange priority list. I handled all engineering; he handled the trading logic and decided which pairs to monitor.
What I'd Reconsider
The alert condition storage model is a Redis hash per condition. That works fine at the current scale (a few hundred conditions), but querying conditions by pair requires either a secondary index or fetching all conditions and filtering in the application layer. I chose the latter, which is fast enough now but won't scale if the condition count grows significantly. A sorted set keyed by pair would make the lookup O(log n) instead of O(n).
The delivery layer writes to a second Redis Stream (alert:delivery) before calling the Telegram or Slack APIs. This adds a hop but decouples alert evaluation from delivery latency. In practice, the delivery stream is almost always empty: workers consume faster than the alert engine produces. The decoupling is correct architecturally, but the second stream adds operational complexity (another consumer group to monitor, another pending entries list to watch) for a benefit that hasn't been needed yet.
I'd also reconsider the single-node Redis deployment. Redis Sentinel would give automatic failover if the primary goes down. Right now, a Redis crash takes down the entire pipeline. For a trading alert system, that's a meaningful risk. The client accepted it as a cost tradeoff, but it's the first thing I'd change if the system needed higher availability guarantees.
The health check endpoint is pull-based. Someone has to look at it. A push-based alert (PagerDuty or even a simple cron that hits the endpoint and sends a Telegram message if anything is red) would have caught the Binance schema incident in minutes, not hours.
Built with: Node.js, TypeScript, Redis, WebSockets, Telegram Bot API, Slack API