Diamond Careers
APIExemples

Exemples Python

Exemples Python (3.10+) avec httpx : authentification, lecture, écriture, téléversement, retry.

Cette page propose des exemples Python (3.10+) avec httpx (recommandée). Adaptable facilement à requests.

Installation

pip install httpx python-dotenv

Avec un fichier .env à la racine :

DIAMOND_BASE=https://api.diamondcareers.fr/diamond/v1
DIAMOND_EMAIL=integration@votre-etablissement.com
DIAMOND_PASSWORD=VotreMotDePasseTechnique

Client minimal

import os
import httpx
from dotenv import load_dotenv

load_dotenv()

DIAMOND_BASE = os.environ["DIAMOND_BASE"]

_token: str | None = None


def get_token() -> str:
    global _token
    if _token:
        return _token
    response = httpx.post(
        f"{DIAMOND_BASE}/auth/login",
        json={
            "email": os.environ["DIAMOND_EMAIL"],
            "password": os.environ["DIAMOND_PASSWORD"],
        },
        timeout=10.0,
    )
    response.raise_for_status()
    _token = response.json()["token"]
    return _token


def api(path: str, method: str = "GET", **kwargs) -> dict | list:
    headers = kwargs.pop("headers", {})
    headers["Authorization"] = f"Bearer {get_token()}"
    if "json" in kwargs:
        headers.setdefault("Content-Type", "application/json")
    response = httpx.request(
        method,
        f"{DIAMOND_BASE}{path}",
        headers=headers,
        timeout=30.0,
        **kwargs,
    )
    if not response.is_success:
        try:
            error = response.json()
        except Exception:
            error = {"message": response.text}
        raise httpx.HTTPStatusError(
            error.get("message", f"HTTP {response.status_code}"),
            request=response.request,
            response=response,
        )
    if response.status_code == 204:
        return {}
    return response.json()

Lister vos offres publiées

offers = api("/offres", params={"status": "publish", "per_page": 100})
print(f"{len(offers)} offres publiées")
for offer in offers:
    meta = offer.get("meta", {})
    print(f"- {offer['title']} ({meta.get('city')}) — {meta.get('contract_type')}")

Synchronisation par delta

from datetime import datetime, timezone

def sync_offers_incremental(since: datetime) -> int:
    since_iso = since.astimezone(timezone.utc).isoformat()
    page = 1
    processed = 0

    while True:
        offers = api("/offres", params={
            "filter[modified_after]": since_iso,
            "per_page": 100,
            "page": page,
        })
        if not offers:
            break
        for offer in offers:
            process_offer(offer)
            processed += 1
        if len(offers) < 100:
            break
        page += 1
    return processed

Créer une offre

new_offer = api("/offres", method="POST", json={
    "title": "Chef de partie pâtisserie — restaurant gastronomique",
    "meta": {
        "contract_type": "cdi",
        "city": "Paris",
        "country": "France",
        "salary_min": 32000,
        "salary_max": 38000,
        "salary_display": "32 000 € – 38 000 €",
        "description": "<p>Au sein d une brigade de 18 personnes...</p>",
        "missions": "<ul><li>Encadrer 4 commis</li></ul>",
        "profile_required": "<p>3 ans d expérience minimum.</p>",
        "start_date": "2026-09-01",
    },
})
print(f"Offre créée : {new_offer['id']}")

Faire évoluer le statut d'une candidature

def set_application_status(application_id: int, status: str, comment: str | None = None):
    payload = {"status": status}
    if comment:
        payload["comment"] = comment
    return api(f"/candidatures/{application_id}/status", method="PATCH", json=payload)


set_application_status(18472, "interview", "Entretien prévu le 18 mai à 14 h.")

Téléverser un visuel public

def upload_image(file_path: str, mime: str = "image/jpeg") -> dict:
    with open(file_path, "rb") as f:
        files = {"file": (os.path.basename(file_path), f, mime)}
        headers = {"Authorization": f"Bearer {get_token()}"}
        response = httpx.post(
            f"{DIAMOND_BASE}/upload",
            files=files,
            headers=headers,
            timeout=60.0,
        )
        response.raise_for_status()
        return response.json()


media = upload_image("./visuel-offre.jpg")
print(f"URL publique : {media['file_url']}")

Récupérer un CV (fichier privé)

def download_secure_file(path: str, dest: str) -> None:
    headers = {"Authorization": f"Bearer {get_token()}"}
    with httpx.stream(
        "GET",
        f"{DIAMOND_BASE}/secure-file",
        params={"path": path},
        headers=headers,
        timeout=60.0,
    ) as response:
        response.raise_for_status()
        with open(dest, "wb") as f:
            for chunk in response.iter_bytes():
                f.write(chunk)


download_secure_file("cv/9281/abcd-1234.pdf", "./telecharge.pdf")

Gestion des erreurs et retry

import time

def api_with_retry(path: str, method: str = "GET", max_retries: int = 3, **kwargs):
    last_error = None
    for attempt in range(max_retries):
        try:
            return api(path, method=method, **kwargs)
        except httpx.HTTPStatusError as e:
            last_error = e
            status = e.response.status_code
            if 400 <= status < 500 and status != 429:
                raise
            delay = min(30, 1 * (2 ** attempt))
            print(f"Tentative {attempt + 1} échouée ({status}), retry dans {delay}s")
            time.sleep(delay)
    raise last_error

Itération paresseuse

from typing import Iterator

def iterate_all(path: str, per_page: int = 100, **params) -> Iterator[dict]:
    page = 1
    while True:
        items = api(path, params={**params, "per_page": per_page, "page": page})
        if not items:
            return
        for item in items:
            yield item
        if len(items) < per_page:
            return
        page += 1


for offer in iterate_all("/offres", status="publish"):
    process_offer(offer)

Synchronisation parallèle maîtrisée

import asyncio
import httpx

async def sync_parallel(items: list[dict], concurrency: int = 5):
    semaphore = asyncio.Semaphore(concurrency)

    async def push(item):
        async with semaphore:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{DIAMOND_BASE}/offres",
                    headers={"Authorization": f"Bearer {get_token()}"},
                    json=item,
                    timeout=30.0,
                )
                response.raise_for_status()
                return response.json()

    return await asyncio.gather(*[push(item) for item in items])

Bonnes pratiques

  • Cachez le jeton mais pas plus longtemps que sa durée de vie réelle (lisez le claim exp).
  • Retry avec backoff exponentiel pour les erreurs 5xx et 429.
  • Loguez avec un format structuré (JSON) pour faciliter le debugging et la surveillance.
  • httpx.AsyncClient ou aiohttp pour les charges importantes : meilleure performance que requests.
  • Validez les données reçues avant transmission à votre ATS : Pydantic est un excellent choix.

On this page