See it in action on the hev-shop demo store.

API

Pipelines

The pipeline API keeps the code you need to index data simple and organized. A typical pipeline has two stages: extraction and chunking on CPU, followed by embedding on GPU. This guide walks through a best-practice layout for that pipeline; the same concepts extend to N stages.

Document lifecycle

              put chunks             put vectors
  (new doc) ──────────► pending ──────────────► indexed

                           │ re-stage (idempotent)
  • pending — chunks stored, waiting for embedding.
  • indexed — vectors written to Turbopuffer.

The embedding stage is a claim stage: documents sit in it only while leased to a worker, and recover to pending when a lease expires. Re-staging a document resets it to pending with new chunks, which is how you reprocess after source data changes.

File tree

indexer/
├── pipelines/
│   ├── extract-chunk.yaml   # CPU stage — Pipeline resource
│   └── embed.yaml           # GPU stage — Pipeline resource
├── extract_chunk.py         # read the source, stage chunks
├── embed.py                 # claim pending docs, write vectors
└── app.py                   # REST API: trigger a run, wait for completion

The two YAML files declare the worker images, pools, and scaling — see the Pipeline CRD for the fields. Both set pipelineId: products so the two workers share one queue. The rest of this page is the worker code — shown in Python, Go, and TypeScript; every call is also a plain REST endpoint (see Write & Stage).

Extract and chunk

The CPU worker reads the source, splits text into chunks, and stages them. Staging chunks stores them durably (S3, cached in the document cache) and marks the document pending.

The worker hardcodes nothing: the operator injects the pipeline id, the gateway URL, and spec.sourceRef as environment variables — see the worker variables on the CRD page. The queue URL below comes from the sourceRef declared in pipelines/extract-chunk.yaml.

# extract_chunk.py
import asyncio
import json
import os

import boto3
from hevlayer import AsyncHevlayer

# Pipeline id for this worker, read from the environment (KeyError if unset).
PIPELINE = os.environ["HEVLAYER_PIPELINE_ID"]
# JSON-decoded source reference; main() reads its "queueUrl" key.
SOURCE = json.loads(os.environ["HEVLAYER_SOURCE_REF"])
# SQS client built with boto3's default configuration.
sqs = boto3.client("sqs")


def chunks(text: str, size: int = 800) -> list[str]:
    """Split ``text`` into consecutive pieces of at most ``size`` characters.

    Only the final piece may be shorter than ``size``; an empty ``text``
    yields an empty list.
    """
    pieces: list[str] = []
    for start in range(0, len(text), size):
        pieces.append(text[start : start + size])
    return pieces


async def main() -> None:
    """Poll the source queue forever and stage each document's chunks.

    Every message body is parsed as JSON and must carry ``id`` and ``text``
    keys. The text is split with ``chunks`` and staged under chunk ids of the
    form ``<doc id>-<index>``. The message is deleted only after the staging
    call returns; if that call raises, the delete is never reached.
    """
    queue_url = SOURCE["queueUrl"]
    async with AsyncHevlayer(
        base_url=os.environ["HEVLAYER_BASE_URL"],
        api_key=os.environ.get("LAYER_GATEWAY_API_KEY"),
    ) as layer:
        while True:
            # Long-poll (20 s is the SQS maximum). Without WaitTimeSeconds an
            # empty queue answers immediately and this loop would spin,
            # issuing back-to-back requests.
            batch = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,
            ).get("Messages", [])
            for m in batch:
                doc = json.loads(m["Body"])
                await layer.put_pipeline_document_chunks(PIPELINE, doc["id"], {
                    "chunks": [
                        {"id": f"{doc['id']}-{i}", "text": t}
                        for i, t in enumerate(chunks(doc["text"]))
                    ],
                })
                sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"])


asyncio.run(main())
// extract_chunk.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	hevlayer "github.com/hev/layer/clients/go"
)

