See it in action on the hev-shop demo store.

Function CRD

The Function CRD defines a user-defined function (UDF) that runs over rows that already exist in an Index. It is the right shape for classifiers, enrichment, backfills, fan-out from an existing row, and deterministic re-upserts. UDFs are best defined in YAML and invoked with the layer CLI. The operator creates worker resources; the gateway owns discovery, queueing, retries, leases, and completion markers. Workers own their data writes.

Use a Pipeline when external data becomes rows in Layer. Use a Function when compute starts from rows that are already in Layer.

apiVersion: hevlayer.com/v1alpha1
kind: Function
metadata:
  name: tag-products
  namespace: layer
spec:
  targetNamespaces:
    - products
  inputs:
    - id
    - title
  version: v1
  filter:
    - category
    - Eq
    - outdoor
  worker:
    image: ghcr.io/hev/tag-products:latest
    dispatch: pull
    computeClass: cpu
    batchSize: 32
    timeoutSeconds: 30
  schedule:
    discoveryIntervalSeconds: 300
    leaseSeconds: 120
    maxInFlightBatches: 8
    maxConcurrentScans: 1
  retry:
    maxAttempts: 8
    initialBackoffSeconds: 5
    maxBackoffSeconds: 300
  triggers:
    - discovery
  scaling:
    pool: cpu
    mode: autoscale
    replicas:
      min: 0
      max: 6

Selection

Use targetNamespaces for explicit namespaces. Use indexSelector when labels on Index resources should choose the namespaces.

filter preserves arbitrary JSON, including array-form Turbopuffer filters. The operator stores the shape as-is; the gateway evaluates it during discovery after AND-ing it with the generated completion-marker predicate. Do not include a version-marker predicate in filter; the gateway creates that from spec.version.
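A minimal sketch of that composition, assuming Turbopuffer's array filter syntax (`["And", [...]]`, `["Or", [...]]`, `[attr, "Eq", value]`). The helper names are hypothetical and the gateway's real predicate is not shown here; in particular this sketch leaves out the stale-after check.

```python
from __future__ import annotations


def marker_attribute(function_name: str) -> str:
    # Marker naming as described under "Version markers":
    # hyphens in the Function name become underscores.
    return f"_hevlayer_udf_{function_name.replace('-', '_')}_v"


def discovery_filter(function_name: str, version: str, user_filter: list | None = None) -> list:
    """Hypothetical helper: AND the user's filter with a completion-marker predicate."""
    marker = marker_attribute(function_name)
    # Rows that still need work: marker missing (Eq null) or stamped with another version.
    needs_run = ["Or", [[marker, "Eq", None], [marker, "NotEq", version]]]
    if user_filter is None:
        return needs_run
    return ["And", [user_filter, needs_run]]


if __name__ == "__main__":
    # The filter from the tag-products example above, in array form.
    print(discovery_filter("tag-products", "v1", ["category", "Eq", "outdoor"]))
```

This is why spec.filter should carry only business predicates: the marker clause is derived from metadata.name and spec.version, so adding one by hand would duplicate or contradict it.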

Worker

| Field | Purpose |
| --- | --- |
| image | Worker image. |
| dispatch | pull for SDK claim/poll workers, push for HTTP /run workers. |
| computeClass | cpu or gpu. Defaults to cpu; when scaling.pool is omitted, the operator maps this to the stock cpu or gpu pool. |
| port | Push-dispatch service port. |
| batchSize | Rows per batch. |
| timeoutSeconds | Worker call timeout. |
| podSpec | Optional pod-level merge patch. |

To apply the CR, register the gateway UDF, trigger discovery, and watch the queue with one command, use layer run -f.

The worker pod receives HEVLAYER_UDF_ID, HEVLAYER_BASE_URL, HEVLAYER_UDF_BATCH_SIZE, HEVLAYER_UDF_TIMEOUT_SECONDS, HEVLAYER_UDF_LEASE_SECONDS, and LAYER_GATEWAY_API_KEY. The gateway bearer is sourced from the default VectorStore credential in deriveFromStore mode, or from the configured inbound worker key in keys mode.
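A worker that does not use run_udf_worker can read those variables itself. The sketch below assumes the numeric variables are plain integers; the WorkerEnv class and load_worker_env function are hypothetical names, not part of the hevlayer client.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerEnv:
    udf_id: str
    base_url: str
    batch_size: int
    timeout_seconds: int
    lease_seconds: int
    api_key: str


def load_worker_env(env: Mapping[str, str] = os.environ) -> WorkerEnv:
    """Read the variables injected into the worker pod; a missing one raises KeyError."""
    return WorkerEnv(
        udf_id=env["HEVLAYER_UDF_ID"],
        base_url=env["HEVLAYER_BASE_URL"],
        batch_size=int(env["HEVLAYER_UDF_BATCH_SIZE"]),
        timeout_seconds=int(env["HEVLAYER_UDF_TIMEOUT_SECONDS"]),
        lease_seconds=int(env["HEVLAYER_UDF_LEASE_SECONDS"]),
        api_key=env["LAYER_GATEWAY_API_KEY"],
    )
```

Failing fast on a missing variable surfaces a misconfigured pod at startup instead of on the first claim.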

Simple classifier

The Python client turns a normal function into the claim/process/complete loop. output="tags" is client-side metadata: the CRD does not declare an output attribute. run_udf_worker sends the returned value as a completion attributes.tags patch, and the gateway stamps the reserved completion marker in the same patch. The Go and TypeScript clients drive the same worker protocol directly: claim a batch, process rows, then report completions and failures.

Python:

import asyncio
from hevlayer.udf import PermanentError, TransientError, run_udf_worker, udf


@udf(inputs=["id", "title", "description"], output="tags", kind="tags")
def tag_product(*, id: str, title: str | None, description: str | None) -> list[str]:
    if not title:
        raise PermanentError(f"{id}: missing title")
    try:
        text = f"{title} {description or ''}".lower()
    except TypeError as exc:
        raise TransientError(str(exc)) from exc

    tags: list[str] = []
    if "wireless" in text:
        tags.append("wireless")
    if "waterproof" in text:
        tags.append("waterproof")
    return tags or ["uncategorized"]


if __name__ == "__main__":
    asyncio.run(run_udf_worker(tag_product, udf_id="product-tags"))

Go:

package main

import (
	"context"
	"os"
	"strings"

	hevlayer "github.com/hev/layer/clients/go"
)

func tags(title, description string) []string {
	text := strings.ToLower(title + " " + description)
	var out []string
	if strings.Contains(text, "wireless") {
		out = append(out, "wireless")
	}
	if strings.Contains(text, "waterproof") {
		out = append(out, "waterproof")
	}
	if len(out) == 0 {
		out = []string{"uncategorized"}
	}
	return out
}

func main() {
	ctx := context.Background()
	udfID := os.Getenv("HEVLAYER_UDF_ID")
	layer := hevlayer.NewClient(
		hevlayer.WithBaseURL(os.Getenv("HEVLAYER_BASE_URL")),
		hevlayer.WithAPIKey(os.Getenv("LAYER_GATEWAY_API_KEY")),
	)

	for {
		claimed, err := layer.ClaimUdfItems(ctx, udfID, &hevlayer.UdfClaimRequest{
			WorkerID: "tag-products-0",
			Limit:    32,
		})
		if err != nil {
			continue
		}
		var done []hevlayer.UdfCompleteItem
		var failed []hevlayer.UdfFailItem
		for _, item := range claimed.Items {
			title, _ := item.Input["title"].(string)
			description, _ := item.Input["description"].(string)
			if title == "" {
				failed = append(failed, hevlayer.UdfFailItem{
					Namespace: item.Namespace, ID: item.ID,
					Kind: "permanent", Message: "missing title",
				})
				continue
			}
			done = append(done, hevlayer.UdfCompleteItem{
				Namespace:  item.Namespace,
				ID:         item.ID,
				Attributes: map[string]interface{}{"tags": tags(title, description)},
			})
		}
		if len(done) > 0 {
			layer.CompleteUdfItems(ctx, udfID, &hevlayer.UdfCompleteRequest{
				WorkerID: "tag-products-0", Items: done,
			})
		}
		if len(failed) > 0 {
			layer.FailUdfItems(ctx, udfID, &hevlayer.UdfFailRequest{
				WorkerID: "tag-products-0", Items: failed,
			})
		}
	}
}

TypeScript:

import { Hevlayer } from "hevlayer";

function tags(title: string, description: string): string[] {
  const text = `${title} ${description}`.toLowerCase();
  const out: string[] = [];
  if (text.includes("wireless")) out.push("wireless");
  if (text.includes("waterproof")) out.push("waterproof");
  return out.length ? out : ["uncategorized"];
}

const udfId = process.env.HEVLAYER_UDF_ID!;
const layer = new Hevlayer({
  baseUrl: process.env.HEVLAYER_BASE_URL,
  apiKey: process.env.LAYER_GATEWAY_API_KEY,
});

while (true) {
  const claimed = await layer.claimUdfItems(udfId, {
    worker_id: "tag-products-0",
    limit: 32,
  });
  const done = [];
  const failed = [];
  for (const item of claimed.items) {
    const title = typeof item.input.title === "string" ? item.input.title : "";
    const description = typeof item.input.description === "string" ? item.input.description : "";
    if (!title) {
      failed.push({
        namespace: item.namespace,
        id: item.id,
        kind: "permanent",
        message: "missing title",
      });
      continue;
    }
    done.push({
      namespace: item.namespace,
      id: item.id,
      attributes: { tags: tags(title, description) },
    });
  }
  if (done.length > 0) {
    await layer.completeUdfItems(udfId, { worker_id: "tag-products-0", items: done });
  }
  if (failed.length > 0) {
    await layer.failUdfItems(udfId, { worker_id: "tag-products-0", items: failed });
  }
}

In Python, function parameters are keyword-only and named to match inputs; raise TransientError for retryable work and PermanentError for unrecoverable input. In Go and TypeScript, report the same split through FailUdfItems / failUdfItems with kind: "transient" or kind: "permanent".
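To make the split concrete, here is a rough Python sketch of what a claim loop does with each raised error. It is not the hevlayer implementation: the two exception classes are local stand-ins for the ones in hevlayer.udf, process_batch is a hypothetical helper, and the item shape (namespace, id, input) is taken from the Go and TypeScript examples above.

```python
from __future__ import annotations

from typing import Any, Callable


class TransientError(Exception):
    """Stand-in for hevlayer.udf.TransientError: retryable work."""


class PermanentError(Exception):
    """Stand-in for hevlayer.udf.PermanentError: unrecoverable input."""


def process_batch(
    items: list[dict[str, Any]],
    fn: Callable[..., Any],
    output: str,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Run fn over claimed items and split results into completions and failures."""
    done: list[dict[str, Any]] = []
    failed: list[dict[str, Any]] = []
    for item in items:
        ref = {"namespace": item["namespace"], "id": item["id"]}
        try:
            # Inputs are passed as keyword arguments, matching the keyword-only signature.
            value = fn(**item["input"])
        except PermanentError as exc:
            failed.append({**ref, "kind": "permanent", "message": str(exc)})
        except TransientError as exc:
            failed.append({**ref, "kind": "transient", "message": str(exc)})
        else:
            done.append({**ref, "attributes": {output: value}})
    return done, failed
```

Any other exception propagates out of this sketch; how the real client classifies unexpected errors is not covered here.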

GPU classifier

More complex classifiers, such as a vision-language classifier, may need a GPU to run their model.

apiVersion: hevlayer.com/v1alpha1
kind: Function
metadata:
  name: product-color
  namespace: layer
spec:
  targetNamespaces:
    - amazon-products
  inputs:
    - id
    - image_url
  version: v1
  worker:
    image: ghcr.io/hev/hev-shop-udf-product-color:latest
    dispatch: pull
    computeClass: gpu
    batchSize: 8
    timeoutSeconds: 120
  schedule:
    leaseSeconds: 300
    maxInFlightBatches: 2
  triggers:
    - discovery
  scaling:
    pool: gpu
    mode: autoscale
    replicas:
      min: 0
      max: 2

When scaling.pool is omitted, worker.computeClass: gpu selects the gpu pool from InfraRules/default. The stock pool selects nodes labeled layer.hev.dev/node-role=worker-gpu, requests one NVIDIA GPU, and carries the worker and NVIDIA tolerations:

computePools:
  - name: gpu
    kind: gpu
    maxReplicasPerWorkload: 4
    nodeSelector:
      layer.hev.dev/node-role: worker-gpu
      layer.hev.dev/compute: gpu
    tolerations:
      - key: layer.hev.dev/node-role
        operator: Equal
        value: worker-gpu
        effect: NoSchedule
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
    resources:
      requests: { memory: 4Gi, nvidia.com/gpu: "1" }
      limits: { memory: 10Gi, nvidia.com/gpu: "1" }

The worker loads the model once at startup and classifies per row. CLIP zero-shot classification labels each product image with its dominant color:

import asyncio
import io

import httpx
import torch
from PIL import Image
from transformers import pipeline

from hevlayer.udf import PermanentError, TransientError, run_udf_worker, udf

COLORS = ["black", "white", "gray", "red", "blue", "green", "brown", "multicolor"]

classifier = pipeline(
    "zero-shot-image-classification",
    model="openai/clip-vit-large-patch14",
    device="cuda" if torch.cuda.is_available() else "cpu",
)


@udf(inputs=["id", "image_url"], output="color", kind="classification")
def classify_color(*, id: str, image_url: str | None) -> str:
    if not image_url:
        raise PermanentError(f"{id}: missing image_url")
    try:
        resp = httpx.get(image_url, timeout=10.0, follow_redirects=True)
        resp.raise_for_status()
        image = Image.open(io.BytesIO(resp.content)).convert("RGB")
    except httpx.HTTPError as exc:
        raise TransientError(f"{id}: image fetch failed: {exc}") from exc
    except OSError as exc:
        raise PermanentError(f"{id}: undecodable image: {exc}") from exc

    scores = classifier(image, candidate_labels=COLORS)
    return scores[0]["label"]


if __name__ == "__main__":
    asyncio.run(run_udf_worker(classify_color, udf_id="product-color"))

The worker image needs torch, transformers, pillow, and httpx alongside the hevlayer Python client. Bake the model weights into the image so autoscaled pods do not re-download them on every cold start.

Sizing for inference: keep worker.batchSize low and worker.timeoutSeconds high enough for one batch of forward passes, and make schedule.leaseSeconds outlast a full batch so claims do not reissue mid-inference. replicas.min: 1 keeps a warm worker when model cold-start dominates; min: 0 scales to zero between sweeps.
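The sizing rule can be checked with simple arithmetic. The sketch below assumes rows in a batch are processed one after another and that you have measured a rough per-row latency; sizing_problems is a hypothetical helper, and seconds_per_row is your own estimate, not a field in the CRD.

```python
from __future__ import annotations


def sizing_problems(
    batch_size: int,
    seconds_per_row: float,
    timeout_seconds: int,
    lease_seconds: int,
) -> list[str]:
    """Compare an estimated batch duration against the timeout and the lease."""
    batch_seconds = batch_size * seconds_per_row
    problems: list[str] = []
    if timeout_seconds < batch_seconds:
        problems.append(
            f"worker.timeoutSeconds={timeout_seconds} is shorter than one batch (~{batch_seconds:.0f}s)"
        )
    if lease_seconds <= batch_seconds:
        problems.append(
            f"schedule.leaseSeconds={lease_seconds} does not outlast one batch (~{batch_seconds:.0f}s)"
        )
    return problems


if __name__ == "__main__":
    # The product-color example: batchSize 8, timeoutSeconds 120, leaseSeconds 300,
    # with an assumed 5 seconds per image.
    print(sizing_problems(8, 5.0, 120, 300))
```

With the product-color settings and an assumed 5 seconds per image, a batch takes about 40 seconds, comfortably inside both bounds.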

Scaling

spec.scaling is the same scaling config Pipelines use: a pool from InfraRules/default, a mode, and replica bounds. For Functions, mode: autoscale emits a KEDA ScaledObject triggered by layer_udf_queue_depth. Replica maxima above the pool’s maxReplicasPerWorkload are rejected in status.
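The replica cap is easy to check before applying a manifest. This sketch mirrors the rule only; replica_cap_errors is a hypothetical helper, and the dictionaries are hand-built stand-ins for spec.scaling and the computePools entries.

```python
from __future__ import annotations


def replica_cap_errors(scaling: dict, pools: dict[str, dict]) -> list[str]:
    """Flag replicas.max values above the chosen pool's maxReplicasPerWorkload."""
    pool_name = scaling["pool"]
    cap = pools[pool_name]["maxReplicasPerWorkload"]
    replicas_max = scaling["replicas"]["max"]
    if replicas_max > cap:
        return [
            f"replicas.max={replicas_max} exceeds pool {pool_name!r} maxReplicasPerWorkload={cap}"
        ]
    return []


if __name__ == "__main__":
    pools = {"gpu": {"maxReplicasPerWorkload": 4}}
    scaling = {"pool": "gpu", "mode": "autoscale", "replicas": {"min": 0, "max": 2}}
    print(replica_cap_errors(scaling, pools))  # no errors for the product-color example
```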

Writeback

Workers own data writes. The common single-attribute case uses the Python client’s sugar: @udf(output="tags") makes run_udf_worker send returned values as attributes.tags in the completion call. In Go, TypeScript, or over REST, the equivalent is attributes on each completion item. The gateway applies those attributes and the reserved completion marker in one patch_columns write. Completion attributes must not use the reserved _hevlayer_* prefix.
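A worker that builds completion items by hand can guard against the reserved prefix before calling the gateway. This is a sketch with a hypothetical completion_item helper; the item shape follows the Go and TypeScript examples above, and the gateway's own validation behavior is not described here.

```python
from __future__ import annotations

from typing import Any, Mapping

RESERVED_PREFIX = "_hevlayer_"


def completion_item(namespace: str, row_id: str, attributes: Mapping[str, Any]) -> dict[str, Any]:
    """Build one completion item, refusing attributes in the reserved namespace."""
    reserved = sorted(name for name in attributes if name.startswith(RESERVED_PREFIX))
    if reserved:
        raise ValueError(f"reserved attribute names: {', '.join(reserved)}")
    return {"namespace": namespace, "id": row_id, "attributes": dict(attributes)}
```

For example, completion_item("products", "sku-1", {"tags": ["wireless"]}) yields an item ready for the items list of a completion request, while passing a _hevlayer_udf_* key raises ValueError.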

Python workers that need more control can declare the tpuf parameter, write through the client, and return None; completion then stamps only the marker. Use deterministic IDs when a Function creates rows so at-least-once retries remain idempotent.
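One way to get deterministic IDs is to derive them from the source row with a name-based UUID, so a retried batch overwrites the same rows instead of creating duplicates. The namespace string and the derived_row_id helper are hypothetical choices for illustration; any stable hash of the same fields works.

```python
from __future__ import annotations

import uuid

# Hypothetical fixed namespace for derived rows; any constant UUID works.
_ID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "hevlayer.com/udf-derived-rows")


def derived_row_id(function_name: str, source_namespace: str, source_id: str, part: int) -> str:
    """Return the same ID for the same (function, source row, part) on every attempt."""
    key = f"{function_name}/{source_namespace}/{source_id}/{part}"
    return str(uuid.uuid5(_ID_NAMESPACE, key))


if __name__ == "__main__":
    # Fan-out: three derived rows from one source row, stable across retries.
    for part in range(3):
        print(derived_row_id("tag-products", "products", "sku-1", part))
```

Avoid uuid4 or timestamps in derived IDs: they change on every attempt, so an at-least-once retry would leave duplicate rows behind.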

Deleting a Function garbage-collects operator-managed Kubernetes resources. It does not delete already-written attributes.

Lifecycle

kubectl get function product-tags
kubectl describe function product-tags

layer udf get product-tags

kubectl patch function product-tags --type=merge -p '{"spec":{"paused":true}}'
kubectl patch function product-tags --type=merge -p '{"spec":{"paused":false}}'

curl -X POST -H "authorization: Bearer $LAYER_GATEWAY_API_KEY" \
  $LAYER_GATEWAY_URL/v2/udfs/product-tags/reset-failed

kubectl delete function product-tags
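The reset-failed call above can also be issued from Python with only the standard library. The sketch builds the request without sending it; reset_failed_request is a hypothetical helper, and the response body is not documented here, so nothing is assumed about it.

```python
from __future__ import annotations

import urllib.request
from urllib.parse import quote


def reset_failed_request(base_url: str, udf_id: str, api_key: str) -> urllib.request.Request:
    """Build the POST /v2/udfs/<id>/reset-failed request shown in the curl example."""
    url = f"{base_url.rstrip('/')}/v2/udfs/{quote(udf_id, safe='')}/reset-failed"
    return urllib.request.Request(
        url,
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
    )


# To send it:
#   with urllib.request.urlopen(reset_failed_request(url, "product-tags", key)) as resp:
#       print(resp.status)
```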

Version markers

spec.version is the re-run safety rail and defaults to v1. On completion, the gateway stamps _hevlayer_udf_<function>_v with that version, normalizing hyphens in the Function name to underscores. For metadata.name: product-color, the marker is _hevlayer_udf_product_color_v.

Discovery automatically selects rows whose version marker is missing or differs from spec.version, and rows whose _hevlayer_udf_<function>_stale_after marker has expired. Bump spec.version when a model, taxonomy, or prompt changes.
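Expressed per row, the selection rule looks roughly like the sketch below. It assumes the stale-after marker holds a Unix timestamp in seconds, which is not stated here; needs_processing is a hypothetical helper, not gateway code.

```python
from __future__ import annotations

from typing import Any, Mapping


def needs_processing(row: Mapping[str, Any], function_name: str, version: str, now: float) -> bool:
    """Return True when discovery would pick this row up again."""
    base = f"_hevlayer_udf_{function_name.replace('-', '_')}"
    marker = row.get(f"{base}_v")
    if marker is None or marker != version:
        return True  # never processed, or processed by another version
    stale_after = row.get(f"{base}_stale_after")
    # Assumption: stale_after is a Unix timestamp in seconds.
    return stale_after is not None and stale_after <= now
```

Bumping spec.version from v1 to v2 makes every row stamped v1 match the second condition, which is what triggers a full re-run.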

Tuning knobs

| Knob | What it bounds |
| --- | --- |
| worker.batchSize | Rows per worker batch. |
| worker.timeoutSeconds | Worker call timeout. |
| schedule.leaseSeconds | How long a claim is held before reissue. |
| schedule.discoveryIntervalSeconds | Time between discovery scan jobs. |
| schedule.maxInFlightBatches | Concurrent worker batches per UDF. |
| schedule.maxConcurrentScans | Concurrent namespace discovery jobs. |
| retry.maxAttempts | Tries before a row lands in failed. |
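To get a feel for how the retry knobs interact, the sketch below lists the waits between attempts. It assumes a doubling backoff capped at maxBackoffSeconds; the gateway's actual curve and any jitter are not specified here, and backoff_schedule is a hypothetical helper.

```python
from __future__ import annotations


def backoff_schedule(max_attempts: int, initial_seconds: int, max_seconds: int) -> list[int]:
    """Waits between attempts, assuming the delay doubles up to a cap."""
    return [min(initial_seconds * 2**i, max_seconds) for i in range(max_attempts - 1)]


if __name__ == "__main__":
    # retry block from the tag-products example: 8 attempts, 5s initial, 300s cap.
    delays = backoff_schedule(8, 5, 300)
    print(delays, sum(delays))
```

Under that assumption the tag-products settings give waits of 5, 10, 20, 40, 80, 160, and 300 seconds, so a row that keeps failing transiently spends roughly ten minutes in backoff before it lands in failed.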