Exona API
Guides

Guide: Polling for Results

How to implement a robust polling loop for async scan results.

Why polling?

Exona scans involve live web research and multi-step AI analysis. This typically takes 30–120 seconds: too long for a synchronous HTTP connection to remain open. Instead, you create a scan and poll for the result.

This guide shows you how to implement a production-grade polling loop.


Basic polling loop

import requests
import time
 
def run_scan_and_wait(company_name, company_website, api_key):
    """
    Kick off a scan for one company, then poll the API until it finishes.

    Returns the scan's result object on success; raises RuntimeError if the
    scan ends in the "failed" state, or requests.HTTPError on a non-2xx
    response from either endpoint.
    """
    base_url = "https://platform.exonalab.com/api/v1"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    # Step 1: Create the scan
    payload = {"company_name": company_name, "company_website": company_website}
    create_resp = requests.post(
        f"{base_url}/scans",
        headers=headers,
        json=payload,
        timeout=15,
    )
    create_resp.raise_for_status()
    scan_id = create_resp.json()["id"]
    print(f"Scan started: {scan_id}")

    # Step 2: Poll until the scan reaches a terminal state
    while True:
        poll_resp = requests.get(
            f"{base_url}/scans/{scan_id}", headers=headers, timeout=15
        )
        poll_resp.raise_for_status()
        data = poll_resp.json()
        status = data["status"]

        if status == "failed":
            error = data.get("error", {})
            raise RuntimeError(
                f"Scan failed [{error.get('code')}]: {error.get('message')}"
            )
        if status == "completed":
            return data["result"]

        # Respect Retry-After header; default to 10 seconds
        wait = int(poll_resp.headers.get("Retry-After", 10))
        print(f"Status: {data['status']}: waiting {wait}s")
        time.sleep(wait)

Production-grade version

For production systems, add a timeout, exponential backoff on errors, and proper logging.

import requests
import time
import logging
 
logger = logging.getLogger(__name__)
 