// chunks splits text into consecutive pieces of at most size characters.
//
// Boundaries are counted in runes rather than bytes, so a multi-byte UTF-8
// character is never cut in half. Only the last piece may be shorter than
// size. Empty text yields a nil slice. It panics when size is not positive,
// because the loop could never advance.
func chunks(text string, size int) []string {
	if size <= 0 {
		panic("chunks: size must be positive")
	}
	runes := []rune(text)
	var out []string
	for start := 0; start < len(runes); start += size {
		end := start + size
		if end > len(runes) {
			end = len(runes)
		}
		out = append(out, string(runes[start:end]))
	}
	return out
}

// main runs the extract-and-chunk worker. It reads the pipeline id and the
// source reference from the environment, then polls the SQS queue forever:
// each message body is decoded as {"id", "text"}, the text is split with
// chunks(text, 800) into chunks with ids "<doc id>-<index>", the chunks are
// staged, and the message is deleted.
func main() {
	ctx := context.Background()
	pipeline := os.Getenv("HEVLAYER_PIPELINE_ID")
	var source struct {
		QueueURL string `json:"queueUrl"`
	}
	if err := json.Unmarshal([]byte(os.Getenv("HEVLAYER_SOURCE_REF")), &source); err != nil {
		fmt.Fprintf(os.Stderr, "extract-chunk: invalid HEVLAYER_SOURCE_REF: %v\n", err)
		os.Exit(1)
	}
	// Fail fast instead of polling with an empty pipeline id or queue URL.
	if pipeline == "" || source.QueueURL == "" {
		fmt.Fprintln(os.Stderr, "extract-chunk: HEVLAYER_PIPELINE_ID and a sourceRef queueUrl are required")
		os.Exit(1)
	}

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "extract-chunk: load AWS config: %v\n", err)
		os.Exit(1)
	}
	queue := sqs.NewFromConfig(cfg)
	layer := hevlayer.NewClient(
		hevlayer.WithBaseURL(os.Getenv("HEVLAYER_BASE_URL")),
		hevlayer.WithAPIKey(os.Getenv("LAYER_GATEWAY_API_KEY")),
	)

	for {
		batch, err := queue.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            &source.QueueURL,
			MaxNumberOfMessages: 10,
			// Long-poll (20 s is the SQS maximum) so an empty queue does
			// not turn this loop into back-to-back requests.
			WaitTimeSeconds: 20,
		})
		if err != nil {
			fmt.Fprintf(os.Stderr, "extract-chunk: receive: %v\n", err)
			continue
		}
		for _, m := range batch.Messages {
			var doc struct {
				ID   string `json:"id"`
				Text string `json:"text"`
			}
			if m.Body == nil {
				fmt.Fprintln(os.Stderr, "extract-chunk: skipping message without a body")
				continue
			}
			// A body that does not decode, or has no id, is left on the
			// queue rather than staged under an empty document id.
			if err := json.Unmarshal([]byte(*m.Body), &doc); err != nil || doc.ID == "" {
				fmt.Fprintf(os.Stderr, "extract-chunk: skipping malformed message: %v\n", err)
				continue
			}
			var staged []hevlayer.Chunk
			for i, t := range chunks(doc.Text, 800) {
				staged = append(staged, hevlayer.Chunk{ID: fmt.Sprintf("%s-%d", doc.ID, i), Text: t})
			}
			// NOTE(review): the result of this call is discarded, so the
			// message below is deleted even if staging failed. The client's
			// return signature is not visible here; once confirmed, check
			// the error and skip the delete on failure.
			layer.PutPipelineDocumentChunks(ctx, pipeline, doc.ID,
				&hevlayer.PutChunksRequest{Chunks: staged})
			if _, err := queue.DeleteMessage(ctx, &sqs.DeleteMessageInput{
				QueueUrl:      &source.QueueURL,
				ReceiptHandle: m.ReceiptHandle,
			}); err != nil {
				fmt.Fprintf(os.Stderr, "extract-chunk: delete message for %s: %v\n", doc.ID, err)
			}
		}
	}
}
// extract_chunk.ts
import {
  DeleteMessageCommand,
  ReceiveMessageCommand,
  SQSClient,
} from "@aws-sdk/client-sqs";
import { Hevlayer } from "hevlayer";

