sparklearning

The Journey of a Shuffle Record

This is the story of how one record travels from the task that produced it to the task that consumes it—across the boundary between stages that we call shuffle. Understanding this journey explains why your job has multiple stages, where shuffle time goes, and what happens when an executor disappears.


Why shuffle exists

In Spark, a job is split into stages. Within a stage, each partition is processed independently: no data passes between tasks. Between stages, data must move: every record produced in one stage and needed in the next has to find its way to the right place. That movement is shuffle.

So shuffle is not an accident—it’s the bridge. Whenever you do a groupBy, join, or repartition, Spark draws a line: everything before the line is one stage (map side), everything after is another (reduce side). The map tasks write; the reduce tasks read. The journey of a shuffle record is the story of that write and that read.

Think of a postal sorting facility. Local mail carriers (map tasks) collect all letters from their neighbourhoods and drop them at the sorting centre. Workers sort every letter by destination zip code (partition id). Delivery drivers (reduce tasks) then pick up all the letters for their delivery zone from every sorting centre in the region—regardless of where the letters originally came from. Shuffle is that sorting-and-redistribution step in the middle.


The map side: “Where should this record go?”

Every record has a key (or is assigned a partition). The partitioner decides which reduce partition will need it. So the first thing that happens to a shuffle record on the map side is: which partition number is mine?

The map task doesn’t send records one-by-one to the reducers. Instead, it writes them into local storage in a way that the reduce tasks can later read by partition. Today, that almost always means sort-based shuffle.


Sort-based shuffle: one output per map task

In sort-based shuffle, each map task produces one logical output: a single file (or set of files) that contains all the records emitted by that task, organized so that reduce partition 0’s records are in one stretch, partition 1’s in another, and so on. That way, a reduce task only has to read the stretches that belong to its partition(s).

To build that output, the map task doesn’t write directly to the final file. It buffers records in memory. As records arrive, they are sorted (or grouped) by partition id—and sometimes by key within partition—so that when the buffer is flushed, it’s already in the right order. If the buffer gets too big, the excess is spilled to disk: sorted chunks that will later be merged. So the record you’re following might live in memory for a while, then end up in a spill file, and finally in the merged map output. Serialization happens early (often as soon as the record is produced) so that what’s buffered and spilled is bytes, not Java objects—that keeps memory and GC under control.

When the map task finishes, it flushes any remaining buffer, merges all spills (and in-memory segments) into one contiguous layout per partition, and writes that out. It also writes a small index that says “partition 0 starts here and is this long, partition 1 starts there and is that long, …” so that reduce tasks know exactly where to read.

The index file is like a table of contents in a book. The reduce task doesn’t read the whole book—it looks up “Chapter 7 starts on page 214” and skips straight there. Without the index, every reducer would have to scan the entire map output file to find its records.

The task then reports to the driver: I’m done; my output is at this executor, in this file; here are the sizes of each partition’s slice. That report is what the driver stores in the MapOutputTracker: the driver is the single place that knows, for every map task, where its output lives and how big each partition is.

So by the end of the map side, your record sits in a file on one executor’s disk (or in that executor’s block manager), and the driver knows it’s there.


The reduce side: “Where did my records go?”

A reduce task is responsible for one (or a few) reduce partitions. To get all records for its partition(s), it must read the corresponding slice from every map task’s output. So the reduce task first asks the driver (via MapOutputTracker): For this shuffle, give me the list of (executor, block id, size) for every map output that contains my partition. The driver answers with a list of blocks—some on the same executor as the reduce task (local), some on others (remote).

The reduce task then fetches those blocks. Local blocks are read from the local block manager. Remote blocks are requested over the network from the executors (or from the External Shuffle Service, if it’s running) that hold them. Fetches are throttled so that the reducer doesn’t pull too much data into memory at once: there’s a limit on how many bytes (or blocks) can be in flight. As each block arrives, it’s decompressed and deserialized, and the records are passed to the rest of the reduce task (e.g. into an aggregator or a sorter). So your record, which was serialized and perhaps compressed on the map side, is now read from a remote (or local) block, decompressed, deserialized, and handed to the operator that runs the reduce logic.