class ExonaClient:
    """Client for the Exona scan API with production-grade polling.

    A single requests.Session carries the auth header for all calls.
    """

    def __init__(self, api_key, base_url="https://platform.exonalab.com/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })

    @staticmethod
    def _retry_after_seconds(headers, default):
        """Parse a Retry-After header as integer seconds.

        RFC 9110 allows Retry-After to be either delta-seconds or an
        HTTP-date; int() raises on the date form, so fall back to
        *default* when the header is absent or not a plain integer.
        """
        try:
            return int(headers.get("Retry-After", default))
        except (TypeError, ValueError):
            return default

    def create_scan(self, company_name, company_website, **kwargs):
        """POST a new scan; returns the scan object (may already be completed
        on a cache hit). Raises requests.HTTPError on a non-2xx response."""
        response = self.session.post(
            f"{self.base_url}/scans",
            json={"company_name": company_name, "company_website": company_website, **kwargs},
            timeout=15,
        )
        response.raise_for_status()
        data = response.json()
        logger.info(f"Scan created: {data['id']}")
        return data

    def get_scan(self, scan_id):
        """GET the current scan state. Returns (body_dict, response_headers)
        so the caller can honour Retry-After."""
        response = self.session.get(
            f"{self.base_url}/scans/{scan_id}",
            timeout=15,
        )
        response.raise_for_status()
        return response.json(), response.headers

    def scan_and_wait(
        self,
        company_name,
        company_website,
        poll_interval=10,
        timeout=300,
        **kwargs,
    ):
        """Create a scan and block until it completes.

        Returns the scan's result object. Raises RuntimeError if the scan
        fails, TimeoutError if it does not finish within *timeout* seconds,
        and re-raises requests.HTTPError after 3 consecutive HTTP errors.
        """
        scan = self.create_scan(company_name, company_website, **kwargs)
        scan_id = scan["id"]

        # If this was a cache hit, return immediately
        if scan.get("status") == "completed":
            logger.info(f"Cache hit for {scan_id}")
            return scan["result"]

        # Monotonic clock: immune to NTP/wall-clock adjustments, which would
        # otherwise shift a time.time()-based deadline arbitrarily.
        deadline = time.monotonic() + timeout
        consecutive_errors = 0

        while time.monotonic() < deadline:
            try:
                data, headers = self.get_scan(scan_id)
                consecutive_errors = 0  # reset on success
                status = data["status"]

                if status == "completed":
                    logger.info(f"Scan {scan_id} completed")
                    return data["result"]
                if status == "failed":
                    error = data.get("error", {})
                    raise RuntimeError(
                        f"Scan {scan_id} failed [{error.get('code')}]: {error.get('message')}"
                    )

                wait = self._retry_after_seconds(headers, poll_interval)
                logger.debug(f"Scan {scan_id}: {status}: waiting {wait}s")

            except requests.HTTPError as e:
                consecutive_errors += 1
                response = e.response
                if response is not None and response.status_code == 429:
                    # 429 is back-pressure, not a failure: honour the server's
                    # hint and don't count it toward the 3-strike error budget.
                    consecutive_errors -= 1
                    wait = self._retry_after_seconds(response.headers, 60)
                    logger.warning(f"Rate limited: waiting {wait}s")
                elif consecutive_errors >= 3:
                    raise
                else:
                    wait = 2 ** consecutive_errors
                    logger.warning(f"HTTP error (attempt {consecutive_errors}): backing off {wait}s")

            # Cap the sleep so one long Retry-After can't overshoot the
            # overall deadline; the loop condition then raises TimeoutError.
            time.sleep(min(wait, max(0.0, deadline - time.monotonic())))

        raise TimeoutError(f"Scan {scan_id} did not complete within {timeout}s")
 
 
# Usage
# NOTE(review): the key shown is a placeholder — load real keys from an
# environment variable or secret store, never hard-code them.
client = ExonaClient(api_key="exo_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

# Blocks until the scan completes (default timeout 300 s, see scan_and_wait).
# max_age_days=30 presumably bounds how stale a cached result may be —
# confirm against the API reference.
result = client.scan_and_wait(
    company_name="Acme AI Ltd",
    company_website="https://acme.ai",
    max_age_days=30,
)

# result is the scan's "result" object; drill into the risk assessment.
print(result["risk_assessment"]["overall_risk_level"])

Scanning multiple companies in parallel

import concurrent.futures
 
# Companies to screen; each dict supplies the two required scan fields.
companies = [
    {"company_name": "Acme AI", "company_website": "https://acme.ai"},
    {"company_name": "Zenith ML", "company_website": "https://zenith.ml"},
    {"company_name": "Cortex Risk", "company_website": "https://cortexrisk.com"},
]

# A single client instance is shared by all worker threads below.
client = ExonaClient(api_key="exo_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
 
def scan_company(company):
    """Scan one company and report the outcome as a dict.

    Never raises: any exception (HTTP failure, scan failure, timeout, or a
    missing key in the result) is captured in the "error" field instead.
    """
    name = company["company_name"]
    try:
        scan_result = client.scan_and_wait(
            company_name=name,
            company_website=company["company_website"],
            max_age_days=30,
        )
        # Keep the lookup inside the try so a malformed result is also
        # reported rather than raised.
        risk_level = scan_result["risk_assessment"]["overall_risk_level"]
    except Exception as exc:
        return {"company": name, "risk_level": None, "error": str(exc)}
    return {"company": name, "risk_level": risk_level, "error": None}
 
# Fan the scans out across up to 5 worker threads; executor.map yields
# results in the same order as the input list.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(scan_company, companies))

# scan_company never raises, so each entry has either a risk_level or an error.
for r in results:
    if r["error"]:
        print(f"{r['company']}: ERROR: {r['error']}")
    else:
        print(f"{r['company']}: {r['risk_level']}")

Recommended polling intervals:

| Scenario                          | Interval   |
|-----------------------------------|------------|
| Interactive (user waiting)        | 5 seconds  |
| Background batch                  | 15 seconds |
| Very large batch (100+ companies) | 30 seconds |

Polling more frequently than every 5 seconds provides no benefit and will cause you to hit the rate limit faster.

On this page