
Suite du projet DevOps Buddy : notre assistant sait répondre aux questions. Maintenant, on va lui apprendre à auditer plusieurs fichiers de config en parallèle — Dockerfiles, pipelines CI, docker-compose… En 20 minutes, vous saurez diviser vos temps de traitement par 5 à 10.
Ce que vous allez apprendre
Section intitulée « Ce que vous allez apprendre »- Comprendre la différence entre
completion()(sync) etacompletion()(async) - Utiliser
asyncio.gather()pour lancer plusieurs appels LLM en parallèle - Analyser un fichier DevOps et obtenir un rapport structuré (type, problèmes, score)
- Limiter la concurrence avec
asyncio.Semaphorepour éviter le rate limiting - Gérer les erreurs dans un contexte asynchrone sans bloquer les autres tâches
Ce qu’on va construire
Section intitulée « Ce qu’on va construire »Un analyseur qui prend un dossier de fichiers et génère un rapport d’audit pour chaque fichier, en parallèle :
📁 projet/├── Dockerfile├── docker-compose.yml├── .gitlab-ci.yml└── terraform/main.tf
🔍 Analyse en cours...
✅ Dockerfile [2.1s] - 3 problèmes trouvés✅ docker-compose [1.8s] - 1 amélioration suggérée✅ .gitlab-ci.yml [2.3s] - OK, bonnes pratiques respectées✅ main.tf [1.9s] - 2 problèmes de sécurité
⏱️ Total: 2.5s (au lieu de ~8s en séquentiel)Pourquoi l’asynchrone ?
Section intitulée « Pourquoi l’asynchrone ? »Un appel LLM prend 1 à 5 secondes. En mode synchrone :
| Fichiers | Temps sync | Temps async | Gain |
|---|---|---|---|
| 5 | ~15s | ~3s | 5x |
| 10 | ~30s | ~4s | 7x |
| 20 | ~60s | ~6s | 10x |
Étape 1 : comprendre acompletion()
Section intitulée « Étape 1 : comprendre acompletion() »acompletion() est la version asynchrone de completion() :
import asynciofrom litellm import acompletionfrom dotenv import load_dotenv
load_dotenv()
async def main(): response = await acompletion( model="gpt-4.1-mini", messages=[ {"role": "user", "content": "Qu'est-ce qu'un multi-stage build Docker ?"} ] ) print(response.choices[0].message.content)
asyncio.run(main())Étape 2 : analyser un fichier
Section intitulée « Étape 2 : analyser un fichier »Créons une fonction qui analyse un fichier :
import asynciofrom pathlib import Pathfrom litellm import acompletionfrom dotenv import load_dotenv
load_dotenv()
SYSTEM_PROMPT = """Tu es DevOps Buddy, un auditeur expert en fichiers de configuration DevOps.
Pour chaque fichier, tu dois :1. Identifier le type de fichier (Dockerfile, CI, Terraform, etc.)2. Lister les problèmes de sécurité3. Suggérer des améliorations de performance4. Vérifier les bonnes pratiques
Format de réponse :## Type: [type]## Problèmes: [liste ou "Aucun"]## Améliorations: [liste ou "Aucune"]## Score: [1-10]"""
async def analyze_file(filepath: Path) -> dict: """Analyse un fichier DevOps et retourne un rapport."""
content = filepath.read_text()
response = await acompletion( model="gpt-4.1-mini", messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": f"Analyse ce fichier '{filepath.name}':\n\n```\n{content}\n```"} ], max_tokens=500 )
return { "file": filepath.name, "analysis": response.choices[0].message.content, "tokens": response.usage.total_tokens }
async def main(): # Exemple avec un Dockerfile dockerfile = Path("Dockerfile") if not dockerfile.exists(): dockerfile.write_text("""FROM python:3.11RUN pip install flaskCOPY . /appWORKDIR /appCMD ["python", "app.py"]""")
result = await analyze_file(dockerfile) print(f"📄 {result['file']}") print(result['analysis']) print(f"\n📊 Tokens: {result['tokens']}")
asyncio.run(main())Étape 3 : paralléliser avec asyncio.gather()
Section intitulée « Étape 3 : paralléliser avec asyncio.gather() »La puissance de l’async : lancer N analyses simultanément :
import asyncioimport timefrom pathlib import Pathfrom litellm import acompletionfrom dotenv import load_dotenv
load_dotenv()
SYSTEM_PROMPT = """Tu es DevOps Buddy, un auditeur DevOps. Analyse ce fichier et donne :1. Type de fichier2. Problèmes trouvés (sécurité, performance)3. Score /10
Sois concis (max 150 mots)."""
async def analyze_file(filepath: Path) -> dict: """Analyse un fichier et chronomètre.""" start = time.time()
content = filepath.read_text()
response = await acompletion( model="gpt-4.1-mini", messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": f"Fichier: {filepath.name}\n\n```\n{content}\n```"} ], max_tokens=300 )
elapsed = time.time() - start
return { "file": filepath.name, "time": elapsed, "analysis": response.choices[0].message.content, "tokens": response.usage.total_tokens }
async def analyze_project(directory: Path) -> list[dict]: """Analyse tous les fichiers DevOps d'un projet en parallèle."""
# Fichiers DevOps à rechercher patterns = [ "Dockerfile*", "docker-compose*.yml", "docker-compose*.yaml", ".gitlab-ci.yml", ".github/workflows/*.yml", "*.tf", "Jenkinsfile", "ansible/*.yml" ]
files = [] for pattern in patterns: files.extend(directory.glob(pattern))
if not files: print("⚠️ Aucun fichier DevOps trouvé") return []
print(f"🔍 Analyse de {len(files)} fichiers en parallèle...\n")
# Lancer TOUTES les analyses en parallèle results = await asyncio.gather( *[analyze_file(f) for f in files] )
return results
async def main(): # Créer des fichiers de test test_dir = Path(".")
# Dockerfile de test Path("Dockerfile").write_text("""FROM ubuntu:latestRUN apt-get update && apt-get install -y python3 curl wget gitCOPY . /appRUN chmod 777 /appCMD ["python3", "app.py"]""")
# docker-compose de test Path("docker-compose.yml").write_text("""version: '3.8'services: web: build: . ports: - "80:80" environment: - DEBUG=true - DB_PASSWORD=secret123""")
# GitLab CI de test Path(".gitlab-ci.yml").write_text("""stages: - build - deploy
build: stage: build script: - docker build -t myapp . - docker push myapp
deploy: stage: deploy script: - kubectl apply -f k8s/""")
start = time.time() results = await analyze_project(test_dir) total_time = time.time() - start
# Afficher les résultats for r in results: print(f"{'='*50}") print(f"📄 {r['file']} [{r['time']:.1f}s]") print(f"{'='*50}") print(r['analysis']) print()
# Résumé print(f"\n{'='*50}") print(f"📊 RÉSUMÉ") print(f"{'='*50}") print(f"Fichiers analysés: {len(results)}") print(f"Temps total: {total_time:.2f}s") sum_individual = sum(r['time'] for r in results) print(f"Temps si séquentiel: ~{sum_individual:.2f}s") print(f"Gain: {sum_individual/total_time:.1f}x plus rapide")
asyncio.run(main())Exécutez et observez le gain de temps :
python 08_parallel_analysis.pyÉtape 4 : limiter les appels simultanés
Section intitulée « Étape 4 : limiter les appels simultanés »Les APIs ont des limites de requêtes. Utilisez un semaphore pour contrôler le parallélisme :
import asynciofrom pathlib import Pathfrom litellm import acompletionfrom dotenv import load_dotenv
load_dotenv()
# Maximum 5 appels API simultanésMAX_CONCURRENT = 5semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def analyze_with_limit(filepath: Path, system_prompt: str) -> dict: """Analyse un fichier avec contrôle du parallélisme."""
async with semaphore: # Attend son tour si 5 appels sont déjà en cours content = filepath.read_text()
response = await acompletion( model="gpt-4.1-mini", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Analyse: {filepath.name}\n\n{content}"} ] )
return { "file": filepath.name, "result": response.choices[0].message.content }
async def analyze_many(files: list[Path]) -> list[dict]: """Analyse plusieurs fichiers avec limite de concurrence."""
system_prompt = "Tu es un auditeur DevOps. Analyse brièvement ce fichier."
# Lance toutes les tâches — le semaphore contrôle le débit tasks = [analyze_with_limit(f, system_prompt) for f in files] results = await asyncio.gather(*tasks)
return results
async def main(): # Simule 15 fichiers à analyser files = [Path(f"config_{i}.yml") for i in range(15)] for f in files: f.write_text(f"# Config {f.name}\nkey: value")
print(f"🔍 Analyse de {len(files)} fichiers (max {MAX_CONCURRENT} simultanés)...") results = await analyze_many(files) print(f"✅ {len(results)} fichiers analysés")
# Nettoyage for f in files: f.unlink()
asyncio.run(main())Étape 5 : gestion des erreurs robuste
Section intitulée « Étape 5 : gestion des erreurs robuste »En production, gérez les erreurs sans interrompre le traitement :
import asynciofrom pathlib import Pathfrom litellm import acompletionfrom litellm.exceptions import RateLimitError, APIConnectionErrorfrom dotenv import load_dotenvimport time
load_dotenv()
async def analyze_safe(filepath: Path, max_retries: int = 3) -> dict: """Analyse avec retries et gestion d'erreurs."""
for attempt in range(max_retries): try: content = filepath.read_text()
response = await acompletion( model="gpt-4.1-mini", messages=[ {"role": "user", "content": f"Analyse brièvement: {content[:500]}"} ], timeout=30 )
return { "file": filepath.name, "status": "success", "result": response.choices[0].message.content }
except RateLimitError: wait = 2 ** attempt print(f"⏳ {filepath.name}: rate limit, attente {wait}s...") await asyncio.sleep(wait)
except APIConnectionError: print(f"🔌 {filepath.name}: connexion échouée, retry {attempt + 1}/{max_retries}") await asyncio.sleep(1)
except Exception as e: return { "file": filepath.name, "status": "error", "result": str(e) }
return { "file": filepath.name, "status": "failed", "result": "Échec après plusieurs tentatives" }
async def main(): files = [Path("Dockerfile")] # Votre liste de fichiers
# return_exceptions=True évite l'arrêt si une tâche échoue results = await asyncio.gather( *[analyze_safe(f) for f in files], return_exceptions=True )
# Résumé success = sum(1 for r in results if isinstance(r, dict) and r.get("status") == "success") print(f"\n✅ {success}/{len(results)} fichiers analysés avec succès")
asyncio.run(main())Structure du projet
Section intitulée « Structure du projet »lab-devops-buddy/├── .env├── 01_question_simple.py # Guide précédent├── ...├── 06_async_basic.py # Premier appel async├── 07_analyze_file.py # Analyse d'un fichier├── 08_parallel_analysis.py # Analyse parallèle├── 09_rate_limited.py # Avec limite de concurrence└── 10_error_handling.py # Gestion des erreursÀ retenir
Section intitulée « À retenir »acompletion(): version async decompletion()asyncio.gather(): lance N appels en parallèle- Semaphore : contrôle le nombre d’appels simultanés
return_exceptions=True: continue malgré les erreurs- Gain réel : 5x à 10x plus rapide sur des lots de fichiers
Prochaines étapes
Section intitulée « Prochaines étapes »Notre assistant analyse des fichiers, mais il ne connaît que ses connaissances de base. Dans le prochain guide, on va lui ajouter une base de connaissances DevOps pour qu’il réponde avec du contexte pertinent.