Operations
Function CRD
The Function CRD is a User Defined Function (UDF) that runs over rows
that already exist in an Index. It is the
right shape for classifiers, enrichment, backfills, fan-out from an
existing row, and deterministic re-upserts. UDFs are best defined in
YAML and invoked by the layer CLI. The
operator creates worker resources; the gateway owns discovery, queueing,
retries, leases, and completion markers. Workers own their data writes.
Use a Pipeline when external data becomes rows in Layer. Use a Function when compute starts from rows that are already in Layer.
apiVersion: hevlayer.com/v1alpha1
kind: Function
metadata:
  name: product-tags
  namespace: layer
spec:
  targetNamespaces:
    - products
  inputs:
    - id
    - title
  version: v1
  filter:
    - category
    - Eq
    - outdoor
  worker:
    image: ghcr.io/hev/tag-products:latest
    dispatch: pull
    computeClass: cpu
    batchSize: 32
    timeoutSeconds: 30
  schedule:
    discoveryIntervalSeconds: 300
    leaseSeconds: 120
    maxInFlightBatches: 8
    maxConcurrentScans: 1
  retry:
    maxAttempts: 8
    initialBackoffSeconds: 5
    maxBackoffSeconds: 300
  triggers:
    - discovery
  scaling:
    pool: cpu
    mode: autoscale
    replicas:
      min: 0
      max: 6
Selection
Use targetNamespaces for explicit namespaces. Use indexSelector when
labels on Index resources should choose the namespaces.
filter preserves arbitrary JSON, including array-form Turbopuffer
filters. The operator stores the shape as-is; the gateway evaluates it
during discovery after AND-ing it with the generated completion-marker
predicate. Do not include a version-marker predicate in filter; the
gateway creates that from spec.version.
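To make the AND-ing concrete, here is a minimal Python sketch of how a user filter and a version-marker predicate could be combined in Turbopuffer's array form. The function name discovery_filter and the exact shape of the marker predicate are assumptions for illustration; the gateway's real predicate is not shown on this page and also accounts for the stale-after marker, which this sketch leaves out.

```python
from typing import Any, List, Optional


def discovery_filter(
    user_filter: Optional[List[Any]], marker: str, version: str
) -> List[Any]:
    """Combine spec.filter with a hypothetical 'still needs this version' predicate."""
    # Assumed predicate: the marker attribute is missing or holds another version.
    pending = ["Or", [[marker, "Eq", None], [marker, "NotEq", version]]]
    if user_filter is None:
        return pending
    # The user filter is passed through untouched, as the operator stores it.
    return ["And", [user_filter, pending]]


combined = discovery_filter(
    ["category", "Eq", "outdoor"], "_hevlayer_udf_product_tags_v", "v1"
)
```

This is also why spec.filter should not carry its own version-marker clause: the generated predicate would be AND-ed with it, and the two could disagree.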
Worker
| Field | Purpose |
|---|---|
| image | Worker image. |
| dispatch | pull for SDK claim/poll workers, push for HTTP /run workers. |
| computeClass | cpu or gpu. Defaults to cpu; when scaling.pool is omitted, the operator maps this to the stock cpu or gpu pool. |
| port | Push-dispatch service port. |
| batchSize | Rows per batch. |
| timeoutSeconds | Worker call timeout. |
| podSpec | Optional pod-level merge patch. |
To apply the CR, register the gateway UDF, trigger discovery, and watch
the queue with one command, use layer run -f.
The worker pod receives HEVLAYER_UDF_ID, HEVLAYER_BASE_URL,
HEVLAYER_UDF_BATCH_SIZE, HEVLAYER_UDF_TIMEOUT_SECONDS,
HEVLAYER_UDF_LEASE_SECONDS, and LAYER_GATEWAY_API_KEY. The gateway
bearer is sourced from the default VectorStore credential in
deriveFromStore mode, or from the configured inbound worker key in
keys mode.
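A hand-rolled worker may want to read these variables once at startup and fail fast when one is missing. The sketch below assumes nothing beyond the variable names listed above; the WorkerEnv dataclass and load_worker_env helper are hypothetical names, and the SDK clients may already do this internally.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = (
    "HEVLAYER_UDF_ID",
    "HEVLAYER_BASE_URL",
    "HEVLAYER_UDF_BATCH_SIZE",
    "HEVLAYER_UDF_TIMEOUT_SECONDS",
    "HEVLAYER_UDF_LEASE_SECONDS",
    "LAYER_GATEWAY_API_KEY",
)


@dataclass(frozen=True)
class WorkerEnv:
    udf_id: str
    base_url: str
    batch_size: int
    timeout_seconds: int
    lease_seconds: int
    api_key: str


def load_worker_env(env: Mapping[str, str] = os.environ) -> WorkerEnv:
    """Parse the injected variables, raising if any are absent or empty."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing worker environment: {', '.join(missing)}")
    return WorkerEnv(
        udf_id=env["HEVLAYER_UDF_ID"],
        base_url=env["HEVLAYER_BASE_URL"],
        batch_size=int(env["HEVLAYER_UDF_BATCH_SIZE"]),
        timeout_seconds=int(env["HEVLAYER_UDF_TIMEOUT_SECONDS"]),
        lease_seconds=int(env["HEVLAYER_UDF_LEASE_SECONDS"]),
        api_key=env["LAYER_GATEWAY_API_KEY"],
    )
```

Reading batch size from the environment rather than hard-coding it keeps the worker in step with worker.batchSize in the manifest.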
Simple classifier
The Python client turns a normal function into the claim/process/complete
loop. output="tags" is client-side metadata: the CRD does not declare an
output attribute. run_udf_worker sends the returned value as a
completion attributes.tags patch, and the gateway stamps the reserved
completion marker in the same patch. The Go client drives the same
worker protocol directly, as does the TypeScript client — claim a batch,
process rows, report completions and failures.
import asyncio

from hevlayer.udf import PermanentError, TransientError, run_udf_worker, udf


@udf(inputs=["id", "title", "description"], output="tags", kind="tags")
def tag_product(*, id: str, title: str | None, description: str | None) -> list[str]:
    if not title:
        raise PermanentError(f"{id}: missing title")
    try:
        text = f"{title} {description or ''}".lower()
    except TypeError as exc:
        raise TransientError(str(exc)) from exc
    tags: list[str] = []
    if "wireless" in text:
        tags.append("wireless")
    if "waterproof" in text:
        tags.append("waterproof")
    return tags or ["uncategorized"]


if __name__ == "__main__":
    asyncio.run(run_udf_worker(tag_product, udf_id="product-tags"))

package main
import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	hevlayer "github.com/hev/layer/clients/go"
)

func tags(title, description string) []string {
	text := strings.ToLower(title + " " + description)
	var out []string
	if strings.Contains(text, "wireless") {
		out = append(out, "wireless")
	}
	if strings.Contains(text, "waterproof") {
		out = append(out, "waterproof")
	}
	if len(out) == 0 {
		out = []string{"uncategorized"}
	}
	return out
}

func main() {
	ctx := context.Background()
	udfID := os.Getenv("HEVLAYER_UDF_ID")
	layer := hevlayer.NewClient(
		hevlayer.WithBaseURL(os.Getenv("HEVLAYER_BASE_URL")),
		hevlayer.WithAPIKey(os.Getenv("LAYER_GATEWAY_API_KEY")),
	)
	for {
		claimed, err := layer.ClaimUdfItems(ctx, udfID, &hevlayer.UdfClaimRequest{
			WorkerID: "tag-products-0",
			Limit:    32,
		})
		if err != nil {
			// Pause before retrying so a gateway outage does not turn into a hot loop.
			log.Printf("claim failed: %v", err)
			time.Sleep(time.Second)
			continue
		}
		var done []hevlayer.UdfCompleteItem
		var failed []hevlayer.UdfFailItem
		for _, item := range claimed.Items {
			title, _ := item.Input["title"].(string)
			description, _ := item.Input["description"].(string)
			if title == "" {
				// A missing title will not fix itself on retry, so fail it permanently.
				failed = append(failed, hevlayer.UdfFailItem{
					Namespace: item.Namespace, ID: item.ID,
					Kind: "permanent", Message: "missing title",
				})
				continue
			}
			done = append(done, hevlayer.UdfCompleteItem{
				Namespace:  item.Namespace,
				ID:         item.ID,
				Attributes: map[string]interface{}{"tags": tags(title, description)},
			})
		}
		if len(done) > 0 {
			layer.CompleteUdfItems(ctx, udfID, &hevlayer.UdfCompleteRequest{
				WorkerID: "tag-products-0", Items: done,
			})
		}
		if len(failed) > 0 {
			layer.FailUdfItems(ctx, udfID, &hevlayer.UdfFailRequest{
				WorkerID: "tag-products-0", Items: failed,
			})
		}
	}
}

import { Hevlayer } from "hevlayer";
function tags(title: string, description: string): string[] {
  const text = `${title} ${description}`.toLowerCase();
  const out: string[] = [];
  if (text.includes("wireless")) out.push("wireless");
  if (text.includes("waterproof")) out.push("waterproof");
  return out.length ? out : ["uncategorized"];
}

const udfId = process.env.HEVLAYER_UDF_ID!;
const layer = new Hevlayer({
  baseUrl: process.env.HEVLAYER_BASE_URL,
  apiKey: process.env.LAYER_GATEWAY_API_KEY,
});

while (true) {
  const claimed = await layer.claimUdfItems(udfId, {
    worker_id: "tag-products-0",
    limit: 32,
  });
  const done = [];
  const failed = [];
  for (const item of claimed.items) {
    const title = typeof item.input.title === "string" ? item.input.title : "";
    const description = typeof item.input.description === "string" ? item.input.description : "";
    if (!title) {
      failed.push({
        namespace: item.namespace,
        id: item.id,
        kind: "permanent",
        message: "missing title",
      });
      continue;
    }
    done.push({
      namespace: item.namespace,
      id: item.id,
      attributes: { tags: tags(title, description) },
    });
  }
  if (done.length > 0) {
    await layer.completeUdfItems(udfId, { worker_id: "tag-products-0", items: done });
  }
  if (failed.length > 0) {
    await layer.failUdfItems(udfId, { worker_id: "tag-products-0", items: failed });
  }
}

In Python, function parameters are keyword-only and named to match
inputs; raise TransientError for retryable work and PermanentError
for unrecoverable input. In Go and TypeScript, report the same split
through FailUdfItems / failUdfItems with kind: "transient" or
kind: "permanent".
GPU classifier
More complicated classifiers (e.g. a vision-language classifier) may require a model to run on a GPU.
apiVersion: hevlayer.com/v1alpha1
kind: Function
metadata:
  name: product-color
  namespace: layer
spec:
  targetNamespaces:
    - amazon-products
  inputs:
    - id
    - image_url
  version: v1
  worker:
    image: ghcr.io/hev/hev-shop-udf-product-color:latest
    dispatch: pull
    computeClass: gpu
    batchSize: 8
    timeoutSeconds: 120
  schedule:
    leaseSeconds: 300
    maxInFlightBatches: 2
  triggers:
    - discovery
  scaling:
    pool: gpu
    mode: autoscale
    replicas:
      min: 0
      max: 2
When scaling.pool is omitted, worker.computeClass: gpu selects the
gpu pool from InfraRules/default.
The stock pool selects layer.hev.dev/node-role=worker-gpu, requests
one NVIDIA GPU, and carries the worker and NVIDIA tolerations:
computePools:
  - name: gpu
    kind: gpu
    maxReplicasPerWorkload: 4
    nodeSelector:
      layer.hev.dev/node-role: worker-gpu
      layer.hev.dev/compute: gpu
    tolerations:
      - key: layer.hev.dev/node-role
        operator: Equal
        value: worker-gpu
        effect: NoSchedule
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
    resources:
      requests: { memory: 4Gi, nvidia.com/gpu: "1" }
      limits: { memory: 10Gi, nvidia.com/gpu: "1" }
The worker loads the model once at startup and classifies per row. CLIP zero-shot classification labels each product image with its dominant color:
import asyncio
import io

import httpx
import torch
from PIL import Image
from transformers import pipeline

from hevlayer.udf import PermanentError, TransientError, run_udf_worker, udf

COLORS = ["black", "white", "gray", "red", "blue", "green", "brown", "multicolor"]

classifier = pipeline(
    "zero-shot-image-classification",
    model="openai/clip-vit-large-patch14",
    device="cuda" if torch.cuda.is_available() else "cpu",
)


@udf(inputs=["id", "image_url"], output="color", kind="classification")
def classify_color(*, id: str, image_url: str | None) -> str:
    if not image_url:
        raise PermanentError(f"{id}: missing image_url")
    try:
        resp = httpx.get(image_url, timeout=10.0, follow_redirects=True)
        resp.raise_for_status()
        image = Image.open(io.BytesIO(resp.content)).convert("RGB")
    except httpx.HTTPError as exc:
        raise TransientError(f"{id}: image fetch failed: {exc}") from exc
    except OSError as exc:
        raise PermanentError(f"{id}: undecodable image: {exc}") from exc
    scores = classifier(image, candidate_labels=COLORS)
    return scores[0]["label"]


if __name__ == "__main__":
    asyncio.run(run_udf_worker(classify_color, udf_id="product-color"))
The worker image needs torch, transformers, pillow, and httpx
alongside the hevlayer Python client. Bake the model weights into the
image so autoscaled pods do not re-download them on every cold start.
Sizing for inference: keep worker.batchSize low and
worker.timeoutSeconds high enough for one batch of forward passes,
and make schedule.leaseSeconds outlast a full batch so claims do not
reissue mid-inference. replicas.min: 1 keeps a warm worker when model
cold-start dominates; min: 0 scales to zero between sweeps.
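The sizing advice above can be checked with a little arithmetic before a manifest is applied. The sketch below is only an illustration: check_inference_sizing is a hypothetical helper, seconds_per_row is a figure you would measure yourself, and comparing the lease against the timeout (rather than against the measured batch time) is a deliberately conservative reading of "outlast a full batch".

```python
from typing import List


def check_inference_sizing(
    batch_size: int,
    seconds_per_row: float,
    timeout_seconds: int,
    lease_seconds: int,
) -> List[str]:
    """Return human-readable problems; an empty list means the sizing holds."""
    problems = []
    batch_seconds = batch_size * seconds_per_row
    if batch_seconds > timeout_seconds:
        problems.append(
            f"a batch takes about {batch_seconds:.0f}s but "
            f"worker.timeoutSeconds is {timeout_seconds}"
        )
    # Worst case, a batch runs right up to the timeout, so the lease must be longer.
    if lease_seconds <= timeout_seconds:
        problems.append(
            f"schedule.leaseSeconds {lease_seconds} does not outlast "
            f"worker.timeoutSeconds {timeout_seconds}"
        )
    return problems


# Values from the product-color manifest, with an assumed 5 s per image.
print(check_inference_sizing(8, 5.0, 120, 300))
```

With the manifest's values and the assumed 5 seconds per image, a batch needs about 40 seconds, well inside the 120-second timeout and the 300-second lease.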
Scaling
spec.scaling uses the same scaling config as Pipelines: a pool from
InfraRules/default, a mode, and replica bounds. For Functions,
mode: autoscale emits a KEDA ScaledObject triggered by
layer_udf_queue_depth. A replicas.max above the pool’s
maxReplicasPerWorkload is rejected, and the rejection is reported in
status.
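As a rough mental model of these bounds, the following sketch validates replica settings against a pool limit and estimates a replica count from queue depth. Both helpers are hypothetical, the error strings are not the operator's actual status messages, and rows_per_replica stands in for a trigger threshold that this page does not document. KEDA's real calculation goes through the Kubernetes HPA and will differ in detail.

```python
import math
from typing import List


def validate_replicas(min_replicas: int, max_replicas: int, pool_max: int) -> List[str]:
    """Mirror the documented rule: replicas.max may not exceed the pool limit."""
    errors = []
    if min_replicas < 0:
        errors.append("replicas.min must not be negative")
    if max_replicas < min_replicas:
        errors.append("replicas.max must be at least replicas.min")
    if max_replicas > pool_max:
        errors.append(
            f"replicas.max {max_replicas} exceeds pool maxReplicasPerWorkload {pool_max}"
        )
    return errors


def desired_replicas(
    queue_depth: int, rows_per_replica: int, min_replicas: int, max_replicas: int
) -> int:
    """Approximate queue-driven scaling: one replica per threshold of queued rows."""
    wanted = math.ceil(queue_depth / rows_per_replica)
    return max(min_replicas, min(wanted, max_replicas))


print(validate_replicas(0, 2, 4))       # product-color bounds against the stock gpu pool
print(desired_replicas(100, 32, 0, 2))  # a deep queue is capped at replicas.max
```

An empty queue with min: 0 yields zero replicas in this model, which matches the scale-to-zero behaviour described for sweeps.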
Writeback
Workers own data writes. The common single-attribute case uses the
Python client’s sugar: @udf(output="tags") makes run_udf_worker send
returned values as attributes.tags in the completion call — in Go (or
over REST) the same thing is attributes on each completion item. The
gateway applies those attributes and the reserved completion marker in
one patch_columns write. Completion attributes must not use the
reserved _hevlayer_* prefix.
Python workers that need more control can declare the tpuf parameter,
write through the client, and return None; completion then stamps only
the marker. Use deterministic IDs when a Function creates rows so
at-least-once retries remain idempotent.
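One way to get deterministic IDs for fan-out rows is to derive them from the Function name, the source row, and the position of the child. The sketch below uses UUIDv5 from the Python standard library; the namespace URL and the child_row_id helper are made up for illustration and are not part of the hevlayer client.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as it never changes.
FANOUT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.invalid/hevlayer-fanout")


def child_row_id(function_name: str, source_id: str, ordinal: int) -> str:
    """Same inputs always give the same ID, so a retried batch upserts in place."""
    return str(uuid.uuid5(FANOUT_NAMESPACE, f"{function_name}/{source_id}/{ordinal}"))


first = child_row_id("product-tags", "sku-123", 0)
retry = child_row_id("product-tags", "sku-123", 0)
assert first == retry  # a redelivered item overwrites rather than duplicates
```

The version is left out of the ID on purpose here, so a re-run after a spec.version bump overwrites the earlier rows. Including it would instead produce a fresh set of rows per version.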
Deleting a Function garbage-collects operator-managed Kubernetes resources. It does not delete already-written attributes.
Lifecycle
kubectl get function product-tags
kubectl describe function product-tags
layer udf get product-tags
kubectl patch function product-tags --type=merge -p '{"spec":{"paused":true}}'
kubectl patch function product-tags --type=merge -p '{"spec":{"paused":false}}'
curl -X POST -H "authorization: Bearer $LAYER_GATEWAY_API_KEY" \
$LAYER_GATEWAY_URL/v2/udfs/product-tags/reset-failed
kubectl delete function product-tags
Version markers
spec.version is the re-run safety rail and defaults to v1. On
completion, the gateway stamps _hevlayer_udf_<function>_v with that
version, normalizing hyphens in the Function name to underscores. For
metadata.name: product-color, the marker is
_hevlayer_udf_product_color_v.
Discovery automatically looks for rows whose marker is missing, differs
from spec.version, or has an expired
_hevlayer_udf_<function>_stale_after marker. Bump spec.version when
a model, taxonomy, or prompt changes.
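The marker naming and the three discovery conditions can be expressed as a small row-level check. This is a sketch of the rule as described above, not the gateway's implementation; marker_names and needs_run are hypothetical helpers, and storing the stale-after value as an ISO-8601 string with a UTC offset is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def marker_names(function_name: str) -> Tuple[str, str]:
    """Hyphens in the Function name become underscores in the attribute names."""
    base = f"_hevlayer_udf_{function_name.replace('-', '_')}"
    return f"{base}_v", f"{base}_stale_after"


def needs_run(row: Dict[str, Any], function_name: str, version: str, now: datetime) -> bool:
    version_attr, stale_attr = marker_names(function_name)
    stamped = row.get(version_attr)
    if stamped is None or stamped != version:
        return True  # never processed, or processed by a different version
    stale_after = row.get(stale_attr)
    # Assumed format: ISO-8601 with an explicit offset, e.g. 2024-01-01T00:00:00+00:00.
    return stale_after is not None and datetime.fromisoformat(stale_after) <= now


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(marker_names("product-color")[0])
print(needs_run({"_hevlayer_udf_product_color_v": "v1"}, "product-color", "v2", now))
```

The second call illustrates a version bump: a row stamped v1 becomes eligible again as soon as spec.version changes to v2.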
Tuning knobs
| Knob | What it bounds |
|---|---|
| worker.batchSize | Rows per worker batch. |
| worker.timeoutSeconds | Worker call timeout. |
| schedule.leaseSeconds | How long a claim is held before reissue. |
| schedule.discoveryIntervalSeconds | Time between discovery scan jobs. |
| schedule.maxInFlightBatches | Concurrent worker batches per UDF. |
| schedule.maxConcurrentScans | Concurrent namespace discovery jobs. |
| retry.maxAttempts | Tries before a row lands in failed. |
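To get a feel for how the retry knobs interact, the sketch below lists the waits between attempts for a given retry block. It assumes the backoff doubles after each failure and is capped at maxBackoffSeconds; the page does not state the growth factor or whether jitter is applied, so treat the numbers as an estimate rather than the gateway's schedule. backoff_schedule is a hypothetical helper.

```python
from typing import List


def backoff_schedule(max_attempts: int, initial_seconds: int, max_seconds: int) -> List[int]:
    """Waits between consecutive attempts, assuming doubling with a cap."""
    # max_attempts tries leave max_attempts - 1 gaps between them.
    return [min(initial_seconds * 2**i, max_seconds) for i in range(max_attempts - 1)]


# Values from the first manifest: maxAttempts 8, initial 5 s, cap 300 s.
waits = backoff_schedule(8, 5, 300)
print(waits, sum(waits))
```

Under that doubling assumption, a row that keeps failing transiently would spend roughly ten minutes in backoff before it lands in failed.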