Passer au contenu principal

Aperçu

Plusieurs endpoints API sont disponibles pour créer et gérer les Sessions d’Upload dans la plateforme Paradigm. Le script présenté dans cet article fournit une implémentation asynchrone pour uploader plusieurs fichiers. Il utilise les bibliothèques Python asyncio et aiohttp pour gérer efficacement les d’uploads de fichiers simultanés. Avant de commencer tout d’upload, le script désactive automatiquement toutes les sessions d’upload existantes et crée une nouvelle session dédiée. Vous pouvez choisir où uploader vos documents :
  • Dans votre espace privé
  • Dans votre espace entreprise
  • Dans un espace de travail partagé avec d’autres membres sélectionnés de l’entreprise
Par défaut, les documents seront uploadés dans l’espace entreprise.

Prérequis

  • Python 3.7+
  • Installation des packages requis : pip install aiohttp tqdm
    • aiohttp : Pour les requêtes HTTP asynchrones
    • tqdm : Pour la visualisation de la barre de progression

Contrôle de la Concurrence

Le script utilise un sémaphore pour limiter le nombre d’uploads simultanés, évitant la surcharge du serveur tout en maintenant un débit efficace. Ajustez le paramètre --batch_size en fonction de vos conditions réseau et de la capacité du serveur.

Arguments de Ligne de Commande

python upload_session_async.py \
    --api-key="your_api_key" \
    --base-url="http://localhost:8000" \
    --files-dir="/chemin/vers/fichiers" \
    --document-location="company" \
    --workspace-id=123

Description des Arguments

ArgumentDescriptionValeur par défaut
--api-key (requis)Clé API Paradigm (peut aussi être définie via la variable d’environnement PARADIGM_API_KEY)None
--base-urlURL de base de l’API Paradigmhttp://localhost:8000
--files-dir (requis)Répertoire contenant les fichiers à uploaderNone
--document-locationOù stocker les documents (choix : private, company, workspace)company
--workspace-idID de l’espace de travail pour le stockage dans un espace de travail (requis si document_location est workspace)None
--batch-sizeNombre maximum d’uploads simultanés5

API de Statut de Session d’Upload

Un endpoint API dédié est disponible pour surveiller le progrès de la session d’upload, vérifier quels documents ont été intégrés avec succès, et identifier les échecs. Endpoint : GET /api/v2/upload-session/{uuid:uuid} Réponse : L’endpoint retourne des détails sur la session d’upload, incluant son identifiant unique, le statut de validité, les horodatages de création et de dernière mise à jour, et une liste des documents uploadés avec leurs statuts (intégré ou échoué).

Script d’Upload Complet

Le script ci-dessous fournit un suivi en temps réel et assure la transparence dans le flux de traitement des documents :
# /// script
# requires-python = ">=3.7"
# dependencies = [
#   "aiohttp",
#   "tqdm",
# ]
# ///

import argparse
import asyncio
import json
import logging
import os
from pathlib import Path

import aiohttp
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)

def configure_logging(log_path=None, clean_logs=False):
    """Configurer le système de logs"""
    # Configuration basique du logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Formateur pour les logs
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # Gestionnaire pour la console
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # Gestionnaire pour le fichier si un chemin est spécifié
    if log_path:
        log_file = Path(log_path)

        # Nettoyer les logs existants si demandé
        if clean_logs and log_file.exists():
            log_file.unlink()

        # Créer le répertoire parent si nécessaire
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        logger.info(f"Les logs seront sauvegardés dans {log_file.absolute()}")

