The Python AgentRailClient provides async methods for every stage of the AgentRail task lifecycle. All method names use snake_case and all response data is returned as validated Pydantic v2 models. You construct the client once and use it across your harness, typically as an async context manager.
Construct the client
import asyncio
import os

from agentrail import AgentRailClient, TaskStatus


async def main():
    api_key = os.environ["AGENTRAIL_API_KEY"]
    base_url = os.getenv("AGENTRAIL_BASE_URL", "http://127.0.0.1:3000")
    async with AgentRailClient(base_url=base_url, api_key=api_key) as client:
        tasks = await client.list_my_tasks(status=TaskStatus.TODO, limit=10)
        for task in tasks.data:
            print(f"{task.identifier}: {task.title}")
            print(f"Next actions: {', '.join(task.available_actions)}")


asyncio.run(main())
Python models expose snake_case attributes. The available_actions list tells you what operations are currently permitted on each task.
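One way to act on available_actions is to bucket tasks by the operations they currently permit. The sketch below is illustrative only: TaskStub and group_by_action are hypothetical names, TaskStub stands in for the real Pydantic model, and the code assumes actions are plain strings such as "submit" and "ship", as in the examples on this page.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TaskStub:
    """Hypothetical stand-in for a task model; only the fields used here."""

    identifier: str
    available_actions: List[str] = field(default_factory=list)


def group_by_action(tasks: List[TaskStub]) -> Dict[str, List[str]]:
    """Map each action name to the identifiers of the tasks that permit it."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for task in tasks:
        for action in task.available_actions:
            groups[action].append(task.identifier)
    return dict(groups)


tasks = [
    TaskStub("TSK-1", ["submit"]),
    TaskStub("TSK-2", ["ship"]),
    TaskStub("TSK-3", ["submit", "ship"]),
]
by_action = group_by_action(tasks)
```

With real responses, the same function could be fed tasks.data, provided the model attributes match the stub.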
Submit, observe, and ship
The following example shows the complete lifecycle: submitting work, reading CI and review status, then shipping when all conditions are met.
import asyncio
import os

from agentrail import (
    AgentRailClient,
    PullRequestSubmitOptions,
    ShipEnvironment,
    ShipMode,
    TaskShipRequest,
    TaskStatus,
    TaskSubmitMode,
    TaskSubmitRequest,
)


async def main():
    async with AgentRailClient(
        base_url=os.getenv("AGENTRAIL_BASE_URL", "http://127.0.0.1:3000"),
        api_key=os.environ["AGENTRAIL_API_KEY"],
    ) as client:
        tasks = await client.list_my_tasks(status=TaskStatus.IN_PROGRESS, limit=1)
        if not tasks.data:
            return
        task = tasks.data[0]
        detail = await client.get_task(task.id)
        if "submit" in detail.data.available_actions:
            submission = await client.submit_task(
                task.id,
                TaskSubmitRequest(
                    summary="Implemented the task and pushed commits to the task branch.",
                    mode=TaskSubmitMode.ADAPTER_MANAGED,
                    pull_request=PullRequestSubmitOptions(
                        title=f"Submit {detail.data.identifier}",
                        draft=False,
                    ),
                ),
                idempotency_key=f"submit-{task.id}-v1",
            )
            print(f"PR: {submission.data.pr_url}")
        ci = await client.get_task_ci_status(task.id)
        review = await client.get_task_review_feedback(task.id)
        refreshed = await client.get_task(task.id)
        head_sha = (
            ci.data.head_sha
            or review.data.latest_decision.head_sha
            or refreshed.data.head_sha
        )
        if (
            "ship" in refreshed.data.available_actions
            and ci.data.overall_status.value == "passed"
            and review.data.latest_decision.outcome.value == "approved"
            and head_sha
        ):
            await client.ship_task(
                task.id,
                TaskShipRequest(
                    mode=ShipMode.MERGE_AND_DEPLOY,
                    target_environment=ShipEnvironment.PRODUCTION,
                    expected_head_sha=head_sha,
                ),
                idempotency_key=f"ship-{task.id}-{head_sha}",
            )


asyncio.run(main())
Always gate shipping on all four conditions: "ship" in available_actions, overall_status.value == "passed", outcome.value == "approved", and a resolved head_sha. A stale expected_head_sha is rejected with HTTP 409 (Conflict).
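The four-condition gate can be factored into a small pure function, which makes it easy to unit test without a running AgentRail server. This is a minimal sketch: can_ship and first_sha are hypothetical helper names, not part of the client, and they take plain values that you would extract from the responses yourself.

```python
from typing import Optional, Sequence


def can_ship(
    available_actions: Sequence[str],
    ci_status: str,
    review_outcome: str,
    head_sha: Optional[str],
) -> bool:
    """Return True only when all four shipping conditions hold."""
    return (
        "ship" in available_actions
        and ci_status == "passed"
        and review_outcome == "approved"
        and bool(head_sha)
    )


def first_sha(*candidates: Optional[str]) -> Optional[str]:
    """Return the first non-empty SHA, mirroring the `or` chain in the example."""
    for sha in candidates:
        if sha:
            return sha
    return None
```

In the lifecycle example, the inputs would be refreshed.data.available_actions, ci.data.overall_status.value, review.data.latest_decision.outcome.value, and the resolved head_sha.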
Method reference
list_my_tasks
tasks = await client.list_my_tasks(status=TaskStatus.TODO, limit=10)
Pass a TaskStatus enum value to filter by status. Available values include TaskStatus.TODO, TaskStatus.IN_PROGRESS, and others.
get_task
detail = await client.get_task(task_id)
Returns a TaskDetailResponse with the full task object including available_actions, branch, head SHA, and current state.
submit_task
submission = await client.submit_task(
    task_id,
    TaskSubmitRequest(
        summary="Work description.",
        mode=TaskSubmitMode.ADAPTER_MANAGED,
        pull_request=PullRequestSubmitOptions(title="PR title", draft=False),
    ),
    idempotency_key="submit-tsk_123-v1",
)
Use TaskSubmitMode.ADAPTER_MANAGED for real provider automation. AgentRail creates or reuses the provider PR and returns the PR metadata in submission.data.pr_url.
get_task_ci_status
ci = await client.get_task_ci_status(task_id)
# ci.data.overall_status.value: "passed" | "failed" | "pending" | ...
# ci.data.head_sha: str | None
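Because overall_status can be "pending", a harness usually has to poll before it can decide whether to ship. The following sketch polls a caller-supplied coroutine until the status changes or a timeout elapses. wait_for_ci and make_fake_fetch are hypothetical helpers, and the code assumes that any status other than "pending" is terminal, which this page does not confirm.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable


async def wait_for_ci(
    fetch_status: Callable[[], Awaitable[str]],
    interval: float = 5.0,
    timeout: float = 600.0,
) -> str:
    """Poll until the status is no longer "pending" or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status != "pending":
            return status
        # Give up if another sleep would carry us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError("CI status was still pending at the deadline")
        await asyncio.sleep(interval)


def make_fake_fetch(statuses: Iterable[str]) -> Callable[[], Awaitable[str]]:
    """Build a fake fetcher that yields the given statuses in order (demo only)."""
    remaining = iter(statuses)

    async def fetch() -> str:
        return next(remaining)

    return fetch


result = asyncio.run(
    wait_for_ci(make_fake_fetch(["pending", "pending", "passed"]), interval=0.01)
)
```

Against a real client, fetch_status could be a small coroutine that awaits client.get_task_ci_status(task_id) and returns ci.data.overall_status.value.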
get_task_review_feedback
review = await client.get_task_review_feedback(task_id)
# review.data.latest_decision.outcome.value: "approved" | "changes_requested" | ...
# review.data.latest_decision.head_sha: str | None
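The comments above do not say whether latest_decision can be absent, for example before any review has taken place. Assuming it may be None (an assumption, not something this page confirms), a defensive reader avoids an AttributeError. read_decision is a hypothetical helper, and SimpleNamespace objects stand in for the real response models.

```python
from types import SimpleNamespace
from typing import Any, Optional, Tuple


def read_decision(review_data: Any) -> Tuple[Optional[str], Optional[str]]:
    """Return (outcome, head_sha), or (None, None) if no decision is present."""
    decision = getattr(review_data, "latest_decision", None)
    if decision is None:
        return None, None
    # outcome is assumed to be an enum-like object exposing `.value`.
    outcome = getattr(decision, "outcome", None)
    return getattr(outcome, "value", None), getattr(decision, "head_sha", None)


# Stand-in objects shaped like the fields described above.
approved = SimpleNamespace(
    latest_decision=SimpleNamespace(
        outcome=SimpleNamespace(value="approved"),
        head_sha="abc123",
    )
)
no_review = SimpleNamespace(latest_decision=None)

approved_result = read_decision(approved)
empty_result = read_decision(no_review)
```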
ship_task
await client.ship_task(
    task_id,
    TaskShipRequest(
        mode=ShipMode.MERGE_AND_DEPLOY,
        target_environment=ShipEnvironment.PRODUCTION,
        expected_head_sha=head_sha,
    ),
    idempotency_key=f"ship-{task_id}-{head_sha}",
)
expected_head_sha acts as an optimistic lock. The ship is rejected if the branch has advanced since you read the SHA.
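The optimistic-lock behavior can be illustrated with a toy in-memory model. This is not how AgentRail implements the check. FakeBranch and StaleHeadError are hypothetical names used only to show the compare-then-act pattern and the re-read-before-retry response to a rejection.

```python
from typing import Optional


class StaleHeadError(Exception):
    """Hypothetical error standing in for the rejection of a stale SHA."""


class FakeBranch:
    """Toy in-memory branch used to demonstrate the optimistic lock."""

    def __init__(self, head_sha: str) -> None:
        self.head_sha = head_sha
        self.shipped_sha: Optional[str] = None

    def push(self, new_sha: str) -> None:
        self.head_sha = new_sha

    def ship(self, expected_head_sha: str) -> None:
        # Ship only if the caller saw the current head of the branch.
        if expected_head_sha != self.head_sha:
            raise StaleHeadError(
                f"expected {expected_head_sha}, branch is at {self.head_sha}"
            )
        self.shipped_sha = self.head_sha


branch = FakeBranch("aaa111")
seen_sha = branch.head_sha  # the caller reads the SHA
branch.push("bbb222")       # the branch advances in the meantime

try:
    branch.ship(seen_sha)
    rejected = False
except StaleHeadError:
    rejected = True

# Re-read the head before retrying; a real harness would also re-check
# CI and review status for the new SHA.
branch.ship(branch.head_sha)
```

The point of the pattern is that a rejection should trigger a fresh read of CI, review, and head SHA instead of a blind retry with the old value.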
Idempotency keys
submit_task and ship_task both require an idempotency_key keyword argument. See authentication for key naming conventions.
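The examples on this page build keys from the task ID plus either a version suffix or the head SHA. A pair of small helpers keeps that consistent across a harness. submit_key and ship_key are hypothetical names, and the formats simply copy the examples above. The conventions on the authentication page take precedence if they differ.

```python
def submit_key(task_id: str, attempt: int = 1) -> str:
    """Build a submit key; bump `attempt` only for a deliberately new submission."""
    return f"submit-{task_id}-v{attempt}"


def ship_key(task_id: str, head_sha: str) -> str:
    """Build a ship key tied to the exact commit being shipped."""
    return f"ship-{task_id}-{head_sha}"
```

Deriving the key from stable inputs means that a retry of the same operation reuses the same key, while a ship of a different commit gets a different key.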