Skip to main content

Installation

pip install muster-sdk langgraph

Integration pattern

The recommended pattern is to emit quality signals at the end of your graph’s final node.
from langgraph.graph import StateGraph, END
from muster_sdk import beacon, quality
import uuid

beacon.register(agent_id="invoice-processor-v2", version="2.1.0")

def final_node(state: dict) -> dict:
    """Final node — validates output and emits to Muster."""
    output = state.get("output", {})
    job_id = state.get("job_id", str(uuid.uuid4()))

    subtotal_ok = abs(
        sum(item["amount"] for item in output.get("line_items", [])) -
        output.get("subtotal", 0)
    ) < 0.01

    quality.emit(
        job_id=job_id,
        overall_passed=bool(output) and subtotal_ok,
        token_input=state.get("token_input", 0),
        token_output=state.get("token_output", 0),
        model=state.get("model", "gpt-4o"),
        checks=[
            quality.Check("output_not_empty",       "HIGH", bool(output)),
            quality.Check("subtotal_arithmetic",    "HIGH", subtotal_ok,
                          expected=str(sum(i["amount"] for i in output.get("line_items", []))),
                          actual=str(output.get("subtotal", 0))),
            quality.Check("required_fields_present","HIGH",
                          all(k in output for k in ["vendor", "total", "date"])),
        ]
    )
    return state

# Build graph
workflow = StateGraph(dict)
workflow.add_node("extract", extract_node)
workflow.add_node("validate", validate_node)
workflow.add_node("final", final_node)

workflow.add_edge("extract", "validate")
workflow.add_edge("validate", "final")
workflow.add_edge("final", END)

graph = workflow.compile()

Tracking token usage across nodes

To accurately track tokens across a multi-node graph, accumulate in state:
def extract_node(state: dict) -> dict:
    with get_openai_callback() as cb:
        result = extraction_chain.invoke(state["input"])
    return {
        **state,
        "output": result,
        "token_input": state.get("token_input", 0) + cb.prompt_tokens,
        "token_output": state.get("token_output", 0) + cb.completion_tokens,
    }