// Pipeline id for this worker, read from the environment. The `!` only
// silences the compiler; it does not check that the variable is set.
const PIPELINE = process.env.HEVLAYER_PIPELINE_ID!;
// Parsed source reference; the polling loop reads its `queueUrl` field.
const SOURCE = JSON.parse(process.env.HEVLAYER_SOURCE_REF!);
// SQS client with default configuration.
const sqs = new SQSClient({});
// Gateway client; base URL and API key come from the environment.
const layer = new Hevlayer({
  baseUrl: process.env.HEVLAYER_BASE_URL,
  apiKey: process.env.LAYER_GATEWAY_API_KEY,
});

/**
 * Splits `text` into consecutive pieces of at most `size` characters.
 *
 * Pieces are measured in Unicode code points rather than UTF-16 code units,
 * so a surrogate pair (for example an emoji) is never split across two
 * chunks.
 *
 * @param text - The text to split; an empty string yields an empty array.
 * @param size - Maximum code points per piece; must be a positive integer.
 * @returns The pieces in order; only the last may be shorter than `size`.
 * @throws RangeError if `size` is not a positive integer, since the loop
 *   could otherwise never advance.
 */
function chunks(text: string, size = 800): string[] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const codePoints = Array.from(text);
  const out: string[] = [];
  for (let i = 0; i < codePoints.length; i += size) {
    out.push(codePoints.slice(i, i + size).join(""));
  }
  return out;
}

// Poll the source queue forever: stage each document's chunks under ids of
// the form `<doc id>-<index>`, then delete the message. The delete runs only
// after the staging call resolves, so a rejected call leaves the message in
// place.
while (true) {
  const batch = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: SOURCE.queueUrl,
    MaxNumberOfMessages: 10,
    // Long-poll (20 s is the SQS maximum) so an empty queue does not turn
    // this loop into back-to-back requests.
    WaitTimeSeconds: 20,
  }));
  for (const message of batch.Messages ?? []) {
    const doc = JSON.parse(message.Body ?? "{}");
    // A body without a string id and text cannot be chunked; skip it and
    // leave it on the queue instead of crashing the worker in chunks().
    if (typeof doc?.id !== "string" || typeof doc?.text !== "string") {
      console.error("extract-chunk: skipping malformed message", message.MessageId);
      continue;
    }
    await layer.putPipelineDocumentChunks(PIPELINE, doc.id, {
      chunks: chunks(doc.text).map((text, i) => ({ id: `${doc.id}-${i}`, text })),
    });
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: SOURCE.queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    }));
  }
}

Embed

The GPU worker claims pending documents, reads their chunks back, and writes vectors. Writing vectors upserts to Turbopuffer and marks the document indexed. Claims are leased, so a worker that crashes loses nothing.

# embed.py
import asyncio
import os

from hevlayer import AsyncHevlayer
from sentence_transformers import SentenceTransformer

# Pipeline id for this worker, read from the environment (KeyError if unset).
PIPELINE = os.environ["HEVLAYER_PIPELINE_ID"]
# Embedding model, loaded once at import time and reused for every document.
model = SentenceTransformer("all-MiniLM-L6-v2")


