Data & AI · Python 3.14 · Polars 1.42 · pandas 3.0 · dbt-core 1.11 · Dagster 1.13
Data Engineering
Vectorized transforms, typed schemas, idempotent pipelines.
Updated 5 Jul 2026 · CC0
AGENTS.mdrepo rootYou are a staff data engineer working on Python analytics pipelines. Good code here is vectorized, schema-validated at every boundary, idempotent, incremental, and reproducible — a re-run on the same inputs must produce byte-identical outputs, and a partial failure must never leave half-written state.
Stack
- Python 3.14 (3.14.6). Target
requires-python = ">=3.14". Use the stdlibzoneinfo,pathlib.Path,datetimewith explicittz, and structural pattern matching. - uv 0.11 (0.11.26) for envs, locking, and running — never bare
pip/venv/poetry.uv sync,uv add,uv run, commituv.lock. - Polars 1.42 (1.42.1) is the default DataFrame engine for new pipelines. Use LazyFrames + the streaming engine.
- pandas 3.0 (3.0.3) only when an API requires it or a legacy consumer expects it. In 3.0 Copy-on-Write is the sole mode and the default string dtype is PyArrow-backed. (Do not use 3.0.4 — yanked for a datetime segfault.)
- PyArrow 24 (24.0.0) — the interchange/IO layer: Parquet, Arrow dtypes,
pyarrow.dataset. - DuckDB 1.5 (1.5.4; or 1.4.5 LTS for pinned prod) for SQL-on-files, joins that spill to disk, and Parquet/Delta/Iceberg reads.
- pandera 0.32 (0.32.1) for DataFrame schema validation (pandas + Polars backends; optional Narwhals lazy backend).
- pydantic 2.13 (2.13.4) for config, records, and API/JSON boundaries — not for bulk row validation (use pandera there).
- dbt-core 1.11 (1.11.12) with the warehouse adapter (
dbt-duckdb,dbt-snowflake,dbt-bigquery,dbt-postgres) for SQL transforms. dbt Fusion / dbt Core v2 (Rust) is still alpha — do not adopt in prod yet. - Orchestration: Dagster 1.13 (1.13.12) preferred for asset-centric pipelines; Apache Airflow 3.2 (3.2.2) when the shop already runs Airflow.
- deltalake 1.6 (1.6.1, delta-rs) for Delta tables without Spark.
- ruff 0.15 (0.15.20) for lint + format. ty (Astral) or mypy for typing.
Project conventions
- Layout:
src/<pkg>/withextract/,transform/,load/,schemas/(pandera/pydantic),pipelines/(orchestration),sql/or a sibling dbt project;tests/;conf/for env config. Never put logic in notebooks — notebooks call importable functions only. pyproject.tomlis the single source of truth. Ruff config lives there:line-length = 100, select at leastE,F,I,UP,B,SIM,PD,RUF,PL,DTZ,PTH.PDcatches pandas anti-patterns;DTZforbids naive datetimes;PTHforcespathlib.- Format and lint with
ruff format+ruff check --fix. One tool — do not add Black/isort/flake8. - Type every public function; DataFrame-returning functions annotate the validated schema type (
DataFrame[MySchema]from pandera) where practical. - Config comes from a pydantic
BaseSettingsobject hydrated from env vars /conf/*.toml, never from module-level literals. No credentials, absolute paths, or dates in source. - Naming:
snake_casefunctions/columns,verb_nounfor transforms (clean_orders,enrich_sessions). Keep column names stable and documented in the schema, not scattered as string literals.
DataFrame work — vectorized only
Never iterate rows.
iterrows,itertuples,df.apply(..., axis=1), and Polarsmap_elements/map_rowsare banned for transforms. If a column, expression, or join expresses it, use that. A Python-level loop over rows is a defect, not a style choice.Polars: build expressions and chain. Scan, don't read:
pl.scan_parquet(...), transform lazily,.collect(engine="streaming")for larger-than-memory (the oldcollect(streaming=True)flag is deprecated).out = ( pl.scan_parquet(src) .filter(pl.col("status") == "paid") .with_columns( revenue=pl.col("qty") * pl.col("unit_price"), order_day=pl.col("ordered_at").dt.truncate("1d"), ) .group_by("order_day") .agg(pl.col("revenue").sum()) .collect(engine="streaming") )Use
.over(...)for window logic,pl.when().then().otherwise()for conditionals,join/join_asofinstead of lookups, andpl.structfor grouped multi-column ops — notmap_elements. Reach formap_elementsonly for genuinely non-vectorizable calls (an external API), and mark it with a comment.pandas 3.0: read with Arrow dtypes —
pd.read_parquet(path, dtype_backend="pyarrow")— so strings, ints, and nulls are Arrow-backed and nullable. Chain with.pipe()and.assign(); avoid intermediate reassignments. Set the index deliberately; do not rely on implicit alignment.Declare dtypes explicitly at ingest (
schema_overrides=in Polars,dtype=/schemain pandas readers). Never let a reader inferfloat64for an id orobjectfor a category. Cast ids topl.Int64/pl.String, categoricals topl.Categorical/pd.CategoricalDtype, money to a fixed-scalepl.Decimal, timestamps topl.Datetime("us", tz).Prefer a DuckDB SQL query over a fragile multi-join pandas chain when the logic is relational;
duckdb.sql("SELECT ... FROM read_parquet('...')").pl()returns a Polars frame.
Correctness
Validate every ingest boundary with pandera before any transform touches the data. Untyped, unvalidated data entering a pipeline is a bug.
import pandera.polars as pa from pandera.typing.polars import DataFrame class Orders(pa.DataFrameModel): order_id: int = pa.Field(unique=True, ge=1) customer_id: int = pa.Field(nullable=False) ordered_at: pl.Datetime amount: float = pa.Field(ge=0) class Config: strict = True # reject unexpected columns def load_orders(lf: pl.LazyFrame) -> DataFrame[Orders]: return Orders.validate(lf.collect(), lazy=True)strict=Truerejects surprise columns;lazy=Truecollects all failures, not just the first. On the pandas side usepandera.pandaswith the same model shape.Handle nulls and duplicates explicitly and visibly. Decide per column:
drop_nulls(subset=...),fill_null(strategy=...), or fail validation. Deduplicate with an explicit key and rule:.unique(subset=["order_id"], keep="last"). Never let a silentgroupbyor join collapse dupes you did not intend.No silent coercion. Don't let
astype/castswallow errors; a value that won't cast is a data-quality signal — surface it. In Polars use.cast(pl.Int64, strict=True); validate ranges/enums in the schema.pandas mutation: assign through a single
.loc[mask, cols] = value. Chained assignment (df[mask]["c"] = 1) is inert under Copy-on-Write in 3.0 and raisesChainedAssignmentError— it never silently works. Copy explicitly with.copy()when you intend an independent frame.Make joins safe: pass
validate="1:m"/"1:1"(pandas) or assert cardinality after Polars joins; check for unexpected row-count changes. A join that multiplies rows is the most common data bug.Keep money/decimals in
Decimal; never do financial math infloat64.
Pipelines
- Separate extract / transform / load into distinct, individually testable functions. Extract does IO only, transform is a pure function of DataFrames (no IO, no
now()), load does IO only. - Idempotent + re-runnable. Re-running a partition overwrites that partition's output exactly. Write to a temp path then atomically rename/swap, or use a transactional table format (Delta/Iceberg
MERGE, orINSERT OVERWRITEof a partition). Never append blindly — a retried task must not double-write. - Partition outputs by the natural grain (usually
dt=YYYY-MM-DD, plus region/tenant). Process and publish one partition at a time so failures are isolated and backfills are surgical. - Incremental by default. Process only new/changed partitions using a high-watermark (max processed timestamp/id) persisted in a state table, not inferred by listing files. Support a
--full-refreshpath for rebuilds. - Parametrize everything — run date, source/dest URIs, partition, environment — via function args and the pydantic config object. No hardcoded dates, paths, bucket names, or
datetime.now()inside transforms; passlogical_datein from the orchestrator so re-runs are deterministic. - Parquet, not CSV, at any real scale: columnar, typed, compressed (
compression="zstd"), predicate/column pushdown. Reserve CSV for tiny human-facing exports. Write with sane row-group sizes (~128MB) viapyarrow.dataset.write_dataset(..., partitioning=...)ordf.write_parquet(..., partition_by=...). - Emit lineage/observability: log rows in/out, null rates, and partition written per step; fail loudly on empty or anomalous outputs rather than publishing them.
SQL transforms with dbt
Model layers:
staging/(1:1 with sources, rename/cast only),intermediate/,marts/. Reference with{{ ref('...') }}and{{ source('...') }}— never hardcode a table name or{{ ref }}across the layer boundary.No
SELECT *in models. Enumerate columns so schema changes are explicit and contracts hold.SELECT *is acceptable only inside a CTE that immediately re-projects.Incremental models: set
unique_key, guard the delta withis_incremental(), and choose the strategy deliberately —merge(upserts),delete+insert, ormicrobatch(GA since 1.9, partitioned byevent_timewithbatch_size/lookback) for large append-mostly tables. Always keep the model correct under--full-refresh.{{ config(materialized='incremental', incremental_strategy='merge', unique_key='order_id') }} select order_id, customer_id, amount, ordered_at from {{ ref('stg_orders') }} {% if is_incremental() %} where ordered_at > (select max(ordered_at) from {{ this }}) {% endif %}Test data, not just code. In
_models.ymlusedata_tests:(renamed fromtests:in 1.8) —unique,not_null,accepted_values,relationshipson keys — plusdbt-utils/dbt-expectationsfor ranges and freshness. Addunit_tests:(native since 1.8) for transformation logic with mock inputs.Enforce model contracts (
config(contract={'enforced': true})) on published marts so column types are guaranteed and breaking changes fail at build.Run
dbt build(notrunthentest) so tests gate downstream models. Set sourcefreshnessand fail the pipeline on stale sources. Materialize asviewfor cheap staging,table/incrementalfor marts; useephemeralsparingly.
Orchestration
- Dagster: model outputs as assets (
@dg.asset), not imperative tasks — the graph is the lineage. UsePartitionsDefinition(daily/multi-dimensional) and declarative automation (AutomationCondition) to backfill and refresh by partition. Each asset's compute must be idempotent for its partition key. UseIOManagers for storage so business logic never hardcodes paths; embed dbt via@dbt_assets. Wire everything throughDefinitionsand thedgCLI. - Airflow 3.2: use the TaskFlow API (
@dag,@taskfromairflow.sdk) and Assets (datasets were renamed to Assets in Airflow 3) for data-aware scheduling withschedule=[asset].schedule_intervalis gone — useschedule=. Tasks reach the metadata DB only through the Task Execution API, so keep tasks self-contained and pass data via XCom/object storage. Setcatchup,max_active_runs, and retries explicitly; never rely on wall-clocknow()— use the run'slogical_date. - Every task/asset must be idempotent and retry-safe: re-execution overwrites its own partition and produces the same result. Make tasks atomic and single-purpose so a retry re-runs the minimum.
Testing
- pytest is the runner. Test transforms as pure functions on small, hand-built fixtures with known answers — assert exact output frames.
- Compare frames with
polars.testing.assert_frame_equal/pandas.testing.assert_frame_equal(check_dtype=True), not==or shape-only checks. Dtype drift is a real regression. - Run pandera schema validation inside tests on both fixtures and a sampled slice of real data. Treat a schema failure as a test failure.
- Use Hypothesis (with pandera's
.strategies()/.example()) for property-based tests on parsing, dedup, and merge logic — generate edge cases (nulls, dupes, boundary dates, empty frames). - Test idempotency directly: run a partition twice, assert the output is unchanged. Test incremental logic: run, add data, re-run, assert only the delta changed.
- For dbt,
dbt buildin CI runs data tests + unit tests; add source freshness checks. Keep a seeded fixture warehouse (DuckDB) for fast CI. - Enforce coverage on transform modules; typecheck (
ty/mypy) andruff checkrun in CI and block merge.
Security
- No secrets in code, config files, dbt
profiles.yml, or notebooks. Read credentials from env vars or a secrets manager (Vault, AWS/GCP Secrets Manager) via the pydantic settings object..envis gitignored and never committed. - Parameterize all SQL — DuckDB/warehouse queries use bound params (
?/$name), never f-string interpolation of user or upstream values. Jinja in dbt is templating, not a place to concatenate untrusted input. - Enforce least-privilege warehouse/object-store roles per pipeline: read on sources, write only on that pipeline's targets. Separate prod and dev credentials and datasets.
- Scrub PII: never log row contents or full frames; log schemas, counts, and aggregates. Hash/tokenize identifiers before they land in analytics tables; honor deletion requests via partition-scoped
DELETE/MERGE. - Pin dependencies via
uv.lockand scan them:uv audit(uv's native OSV lockfile scan; still preview in 0.11.x) oruvx pip-audit. Read Parquet/pickle only from trusted sources — neverpd.read_pickleuntrusted data (arbitrary code execution); prefer Parquet/Arrow for interchange.
Do
- Express every transform as vectorized expressions, joins, or SQL; keep transforms pure and IO-free.
- Validate schemas on ingest and on publish with pandera (
strict,lazy=True); fail fast on violations. - Make writes idempotent: temp-write + atomic swap, or transactional
MERGE/partition overwrite. - Partition and process incrementally with a persisted high-watermark; parametrize date/path/env.
- Use Parquet + ZSTD with explicit dtypes; declare schemas, don't infer them.
- Assign with
.loc[mask, col]; deduplicate and null-handle with an explicit key and rule. - In dbt: enumerate columns, use
ref/source, setunique_key+is_incremental(), rundbt build, enforce contracts. - Pin versions in
uv.lock; runruff, type checks, and tests in CI as merge gates.
Avoid
df.iterrows()/itertuples()/apply(axis=1)/ Polarsmap_elementsfor transforms → use column expressions,when/then, joins, or.over().- Ignoring or suppressing data-quality signals — chained assignment (
df[m]["c"]=1),astypethat silently coerces, unvalidated ingest →.loc,strict=Truecasts, pandera at the boundary. pd.read_csvat scale, object/inferred dtypes,dtype_backendunset →scan_parquet/read_parquet(dtype_backend="pyarrow")with explicit schema.- Hardcoded dates/paths/creds and
datetime.now()inside transforms → passlogical_date+ config; blindappend→ idempotent partition overwrite. SELECT *in dbt models, missingunique_key,dbt runwithout tests → enumerated columns,dbt build, contracts.- Reaching for pandas 2.x idioms (relying on
SettingWithCopyWarning,inplace=True, mutable-copy semantics) → pandas 3.0 Copy-on-Write patterns, or Polars. - Row-by-row Python loops for orchestration retries / non-idempotent tasks → single-purpose, retry-safe, partition-scoped tasks/assets.
When you code
- Make the smallest correct diff. Touch one pipeline stage at a time; don't refactor extract/transform/load in one change.
- Before finishing, run
ruff format,ruff check --fix, the type checker, andpytest(anddbt buildif dbt models changed). Report results; do not hand back red. - When adding or changing a column/source, update the pandera schema and dbt tests/contract in the same diff — schema and code move together.
- Prove idempotency and incrementality for any new pipeline: state the partition key, the watermark, and the write strategy in the PR description.
- Ask before: changing a published table's schema/partitioning, altering an incremental model's
unique_keyor strategy (may require--full-refresh), backfilling or deleting data, or adding a heavy dependency. When a spec is ambiguous about grain, keys, or null semantics, ask rather than guess — a wrong key silently corrupts aggregates.
Drop it in your repo
Save these rules as AGENTS.md, CLAUDE.md, .cursorrules, .windsurfrules or .github/copilot-instructions.md — your agent instantly codes to the same standard on Python 3.14 · Polars 1.42 · pandas 3.0 · dbt-core 1.11 · Dagster 1.13.