Skip to main content

Documentation Index

Fetch the complete documentation index at: https://agentrail.app/docs/llms.txt

Use this file to discover all available pages before exploring further.

The Python AgentRailClient provides async methods for every stage of the AgentRail task lifecycle. All method names use snake_case and all response data is returned as validated Pydantic v2 models. You construct the client once and use it across your harness, typically as an async context manager.

Construct the client

import asyncio
import os

from agentrail import AgentRailClient, TaskStatus

async def main():
    """Print the identifier, title, and available actions of up to ten TODO tasks."""
    # The API key is required (KeyError when unset); the base URL falls back
    # to a local server address.
    token = os.environ["AGENTRAIL_API_KEY"]
    endpoint = os.getenv("AGENTRAIL_BASE_URL", "http://127.0.0.1:3000")

    async with AgentRailClient(base_url=endpoint, api_key=token) as client:
        todo_page = await client.list_my_tasks(status=TaskStatus.TODO, limit=10)

        for task in todo_page.data:
            print(f"{task.identifier}: {task.title}")
            # available_actions is joined as-is, so it must contain strings.
            print(f"Next actions: {', '.join(task.available_actions)}")

asyncio.run(main())
Python models expose snake_case attributes. The available_actions list tells you what operations are currently permitted on each task.

Submit, observe, and ship

The following example shows the complete lifecycle: submitting work, reading CI and review status, then shipping when all conditions are met.
import asyncio
import os

from agentrail import (
    AgentRailClient,
    PullRequestSubmitOptions,
    ShipEnvironment,
    ShipMode,
    TaskShipRequest,
    TaskStatus,
    TaskSubmitMode,
    TaskSubmitRequest,
)


async def main():
    """Submit the first in-progress task if allowed, then ship it once every gate passes.

    Returns without doing anything when no IN_PROGRESS task is listed. The
    ship call is made only when "ship" is an available action, CI reports
    "passed", the latest review decision is "approved", and a head SHA was
    resolved.
    """
    # Read in the same order as the client arguments: base URL (with a local
    # default) first, then the required API key (KeyError when unset).
    base_url = os.getenv("AGENTRAIL_BASE_URL", "http://127.0.0.1:3000")
    api_key = os.environ["AGENTRAIL_API_KEY"]

    async with AgentRailClient(base_url=base_url, api_key=api_key) as client:
        in_progress = await client.list_my_tasks(
            status=TaskStatus.IN_PROGRESS,
            limit=1,
        )
        if not in_progress.data:
            return

        task = in_progress.data[0]
        detail = await client.get_task(task.id)

        # Submit only when the task currently permits it.
        if "submit" in detail.data.available_actions:
            submit_request = TaskSubmitRequest(
                summary="Implemented the task and pushed commits to the task branch.",
                mode=TaskSubmitMode.ADAPTER_MANAGED,
                pull_request=PullRequestSubmitOptions(
                    title=f"Submit {detail.data.identifier}",
                    draft=False,
                ),
            )
            submission = await client.submit_task(
                task.id,
                submit_request,
                idempotency_key=f"submit-{task.id}-v1",
            )
            print(f"PR: {submission.data.pr_url}")

        # Re-read state after the (possible) submission before deciding to ship.
        ci = await client.get_task_ci_status(task.id)
        review = await client.get_task_review_feedback(task.id)
        refreshed = await client.get_task(task.id)

        # First truthy SHA wins: CI, then the latest review decision, then the task.
        # NOTE(review): assumes latest_decision is always populated — confirm
        # against the response model before relying on this with unreviewed tasks.
        head_sha = (
            ci.data.head_sha
            or review.data.latest_decision.head_sha
            or refreshed.data.head_sha
        )

        # Evaluated left to right with short-circuiting, so later checks are
        # skipped as soon as one gate fails.
        ready_to_ship = (
            "ship" in refreshed.data.available_actions
            and ci.data.overall_status.value == "passed"
            and review.data.latest_decision.outcome.value == "approved"
            and head_sha
        )
        if not ready_to_ship:
            return

        ship_request = TaskShipRequest(
            mode=ShipMode.MERGE_AND_DEPLOY,
            target_environment=ShipEnvironment.PRODUCTION,
            expected_head_sha=head_sha,
        )
        await client.ship_task(
            task.id,
            ship_request,
            idempotency_key=f"ship-{task.id}-{head_sha}",
        )


asyncio.run(main())
Always gate shipping on all four conditions: "ship" in available_actions, overall_status.value == "passed", outcome.value == "approved", and a resolved head_sha. Sending a stale SHA returns a 409.

Method reference

list_my_tasks

tasks = await client.list_my_tasks(status=TaskStatus.TODO, limit=10)
Pass a TaskStatus enum value to filter by status. Available values include TaskStatus.TODO, TaskStatus.IN_PROGRESS, and others.

get_task

detail = await client.get_task(task_id)
Returns a TaskDetailResponse with the full task object including available_actions, branch, head SHA, and current state.

submit_task

submission = await client.submit_task(
    task_id,
    TaskSubmitRequest(
        summary="Work description.",
        mode=TaskSubmitMode.ADAPTER_MANAGED,
        pull_request=PullRequestSubmitOptions(title="PR title", draft=False),
    ),
    idempotency_key="submit-tsk_123-v1",
)
Use TaskSubmitMode.ADAPTER_MANAGED for real provider automation. AgentRail creates or reuses the provider PR and returns its metadata in submission.data; the PR URL is available as submission.data.pr_url.

get_task_ci_status

ci = await client.get_task_ci_status(task_id)
# ci.data.overall_status.value: "passed" | "failed" | "pending" | ...
# ci.data.head_sha: str | None

get_task_review_feedback

review = await client.get_task_review_feedback(task_id)
# review.data.latest_decision.outcome.value: "approved" | "changes_requested" | ...
# review.data.latest_decision.head_sha: str | None

ship_task

await client.ship_task(
    task_id,
    TaskShipRequest(
        mode=ShipMode.MERGE_AND_DEPLOY,
        target_environment=ShipEnvironment.PRODUCTION,
        expected_head_sha=head_sha,
    ),
    idempotency_key=f"ship-{task_id}-{head_sha}",
)
expected_head_sha acts as an optimistic lock. The ship request is rejected with a 409 if the branch has advanced since you read the SHA.

Idempotency keys

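submit_task and ship_task both require an idempotency_key keyword argument. The examples on this page use keys of the form submit-{task_id}-v1 and ship-{task_id}-{head_sha}. See the authentication page for key naming conventions.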