async def main() -> None:
    """Claim pending documents forever, embed their chunks, write vectors.

    Each pass claims up to 16 documents from the ``pending`` stage into the
    ``embedding`` claim stage. For every claimed document the stored chunks
    are read back, encoded with the module-level ``model``, and written as
    vectors keyed by chunk id with the chunk text as an attribute.
    """
    async with AsyncHevlayer(
        base_url=os.environ["HEVLAYER_BASE_URL"],
        api_key=os.environ.get("LAYER_GATEWAY_API_KEY"),
    ) as layer:
        while True:
            # NOTE(review): every replica reports the same worker_id; if the
            # gateway attributes leases per worker, derive it from the pod
            # name instead. Confirm against the claim API.
            claimed = await layer.claim_documents(PIPELINE, {
                "stage": "pending",
                "claim_stage": "embedding",
                "limit": 16,
                "worker_id": "embed-0",
            })
            handled = 0
            for doc_id in claimed.documents:
                handled += 1
                doc_chunks = await layer.get_pipeline_document_chunks(PIPELINE, doc_id)
                vectors = model.encode([c.text for c in doc_chunks])
                await layer.put_pipeline_document_vectors(PIPELINE, doc_id, {
                    "vectors": [
                        {"id": c.id, "vector": v.tolist(), "attributes": {"text": c.text}}
                        for c, v in zip(doc_chunks, vectors)
                    ],
                })
            if handled == 0:
                # Nothing was pending: pause briefly instead of re-claiming
                # in a tight loop.
                await asyncio.sleep(1)


asyncio.run(main())
// embed.go
package main

import (
	"context"
	"os"

	hevlayer "github.com/hev/layer/clients/go"
)

// main runs the embed worker. Each pass claims up to 16 documents from the
// "pending" stage into the "embedding" claim stage, reads every claimed
// document's chunks back, embeds the chunk texts, and writes one vector entry
// per chunk (keyed by chunk id, with the chunk text as an attribute).
//
// A failed claim restarts the pass and a failed chunk read skips that
// document. embed is not defined in this file; it stands for your embedding
// model or service.
//
// NOTE(review): the result of PutPipelineDocumentVectors is discarded, and
// the loop re-claims immediately when nothing was claimed. Confirm whether
// the claim call blocks server-side before relying on this in production.
func main() {
	ctx := context.Background()
	pipeline := os.Getenv("HEVLAYER_PIPELINE_ID")
	layer := hevlayer.NewClient(
		hevlayer.WithBaseURL(os.Getenv("HEVLAYER_BASE_URL")),
		hevlayer.WithAPIKey(os.Getenv("LAYER_GATEWAY_API_KEY")),
	)

	for {
		request := &hevlayer.ClaimDocumentsRequest{
			Stage:      "pending",
			ClaimStage: "embedding",
			Limit:      16,
			WorkerID:   "embed-0",
		}
		claimed, claimErr := layer.ClaimDocuments(ctx, pipeline, request)
		if claimErr != nil {
			continue
		}
		for _, docID := range claimed.Documents {
			stored, readErr := layer.GetPipelineDocumentChunks(ctx, pipeline, docID)
			if readErr != nil {
				continue
			}
			items := *stored

			// Collect the texts first: embed takes the whole batch at once.
			texts := make([]string, 0, len(items))
			for _, item := range items {
				texts = append(texts, item.Text)
			}
			vectors := embed(texts) // your embedding model or service

			entries := make([]hevlayer.VectorEntry, 0, len(items))
			for i := range items {
				entries = append(entries, hevlayer.VectorEntry{
					ID:         items[i].ID,
					Vector:     vectors[i],
					Attributes: map[string]interface{}{"text": items[i].Text},
				})
			}
			layer.PutPipelineDocumentVectors(ctx, pipeline, docID,
				&hevlayer.PutVectorsRequest{Vectors: entries})
		}
	}
}
// embed.ts
import { Hevlayer } from "hevlayer";

// Pipeline id for this worker, read from the environment. The `!` only
// silences the compiler; it does not check that the variable is set.
const PIPELINE = process.env.HEVLAYER_PIPELINE_ID!;
// Gateway client; base URL and API key come from the environment.
const layer = new Hevlayer({
  baseUrl: process.env.HEVLAYER_BASE_URL,
  apiKey: process.env.LAYER_GATEWAY_API_KEY,
});

