Scaling and Performance
DataChain has a layered optimization architecture. Each layer eliminates a source of waste.
Parallel Execution
settings(parallel=N) runs N threads, each with its own model instance. parallel=-1 auto-detects CPU cores.
```python
import datachain as dc

# compute_embedding is your own Python function (not shown here).
chain = (
    dc.read_storage("s3://bucket/images/", type="image")
    .settings(parallel=8, prefetch=10)
    .map(emb=compute_embedding)
    .save("image_embeddings")
)
```
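To make "each with its own model instance" concrete, here is a minimal sketch in plain Python, assuming a thread pool plus thread-local storage. It is not DataChain's internal dispatcher: FakeModel, get_model, and run_parallel are hypothetical names, and compute_embedding here is a toy stand-in for the user function above.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeModel:
    """Hypothetical stand-in for an expensive model such as an embedding network."""

    instances = 0
    _lock = threading.Lock()

    def __init__(self) -> None:
        with FakeModel._lock:
            FakeModel.instances += 1  # count loads to show one model per thread

    def embed(self, item: str) -> int:
        return len(item)  # placeholder "embedding"


_local = threading.local()


def get_model() -> FakeModel:
    # The first call in a thread loads a model; later calls in that thread reuse it.
    if not hasattr(_local, "model"):
        _local.model = FakeModel()
    return _local.model


def compute_embedding(item: str) -> int:
    return get_model().embed(item)


def run_parallel(items, parallel: int):
    if parallel == -1:
        parallel = os.cpu_count() or 1  # mimic the "auto-detect CPU cores" setting
    with ThreadPoolExecutor(max_workers=parallel) as pool:
        return list(pool.map(compute_embedding, items))


items = [f"img_{i}.jpg" for i in range(100)]
results = run_parallel(items, parallel=8)
```

Loading the model lazily, once per thread, is what makes parallelism pay off for heavy models: the load cost is paid at most N times instead of once per file.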
Distributed Execution
settings(workers=K) spreads work across K machines via Studio's BYOC (bring-your-own-cloud) model, on Kubernetes clusters of 10-300 machines. Threads and workers compose: parallel=8, workers=50 means 8 threads on each of 50 machines, up to 400 concurrent threads in total.
```python
import datachain as dc

chain = (
    dc.read_storage("s3://bucket/images/", type="image")
    .settings(parallel=8, workers=50, cache=True, prefetch=10)
    .map(emb=compute_embedding)
    .save("image_embeddings")
)
```
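The composition is multiplicative. The sketch below assumes a simple round-robin split (DataChain's real partitioning strategy is not described here, and shard is a hypothetical helper) and shows how 1,000 rows spread over 50 workers running 8 threads each.

```python
from typing import List


def shard(rows: List[int], workers: int) -> List[List[int]]:
    # Round-robin split: worker k gets rows k, k + workers, k + 2 * workers, ...
    return [rows[k::workers] for k in range(workers)]


parallel, workers = 8, 50
max_in_flight = parallel * workers  # 8 threads x 50 machines = 400 rows at once

rows = list(range(1000))
shards = shard(rows, workers)
per_thread = len(shards[0]) / parallel  # average rows handled by each thread
```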
Data Stays in Storage
DataChain's storage-native architecture means files stay in cloud storage and are never copied into a separate data store. The File abstraction operates by reference: only files needed by Python operations are downloaded, and only when their content is actually accessed.
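A minimal sketch of by-reference access, assuming a hypothetical LazyFile class and fake_fetch downloader (this is not the real File API): metadata such as the path is available immediately, while bytes move only when read() is called.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class LazyFile:
    """A reference to a file in storage; content is fetched on first read only."""

    path: str
    fetch: Callable[[str], bytes]  # hypothetical downloader, e.g. an S3 GET
    _content: Optional[bytes] = field(default=None, repr=False)

    def read(self) -> bytes:
        if self._content is None:
            self._content = self.fetch(self.path)  # the only place bytes move
        return self._content


downloads: List[str] = []


def fake_fetch(path: str) -> bytes:
    downloads.append(path)
    return f"bytes of {path}".encode()


files = [LazyFile(f"s3://bucket/images/{i}.jpg", fake_fetch) for i in range(5)]

# Filtering on metadata (the path) downloads nothing.
selected = [f for f in files if f.path.endswith(("1.jpg", "3.jpg"))]
downloads_after_filter = len(downloads)

# Only the selected files are fetched, each exactly once.
contents = [f.read() for f in selected]
```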
Async Prefetch
settings(prefetch=N) downloads up to N files ahead while the current file is being processed, overlapping network I/O with computation. Tune it by file size: roughly 10-16 for small files, 1 for large files.
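The overlap can be sketched as a bounded look-ahead window over a thread pool. This illustrates the prefetch idea only, not DataChain's implementation; prefetching and slow_fetch are hypothetical, and the sleeps simulate network latency and compute.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Tuple

_DONE = object()  # sentinel for an exhausted iterator


def prefetching(
    paths: Iterable[str], fetch: Callable[[str], bytes], prefetch: int
) -> Iterator[Tuple[str, bytes]]:
    """Yield (path, content) in input order, keeping `prefetch` downloads in flight."""
    it = iter(paths)
    window = deque()
    with ThreadPoolExecutor(max_workers=max(prefetch, 1)) as pool:
        # Start the first `prefetch` downloads before any processing begins.
        for _ in range(max(prefetch, 1)):
            path = next(it, _DONE)
            if path is _DONE:
                break
            window.append((path, pool.submit(fetch, path)))
        while window:
            path, future = window.popleft()
            # Refill the window so the next download overlaps this file's compute.
            nxt = next(it, _DONE)
            if nxt is not _DONE:
                window.append((nxt, pool.submit(fetch, nxt)))
            yield path, future.result()


def slow_fetch(path: str) -> bytes:
    time.sleep(0.02)  # simulated network latency
    return path.encode()


paths = [f"s3://bucket/images/{i}.jpg" for i in range(20)]
processed = []
for path, data in prefetching(paths, slow_fetch, prefetch=10):
    time.sleep(0.02)  # simulated per-file computation
    processed.append(data)
```

With prefetch=10, later downloads are already underway while the current file is processed, so total runtime ends up roughly closer to the compute time alone than to compute plus download.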
File Cache
settings(cache=True) stores downloaded files locally. On the next run, the same files are served from cache.
```python
import datachain as dc

chain = (
    dc.read_storage("s3://bucket/images/", type="image")
    .settings(parallel=8, cache=True)
    .map(emb=clip_embedding)
    .save("image_embeddings")
)
```
Cache identifies files by path, etag, and version. If a file is overwritten on storage, DataChain re-downloads it.
On distributed clusters in Studio, the cache can be a shared volume, so all machines read from the same cache directory.
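The invalidation rule can be sketched as a cache whose key hashes path, etag, and version together, so an overwritten file (new etag) misses and is downloaded again. FileCache and its on-disk layout are assumptions for illustration; DataChain's actual cache format may differ.

```python
import hashlib
import os
import tempfile
from typing import Callable, List, Optional


class FileCache:
    """Local file cache keyed by (path, etag, version)."""

    def __init__(self, root: str) -> None:
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _local_path(self, path: str, etag: str, version: Optional[str]) -> str:
        # A new etag or version (file overwritten on storage) yields a new key, i.e. a miss.
        raw = "\0".join([path, etag, version or ""]).encode()
        return os.path.join(self.root, hashlib.sha256(raw).hexdigest())

    def get(self, path: str, etag: str, version: Optional[str],
            download: Callable[[str], bytes]) -> bytes:
        local = self._local_path(path, etag, version)
        if os.path.exists(local):
            with open(local, "rb") as fh:  # cache hit: no network access
                return fh.read()
        data = download(path)
        tmp = local + ".tmp"
        with open(tmp, "wb") as fh:
            fh.write(data)
        os.replace(tmp, local)  # rename into place so readers never see a partial file
        return data


calls: List[str] = []


def download(path: str) -> bytes:
    calls.append(path)
    return b"v1" if len(calls) == 1 else b"v2"  # storage content changes after first read


cache = FileCache(tempfile.mkdtemp())
first = cache.get("s3://bucket/a.jpg", "etag-1", None, download)   # miss: downloads
second = cache.get("s3://bucket/a.jpg", "etag-1", None, download)  # hit: served locally
third = cache.get("s3://bucket/a.jpg", "etag-2", None, download)   # etag changed: re-download
```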
Checkpoints
Checkpoints make failure recoverable without reprocessing from scratch. DataChain tracks execution state and reuses results of previous runs.
Two levels of checkpointing operate independently:
- Dataset checkpoints skip entire save() calls when the chain hash matches
- Python-operation checkpoints save per-row progress inside map() and gen(); if an operation fails mid-execution, only unprocessed rows are recomputed
Checkpoints work in script-based execution only, not in the Python REPL or Jupyter notebooks.
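A minimal sketch of the two levels, assuming a JSON file as the checkpoint store: chain_hash stands in for dataset-level identity, and map_with_checkpoint persists per-row results so a rerun after a crash recomputes only the rows that never finished. All names are hypothetical; DataChain's checkpoint storage is not shown here.

```python
import hashlib
import json
import os
import tempfile
from typing import Callable, Dict, List


def chain_hash(steps: List[str]) -> str:
    # Dataset-level identity: the same steps in the same order give the same hash.
    return hashlib.sha256("\n".join(steps).encode()).hexdigest()


def map_with_checkpoint(rows: Dict[str, str], fn: Callable[[str], str],
                        ckpt_path: str) -> Dict[str, str]:
    """Apply fn to each row, persisting finished rows so a rerun skips them."""
    done: Dict[str, str] = {}
    if os.path.exists(ckpt_path):
        with open(ckpt_path) as fh:
            done = json.load(fh)
    for key, value in rows.items():
        if key in done:
            continue  # finished in an earlier run
        done[key] = fn(value)
        with open(ckpt_path, "w") as fh:  # persist progress after every row
            json.dump(done, fh)
    return done


ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
rows = {f"row{i}": f"value{i}" for i in range(5)}
calls: List[str] = []


def flaky(value: str) -> str:
    calls.append(value)
    if value == "value3" and calls.count(value) == 1:
        raise RuntimeError("simulated crash")  # fails on the first attempt only
    return value.upper()


try:
    map_with_checkpoint(rows, flaky, ckpt)  # first run dies on row3
except RuntimeError:
    pass

result = map_with_checkpoint(rows, flaky, ckpt)  # rerun resumes at row3
```

Rewriting the whole checkpoint file after every row keeps the sketch short; a real system would batch these writes.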
Delta Updates
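Passing delta=True to read_storage() processes only new and changed files. In the example below, update=True re-lists the bucket so new files are visible, delta_on names the field that identifies a record across runs, and delta_compare names the field used to detect whether a known record has changed: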
```python
import datachain as dc

chain = (
    dc.read_storage(
        "s3://bucket/data/",
        update=True,
        delta=True,
        delta_on="file.path",
        delta_compare="file.mtime",
    )
    .map(result=process_file)
    .save("processed_data")
)
```
The resulting dataset is always complete, containing both previously processed and newly processed records.
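The delta logic reduces to a comparison keyed on delta_on and checked on delta_compare. The sketch below models rows as plain dicts with hypothetical path and mtime keys; delta_run illustrates the idea rather than DataChain's code, and it ignores files deleted from storage.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def delta_run(listing: List[Row], previous: List[Row], process: Callable[[Row], object],
              on: str = "path", compare: str = "mtime") -> Tuple[List[Row], int]:
    """Process only new or changed rows; return the complete merged result."""
    prev_by_key = {r[on]: r for r in previous}
    # New: key never seen. Changed: key seen but the compare field differs.
    todo = [r for r in listing
            if r[on] not in prev_by_key or prev_by_key[r[on]][compare] != r[compare]]
    fresh = {r[on]: {**r, "result": process(r)} for r in todo}
    # Previously processed rows are kept; fresh results replace stale ones.
    merged = {**prev_by_key, **fresh}
    return list(merged.values()), len(todo)


previous = [
    {"path": "a.txt", "mtime": 1, "result": "A"},
    {"path": "b.txt", "mtime": 1, "result": "B"},
]
listing = [
    {"path": "a.txt", "mtime": 1},  # unchanged: skipped
    {"path": "b.txt", "mtime": 2},  # modified: reprocessed
    {"path": "c.txt", "mtime": 1},  # new: processed
]
calls: List[str] = []


def process(row: Row) -> str:
    calls.append(row["path"])
    return row["path"].upper()


rows, n = delta_run(listing, previous, process)
```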
The Error Field Pattern
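A standard pattern for production queries that retry failures automatically: catch exceptions inside the Python operation and record them in an error field instead of letting the run crash. delta_retry="error" then marks records whose error field is non-empty for re-processing: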
```python
import datachain as dc

def process_file(file):
    try:
        content = file.read_text()
        result = analyze_content(content)  # analyze_content is your own function
        return {"content": content, "result": result, "error": ""}
    except Exception as e:
        # Record the failure instead of raising, so delta_retry can find it later
        return {"content": "", "result": "", "error": str(e)}

chain = (
    dc.read_storage(
        "s3://bucket/data/",
        delta=True,
        delta_on="file.path",
        delta_retry="error",
    )
    .map(result=process_file)
    .save("processed_files")
)
```
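On each run, DataChain processes newly added files (delta) and re-processes previously failed records (retry). Transient failures such as timeouts are therefore retried on later runs without manual intervention, while persistent failures remain visible in the error field.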
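The retry loop can be sketched in plain Python, assuming results are kept as dicts with an error key: each run processes paths it has never seen plus paths whose error is non-empty. process_safely, run, and analyze are hypothetical; the simulated timeout fails once and succeeds on the next run.

```python
from typing import Callable, Dict, List

Record = Dict[str, str]


def process_safely(path: str, analyze: Callable[[str], str]) -> Record:
    # Errors become data in the "error" field instead of crashing the run.
    try:
        return {"path": path, "result": analyze(path), "error": ""}
    except Exception as e:
        return {"path": path, "result": "", "error": str(e)}


def run(listing: List[str], previous: List[Record],
        analyze: Callable[[str], str]) -> List[Record]:
    prev = {r["path"]: r for r in previous}
    new = [p for p in listing if p not in prev]         # delta: unseen files
    retry = [p for p, r in prev.items() if r["error"]]  # retry: failed before
    for path in new + retry:
        prev[path] = process_safely(path, analyze)
    return list(prev.values())


attempts: Dict[str, int] = {}


def analyze(path: str) -> str:
    attempts[path] = attempts.get(path, 0) + 1
    if path == "b.txt" and attempts[path] == 1:
        raise TimeoutError("simulated transient timeout")
    return path.upper()


run1 = run(["a.txt", "b.txt"], [], analyze)
run2 = run(["a.txt", "b.txt", "c.txt"], run1, analyze)
```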
Bucket Listing Cache
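Storage listings are cached as datasets. Scanning a GCS bucket of 1M files takes minutes the first time; later queries over the same location reuse the cached listing instead of rescanning. Pass update=True to read_storage() to force a fresh listing.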
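Conceptually the listing cache is memoization keyed by storage URI. In this sketch ListingCache and scan_bucket are hypothetical, and the update flag mirrors the update=True argument to read_storage() used in the delta examples.

```python
from typing import Callable, Dict, List


class ListingCache:
    """Memoizes bucket listings per URI; update=True forces a rescan."""

    def __init__(self, scan: Callable[[str], List[str]]) -> None:
        self._scan = scan
        self._listings: Dict[str, List[str]] = {}

    def get(self, uri: str, update: bool = False) -> List[str]:
        if update or uri not in self._listings:
            self._listings[uri] = self._scan(uri)  # the slow part: a full bucket scan
        return self._listings[uri]


scans: List[str] = []


def scan_bucket(uri: str) -> List[str]:
    scans.append(uri)
    return [f"{uri}file_{i}" for i in range(3)]  # stand-in for millions of objects


listings = ListingCache(scan_bucket)
first = listings.get("gs://bucket/")
second = listings.get("gs://bucket/")                  # served from the cached listing
refreshed = listings.get("gs://bucket/", update=True)  # explicit refresh rescans
```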
Vectorized Operations
Data-engine operations (filter, group_by, order_by, mutate, aggregate) run as SQL inside the Query Engine at warehouse speed, so rows do not pass through the Python interpreter. DataChain's built-in function library likewise runs natively in the engine, without Python.
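To see why pushing work into the engine matters, compare the same filter, group-by, and sum expressed as one SQL statement and as a Python loop. SQLite from the standard library stands in for the Query Engine here; this is an analogy, not DataChain's query compiler.

```python
import sqlite3

rows = [
    ("a.jpg", "cats", 120),
    ("b.jpg", "dogs", 300),
    ("c.jpg", "cats", 80),
    ("d.png", "dogs", 50),
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE files (path TEXT, label TEXT, size INTEGER)")
con.executemany("INSERT INTO files VALUES (?, ?, ?)", rows)

# Engine-side: filter + group_by + aggregate in one SQL statement;
# only the two aggregated rows come back to Python.
sql_result = dict(con.execute(
    "SELECT label, SUM(size) FROM files "
    "WHERE path LIKE '%.jpg' GROUP BY label ORDER BY label"
).fetchall())

# Python-side equivalent: every row crosses into the interpreter.
py_result = {}
for path, label, size in rows:
    if path.endswith(".jpg"):
        py_result[label] = py_result.get(label, 0) + size
```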
Dataset Reuse
The most powerful optimization: starting from previously computed results. When a query consumes read_dataset("embeddings") instead of recomputing from raw files, it skips all upstream work. This is why save() is the default terminal operation.
Optimization Summary
| Layer | What it eliminates | How to enable |
|---|---|---|
| Data stays in storage | Unnecessary file copies | Automatic (File abstraction) |
| Bucket listing cache | Redundant storage scans | Automatic |
| Async prefetch | I/O idle time | settings(prefetch=N) |
| File cache | Redundant downloads | settings(cache=True) |
| Checkpoints | Wasted computation on failure | Automatic (scripts only) |
| Delta updates | Reprocessing unchanged data | delta=True on read |
| Parallel compute | Sequential bottleneck | settings(parallel=N) |
| Distributed compute | Single-machine ceiling | settings(workers=K) (Studio) |
| Vectorized ops | Python overhead for SQL-expressible work | Use data-engine operations |
| Dataset reuse | Recomputing upstream work | save() + read_dataset() |
Complete Example: Delta Processing for Growing Datasets
Process only new and changed files on each run. The resulting dataset is always complete:
```python
import datachain as dc

chain = (
    dc.read_storage(
        "s3://bucket/incoming/",
        update=True,
        delta=True,
        delta_on="file.path",
    )
    .settings(parallel=8)
    .map(result=process_file)
    .save("processed_incoming")
)
```
Each run adds new files to the dataset while preserving all previously processed results. Combined with checkpoints, this means a query can handle both growing data and mid-run failures without any special logic.