Guides
Guide: Polling for Results
How to implement a robust polling loop for async scan results.
Why polling?
Exona scans involve live web research and multi-step AI analysis. This typically takes 30–120 seconds: too long for a synchronous HTTP connection to remain open. Instead, you create a scan and poll for the result.
This guide shows you how to implement a production-grade polling loop.
Basic polling loop
import requests
import time
def run_scan_and_wait(company_name, company_website, api_key):
"""
Create a scan and wait for it to complete. Returns the result object.
"""
base_url = "https://platform.exonalab.com/api/v1"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Step 1: Create the scan
response = requests.post(
f"{base_url}/scans",
headers=headers,
json={"company_name": company_name, "company_website": company_website},
timeout=15,
)
response.raise_for_status()
scan_id = response.json()["id"]
print(f"Scan started: {scan_id}")
# Step 2: Poll until complete
while True:
r = requests.get(f"{base_url}/scans/{scan_id}", headers=headers, timeout=15)
r.raise_for_status()
data = r.json()
if data["status"] == "completed":
return data["result"]
elif data["status"] == "failed":
error = data.get("error", {})
raise RuntimeError(
f"Scan failed [{error.get('code')}]: {error.get('message')}"
)
# Respect Retry-After header; default to 10 seconds
wait = int(r.headers.get("Retry-After", 10))
print(f"Status: {data['status']}: waiting {wait}s")
time.sleep(wait)Production-grade version
For production systems, add a timeout, exponential backoff on errors, and proper logging.
import requests
import time
import logging

logger = logging.getLogger(__name__)


class ExonaClient:
    """Client for the Exona scan API with production-grade polling.

    Adds an overall deadline, exponential backoff on transient errors,
    Retry-After-aware rate-limit handling, and structured logging on top
    of the basic polling loop.
    """

    def __init__(self, api_key, base_url="https://platform.exonalab.com/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        # A Session reuses the TCP connection and default headers across polls.
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })

    def create_scan(self, company_name, company_website, **kwargs):
        """Create a scan and return the API's scan object.

        Extra keyword arguments are forwarded verbatim in the request body
        (e.g. max_age_days). Raises requests.HTTPError on a non-2xx response.
        """
        response = self.session.post(
            f"{self.base_url}/scans",
            json={"company_name": company_name, "company_website": company_website, **kwargs},
            timeout=15,
        )
        response.raise_for_status()
        data = response.json()
        logger.info("Scan created: %s", data["id"])
        return data

    def get_scan(self, scan_id):
        """Fetch the current scan state.

        Returns a (body, headers) tuple so callers can read Retry-After.
        Raises requests.HTTPError on a non-2xx response.
        """
        response = self.session.get(
            f"{self.base_url}/scans/{scan_id}",
            timeout=15,
        )
        response.raise_for_status()
        return response.json(), response.headers

    def scan_and_wait(
        self,
        company_name,
        company_website,
        poll_interval=10,
        timeout=300,
        **kwargs,
    ):
        """Create a scan and poll until it completes, fails, or times out.

        Args:
            company_name: Company name sent to the scan endpoint.
            company_website: Company website sent to the scan endpoint.
            poll_interval: Seconds between polls when the server sends no
                Retry-After header.
            timeout: Overall deadline in seconds before TimeoutError.
            **kwargs: Extra fields forwarded to the create-scan request.

        Returns:
            The scan's "result" object.

        Raises:
            RuntimeError: The scan finished in the "failed" state.
            TimeoutError: The scan did not finish within `timeout` seconds.
            requests.RequestException: Three consecutive transport/HTTP errors.
        """
        scan = self.create_scan(company_name, company_website, **kwargs)
        scan_id = scan["id"]

        # If this was a cache hit, return immediately
        if scan.get("status") == "completed":
            logger.info("Cache hit for %s", scan_id)
            return scan["result"]

        # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
        # unlike time.time(), so the deadline cannot jump.
        deadline = time.monotonic() + timeout
        consecutive_errors = 0
        while time.monotonic() < deadline:
            try:
                data, headers = self.get_scan(scan_id)
                consecutive_errors = 0  # reset on success
                if data["status"] == "completed":
                    logger.info("Scan %s completed", scan_id)
                    return data["result"]
                elif data["status"] == "failed":
                    error = data.get("error", {})
                    raise RuntimeError(
                        f"Scan {scan_id} failed [{error.get('code')}]: {error.get('message')}"
                    )
                wait = int(headers.get("Retry-After", poll_interval))
                logger.debug("Scan %s: %s: waiting %ss", scan_id, data["status"], wait)
                time.sleep(wait)
            except requests.RequestException as e:
                # RequestException covers connection errors and timeouts as
                # well as HTTP status errors, so transient network blips get
                # the same backoff instead of crashing the loop immediately.
                consecutive_errors += 1
                response = getattr(e, "response", None)
                if response is not None and response.status_code == 429:
                    wait = int(response.headers.get("Retry-After", 60))
                    logger.warning("Rate limited: waiting %ss", wait)
                    time.sleep(wait)
                elif consecutive_errors >= 3:
                    raise
                else:
                    backoff = 2 ** consecutive_errors
                    logger.warning(
                        "HTTP error (attempt %s): backing off %ss",
                        consecutive_errors,
                        backoff,
                    )
                    time.sleep(backoff)

        raise TimeoutError(f"Scan {scan_id} did not complete within {timeout}s")
# Usage: run a single scan and wait for its result.
client = ExonaClient(api_key="exo_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
result = client.scan_and_wait(
    company_name="Acme AI Ltd",
    company_website="https://acme.ai",
    max_age_days=30,
)
print(result["risk_assessment"]["overall_risk_level"])

Scanning multiple companies in parallel
import concurrent.futures

# Companies to scan in one batch.
companies = [
    {"company_name": "Acme AI", "company_website": "https://acme.ai"},
    {"company_name": "Zenith ML", "company_website": "https://zenith.ml"},
    {"company_name": "Cortex Risk", "company_website": "https://cortexrisk.com"},
]

client = ExonaClient(api_key="exo_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
def scan_company(company):
    """Scan one company; never raises — errors are captured in the row."""
    try:
        result = client.scan_and_wait(
            company_name=company["company_name"],
            company_website=company["company_website"],
            max_age_days=30,
        )
    except Exception as e:
        return {"company": company["company_name"], "risk_level": None, "error": str(e)}
    return {
        "company": company["company_name"],
        "risk_level": result["risk_assessment"]["overall_risk_level"],
        "error": None,
    }
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(scan_company, companies))
for r in results:
if r["error"]:
print(f"{r['company']}: ERROR: {r['error']}")
else:
print(f"{r['company']}: {r['risk_level']}")Recommended poll intervals
| Scenario | Interval |
|---|---|
| Interactive (user waiting) | 5 seconds |
| Background batch | 15 seconds |
| Very large batch (100+ companies) | 30 seconds |
Polling more frequently than every 5 seconds provides no benefit and will cause you to hit the rate limit faster.