// Claim pending documents forever: each pass claims up to 16 documents from
// "pending" into the "embedding" claim stage, reads their chunks back, embeds
// the chunk texts, and writes one vector per chunk (keyed by chunk id, with
// the chunk text as an attribute). `embed` is not defined in this file; it
// stands for your embedding model or service.
while (true) {
  // NOTE(review): every replica reports the same worker_id; if the gateway
  // attributes leases per worker, derive it from the pod name instead.
  const claimed = await layer.claimDocuments(PIPELINE, {
    stage: "pending",
    claim_stage: "embedding",
    limit: 16,
    worker_id: "embed-0",
  });
  let handled = 0;
  for (const docId of claimed.documents) {
    handled += 1;
    const docChunks = await layer.getPipelineDocumentChunks(PIPELINE, docId);
    const vectors = await embed(docChunks.map((chunk) => chunk.text));
    await layer.putPipelineDocumentVectors(PIPELINE, docId, {
      vectors: docChunks.map((chunk, i) => ({
        id: chunk.id,
        vector: vectors[i],
        attributes: { text: chunk.text },
      })),
    });
  }
  if (handled === 0) {
    // Nothing was pending: pause briefly instead of re-claiming in a tight
    // loop.
    await new Promise((resolve) => setTimeout(resolve, 1_000));
  }
}

Deploy

Build the two workers into the images your YAML references and push them to a registry your cluster can pull — Layer does not build images. Then apply the resources:

kubectl apply -f pipelines/

The operator creates one Deployment per resource and the embed pool’s KEDA object. Order doesn’t matter here: the app creates the gateway pipeline before it enqueues a batch (staging into a pipeline id that doesn’t exist returns 404), so workers never see a missing pipeline. Nothing else to wire: the CRD types themselves install with the Helm chart.

Trigger a run

The app exposes the pipeline to the rest of your system as one endpoint: POST /index-runs sends a batch to the source queue, then waits for the run to complete and returns the snapshot it produced. The pipeline is created on first use — this is where the target namespace is set in code.

# app.py
import asyncio
import json
import os
import time

import boto3
from fastapi import FastAPI
from hevlayer import AsyncHevlayer, HevlayerError

# Source queue that index_run() publishes documents to.
QUEUE = "https://sqs.us-east-1.amazonaws.com/123456789/product-updates"
# SQS client built with boto3's default configuration.
sqs = boto3.client("sqs")
app = FastAPI()
# Gateway client shared by the request handler and the polling helpers;
# the base URL is required, the API key is optional.
layer = AsyncHevlayer(
    base_url=os.environ["HEVLAYER_BASE_URL"],
    api_key=os.environ.get("LAYER_GATEWAY_API_KEY"),
)


@app.post("/index-runs")
async def index_run(documents: list[dict]) -> dict:
    """Enqueue a batch of documents and wait for the snapshot it produces.

    Creates the "products" pipeline if it does not exist yet (a 409 from the
    gateway is treated as "already exists"), sends every document to the
    source queue as a JSON message, waits for the pipeline to drain, then
    waits for a snapshot whose watermark is at or after the start of this run.

    Returns a dict with the number of documents sent and the snapshot SHA.
    Any gateway error other than 409 propagates to the caller.

    NOTE(review): drain() is called right after enqueueing, so it may see
    pending_count == 0 before the extract worker has staged anything from
    this batch. Confirm that the snapshot watermark check covers that window.
    """
    started_ms = int(time.time() * 1000)
    try:
        await layer.create_pipeline({"id": "products", "target_namespace": "products"})
    except HevlayerError as e:
        if e.status_code != 409:  # 409: already exists
            raise
    for doc in documents:
        # boto3 is synchronous: run each send in a worker thread so the event
        # loop keeps serving other requests while the call is in flight.
        await asyncio.to_thread(
            sqs.send_message, QueueUrl=QUEUE, MessageBody=json.dumps(doc)
        )
    await drain()
    sha = await next_snapshot(after_ms=started_ms)
    return {"documents": len(documents), "snapshot": sha}
