Skip to content

Python Operations

Python functions run batch processing on a chain to generate new chain values. A function takes fields from one or more rows of the data and outputs new fields. Functions run at scale on multiple workers and processes.

Any Python function works as an operation. The classes below are useful to implement a "stateful" operation where a plain function is insufficient, such as when additional setup() or teardown() steps need to happen before or after the processing function runs.

UDFBase

UDFBase()

Bases: AbstractUDF

Base class for stateful user-defined functions.

Any class that inherits from it must have a process() method that takes input params from one or more rows in the chain and produces the expected output.

Optionally, the class may include these methods: - setup() to run code on each worker before process() is called. - teardown() to run code on each worker after process() completes.

Example
import datachain as dc
import open_clip

class ImageEncoder(dc.Mapper):
    def __init__(self, model_name: str, pretrained: str):
        self.model_name = model_name
        self.pretrained = pretrained

    def setup(self):
        self.model, _, self.preprocess = (
            open_clip.create_model_and_transforms(
                self.model_name, self.pretrained
            )
        )

    def process(self, file) -> list[float]:
        img = file.get_value()
        img = self.preprocess(img).unsqueeze(0)
        emb = self.model.encode_image(img)
        return emb[0].tolist()

(
    dc.read_storage(
        "gs://datachain-demo/fashion-product-images/images", type="image"
    )
    .limit(5)
    .map(
        ImageEncoder("ViT-B-32", "laion2b_s34b_b79k"),
        params=["file"],
        output={"emb": list[float]},
    )
    .show()
)
Source code in datachain/lib/udf.py
def __init__(self):
    self.params: SignalSchema | None = None
    self.output = None
    self._func = None

verbose_name property

verbose_name

Returns the name of the function or class that implements the UDF.

hash

hash(include_body: bool = True) -> str

Creates SHA hash of this UDF function. It takes into account function, inputs and outputs.

For function-based UDFs, hashes self._func. For class-based UDFs, hashes the process method.

When include_body=False, the function body is excluded (identity-only: module + qualname + defaults). Lambdas always include their bytecode since they share the name ''.

Source code in datachain/lib/udf.py
def hash(self, include_body: bool = True) -> str:
    """
    Creates SHA hash of this UDF function. It takes into account function,
    inputs and outputs.

    For function-based UDFs, hashes self._func.
    For class-based UDFs, hashes the process method.

    When include_body=False, the function body is excluded (identity-only:
    __module__ + __qualname__ + defaults). Lambdas always include their
    bytecode since they share the name '<lambda>'.
    """
    # Hash user code: either _func (function-based) or process method (class-based)
    func_to_hash = self._func or self.process

    parts = [
        hash_callable(func_to_hash, include_body=include_body),
        self.params.hash() if self.params else "",
        self.output.hash(),
    ]

    return hashlib.sha256(
        b"".join([bytes.fromhex(part) for part in parts])
    ).hexdigest()

process

process(*args, **kwargs)

Processing function that needs to be defined by user

Source code in datachain/lib/udf.py
def process(self, *args, **kwargs):
    """Processing function that needs to be defined by user"""
    if not self._func:
        raise NotImplementedError("UDF processing is not implemented")
    return self._func(*args, **kwargs)

setup

setup()

Initialization process executed on each worker before processing begins. This is needed for tasks like pre-loading ML models prior to scoring.

Source code in datachain/lib/udf.py
def setup(self):
    """Initialization process executed on each worker before processing begins.
    This is needed for tasks like pre-loading ML models prior to scoring.
    """

teardown

teardown()

Teardown process executed on each process/worker after processing ends. This is needed for tasks like closing connections to end-points.

Source code in datachain/lib/udf.py
def teardown(self):
    """Teardown process executed on each process/worker after processing ends.
    This is needed for tasks like closing connections to end-points.
    """

Aggregator

Aggregator()

Bases: UDFBase

Inherit from this class to pass to DataChain.agg().

Source code in datachain/lib/udf.py
def __init__(self):
    self.params: SignalSchema | None = None
    self.output = None
    self._func = None

Generator

Generator()

Bases: UDFBase

Inherit from this class to pass to DataChain.gen().

Source code in datachain/lib/udf.py
def __init__(self):
    self.params: SignalSchema | None = None
    self.output = None
    self._func = None

Mapper

Mapper()

Bases: UDFBase

Inherit from this class to pass to DataChain.map().

Source code in datachain/lib/udf.py
def __init__(self):
    self.params: SignalSchema | None = None
    self.output = None
    self._func = None