Client Libraries

Build with Polari in minutes,
not days.

The Python SDK wraps the full four-layer pipeline in an async, typed client. Authentication, job polling, retries, and cost tracking — handled. You get structured intelligence back.

pypi polari-sdk 0.2.0
pip install polari-sdk
Python 3.10+
Python
Async client with full Layer 0–3 support, typed Pydantic responses, retries, and cost tracking.
Available now — v0.2.0
pypi.org/project/polari-sdk →
Coming soon
JavaScript / TypeScript
Same interface, same types. Single polari-sdk npm package with TypeScript definitions built in.
In design — notify me at hello@polariapi.com

Setup

Installation & auth

BASH
pip install polari-sdk

Set your API key as an environment variable and use from_env(), or pass it directly:

BASH
export POLARI_API_KEY=pk_live_your_key_here
PYTHON
from polari import PolariClient

# Recommended: reads POLARI_API_KEY from the environment
client = PolariClient.from_env()

# Or pass the key explicitly
client = PolariClient(api_key="pk_live_your_key")
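If you want a clearer failure than a downstream authentication error when the variable is missing, a pre-flight check in your own code is one option. This is a minimal sketch using only the standard library; `load_api_key` is a hypothetical helper, not part of the SDK, and it makes no claim about how `from_env()` behaves internally.

```python
import os


def load_api_key(env=os.environ, var="POLARI_API_KEY"):
    """Return the API key from the environment, or raise with a clear message.

    Hypothetical helper: it only checks that the variable is present and
    non-empty. It does not validate the key against the API.
    """
    key = env.get(var, "").strip()
    if not key:
        raise RuntimeError(f"{var} is not set; export it before creating the client")
    return key
```

The returned value can then be passed as `PolariClient(api_key=...)`.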

Authentication

API keys & namespace mode

Your API key carries two things: your tier (which layers you can access) and your namespace mode (whose data you read and write). Both are set at key issuance — they are not runtime parameters. No SDK configuration is needed to use either.

Mode | Writes to | Reads from | Quality gate | Tier
--- | --- | --- | --- | ---
public (default) | Shared pool | Shared pool | Enforced at 0.53 | Any
private | Your namespace only | Your namespace only | Bypassed | Pro+
shared | Your namespace only | Your namespace + shared pool | Bypassed on your writes | Pro+
Requesting a private namespace: During signup or at any time, contact support@polariapi.com and specify which mode you need. We'll issue a new key — no SDK changes required. Private namespace is available on Professional and Enterprise tiers. The key itself is the capability token; there is no runtime flag to flip.
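The mode table can be read as a small lookup. The sketch below encodes it as plain data to make the rules explicit; it is an illustration, not SDK code. `NamespaceMode`, `MODES`, and `passes_gate` are hypothetical names, and treating a score of exactly 0.53 as passing is an assumption drawn from the wording "below 0.53".

```python
from dataclasses import dataclass

QUALITY_GATE = 0.53  # threshold quoted for public keys


@dataclass(frozen=True)
class NamespaceMode:
    writes_to: str            # "shared" pool or your "own" namespace
    reads_from: tuple         # pools visible to reads
    gate_enforced_on_writes: bool


# One entry per row of the mode table.
MODES = {
    "public": NamespaceMode("shared", ("shared",), True),
    "private": NamespaceMode("own", ("own",), False),
    "shared": NamespaceMode("own", ("own", "shared"), False),
}


def passes_gate(mode: str, quality_score: float) -> bool:
    """True if an article written under `mode` would get past the quality gate."""
    if not MODES[mode].gate_enforced_on_writes:
        return True
    return quality_score >= QUALITY_GATE
```

Because the mode is fixed when the key is issued, a check like this is only useful for reasoning about results locally. It cannot change what the server does.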

The pipeline

How articles move through the layers

Each layer builds on the last. An article ID from Layer 0 is what every downstream call needs.

0
Submit & score — client.layer0.analyze()
Send an article. The SDK handles the async job queue internally — submit, poll, retrieve. You get back a quality score, token count, and an article_id. That ID is your key into every other layer. For public namespace keys, articles below quality 0.53 are rejected at the Layer 0 → Layer 1 handoff on the server side; the SDK surfaces this as success=False on the Layer 1 result.
1
Extract meaning — client.layer1.process()
Pass the article_id from Layer 0. Get back named entities (people, orgs, locations, events), sentence-level embeddings, and an article sentiment score. You can also query the full entity index — search across all processed articles by name, type, or time window.
2
Cluster into stories — client.layer2.cluster()
Polari finds which other articles across all sources are covering the same story and groups them into a cluster. You get a cluster_id, a confidence score, and whether the article joined an existing cluster or seeded a new one.
3
Intelligence graph — client.layer3 Pro+
Query the entity relationship graph, surface trending entities by velocity, and access cross-cluster narrative structure. Layer 3 operates on the full corpus — no article ID needed. Requires Professional or Enterprise tier.
~
Public discourse — Echo layer Pro+ · REST only
Echo enriches story clusters with public reaction from Reddit and Bluesky — controversy scoring, counter-narrative detection, and entity delta between press coverage and public discussion. Echo is a parallel enrichment layer, not a sequential processing step. Call it by cluster_id after Layer 2. A client.echo SDK wrapper is on the roadmap; for now use the REST API directly.

Walkthrough

End-to-end pipeline

A single article from submission to story cluster in one script.

PYTHON
import asyncio

from polari import PolariClient, ArticleInput


async def main():
    async with PolariClient.from_env() as client:
        article = ArticleInput(
            title="Fed Holds Rates Steady Amid Inflation Concerns",
            content="The Federal Reserve held interest rates unchanged on Wednesday...",
            url="https://reuters.com/markets/fed-rates-june-2026",
            source="Reuters",
        )

        # Layer 0: submit, poll, retrieve
        l0 = await client.layer0.analyze(article)
        print(f"[L0] quality={l0.quality_score:.3f} id={l0.article_id}")
        # → [L0] quality=0.821 id=art_8f7h2k9s

        # Layer 1: entities + sentiment
        l1 = await client.layer1.process(
            article_id=l0.article_id,
            title=article.title,
            content=article.content,
        )
        print(f"[L1] entities={l1.stats.entity_count} sentiment={l1.sentiment_label}")
        # → [L1] entities=8 sentiment=neutral
        #   people=['Jerome Powell'] orgs=['Federal Reserve', 'Reuters']

        # Layer 2: cluster into a story
        l2 = await client.layer2.cluster(l0.article_id)
        print(f"[L2] cluster={l2.cluster_id} confidence={l2.confidence:.3f} new={l2.is_new}")
        # → [L2] cluster=clus_9x3k2m8f confidence=0.941 new=False

        # Layer 3: trending entities (Pro+)
        trends = await client.layer3.get_trending_entities(limit=5)
        for t in trends["trends"]:
            print(f"[L3] {t['entity']} velocity={t['velocity']:.2f}")


asyncio.run(main())

Layer 0 — client.layer0

analyze(article, include_embedding=False)

Submit an article through Layer 0. Handles the async job queue internally — polls until complete, then returns the result. Deduplicates on URL: submitting the same URL twice returns the same article_id.

PYTHON
result = await client.layer0.analyze(article, include_embedding=True)

result.article_id     # "art_8f7h2k9s": use in all downstream calls
result.quality_score  # 0.821, range 0 to 1; below 0.53 filtered at L0→L1 (public namespace)
result.token_count    # 842
result.semantic_hash  # content fingerprint for deduplication
result.embedding      # List[float] len 384, or [0.0]*384 if not requested
result.is_duplicate   # True if URL was already processed

analyze_batch(articles, batch_size=10)

Submit multiple articles. All jobs are polled concurrently — much faster than sequential calls. batch_size controls articles per API call (max 50).

PYTHON
results = await client.layer0.analyze_batch(articles, batch_size=20)
for r in results:
    print(f"{r.article_id} quality={r.quality_score:.2f}")
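To estimate how many API calls a batch will make, the arithmetic can be sketched as below. This assumes the SDK splits the list into consecutive chunks of `batch_size` with a smaller final chunk, which the text above implies but does not spell out. `plan_batches` is a hypothetical helper, not an SDK function.

```python
MAX_BATCH_SIZE = 50  # documented upper bound for batch_size


def plan_batches(n_articles: int, batch_size: int = 10) -> list[int]:
    """Return the number of articles in each API call for a given batch_size."""
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        raise ValueError(f"batch_size must be between 1 and {MAX_BATCH_SIZE}")
    full, rest = divmod(n_articles, batch_size)
    # `full` complete chunks, plus one partial chunk if anything is left over.
    return [batch_size] * full + ([rest] if rest else [])
```

For example, 45 articles with `batch_size=20` gives `[20, 20, 5]`, so three API calls.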

search(query, limit=10)

Semantic search across the processed article corpus. Returns articles ranked by embedding similarity to the query — not keyword match.

PYTHON
results = await client.layer0.search(
    query="Federal Reserve interest rate decision",
    limit=10,
)
for r in results:
    print(f"{r['title']} similarity={r['similarity']:.3f}")
Known limitation — search on large corpora. The semantic search endpoint performs a ChromaDB scan that scales with corpus size. On corpora above ~50k articles, response times may increase noticeably. For high-frequency search use cases, cache results or use limit to bound the scan. This is a known infrastructure item being addressed.
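One way to follow the caching advice is a small time-to-live cache in front of the search call. The sketch below is a caller-side illustration, not an SDK feature. `TTLSearchCache` is a hypothetical class, the 300-second default is an arbitrary assumption, and the wrapped callable is assumed to accept `query` and `limit` keyword arguments as `client.layer0.search` does above.

```python
import time


class TTLSearchCache:
    """Cache results of an async search callable for `ttl` seconds."""

    def __init__(self, search, ttl=300.0, clock=time.monotonic):
        self._search = search    # e.g. client.layer0.search
        self._ttl = ttl
        self._clock = clock      # injectable so tests can control time
        self._entries = {}       # (query, limit) -> (expires_at, results)

    async def search(self, query, limit=10):
        key = (query, limit)
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]        # still fresh: skip the network call
        results = await self._search(query=query, limit=limit)
        self._entries[key] = (now + self._ttl, results)
        return results
```

Usage would look like `cache = TTLSearchCache(client.layer0.search, ttl=120)` followed by `await cache.search("Federal Reserve", limit=10)`. The sketch does not coalesce concurrent misses for the same key and never evicts expired entries, so a long-running service would need both.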

Layer 1 — client.layer1

process(article_id, title, content, ...)

Run semantic analysis on an article that has passed through Layer 0. Returns entities, sentence embeddings, and sentiment. For public namespace keys, articles below the 0.53 quality gate are dropped without raising an error: the call returns success=False. If you see success=False, check quality_score on the Layer 0 result.

PYTHON
l1 = await client.layer1.process(
    article_id="art_8f7h2k9s",
    title="Fed Holds Rates Steady",
    content="The Federal Reserve held interest rates...",
)

l1.success               # True if processed; False if filtered by quality gate
l1.entities              # {"PERSON": ["Jerome Powell"], "ORG": ["Federal Reserve"]}
l1.sentiment_score       # 0.23 (-1 negative → +1 positive)
l1.sentiment_label       # "neutral"
l1.stats.entity_count    # 8
l1.stats.sentence_count  # 14, always populated, even on cache hits
l1.sentences             # list of sentence embeddings (empty on cache-hit re-process)
Known limitation — sentences on cached re-process. If process() is called on an article that was already processed, l1.sentences returns an empty list. l1.stats.sentence_count is still correct. This is a server-side issue being addressed — no workaround in the SDK.
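Since the empty list is indistinguishable from an article with no sentences unless you also look at the count, a caller can detect the condition as sketched below. This only detects the case and does not recover the embeddings. `Stats` and `Layer1Result` are stand-in dataclasses exposing just the fields named above, and `sentences_missing` is a hypothetical helper.

```python
from dataclasses import dataclass, field


@dataclass
class Stats:  # stand-in for l1.stats
    sentence_count: int = 0


@dataclass
class Layer1Result:  # stand-in exposing only the fields used here
    success: bool
    stats: Stats
    sentences: list = field(default_factory=list)


def sentences_missing(result) -> bool:
    """True when a successful result reports sentences but carries no embeddings."""
    return bool(
        result.success
        and result.stats.sentence_count > 0
        and not result.sentences
    )
```

If your application needs the embeddings later, storing `l1.sentences` from the first `process()` call is the safer pattern until the server-side issue is resolved.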

get_entities(query, entity_type, min_mentions, time_range, limit, offset)

Search the entity index across the full corpus — no article ID needed. Use this to find which people, organizations, or locations are appearing most in coverage right now.

PYTHON
entities = await client.layer1.get_entities(
    query="Federal Reserve",
    entity_type="ORG",   # PERSON · ORG · GPE · LOC · EVENT
    min_mentions=5,
    time_range="7d",     # 1d · 7d · 30d
    limit=20,
)
for e in entities["entities"]:
    print(f"{e['name']} mentions={e['mention_count']}")

get_entity_timeline(entity_name, entity_type, time_range)

Temporal distribution of mentions for a named entity — useful for spotting when a story broke, how fast it spread, when it faded.

PYTHON
timeline = await client.layer1.get_entity_timeline(
    entity_name="Jerome Powell",
    entity_type="PERSON",
    time_range="30d",
)
for bucket in timeline["timeline"]:
    print(f"{bucket['date']} count={bucket['count']}")

get_entity_sentiment(entity_name, entity_type, time_range)

Aggregate sentiment across all articles mentioning a named entity, with a time-bucketed breakdown for tracking shifts.

PYTHON
sentiment = await client.layer1.get_entity_sentiment(
    entity_name="Jerome Powell",
    entity_type="PERSON",
    time_range="30d",
)

sentiment["average_sentiment"]  # 0.23 (-1 to +1)
sentiment["label"]              # "neutral"

Layer 2 — client.layer2

cluster(article_id)

Place an article into the story cluster graph. Requires the article to have been processed through Layer 1. Returns the cluster the article joined (or created), with a confidence score.

PYTHON
result = await client.layer2.cluster("art_8f7h2k9s")

result.cluster_id         # "clus_9x3k2m8f"
result.confidence         # 0.941
result.is_new             # False: joined existing cluster
result.already_clustered  # True if article was previously clustered

cluster_batch(article_ids)

Cluster multiple articles in a single call. Returns per-article cluster assignments plus aggregate stats.

PYTHON
batch = await client.layer2.cluster_batch(["art_abc", "art_def", "art_ghi"])

batch.stats.clustering_rate  # fraction of articles placed into clusters
batch.stats.clusters_formed  # new clusters seeded by this batch

for c in batch.clusters:
    print(f"{c.cluster_id} articles={c.article_count} sources={c.source_count}")

Layer 3 — client.layer3 Pro+

Layer 3 operates on the full corpus — no article ID needed. All methods return plain dicts. Starter tier keys receive a 403 tier_restricted response.

PYTHON
# Graph statistics
stats = await client.layer3.get_stats()

# Entity relationships for a specific cluster
rels = await client.layer3.get_cluster_relationships("clus_9x3k2m8f")

# Trending entities across the corpus right now
trends = await client.layer3.get_trending_entities(min_velocity=0.5, limit=20)

# Rebuild the intelligence graph (synchronous: holds until complete)
await client.layer3.build_graph()

Echo layer — REST API Pro+ · SDK wrapper coming

Echo enriches story clusters with public reaction from Reddit and Bluesky — controversy scoring, counter-narrative detection, and entity delta between editorial coverage and public discussion. It is a parallel enrichment layer, not a sequential processing step: call it by cluster_id after Layer 2, at any time.

An SDK wrapper is on the roadmap. For now, call the Echo REST API directly with requests or httpx, using the same API key you use for the pipeline layers. The base URL is https://echo.api.polariapi.com.

GET /v1/echo/story/{cluster_id}

Retrieve Echo enrichment for a single story cluster. Returns controversy score, sentiment, counter-narrative signal, and the entity delta between press and public coverage.

PYTHON
import os

import httpx

ECHO_BASE = "https://echo.api.polariapi.com"
API_KEY = os.environ["POLARI_API_KEY"]  # same key as the pipeline layers
headers = {"Authorization": f"Bearer {API_KEY}"}

async with httpx.AsyncClient() as http:
    r = await http.get(f"{ECHO_BASE}/v1/echo/story/clus_9x3k2m8f", headers=headers)
    r.raise_for_status()  # surface 4xx/5xx responses instead of parsing an error body
    echo = r.json()

echo["controversy_score"]             # 0.84, where 0.0 is low and 1.0 is highly contested
echo["sentiment_label"]               # "negative" | "neutral" | "positive"
echo["counter_narrative_detected"]    # True
echo["counter_narrative_confidence"]  # 0.79
echo["counter_narrative_summary"]     # human-readable summary of the counter-narrative
echo["novel_entities"]                # entities the public mentions that press coverage doesn't
echo["editorial_entities"]            # entities from press coverage
echo["dominant_themes"]               # top themes from public discussion
echo["comment_count_analyzed"]        # total Reddit + Bluesky comments processed
echo["source_platform"]               # "reddit" | "bluesky"

POST /v1/echo/stories/batch

Retrieve Echo enrichments for multiple clusters in one call. Returns a list in the same order as the input IDs. Clusters with no enrichment yet return null.

PYTHON
# Reuses ECHO_BASE, headers, and the open `http` client from the previous example
r = await http.post(
    f"{ECHO_BASE}/v1/echo/stories/batch",
    headers=headers,
    json={"story_ids": ["clus_9x3k2m8f", "clus_31e6c353", "clus_7a2313527876"]},
)
enrichments = r.json()["enrichments"]  # list, same order as input
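Because the response is positional and may contain nulls, it helps to pair it back up with the IDs you sent. The sketch below relies only on the two behaviors stated above (same order as the input, `null` for clusters without enrichment, which `r.json()` decodes to `None`). `pair_enrichments` is a hypothetical helper.

```python
def pair_enrichments(story_ids, enrichments):
    """Split a positional batch response into ready enrichments and pending IDs.

    Returns (ready, pending): `ready` maps story_id -> enrichment dict,
    `pending` lists the IDs whose enrichment came back as None.
    """
    if len(story_ids) != len(enrichments):
        raise ValueError("response length does not match the number of story IDs sent")
    ready = {}
    pending = []
    for story_id, enrichment in zip(story_ids, enrichments):
        if enrichment is None:
            pending.append(story_id)
        else:
            ready[story_id] = enrichment
    return ready, pending
```

The `pending` list is a natural input for a later retry of the same batch endpoint.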

GET /v1/echo/trending

Stories currently ranked highest by controversy score. Useful for surfacing clusters where public and editorial framing diverge most sharply.

PYTHON
# Reuses ECHO_BASE, headers, and the open `http` client from the first Echo example
r = await http.get(
    f"{ECHO_BASE}/v1/echo/trending",
    headers=headers,
    params={"limit": 20, "min_controversy": 0.6},
)
for story in r.json()["stories"]:
    print(f"{story['story_id']} controversy={story['controversy_score']:.2f}")

Errors

Error handling

AuthenticationError, ValidationError, and ConfigurationError are not retried. Transient failures (rate limits, 5xx server errors, network errors, and timeouts) are retried with exponential backoff and jitter.

PYTHON
from polari.exceptions import (
    AuthenticationError,
    RateLimitError,
    ValidationError,
    RetryExhaustedError,
    PolariError,
)

try:
    result = await client.layer0.analyze(article)
except AuthenticationError:
    print("Invalid or expired API key")  # not retried
except RateLimitError:
    print("Rate limit hit: back off and retry")  # retried with backoff
except ValidationError as e:
    print(f"Bad request: {e}")  # not retried
except RetryExhaustedError:
    print("Max retries exceeded")
except PolariError as e:
    print(f"Unexpected error: {e}")
HIERARCHY
PolariError
├── PolariAPIError(status_code, message)
│   ├── RateLimitError        # 429: retried with backoff
│   ├── AuthenticationError   # 401: not retried
│   ├── ValidationError       # 400/422: not retried
│   └── ServerError           # 5xx: retried with backoff
├── NetworkError              # connectivity: retried
├── TimeoutError              # request timeout: retried
├── ConfigurationError        # invalid config: not retried
├── ProcessingError           # article processing failure
└── RetryExhaustedError       # max retries exceeded
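For readers unfamiliar with the retry policy, here is a rough sketch of exponential backoff with jitter. It is not the SDK's implementation. The "full jitter" variant, the `base` and `cap` values, and the stand-in exception classes are all assumptions chosen for illustration; the SDK's actual delays are not documented here.

```python
import asyncio
import random


class TransientError(Exception):
    """Stand-in for retried errors (rate limit, 5xx, network, timeout)."""


class PermanentError(Exception):
    """Stand-in for errors that are never retried (auth, validation, config)."""


class RetryExhausted(Exception):
    """Stand-in raised once the retry budget is used up."""


def backoff_delay(attempt, base=0.5, cap=30.0, rng=random.random):
    """Full-jitter delay before retry number `attempt` (0-based), in seconds."""
    return rng() * min(cap, base * 2 ** attempt)


async def call_with_retries(fn, max_retries=3, sleep=asyncio.sleep, **backoff):
    """Await `fn()`, retrying only TransientError up to `max_retries` times."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except TransientError as exc:
            if attempt == max_retries:
                raise RetryExhausted(f"gave up after {max_retries} retries") from exc
            await sleep(backoff_delay(attempt, **backoff))
        # PermanentError (and anything else) propagates immediately.
```

The doubling ceiling keeps early retries quick, the cap bounds the worst-case wait, and the random factor spreads out clients that failed at the same moment.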

Configuration

Client options

PYTHON
client = PolariClient(
    api_key="pk_live_your_key",
    timeout=60,                 # seconds per request
    max_retries=3,              # exponential backoff
    enable_metrics=True,        # track latency + success rate
    enable_cost_tracking=True,  # per-layer cost accumulation
    # Custom URLs: only needed for local dev or private deployments
    base_url="https://layer0.api.polariapi.com",
    layer1_url="https://layer1.api.polariapi.com",
    layer2_url="https://layer2.api.polariapi.com",
    layer3_url="https://layer3.api.polariapi.com",
)
Method | Returns | Description
--- | --- | ---
health_check() | HealthStatus | Checks all four layers concurrently. Per-layer boolean fields.
get_metrics() | Metrics | Request counts, average latency, success rate.
get_cost_summary() | CostSummary | Per-layer cost accumulation. L0 $0.001 · L1 $0.002 · L2 $0.001 · L3 $0.003.
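The per-layer figures can be used for a rough budget estimate before running a job. The sketch below assumes each figure is a price in US dollars per call to that layer; the table gives the amounts but not the unit, so treat that as an assumption. `RATES` and `estimate_cost` are hypothetical names, and `get_cost_summary()` remains the authoritative source for actual spend.

```python
from decimal import Decimal

# Per-layer figures from the table; "per call" is an assumption.
RATES = {
    "L0": Decimal("0.001"),
    "L1": Decimal("0.002"),
    "L2": Decimal("0.001"),
    "L3": Decimal("0.003"),
}


def estimate_cost(calls: dict[str, int]) -> Decimal:
    """Estimate total cost given a mapping of layer name -> number of calls."""
    return sum((RATES[layer] * n for layer, n in calls.items()), Decimal("0"))
```

Under that assumption, taking 1,000 articles through Layers 0 to 2 comes to 1000 × (0.001 + 0.002 + 0.001) = $4.00. `Decimal` is used so that small per-call amounts add up without floating-point drift.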

Reference

ArticleInput

Field | Type | Description
--- | --- | ---
content (required) | string | Article body. Minimum 50 characters.
title (required) | string | Article headline.
url | string | Canonical URL, used as the deduplication key.
source | string | Publisher name.
author | string | Byline.
published_at | datetime | Original publication time.
metadata | dict | Arbitrary key/value pairs passed through to the result.
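Checking the two required fields before submission avoids a round trip that would end in a validation error. The sketch below mirrors the constraints in the table on a plain dict. `validate_article` is a hypothetical helper, and counting raw characters (including whitespace) toward the 50-character minimum is an assumption about how the server measures length.

```python
MIN_CONTENT_CHARS = 50  # minimum body length from the field table


def validate_article(fields: dict) -> list[str]:
    """Return a list of problems; an empty list means the fields look submittable."""
    problems = []
    title = (fields.get("title") or "").strip()
    content = fields.get("content") or ""
    if not title:
        problems.append("title is required")
    if not content:
        problems.append("content is required")
    elif len(content) < MIN_CONTENT_CHARS:
        problems.append(
            f"content has {len(content)} characters; minimum is {MIN_CONTENT_CHARS}"
        )
    return problems
```

Returning a list rather than raising on the first problem lets a batch loader report every issue with an article at once.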

Resources

Further reading

GitHub
Source, issues, and changelog. Open to stars and bug reports.
github.com/polariapi/polari-sdk →
Layer docs
What each layer does, what it returns, and how the pipeline connects.
Layer 0 — Token Intelligence →