How it works

The Bedrock connector:
  • Discovers all Bedrock agents by calling list_agents() on the boto3 bedrock-agent client
  • Streams invocations by reading InvokeAgent events that CloudTrail delivers to CloudWatch Logs
Elitery deploys and manages the connector inside your AWS environment or customer cluster.
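
The two steps above can be sketched roughly as follows. This is an illustration, not the connector's actual implementation: the function names are hypothetical, and the filter pattern assumes CloudTrail records reach the log group as JSON with an eventName field. Both functions take an already-constructed client so they can be exercised without AWS access.

```python
import json
from typing import Any, Dict, Iterator, List

# Assumed CloudWatch Logs JSON filter pattern that matches InvokeAgent records.
INVOKE_AGENT_FILTER = '{ $.eventName = "InvokeAgent" }'


def discover_agents(bedrock_agent: Any) -> List[Dict[str, Any]]:
    """Collect every agent summary, following nextToken pagination."""
    agents: List[Dict[str, Any]] = []
    kwargs: Dict[str, Any] = {}
    while True:
        page = bedrock_agent.list_agents(**kwargs)
        agents.extend(page.get("agentSummaries", []))
        token = page.get("nextToken")
        if not token:
            return agents
        kwargs["nextToken"] = token


def invoke_agent_events(logs: Any, log_group: str, start_ms: int) -> Iterator[Dict[str, Any]]:
    """Yield parsed CloudTrail records for InvokeAgent calls since start_ms."""
    kwargs: Dict[str, Any] = {
        "logGroupName": log_group,
        "startTime": start_ms,  # milliseconds since the Unix epoch
        "filterPattern": INVOKE_AGENT_FILTER,
    }
    while True:
        page = logs.filter_log_events(**kwargs)
        for event in page.get("events", []):
            # Each log event's message holds one CloudTrail record as JSON text.
            yield json.loads(event["message"])
        token = page.get("nextToken")
        if not token:
            return
        kwargs["nextToken"] = token
```

With real clients, these would be called as discover_agents(boto3.client("bedrock-agent")) and invoke_agent_events(boto3.client("logs"), log_group, start_ms). Both calls use only the read-only actions the IAM role grants (bedrock:ListAgents and logs:FilterLogEvents).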

What Elitery needs from you

IAM permissions — Elitery requires a read-only IAM role with:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:ListAgents",
        "bedrock:GetAgent",
        "bedrock:ListTagsForResource"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:FilterLogEvents",
        "logs:GetLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:log-group:<your-cloudtrail-log-group>:*"
    }
  ]
}
CloudTrail setup — ensure CloudTrail is logging to CloudWatch Logs with Bedrock API calls enabled. Provide Elitery with:
AWS_REGION=ap-southeast-1
BEDROCK_CLOUDTRAIL_LOG_GROUP=/aws/cloudtrail/your-trail
For Kubernetes deployments, Elitery uses IAM Roles for Service Accounts (IRSA) — no AWS keys are stored directly.
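
To double-check the values before handing them over, a small sketch like the one below can confirm that both settings are present and show the Resource string the IAM policy would contain for your log group. The helper names are hypothetical. The ARN format simply mirrors the policy above, with wildcards left in place for region and account.

```python
import os
from typing import Dict, Mapping


def cloudtrail_log_group_arn(log_group: str, region: str = "*", account_id: str = "*") -> str:
    """Build the log-group Resource ARN in the shape used by the IAM policy."""
    return f"arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*"


def load_connector_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read the two settings from the environment and fail fast if one is missing."""
    required = ("AWS_REGION", "BEDROCK_CLOUDTRAIL_LOG_GROUP")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError("missing settings: " + ", ".join(missing))
    log_group = env["BEDROCK_CLOUDTRAIL_LOG_GROUP"]
    return {
        "region": env["AWS_REGION"],
        "log_group": log_group,
        "log_group_arn": cloudtrail_log_group_arn(log_group),
    }
```

Passing a plain dict instead of os.environ makes the check easy to run locally, for example load_connector_settings({"AWS_REGION": "ap-southeast-1", "BEDROCK_CLOUDTRAIL_LOG_GROUP": "/aws/cloudtrail/your-trail"}).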

What your developers do

Nothing by default. Bedrock agents are fully managed, so no code changes are required. For precise checks on critical Bedrock agents, you can optionally send a quality signal after invoking the agent, either through the SDK or with a plain HTTP call:
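import os
import threading
import uuid

import boto3
import httpx

# Assumption: the Muster base URL is provided in a MUSTER_URL environment variable.
MUSTER_URL = os.environ["MUSTER_URL"]
# Runtime client used to invoke agents; region and credentials come from the environment.
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime")

def _post_quality(job_id, payload):
    # Best effort: a failed or slow quality post must never break the agent call.
    try:
        httpx.post(f"{MUSTER_URL}/api/v1/jobs/{job_id}/quality", json=payload, timeout=2.0)
    except httpx.HTTPError:
        pass

def invoke_bedrock_agent(agent_id, agent_alias_id, session_id, input_text):
    job_id = str(uuid.uuid4())

    # InvokeAgent requires the agent alias ID in addition to the agent ID.
    response = bedrock_agent_runtime.invoke_agent(
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        inputText=input_text,
    )
    # The completion is an event stream; join the text chunks into one string.
    output = "".join(event["chunk"]["bytes"].decode("utf-8")
                     for event in response["completion"]
                     if "chunk" in event)

    # Optional: add precise checks, sent on a background thread so the caller is not delayed
    payload = {"agent_id": "bedrock-" + agent_id, "checks": [
        {"check_id": "output_not_empty", "severity": "HIGH", "passed": bool(output)},
    ]}
    threading.Thread(target=_post_quality, args=(job_id, payload), daemon=True).start()

    return output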
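
If you want to unit-test the checks without calling Bedrock or Muster, one option is to build the request body in a pure function. The sketch below is an assumption about how you might structure your own code, not part of the SDK: build_quality_payload is a hypothetical helper, and it uses only the fields shown above (agent_id, check_id, severity, passed). Unlike the inline version, it treats whitespace-only output as empty.

```python
from typing import Any, Dict


def build_quality_payload(agent_id: str, output: str) -> Dict[str, Any]:
    """Build the JSON body for the quality endpoint from an agent's output."""
    checks = [
        {
            "check_id": "output_not_empty",
            "severity": "HIGH",
            # Whitespace-only output counts as empty here (a deliberate variation).
            "passed": bool(output.strip()),
        },
    ]
    return {"agent_id": "bedrock-" + agent_id, "checks": checks}
```

The returned dict can be passed as the json argument of the HTTP call, which keeps the network code small and puts the check logic somewhere easy to test.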