Concurrent calls with AsyncPareta
AsyncPareta lets you fire many requests at once instead of one at a time. When
you have a batch of inference prompts to score, or several eval runs to kick off,
running them concurrently turns a wall of sequential round-trips into a single
asyncio.gather. It exposes the same surface as the sync Pareta
client, with every resource method declared async def and the streaming
iterators made async.
This page shows how to:
- run a batch of chat.completions concurrently and collect results
- bound concurrency so you do not hammer an endpoint (backpressure)
- handle errors per task so one failure does not sink the batch
- launch and await several eval runs at once
One AsyncPareta instance wraps a single pooled httpx.AsyncClient. Build it
once, share it across all your coroutines, and close it once. Do not make a
client per request.
Python
import asyncio
from pareta import AsyncPareta

async def main():
    async with AsyncPareta.from_env() as pa:  # reads PARETA_API_KEY
        resp = await pa.chat.completions.create(
            model="ep_invoice_extract",
            messages=[{"role": "user", "content": "Extract the total."}],
        )
        print(resp.choices[0].message.content)

asyncio.run(main())
TypeScript
The TS SDK has no sync/async split — there is one Pareta class, and every
I/O method already returns a Promise you await. No AsyncPareta, no
asyncio.run, no async with: build one client and share it.
import { Pareta } from "pareta";

const pa = Pareta.fromEnv(); // reads PARETA_API_KEY
const resp = await pa.chat.completions.create({
  model: "ep_invoice_extract",
  messages: [{ role: "user", content: "Extract the total." }],
});
console.log(resp.choices[0].message.content);
Inference is OpenAI-compatible and metered: each successful completion debits
your org balance, and a zero balance raises InsufficientCreditsError (402).
Top-up is browser-only, so the SDK never exposes balance or payment. model is
an endpoint id from endpoints.deploy() (or any model
id your org can reach); Pareta hides the hardware, so there is no GPU knob to
pass.
Fan out a batch of completions
asyncio.gather runs every coroutine concurrently and returns results in input
order. Because all calls share the same client, httpx pools and reuses
connections for you.
Python
import asyncio
from pareta import AsyncPareta

PROMPTS = [
    "Extract the invoice total.",
    "Extract the vendor name.",
    "Extract the due date.",
    "Extract the line-item count.",
]

async def classify_one(pa, prompt, document):
    resp = await pa.chat.completions.create(
        model="ep_invoice_extract",
        messages=[
            {"role": "system", "content": "You are an invoice parser."},
            {"role": "user", "content": f"{prompt}\n\n{document}"},
        ],
        temperature=0,
        max_tokens=64,
    )
    return resp.choices[0].message.content

async def main():
    document = "INVOICE #4471 ... TOTAL $1,240.00 ..."
    async with AsyncPareta.from_env() as pa:
        answers = await asyncio.gather(
            *(classify_one(pa, p, document) for p in PROMPTS)
        )
    for prompt, answer in zip(PROMPTS, answers):
        print(f"{prompt} -> {answer}")

asyncio.run(main())
TypeScript
Promise.all is the direct analog of asyncio.gather: it runs every promise
concurrently and resolves to results in input order. The shared fetch keep-alive
pool reuses connections for you.
import { Pareta } from "pareta";

const PROMPTS = [
  "Extract the invoice total.",
  "Extract the vendor name.",
  "Extract the due date.",
  "Extract the line-item count.",
];

async function classifyOne(pa: Pareta, prompt: string, document: string) {
  const resp = await pa.chat.completions.create({
    model: "ep_invoice_extract",
    messages: [
      { role: "system", content: "You are an invoice parser." },
      { role: "user", content: `${prompt}\n\n${document}` },
    ],
    temperature: 0,
    max_tokens: 64,
  });
  return resp.choices[0].message.content;
}

const document = "INVOICE #4471 ... TOTAL $1,240.00 ...";
const pa = Pareta.fromEnv();
const answers = await Promise.all(
  PROMPTS.map((p) => classifyOne(pa, p, document)),
);
for (let i = 0; i < PROMPTS.length; i++) {
  console.log(`${PROMPTS[i]} -> ${answers[i]}`);
}
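If any coroutine raises, gather propagates the first exception to the caller and
the results of the other calls are lost. The remaining tasks are not cancelled;
they keep running, but nothing collects their results. That is rarely what you
want for a batch. The next two sections fix both halves of the problem: capacity
(backpressure) and partial failure.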
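The default behavior is easy to see without touching the network. The sketch below uses only the standard library; fake_call is a hypothetical stand-in for one completion request, not part of the SDK.

```python
import asyncio

async def fake_call(i):
    # Hypothetical stand-in for one completion request; call 2 fails.
    await asyncio.sleep(0.01 * i)
    if i == 2:
        raise RuntimeError(f"call {i} failed")
    return f"answer {i}"

async def main():
    try:
        answers = await asyncio.gather(*(fake_call(i) for i in range(4)))
    except RuntimeError as exc:
        # The first exception reaches the caller. `answers` is never bound,
        # so the results of calls 0 and 1 are not available here.
        return f"batch lost: {exc}"
    return answers

outcome = asyncio.run(main())
print(outcome)
```

Calls 0 and 1 finished successfully, yet the caller only ever sees the error from call 2.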
Bound concurrency with a semaphore
Firing 5,000 prompts at gather opens as many tasks at once, overruns the
connection pool, and is the fastest way to earn a RateLimitError (429) or push
a cold endpoint into EndpointNotReadyError (503). An asyncio.Semaphore caps
how many calls are in flight at any moment. The rest queue and drain as slots
free up.
Python
import asyncio
from pareta import AsyncPareta

MAX_IN_FLIGHT = 16

async def complete(pa, sem, messages):
    async with sem:  # acquire a slot; release on exit
        resp = await pa.chat.completions.create(
            model="ep_invoice_extract",
            messages=messages,
            temperature=0,
        )
        return resp.choices[0].message.content

async def run_batch(documents):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    async with AsyncPareta.from_env() as pa:
        tasks = [
            complete(pa, sem, [{"role": "user", "content": f"Summarize:\n{d}"}])
            for d in documents
        ]
        return await asyncio.gather(*tasks)

# 1,000 docs, but never more than 16 concurrent requests
results = asyncio.run(run_batch([f"doc {i}" for i in range(1000)]))
print(len(results))
TypeScript
JS has no built-in asyncio.Semaphore, so a small worker-pool does the same job:
spin up MAX_IN_FLIGHT workers that each pull from a shared cursor until the
queue drains. That caps in-flight calls without pulling in a dependency.
import { Pareta } from "pareta";

const MAX_IN_FLIGHT = 16;

async function complete(pa: Pareta, messages: Array<{ role: string; content: string }>) {
  const resp = await pa.chat.completions.create({
    model: "ep_invoice_extract",
    messages,
    temperature: 0,
  });
  return resp.choices[0].message.content;
}

async function runBatch(documents: string[]): Promise<Array<string | null>> {
  const pa = Pareta.fromEnv();
  const results: Array<string | null> = new Array(documents.length);
  let cursor = 0;
  // N workers drain a shared cursor → never more than N requests in flight.
  const worker = async () => {
    for (let i = cursor++; i < documents.length; i = cursor++) {
      results[i] = await complete(pa, [
        { role: "user", content: `Summarize:\n${documents[i]}` },
      ]);
    }
  };
  await Promise.all(
    Array.from({ length: Math.min(MAX_IN_FLIGHT, documents.length) }, worker),
  );
  return results;
}

// 1,000 docs, but never more than 16 concurrent requests
const docs = Array.from({ length: 1000 }, (_, i) => `doc ${i}`);
const results = await runBatch(docs);
console.log(results.length);
Pick MAX_IN_FLIGHT to match what the endpoint can sustain. 8 to 32 is a sane
starting band; tune it against the endpoint's latency from
endpoints.metrics(). The SDK already retries 429 and 5xx responses (503
included) with exponential backoff (max_retries, default 2), so the
semaphore is your first line of defense and retries are the backstop.
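Before pointing a batch at a real endpoint, it can help to confirm that the cap actually holds. The following is a minimal stdlib-only sketch: fake_call is a hypothetical stand-in that sleeps instead of calling the API, and the stats dictionary is bookkeeping invented for this illustration.

```python
import asyncio

MAX_IN_FLIGHT = 4  # small on purpose so the cap is easy to observe

async def fake_call(sem, stats):
    async with sem:
        # Count how many calls hold a slot right now and remember the peak.
        stats["now"] += 1
        stats["peak"] = max(stats["peak"], stats["now"])
        await asyncio.sleep(0.01)  # stand-in for network latency
        stats["now"] -= 1

async def measure(n_calls):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    stats = {"now": 0, "peak": 0}
    await asyncio.gather(*(fake_call(sem, stats) for _ in range(n_calls)))
    return stats["peak"]

peak = asyncio.run(measure(20))
print(f"peak in-flight calls: {peak}")
```

All 20 coroutines are scheduled at once, but the observed peak never exceeds MAX_IN_FLIGHT because each call must hold a semaphore slot while it runs.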
Handle errors per task
Pass return_exceptions=True to gather and every coroutine resolves to either
its result or the exception it raised, in order. The batch always completes; you
decide what to do with the failures. This is the right default for fan-out work.
Python
import asyncio
from pareta import (
    AsyncPareta,
    ParetaError,
    InsufficientCreditsError,
    EndpointNotReadyError,
    RateLimitError,
    APITimeoutError,
)

MAX_IN_FLIGHT = 16

async def complete(pa, sem, doc):
    async with sem:
        resp = await pa.chat.completions.create(
            model="ep_invoice_extract",
            messages=[{"role": "user", "content": f"Extract the total from:\n{doc}"}],
            temperature=0,
        )
        return resp.choices[0].message.content

async def main(documents):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    async with AsyncPareta.from_env() as pa:
        outcomes = await asyncio.gather(
            *(complete(pa, sem, d) for d in documents),
            return_exceptions=True,
        )
    ok, failed = [], []
    for doc, outcome in zip(documents, outcomes):
        if isinstance(outcome, InsufficientCreditsError):
            # Org balance hit zero mid-batch. Nothing else will succeed,
            # so stop and top up in the dashboard.
            raise outcome
        if isinstance(outcome, BaseException):
            failed.append((doc, outcome))
        else:
            ok.append((doc, outcome))
    print(f"{len(ok)} succeeded, {len(failed)} failed")
    for doc, err in failed:
        if isinstance(err, EndpointNotReadyError):
            reason = "endpoint cold/stopped"  # 503
        elif isinstance(err, RateLimitError):
            reason = "rate limited after retries"  # 429
        elif isinstance(err, APITimeoutError):
            reason = "timed out"
        elif isinstance(err, ParetaError):
            reason = str(err)
        else:
            reason = repr(err)
        print(f"  retry {doc!r}: {reason}")
    return ok, failed

asyncio.run(main([f"doc {i}" for i in range(50)]))
TypeScript
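Promise.allSettled is the analog of gather(..., return_exceptions=True): it
never short-circuits, and every entry resolves to {status:"fulfilled", value}
or {status:"rejected", reason}. The example below also bounds concurrency, so
instead of allSettled it records each outcome with try/catch inside the worker
pool, which yields the same result-or-error shape per task. Switch on the error
class with instanceof.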
import {
  Pareta,
  ParetaError,
  InsufficientCreditsError,
  EndpointNotReadyError,
  RateLimitError,
  APITimeoutError,
} from "pareta";

const MAX_IN_FLIGHT = 16;

async function complete(pa: Pareta, doc: string) {
  const resp = await pa.chat.completions.create({
    model: "ep_invoice_extract",
    messages: [{ role: "user", content: `Extract the total from:\n${doc}` }],
    temperature: 0,
  });
  return resp.choices[0].message.content;
}

async function main(documents: string[]) {
  const pa = Pareta.fromEnv();
  // Worker pool bounds in-flight calls (see the semaphore section above).
  const outcomes: Array<{ ok: true; value: string | null } | { ok: false; error: unknown }> =
    new Array(documents.length);
  let cursor = 0;
  const worker = async () => {
    for (let i = cursor++; i < documents.length; i = cursor++) {
      try {
        outcomes[i] = { ok: true, value: await complete(pa, documents[i]) };
      } catch (error) {
        outcomes[i] = { ok: false, error };
      }
    }
  };
  await Promise.all(
    Array.from({ length: Math.min(MAX_IN_FLIGHT, documents.length) }, worker),
  );

  const ok: Array<[string, string | null]> = [];
  const failed: Array<[string, unknown]> = [];
  for (let i = 0; i < documents.length; i++) {
    const outcome = outcomes[i];
    if (outcome.ok) {
      ok.push([documents[i], outcome.value]);
    } else if (outcome.error instanceof InsufficientCreditsError) {
      // Org balance hit zero mid-batch. Nothing else will succeed,
      // so stop and top up in the dashboard.
      throw outcome.error;
    } else {
      failed.push([documents[i], outcome.error]);
    }
  }

  console.log(`${ok.length} succeeded, ${failed.length} failed`);
  for (const [doc, err] of failed) {
    let reason: string;
    if (err instanceof EndpointNotReadyError) {
      reason = "endpoint cold/stopped"; // 503
    } else if (err instanceof RateLimitError) {
      reason = "rate limited after retries"; // 429
    } else if (err instanceof APITimeoutError) {
      reason = "timed out";
    } else if (err instanceof ParetaError) {
      reason = err.message;
    } else {
      reason = String(err);
    }
    console.log(`  retry ${JSON.stringify(doc)}: ${reason}`);
  }
  return { ok, failed };
}

const docs = Array.from({ length: 50 }, (_, i) => `doc ${i}`);
await main(docs);
Notes on the error types (all subclass ParetaError):
- InsufficientCreditsError (402) is fatal for the whole batch, not just one task. The balance is shared across the org, so once it hits zero every remaining call fails the same way. Stop early and top up.
- EndpointNotReadyError (503) means the endpoint is stopped, cold-starting, or its provider is down. Often transient; safe to retry the failed subset after a start() or a short wait.
- RateLimitError (429) surfaces only after the SDK exhausts its own retries. If you see these, lower MAX_IN_FLIGHT.
- APITimeoutError is raised after max_retries. Long generations may need a larger timeout= on the client (default is 60s, 10s connect).
Because return_exceptions=True collects every outcome instead of raising on the
first failure, you can re-run just failed on the next pass.
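A retry pass can be sketched as a loop that keeps narrowing the pending set. The version below is stdlib-only and makes several assumptions for illustration: flaky_complete is a hypothetical stand-in for the complete coroutine, run_with_retry_passes and max_passes are names invented here, and the built-in TimeoutError stands in for the SDK's error classes.

```python
import asyncio

attempts = {}  # doc -> number of times it has been tried

async def flaky_complete(doc):
    # Hypothetical stand-in: odd-numbered docs fail on their first attempt
    # and succeed on the second.
    attempts[doc] = attempts.get(doc, 0) + 1
    await asyncio.sleep(0)
    if doc % 2 == 1 and attempts[doc] == 1:
        raise TimeoutError(f"doc {doc} timed out")
    return f"total for doc {doc}"

async def run_with_retry_passes(docs, max_passes=3):
    done = {}
    pending = list(docs)
    for _ in range(max_passes):
        if not pending:
            break
        outcomes = await asyncio.gather(
            *(flaky_complete(d) for d in pending), return_exceptions=True
        )
        still_failed = []
        for doc, outcome in zip(pending, outcomes):
            if isinstance(outcome, BaseException):
                still_failed.append(doc)  # try again on the next pass
            else:
                done[doc] = outcome
        pending = still_failed
    return done, pending

done, pending = asyncio.run(run_with_retry_passes(range(6)))
print(f"{len(done)} done, {len(pending)} still pending")
```

In a real batch you would still check for InsufficientCreditsError before scheduling another pass, since retrying cannot succeed once the balance is empty.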
Streaming under concurrency
Async streaming mirrors the sync path with one twist: create(...) is a
coroutine, so you await it — and because stream=True, the awaited result is
an async iterator you then async for over. (Non-streaming create is awaited
too, returning the ChatCompletion.)
Python
import asyncio
from pareta import AsyncPareta

async def stream_into(pa, prompt, sink):
    stream = await pa.chat.completions.create(  # await → returns the async iterator
        model="ep_invoice_extract",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        sink.append(chunk.choices[0].delta.content or "")

async def main():
    sinks = {p: [] for p in ("Summarize doc A.", "Summarize doc B.")}
    async with AsyncPareta.from_env() as pa:
        await asyncio.gather(
            *(stream_into(pa, p, sink) for p, sink in sinks.items())
        )
    for prompt, parts in sinks.items():
        print(prompt, "->", "".join(parts))

asyncio.run(main())
TypeScript
In TS, stream: true makes create(...) return an AsyncIterable<ChatCompletionChunk>
directly (not a Promise — don't await the call), which you drive with
for await … of. Run several at once with Promise.all, exactly like the batch.
import { Pareta } from "pareta";

async function streamInto(pa: Pareta, prompt: string, sink: string[]) {
  const stream = pa.chat.completions.create({ // stream:true → AsyncIterable, no await
    model: "ep_invoice_extract",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });
  for await (const chunk of stream) {
    sink.push(chunk.choices[0].delta.content || "");
  }
}

const sinks = new Map<string, string[]>([
  ["Summarize doc A.", []],
  ["Summarize doc B.", []],
]);
const pa = Pareta.fromEnv();
await Promise.all(
  [...sinks].map(([prompt, sink]) => streamInto(pa, prompt, sink)),
);
for (const [prompt, parts] of sinks) {
  console.log(prompt, "->", parts.join(""));
}
chunk.choices[0].delta.content is the incremental text. Streams end on
[DONE]; the SDK closes them for you. Retries only cover the initial handshake,
so a mid-stream drop raises immediately rather than silently resuming.
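Since a dropped stream raises instead of resuming, the caller decides what happens to the text already received. The sketch below shows one way to keep the partial output next to the error. It is stdlib-only and hypothetical throughout: fake_stream is a stand-in async generator that yields plain strings rather than SDK chunks, and ConnectionError is a placeholder because this page does not name the exception the SDK raises mid-stream.

```python
import asyncio

async def fake_stream(parts, drop_after=None):
    # Hypothetical stand-in for the awaited streaming result: yields text
    # deltas and optionally fails partway through.
    for i, part in enumerate(parts):
        if drop_after is not None and i == drop_after:
            raise ConnectionError("stream dropped")
        await asyncio.sleep(0)
        yield part

async def collect(stream):
    received = []
    try:
        async for delta in stream:
            received.append(delta)
    except ConnectionError as exc:
        # Keep what arrived before the drop and report the error with it.
        return "".join(received), exc
    return "".join(received), None

async def main():
    return await asyncio.gather(
        collect(fake_stream(["The ", "total ", "is $1,240."])),
        collect(fake_stream(["The ", "vendor ", "is Acme."], drop_after=2)),
    )

(full, full_err), (partial, partial_err) = asyncio.run(main())
print(repr(full), full_err)
print(repr(partial), partial_err)
```

Catching the error inside each per-stream coroutine keeps one dropped stream from affecting the others in the same gather.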
Concurrent eval runs
The same pattern launches several eval runs at once. With
wait=True, each runs.create(...) polls the run to completion using
asyncio.sleep under the hood, so the coroutines yield the event loop while they
wait. That makes a fan-out of wait=True runs genuinely concurrent.
Python
import asyncio
from pareta import AsyncPareta

# Compare candidate aliases on three tasks, each against its frontier baselines.
JOBS = [
    ("contract-key-fields", ["qwen-1", "llama-2"]),
    ("invoice-extraction", ["qwen-1", "pixtral-1"]),
    ("doc-classification", ["llama-1", "qwen-2"]),
]

async def eval_task(pa, eval_set_id, models):
    run = await pa.evals.runs.create(
        eval_set=eval_set_id,
        models=models,  # per-task open-model aliases
        frontier="benchmarked",  # frontier models on this task's leaderboard
        wait=True,  # polls until terminal (completed/failed)
        timeout=1200.0,
    )
    return run

async def main(eval_set_ids):
    async with AsyncPareta.from_env() as pa:
        runs = await asyncio.gather(
            *(eval_task(pa, sid, models)
              for sid, (_, models) in zip(eval_set_ids, JOBS)),
            return_exceptions=True,
        )
    for outcome in runs:
        if isinstance(outcome, BaseException):
            print("run failed to launch/finish:", outcome)
            continue
        run = outcome
        if run.status == "failed":
            print(f"{run.id}: failed — {run.error_detail}")
            continue
        print(f"{run.id}: {run.status} cost ${run.cost}")  # Decimal dollars
        for r in run.results:
            print(f"  {r.model_id} ({r.kind}): quality={r.quality_mean}")

# eval_set_ids from earlier pa.evals.sets.create(...) calls
asyncio.run(main(["es_abc", "es_def", "es_ghi"]))
TypeScript
runs.create({ wait: true }) returns a Promise that polls to terminal, so a
Promise.allSettled fans out several runs concurrently. The timeout and
pollInterval options are in seconds (matching the Python eval poller). The
billed total is run.cost — a fixed-2dp string here, not a Decimal — and
run.costMicroUsd keeps the raw micro-USD integer.
import { Pareta } from "pareta";

// Compare candidate aliases on three tasks, each against its frontier baselines.
const JOBS: Array<[string, string[]]> = [
  ["contract-key-fields", ["qwen-1", "llama-2"]],
  ["invoice-extraction", ["qwen-1", "pixtral-1"]],
  ["doc-classification", ["llama-1", "qwen-2"]],
];

async function evalTask(pa: Pareta, evalSetId: string, models: string[]) {
  return pa.evals.runs.create({
    evalSet: evalSetId,
    models, // per-task open-model aliases
    frontier: "benchmarked", // frontier models on this task's leaderboard
    wait: true, // polls until terminal (completed/failed)
    timeout: 1200,
  });
}

async function main(evalSetIds: string[]) {
  const pa = Pareta.fromEnv();
  const runs = await Promise.allSettled(
    evalSetIds.map((sid, i) => evalTask(pa, sid, JOBS[i][1])),
  );
  for (const outcome of runs) {
    if (outcome.status === "rejected") {
      console.log("run failed to launch/finish:", outcome.reason);
      continue;
    }
    const run = outcome.value;
    if (run.status === "failed") {
      console.log(`${run.id}: failed — ${run.errorDetail}`);
      continue;
    }
    console.log(`${run.id}: ${run.status} cost $${run.cost}`); // dollars (string)
    for (const r of run.results) {
      console.log(`  ${r.modelId} (${r.kind}): quality=${r.qualityMean}`);
    }
  }
}

// evalSetIds from earlier pa.evals.sets.create(...) calls
await main(["es_abc", "es_def", "es_ghi"]);
Eval runs are metered too: the org balance is debited for the compute (open
candidates plus any frontier baselines), and an empty balance raises
InsufficientCreditsError (402). run.cost is the billed total as Decimal
dollars floored to cents; run.cost_micro_usd keeps the raw micro-USD integer if
you need sub-cent precision. Result model_ids are per-task public aliases, not
real model ids.
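The relationship between the two cost fields can be illustrated with a little Decimal arithmetic. This is only a sketch of what "floored to cents" means, assuming micro-USD is millionths of a dollar; micro_usd_to_dollars is a hypothetical helper, not an SDK function, and the SDK may derive run.cost differently.

```python
from decimal import Decimal, ROUND_DOWN

def micro_usd_to_dollars(cost_micro_usd: int) -> Decimal:
    # Assumption: 1 USD == 1_000_000 micro-USD.
    dollars = Decimal(cost_micro_usd) / Decimal(1_000_000)
    # Floor to whole cents: anything below one cent is dropped, never rounded up.
    return dollars.quantize(Decimal("0.01"), rounding=ROUND_DOWN)

almost_124 = micro_usd_to_dollars(1_239_999)  # $1.239999 of compute
half_cent = micro_usd_to_dollars(5_000)       # $0.005 of compute
print(almost_124, half_cent)
```

Under that assumption a run costing 1,239,999 micro-USD displays as 1.23, which is why the raw integer is the field to sum when sub-cent amounts matter across many runs.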
If you do not want to block on completion, drop wait=True and the call returns
immediately with a queued EvalRun; await pa.evals.runs.wait(run.id) later, or
poll pa.evals.runs.retrieve(run.id) yourself.
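If you poll yourself, the loop needs a terminal-state check, a deadline, and a sleep that yields the event loop. The sketch below is stdlib-only and runs against a stand-in: FakeRuns is a hypothetical replacement for pa.evals.runs that returns plain dicts rather than EvalRun objects, and poll_until_terminal, poll_interval, and the run id are names chosen for this illustration.

```python
import asyncio
import time

class FakeRuns:
    """Hypothetical stand-in for pa.evals.runs: completes on the 3rd retrieve."""

    def __init__(self):
        self.calls = 0

    async def retrieve(self, run_id):
        self.calls += 1
        status = "completed" if self.calls >= 3 else "queued"
        return {"id": run_id, "status": status}

async def poll_until_terminal(runs, run_id, poll_interval=0.01, timeout=1.0):
    deadline = time.monotonic() + timeout
    while True:
        run = await runs.retrieve(run_id)
        if run["status"] in ("completed", "failed"):
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{run_id} still {run['status']} after {timeout}s")
        # asyncio.sleep yields the event loop, so other coroutines keep running.
        await asyncio.sleep(poll_interval)

fake = FakeRuns()
final = asyncio.run(poll_until_terminal(fake, "run_example"))
print(final, fake.calls)
```

Several of these pollers can be passed to asyncio.gather together, because each one spends its waiting time in asyncio.sleep rather than blocking the thread.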
Checklist
- One AsyncPareta per process, shared across coroutines. async with (or await pa.aclose()) to release the pool.
- asyncio.gather(*tasks) to fan out; return_exceptions=True so one failure does not sink the batch.
- asyncio.Semaphore(N) to bound in-flight calls: your backpressure valve.
- Treat InsufficientCreditsError as batch-fatal; retry EndpointNotReadyError and the residual RateLimitError subset.
- Always await create(). For stream=True the awaited result is the async iterator you async for over; for non-streaming it is the ChatCompletion.
See also
- The client: constructor, from_env, retries, timeouts
- Chat completions: full inference surface and streaming
- Endpoints: deploy, start/stop, and operate endpoints
- Evals: eval sets, runs, and frontier baselines
- Errors: the full ParetaError hierarchy