---
title: "Case study 4: Job switching and health shocks"
subtitle: "Window functions after joins on billion-row data"
---
This case study is the hard one. We ask: **do people who experience a spike in health service use switch employers at higher rates?** It requires:
1. Deriving _employer switches_ from STP weekly payroll data using `lag()` window functions.
2. Deriving _health shock_ months from MBS claims using a trailing-mean window.
3. Joining the two derived datasets via spine tables and comparing switching rates.
Each step involves window operations on hundreds of millions of rows, partitioned by person and ordered by date. As we found in the [window-function benchmarks](A4-window-function-benchmarks.qmd), DuckDB's window functions OOM on 700M+ rows on a 32 GB machine — so we chunk by person-hash bucket.
Everything below is written in tidyverse R. The one exception is the `CREATE VIEW` calls that point DuckDB at the parquet files — that is one-off setup DDL with no dbplyr equivalent, and we wrap it in a helper.
## The strategy
We tested two full approaches end-to-end:
1. **Materialise-then-query:** load STP and MBS from parquet into a DuckDB working database, then run the window functions against native DuckDB tables. Total time: ~235 seconds.
2. **Views over parquet:** create DuckDB views pointing at the parquet files, then run the window functions reading parquet directly. Total time: ~75 seconds.
Views win by 3×. The per-query cost is slightly higher (parquet decode on each bucket), but skipping the 170-second materialisation step more than compensates. In both cases the window functions are chunked by person hash bucket — neither `lag()` nor a self-join can handle 700M rows in one shot on 32 GB.
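The difference between the two approaches is one word at registration time. A minimal sketch, assuming an open DuckDB connection `con` and an illustrative parquet glob (the real globs appear in the Setup chunk):

```r
# Option 1: materialise — pay the full parquet load up front (~170s for
# STP here), after which every query reads DuckDB's native storage.
dbExecute(con, "CREATE TABLE stp AS SELECT * FROM read_parquet('stp/*.parquet')")

# Option 2: view — registration is instant; each query decodes parquet anew.
dbExecute(con, "CREATE VIEW stp AS SELECT * FROM read_parquet('stp/*.parquet')")
```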
::: {.callout-important}
## Three rules for window functions at PLIDA scale
1. **Chunk by person hash bucket.** ~70M rows per bucket is comfortable on 32 GB. 700M is not.
2. **Pre-aggregate before windowing.** Person-months, not individual claims.
3. **Use views over parquet** rather than materialising into DuckDB — unless you will query the same table many times.
:::
## Setup
This case study is the one place we don't use the existing `plida_tables.duckdb` database. Instead, it takes the **"from parquet, no database"** option introduced in [Data manipulation § Setup](02-data-manipulation.qmd#setup) and extends it slightly: rather than an in-memory DuckDB, we build a small **throwaway on-disk working database** (`cs04_working.duckdb`), register views over the parquet folders inside it, materialise intermediate tables as we go, and delete the whole file at the end. The database is on disk only so the rolling-window step has a stable place to spill intermediate sort state; everything else would work the same with an in-memory database.
Two other wrinkles worth flagging before the code:
- The connection is opened **writable**, not read-only, because we create views and materialise intermediate tables with `compute()`.
- STP gets an extra integer `bucket` column computed once, inside the `CREATE VIEW` definition. We use `filter(bucket == b)` downstream to process a tenth of the people at a time. Doing the hash/modulus inside the dbplyr pipeline on every iteration confuses DuckDB's chunk-skipping and causes the window function to run out of memory.
The `register_view()` helper below is the one piece of SQL on this page — `CREATE VIEW` has no tidyverse equivalent, so we hide it inside a helper and forget about it. Every analytical step after registration is ordinary dplyr.
```{r setup}
library(tidyverse) # includes purrr (map, reduce) and lubridate (floor_date, days)
library(scales)
library(glue)
library(fs)
library(tictoc)
library(duckdb)
library(DBI)
library(dbplyr)
# Centralised paths for this project — a short file that just defines
# where the DuckDB and parquet folders live. See the Setup chapter.
source("R/00-paths.R")
data_dir <- paths$duckdb_dir # replace with your own path
# This case study builds a small working DuckDB database for one render and
# deletes it at the end. Opened writable (not read_only) because we create
# tables and views inside it.
db_path <- file.path(data_dir, "cs04_working.duckdb")
if (file_exists(db_path)) file_delete(db_path)
con <- dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = FALSE)
dbExecute(con, "SET memory_limit = '20GB'") # ~60% of a 32 GB machine; leave headroom
dbExecute(con, "SET threads TO 8") # one fewer than your core count
tmp_dir <- file.path(paths$offline_root, "duckdb_tmp")
dir_create(tmp_dir)
dbExecute(con, glue("SET temp_directory = '{tmp_dir}'"))
pq <- paths$parquet_root
# A tiny helper. CREATE VIEW has no dbplyr equivalent, so this is the one
# place we touch SQL — every analytical step further down is ordinary dplyr.
register_view <- function(con, name, glob) {
dbExecute(con, glue(
"CREATE OR REPLACE VIEW {name} AS SELECT * FROM read_parquet('{glob}')"
))
invisible(NULL)
}
tic("Register views over parquet")
# STP gets an extra `bucket` column (0-9) computed once in the view. We
# use this downstream to process one tenth of the people at a time. Doing
# the same arithmetic inside the dbplyr pipeline on every iteration
# confuses DuckDB's chunk-skipping, so the one-off view is the right
# place to compute it.
dbExecute(con, glue("CREATE OR REPLACE VIEW stp AS
SELECT *, CAST(hash(SYNTHETIC_AEUID) % 10 AS INTEGER) AS bucket
FROM read_parquet('{pq}/stp/stp-fy2122*.parquet')
WHERE ABN_HASH_TRUNC IS NOT NULL"))
register_view(con, "mbs", glue("{pq}/dhda-mbs/*.parquet"))
register_view(con, "ato_spine", glue("{pq}/ato-spine/*.parquet"))
register_view(con, "dhda_spine", glue("{pq}/dhda-spine/*.parquet"))
register_view(con, "demo", glue("{pq}/demo/*.parquet"))
toc()
stp <- tbl(con, "stp") |> rename_with(tolower)
mbs <- tbl(con, "mbs") |> rename_with(tolower)
ato_spine <- tbl(con, "ato_spine") |> rename_with(tolower)
dhda_spine <- tbl(con, "dhda_spine") |> rename_with(tolower)
```
Views are instant — DuckDB just notes the parquet path. The 697 million STP rows and 372 million MBS rows stay on disk until a query needs them. Every step from here on is plain dplyr.
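If you want to convince yourself of the mechanics before pointing them at real data, here is a self-contained toy round trip: write a tiny parquet file from an in-memory DuckDB, register a view over it, and query it lazily with dplyr. All names and paths here are illustrative, not the case-study tables:

```r
library(duckdb); library(DBI); library(dplyr); library(dbplyr)
con_demo <- dbConnect(duckdb::duckdb())            # in-memory, throwaway
pq_demo  <- tempfile(fileext = ".parquet")
dbWriteTable(con_demo, "toy", data.frame(id = 1:5, x = (1:5) * 10))
dbExecute(con_demo, sprintf("COPY toy TO '%s' (FORMAT PARQUET)", pq_demo))
dbExecute(con_demo, sprintf(
  "CREATE OR REPLACE VIEW toy_v AS SELECT * FROM read_parquet('%s')", pq_demo))
tbl(con_demo, "toy_v") |> summarise(total = sum(x, na.rm = TRUE)) |> collect()
dbDisconnect(con_demo, shutdown = TRUE)
```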
## Step 1: Derive employer switches from STP (chunked)
An employer switch is when `abn_hash_trunc` changes from one week to the next. We use `lag()` with a `window_order()` clause — but process 10 hash buckets independently so each fits in memory.
```{r derive-switches}
n_buckets <- 10
bucket_switches <- function(b) {
stp |>
filter(bucket == b) |>
select(synthetic_aeuid, week_ending, abn_hash_trunc,
gross_pay = pmt_sumry_totl_grs_pmt_amt) |>
group_by(synthetic_aeuid) |>
window_order(week_ending) |>
mutate(prev_employer = lag(abn_hash_trunc)) |>
ungroup() |>
filter(!is.na(prev_employer),
abn_hash_trunc != prev_employer) |>
transmute(synthetic_aeuid,
week_ending,
new_employer = abn_hash_trunc,
prev_employer,
gross_pay)
}
tic("Derive employer switches (10 buckets, from parquet)")
stp_switches <- map(0:(n_buckets - 1), \(b) bucket_switches(b) |> compute()) |>
reduce(union_all) |>
compute(name = "stp_switches", temporary = FALSE, overwrite = TRUE)
toc()
n_switches <- stp_switches |> summarise(n = n()) |> pull(n)
n_switchers <- stp_switches |> summarise(n = n_distinct(synthetic_aeuid)) |> pull(n)
cat("Switches:", comma(n_switches), "across", comma(n_switchers), "people\n")
```
Every step is tidyverse. `bucket_switches(b)` is a lazy pipeline that reads ~70M STP rows for one bucket, drops down to four columns (keeping memory small during the sort), applies `lag()` partitioned by person, and keeps only the rows where the employer actually changed. `map()` runs the pipeline once per bucket; `compute()` writes each bucket's result to a small temp table inside DuckDB. `reduce(union_all)` stacks the ten temp tables into one lazy query, and the final `compute(name = "stp_switches", temporary = FALSE)` writes the combined result out as a persistent table.
::: {.callout-note}
## Why chunking works
`lag()` here is partitioned by `synthetic_aeuid`. Every person's timeline belongs to exactly one bucket, so processing one bucket at a time gives the same answer as doing all 30 million people at once. Each iteration only has to sort 70M rows, which fits in 20 GB.
Running `lag()` on the full 700M-row table blows past the memory budget and the process is killed — see [A4](A4-window-function-benchmarks.qmd#scaling-behaviour) for the full failure log.
:::
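The equivalence is easy to verify on toy data with plain in-memory dplyr: per-bucket `lag()` reassembled gives exactly the same rows as `lag()` on the full table, because each person's rows land in a single bucket. Names here are illustrative (a toy `%% 2` stands in for `hash(id) % 10`):

```r
library(dplyr); library(purrr)
set.seed(1)
toy <- tibble(person   = rep(1:6, each = 4),
              week     = rep(1:4, times = 6),
              employer = sample(c("a", "b", "c"), 24, replace = TRUE)) |>
  mutate(bucket = person %% 2)   # toy stand-in for hash(id) % 10

lag_prev <- function(df) {
  df |> group_by(person) |> arrange(week, .by_group = TRUE) |>
    mutate(prev = lag(employer)) |> ungroup()
}
full    <- lag_prev(toy)
chunked <- map(0:1, \(b) lag_prev(filter(toy, bucket == b))) |> list_rbind()

identical(arrange(full, person, week), arrange(chunked, person, week))  # TRUE
```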
::: {.callout-tip}
## Self-join as an alternative to `lag()`
For fixed-interval lags (exactly 7 days here), you can replace `lag()` with a self-join. Shift the "previous" week forward by 7 days, then inner-join on equality:
```r
stp_b <- stp |> filter(bucket == 0L)
stp_prev <- stp_b |>
transmute(synthetic_aeuid,
week_ending = week_ending + 7L,
prev_employer = abn_hash_trunc)
stp_b |>
inner_join(stp_prev, by = c("synthetic_aeuid", "week_ending")) |>
filter(abn_hash_trunc != prev_employer)
```
This works because STP records are always exactly 7 days apart. It does _not_ work if the gap between records is irregular — for that, stick with `lag()`, which just looks back one row regardless of the time gap. At 70M rows per bucket the two approaches run in similar time (2-4s each). See [A4](A4-window-function-benchmarks.qmd#how-the-self-join-works) for the full comparison.
:::
## Step 2: Derive health shocks from MBS
A "health shock" is a month where someone's MBS service count is at least double their trailing 3-month average. Two steps: collapse 372M claims to person-months, then apply a rolling window on the smaller table.
```{r derive-monthly}
tic("Collapse MBS to person-months")
mbs_monthly <- mbs |>
mutate(month = floor_date(dos, "month")) |>
group_by(synthetic_aeuid, month) |>
summarise(services = sum(numserv, na.rm = TRUE),
fees = sum(feecharged, na.rm = TRUE),
claims = n(), .groups = "drop") |>
compute(name = "mbs_monthly", temporary = FALSE, overwrite = TRUE)
toc()
cat("Person-months:", comma(mbs_monthly |> summarise(n = n()) |> pull(n)), "\n")
```
`floor_date(dos, "month")` rounds each service date down to the first of its month. `summarise()` aggregates claims per person per month. `compute()` writes the result out to a persistent table so the rolling window in the next step doesn't have to redo this group-by.
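A quick in-memory check of the rounding (plain lubridate; dbplyr translates the same call into SQL date arithmetic behind the scenes):

```r
library(lubridate)
floor_date(as.Date("2021-08-17"), "month")  # "2021-08-01"
```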
Now the rolling window: for each person, compute the mean number of services over the previous three months (not counting the current month), and flag months where services are at least double that trailing mean.
```{r derive-shocks}
tic("Flag health shocks (rolling window on person-months)")
mbs_shocks <- mbs_monthly |>
group_by(synthetic_aeuid) |>
window_order(month) |>
window_frame(-3, -1) |> # ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING
mutate(trailing_avg = mean(services, na.rm = TRUE)) |>
ungroup() |>
filter(!is.na(trailing_avg),
trailing_avg > 0,
services >= 2 * trailing_avg) |>
transmute(synthetic_aeuid,
shock_month = month,
services, trailing_avg) |>
compute(name = "mbs_shocks", temporary = FALSE, overwrite = TRUE)
toc()
n_shocks <- mbs_shocks |> summarise(n = n()) |> pull(n)
n_shocked <- mbs_shocks |> summarise(n = n_distinct(synthetic_aeuid)) |> pull(n)
cat("Health shock events:", comma(n_shocks),
"across", comma(n_shocked), "people\n")
```
`window_frame(from, to)` says which rows the rolling window covers: `-3` means three rows back, `-1` means one row back, so `window_frame(-3, -1)` covers the rows 3, 2, and 1 positions before the current one. Negative = earlier, positive = later, `0` = the current row. Everything else is ordinary `group_by() |> window_order() |> mutate()`.
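The frame semantics are easy to check by hand. A base-R sketch of the same trailing mean for one person's monthly service counts (toy numbers, not the real data):

```r
services <- c(2, 3, 1, 10, 2)

# ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING, computed manually:
trailing <- vapply(seq_along(services), function(i) {
  if (i == 1) return(NA_real_)              # no preceding rows yet
  mean(services[max(1, i - 3):(i - 1)])     # rows -3 .. -1
}, numeric(1))

trailing                  # NA 2.0 2.5 2.0 4.67 (last is mean(3, 1, 10))
services >= 2 * trailing  # month 4 is the shock: 10 >= 2 * 2
```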
::: {.callout-note}
## Aggregate first, then window
Collapsing 372M claims to ~206M person-months _before_ windowing is what makes this step work at all. 206M rows fit in memory for the sort. Running the rolling window directly on 372M raw claims would hit the same memory limit we hit with STP.
:::
## Step 3: Link switches to shocks via spine
STP uses ATO identifiers; MBS uses DHDA identifiers. The spine tables bridge them via shared `spine_id`. The linkage is a double join: ATO-aeuid → spine_id → DHDA-aeuid.
```{r link}
tic("Cross-spine join: switches x shocks")
crosswalk <- ato_spine |> select(ato_aeuid = synthetic_aeuid, spine_id) |>
inner_join(dhda_spine |> select(dhda_aeuid = synthetic_aeuid, spine_id),
by = "spine_id") |>
select(ato_aeuid, dhda_aeuid)
shocks_via_dhda <- mbs_shocks |>
select(dhda_aeuid = synthetic_aeuid, shock_month)
post_shock <- stp_switches |>
inner_join(crosswalk, by = c("synthetic_aeuid" = "ato_aeuid")) |>
inner_join(shocks_via_dhda, by = "dhda_aeuid") |>
filter(week_ending >= shock_month,
week_ending < shock_month + days(90L)) |>
mutate(switch_month = floor_date(week_ending, "month")) |>
group_by(switch_month) |>
summarise(switches_post_shock = n(),
people = n_distinct(synthetic_aeuid),
.groups = "drop") |>
arrange(switch_month) |>
collect()
toc()
post_shock
```
Three joins, one date filter, one group-by. No SQL.
`shock_month + days(90L)` just works: dbplyr translates it to DuckDB's date arithmetic, so "within three months of the shock" is a plain `filter()`.
## Plot
```{r baseline}
tic("Baseline switching rate")
baseline <- stp_switches |>
mutate(switch_month = floor_date(week_ending, "month")) |>
group_by(switch_month) |>
summarise(switches = n(),
people = n_distinct(synthetic_aeuid),
.groups = "drop") |>
arrange(switch_month) |>
collect()
toc()
```
```{r plot}
#| fig-width: 8
#| fig-height: 5
if (nrow(baseline) > 0 && nrow(post_shock) > 0) {
bind_rows(
baseline |> mutate(group = "All switchers",
month = as.Date(switch_month),
n = switches) |> select(month, n, group),
post_shock |> mutate(group = "Post health-shock (3-month window)",
month = as.Date(switch_month),
n = switches_post_shock) |> select(month, n, group)
) |>
ggplot(aes(month, n, colour = group)) +
geom_line(linewidth = 0.6) +
scale_y_continuous(labels = label_comma()) +
labs(x = NULL, y = "Employer switches", colour = NULL,
title = "Employer switches: post-health-shock vs all workers",
subtitle = "STP FY2122 x MBS 2015, fplida synthetic data") +
theme_minimal(base_size = 11) +
theme(legend.position = "bottom")
} else {
cat("No switches detected — expected with synthetic data that does not model job mobility.\n")
cat("The code, strategy, and timings above are the deliverable of this case study.\n")
}
```
::: {.callout-note}
## Synthetic data limitation
The `fplida` synthetic data does not model job switching — each person keeps the same employer throughout. So the switch count is 0. The pipeline design, memory management, and timings are real; the _results_ would be meaningful on actual PLIDA data where people do change jobs.
:::
## What it cost
```{r summary-table}
#| echo: false
tibble::tribble(
~step, ~approach, ~time,
"Register views over parquet", "CREATE VIEW ... read_parquet()", "instant",
"Derive employer switches (STP)", "Chunked lag(), 10 person-hash buckets", "~45s",
"Collapse MBS to person-months", "mutate(floor_date) + summarise()", "~20s",
"Flag health shocks", "window_frame(-3, -1) + mean()", "~10s",
  "Cross-spine join: switches x shocks", "Two inner_join()s via crosswalk", "<1s"
) |>
knitr::kable(col.names = c("Step", "Approach", "Time"))
```
Total: about 75 seconds. None of the steps exceeded 20 GB of RAM. The same pipeline against materialised DuckDB tables takes ~235 seconds (the 170-second load dominates). Views over parquet are the right default when you're running the pipeline once; materialise only if you'll re-query the same table many times.
## Why we chose this approach
We benchmarked several variants before settling on views + chunked `lag()`. Each one — and the reason it failed or succeeded — is documented in the [window-function benchmarks](A4-window-function-benchmarks.qmd):
- `lag()` on 700M rows: out of memory (sort needs too much space)
- Self-join on 700M rows: out of memory (lookup structure too big)
- `lag()` on 70M rows (one bucket): 2.3 seconds, success
- Self-join on 70M rows: 2.0 seconds, success
- Chunked self-join, 10 buckets: 20 seconds, success
- Chunked `lag()` over parquet views, 10 buckets: 45 seconds, success
- **Views + chunked `lag()` via dbplyr**: 75 seconds end-to-end, best total time
## Why this is all tidyverse
Every analytical step above — the chunked `lag()`, the group-by to person-months, the trailing mean, the cross-spine double join, the date filter — is ordinary dplyr. dbplyr turns it into SQL behind the scenes. If you are ever curious what SQL it generated, call `show_query()` on the lazy table.
The only SQL on this page is `register_view()` — one line per source table, because there is no tidyverse equivalent of "tell DuckDB that this parquet folder is now a table called X". Everything after that is dplyr.
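A toy demonstration on an in-memory DuckDB — `show_query()` prints the SQL without executing anything (the exact text varies across dbplyr/duckdb versions, and the names here are illustrative):

```r
con_demo <- dbConnect(duckdb::duckdb())
dbWriteTable(con_demo, "toy",
             data.frame(id = c(1, 1, 2), week = c(1, 2, 1),
                        emp = c("a", "b", "a")))
tbl(con_demo, "toy") |>
  group_by(id) |>
  window_order(week) |>
  mutate(prev = lag(emp)) |>
  show_query()   # LAG(emp, ...) OVER (PARTITION BY id ORDER BY week)
dbDisconnect(con_demo, shutdown = TRUE)
```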
```{r disconnect}
#| include: false
dbDisconnect(con, shutdown = TRUE)
file_delete(db_path)
```