// app.go
// Package-level dependencies shared by the functions in this file.
var (
	// queueURL is the SQS queue that indexRun publishes documents to.
	queueURL = "https://sqs.us-east-1.amazonaws.com/123456789/product-updates"
	queue    *sqs.Client // sqs.NewFromConfig in main
	// layer is the gateway client, configured from the environment.
	layer    = hevlayer.NewClient(
		hevlayer.WithBaseURL(os.Getenv("HEVLAYER_BASE_URL")),
		hevlayer.WithAPIKey(os.Getenv("LAYER_GATEWAY_API_KEY")),
	)
)

// indexRun handles one index run. It decodes the request body as a JSON
// array of documents, creates the "products" pipeline if needed (a 409 from
// the gateway means it already exists), sends every document to the source
// queue, waits for the pipeline to drain, then waits for a snapshot whose
// watermark is at or after the start of the run. The response is a JSON
// object with the document count and the snapshot SHA.
//
// Responses: 400 for an undecodable body, 502 when the gateway or SQS
// rejects a call. A send failure aborts the run; documents sent before the
// failure stay on the queue.
func indexRun(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	startedMs := time.Now().UnixMilli()
	var documents []map[string]interface{}
	if err := json.NewDecoder(r.Body).Decode(&documents); err != nil {
		http.Error(w, "request body must be a JSON array of documents: "+err.Error(), http.StatusBadRequest)
		return
	}

	_, err := layer.CreatePipeline(ctx, &hevlayer.CreatePipelineRequest{
		ID:              "products",
		TargetNamespace: "products",
	})
	var herr *hevlayer.HevlayerError
	if err != nil && !(errors.As(err, &herr) && herr.StatusCode == 409) { // 409: already exists
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	for _, doc := range documents {
		body, err := json.Marshal(doc)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		mb := string(body)
		// Report a failed send rather than claiming the document was queued.
		if _, err := queue.SendMessage(ctx, &sqs.SendMessageInput{QueueUrl: &queueURL, MessageBody: &mb}); err != nil {
			http.Error(w, err.Error(), http.StatusBadGateway)
			return
		}
	}
	drain(ctx)
	if ctx.Err() != nil {
		return // request cancelled: nobody is left to read a response
	}
	sha := nextSnapshot(ctx, startedMs)
	if ctx.Err() != nil {
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"documents": len(documents),
		"snapshot":  sha,
	})
}
// app.ts
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { Hevlayer } from "hevlayer";

// Source queue that indexRun publishes documents to.
const queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/product-updates";
// SQS client with default configuration.
const queue = new SQSClient({});
// Gateway client; base URL and API key come from the environment.
const layer = new Hevlayer({
  baseUrl: process.env.HEVLAYER_BASE_URL,
  apiKey: process.env.LAYER_GATEWAY_API_KEY,
});

/**
 * Enqueues a batch of documents and waits for the snapshot it produces.
 *
 * Ensures the "products" pipeline exists, sends each document to the source
 * queue as a JSON message (one at a time, in order), waits for the pipeline
 * to drain, then waits for a snapshot at or after the start of this run.
 *
 * @param documents - The documents to enqueue.
 * @returns The number of documents sent and the snapshot SHA.
 */
async function indexRun(documents: Record<string, unknown>[]) {
  const startedMs = Date.now();
  await layer.ensurePipeline({ id: "products", target_namespace: "products" });

  for (const doc of documents) {
    const command = new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(doc),
    });
    await queue.send(command);
  }

  await drain();
  const count = documents.length;
  const snapshot = await nextSnapshot(startedMs);
  return { documents: count, snapshot };
}

Wait for completion

A run completes in two steps: first the queue drains, then the consistency watcher observes the namespace as stable and writes a snapshot past the run’s watermark. pending_count is the same signal KEDA scales on — when it reaches zero, the embed pool scales back to zero. The snapshot SHA addresses facet listings and counts that are exact as of that watermark; switch your application over to it.

# app.py
async def drain() -> None:
    """Wait until the "products" pipeline reports no pending documents.

    Polls the pipeline status and sleeps 10 seconds between checks. There is
    no timeout: the coroutine returns only once ``pending_count`` is zero.
    """
    # status.counts: {"pending": 0, "indexed": 8530}
    while (await layer.get_pipeline_status("products")).pending_count != 0:
        await asyncio.sleep(10)


