---
title: "A4 · Window function benchmarks"
subtitle: "lag() and self-join strategies on billion-row data"
---
This appendix backs up the claims in [Window functions](04-window-functions.qmd). The question: on a 32 GB machine (24 GB allocated to DuckDB), what is the largest table on which a `lag()`-style window function actually finishes?
## The task
Detect employer switches in STP: a person whose `abn_hash_trunc` (employer identifier) changes from one week to the next.
- 697 million rows for a single financial year (FY2122).
- 3.5 billion rows across all five years.
## Setup
Unlike [A2](A2-data-manipulation-benchmarks.qmd) and [A3](A3-join-benchmarks.qmd), there is no benchmark log to read on this page — the window-function experiments were mostly about what succeeded versus what crashed at different input sizes, and the written record below is the main deliverable. The one-bucket, ten-bucket, and whole-year runs described here were done in `R/08-benchmark-window-vs-join.R`; their timings are quoted inline rather than rendered from a parquet log. The setup just loads tidyverse plus `knitr` and `scales` for the handful of formatted tables below.
```{r setup}
library(tidyverse)
library(scales)
library(knitr)
# Centralised paths for this project — a short file that just defines
# where the DuckDB and parquet folders live. See the Setup chapter.
source("R/00-paths.R")
```
## Scaling behaviour
### Attempt 1: `lag()` on the full table
The textbook dbplyr approach:
```r
stp |>
group_by(synthetic_aeuid) |>
window_order(week_ending) |>
mutate(prev_employer = lag(abn_hash_trunc)) |>
ungroup() |>
filter(!is.na(prev_employer),
abn_hash_trunc != prev_employer) |>
compute(name = "switches", temporary = FALSE)
```
**Result: out of memory.** DuckDB tried to sort 3.5 billion rows, spilled 166 GB to disk, and ran out of disk space.
### Attempt 2: `lag()` on 1 year (700M rows)
Filtered to FY2122 only, materialised as a separate table first.
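A minimal sketch of this step — the FY2122 date bounds (1 July 2021 to 30 June 2022) are an assumption here; adjust to however your extract defines the financial year:

```r
# Materialise the single year first (assumed FY2122 bounds).
stp_1yr <- stp |>
  filter(week_ending >= as.Date("2021-07-01"),
         week_ending <= as.Date("2022-06-30")) |>
  compute(name = "stp_fy2122", temporary = FALSE)

# Then the same lag() pipeline as Attempt 1, on the smaller table.
stp_1yr |>
  group_by(synthetic_aeuid) |>
  window_order(week_ending) |>
  mutate(prev_employer = lag(abn_hash_trunc)) |>
  ungroup() |>
  filter(!is.na(prev_employer),
         abn_hash_trunc != prev_employer)
```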
**Result: out of memory.** Even on a fresh DuckDB database with only 5.5 GB of data, the `lag()` window function exceeded 24 GB of RAM trying to sort and materialise 700M rows across 30M partitions.
### Attempt 3: Self-join on 1 year (700M rows)
Instead of `lag()`, join the table to itself on `(person, week = week + 7 days)`. Shift the "previous" week forward by 7 days on the right-hand side and inner-join on equality:
```r
stp_prev <- stp_1yr |>
transmute(synthetic_aeuid,
week_ending = week_ending + 7L,
prev_employer = abn_hash_trunc)
stp_1yr |>
inner_join(stp_prev, by = c("synthetic_aeuid", "week_ending")) |>
filter(abn_hash_trunc != prev_employer)
```
**Result: out of memory.** The hash table for 700M rows still exceeded 24 GB.
::: {.callout-important}
## The hard lesson
On a 32 GB machine, **neither `lag()` nor a self-join can process 700M rows in one shot.** `lag()` fails because the sort is too large. The self-join fails because the hash table is too large. The scale of person-week PLIDA data breaks both approaches at the table level.
:::
## The chunked approach
Partition people into 10 buckets using a hash of the person ID. Process each bucket independently. Each bucket has ~70M rows — well within the 24 GB memory budget.
The `bucket` column is precomputed at view-definition time (one line of DDL, once), so the per-bucket filter in dbplyr is a plain integer comparison:
```r
# One-off view, with an integer bucket column baked in.
dbExecute(con, "
CREATE OR REPLACE VIEW stp AS
SELECT *, CAST(hash(SYNTHETIC_AEUID) % 10 AS INTEGER) AS bucket
FROM read_parquet('stp/stp-fy2122*.parquet')
WHERE ABN_HASH_TRUNC IS NOT NULL
")
stp <- tbl(con, "stp") |> rename_with(tolower)
n_buckets <- 10
# One bucket as a lazy dbplyr pipeline.
switches_one <- function(b) {
stp |>
filter(bucket == b) |>
select(synthetic_aeuid, week_ending, abn_hash_trunc) |>
group_by(synthetic_aeuid) |>
window_order(week_ending) |>
mutate(prev_employer = lag(abn_hash_trunc)) |>
ungroup() |>
filter(!is.na(prev_employer),
abn_hash_trunc != prev_employer)
}
# Run the 10 buckets, stack, persist.
purrr::map(0:(n_buckets - 1), \(b) switches_one(b) |> compute()) |>
purrr::reduce(union_all) |>
compute(name = "switches", temporary = FALSE, overwrite = TRUE)
```
**Result: success.**
| | Per bucket | All 10 buckets |
|---|---|---|
| **`lag()`** | 2.3 s | ~23 s (projected) |
| **Self-join** | 2.0 s | **20.0 s** (measured) |
At 70M rows per bucket, both `lag()` and the self-join complete in about 2 seconds. Processing all 10 buckets sequentially takes ~20 seconds for the full 700M-row year. Extrapolating to all 5 years: about 100 seconds for the entire 3.5 billion-row STP corpus.
## How the self-join works
The self-join is worth understanding even though it's only marginally faster than `lag()` at this scale — it works differently, and in some contexts that matters.
`lag()` says "for each row, look back one _position_ within this person's timeline." To define "one position back", DuckDB must **sort all rows** by `(person, week)`. That's O(N log N).
The self-join says "for each row, find the row for the same person exactly one _week_ earlier." DuckDB builds a **hash table** on `(person, week)` and probes it. That's O(N).
Visually, for one person's timeline:
```
Week | Employer | lag() sees | Self-join matches
---------|----------|-----------------|-------------------
Week 1 | A | NULL | no match (no Week 0)
Week 2 | A | A (from Wk 1) | A (from Wk 1)
Week 3 | B | A (from Wk 2) | A (from Wk 2) ← SWITCH
Week 4 | B | B (from Wk 3) | B (from Wk 3)
```
Both detect the A → B switch at Week 3.
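The same logic can be checked on an in-memory toy timeline with plain dplyr (a hypothetical four-week tibble, not PLIDA data):

```r
library(dplyr)

timeline <- tibble(week = 1:4, employer = c("A", "A", "B", "B"))

timeline |>
  mutate(prev_employer = lag(employer)) |>
  filter(!is.na(prev_employer), employer != prev_employer)
#> one row: week 3, employer B, prev_employer A — the switch
```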
::: {.callout-note}
## When to use a self-join instead of `lag()`
The self-join works when the lag has a **known, fixed time offset** — here, exactly 7 days. If your data has irregular spacing (some people with records every week, others every fortnight), the self-join will miss the non-standard gaps. `lag()` always looks back one _row_ regardless of the time gap. For STP, which is weekly by construction, the two give the same result.
:::
::: {.callout-tip}
## Choosing the number of buckets
Aim for 50–100M rows per bucket. More buckets = more iterations but safer memory. Fewer = fewer iterations but closer to the limit.
:::
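One way to pick the count, sketched as a hypothetical helper (the 75M target is just the midpoint of the range above):

```r
# Count rows lazily in DuckDB, then size buckets to ~75M rows each.
n_rows    <- stp |> summarise(n = n()) |> pull(n)
n_buckets <- max(1L, as.integer(ceiling(n_rows / 75e6)))
```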
## Sizing guidance
| Rows per operation | 32 GB machine (24 GB to DuckDB) | Status |
|---|---|---|
| ~70M | ~2 s per window function | Comfortable |
| ~200M | Untested, likely works | Try it |
| ~700M | OOM for both `lag()` and self-join | Too many |
| ~3.5B | OOM, disk spill exceeded 166 GB | Way too many |
## ASOF joins for cross-dataset temporal questions
The cross-dataset cousin of this problem — "find the most recent health service before each job switch" — is usually better expressed as a DuckDB `ASOF JOIN` than as a windowed timeline on both tables. It directly finds the nearest temporal match. dbplyr does not (yet) translate `ASOF JOIN`, so this is the one operation on the site where dropping to `dbGetQuery()` is worth it. For most "nearest before" questions, however, rounding both sides to a monthly grain and inner-joining on equality is clearer and stays tidyverse — see [Window functions § Cross-dataset temporal joins](04-window-functions.qmd#cross-dataset-temporal-joins-round-to-a-common-grain).
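A sketch of what that `dbGetQuery()` escape hatch might look like — the `health` table and its columns are hypothetical, and the switches table is assumed to carry the person ID and switch week:

```r
# ASOF JOIN: for each switch, the most recent health service on or
# before the switch week. Table and column names are illustrative.
nearest_service <- dbGetQuery(con, "
  SELECT s.synthetic_aeuid,
         s.week_ending,
         h.service_date,
         h.service_type
  FROM switches AS s
  ASOF JOIN health AS h
    ON s.synthetic_aeuid = h.synthetic_aeuid
   AND s.week_ending >= h.service_date
")
```

The inequality condition (`>=`) is what makes it an ASOF join: for each left-hand row, DuckDB keeps only the right-hand row with the largest `service_date` that still satisfies it.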
## Beyond DuckDB
For production pipelines that need to process this data daily, the next step beyond chunked DuckDB is a custom **streaming processor in Rust** called from R via [extendr](https://extendr.github.io/rextendr/). A reader that walks sorted parquet files and maintains `(current_person, previous_employer)` state uses O(1) memory and runs at disk speed — no sorting, no hash tables, no chunking. This is overkill for exploratory analysis but the right tool for a pipeline that runs on every data refresh.
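The core state machine is simple enough to sketch in R for clarity (the real implementation would be Rust; this in-memory version obviously forfeits the streaming benefit). It assumes rows arrive sorted by `(person, week)`:

```r
# Conceptual sketch of the O(1)-state switch detector.
# `rows` is a data frame sorted by (synthetic_aeuid, week_ending).
emit_switches <- function(rows) {
  state    <- list(person = NULL, employer = NULL)
  switches <- list()
  for (i in seq_len(nrow(rows))) {
    person   <- rows$synthetic_aeuid[i]
    employer <- rows$abn_hash_trunc[i]
    # A switch: same person as the previous row, different employer.
    if (identical(state$person, person) &&
        !identical(state$employer, employer)) {
      switches[[length(switches) + 1]] <- rows[i, ]
    }
    state <- list(person = person, employer = employer)
  }
  dplyr::bind_rows(switches)
}
```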