class AsyncUploadSession:
    def __init__(
        self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
    ):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.document_location = document_location
        self.workspace_id = workspace_id
        self.batch_size = batch_size

    async def create_session(self, session: aiohttp.ClientSession) -> dict:
        """Créer une nouvelle session d'upload"""
        # D'abord, désactiver toutes les sessions existantes
        async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
            await response.json()
            logger.info("Sessions existantes désactivées avec succès")

        # Puis créer une nouvelle session
        async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
            response.raise_for_status()
            session_data = await response.json()
            logger.info("Nouvelle session créée avec succès")
            return session_data

    async def upload_file(
        self,
        session: aiohttp.ClientSession,
        session_id: str,
        file_path: Path,
        semaphore: asyncio.Semaphore,
    ) -> tuple[Path, dict]:
        """uploader un seul fichier vers la session avec des données supplémentaires"""
        # Utiliser le sémaphore pour limiter les appels simultanés
        async with semaphore:
            data = aiohttp.FormData()
            data.add_field("description", "Téléchargement de fichier")
            data.add_field("session_id", session_id)
            data.add_field("collection_type", self.document_location)

            if self.document_location == "workspace" and self.workspace_id:
                data.add_field("workspace_id", str(self.workspace_id))

            with open(file_path, "rb") as file_content:
                data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")

                async with session.post(
                    f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
                ) as response:
                    status = response.status
                    response_body = await response.text()
                    if status >= 400:
                        logger.error(f"Erreur {status} lors de l'upload du fichier {file_path.name}: {response_body}")
                        response.raise_for_status()
                    result = json.loads(response_body)
                    return file_path, result

    async def upload_files(self, files_dir: Path):
        async with aiohttp.ClientSession() as session:
            semaphore = asyncio.Semaphore(self.batch_size)
            logger.info(f"Limitation des téléchargements simultanés à {self.batch_size}")

            session_data = await self.create_session(session)
            session_id = session_data["uuid"]

            # Récupérer la liste des fichiers (y compris dans les sous-dossiers)
            files = [f for f in files_dir.rglob("*") if f.is_file()]

            tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]

            async def safe_task(task):
                try:
                    return await task
                except Exception as e:
                    return e

            results = await tqdm.gather(*[safe_task(t) for t in tasks])

            successful_uploads = []
            failed_uploads = []

            for file_path, result in zip(files, results):
                if isinstance(result, Exception):
                    logger.error(f"Échec de l'upload du fichier : {file_path.name}: {result}")
                    failed_uploads.append(file_path.name)
                else:
                    successful_uploads.append(file_path.name)

            # Afficher le résumé
            logger.info("=" * 50)
            logger.info(f"Résumé de la session d'upload pour {len(files)} fichiers :")
            logger.info(f"✅ Uploadés avec succès : {len(successful_uploads)} fichiers")
            if failed_uploads:
                logger.info(f"❌ Échec de l'upload : {len(failed_uploads)} fichiers")
                logger.info("Fichiers échoués :")
                for failed_file in failed_uploads:
                    logger.info(f"  - {failed_file}")
            logger.info("=" * 50)

            return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}

async def main_async():
    parser = argparse.ArgumentParser(description="Script utilisable pour uploader plusieurs fichiers vers Paradigm")
    parser.add_argument("--api-key", default=None, help="Clé API Paradigm à utiliser.")
    parser.add_argument("--base-url", default="http://localhost:8000", help="URL de base à utiliser.")
    parser.add_argument("--files-dir", type=Path, required=True, help="Répertoire contenant les fichiers à uploader")
    parser.add_argument(
        "--document-location",
        type=str,
        choices=["private", "company", "workspace"],
        default="company",
        help="Espace Paradigm où stocker les documents. Par défaut l'espace entreprise.",
    )
    parser.add_argument(
        "--workspace-id",
        type=int,
        help="L'ID de l'espace de travail où stocker les documents. "
        "Applicable uniquement si l'emplacement du document est défini sur 'workspace'.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=5,
        help="Nombre maximum d'uploads simultanés à effectuer",
    )
    parser.add_argument(
        "--log-path",
        type=str,
        default="upload_session.log",
        help="Chemin où sauvegarder les logs. Si non fourni, les logs ne seront affichés que dans la console.",
    )
    parser.add_argument(
        "--clean-logs",
        action="store_true",
        help="Si défini, le fichier de log existant sera supprimé avant de commencer.",
    )
    args = parser.parse_args()

    # Configurer le système de logs
    configure_logging(args.log_path, args.clean_logs)

    if args.api_key is None:
        api_key = os.getenv("PARADIGM_API_KEY", None)
    else:
        api_key = args.api_key

    assert api_key is not None, "Aucune clé API fournie."

    # Vérifier si le répertoire existe
    if not args.files_dir.exists():
        raise FileNotFoundError(f"Répertoire introuvable : {args.files_dir}")

    logger.info(f"Utilisation du répertoire : {args.files_dir.absolute()}")
    files = list(args.files_dir.glob("*"))
    logger.info(f"Fichiers trouvés : {[f.name for f in files]}")

    uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
    await uploader.upload_files(args.files_dir)

def main():
    asyncio.run(main_async())

if __name__ == "__main__":
    main()
I