lag(), row_number(), rolling windows, and the billion-row pattern

Window functions are the bread and butter of panel-data work. “Did this person change employer?” is lag(). “What is their rolling average income?” is a trailing mean() over a window. “When did their health service use spike?” is a comparison to a trailing average.

On small data you write the window and move on. On PLIDA-scale data — hundreds of millions of rows per year of STP — you hit hard memory limits that force a different approach. This chapter covers both: the everyday dbplyr syntax for window functions, and the chunk-by-person-hash pattern that lets you run a lag() over a billion rows on a laptop.

The evidence for the scaling behaviour is in Appendix A4.

Setup

Same opening as the previous two chapters: tidyverse plus the DuckDB-facing packages, a memory/thread budget, and lazy tbl() handles to three tables. tictoc is included here because a couple of the window-function examples below materialise intermediate tables with compute() and we want to see how long each step takes.

Open a read-only connection to the plida_tables.duckdb database we built in Reading and storing.

Code
library(tidyverse)
library(scales)
library(tictoc)
library(duckdb)
library(DBI)
library(dbplyr)

# Centralised paths for this project — a short file that just defines
# where the DuckDB and parquet folders live. See the Setup chapter.
source("R/00-paths.R")
data_dir <- paths$duckdb_dir # replace with your own path

con <- dbConnect(
  duckdb::duckdb(),
  dbdir     = file.path(data_dir, "plida_tables.duckdb"),
  read_only = TRUE
)
dbExecute(con, "SET memory_limit = '20GB'") # Set at about 90% of your RAM
[1] 0
Code
dbExecute(con, "SET threads TO 8")          # Set as your number of cores less 1
[1] 0
Code
itr_db    <- tbl(con, "itr")        |> rename_with(tolower)
demo_db   <- tbl(con, "demo")       |> rename_with(tolower)
ato_sp_db <- tbl(con, "ato_spine")  |> rename_with(tolower)
Code
library(tidyverse)
library(scales)
library(tictoc)
library(glue)
library(duckdb)
library(DBI)
library(dbplyr)

source("R/00-paths.R")
pq <- paths$parquet_root # replace with your own parquet folder

con <- dbConnect(duckdb::duckdb())              # in-memory DuckDB, no file
dbExecute(con, "SET memory_limit = '20GB'")
dbExecute(con, "SET threads TO 8")

from_parquet <- function(glob) {
  tbl(con, sql(glue("SELECT * FROM read_parquet('{glob}')")))
}

itr_db    <- from_parquet(file.path(pq, "itr_context/*.parquet")) |> rename_with(tolower)
demo_db   <- from_parquet(file.path(pq, "demo/*.parquet"))        |> rename_with(tolower)
ato_sp_db <- from_parquet(file.path(pq, "ato-spine/*.parquet"))   |> rename_with(tolower)

Everyday windows: window_order() and window_frame()

We will work against a small derived person-year panel — one row per person per income year with their occupation — built from the demographics, the ATO spine, and the ITR:

Code
tic("Materialise person-year panel (temp table)")
wk_panel <- demo_db |>
  filter(year_of_birth >= 1975L, year_of_birth <= 1984L) |>
  select(spine_id, year_of_birth, core_gender) |>
  inner_join(ato_sp_db, by = "spine_id") |>
  inner_join(
    itr_db |>
      select(synthetic_aeuid, incm_yr, ocptn_grp_cd) |>
      filter(!is.na(ocptn_grp_cd)),
    by = "synthetic_aeuid"
  ) |>
  mutate(occ_major = as.integer(floor(ocptn_grp_cd / 1000))) |>
  select(spine_id, incm_yr, occ_major, year_of_birth, core_gender) |>
  compute(name = "wk_panel", temporary = TRUE, overwrite = TRUE)
toc()
Materialise person-year panel (temp table): 2.759 sec elapsed
Code
cat("Panel rows:", format(wk_panel |> summarise(n = n()) |> pull(n), big.mark = ","), "\n")
Panel rows: 36,145,630 

compute() materialises the lazy pipeline into a temporary DuckDB table and hands back a tbl() pointing at it. No data has left DuckDB. We do this here because the next three window queries all reuse the same panel — without compute(), dbplyr would re-run the joins every time.

Two idioms you need for dbplyr window functions:

  • group_by() sets the PARTITION BY. Same mental model as tibble-level grouping.
  • window_order() sets the ORDER BY of the window. This is easy to miss. arrange() alone is not enough, because arrange() sorts the final output, not the window partition.

row_number(): first observation per person

Code
first_returns <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  mutate(return_num = row_number()) |>
  ungroup() |>
  filter(return_num == 1L) |>
  count(incm_yr, occ_major) |>
  collect()
head(first_returns)

⏱️ row_number() per person: 1.054s

row_number(), rank(), dense_rank() all work the same way. Useful patterns:

  • First / last observation per person: row_number() == 1L after window_order().
  • Deduplicate keeping latest: same, but window_order(desc(date)).
  • Chronological index: expose row_number() as a column for later joins.

lag() for year-over-year change

Did someone change ANZSCO major group from one return to the next?

