Receiving Webhooks

TL;DR: ACK with 200 fast, persist the payload, process async. Use workflow_id as your idempotency key. If delivery fails, fall back to GET /public/enrich/status.


Cleanlist webhooks deliver the complete result set of a bulk enrichment to a URL you provide. This guide covers building a production-ready receiver — not just a "hello world."

The contract

Property        Value
Method          POST
Content type    application/json
Body            See Webhooks reference
Expected ACK    HTTP 2xx
Timeout         10 seconds
Retries         5 attempts
Backoff         1s, 2s, 4s, 8s, 16s

If you don't return a 2xx within 10 seconds, Cleanlist treats the attempt as failed and schedules a retry.
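For orientation, here is a hypothetical payload using the fields this guide relies on (workflow_id, status, summary, results, results_truncated, results_endpoint). The authoritative schema is the Webhooks reference; the values below are invented.

```json
{
  "workflow_id": "wf_8f3a2c",
  "status": "completed",
  "summary": { "total": 500, "enriched": 482, "failed": 18 },
  "results_truncated": 0,
  "results_endpoint": "https://api.cleanlist.ai/api/v1/public/enrich/status?workflow_id=wf_8f3a2c",
  "results": [
    {
      "task_id": "task_001",
      "primary_email": "jane@example.com",
      "primary_email_status": "valid"
    }
  ]
}
```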

The 5 rules of a production receiver

1. ACK fast, process async

Don't do real work inside the request handler. Persist the payload, return 200, then process asynchronously in a background job. This way a slow downstream system never causes Cleanlist to retry.

2. Be idempotent

Cleanlist may deliver the same workflow_id multiple times if a previous attempt was ambiguous (e.g., 200 returned but TCP closed before Cleanlist read it). Use workflow_id as a unique key — if you've already processed it, ACK 200 and return.
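One simple way to get this for free is a unique constraint on workflow_id plus an insert-or-ignore. A minimal sketch with SQLite (table and column names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE webhook_inbox (
        workflow_id TEXT PRIMARY KEY,  -- the idempotency key
        payload     TEXT NOT NULL
    )
""")

def record_delivery(workflow_id: str, payload: str) -> bool:
    """Returns True on first delivery, False on a duplicate."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO webhook_inbox (workflow_id, payload) VALUES (?, ?)",
        (workflow_id, payload),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows inserted => already seen, just ACK
```

Either way the handler returns 200; the boolean only decides whether to enqueue processing.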

3. Validate the workflow id

Cleanlist does not sign payloads. Anyone who knows your URL could POST anything to it. Defend by:

  • Storing the workflow_id of every bulk enrichment you submit
  • Rejecting webhooks for unknown workflow ids (404)
  • Optionally restricting your webhook URL by source IP (Cleanlist's outbound IPs are stable per environment — contact support for the list)

4. Handle truncation

If results_truncated > 0 in the payload, the result set was too large to deliver inline and the payload contains only a subset of rows. Fall back to:

GET /api/v1/public/enrich/status?workflow_id={workflow_id}

The results_endpoint field in the payload contains this URL pre-built.
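A sketch of that fallback, with the HTTP client injected so it is easy to test; the URL and auth header match the snippets later in this guide:

```python
STATUS_URL = "https://api.cleanlist.ai/api/v1/public/enrich/status"

def fetch_full_results(workflow_id: str, session, api_key: str) -> dict:
    """Pull the complete result set when the webhook payload was truncated."""
    resp = session.get(
        STATUS_URL,
        headers={"Authorization": f"Bearer {api_key}"},
        params={"workflow_id": workflow_id},
    )
    resp.raise_for_status()
    return resp.json()
```

Any object with a requests-style get works here, including requests.Session itself.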

5. Plan for retries

If you persist before ACKing and your DB is down, you have a choice:

  • Return 5xx → Cleanlist retries (good if the outage is brief)
  • Return 2xx → Cleanlist marks delivered (good if your queue can guarantee re-delivery)

Pick the one that matches your infra and document it so on-call knows what to expect.

Reference implementations

Express (Node.js)

import express from "express";
import { jobQueue } from "./jobs.js";
import { db } from "./db.js";
 
const app = express();
app.use(express.json({ limit: "10mb" }));
 
app.post("/webhooks/cleanlist", async (req, res) => {
  const payload = req.body;
  const workflowId = payload?.workflow_id;
 
  if (!workflowId) {
    return res.status(400).json({ error: "missing workflow_id" });
  }
 
  // Rule 3: validate against our own records
  const record = await db.workflows.findOne({ id: workflowId });
  if (!record) {
    return res.status(404).json({ error: "unknown workflow_id" });
  }
 
  // Rule 2: idempotency
  if (record.processed_at) {
    return res.status(200).end();
  }
 
  // Rule 1: ACK fast, process async
  await db.webhookInbox.insert({
    workflow_id: workflowId,
    payload,
    received_at: new Date(),
  });
 
  // Kick off background processing
  await jobQueue.enqueue("process-cleanlist-results", { workflowId });
 
  res.status(200).end();
});
 
app.listen(3000, () => console.log("Receiver listening on :3000"));

The background job:

export async function processCleanlistResults({ workflowId }) {
  const inbox = await db.webhookInbox.findOne({ workflow_id: workflowId });
  const { results, summary, status } = inbox.payload;
 
  for (const row of results ?? []) {
    await db.leads.upsert({
      where: { source_id: row.task_id },
      create: {
        source_id: row.task_id,
        email: row.primary_email,
        email_status: row.primary_email_status,
        ...row,
      },
      update: {
        email: row.primary_email,
        email_status: row.primary_email_status,
        ...row,
      },
    });
  }
 
  // Handle truncation
  if (inbox.payload.results_truncated > 0) {
    await fetchFullResults(workflowId);
  }
 
  await db.workflows.update(workflowId, {
    processed_at: new Date(),
    summary,
    final_status: status,
  });
}

Flask (Python)

from datetime import datetime

from flask import Flask, request, jsonify
from redis import Redis
from rq import Queue

from .db import db
from .jobs import process_cleanlist_results
 
app = Flask(__name__)
queue = Queue(connection=Redis())
 
@app.post("/webhooks/cleanlist")
def cleanlist_webhook():
    payload = request.get_json(silent=True) or {}
    workflow_id = payload.get("workflow_id")
 
    if not workflow_id:
        return jsonify(error="missing workflow_id"), 400
 
    record = db.workflows.find_one({"id": workflow_id})
    if not record:
        return jsonify(error="unknown workflow_id"), 404
 
    if record.get("processed_at"):
        return "", 200  # idempotent ACK
 
    db.webhook_inbox.insert_one({
        "workflow_id": workflow_id,
        "payload": payload,
        "received_at": datetime.utcnow(),
    })
 
    queue.enqueue(process_cleanlist_results, workflow_id)
 
    return "", 200
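The matching RQ job, mirroring the Node background job shown earlier; the dict-based db and field names follow the receiver code and are otherwise illustrative:

```python
from datetime import datetime

def process_cleanlist_results(workflow_id, db):
    """Drain one inbox entry: upsert rows, handle truncation, mark done."""
    inbox = db["webhook_inbox"][workflow_id]
    payload = inbox["payload"]

    for row in payload.get("results", []):
        # keyed on task_id, same as the Node job's upsert
        db["leads"][row["task_id"]] = {
            "email": row.get("primary_email"),
            "email_status": row.get("primary_email_status"),
            **row,
        }

    if payload.get("results_truncated", 0) > 0:
        # fall back to GET /public/enrich/status for the full set
        ...

    db["workflows"][workflow_id] = {
        "processed_at": datetime.utcnow(),
        "final_status": payload.get("status"),
    }
```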

AWS Lambda (Node.js)

import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
 
const sqs = new SQSClient({});
 
export const handler = async (event) => {
  let payload;
  try {
    payload = JSON.parse(event.body);
  } catch {
    // malformed JSON should return 400, not surface as a 500 and trigger retries
    return { statusCode: 400, body: '{"error":"invalid JSON"}' };
  }
  const workflowId = payload?.workflow_id;

  if (!workflowId) {
    return { statusCode: 400, body: '{"error":"missing workflow_id"}' };
  }
 
  // Hand off to SQS for downstream processing — Lambda returns instantly
  await sqs.send(
    new SendMessageCommand({
      QueueUrl: process.env.RESULTS_QUEUE_URL,
      MessageBody: event.body,
      MessageDeduplicationId: workflowId, // FIFO de-dupe
      MessageGroupId: "cleanlist",
    }),
  );
 
  return { statusCode: 200, body: "" };
};

The downstream Lambda consuming the SQS queue is where the actual lead persistence and CRM sync happens. This pattern keeps your webhook receiver tiny and fast.
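That consumer can be another Lambda with an SQS event source. A Python sketch, with the processing call injected as a stand-in for your own persistence logic:

```python
import json

def handler(event, context, process=lambda wf_id, payload: None):
    """SQS-triggered Lambda: one webhook payload per record."""
    for record in event["Records"]:
        payload = json.loads(record["body"])
        process(payload["workflow_id"], payload)
    # raising here instead of returning would let SQS redeliver the batch
    return {"processed": len(event["Records"])}
```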

Verifying delivery

If you suspect a webhook didn't arrive, check the audit endpoint:

import requests

deliveries = requests.get(
    "https://api.cleanlist.ai/api/v1/public/webhooks/deliveries",
    headers={"Authorization": f"Bearer {API_KEY}"},
    params={"workflow_id": workflow_id},
).json()
 
for d in deliveries:
    print(d["attempt_number"], d["status"], d["response_status_code"], d["error_message"])

This is the source of truth for whether Cleanlist tried, what status it got back, and how long each attempt took.

Fallback: pull instead of push

If all webhook attempts fail, the result is still available via the polling endpoint:

result = requests.get(
    "https://api.cleanlist.ai/api/v1/public/enrich/status",
    headers={"Authorization": f"Bearer {API_KEY}"},
    params={"workflow_id": workflow_id},
).json()

A robust integration always has both code paths — webhook for real-time, polling as a backup.
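A minimal pull-side loop with exponential backoff; fetch_status stands in for the GET call above, and the terminal status values ("completed", "failed") are assumptions to adapt to your workflow states:

```python
import time

def poll_until_done(workflow_id, fetch_status,
                    max_attempts=8, base_delay=1.0, sleep=time.sleep):
    """Poll the status endpoint until the workflow reaches a terminal state."""
    for attempt in range(max_attempts):
        result = fetch_status(workflow_id)
        if result.get("status") in ("completed", "failed"):
            return result
        sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise TimeoutError(f"workflow {workflow_id} still running after {max_attempts} polls")
```

Injecting sleep keeps the loop testable and lets you cap the delay in production.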

Testing locally

Use a tunneling tool to expose your local server to the internet:
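For example, with ngrok (any tunneling tool works; the port matches the Express example above):

```shell
# expose localhost:3000 at a public HTTPS URL
ngrok http 3000
```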

Pass the public URL as your webhook_url and you'll see the payload land in your local receiver in real time.

For pure inspection (no real receiver yet), webhook.site gives you a unique URL and a live request log.

Common mistakes

  • Doing all the work inside the request handler. Symptom: Cleanlist retries because of timeouts. Fix: ACK fast, process async.
  • Trusting the payload blindly. Symptom: risk of attacker spoofing. Fix: validate workflow_id against your own records.
  • Not handling results_truncated. Symptom: missing rows for large batches. Fix: check the field and fall back to the status endpoint.
  • Returning 5xx from a permanent error. Symptom: Cleanlist retries forever (well, 5 times). Fix: return 200 and log, or fix the bug.
  • Forgetting idempotency. Symptom: duplicate rows in your DB. Fix: use workflow_id as a unique key.

Related