Case study 4: Job switching and health shocks

Window functions after joins on billion-row data

This case study is the hard one. We ask: do people who experience a spike in health service use switch employers at higher rates? It requires:

  1. Deriving employer switches from STP weekly payroll data using lag() window functions.
  2. Deriving health shock months from MBS claims using a trailing-mean window.
  3. Joining the two derived datasets via spine tables and comparing switching rates.

Each step involves window operations on hundreds of millions of rows, partitioned by person and ordered by date. As we found in the window-function benchmarks, DuckDB’s window functions run out of memory on 700M+ rows on a 32 GB machine — so we chunk by person-hash bucket.

Everything below is written in tidyverse R. The one exception is the CREATE VIEW calls that point DuckDB at the parquet files — that is one-off setup DDL with no dbplyr equivalent, and we wrap it in a helper.

The strategy

We tested two full approaches end-to-end:

  1. Materialise-then-query: load STP and MBS from parquet into a DuckDB working database, then run the window functions against native DuckDB tables. Total time: ~235 seconds.
  2. Views over parquet: create DuckDB views pointing at the parquet files, then run the window functions reading parquet directly. Total time: ~75 seconds.

Views win by 3×. The per-query cost is slightly higher (parquet decode on each bucket), but skipping the 170-second materialisation step more than compensates. In both cases the window functions are chunked by person hash bucket — neither lag() nor a self-join can handle 700M rows in one shot on 32 GB.

Important: Three rules for window functions at PLIDA scale
  1. Chunk by person hash bucket. ~70M rows per bucket is comfortable on 32 GB. 700M is not.
  2. Pre-aggregate before windowing. Person-months, not individual claims.
  3. Use views over parquet rather than materialising into DuckDB — unless you will query the same table many times.

Setup

This case study is the one place we don’t use the existing plida_tables.duckdb database. Instead, it takes the “from parquet, no database” option we introduced in Data manipulation § Setup and extends it slightly. Rather than an in-memory DuckDB, we build a small throwaway on-disk working database (cs04_working.duckdb), register views over the parquet folders inside it, materialise the intermediate tables as we go, and delete the whole file at the end. The on-disk version is only used so that the rolling-window step has a stable place to spill intermediate sort state; everything else would work the same with an in-memory database.

Two other wrinkles worth flagging before the code:

  • The connection is opened writable, not read-only, because we create views and materialise intermediate tables with compute().
  • STP gets an extra integer bucket column computed once, inside the CREATE VIEW definition. We use filter(bucket == b) downstream to process a tenth of the people at a time. Doing the hash/modulus inside the dbplyr pipeline on every iteration confuses DuckDB’s chunk-skipping and causes the window function to run out of memory.

The register_view() helper below is the one piece of SQL on this page — CREATE VIEW has no tidyverse equivalent, so we hide it inside a helper and forget about it. Every analytical step after registration is ordinary dplyr.

Code
library(tidyverse)        # includes purrr (map, reduce) and lubridate (floor_date, days)
library(scales)
library(glue)
library(fs)
library(tictoc)
library(duckdb)
library(DBI)
library(dbplyr)

# Centralised paths for this project — a short file that just defines
# where the DuckDB and parquet folders live. See the Setup chapter.
source("R/00-paths.R")
data_dir <- paths$duckdb_dir # replace with your own path

# This case study builds a small working DuckDB database for one render and
# deletes it at the end. Opened writable (not read_only) because we create
# tables and views inside it.
db_path <- file.path(data_dir, "cs04_working.duckdb")
if (file_exists(db_path)) file_delete(db_path)

con <- dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = FALSE)
dbExecute(con, "SET memory_limit = '20GB'") # Leave headroom below total RAM (here 20 GB on a 32 GB machine)
[1] 0
Code
dbExecute(con, "SET threads TO 8")          # Set to your number of cores minus one
[1] 0
Code
tmp_dir <- file.path(paths$offline_root, "duckdb_tmp")
dir_create(tmp_dir)
dbExecute(con, glue("SET temp_directory = '{tmp_dir}'"))
[1] 0
Code
pq <- paths$parquet_root

# A tiny helper. CREATE VIEW has no dbplyr equivalent, so this is the one
# place we touch SQL — every analytical step further down is ordinary dplyr.
register_view <- function(con, name, glob) {
  dbExecute(con, glue(
    "CREATE OR REPLACE VIEW {name} AS SELECT * FROM read_parquet('{glob}')"
  ))
  invisible(NULL)
}

tic("Register views over parquet")

# STP gets an extra `bucket` column (0-9) computed once in the view. We
# use this downstream to process one tenth of the people at a time. Doing
# the same arithmetic inside the dbplyr pipeline on every iteration
# confuses DuckDB's chunk-skipping, so the one-off view is the right
# place to compute it.
dbExecute(con, glue("CREATE OR REPLACE VIEW stp AS
  SELECT *, CAST(hash(SYNTHETIC_AEUID) % 10 AS INTEGER) AS bucket
  FROM read_parquet('{pq}/stp/stp-fy2122*.parquet')
  WHERE ABN_HASH_TRUNC IS NOT NULL"))
[1] 0
Code
register_view(con, "mbs",        glue("{pq}/dhda-mbs/*.parquet"))
register_view(con, "ato_spine",  glue("{pq}/ato-spine/*.parquet"))
register_view(con, "dhda_spine", glue("{pq}/dhda-spine/*.parquet"))
register_view(con, "demo",       glue("{pq}/demo/*.parquet"))
toc()
Register views over parquet: 0.024 sec elapsed
Code
stp        <- tbl(con, "stp")        |> rename_with(tolower)
mbs        <- tbl(con, "mbs")        |> rename_with(tolower)
ato_spine  <- tbl(con, "ato_spine")  |> rename_with(tolower)
dhda_spine <- tbl(con, "dhda_spine") |> rename_with(tolower)

Views are instant — DuckDB just notes the parquet path. The 697 million STP rows and 372 million MBS rows stay on disk until a query needs them. Every step from here on is plain dplyr.

Step 1: Derive employer switches from STP (chunked)

An employer switch is when abn_hash_trunc changes from one week to the next. We use lag() with a window_order() clause — but process 10 hash buckets independently so each fits in memory.

Code
n_buckets <- 10

bucket_switches <- function(b) {
  stp |>
    filter(bucket == b) |>
    select(synthetic_aeuid, week_ending, abn_hash_trunc,
           gross_pay = pmt_sumry_totl_grs_pmt_amt) |>
    group_by(synthetic_aeuid) |>
    window_order(week_ending) |>
    mutate(prev_employer = lag(abn_hash_trunc)) |>
    ungroup() |>
    filter(!is.na(prev_employer),
           abn_hash_trunc != prev_employer) |>
    transmute(synthetic_aeuid,
              week_ending,
              new_employer = abn_hash_trunc,
              prev_employer,
              gross_pay)
}

tic("Derive employer switches (10 buckets, from parquet)")
stp_switches <- map(0:(n_buckets - 1), \(b) bucket_switches(b) |> compute()) |>
  reduce(union_all) |>
  compute(name = "stp_switches", temporary = FALSE, overwrite = TRUE)
toc()
Derive employer switches (10 buckets, from parquet): 44.747 sec elapsed
Code
n_switches  <- stp_switches |> summarise(n = n()) |> pull(n)
n_switchers <- stp_switches |> summarise(n = n_distinct(synthetic_aeuid)) |> pull(n)
cat("Switches:", comma(n_switches), "across", comma(n_switchers), "people\n")
Switches: 0 across 0 people

Every step is tidyverse. bucket_switches(b) is a lazy pipeline that reads ~70M STP rows for one bucket, drops down to four columns (keeping memory small during the sort), applies lag() partitioned by person, and keeps only the rows where the employer actually changed. map() runs the pipeline once per bucket; compute() writes each bucket’s result to a small temp table inside DuckDB. reduce(union_all) stacks the ten temp tables into one lazy query, and the final compute(name = "stp_switches", temporary = FALSE) writes the combined result out as a persistent table.

Note: Why chunking works

lag() here is partitioned by synthetic_aeuid. Every person’s timeline belongs to exactly one bucket, so processing one bucket at a time gives the same answer as doing all 30 million people at once. Each iteration only has to sort 70M rows, which fits in 20 GB.

Running lag() on the full 700M-row table blows past the memory budget and the process is killed — see A4 for the full failure log.
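
A toy illustration of the partition argument (a sketch, not from the chapter): DuckDB’s hash() is deterministic, so every row for a given person lands in the same bucket, and a per-person window never straddles two buckets.

```r
library(duckdb)
library(DBI)

con_demo <- dbConnect(duckdb::duckdb())

# Five rows, three people -- the same bucket arithmetic as the stp view.
buckets <- dbGetQuery(con_demo, "
  SELECT id, CAST(hash(id) % 10 AS INTEGER) AS bucket
  FROM (SELECT unnest(['A', 'A', 'B', 'B', 'C']) AS id)
")

# Each person maps to exactly one bucket, so bucket-by-bucket lag()
# always sees a person's full timeline.
stopifnot(all(tapply(buckets$bucket, buckets$id,
                     \(b) length(unique(b))) == 1))

dbDisconnect(con_demo, shutdown = TRUE)
```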

Tip: Self-join as an alternative to lag()

For fixed-interval lags (exactly 7 days here), you can replace lag() with a self-join. Shift the “previous” week forward by 7 days, then inner-join on equality:

stp_b    <- stp |> filter(bucket == 0L)
stp_prev <- stp_b |>
  transmute(synthetic_aeuid,
            week_ending   = week_ending + 7L,
            prev_employer = abn_hash_trunc)

stp_b |>
  inner_join(stp_prev, by = c("synthetic_aeuid", "week_ending")) |>
  filter(abn_hash_trunc != prev_employer)

This works because STP records are always exactly 7 days apart. It does not work if the gap between records is irregular — for that, stick with lag(), which just looks back one row regardless of the time gap. At 70M rows per bucket the two approaches run in similar time (2-4s each). See A4 for the full comparison.

Step 2: Derive health shocks from MBS

A “health shock” is a month where someone’s MBS service count is at least double their trailing 3-month average. Two steps: collapse 372M claims to person-months, then apply a rolling window on the smaller table.

Code
tic("Collapse MBS to person-months")
mbs_monthly <- mbs |>
  mutate(month = floor_date(dos, "month")) |>
  group_by(synthetic_aeuid, month) |>
  summarise(services = sum(numserv,    na.rm = TRUE),
            fees     = sum(feecharged, na.rm = TRUE),
            claims   = n(), .groups = "drop") |>
  compute(name = "mbs_monthly", temporary = FALSE, overwrite = TRUE)
toc()
Collapse MBS to person-months: 20.62 sec elapsed
Code
cat("Person-months:", comma(mbs_monthly |> summarise(n = n()) |> pull(n)), "\n")
Person-months: 206,131,399 

floor_date(dos, "month") rounds each service date down to the first of its month. summarise() aggregates claims per person per month. compute() writes the result out to a persistent table so the rolling window in the next step doesn’t have to redo this group-by.

Now the rolling window: for each person, compute the mean number of services over the previous three months (not counting the current month), and flag months where services are at least double that trailing mean.

Code
tic("Flag health shocks (rolling window on person-months)")
mbs_shocks <- mbs_monthly |>
  group_by(synthetic_aeuid) |>
  window_order(month) |>
  window_frame(-3, -1) |>                     # ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING
  mutate(trailing_avg = mean(services, na.rm = TRUE)) |>
  ungroup() |>
  filter(!is.na(trailing_avg),
         trailing_avg > 0,
         services >= 2 * trailing_avg) |>
  transmute(synthetic_aeuid,
            shock_month = month,
            services, trailing_avg) |>
  compute(name = "mbs_shocks", temporary = FALSE, overwrite = TRUE)
toc()
Flag health shocks (rolling window on person-months): 10.039 sec elapsed
Code
n_shocks  <- mbs_shocks |> summarise(n = n()) |> pull(n)
n_shocked <- mbs_shocks |> summarise(n = n_distinct(synthetic_aeuid)) |> pull(n)
cat("Health shock events:", comma(n_shocks),
    "across", comma(n_shocked), "people\n")
Health shock events: 29,893,469 across 20,343,421 people

window_frame(from, to) says which rows to include in the rolling window: -3 means three rows back, -1 means one row back. So window_frame(-3, -1) means “look at the three rows immediately before the current one.” Negative offsets are earlier rows, positive offsets are later rows, and 0 is the current row. Everything else is ordinary group_by() |> window_order() |> mutate().
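
To see those frame semantics on something tiny, here is a sketch on made-up numbers (not part of the pipeline): one person, five months, with a spike in month 4.

```r
library(duckdb)
library(DBI)
library(dplyr)
library(dbplyr)

con_demo <- dbConnect(duckdb::duckdb())
dbWriteTable(con_demo, "toy",
             data.frame(id = "A", month = 1:5, services = c(1, 1, 1, 6, 1)))

toy_res <- tbl(con_demo, "toy") |>
  group_by(id) |>
  window_order(month) |>
  window_frame(-3, -1) |>   # same frame as the real pipeline
  mutate(trailing_avg = mean(services, na.rm = TRUE)) |>
  ungroup() |>
  arrange(month) |>
  collect()
# Month 1 has an empty frame (trailing_avg is NA); month 4's trailing
# mean is 1, so services = 6 >= 2 * 1 would be flagged as a shock.

dbDisconnect(con_demo, shutdown = TRUE)
```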

Note: Aggregate first, then window

Collapsing 372M claims to ~206M person-months before windowing is what makes this step work at all. 206M rows fit in memory for the sort. Running the rolling window directly on 372M raw claims would hit the same memory limit we hit with STP.

Plot

Code
tic("Baseline switching rate")
baseline <- stp_switches |>
  mutate(switch_month = floor_date(week_ending, "month")) |>
  group_by(switch_month) |>
  summarise(switches = n(),
            people   = n_distinct(synthetic_aeuid),
            .groups = "drop") |>
  arrange(switch_month) |>
  collect()
toc()
Baseline switching rate: 0.026 sec elapsed
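
The post_shock table used in the plot below comes from the cross-spine join step (“Two inner_join()s via crosswalk” in the cost table), whose code is not reproduced in this excerpt. A hedged sketch of its shape only — it assumes the synthetic data resolves to a shared synthetic_aeuid (on real PLIDA data you would route through the ato_spine/dhda_spine crosswalk first) and that lubridate’s days() translates through dbplyr:

```r
# Sketch only -- not the chapter's actual cross-spine code. Counts
# switches occurring in the ~3 months after a health shock month.
post_shock <- stp_switches |>
  mutate(switch_month = floor_date(week_ending, "month")) |>
  inner_join(mbs_shocks, by = "synthetic_aeuid") |>
  filter(switch_month >= shock_month,
         switch_month < shock_month + days(93)) |>  # ~3 calendar months
  group_by(switch_month) |>
  summarise(switches_post_shock = n(), .groups = "drop") |>
  arrange(switch_month) |>
  collect()
```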
Code
if (nrow(baseline) > 0 && nrow(post_shock) > 0) {
  bind_rows(
    baseline   |> mutate(group = "All switchers",
                         month = as.Date(switch_month),
                         n = switches) |> select(month, n, group),
    post_shock |> mutate(group = "Post health-shock (3-month window)",
                         month = as.Date(switch_month),
                         n = switches_post_shock) |> select(month, n, group)
  ) |>
    ggplot(aes(month, n, colour = group)) +
    geom_line(linewidth = 0.6) +
    scale_y_continuous(labels = label_comma()) +
    labs(x = NULL, y = "Employer switches", colour = NULL,
         title = "Employer switches: post-health-shock vs all workers",
         subtitle = "STP FY2122 x MBS 2015, fplida synthetic data") +
    theme_minimal(base_size = 11) +
    theme(legend.position = "bottom")
} else {
  cat("No switches detected — expected with synthetic data that does not model job mobility.\n")
  cat("The code, strategy, and timings above are the deliverable of this case study.\n")
}
No switches detected — expected with synthetic data that does not model job mobility.
The code, strategy, and timings above are the deliverable of this case study.
Note: Synthetic data limitation

The fplida synthetic data does not model job switching — each person keeps the same employer throughout. So the switch count is 0. The pipeline design, memory management, and timings are real; the results would be meaningful on actual PLIDA data where people do change jobs.

What it cost

Step                                  Approach                                 Time
Register views over parquet           CREATE VIEW … read_parquet()             instant
Derive employer switches (STP)        Chunked lag(), 10 person-hash buckets    ~45s
Collapse MBS to person-months         mutate(floor_date) + summarise()         ~20s
Flag health shocks                    window_frame(-3, -1) + mean()            ~10s
Cross-spine join: switches x shocks   Two inner_join()s via crosswalk          &lt;1s

Total: about 75 seconds. None of the steps exceeded 20 GB of RAM. The same pipeline against materialised DuckDB tables takes ~235 seconds (the 170-second load dominates). Views over parquet are the right default when you’re running the pipeline once; materialise only if you’ll re-query the same table many times.
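
The setup section promised to delete the working database at the end of the render. A minimal teardown, using the con and db_path defined in Setup:

```r
# Drop the throwaway working database once the render is done.
dbDisconnect(con, shutdown = TRUE)
file_delete(db_path)
```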

Why we chose this approach

We benchmarked several variants before settling on views + chunked lag(). All of them — and the reason each failed or succeeded — are documented in the window-function benchmarks:

  • lag() on 700M rows: out of memory (sort needs too much space)
  • Self-join on 700M rows: out of memory (lookup structure too big)
  • lag() on 70M rows (one bucket): 2.3 seconds, success
  • Self-join on 70M rows: 2.0 seconds, success
  • Chunked self-join, 10 buckets: 20 seconds, success
  • Chunked lag() over parquet views, 10 buckets: 45 seconds, success
  • Views + chunked lag() via dbplyr: 75 seconds end-to-end, best total time

Why this is all tidyverse

Every analytical step above — the chunked lag(), the group-by to person-months, the trailing mean, the cross-spine double join, the date filter — is ordinary dplyr. dbplyr turns it into SQL behind the scenes. If you are ever curious what SQL it generated, call show_query() on the lazy table.
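
For instance, to see the SQL behind the rolling-window step in Step 2, call show_query() in place of compute() — it prints the generated query without running it:

```r
mbs_monthly |>
  group_by(synthetic_aeuid) |>
  window_order(month) |>
  window_frame(-3, -1) |>
  mutate(trailing_avg = mean(services, na.rm = TRUE)) |>
  show_query()
```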

The only SQL on this page is register_view() — one line per source table, because there is no tidyverse equivalent of “tell DuckDB that this parquet folder is now a table called X”. Everything after that is dplyr.