If the reducer supports it, contiguous blocks from the same executor can be fetched in one request (batch fetch), which reduces overhead. Either way, the reducer sees a single logical stream of records for its partition, assembled from many map outputs.


Streaming, not waiting: the reducer starts as soon as the first block arrives

The reducer does not wait for all shuffle blocks to be present before it starts applying reduce logic. It works in a streaming, pipelined way.

The shuffle read is implemented as an iterator: when the reduce logic (e.g. an aggregator or the next operator) asks for the next record, that iterator may be reading from a block that’s already been fetched. If the current block is exhausted, the iterator moves to the next block—which might already be in memory (prefetched) or might trigger a wait for that block to be fetched. So the reducer starts consuming records as soon as the first block is available and keeps consuming while more blocks are still in flight. Fetches are throttled (e.g. by a limit on how many bytes can be in flight at once), but several blocks can be fetched in parallel. So while the reducer is processing records from block one, the fetcher can already be bringing in blocks two, three, and so on. Fetch and process overlap.

It’s like reading a book while it’s still being printed. You don’t wait for the printer to finish all 500 pages before you start reading—you start reading Chapter 1 as soon as those pages are ready, while the printer is still running off Chapters 4 and 5. The reading and printing pipeline overlap.

If the reduce side needs to sort by key, all records are fed into an external sorter—but they are still fed in as they arrive. The sorter receives a stream of records and may spill to disk as it goes. We don’t wait for every block to be fetched before starting to insert into the sorter; we stream into the sorter while more blocks are still being fetched.


When the executor that wrote the data is gone

Map output lives on the executor that ran the map task. If that executor is still running when the reduce task runs, the block is fetched from that executor. But if the executor has died (crashed, preempted, or been removed), its in-memory blocks are gone. Its shuffle files might still be on disk—if Spark (or the cluster) wrote them to local disk that survives the process, or if an External Shuffle Service is running.

The External Shuffle Service is a separate process (often on the same node) that can serve shuffle files written by executors on that node.

Think of the External Shuffle Service as a warehouse reception desk that stays open after the workers go home. The warehouse workers (executors) unloaded the goods (shuffle files) and went home for the night. The next shift’s drivers (reduce tasks) can still collect the parcels from the reception desk—the goods are still there even though the workers who unloaded them are gone.

If the node itself is gone, or the service isn’t running, the fetch fails. The reduce task then reports a fetch failure. Spark responds by re-running the map stage so that the lost output is regenerated; then the reduce stage runs again.


Why sort-based shuffle (and not hash shuffle)

Older Spark used a “hash” shuffle: each map task wrote one file per reduce partition. With many reduce partitions, that meant many open files and a lot of random I/O.

Imagine writing 200 separate letters, each in its own envelope, for 200 different people—all at once. You’d need 200 pens open simultaneously. Sort-based shuffle instead writes one long letter, sorted by recipient, and attaches a directory at the front. Each recipient looks up their section and reads only that. One file, one pen, one pass.

Sort-based shuffle replaced hash shuffle: one (logical) output per map task, with records sorted by partition id so that each partition’s slice is a contiguous segment. Reducers then read contiguous segments instead of many tiny files. That reduces open-file pressure, improves I/O patterns, and scales better when the number of partitions is large.


Bringing it together

A shuffle record is produced in a map task, assigned a partition, buffered (and possibly spilled) in sorted form, then written into that map task’s output file with an index. The driver records where that output lives. On the reduce side, the reducer asks the driver for every block that contains its partition, then fetches those blocks—local or remote, or from the External Shuffle Service if the original executor is gone. It does not wait for all blocks to arrive: it consumes in a streaming way, processing records as blocks become available (with prefetch so fetch and process overlap). The record is decompressed, deserialized, and fed into the reduce logic. If a fetch fails and the data can’t be recovered, the map stage is re-run. So the journey of a shuffle record is: produce → buffer and sort → spill if needed → merge and write → register with the driver → fetch on the reduce side → deserialize and consume. That journey is why shuffle is expensive—and why it’s the boundary between stages.