---
title: "Window functions"
subtitle: "lag(), row_number(), rolling windows, and the billion-row pattern"
---
Window functions are the bread and butter of panel-data work. "Did this person change employer?" is `lag()`. "What is their rolling average income?" is a trailing `mean()` over a window. "When did their health service use spike?" is a comparison to a trailing average.
On small data you write the window and move on. On PLIDA-scale data — hundreds of millions of rows per year of STP — you hit hard memory limits that force a different approach. This chapter covers both: the everyday dbplyr syntax for window functions, and the chunk-by-person-hash pattern that lets you run a `lag()` over a billion rows on a laptop.
The evidence for the scaling behaviour is in [Appendix A4](A4-window-function-benchmarks.qmd).
## Setup
Same opening as the previous two chapters: tidyverse plus the DuckDB-facing packages, a memory/thread budget, and lazy `tbl()` handles to three tables. `tictoc` is included here because a couple of the window-function examples below materialise intermediate tables with `compute()` and we want to see how long each step takes.
::: {.panel-tabset}
## From a DuckDB database (default)
Open a **read-only** connection to the `plida_tables.duckdb` database we built in [Reading and storing](01-reading-storing.qmd).
```{r setup}
library(tidyverse)
library(scales)
library(tictoc)
library(duckdb)
library(DBI)
library(dbplyr)
# Centralised paths for this project — a short file that just defines
# where the DuckDB and parquet folders live. See the Setup chapter.
source("R/00-paths.R")
data_dir <- paths$duckdb_dir # replace with your own path
con <- dbConnect(
  duckdb::duckdb(),
  dbdir = file.path(data_dir, "plida_tables.duckdb"),
  read_only = TRUE
)
dbExecute(con, "SET memory_limit = '20GB'") # comfortably below total RAM; 20GB suits a 32 GB machine
dbExecute(con, "SET threads TO 8") # your number of cores minus one
itr_db <- tbl(con, "itr") |> rename_with(tolower)
demo_db <- tbl(con, "demo") |> rename_with(tolower)
ato_sp_db <- tbl(con, "ato_spine") |> rename_with(tolower)
```
## From parquet, no database file
```{r setup-parquet}
#| eval: false
library(tidyverse)
library(scales)
library(tictoc)
library(glue)
library(duckdb)
library(DBI)
library(dbplyr)
source("R/00-paths.R")
pq <- paths$parquet_root # replace with your own parquet folder
con <- dbConnect(duckdb::duckdb()) # in-memory DuckDB, no file
dbExecute(con, "SET memory_limit = '20GB'")
dbExecute(con, "SET threads TO 8")
from_parquet <- function(glob) {
  tbl(con, sql(glue("SELECT * FROM read_parquet('{glob}')")))
}
itr_db <- from_parquet(file.path(pq, "itr_context/*.parquet")) |> rename_with(tolower)
demo_db <- from_parquet(file.path(pq, "demo/*.parquet")) |> rename_with(tolower)
ato_sp_db <- from_parquet(file.path(pq, "ato-spine/*.parquet")) |> rename_with(tolower)
```
:::
```{r scaffold}
#| include: false
# Internal rendering scaffolding for the `#| time_it: "..."` chunk option
# used below. Readers can ignore this.
library(knitr)
timings <- tibble()
knitr::knit_hooks$set(time_it = function(before, options, envir) {
  if (before) {
    gc(full = TRUE, verbose = FALSE)
    assign(".time_it_t0", Sys.time(), envir = envir)
  } else {
    secs <- as.numeric(difftime(Sys.time(), get(".time_it_t0", envir = envir), units = "secs"))
    task <- options$time_it
    approach <- if (!is.null(options$approach)) options$approach else "duckdb"
    timings <<- bind_rows(timings, tibble(task = task, approach = approach, seconds = secs))
    sprintf("\n::: {.callout-note appearance='minimal' collapse='false'}\n⏱️ **%s**: **%.3fs**\n:::\n", task, secs)
  }
})
```
## Everyday windows: `window_order()` and `window_frame()`
We will work against a small derived person-year panel — one row per person per income year with their occupation — built from the demographics, the ATO spine, and the ITR:
```{r derive-panel}
#| message: false
tic("Materialise person-year panel (temp table)")
wk_panel <- demo_db |>
  filter(year_of_birth >= 1975L, year_of_birth <= 1984L) |>
  select(spine_id, year_of_birth, core_gender) |>
  inner_join(ato_sp_db, by = "spine_id") |>
  inner_join(
    itr_db |>
      select(synthetic_aeuid, incm_yr, ocptn_grp_cd) |>
      filter(!is.na(ocptn_grp_cd)),
    by = "synthetic_aeuid"
  ) |>
  mutate(occ_major = as.integer(floor(ocptn_grp_cd / 1000))) |>
  select(spine_id, incm_yr, occ_major, year_of_birth, core_gender) |>
  compute(name = "wk_panel", temporary = TRUE, overwrite = TRUE)
toc()
cat("Panel rows:", format(wk_panel |> summarise(n = n()) |> pull(n), big.mark = ","), "\n")
```
`compute()` materialises the lazy pipeline into a temporary DuckDB table and hands back a `tbl()` pointing at it. No data has left DuckDB. We do this here because the next three window queries all reuse the same panel — without `compute()`, dbplyr would re-run the joins every time.
Two idioms you need for dbplyr window functions:
- **`group_by()` sets the `PARTITION BY`.** Same mental model as tibble-level grouping.
- **`window_order()` sets the `ORDER BY`** of the window. This is easy to miss. `arrange()` alone is _not_ enough, because `arrange()` sorts the final output, not the window partition.
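To see the distinction concretely, here is a minimal, self-contained sketch: the rows are deliberately stored out of year order, and `window_order()` still numbers them correctly (the toy data and the throwaway in-memory connection are invented for illustration):

```r
library(duckdb)
library(DBI)
library(dplyr)
library(dbplyr)

con_demo <- dbConnect(duckdb::duckdb()) # throwaway in-memory connection

# Rows deliberately stored out of year order.
toy_db <- copy_to(con_demo, tibble(
  spine_id = c("a", "a", "a", "b", "b"),
  incm_yr  = c(2012L, 2010L, 2011L, 2011L, 2010L)
), "toy")

numbered <- toy_db |>
  group_by(spine_id) |>         # PARTITION BY spine_id
  window_order(incm_yr) |>      # ORDER BY incm_yr *inside the window*
  mutate(return_num = row_number()) |>
  ungroup() |>
  arrange(spine_id, incm_yr) |> # this only sorts the final output
  collect()

numbered # return_num counts each person's years in order: a gets 1,2,3; b gets 1,2
dbDisconnect(con_demo, shutdown = TRUE)
```

Swapping `window_order(incm_yr)` for a plain `arrange(incm_yr)` earlier in the pipe leaves the window's `ORDER BY` undefined, and dbplyr will warn that the windowed expression has no explicit ordering.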
### `row_number()`: first observation per person
```{r rn-duckdb}
#| time_it: "row_number() per person"
first_returns <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  mutate(return_num = row_number()) |>
  ungroup() |>
  filter(return_num == 1L) |>
  count(incm_yr, occ_major) |>
  collect()
head(first_returns)
```
`row_number()`, `rank()`, and `dense_rank()` all work the same way. Useful patterns:
- **First / last observation per person**: `row_number() == 1L` after `window_order()`.
- **Deduplicate keeping latest**: same, but `window_order(desc(date))`.
- **Chronological index**: expose `row_number()` as a column for later joins.
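The deduplicate-keeping-latest pattern, sketched on toy data (the in-memory connection and table are invented for illustration):

```r
library(duckdb)
library(DBI)
library(dplyr)
library(dbplyr)

con_demo <- dbConnect(duckdb::duckdb())
obs <- copy_to(con_demo, tibble(
  spine_id  = c("a", "a", "b"),
  incm_yr   = c(2019L, 2021L, 2020L),
  occ_major = c(2L, 3L, 5L)
), "obs")

# Keep each person's most recent record: number rows from latest to
# earliest, then keep row 1.
latest <- obs |>
  group_by(spine_id) |>
  window_order(desc(incm_yr)) |>
  mutate(rn = row_number()) |>
  ungroup() |>
  filter(rn == 1L) |>
  select(-rn) |>
  arrange(spine_id) |>
  collect()

latest # a keeps its 2021 row, b its 2020 row
dbDisconnect(con_demo, shutdown = TRUE)
```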
### `lag()` for year-over-year change
Did someone change ANZSCO major group from one return to the next?
```{r lag-duckdb}
#| time_it: "lag() per person"
switches <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  mutate(prev_occ = lag(occ_major)) |>
  ungroup() |>
  filter(!is.na(prev_occ), prev_occ != occ_major) |>
  count(incm_yr, prev_occ, occ_major) |>
  collect()
head(switches)
```
```{r plot-switches}
#| fig-width: 8
#| fig-height: 4.5
switches |>
  filter(prev_occ %in% 1:8, occ_major %in% 1:8) |>
  group_by(incm_yr) |>
  summarise(transitions = sum(n), .groups = "drop") |>
  ggplot(aes(incm_yr, transitions)) +
  geom_col(fill = "#2c7fb8") +
  scale_y_continuous(labels = label_comma()) +
  scale_x_continuous(breaks = seq(2010, 2022, 2)) +
  labs(
    x = "Income year", y = "Occupation-group transitions",
    title = "Year-over-year ANZSCO major-group transitions",
    subtitle = "Derived with a lag() window on the person-year panel"
  ) +
  theme_minimal(base_size = 11)
```
`lead()` works the same way, looking forward. Pair it with `lag()` when you need "what came before _and_ after this observation".
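A small sketch of that pairing: flagging one-year "blips" where an occupation code differs from both its neighbours, which agree with each other (the toy data and connection are invented for illustration):

```r
library(duckdb)
library(DBI)
library(dplyr)
library(dbplyr)

con_demo <- dbConnect(duckdb::duckdb())
occ <- copy_to(con_demo, tibble(
  spine_id  = rep("a", 5),
  incm_yr   = 2015:2019,
  occ_major = c(2L, 2L, 7L, 2L, 2L)
), "occ")

# A one-year blip: differs from both neighbours, which agree with each other.
blips <- occ |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  mutate(prev_occ = lag(occ_major),
         next_occ = lead(occ_major)) |>
  ungroup() |>
  filter(occ_major != prev_occ,
         occ_major != next_occ,
         prev_occ == next_occ) |>
  collect()

blips # only the 2017 row qualifies
dbDisconnect(con_demo, shutdown = TRUE)
```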
### Rolling averages: `window_frame()`
Frame clauses — "the previous three rows in time order, within this person" — are expressed in dbplyr with `window_frame(from, to)`. It translates to SQL's `ROWS BETWEEN ... AND ...` frame clause: negative arguments mean `PRECEDING`, zero means `CURRENT ROW`, and positive arguments mean `FOLLOWING`, so `window_frame(-3, -1)` becomes `ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING`.
Example: the trailing-3-year average of `occ_major` for each person. Not an especially meaningful quantity for a categorical variable, but it demonstrates the mechanic.
```{r rolling-duckdb}
#| time_it: "rolling mean (window_frame)"
rolling <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  window_frame(-3, -1) |>
  mutate(trailing_mean_occ = mean(occ_major, na.rm = TRUE)) |>
  ungroup() |>
  filter(!is.na(trailing_mean_occ)) |>
  head(6) |>
  collect()
rolling
```
The same idiom works for any window-appropriate aggregate: `sum()`, `mean()`, `median()`, `min()`, `max()`. It also works for `quantile_cont(x, p)`. See `show_query()` on the pipe to check the `OVER (... ROWS BETWEEN ...)` clause dbplyr generated.
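On a toy table (invented for illustration) the generated SQL makes the frame explicit, and the trailing mean behaves as expected at the start of each partition:

```r
library(duckdb)
library(DBI)
library(dplyr)
library(dbplyr)

con_demo <- dbConnect(duckdb::duckdb())
x <- copy_to(con_demo, tibble(id = c("a", "a", "a"), yr = 1:3, v = c(10, 20, 30)), "x")

pipe <- x |>
  group_by(id) |>
  window_order(yr) |>
  window_frame(-2, -1) |>
  mutate(trail = mean(v, na.rm = TRUE)) |>
  ungroup()

show_query(pipe)
# The OVER clause should read roughly:
#   AVG(v) OVER (PARTITION BY id ORDER BY yr ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING)

res <- pipe |> arrange(yr) |> collect()
res$trail # NA (no preceding rows), then 10, then 15
dbDisconnect(con_demo, shutdown = TRUE)
```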
::: {.callout-important}
## The window-function rule of thumb
- **Simple partitioned windows** — `row_number()`, `rank()`, `lag()`, `lead()` — `group_by()` + `window_order()` + `mutate()`.
- **Frame-based windows** — `ROWS BETWEEN N PRECEDING AND M PRECEDING/FOLLOWING` — add `window_frame(N, M)` to the pipe.
- **Arrow is rarely the right tool** for anything beyond a trivial `row_number()` inside `arrange()`. Use DuckDB.
:::
## The scaling problem: when the obvious `lag()` cannot run
Everything above ran happily on a panel of a few tens of millions of rows. Try the same `lag()` over the full STP payroll table — 697 million rows for one financial year, 3.5 billion across all five — and it will not complete on a 32 GB machine. The evidence is in [A4 §Scaling behaviour](A4-window-function-benchmarks.qmd#scaling-behaviour):
| Rows per window operation | Status on 32 GB machine (24 GB to DuckDB) |
|---------------------------|-------------------------------------------|
| ~70M | Comfortable — `lag()` in ~2s |
| ~200M | Untested, likely works |
| ~700M | **OOM** for both `lag()` and self-join |
| ~3.5B | **OOM**, disk spill exceeded 166 GB |
The failure mode is the same whether you use `lag()` or a self-join:
- `lag()` fails because DuckDB needs to sort the whole table first (so it knows what "one row before" means), and the sort is too big.
- The self-join fails because DuckDB needs to build a big lookup structure with the whole table in it, and that structure is too big.
Both run out of memory long before they finish. You need a strategy that works on one slice of the data at a time.
## The pattern: chunk by a group of people at a time
Instead of running `lag()` over the whole table, split the people into, say, 10 fixed groups and process one group at a time. Each group is one-tenth of the total, which fits in memory. Because the window is `partition by person`, every person's whole timeline is either in this group or it isn't — so there are no edge effects at the group boundaries.
How do you split 30 million people into 10 roughly equal groups? Take a hash of the person ID (a function that always returns the same number for the same input) and use `hash % 10` to assign each person a bucket from 0 to 9. A good hash spreads IDs evenly, so the buckets come out close to equal in size, and because the hash is deterministic, each person lands in the same bucket every run.
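You can check the even-spread property directly in DuckDB. A quick sketch against a million synthetic IDs (the in-memory connection is a throwaway, invented for illustration):

```r
library(duckdb)
library(DBI)

con_demo <- dbConnect(duckdb::duckdb())

# Bucket a million synthetic person IDs and count the bucket sizes.
# range(1000000) yields one column, named "range".
sizes <- dbGetQuery(con_demo, "
  SELECT CAST(hash(range) % 10 AS INTEGER) AS bucket, COUNT(*) AS n
  FROM range(1000000)
  GROUP BY bucket
  ORDER BY bucket
")
sizes # ten buckets, each close to 100,000 rows
dbDisconnect(con_demo, shutdown = TRUE)
```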
One practical wrinkle: it is worth computing that bucket column at view-definition time, not inside every query. If you try to write `filter(hash(synthetic_aeuid) %% 10 == 0)` inside the dbplyr pipeline, the translator emits a slightly different SQL expression that DuckDB cannot use to skip chunks, and each bucket ends up reading the whole file. Bake a plain integer `bucket` column into the view once, and every downstream query is a simple `filter(bucket == b)`.
```r
library(duckdb); library(DBI); library(dplyr); library(dbplyr); library(purrr)
con <- dbConnect(duckdb::duckdb(), dbdir = "my.duckdb", read_only = FALSE)
dbExecute(con, "SET memory_limit = '20GB'")
# One-off view definition: the bucket column is precomputed, so dbplyr
# can filter on it as a plain integer column.
dbExecute(con, "
  CREATE OR REPLACE VIEW stp AS
  SELECT *, CAST(hash(SYNTHETIC_AEUID) % 10 AS INTEGER) AS bucket
  FROM read_parquet('stp/*.parquet')
  WHERE ABN_HASH_TRUNC IS NOT NULL
")
stp <- tbl(con, "stp") |> rename_with(tolower)
n_buckets <- 10
# A single bucket, as a dbplyr pipeline.
switches_one <- function(b) {
  stp |>
    filter(bucket == b) |>
    select(synthetic_aeuid, week_ending, abn_hash_trunc) |>
    group_by(synthetic_aeuid) |>
    window_order(week_ending) |>
    mutate(prev_employer = lag(abn_hash_trunc)) |>
    ungroup() |>
    filter(!is.na(prev_employer),
           abn_hash_trunc != prev_employer)
}
# Run the 10 buckets and stack them into one persistent table.
switches <- map(0:(n_buckets - 1), \(b) switches_one(b) |> compute()) |>
  reduce(union_all) |>
  compute(name = "switches", temporary = FALSE, overwrite = TRUE)
```
Every analytical step is tidyverse. `compute()` writes each bucket's result to a small temp table inside DuckDB; `reduce(union_all)` stacks the ten temp tables into a single lazy query; the final `compute(name = ..., temporary = FALSE)` writes the stacked result out as a persistent table. The only SQL on the page is `CREATE VIEW` — one line, once — because there is no tidyverse verb for "tell DuckDB that this parquet folder is now a table called `stp`".
Each iteration handles one-tenth of the people. The loop takes about 20 seconds end-to-end for the full 700M-row year, and ~100 seconds for the full 3.5B-row five-year STP corpus. Per-bucket timings are in [A4](A4-window-function-benchmarks.qmd#the-chunked-approach).
::: {.callout-note}
## Keep only the columns you need before the window
The `select()` in `switches_one()` is there for memory, not style. When DuckDB sorts the table for `lag()`, the memory it needs is proportional to how _wide_ each row is, not just how many rows there are. Carrying all 20 STP columns through a 70M-row window will run out of memory where carrying 4 columns is comfortable. Project down to exactly what the window and downstream filters need.
:::
::: {.callout-tip}
## Choosing the number of buckets
Aim for **50–100M rows per bucket**. More buckets = more iterations but safer memory. Fewer = fewer iterations but closer to the limit. On 32 GB with `memory_limit = '20GB'`, 10 buckets is a good default for STP.
:::
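That guidance can be wrapped in a tiny helper. `n_buckets_for()` is a hypothetical name, not part of any package:

```r
# Hypothetical helper: aim for ~75M rows per bucket (the midpoint of the
# 50-100M rule of thumb), rounding up so no bucket overshoots.
n_buckets_for <- function(total_rows, target_per_bucket = 75e6) {
  max(1L, as.integer(ceiling(total_rows / target_per_bucket)))
}

n_buckets_for(697e6) # one STP year -> 10
n_buckets_for(3.5e9) # the five-year corpus -> 47
```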
## `lag()` vs self-join: when each wins
The chunked pattern works with either `lag()` or a self-join. They are not interchangeable.
- **`lag()` looks back one _row_** within the ordered partition, whatever the time gap. Use this when the data has irregular spacing — some people with records every week, others every fortnight — and "the previous observation for this person" is genuinely what you want.
- **A self-join on `(person, t = t + Δ)` looks back one _fixed time offset_** (e.g. exactly 7 days). Use this when the data has regular spacing by construction (weekly STP records), or when an irregular previous-observation would be misleading.
For STP — weekly by construction — both give the same answer. At 70M rows per bucket, timings are within a few percent of each other. The self-join form stays tidyverse too: shift the "previous" week forward by 7 days, then join on equality.
```r
stp_b <- stp |> filter(bucket == 0L)
stp_prev <- stp_b |>
  transmute(synthetic_aeuid,
            week_ending = week_ending + 7L, # shift previous week forward
            prev_employer = abn_hash_trunc)
stp_b |>
  inner_join(stp_prev, by = c("synthetic_aeuid", "week_ending")) |>
  filter(abn_hash_trunc != prev_employer)
```
No window function at all — just an ordinary inner join.
## Cross-dataset temporal joins: round to a common grain
A related problem: you have an event table (job switches) and a fact table with irregular dates (MBS services). You want, for each event, the most recent service _before_ the event. The tidyverse-friendly way to handle this is to **round both sides to a common grain** — usually month — and join on equality. You pre-aggregate MBS to monthly totals per person, then inner-join switches to that monthly table on `(person, month)`.
```r
mbs_monthly <- mbs |>
  mutate(month = floor_date(svcdate, unit = "month")) |>
  group_by(synthetic_aeuid, month) |>
  summarise(services = sum(numserv, na.rm = TRUE),
            fees = sum(feecharged, na.rm = TRUE), .groups = "drop")
switches_m <- switches |>
  mutate(month = floor_date(week_ending, unit = "month"))
switches_m |>
  inner_join(mbs_monthly, by = c("synthetic_aeuid", "month"))
```
For the exact "nearest record strictly before this event" semantics, DuckDB has `ASOF JOIN`. It is one of the very few operations this site flags as genuinely SQL-only; if you need it, use it via `dbGetQuery()`. For almost every question that starts "the most recent record before…", the round-to-month form above is clearer and enough.
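If you do need the exact semantics, the shape of the SQL looks like this. The tables and columns below are toy stand-ins, invented for illustration:

```r
library(duckdb)
library(DBI)

con_demo <- dbConnect(duckdb::duckdb())
dbExecute(con_demo, "CREATE TABLE events (person INTEGER, event_date DATE)")
dbExecute(con_demo, "CREATE TABLE services (person INTEGER, svcdate DATE, fee DOUBLE)")
dbExecute(con_demo, "INSERT INTO events VALUES (1, DATE '2020-06-15')")
dbExecute(con_demo, "INSERT INTO services VALUES
  (1, DATE '2020-06-01', 80.0),
  (1, DATE '2020-06-20', 95.0)")

# For each event, the nearest service strictly before the event date.
nearest <- dbGetQuery(con_demo, "
  SELECT e.person, e.event_date, s.svcdate, s.fee
  FROM events e
  ASOF JOIN services s
    ON e.person = s.person AND e.event_date > s.svcdate
")
nearest # matches the 2020-06-01 service, not the later one
dbDisconnect(con_demo, shutdown = TRUE)
```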
## Beyond DuckDB: a streaming processor
For production pipelines that process the full STP corpus on every refresh, the next step up from chunked DuckDB is to write a small program (usually in Rust, called from R via [extendr](https://extendr.github.io/rextendr/)) that walks the parquet files in person order and keeps track of the previous employer as it goes. It never sorts anything, never builds a lookup structure — it just reads, compares, writes. Fast, and uses essentially no memory. Overkill for exploratory work; exactly right for a pipeline that runs overnight every night.
For everything on this site, chunked DuckDB with dbplyr is the recipe.
## What lives in [A4](A4-window-function-benchmarks.qmd)
- The failure log: `lag()` on 3.5B rows (disk-spill exhaustion); `lag()` on 700M rows (memory exhaustion); self-join on 700M rows (memory exhaustion).
- Per-bucket timings for both `lag()` and the self-join approach on 70M-row buckets.
- Projected timings across the full corpus.
```{r disconnect}
#| include: false
invisible(dbExecute(con, "DROP TABLE IF EXISTS wk_panel"))
dbDisconnect(con, shutdown = TRUE)
```