Code
switches <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  mutate(prev_occ = lag(occ_major)) |>
  ungroup() |>
  filter(!is.na(prev_occ), prev_occ != occ_major) |>
  count(incm_yr, prev_occ, occ_major) |>
  collect()
head(switches)

⏱️ lag() per person: 1.036s

Code
switches |>
  filter(prev_occ %in% 1:8, occ_major %in% 1:8) |>
  group_by(incm_yr) |>
  summarise(transitions = sum(n), .groups = "drop") |>
  ggplot(aes(incm_yr, transitions)) +
  geom_col(fill = "#2c7fb8") +
  scale_y_continuous(labels = label_comma()) +
  scale_x_continuous(breaks = seq(2010, 2022, 2)) +
  labs(x = "Income year", y = "Occupation-group transitions",
       title = "Year-over-year ANZSCO major-group transitions",
       subtitle = "Derived with a lag() window on the person-year panel") +
  theme_minimal(base_size = 11)

lead() works the same way, looking forward. Pair it with lag() when you need “what came before and after this observation”.

Rolling averages: window_frame()

Frame clauses — “the previous three rows in time order, within this person” — are expressed in dbplyr with window_frame(from, to). It translates to SQL’s ROWS BETWEEN from PRECEDING AND to FOLLOWING (negative numbers are PRECEDING, zero is CURRENT ROW, positive is FOLLOWING).

Example: the trailing-3-year average of occ_major for each person. Not an especially meaningful quantity for a categorical variable, but it demonstrates the mechanic.

Code
rolling <- wk_panel |>
  group_by(spine_id) |>
  window_order(incm_yr) |>
  window_frame(-3, -1) |>
  mutate(trailing_mean_occ = mean(occ_major, na.rm = TRUE)) |>
  ungroup() |>
  filter(!is.na(trailing_mean_occ)) |>
  head(6) |>
  collect()
rolling

⏱️ rolling mean (window_frame): 1.066s

The same idiom works for any window-appropriate aggregate: sum(), mean(), median(), min(), max(). It also works for quantile_cont(x, p). See show_query() on the pipe to check the OVER (... ROWS BETWEEN ...) clause dbplyr generated.

ImportantThe window-function rule of thumb
  • Simple partitioned windowsrow_number(), rank(), lag(), lead()group_by() + window_order() + mutate().
  • Frame-based windowsROWS BETWEEN N PRECEDING AND M PRECEDING/FOLLOWING — add window_frame(N, M) to the pipe.
  • Arrow is rarely the right tool for anything beyond a trivial row_number() inside arrange(). Use DuckDB.

The scaling problem: when the obvious lag() cannot run

Everything above ran happily on a panel of a few tens of millions of rows. Try the same lag() over the full STP payroll table — 697 million rows for one financial year, 3.5 billion across all five — and it will not complete on a 32 GB machine. The evidence is in A4 §Scaling behaviour:

Rows per window operation Status on 32 GB machine (24 GB to DuckDB)
~70M Comfortable — lag() in ~2s
~200M Untested, likely works
~700M OOM for both lag() and self-join
~3.5B OOM, disk spill exceeded 166 GB

The failure mode is the same whether you use lag() or a self-join:

  • lag() fails because DuckDB needs to sort the whole table first (so it knows what “one row before” means), and the sort is too big.
  • The self-join fails because DuckDB needs to build a big lookup structure with the whole table in it, and that structure is too big.

Both run out of memory long before they finish. You need a strategy that works on one slice of the data at a time.

The pattern: chunk by a group of people at a time

Instead of running lag() over the whole table, split the people into, say, 10 groups at write-time and process one group at a time. Each group is one-tenth of the total, which fits. Because the window is partition by person, every person’s whole timeline is either in this group or it isn’t — so there are no edge effects at the group boundaries.

How do you split 30 million people into 10 roughly-equal groups? Take a hash of the person ID (a number that always gives the same output for the same input), and use hash % 10 to assign each person a bucket from 0 to 9. The hash guarantees the buckets are roughly the same size and the assignment is deterministic.

One practical wrinkle: it is worth computing that bucket column at view-definition time, not inside every query. If you try to write filter(hash(synthetic_aeuid) %% 10 == 0) inside the dbplyr pipeline, the translator emits a slightly different SQL expression that DuckDB cannot use to skip chunks, and each bucket ends up reading the whole file. Bake a plain integer bucket column into the view once, and every downstream query is a simple filter(bucket == b).

library(duckdb); library(DBI); library(dplyr); library(dbplyr); library(purrr)

con <- dbConnect(duckdb::duckdb(), dbdir = "my.duckdb", read_only = FALSE)
dbExecute(con, "SET memory_limit = '20GB'")

