# /// script
# requires-python = ">=3.7"
# dependencies = [
# "aiohttp",
# "tqdm",
# ]
# ///
import argparse
import asyncio
import json
import logging
import os
from pathlib import Path
import aiohttp
from tqdm.asyncio import tqdm
logger = logging.getLogger(__name__)
def configure_logging(log_path=None, clean_logs=False):
"""Configurer le système de logs"""
# Configuration basique du logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Formateur pour les logs
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Gestionnaire pour la console
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Gestionnaire pour le fichier si un chemin est spécifié
if log_path:
log_file = Path(log_path)
# Nettoyer les logs existants si demandé
if clean_logs and log_file.exists():
log_file.unlink()
# Créer le répertoire parent si nécessaire
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
logger.info(f"Les logs seront sauvegardés dans {log_file.absolute()}")
class AsyncUploadSession:
def __init__(
self, base_url: str, api_key: str, document_location: str, workspace_id: int | None = None, batch_size: int = 5
):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {api_key}"}
self.document_location = document_location
self.workspace_id = workspace_id
self.batch_size = batch_size
async def create_session(self, session: aiohttp.ClientSession) -> dict:
"""Créer une nouvelle session d'upload"""
# D'abord, désactiver toutes les sessions existantes
async with session.post(f"{self.base_url}/api/v2/upload-session/deactivate", headers=self.headers) as response:
await response.json()
logger.info("Sessions existantes désactivées avec succès")
# Puis créer une nouvelle session
async with session.post(f"{self.base_url}/api/v2/upload-session", headers=self.headers) as response:
response.raise_for_status()
session_data = await response.json()
logger.info("Nouvelle session créée avec succès")
return session_data
async def upload_file(
self,
session: aiohttp.ClientSession,
session_id: str,
file_path: Path,
semaphore: asyncio.Semaphore,
) -> tuple[Path, dict]:
"""uploader un seul fichier vers la session avec des données supplémentaires"""
# Utiliser le sémaphore pour limiter les appels simultanés
async with semaphore:
data = aiohttp.FormData()
data.add_field("description", "Téléchargement de fichier")
data.add_field("session_id", session_id)
data.add_field("collection_type", self.document_location)
if self.document_location == "workspace" and self.workspace_id:
data.add_field("workspace_id", str(self.workspace_id))
with open(file_path, "rb") as file_content:
data.add_field("file", file_content, filename=file_path.name, content_type="application/octet-stream")
async with session.post(
f"{self.base_url}/api/v2/upload-session/{session_id}", headers=self.headers, data=data
) as response:
status = response.status
response_body = await response.text()
if status >= 400:
logger.error(f"Erreur {status} lors de l'upload du fichier {file_path.name}: {response_body}")
response.raise_for_status()
result = json.loads(response_body)
return file_path, result
async def upload_files(self, files_dir: Path):
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(self.batch_size)
logger.info(f"Limitation des téléchargements simultanés à {self.batch_size}")
session_data = await self.create_session(session)
session_id = session_data["uuid"]
# Récupérer la liste des fichiers (y compris dans les sous-dossiers)
files = [f for f in files_dir.rglob("*") if f.is_file()]
tasks = [self.upload_file(session, session_id, file_path, semaphore) for file_path in files]
async def safe_task(task):
try:
return await task
except Exception as e:
return e
results = await tqdm.gather(*[safe_task(t) for t in tasks])
successful_uploads = []
failed_uploads = []
for file_path, result in zip(files, results):
if isinstance(result, Exception):
logger.error(f"Échec de l'upload du fichier : {file_path.name}: {result}")
failed_uploads.append(file_path.name)
else:
successful_uploads.append(file_path.name)
# Afficher le résumé
logger.info("=" * 50)
logger.info(f"Résumé de la session d'upload pour {len(files)} fichiers :")
logger.info(f"✅ Uploadés avec succès : {len(successful_uploads)} fichiers")
if failed_uploads:
logger.info(f"❌ Échec de l'upload : {len(failed_uploads)} fichiers")
logger.info("Fichiers échoués :")
for failed_file in failed_uploads:
logger.info(f" - {failed_file}")
logger.info("=" * 50)
return {"total": len(files), "successful": successful_uploads, "failed": failed_uploads}
async def main_async():
parser = argparse.ArgumentParser(description="Script utilisable pour uploader plusieurs fichiers vers Paradigm")
parser.add_argument("--api-key", default=None, help="Clé API Paradigm à utiliser.")
parser.add_argument("--base-url", default="http://localhost:8000", help="URL de base à utiliser.")
parser.add_argument("--files-dir", type=Path, required=True, help="Répertoire contenant les fichiers à uploader")
parser.add_argument(
"--document-location",
type=str,
choices=["private", "company", "workspace"],
default="company",
help="Espace Paradigm où stocker les documents. Par défaut l'espace entreprise.",
)
parser.add_argument(
"--workspace-id",
type=int,
help="L'ID de l'espace de travail où stocker les documents. "
"Applicable uniquement si l'emplacement du document est défini sur 'workspace'.",
)
parser.add_argument(
"--batch-size",
type=int,
default=5,
help="Nombre maximum d'uploads simultanés à effectuer",
)
parser.add_argument(
"--log-path",
type=str,
default="upload_session.log",
help="Chemin où sauvegarder les logs. Si non fourni, les logs ne seront affichés que dans la console.",
)
parser.add_argument(
"--clean-logs",
action="store_true",
help="Si défini, le fichier de log existant sera supprimé avant de commencer.",
)
args = parser.parse_args()
# Configurer le système de logs
configure_logging(args.log_path, args.clean_logs)
if args.api_key is None:
api_key = os.getenv("PARADIGM_API_KEY", None)
else:
api_key = args.api_key
assert api_key is not None, "Aucune clé API fournie."
# Vérifier si le répertoire existe
if not args.files_dir.exists():
raise FileNotFoundError(f"Répertoire introuvable : {args.files_dir}")
logger.info(f"Utilisation du répertoire : {args.files_dir.absolute()}")
files = list(args.files_dir.glob("*"))
logger.info(f"Fichiers trouvés : {[f.name for f in files]}")
uploader = AsyncUploadSession(args.base_url, api_key, args.document_location, args.workspace_id, args.batch_size)
await uploader.upload_files(args.files_dir)
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()