Client Libraries
Build with Polari in minutes, not days.
The Python SDK wraps the full four-layer pipeline in an async, typed client.
Authentication, job polling, retries, and cost tracking — handled. You get structured intelligence back.
Python
Async client with full Layer 0–3 support, typed Pydantic responses, retries, and
cost tracking.
Available now — v0.2.0
pypi.org/project/polari-sdk →
Coming soon
JavaScript / TypeScript
Same interface, same types. Single polari-sdk npm package with
TypeScript definitions built in.
In design — notify me at hello@polariapi.com
Setup
Installation & auth
Set your API key as an environment variable and use from_env(), or pass it directly:
export POLARI_API_KEY=pk_live_your_key_here
from polari import PolariClient
# Recommended — reads POLARI_API_KEY from environment
client = PolariClient.from_env()
# Or explicit
client = PolariClient(api_key="pk_live_your_key")
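How from_env() behaves when the variable is missing is not documented here. A pre-flight check like the one below makes that failure explicit before any client is created. This is a minimal sketch: load_api_key is a hypothetical helper, not part of the SDK.

```python
import os


def load_api_key(env=None, var="POLARI_API_KEY"):
    """Return the API key from the environment, or raise a clear error.

    Hypothetical helper for illustration; the SDK's own from_env() may
    report a missing key differently.
    """
    env = os.environ if env is None else env
    key = env.get(var, "").strip()
    if not key:
        raise RuntimeError(f"{var} is not set; export it before creating the client")
    return key
```

The returned string can then be passed as PolariClient(api_key=...), as in the explicit form above.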
Authentication
API keys & namespace mode
Your API key carries two things: your tier (which layers you can access) and your namespace mode (whose data
you read and write). Both are set at key issuance — they are not runtime parameters. No SDK configuration is
needed to use either.
| Mode | Writes to | Reads from | Quality gate | Tier |
|---|---|---|---|---|
| public (default) | Shared pool | Shared pool | Enforced at 0.53 | Any |
| private | Your namespace only | Your namespace only | Bypassed | Pro+ |
| shared | Your namespace only | Your namespace + shared pool | Bypassed on your writes | Pro+ |
Requesting a private namespace: During signup or at any time, contact
support@polariapi.com and specify which mode you need. We'll issue a
new key — no SDK changes required. Private namespace is available on Professional and Enterprise tiers. The
key itself is the capability token; there is no runtime flag to flip.
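The quality-gate column of the namespace table can be mirrored client-side, for example to predict which articles will reach Layer 1 before spending a call. The sketch below assumes the gate is a simple "score at or above 0.53" check; passes_quality_gate is a hypothetical helper, and the server remains the authority on what is accepted.

```python
QUALITY_GATE = 0.53  # threshold for public namespace keys, from the table above


def passes_quality_gate(namespace_mode: str, quality_score: float) -> bool:
    """Predict whether an article proceeds from Layer 0 to Layer 1.

    Assumption: "below 0.53 is rejected" means scores >= 0.53 pass.
    """
    if namespace_mode == "public":
        return quality_score >= QUALITY_GATE
    if namespace_mode in ("private", "shared"):
        # The gate is bypassed for writes made with these keys.
        return True
    raise ValueError(f"unknown namespace mode: {namespace_mode!r}")
```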
The pipeline
How articles move through the layers
Each layer builds on the last.
An article ID from Layer 0 is what every downstream call needs.
0
Submit & score — client.layer0.analyze()
Send an article. The SDK handles the async job queue internally — submit, poll,
retrieve. You get back a quality score, token count, and an article_id. That ID is your key
into every other layer. For public namespace keys, articles below quality 0.53 are rejected
at the Layer 0 → Layer 1 handoff on the server side; the SDK surfaces this as success=False
on the Layer 1 result.
1
Extract meaning — client.layer1.process()
Pass the article_id from Layer 0. Get back named entities (people,
orgs, locations, events), sentence-level embeddings, and an article sentiment score. You can also query
the full entity index — search across all processed articles by name, type, or time window.
2
Cluster into stories — client.layer2.cluster()
Polari finds which other articles across all sources are covering the same
story and groups them into a cluster. You get a cluster_id, a confidence score, and whether
the article joined an existing cluster or seeded a new one.
3
Intelligence graph — client.layer3 Pro+
Query the entity relationship graph, surface trending entities by velocity, and
access cross-cluster narrative structure. Layer 3 operates on the full corpus — no article ID needed.
Requires Professional or Enterprise tier.
~
Public discourse — Echo layer Pro+ · REST only
Echo enriches story clusters with public reaction from Reddit and Bluesky —
controversy scoring, counter-narrative detection, and entity delta between press coverage and public
discussion. Echo is a parallel enrichment layer, not a sequential processing step. Call it by
cluster_id after Layer 2. A client.echo SDK wrapper is on the roadmap;
for now use the REST API directly.
Walkthrough
End-to-end pipeline
A single article from submission to story cluster in one script.
import asyncio

from polari import PolariClient, ArticleInput


async def main():
    async with PolariClient.from_env() as client:
        article = ArticleInput(
            title="Fed Holds Rates Steady Amid Inflation Concerns",
            content="The Federal Reserve held interest rates unchanged on Wednesday...",
            url="https://reuters.com/markets/fed-rates-june-2026",
            source="Reuters",
        )

        # ── Layer 0: submit, poll, retrieve ─────────────────────
        l0 = await client.layer0.analyze(article)
        print(f"[L0] quality={l0.quality_score:.3f} id={l0.article_id}")
        # → [L0] quality=0.821 id=art_8f7h2k9s

        # ── Layer 1: entities + sentiment ───────────────────────
        l1 = await client.layer1.process(
            article_id=l0.article_id,
            title=article.title,
            content=article.content,
        )
        print(f"[L1] entities={l1.stats.entity_count} sentiment={l1.sentiment_label}")
        # → [L1] entities=8 sentiment=neutral
        #   people=['Jerome Powell'] orgs=['Federal Reserve', 'Reuters']

        # ── Layer 2: cluster into a story ───────────────────────
        l2 = await client.layer2.cluster(l0.article_id)
        print(f"[L2] cluster={l2.cluster_id} confidence={l2.confidence:.3f} new={l2.is_new}")
        # → [L2] cluster=clus_9x3k2m8f confidence=0.941 new=False

        # ── Layer 3: trending entities (Pro+) ───────────────────
        trends = await client.layer3.get_trending_entities(limit=5)
        for t in trends["trends"]:
            print(f"[L3] {t['entity']} velocity={t['velocity']:.2f}")


asyncio.run(main())
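The walkthrough assumes the article clears the quality gate. With a public namespace key, a filtered article comes back from Layer 1 with success=False, and clustering it would be wasted work. The sketch below adds that check. run_pipeline is a hypothetical wrapper that takes any client exposing the layer0, layer1, and layer2 methods shown above; the shape of the returned dict is an illustration, not an SDK type.

```python
async def run_pipeline(client, article):
    """Run Layers 0-2 for one article, skipping Layer 2 if Layer 1 filtered it."""
    l0 = await client.layer0.analyze(article)
    l1 = await client.layer1.process(
        article_id=l0.article_id,
        title=article.title,
        content=article.content,
    )
    summary = {
        "article_id": l0.article_id,
        "quality": l0.quality_score,
        "cluster_id": None,  # stays None when the quality gate filtered the article
    }
    if not l1.success:
        return summary
    l2 = await client.layer2.cluster(l0.article_id)
    summary["cluster_id"] = l2.cluster_id
    return summary
```

Inside the async with block of the walkthrough, this would be called as `await run_pipeline(client, article)`.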
Layer 0 — client.layer0
analyze(article, include_embedding=False)
Submit an article through Layer 0. Handles the async job queue internally — polls until complete, then
returns the result. Deduplicates on URL: submitting the same URL twice returns the same
article_id.
result = await client.layer0.analyze(article, include_embedding=True)
result.article_id # "art_8f7h2k9s" — use in all downstream calls
result.quality_score # 0.821; range 0–1, below 0.53 filtered at L0→L1 (public namespace)
result.token_count # 842
result.semantic_hash # content fingerprint for deduplication
result.embedding # List[float] len 384, or [0.0]*384 if not requested
result.is_duplicate # True if URL was already processed
analyze_batch(articles, batch_size=10)
Submit multiple articles. All jobs are polled concurrently — much faster than sequential calls.
batch_size controls articles per API call (max 50).
results = await client.layer0.analyze_batch(articles, batch_size=20)
for r in results:
    print(f"{r.article_id} quality={r.quality_score:.2f}")
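To see how batch_size translates into API calls, the sketch below splits a list into chunks the way a batching client plausibly would. It is not the SDK's implementation: iter_batches is a hypothetical helper, and clamping oversized values to the documented maximum of 50 (rather than raising) is an assumption.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")
MAX_BATCH_SIZE = 50  # documented per-call maximum


def iter_batches(items: Sequence[T], batch_size: int = 10) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most batch_size items (capped at 50)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    size = min(batch_size, MAX_BATCH_SIZE)  # assumption: clamp instead of raising
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

With 45 articles and batch_size=20, this yields three chunks of 20, 20, and 5 items, which corresponds to three API calls under the assumptions above.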
search(query, limit=10)
Semantic search across the processed article corpus. Returns articles ranked by embedding similarity to the
query — not keyword match.
results = await client.layer0.search(
    query="Federal Reserve interest rate decision",
    limit=10,
)
for r in results:
    print(f"{r['title']} similarity={r['similarity']:.3f}")
Known limitation — search on large corpora. The semantic search endpoint performs a ChromaDB
scan that scales with corpus size. On corpora above ~50k articles, response times may increase noticeably. For
high-frequency search use cases, cache results or use limit to bound the scan. This is a known
infrastructure item being addressed.
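One way to follow the caching advice above is a small time-to-live cache in front of the search call. The sketch below is an application-side wrapper, not an SDK feature: CachedSearch, its ttl_seconds default, and the injectable clock are all illustrative choices. It wraps any async callable that accepts query and limit keyword arguments, such as client.layer0.search.

```python
import time


class CachedSearch:
    """Cache async search results per (query, limit) for a fixed time window."""

    def __init__(self, search_fn, ttl_seconds=300.0, clock=time.monotonic):
        self._search = search_fn
        self._ttl = ttl_seconds
        self._clock = clock
        self._cache = {}  # (query, limit) -> (stored_at, results)

    async def search(self, query, limit=10):
        key = (query, limit)
        now = self._clock()
        hit = self._cache.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # still fresh: skip the network call
        results = await self._search(query=query, limit=limit)
        self._cache[key] = (now, results)
        return results
```

Usage would look like `cached = CachedSearch(client.layer0.search)` followed by `await cached.search("Federal Reserve interest rate decision", limit=10)`. The cache is unbounded, so a long-running service would want an eviction policy on top.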
Layer 1 — client.layer1
process(article_id, title, content, ...)
Run semantic analysis on an article that has passed through Layer 0. Returns entities, sentence embeddings,
and sentiment. For public namespace keys, articles below the quality gate are not processed: the call returns
a result with success=False. If you see that, check quality_score on the Layer 0 result.
l1 = await client.layer1.process(
article_id="art_8f7h2k9s",
title="Fed Holds Rates Steady",
content="The Federal Reserve held interest rates...",
)
l1.success # True if processed; False if filtered by quality gate
l1.entities # {"PERSON": ["Jerome Powell"], "ORG": ["Federal Reserve"]}
l1.sentiment_score # 0.23 (–1 negative → +1 positive)
l1.sentiment_label # "neutral"
l1.stats.entity_count # 8
l1.stats.sentence_count # 14 — always populated, even on cache hits
l1.sentences # list of sentence embeddings (empty on cache-hit re-process)
Known limitation — sentences on cached re-process. If process() is called on an
article that was already processed, l1.sentences returns an empty list.
l1.stats.sentence_count is still correct. This is a server-side issue being addressed — no
workaround in the SDK.
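Until the server-side fix lands, an application can detect the empty-sentences case and keep its own copy from the first successful call. The sketch below is an application-level pattern, not an SDK feature; sentences_dropped and SentenceStore are hypothetical names, and the in-memory dict stands in for whatever storage you already use.

```python
def sentences_dropped(l1) -> bool:
    """True when the result reports sentences in stats but returns none."""
    return bool(l1.success) and l1.stats.sentence_count > 0 and len(l1.sentences) == 0


class SentenceStore:
    """Keep the first non-empty sentences list seen for each article_id."""

    def __init__(self):
        self._by_id = {}

    def remember(self, article_id, l1):
        """Store sentences when present; return the best copy known so far."""
        if l1.sentences:
            self._by_id[article_id] = list(l1.sentences)
        return self._by_id.get(article_id, [])
```

Calling `store.remember(article_id, l1)` after every process() call returns the original embeddings even when a later re-process comes back empty.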
get_entities(query, entity_type, min_mentions, time_range, limit, offset)
Search the entity index across the full corpus — no article ID needed. Use this to find which people,
organizations, or locations are appearing most in coverage right now.
entities = await client.layer1.get_entities(
    query="Federal Reserve",
    entity_type="ORG",   # PERSON · ORG · GPE · LOC · EVENT
    min_mentions=5,
    time_range="7d",     # 1d · 7d · 30d
    limit=20,
)
for e in entities["entities"]:
    print(f"{e['name']} mentions={e['mention_count']}")
get_entity_timeline(entity_name, entity_type, time_range)
Temporal distribution of mentions for a named entity — useful for spotting when a story broke, how fast it
spread, when it faded.
timeline = await client.layer1.get_entity_timeline(
    entity_name="Jerome Powell",
    entity_type="PERSON",
    time_range="30d",
)
for bucket in timeline["timeline"]:
    print(f"{bucket['date']} count={bucket['count']}")
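Spotting when a story broke usually comes down to finding the busiest bucket. The sketch below does that on the response shape used in the example above (a "timeline" list of dicts with "date" and "count" keys). peak_bucket is a hypothetical helper, and any fields beyond those two are not assumed.

```python
def peak_bucket(timeline: dict):
    """Return the bucket with the highest mention count, or None if empty.

    On ties, the earliest bucket in the list wins (max() keeps the first).
    """
    buckets = timeline.get("timeline", [])
    if not buckets:
        return None
    return max(buckets, key=lambda bucket: bucket["count"])
```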
get_entity_sentiment(entity_name, entity_type, time_range)
Aggregate sentiment across all articles mentioning a named entity, with a time-bucketed breakdown for
tracking shifts.
sentiment = await client.layer1.get_entity_sentiment(
entity_name="Jerome Powell",
entity_type="PERSON",
time_range="30d",
)
sentiment["average_sentiment"] # 0.23 (–1 to +1)
sentiment["label"] # "neutral"
Layer 2 — client.layer2
cluster(article_id)
Place an article into the story cluster graph. Requires the article to have been processed through Layer 1.
Returns the cluster the article joined (or created), with a confidence score.
result = await client.layer2.cluster("art_8f7h2k9s")
result.cluster_id # "clus_9x3k2m8f"
result.confidence # 0.941
result.is_new # False — joined existing cluster
result.already_clustered # True if article was previously clustered
cluster_batch(article_ids)
Cluster multiple articles in a single call. Returns per-article cluster assignments plus aggregate stats.
batch = await client.layer2.cluster_batch(["art_abc", "art_def", "art_ghi"])
batch.stats.clustering_rate # fraction of articles placed into clusters
batch.stats.clusters_formed # new clusters seeded by this batch
for c in batch.clusters:
    print(f"{c.cluster_id} articles={c.article_count} sources={c.source_count}")
Layer 3 — client.layer3 Pro+
Layer 3 operates on the full corpus; no article ID is needed. All methods return plain dicts. Starter tier
keys receive a 403 tier_restricted response.
# Graph statistics
stats = await client.layer3.get_stats()
# Entity relationships for a specific cluster
rels = await client.layer3.get_cluster_relationships("clus_9x3k2m8f")
# Trending entities across the corpus right now
trends = await client.layer3.get_trending_entities(min_velocity=0.5, limit=20)
# Rebuild the intelligence graph (synchronous — holds until complete)
await client.layer3.build_graph()
Echo layer — REST API Pro+ · SDK wrapper coming
Echo enriches story clusters with public reaction from Reddit and Bluesky — controversy scoring,
counter-narrative detection, and entity delta between editorial coverage and public discussion. It is a
parallel enrichment layer, not a sequential processing step: call it by cluster_id after Layer 2,
at any time.
SDK wrapper is on the roadmap. For now, call the Echo REST API directly using standard
requests or httpx with the same API key you use for the pipeline layers. The base
URL is https://echo.api.polariapi.com.
GET /v1/echo/story/{cluster_id}
Retrieve Echo enrichment for a single story cluster. Returns controversy score, sentiment, counter-narrative
signal, and the entity delta between press and public coverage.
import os

import httpx

ECHO_BASE = "https://echo.api.polariapi.com"
API_KEY = os.environ["POLARI_API_KEY"]  # same key as the pipeline layers
headers = {"Authorization": f"Bearer {API_KEY}"}

# Run this inside an async function
async with httpx.AsyncClient() as http:
    r = await http.get(f"{ECHO_BASE}/v1/echo/story/clus_9x3k2m8f", headers=headers)
    echo = r.json()

echo["controversy_score"]             # 0.84; 0.0 low, 1.0 highly contested
echo["sentiment_label"]               # "negative" | "neutral" | "positive"
echo["counter_narrative_detected"]    # True
echo["counter_narrative_confidence"]  # 0.79
echo["counter_narrative_summary"]     # human-readable summary of the counter-narrative
echo["novel_entities"]                # entities the public mentions that press coverage doesn't
echo["editorial_entities"]            # entities from press coverage
echo["dominant_themes"]               # top themes from public discussion
echo["comment_count_analyzed"]        # total Reddit + Bluesky comments processed
echo["source_platform"]               # "reddit" | "bluesky"
POST /v1/echo/stories/batch
Retrieve Echo enrichments for multiple clusters in one call. Returns a list in the same order as the
input IDs. Clusters with no enrichment yet return null.
r = await http.post(
    f"{ECHO_BASE}/v1/echo/stories/batch",
    headers=headers,
    json={"story_ids": ["clus_9x3k2m8f", "clus_31e6c353", "clus_7a2313527876"]},
)
enrichments = r.json()["enrichments"]  # list, same order as input
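Because the batch response is positional and uses null for clusters that have no enrichment yet, it helps to pair each ID with its entry before using the data. The sketch below relies only on the ordering and null behavior described above; pair_enrichments is a hypothetical helper.

```python
def pair_enrichments(story_ids, enrichments):
    """Split a positional batch response into enriched and pending clusters.

    Returns (enriched, pending): a dict of story_id -> enrichment, and a
    list of story_ids whose entry was null (None after JSON decoding).
    """
    if len(story_ids) != len(enrichments):
        raise ValueError("response length does not match the number of story_ids")
    enriched, pending = {}, []
    for story_id, item in zip(story_ids, enrichments):
        if item is None:
            pending.append(story_id)
        else:
            enriched[story_id] = item
    return enriched, pending
```

The pending list is a natural input for a later retry of the same batch endpoint.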
GET /v1/echo/trending
Stories currently ranked highest by controversy score. Useful for surfacing clusters where public and
editorial framing diverge most sharply.
r = await http.get(
    f"{ECHO_BASE}/v1/echo/trending",
    headers=headers,
    params={"limit": 20, "min_controversy": 0.6},
)
for story in r.json()["stories"]:
    print(f"{story['story_id']} controversy={story['controversy_score']:.2f}")
Errors
Error handling
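AuthenticationError, ValidationError, and ConfigurationError are not retried. RateLimitError, ServerError,
NetworkError, and TimeoutError are retried with exponential backoff and jitter; RetryExhaustedError is raised
once the maximum number of retries is exceeded.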
from polari.exceptions import (
AuthenticationError,
RateLimitError,
ValidationError,
RetryExhaustedError,
PolariError,
)
try:
    result = await client.layer0.analyze(article)
except AuthenticationError:
    print("Invalid or expired API key")           # not retried
except RateLimitError:
    print("Rate limit hit: back off and retry")   # retried with backoff
except ValidationError as e:
    print(f"Bad request: {e}")                    # not retried
except RetryExhaustedError:
    print("Max retries exceeded")
except PolariError as e:
    print(f"Unexpected error: {e}")
PolariError
├── PolariAPIError(status_code, message)
│ ├── RateLimitError # 429 — retried with backoff
│ ├── AuthenticationError # 401 — not retried
│ ├── ValidationError # 400/422 — not retried
│ └── ServerError # 5xx — retried with backoff
├── NetworkError # connectivity — retried
├── TimeoutError # request timeout — retried
├── ConfigurationError # invalid config — not retried
├── ProcessingError # article processing failure
└── RetryExhaustedError # max retries exceeded
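For readers unfamiliar with the retry strategy named above, the sketch below shows one common form of exponential backoff with jitter ("full jitter": a random delay between zero and an exponentially growing ceiling). The SDK's actual formula, base delay, and cap are not documented here, so base, cap, and the helper name backoff_delay are all assumptions.

```python
import random


def backoff_delay(attempt, base=0.5, cap=30.0, rng=random.random):
    """Delay in seconds before retry number `attempt` (0-based).

    The ceiling doubles with each attempt and is capped; the returned
    delay is a uniform random fraction of that ceiling.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling
```

The random component spreads out retries from many clients that failed at the same moment, which is the reason jitter is paired with backoff.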
Configuration
Client options
client = PolariClient(
    api_key="pk_live_your_key",
    timeout=60,                 # seconds per request
    max_retries=3,              # exponential backoff
    enable_metrics=True,        # track latency + success rate
    enable_cost_tracking=True,  # per-layer cost accumulation
    # Custom URLs: only needed for local dev or private deployments
    base_url="https://layer0.api.polariapi.com",
    layer1_url="https://layer1.api.polariapi.com",
    layer2_url="https://layer2.api.polariapi.com",
    layer3_url="https://layer3.api.polariapi.com",
)
| Method | Returns | Description |
|---|---|---|
| health_check() | HealthStatus | Checks all four layers concurrently. Per-layer boolean fields. |
| get_metrics() | Metrics | Request counts, average latency, success rate. |
| get_cost_summary() | CostSummary | Per-layer cost accumulation. L0 $0.001 · L1 $0.002 · L2 $0.001 · L3 $0.003. |
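The per-layer figures make it straightforward to budget a run in advance. The sketch below assumes the listed amounts are charged per call, which the table does not state explicitly; estimate_cost and the layer key names are hypothetical and independent of the CostSummary type.

```python
from decimal import Decimal

# Per-layer rates from the table above (assumed to be USD per call).
LAYER_COST = {
    "layer0": Decimal("0.001"),
    "layer1": Decimal("0.002"),
    "layer2": Decimal("0.001"),
    "layer3": Decimal("0.003"),
}


def estimate_cost(calls: dict) -> Decimal:
    """Sum rate * call count over layers; unknown layer names raise KeyError."""
    return sum((LAYER_COST[layer] * count for layer, count in calls.items()), Decimal("0"))
```

Under that assumption, taking 1,000 articles through Layers 0 to 2 costs 1000 × (0.001 + 0.002 + 0.001) = $4.00. Decimal is used so the totals are not affected by floating-point rounding.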
Reference
ArticleInput
| Field | Type | Description |
|---|---|---|
| content (required) | string | Article body. Minimum 50 characters. |
| title (required) | string | Article headline. |
| url | string | Canonical URL, used as the deduplication key. |
| source | string | Publisher name. |
| author | string | Byline. |
| published_at | datetime | Original publication time. |
| metadata | dict | Arbitrary key/value pairs passed through to the result. |
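Checking these constraints before submission avoids a round trip that would end in a ValidationError. The sketch below mirrors the field table with a stand-in dataclass. ArticleDraft is hypothetical and is not the SDK's ArticleInput; whether the server counts raw or stripped characters toward the 50-character minimum is an assumption (raw length is used here).

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional

MIN_CONTENT_CHARS = 50  # documented minimum for the article body


@dataclass
class ArticleDraft:
    """Local stand-in mirroring the ArticleInput fields listed above."""

    content: str
    title: str
    url: Optional[str] = None
    source: Optional[str] = None
    author: Optional[str] = None
    published_at: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)

    def problems(self) -> List[str]:
        """Return human-readable issues; an empty list means the draft looks valid."""
        issues = []
        if len(self.content) < MIN_CONTENT_CHARS:
            issues.append(f"content must be at least {MIN_CONTENT_CHARS} characters")
        if not self.title.strip():
            issues.append("title is required")
        return issues
```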
Resources
Further reading