# One-off view definition: the bucket column is precomputed, so dbplyr
# can filter on it as a plain integer column.
dbExecute(con, "
  CREATE OR REPLACE VIEW stp AS
  SELECT *, CAST(hash(SYNTHETIC_AEUID) % 10 AS INTEGER) AS bucket
  FROM read_parquet('stp/*.parquet')
  WHERE ABN_HASH_TRUNC IS NOT NULL
")

stp <- tbl(con, "stp") |> rename_with(tolower)
n_buckets <- 10

# A single bucket, as a dbplyr pipeline.
switches_one <- function(b) {
  stp |>
    filter(bucket == b) |>
    select(synthetic_aeuid, week_ending, abn_hash_trunc) |>
    group_by(synthetic_aeuid) |>
    window_order(week_ending) |>
    mutate(prev_employer = lag(abn_hash_trunc)) |>
    ungroup() |>
    filter(!is.na(prev_employer),
           abn_hash_trunc != prev_employer)
}

# Run the 10 buckets and stack them into one persistent table.
switches <- map(0:(n_buckets - 1), \(b) switches_one(b) |> compute()) |>
  reduce(union_all) |>
  compute(name = "switches", temporary = FALSE, overwrite = TRUE)

Every analytical step is tidyverse. compute() writes each bucket’s result to a small temp table inside DuckDB; reduce(union_all) stacks the ten temp tables into a single lazy query; the final compute(name = ..., temporary = FALSE) writes the stacked result out as a persistent table. The only SQL on the page is CREATE VIEW — one line, once — because there is no tidyverse verb for “tell DuckDB that this parquet folder is now a table called stp”.

Each iteration handles one-tenth of the people. The loop takes about 20 seconds end-to-end for the full 700M-row year, and ~100 seconds for the full 3.5B-row five-year STP corpus. Per-bucket timings are in A4.

NoteKeep only the columns you need before the window

The select() in switches_one() is there for memory, not style. When DuckDB sorts the table for lag(), the memory it needs is proportional to how wide each row is, not just how many rows there are. Carrying all 20 STP columns through a 70M-row window will run out of memory where carrying 4 columns is comfortable. Project down to exactly what the window and downstream filters need.

TipChoosing the number of buckets

Aim for 50–100M rows per bucket. More buckets = more iterations but safer memory. Fewer = fewer iterations but closer to the limit. On 32 GB with memory_limit = '20GB', 10 buckets is a good default for STP.

lag() vs self-join: when each wins

The chunked pattern works with either lag() or a self-join. They are not interchangeable.

  • lag() looks back one row within the ordered partition, whatever the time gap. Use this when the data has irregular spacing — some people with records every week, others every fortnight — and “the previous observation for this person” is genuinely what you want.
  • A self-join on (person, t = t + Δ) looks back one fixed time offset (e.g. exactly 7 days). Use this when the data has regular spacing by construction (weekly STP records), or when an irregular previous-observation would be misleading.

For STP — weekly by construction — both give the same answer. At 70M rows per bucket, timings are within a few percent of each other. The self-join form stays tidyverse too: shift the “previous” week forward by 7 days, then join on equality.

stp_b    <- stp |> filter(bucket == 0L)
stp_prev <- stp_b |>
  transmute(synthetic_aeuid,
            week_ending   = week_ending + 7L,    # shift previous week forward
            prev_employer = abn_hash_trunc)

stp_b |>
  inner_join(stp_prev, by = c("synthetic_aeuid", "week_ending")) |>
  filter(abn_hash_trunc != prev_employer)

No window function at all — just an ordinary inner join.

Cross-dataset temporal joins: round to a common grain

A related problem: you have an event table (job switches) and a fact table with irregular dates (MBS services). You want, for each event, the most recent service before the event. The tidyverse-friendly way to handle this is to round both sides to a common grain — usually month — and join on equality. You pre-aggregate MBS to monthly totals per person, then inner-join switches to that monthly table on (person, month).

mbs_monthly <- mbs |>
  mutate(month = floor_date(svcdate, unit = "month")) |>
  group_by(synthetic_aeuid, month) |>
  summarise(services = sum(numserv, na.rm = TRUE),
            fees     = sum(feecharged, na.rm = TRUE), .groups = "drop")

switches_m <- switches |>
  mutate(month = floor_date(week_ending, unit = "month"))

switches_m |>
  inner_join(mbs_monthly, by = c("synthetic_aeuid", "month"))

For the exact “nearest record strictly before this event” semantics, DuckDB has ASOF JOIN. It is one of the very few operations this site flags as genuinely SQL-only; if you need it, use it via dbGetQuery(). For almost every question that starts “the most recent record before…”, the round-to-month form above is clearer and enough.

Beyond DuckDB: a streaming processor

For production pipelines that process the full STP corpus on every refresh, the next step up from chunked DuckDB is to write a small program (usually in Rust, called from R via extendr) that walks the parquet files in person order and keeps track of the previous employer as it goes. It never sorts anything, never builds a lookup structure — it just reads, compares, writes. Fast, and uses essentially no memory. Overkill for exploratory work; exactly right for a pipeline that runs overnight every night.

For everything on this site, chunked DuckDB with dbplyr is the recipe.

What lives in A4

  • The failure log: lag() on 3.5B rows (disk-spill exhaustion); lag() on 700M rows (memory exhaustion); self-join on 700M rows (memory exhaustion).
  • Per-bucket timings for both lag() and the self-join approach on 70M-row buckets.
  • Projected timings across the full corpus.