Data Engine Operations

Data operations execute directly in the Query Engine (SQLite locally, ClickHouse in Studio). They never spin up Python runtimes, never download files, and scale to millions or billions of records.

The rule: if it can be expressed as a data operation, it should be. Never materialize a chain with to_pandas() or to_list() just to run aggregation or grouping in Python; use native operations instead.

Native vs Non-Native

Non-native (pulls data into Python, iterates manually):

import datachain as dc

files = dc.read_storage("gs://datachain-demo/").to_list()  # materializes every row into Python

# tally byte totals and file counts per extension by hand
totals = {}
for (f,) in files:  # to_list() yields one-tuples of signal values
    ext = f.path.rsplit(".", 1)[-1].lower()
    total_bytes, file_count = totals.get(ext, (0, 0))
    totals[ext] = (total_bytes + f.size, file_count + 1)

Native (runs entirely in the Query Engine):

import datachain as dc

(
    dc.read_storage("gs://datachain-demo/")
    .filter(dc.C("file.size") > 0)
    .group_by(
        count=dc.func.count(),
        total=dc.func.sum(dc.C("file.size")),
        partition_by=dc.func.path.file_ext(dc.C("file.path")),
    )
    .order_by("total", descending=True)
    .show()
)

Aggregate Analytics on Nested Objects

The Query Engine reaches into Pydantic models serialized in the database, including deeply nested fields:

import datachain as dc

chain = dc.read_dataset("llm_responses")

cost = (
    chain.sum("response.usage.prompt_tokens") * 0.000002
    + chain.sum("response.usage.completion_tokens") * 0.000006
)
print(f"Spent ${cost:.2f} on {chain.count()} calls")

response.usage.prompt_tokens traverses two levels of Pydantic nesting. The sum runs entirely in SQL: no Python runtime, no deserialization.
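
To break the same spend down per model, the aggregation can also run as a group_by over the nested fields. A sketch, assuming the dataset carries a response.model signal:

import datachain as dc

per_model = dc.read_dataset("llm_responses").group_by(
    calls=dc.func.count(),
    prompt_tokens=dc.func.sum("response.usage.prompt_tokens"),
    completion_tokens=dc.func.sum("response.usage.completion_tokens"),
    partition_by="response.model",
)
per_model.show()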

Operations Catalog

Every operation returns a new chain. Chains are immutable and composable.

Reading and Combining

  • merge: join two chains on shared keys (like SQL JOIN)
  • union: vertical concatenation (also the | operator)
  • subtract: set difference, rows in left chain not in right
  • distinct: deduplicate rows

import datachain as dc

images = dc.read_storage("s3://bucket/images/", type="image")
labels = dc.read_json("s3://bucket/labels/*json", column="meta")

# assumes `images` carries an "id" signal (e.g., derived with map, as in the complete example below)
annotated = images.merge(labels, on="id", right_on="meta.id")
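
The set-style operations from the list above compose the same way. A sketch, assuming two chains over hypothetical prefixes with matching schemas:

import datachain as dc

train = dc.read_storage("s3://bucket/train/")
extra = dc.read_storage("s3://bucket/extra/")

combined = (train | extra).distinct("file.path")  # union, then deduplicate
new_only = extra.subtract(train, on="file.path")  # rows in extra that are not in train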

Filtering and Selecting

  • filter: keep rows matching a condition
  • select: keep only named columns
  • select_except: drop named columns
  • limit / offset: pagination
  • sample: random subset

import datachain as dc

chain = dc.read_storage("s3://bucket/data/")

# each call returns a new chain; the expressions assume these signals exist on the rows
chain.filter(dc.C("score") > 0.9)
chain.filter((dc.C("meta.inference.class_") == "cat") & (dc.C("meta.confidence") > 0.93))
chain.filter(dc.C("detection.label").contains("person"))
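
The selection and pagination operations follow the same pattern. A sketch against the same hypothetical bucket:

import datachain as dc

chain = dc.read_storage("s3://bucket/data/")

preview = (
    chain.select("file.path", "file.size")  # keep only these columns
    .order_by("file.size", descending=True)
    .limit(20)
)
subset = chain.sample(1000)  # random subset of 1,000 rows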

Sorting and Grouping

  • order_by: sort by columns (ascending or descending)
  • group_by: partition rows and apply aggregate functions
  • shuffle: randomize row order

import datachain as dc

# `chain` is any chain carrying "file.size", "category", and "score" signals
chain.group_by(
    avg_size=dc.func.avg("file.size"),
    count=dc.func.count(),
    partition_by="category",
)

chain.order_by("score", descending=True)
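
shuffle takes no arguments; it randomizes row order natively, which is handy before taking a subset:

shuffled_subset = chain.shuffle().limit(1000)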

Mutate

mutate adds or replaces columns using native expressions that run at warehouse speed. It accepts dc.func.* expressions, column arithmetic, and column comparisons:

import datachain as dc

chain = dc.read_storage("s3://bucket/images/").mutate(
    ext=dc.func.path.file_ext("file.path"),
    parent=dc.func.path.parent("file.path"),
)

# assumes the chain carries price, quantity, and score signals
chain.mutate(
    total=dc.C("price") * dc.C("quantity"),
    label=dc.func.case(
        (dc.C("score") > 0.9, "high"),
        (dc.C("score") > 0.5, "medium"),
        else_="low",
    ),
    is_large=dc.C("file.size") > 1_000_000,
)

Mutate does NOT accept lambdas or Python callables; use map() for those. See the mutate vs map comparison below.

Comparing

  • diff: compare two chains and surface differences (A/D/M/S status)
  • file_diff: compare file-level changes between chains
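
A minimal sketch of diff between two saved datasets (the dataset names, join key, and status column are illustrative):

import datachain as dc

current = dc.read_dataset("annotations_v2")
previous = dc.read_dataset("annotations_v1")

changes = current.diff(previous, on="id", status_col="status")
changes.show()  # each row tagged A, D, M, or S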

Configuring

  • settings: set parallel, cache, namespace, project, and other execution parameters
  • setup: run initialization code on each worker before processing
  • chunk: split chain into fixed-size chunks for batch processing
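
A sketch of how these compose, assuming a hypothetical bucket and a regex compiled once per worker via setup:

import re

import datachain as dc

chain = (
    dc.read_storage("s3://bucket/logs/")
    .settings(parallel=4, cache=True)  # 4 worker processes, cache downloaded files
    .setup(pattern=lambda: re.compile(rb"error"))  # created once per worker
    .map(
        error_hits=lambda pattern, file: len(pattern.findall(file.read())),
        output=int,
    )
)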

Terminal Operations

  • Persist: save (named dataset), persist (anonymous cache)
  • Export: to_pandas, to_parquet, to_csv, to_json, to_jsonl, to_pytorch, to_storage, to_database
  • Inspect: show, collect, count, sum, avg, min, max
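
A few terminal calls side by side (dataset and file names are illustrative):

import datachain as dc

chain = dc.read_storage("s3://bucket/data/")

chain.save("my-dataset")            # persist as a named, versioned dataset
chain.to_parquet("export.parquet")  # export to a file
df = chain.to_pandas()              # materialize into a DataFrame
print(chain.count(), chain.sum("file.size"))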

Mutate vs Map

Aspect      | mutate                                    | map
------------|-------------------------------------------|----------------------------------------------
Runs in     | Query Engine (SQL, data operation)        | Python runtime (Python function)
Speed       | Warehouse speed, vectorized               | Per-record Python execution
Accepts     | dc.func.*, column arithmetic, comparisons | Any Python callable
File access | No                                        | Yes (can read file content)
Parallelism | Automatic (engine-level)                  | Requires .settings(parallel=True)
Use when    | Deriving columns from existing data       | Processing file content, calling models/LLMs
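
For contrast, a map sketch for the case mutate cannot cover: the UDF is a Python callable that opens file content (the word_count name and bucket are illustrative):

import datachain as dc

def word_count(file: dc.File) -> int:
    # reads file content inside Python, which mutate cannot do
    return len(file.read().decode("utf-8", errors="ignore").split())

chain = (
    dc.read_storage("s3://bucket/texts/")
    .settings(parallel=4)  # map does not parallelize automatically
    .map(words=word_count)
)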

Complete Examples

Merging Files with Metadata

The most common pattern: files in storage, annotations in a sidecar format. Read both, derive a join key, merge, filter, and export:

import datachain as dc

images = dc.read_storage("gs://bucket/images/*jpg", anon=True)
meta = dc.read_json("gs://bucket/images/*json", column="meta", anon=True)

images_id = images.map(id=lambda file: file.path.split(".")[-2])
annotated = images_id.merge(meta, on="id", right_on="meta.id")

high_conf = annotated.filter(
    (dc.C("meta.inference.confidence") > 0.93)
    & (dc.C("meta.inference.class_") == "cat")
)
high_conf.to_storage("high-confidence-cats/", signal="file")

LLM Cost Tracking

Compute API costs across thousands of calls using aggregate analytics on nested Pydantic fields; no deserialization, no Python runtime:

import datachain as dc

chain = dc.read_dataset("llm_responses")

cost = (
    chain.sum("response.usage.prompt_tokens") * 0.000002
    + chain.sum("response.usage.completion_tokens") * 0.000006
)
print(f"Spent ${cost:.2f} on {chain.count()} calls")