async def next_snapshot(after_ms: int) -> str:
    """Wait for a "products" snapshot at or after ``after_ms``; return its SHA.

    Fetches the single latest namespace-history entry and sleeps 30 seconds
    between checks until its ``watermark_ms`` is at least ``after_ms``. There
    is no timeout.
    """
    while True:
        history = await layer.list_namespace_history("products", limit=1)
        if history:
            if history[0].watermark_ms >= after_ms:
                return history[0].sha
        await asyncio.sleep(30)
// app.go
// drain blocks until the "products" pipeline reports no pending documents,
// polling every 10 seconds. A failed status call counts as "not drained yet"
// and is retried on the next tick.
//
// It also returns as soon as ctx is cancelled, so a request whose client has
// gone away does not keep polling forever. drain has no return value;
// callers that need to tell the two outcomes apart should check ctx.Err().
func drain(ctx context.Context) {
	for {
		status, err := layer.GetPipelineStatus(ctx, "products")
		if err == nil && status.PendingCount == 0 { // status.Counts: {"pending": 0, "indexed": 8530}
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}

// nextSnapshot waits for the latest "products" namespace-history entry to
// carry a watermark at or after afterMs and returns that entry's SHA. It
// polls every 30 seconds; a failed or empty listing is retried on the next
// tick.
//
// It returns the empty string as soon as ctx is cancelled, so a request
// whose client has gone away does not keep polling forever.
func nextSnapshot(ctx context.Context, afterMs int64) string {
	for {
		history, err := layer.ListNamespaceHistory(ctx, "products",
			&hevlayer.ListNamespaceHistoryParams{Limit: 1})
		if err == nil && len(history) > 0 && history[0].WatermarkMs >= afterMs {
			return history[0].Sha
		}
		select {
		case <-ctx.Done():
			return ""
		case <-time.After(30 * time.Second):
		}
	}
}
// app.ts
/** Resolves (with no value) after roughly `ms` milliseconds. */
const sleep = (ms: number) =>
  new Promise((done) => {
    setTimeout(done, ms);
  });

/**
 * Waits until the "products" pipeline reports no pending documents.
 *
 * Polls the pipeline status and sleeps 10 seconds between checks. There is
 * no timeout: the promise resolves only once `pending_count` is zero.
 */
async function drain() {
  for (;;) {
    const status = await layer.getPipelineStatus("products");
    const drained = status.pending_count === 0;
    if (drained) {
      return;
    }
    await sleep(10_000);
  }
}

/**
 * Waits for a "products" snapshot at or after `afterMs`.
 *
 * Fetches the single latest namespace-history entry and sleeps 30 seconds
 * between checks until its `watermark_ms` is at least `afterMs`. There is no
 * timeout.
 *
 * @param afterMs - Lower bound for the snapshot watermark, in milliseconds.
 * @returns The SHA of the first qualifying snapshot observed.
 */
async function nextSnapshot(afterMs: number): Promise<string> {
  for (;;) {
    const history = await layer.listNamespaceHistory("products", { limit: 1 });
    if (history.length === 0 || !(history[0].watermark_ms >= afterMs)) {
      await sleep(30_000);
      continue;
    }
    return history[0].sha;
  }
}

Once vectors are indexed, query and fetch them through the namespace API — see Query & Fetch.

Failure model

  • Turbopuffer write failures are hard: the vectors route returns 502 and the document stays in embedding until its lease expires, after which it recovers to pending and can be re-claimed.
  • Aerospike cache failures do not block chunk reads when S3 backing is present; PostgreSQL connectivity failures return 500 and should be retried with backoff. The stop-writes recovery path and the metrics to watch live in the failure-mode runbook.
  • Lease expiry is handled server-side. A worker that crashes mid-embedding has its documents recovered on the next claim sweep.
esc