Cours de Systèmes Distribués
Bienvenue dans le Cours de Systèmes Distribués ! Ce cours vous guidera des concepts fondamentaux jusqu’à la construction d’un système fonctionnel basé sur le consensus.
Pourquoi Apprendre les Systèmes Distribués ?
Les systèmes distribués sont partout. Chaque fois que vous utilisez un service web moderne, vous interagissez avec un système distribué :
- Plateformes de médias sociaux gérant des milliards d’utilisateurs
- Sites de commerce électronique traitant des millions de transactions
- Services de streaming diffusant du contenu à l’échelle mondiale
- Bases de données cloud stockant et répliquant des données à travers les continents
Comprendre les systèmes distribués est essentiel pour construire des applications évolutives et fiables.
Aperçu du Cours
Ce cours enseigne les concepts des systèmes distribués à travers une mise en œuvre pratique. Sur 10 sessions, vous construirez quatre applications distribuées de complexité croissante :
| Application | Sessions | Concepts |
|---|---|---|
| Système File/Travail | 1-2 | Producteur-consommateur, passage de messages, tolérance aux pannes |
| Magasin avec Réplication | 3-5 | Partitionnement, théorème CAP, élection de leader, cohérence |
| Système de Chat | 6-7 | WebSockets, pub/sub, ordonnancement des messages |
| Système de Consensus | 8-10 | Algorithme Raft, réplication de journal, machine à états |
Ce que Vous Apprendrez
À la fin de ce cours, vous serez capable de :
- Expliquer les concepts des systèmes distribués y compris le théorème CAP, les modèles de cohérence et le consensus
- Construire un système de file d’attente fonctionnel avec le modèle producteur-consommateur
- Implémenter un magasin clé-valeur répliqué avec élection de leader
- Créer un système de chat en temps réel avec messagerie pub/sub
- Développer un système basé sur le consensus en utilisant l’algorithme Raft
- Déployer tous les systèmes en utilisant Docker Compose sur votre machine locale
Public Cible
Ce cours est conçu pour les développeurs qui :
- Ont une expérience de base en programmation (fonctions, classes, POO de base)
- Sont novices en systèmes distribués
- Veulent comprendre comment fonctionnent les applications distribuées modernes
- Préfèrent apprendre en pratiquant plutôt que la théorie pure
Prérequis
- Programmation : À l’aise avec TypeScript ou Python
- Ligne de Commande : Familiarité de base avec les commandes du terminal
- Docker : Nous couvrirons la configuration Docker dans la section Configuration Docker
Aucune expérience préalable en systèmes distribués n’est requise !
Progression du Cours
graph TB
subgraph "Partie I : Fondamentaux"
A1[Qu'est-ce qu'un SD ?] --> A2[Passage de Messages]
A2 --> A3[Système de File]
end
subgraph "Partie II : Magasin de Données"
B1[Partitionnement] --> B2[Théorème CAP]
B2 --> B3[Réplication]
B3 --> B4[Cohérence]
end
subgraph "Partie III : Temps Réel"
C1[WebSockets] --> C2[Pub/Sub]
C2 --> C3[Système de Chat]
end
subgraph "Partie IV : Consensus"
D1[Qu'est-ce que le Consensus ?] --> D2[Algorithme Raft]
D2 --> D3[Élection de Leader]
D3 --> D4[Réplication de Journal]
D4 --> D5[Système de Consensus]
end
A3 --> B1
B4 --> C1
C3 --> D1
Format du Cours
Chaque session de 1,5 heure suit cette structure :
graph LR
A[Révision<br/>5 min] --> B[Concept<br/>20 min]
B --> C[Diagramme<br/>10 min]
C --> D[Démonstration<br/>15 min]
D --> E[Exercice<br/>25 min]
E --> F[Test<br/>10 min]
F --> G[Résumé<br/>5 min]
Composants de Session
- Explication de Concept : Des explications claires et adaptées aux débutants des concepts fondamentaux
- Diagrammes Visuels : Des diagrammes Mermaid montrant l’architecture et le flux des données
- Démonstration en Direct : Procédure pas à pas du code
- Exercice Pratique : Exercices pratiques pour renforcer l’apprentissage
- Exécution et Test : Vérifiez que votre implémentation fonctionne correctement
Exemples de Code
Chaque concept inclut des implémentations en TypeScript et Python :
// Exemple TypeScript
interface Message {
id: string;
content: string;
}
# Exemple Python
@dataclass
class Message:
id: str
content: str
Choisissez le langage avec lequel vous êtes le plus à l’aise, ou apprenez les deux !
Avant de Commencer
1. Configurez Votre Environnement
Suivez le Guide de Configuration Docker pour installer :
- Docker et Docker Compose
- Votre langage de programmation préféré (TypeScript ou Python)
2. Vérifiez Votre Installation
docker --version
docker-compose --version
3. Choisissez Votre Langage
Décidez si vous travaillerez avec TypeScript ou Python tout au long du cours. Les deux langages ont des exemples complets pour chaque concept.
Conseils d’Apprentissage
- Ne vous précipitez pas : Chaque concept s’appuie sur les précédents
- Exécutez le code : Suivez les exemples dans votre terminal
- Expérimentez : Modifiez le code et observez ce qui se passe
- Posez des questions : Utilisez le guide de dépannage quand vous êtes bloqué
- Construisez en public : Partagez votre progression et apprenez des autres
Ce que Vous Construirez
À la fin de ce cours, vous aurez quatre systèmes distribués fonctionnels :
- Système de File - Un système de traitement des tâches tolérant aux pannes
- Magasin Répliqué - Un magasin clé-valeur avec élection de leader
- Système de Chat - Un système de messagerie en temps réel avec présence
- Système de Consensus - Une base de données distribuée basée sur Raft
Tous les systèmes fonctionnent localement en utilisant Docker Compose — aucune infrastructure cloud n’est requise !
Commençons !
Prêt à plonger ? Continuez vers Chapitre 1 : Qu’est-ce qu’un Système Distribué ?
Qu’est-ce qu’un Système Distribué ?
Session 1, Partie 1 - 20 minutes
Objectifs d’Apprentissage
- Définir ce qu’est un système distribué
- Identifier les caractéristiques clés des systèmes distribués
- Comprendre pourquoi les systèmes distribués sont importants
- Reconnaître les systèmes distribués dans la vie quotidienne
Définition
Un système distribué (distributed system) est une collection d’ordinateurs indépendants qui apparaît à ses utilisateurs comme un système cohérent unique.
graph TB
subgraph "Utilisateurs Voient"
Single["Système Unique"]
end
subgraph "Réalité"
N1["Nœud 1"]
N2["Nœud 2"]
N3["Nœud 3"]
N4["Nœud N"]
N1 <--> N2
N2 <--> N3
N3 <--> N4
N4 <--> N1
end
Single -->|"apparaît comme"| N1
Single -->|"apparaît comme"| N2
Single -->|"apparaît comme"| N3
Idée Clé
La caractéristique déterminante est l’illusion d’unité — les utilisateurs interagissent avec ce qui semble être un seul système, tandis qu’en coulisses, plusieurs machines travaillent ensemble.
Trois Caractéristiques Clés
Selon Leslie Lamport, un système distribué est :
“Un système dans lequel la défaillance d’un ordinateur dont vous ignoriez même l’existence peut rendre votre propre ordinateur inutilisable.”
Cette définition met en évidence trois caractéristiques fondamentales :
1. Concurrence (Plusieurs Choses Se Produisent En Même Temps)
Plusieurs composants s’exécutent simultanément, entraînant des interactions complexes.
sequenceDiagram
participant U as Utilisateur
participant A as Serveur A
participant B as Serveur B
participant C as Serveur C
U->>A: Requête
A->>B: Requête
A->>C: Mise à jour
B-->>A: Réponse
C-->>A: Accusé
A-->>U: Résultat
2. Pas d’Horloge Globale
Chaque nœud a sa propre horloge. Il n’y a pas de “maintenant” unique dans le système.
graph LR
A[Horloge A : 10:00:01.123]
B[Horloge B : 10:00:02.456]
C[Horloge C : 09:59:59.789]
A -.->|latence réseau| B
B -.->|latence réseau| C
C -.->|latence réseau| A
Implication : Vous ne pouvez pas compter sur les horodatages pour ordonner les événements entre les nœuds. Vous avez besoin d’horloges logiques (nous en reparlerons dans les prochaines sessions !).
3. Défaillance Indépendante
Les composants peuvent tomber en panne indépendamment. Lorsqu’une partie tombe en panne, le reste peut continuer — ou peut devenir inutilisable.
stateDiagram-v2
[*] --> TousSains: Démarrage Système
TousSains --> DéfaillancePartielle: Un Nœud Tombe en Panne
TousSains --> DéfaillanceComplète: Nœuds Critiques Tombent en Panne
DéfaillancePartielle --> TousSains: Récupération
DéfaillancePartielle --> DéfaillanceComplète: Défaillance en Cascade
DéfaillanceComplète --> [*]
Pourquoi des Systèmes Distribués ?
Extensibilité
Mise à l’échelle Verticale (Scale Up) :
- Ajouter plus de ressources à une seule machine
- Finit par atteindre les limites matérielles/coût
Mise à l’échelle Horizontale (Scale Out) :
- Ajouter plus de machines au système
- Potentiel d’extensibilité pratiquement illimité
graph TB
subgraph "Mise à l'échelle Verticale"
Big[Gros Serveur Coûteux<br/>100 000 $]
end
subgraph "Mise à l'échelle Horizontale"
S1[Serveur Standard<br/>1 000 $]
S2[Serveur Standard<br/>1 000 $]
S3[Serveur Standard<br/>1 000 $]
S4[...]
end
Big <--> S1
Big <--> S2
Big <--> S3
Fiabilité et Disponibilité
Un point unique de défaillance est inacceptable pour les services critiques :
graph TB
subgraph "Système Unique"
S[Serveur Unique]
S -.-> X[❌ Défaillance = Pas de Service]
end
subgraph "Système Distribué"
N1[Nœud 1]
N2[Nœud 2]
N3[Nœud 3]
N1 <--> N2
N2 <--> N3
N3 <--> N1
N1 -.-> X2[❌ Un Tombe en Panne]
X2 --> OK[✓ Les Autres Continuent]
end
Latence (Distribution Géographique)
Placer les données plus près des utilisateurs améliore l’expérience :
graph TB
User[Utilisateur à New York]
subgraph "Distribution Globale"
NYC[Centre de Données NYC<br/>latence 10ms]
LON[Centre de Données Londres<br/>latence 70ms]
TKY[Centre de Données Tokyo<br/>latence 150ms]
end
User --> NYC
User -.-> LON
User -.-> TKY
NYC <--> LON
LON <--> TKY
TKY <--> NYC
Exemples de Systèmes Distribués
Exemples Quotidiens
| Système | Description | Avantage |
|---|---|---|
| Recherche Web | Serveurs de requêtes, serveurs d’index, serveurs de cache | Réponses rapides, toujours disponibles |
| Vidéo en Streaming | Réseaux de diffusion de contenu (CDNs) | Faible latence, haute qualité |
| Achats en Ligne | Catalogue de produits, panier, paiement, inventaire | Gère les pics de trafic |
| Réseaux Sociaux | Publications, commentaires, j’aime, notifications | Mises à jour en temps réel |
Exemples Techniques
Réplication de Base de Données :
graph LR
W[Écrire sur le Primaire] --> P[(DB Primaire)]
P --> R1[(Réplique 1)]
P --> R2[(Réplique 2)]
P --> R3[(Réplique 3)]
R1 --> Read1[Lire depuis la Réplique]
R2 --> Read2[Lire depuis la Réplique]
R3 --> Read3[Lire depuis la Réplique]
Répartition de Charge :
graph TB
Users[Utilisateurs]
LB[Répartiteur de Charge]
Users --> LB
LB --> S1[Serveur 1]
LB --> S2[Serveur 2]
LB --> S3[Serveur 3]
LB --> S4[Serveur N]
Compromis
Les systèmes distribués introduisent de la complexité :
| Défi | Description |
|---|---|
| Problèmes Réseau | Non fiable, latence variable, partitions |
| Concurrence | Conditions de course, interblocages, coordination |
| Défaillances Partielles | Certains composants fonctionnent, d’autres non |
| Cohérence | Garder les données synchronisées entre les nœuds |
Le Dilemme Fondamental :
“Les avantages de la distribution valent-ils la complexité ajoutée ?”
Pour la plupart des applications modernes, la réponse est oui — c’est pourquoi nous apprenons ceci !
Résumé
Points Clés à Retenir
- Systèmes distribués = plusieurs ordinateurs agissant comme un seul
- Trois caractéristiques : concurrence, pas d’horloge globale, défaillance indépendante
- Avantages : extensibilité, fiabilité, latence réduite
- Coûts : complexité, problèmes réseau, défis de cohérence
Vérifiez Votre Compréhension
- Pouvez-vous expliquer pourquoi il n’y a pas d’horloge globale dans un système distribué ?
- Donnez un exemple de système distribué que vous utilisez quotidiennement
- Pourquoi la défaillance indépendante rend-elle les systèmes distribués plus difficiles à construire ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Suite
Maintenant que nous comprenons ce que sont les systèmes distribués, explorons comment ils communiquent : Passage de Messages
Passage de Messages
Session 1, Partie 2 - 25 minutes
Objectifs d’Apprentissage
- Comprendre le passage de messages comme modèle fondamental dans les systèmes distribués
- Distinguer entre la messagerie synchrone et asynchrone
- Apprendre les différentes garanties de livraison de messages
- Implémenter le passage de messages de base en TypeScript et Python
Qu’est-ce que le Passage de Messages ?
Dans les systèmes distribués, le passage de messages (message passing) est la façon dont les nœuds communiquent. Au lieu de la mémoire partagée ou des appels de fonction directs, les composants s’envoient des messages sur le réseau.
graph LR
A[Nœud A]
B[Nœud B]
M[Message]
A -->|envoyer| M
M -->|réseau| B
B -->|traiter| M
Idée Clé
“Dans les systèmes distribués, la communication n’est pas un appel de fonction — c’est une requête envoyée sur un réseau non fiable.”
Ce simple fait a des implications profondes sur tout ce que nous construisons.
Synchrone vs Asynchrone
Messagerie Synchrone (Requête-Réponse)
L’expéditeur attend une réponse avant de continuer.
sequenceDiagram
participant C as Client
participant S as Serveur
C->>S: Requête
Note over C: En attente...
S-->>C: Réponse
Note over C: Continuer
Caractéristiques :
- Simple à comprendre et à implémenter
- L’appelant est bloqué pendant l’appel
- Gestion des erreurs plus facile (retour immédiat)
- Peut entraîner de mauvaises performances et des défaillances en cascade
Messagerie Asynchrone (Fire-and-Forget)
L’expéditeur continue sans attendre de réponse.
sequenceDiagram
participant P as Producteur
participant Q as File
participant W as Worker
P->>Q: Envoyer Message
Note over P: Continuer immédiatement
Q->>W: Traiter Plus Tard
Note over W: Travail en cours...
W-->>P: Résultat (optionnel)
Caractéristiques :
- Non bloquant, meilleur débit
- Gestion des erreurs plus complexe
- Nécessite des ID de corrélation pour suivre les requêtes
- Permet un couplage souple entre les composants
Garanties de Livraison des Messages
Trois Sémantiques de Livraison
graph TB
subgraph "Au Plus Une Fois"
A1[Envoyer] --> A2[Peut être perdu]
A2 --> A3[Jamais dupliqué]
end
subgraph "Au Moins Une Fois"
B1[Envoyer] --> B2[Réessayer jusqu'à accusé]
B2 --> B3[Peut être dupliqué]
end
subgraph "Exactement Une Fois"
C1[Envoyer] --> C2[Déduplication]
C2 --> C3[Livraison parfaite]
end
Comparaison
| Garantie | Description | Coût | Cas d’Usage |
|---|---|---|---|
| Au Plus Une Fois | Le message peut être perdu, jamais dupliqué | Le plus bas | Journaux, métriques, données non critiques |
| Au Moins Une Fois | Le message garanti d’arriver, peut être dupliqué | Moyen | Notifications, files de tâches |
| Exactement Une Fois | Livraison parfaite, pas de doublons | Le plus élevé | Transactions financières, paiements |
Le Problème des Deux Généraux
Une preuve classique que la communication parfaite est impossible dans les réseaux non fiables :
graph LR
A[Général A<br/>Ville 1]
B[Général B<br/>Ville 2]
A -->|"Attaque à 20h ?"| B
B -->|"Acc : reçu"| A
A -->|"Acc : accusé reçu"| B
B -->|"Acc : accusé de l'accusé reçu"| A
Note[A : messages infinis nécessaires]
Implication : Vous ne pouvez jamais être certain à 100 % qu’un message a été reçu sans accusés infinis.
En pratique, nous acceptons l’incertitude et concevons des systèmes qui la tolèrent.
Modèles d’Architecture
Communication Directe
graph LR
A[Service A] --> B[Service B]
A --> C[Service C]
B --> D[Service D]
C --> D
- Simple, direct
- Couplage fort
- Difficile à faire évoluer indépendamment
File de Messages (Communication Indirecte)
graph TB
P[Producteur 1] --> Q[File de Messages]
P2[Producteur 2] --> Q
P3[Producteur N] --> Q
Q --> W1[Worker 1]
Q --> W2[Worker 2]
Q --> W3[Worker N]
- Couplage souple
- Facile à faire évoluer
- Met en tampon les requêtes pendant les pics de trafic
- Permet les nouvelles tentatives et la gestion des erreurs
Exemples d’Implémentation
TypeScript : HTTP (Synchrone)
// server.ts
import http from 'http';
const server = http.createServer((req, res) => {
if (req.method === 'POST' && req.url === '/message') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
const message = JSON.parse(body);
console.log('Received:', message);
// Renvoyer la réponse (synchrone)
res.writeHead(200);
res.end(JSON.stringify({ status: 'processed', id: message.id }));
});
}
});
server.listen(3000, () => console.log('Server on :3000'));
// client.ts
import http from 'http';
function sendMessage(data: any): Promise<any> {
return new Promise((resolve, reject) => {
const postData = JSON.stringify(data);
const options = {
hostname: 'localhost',
port: 3000,
method: 'POST',
path: '/message',
headers: { 'Content-Type': 'application/json' }
};
const req = http.request(options, (res) => {
let body = '';
res.on('data', chunk => body += chunk);
res.on('end', () => resolve(JSON.parse(body)));
});
req.on('error', reject);
req.write(postData);
req.end();
});
}
// Usage : attend la réponse
sendMessage({ id: '1', content: 'Hello' })
.then(response => console.log('Got:', response));
Python : HTTP (Synchrone)
# server.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class MessageHandler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path == '/message':
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
message = json.loads(post_data.decode())
print(f"Received: {message}")
# Renvoyer la réponse (synchrone)
response = json.dumps({'status': 'processed', 'id': message['id']})
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(response.encode())
server = HTTPServer(('localhost', 3000), MessageHandler)
print("Server on :3000")
server.serve_forever()
# client.py
import requests
import json
def send_message(data):
# Synchrone : attend la réponse
response = requests.post(
'http://localhost:3000/message',
json=data
)
return response.json()
# Usage
result = send_message({'id': '1', 'content': 'Hello'})
print(f"Got: {result}")
TypeScript : File Simple (Asynchrone)
// queue.ts
interface Message {
id: string;
data: any;
timestamp: number;
}
class MessageQueue {
private messages: Message[] = [];
private handlers: Map<string, (msg: Message) => void> = new Map();
publish(topic: string, data: any): string {
const message: Message = {
id: `${Date.now()}-${Math.random()}`,
data,
timestamp: Date.now()
};
this.messages.push(message);
console.log(`Published to ${topic}:`, message.id);
// Fire and forget - ne pas attendre le traitement
setImmediate(() => this.process(topic, message));
return message.id;
}
subscribe(topic: string, handler: (msg: Message) => void) {
this.handlers.set(topic, handler);
}
private process(topic: string, message: Message) {
const handler = this.handlers.get(topic);
if (handler) {
// Traiter de manière asynchrone - l'appelant n'attend pas
handler(message);
}
}
}
// Usage
const queue = new MessageQueue();
queue.subscribe('tasks', (msg) => {
console.log(`Processing task ${msg.id}:`, msg.data);
// Simuler un travail asynchrone
setTimeout(() => console.log(`Task ${msg.id} complete`), 1000);
});
// Publish retourne immédiatement - n'attend pas le traitement
const taskId = queue.publish('tasks', { type: 'email', to: 'user@example.com' });
console.log(`Task ${taskId} queued (not yet processed)`);
Python : File Simple (Asynchrone)
# queue.py
import time
import threading
from dataclasses import dataclass
from typing import Callable, Dict, Any
import uuid
@dataclass
class Message:
id: str
data: Any
timestamp: float
class MessageQueue:
def __init__(self):
self.messages = []
self.handlers: Dict[str, Callable[[Message], None]] = {}
self.lock = threading.Lock()
def publish(self, topic: str, data: Any) -> str:
message = Message(
id=f"{int(time.time()*1000)}-{uuid.uuid4().hex[:8]}",
data=data,
timestamp=time.time()
)
with self.lock:
self.messages.append(message)
print(f"Published to {topic}: {message.id}")
# Fire and forget - ne pas attendre le traitement
threading.Thread(
target=self._process,
args=(topic, message),
daemon=True
).start()
return message.id
def subscribe(self, topic: str, handler: Callable[[Message], None]):
self.handlers[topic] = handler
def _process(self, topic: str, message: Message):
handler = self.handlers.get(topic)
if handler:
# Traiter de manière asynchrone - l'appelant n'attend pas
handler(message)
# Usage
queue = MessageQueue()
def handle_task(msg: Message):
print(f"Processing task {msg.id}: {msg.data}")
# Simuler un travail asynchrone
time.sleep(1)
print(f"Task {msg.id} complete")
queue.subscribe('tasks', handle_task)
# Publish retourne immédiatement - n'attend pas le traitement
task_id = queue.publish('tasks', {'type': 'email', 'to': 'user@example.com'})
print(f"Task {task_id} queued (not yet processed)")
# Garder le thread principal en vie pour voir le traitement
time.sleep(2)
Modèles de Messages Courants
Requête-Réponse
// Appeler et attendre la réponse
const answer = await ask(question);
Fire-and-Forget
// Envoyer et continuer
notify(user);
Publier-S’Abonner
// Plusieurs récepteurs, un expéditeur
broker.publish('events', data);
Requête-Réponse (avec Corrélation)
// Envoyer la requête, obtenir la réponse plus tard
const replyTo = createReplyQueue();
broker.send(request, { replyTo });
// ... plus tard
const reply = await replyTo.receive();
Gestion des Erreurs
Le passage de messages sur les réseaux n’est pas fiable. Problèmes courants :
| Erreur | Cause | Stratégie de Gestion |
|---|---|---|
| Délai d’attente | Pas de réponse, réseau lent | Réessayer avec attente progressive |
| Connexion Refusée | Service indisponible | Disjoncteur, mettre en file pour plus tard |
| Message Perdu | Défaillance du réseau | Accusés de réception, nouvelles tentatives |
| Duplication | Nouvelle tentative après accusé lent | Opérations idempotentes |
Modèle de Nouvelle Tentative
async function sendMessageWithRetry(
message: any,
maxRetries = 3
): Promise<any> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await sendMessage(message);
} catch (error) {
if (attempt === maxRetries) throw error;
// Attente exponentielle : 100ms, 200ms, 400ms
const delay = 100 * Math.pow(2, attempt - 1);
await new Promise(r => setTimeout(r, delay));
console.log(`Retry ${attempt}/${maxRetries}`);
}
}
}
Résumé
Points Clés à Retenir
- Passage de messages = comment les systèmes distribués communiquent
- Synchrone = attendre la réponse ; Asynchrone = fire and forget
- Garanties de livraison : au-plus-une-fois, au-moins-une-fois, exactement-une-fois
- Le réseau n’est pas fiable - concevez pour les défaillances et les nouvelles tentatives
- Choisissez le bon modèle pour votre cas d’usage
Vérifiez Votre Compréhension
- Quand utiliseriez-vous la messagerie synchrone vs asynchrone ?
- Quelle est la différence entre au-moins-une-fois et exactement-une-fois ?
- Pourquoi la communication parfaite est-elle impossible dans les systèmes distribués ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Suite
Appliquons maintenant le passage de messages pour construire notre premier système distribué : Implémentation du Système de File
Implémentation du Système de File
Session 2 - Session complète (90 minutes)
Objectifs d’Apprentissage
- Comprendre le modèle producteur-consommateur
- Construire un système de file fonctionnel avec des workers concurrents
- Implémenter la tolérance aux pannes avec une logique de nouvelle tentative
- Déployer et tester le système avec Docker Compose
Le Modèle Producteur-Consommateur
Le modèle producteur-consommateur (producer-consumer pattern) est un modèle fondamental des systèmes distribués où :
- Les Producteurs créent et envoient des tâches à une file
- La File met en tampon les tâches entre les producteurs et les consommateurs
- Les Workers (consommateurs) traitent les tâches de la file
graph TB
subgraph "Producteurs"
P1[Producteur 1<br/>Serveur API]
P2[Producteur 2<br/>Planificateur]
P3[Producteur N<br/>Webhook]
end
subgraph "File"
Q[File de Messages<br/>Tampon de Tâches]
end
subgraph "Workers"
W1[Worker 1<br/>Processus]
W2[Worker 2<br/>Processus]
W3[Worker 3<br/>Processus]
end
P1 --> Q
P2 --> Q
P3 --> Q
Q --> W1
Q --> W2
Q --> W3
style Q fill:#f9f,stroke:#333,stroke-width:4px
Avantages Clés
| Avantage | Explication |
|---|---|
| Découplage | Les producteurs n’ont pas besoin de connaître les workers |
| Mise en Tampon | La file gère les pics de trafic |
| Extensibilité | Ajoutez/supprimez des workers indépendamment |
| Fiabilité | Les tâches persistent si les workers tombent en panne |
| Nouvelle Tentative | Les tâches échouées peuvent être remises en file |
Architecture du Système
Vue Complète du Système
sequenceDiagram
participant C as Client
participant P as Producteur
participant Q as File
participant W as Worker
participant DB as Magasin de Résultats
C->>P: HTTP POST /task
P->>Q: Mettre en File Tâche
Q-->>P: ID de Tâche
P-->>C: 202 Accepté
Note over Q,W: Traitement Asynchrone
Q->>W: Récupérer Tâche
W->>W: Traiter Tâche
W->>DB: Sauvegarder Résultat
W->>Q: Ack (Succès)
Q->>Q: Supprimer Tâche
Cycle de Vie d’une Tâche
stateDiagram-v2
[*] --> EnAttente: Création par le Producteur
EnAttente --> EnCours: Récupération par le Worker
EnCours --> Terminé: Succès
EnCours --> Échoué: Erreur
EnCours --> EnAttente: Nouvelle Tentative
Échoué --> EnAttente: Nombre max de nouvelles tentatives non atteint
Échoué --> LettreMorte: Nombre max de nouvelles tentatives atteint
Terminé --> [*]
LettreMorte --> [*]
Implémentation
Modèles de Données
Définition de Tâche :
interface Task {
id: string;
type: string; // 'email', 'image', 'report', etc.
payload: any;
status: 'pending' | 'processing' | 'completed' | 'failed';
createdAt: number;
retries: number;
maxRetries: number;
result?: any;
error?: string;
}
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class Task:
id: str
type: str # 'email', 'image', 'report', etc.
payload: Any
status: str = 'pending' # pending, processing, completed, failed
created_at: float = field(default_factory=time.time)
retries: int = 0
max_retries: int = 3
result: Optional[Any] = None
error: Optional[str] = None
Implémentation TypeScript
Structure du Projet
queue-system-ts/
├── package.json
├── docker-compose.yml
├── src/
│ ├── queue.ts # Implémentation de la file
│ ├── producer.ts # API du producteur
│ ├── worker.ts # Implémentation du worker
│ └── types.ts # Définitions de types
└── Dockerfile
Code TypeScript Complet
queue-system-ts/src/types.ts
export interface Task {
id: string;
type: string;
payload: any;
status: 'pending' | 'processing' | 'completed' | 'failed';
createdAt: number;
retries: number;
maxRetries: number;
result?: any;
error?: string;
}
export interface QueueMessage {
task: Task;
timestamp: number;
}
queue-system-ts/src/queue.ts
import { Task, QueueMessage } from './types';
export class Queue {
private pending: Task[] = [];
private processing: Map<string, Task> = new Map();
private completed: Task[] = [];
private failed: Task[] = [];
// Mettre en file une nouvelle tâche
enqueue(type: string, payload: any): string {
const task: Task = {
id: this.generateId(),
type,
payload,
status: 'pending',
createdAt: Date.now(),
retries: 0,
maxRetries: 3
};
this.pending.push(task);
console.log(`[Queue] Enqueued task ${task.id} (${type})`);
return task.id;
}
// Obtenir la prochaine tâche en attente (pour les workers)
dequeue(): Task | null {
if (this.pending.length === 0) return null;
const task = this.pending.shift()!;
task.status = 'processing';
this.processing.set(task.id, task);
console.log(`[Queue] Dequeued task ${task.id}`);
return task;
}
// Marquer la tâche comme terminée
complete(taskId: string, result?: any): void {
const task = this.processing.get(taskId);
if (!task) return;
task.status = 'completed';
task.result = result;
this.processing.delete(taskId);
this.completed.push(task);
console.log(`[Queue] Completed task ${taskId}`);
}
// Marquer la tâche comme échouée (réessayer si possible)
fail(taskId: string, error: string): void {
const task = this.processing.get(taskId);
if (!task) return;
task.retries++;
task.error = error;
if (task.retries >= task.maxRetries) {
task.status = 'failed';
this.processing.delete(taskId);
this.failed.push(task);
console.log(`[Queue] Task ${taskId} failed permanently after ${task.retries} retries`);
} else {
task.status = 'pending';
this.processing.delete(taskId);
this.pending.push(task);
console.log(`[Queue] Task ${taskId} failed, retrying (${task.retries}/${task.maxRetries})`);
}
}
// Obtenir les statistiques de la file
getStats() {
return {
pending: this.pending.length,
processing: this.processing.size,
completed: this.completed.length,
failed: this.failed.length
};
}
private generateId(): string {
return `task-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
queue-system-ts/src/producer.ts
import http from 'http';
import { Queue } from './queue';
const queue = new Queue();
const server = http.createServer((req, res) => {
if (req.method === 'POST' && req.url === '/task') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const { type, payload } = JSON.parse(body);
if (!type || !payload) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'type and payload required' }));
return;
}
const taskId = queue.enqueue(type, payload);
res.writeHead(202); // Accepted
res.end(JSON.stringify({
taskId,
message: 'Task enqueued',
stats: queue.getStats()
}));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid JSON' }));
}
});
} else if (req.method === 'GET' && req.url === '/stats') {
res.writeHead(200);
res.end(JSON.stringify(queue.getStats()));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Not found' }));
}
});
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`Producer API listening on port ${PORT}`);
});
export { queue };
queue-system-ts/src/worker.ts
import http from 'http';
import { Queue, Task } from './types';
// Simuler le traitement de tâches
async function processTask(task: Task): Promise<any> {
console.log(`[Worker] Processing task ${task.id} (${task.type})`);
// Simuler le travail
await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 2000));
// Simuler des échocs occasionnels (20% de chance)
if (Math.random() < 0.2) {
throw new Error('Random processing error');
}
// Traiter en fonction du type de tâche
switch (task.type) {
case 'email':
return { sent: true, to: task.payload.to };
case 'image':
return { processed: true, url: task.payload.url };
case 'report':
return { generated: true, format: 'pdf' };
default:
return { result: 'processed' };
}
}
class Worker {
private id: string;
private queueUrl: string;
private running: boolean = false;
constructor(id: string, queueUrl: string) {
this.id = id;
this.queueUrl = queueUrl;
}
async start(): Promise<void> {
this.running = true;
console.log(`[Worker ${this.id}] Started`);
while (this.running) {
try {
await this.processNextTask();
} catch (error) {
console.error(`[Worker ${this.id}] Error:`, error);
await this.sleep(1000); // Attendre avant de réessayer
}
}
}
private async processNextTask(): Promise<void> {
// Récupérer la tâche de la file
const task = await this.fetchTask();
if (!task) {
await this.sleep(1000); // Pas de tâche, attendre
return;
}
try {
// Traiter la tâche
const result = await processTask(task);
// Marquer comme terminée
await this.completeTask(task.id, result);
} catch (error: any) {
// Marquer comme échouée
await this.failTask(task.id, error.message);
}
}
private async fetchTask(): Promise<Task | null> {
return new Promise((resolve, reject) => {
http.get(`${this.queueUrl}/dequeue`, (res) => {
let body = '';
res.on('data', chunk => body += chunk);
res.on('end', () => {
if (res.statusCode === 204) {
resolve(null); // Aucune tâche disponible
} else if (res.statusCode === 200) {
resolve(JSON.parse(body));
} else {
reject(new Error(`Unexpected status: ${res.statusCode}`));
}
});
}).on('error', reject);
});
}
private async completeTask(taskId: string, result: any): Promise<void> {
return new Promise((resolve, reject) => {
const data = JSON.stringify({ result });
http.request({
hostname: 'localhost',
port: 3000,
path: `/complete/${taskId}`,
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Content-Length': data.length }
}, (res) => {
if (res.statusCode === 200) {
resolve();
} else {
reject(new Error(`Failed to complete task: ${res.statusCode}`));
}
}).on('error', reject).end(data);
});
}
private async failTask(taskId: string, error: string): Promise<void> {
return new Promise((resolve, reject) => {
const data = JSON.stringify({ error });
http.request({
hostname: 'localhost',
port: 3000,
path: `/fail/${taskId}`,
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Content-Length': data.length }
}, (res) => {
if (res.statusCode === 200) {
resolve();
} else {
reject(new Error(`Failed to fail task: ${res.statusCode}`));
}
}).on('error', reject).end(data);
});
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
stop(): void {
this.running = false;
}
}
// Démarrer le worker
const workerId = process.env.WORKER_ID || 'worker-1';
const worker = new Worker(workerId, 'http://localhost:3000');
worker.start();
Implémentation Python
Structure du Projet
queue-system-py/
├── requirements.txt
├── docker-compose.yml
├── src/
│ ├── queue.py # Implémentation de la file
│ ├── producer.py # API du producteur
│ └── worker.py # Implémentation du worker
└── Dockerfile
Code Python Complet
queue-system-py/src/queue.py
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from enum import Enum
class TaskStatus(Enum):
PENDING = 'pending'
PROCESSING = 'processing'
COMPLETED = 'completed'
FAILED = 'failed'
@dataclass
class Task:
id: str
type: str
payload: Any
status: str = TaskStatus.PENDING.value
created_at: float = field(default_factory=time.time)
retries: int = 0
max_retries: int = 3
result: Optional[Any] = None
error: Optional[str] = None
class Queue:
def __init__(self):
self.pending: List[Task] = []
self.processing: Dict[str, Task] = {}
self.completed: List[Task] = []
self.failed: List[Task] = []
def enqueue(self, task_type: str, payload: Any) -> str:
"""Mettre en file une nouvelle tâche."""
task = Task(
id=f"task-{int(time.time()*1000)}-{uuid.uuid4().hex[:8]}",
type=task_type,
payload=payload
)
self.pending.append(task)
print(f"[Queue] Enqueued task {task.id} ({task_type})")
return task.id
def dequeue(self) -> Optional[Task]:
"""Obtenir la prochaine tâche en attente."""
if not self.pending:
return None
task = self.pending.pop(0)
task.status = TaskStatus.PROCESSING.value
self.processing[task.id] = task
print(f"[Queue] Dequeued task {task.id}")
return task
def complete(self, task_id: str, result: Any = None) -> None:
"""Marquer la tâche comme terminée."""
task = self.processing.pop(task_id, None)
if not task:
return
task.status = TaskStatus.COMPLETED.value
task.result = result
self.completed.append(task)
print(f"[Queue] Completed task {task_id}")
def fail(self, task_id: str, error: str) -> None:
"""Marquer la tâche comme échouée (réessayer si possible)."""
task = self.processing.pop(task_id, None)
if not task:
return
task.retries += 1
task.error = error
if task.retries >= task.max_retries:
task.status = TaskStatus.FAILED.value
self.failed.append(task)
print(f"[Queue] Task {task_id} failed permanently after {task.retries} retries")
else:
task.status = TaskStatus.PENDING.value
self.pending.append(task)
print(f"[Queue] Task {task_id} failed, retrying ({task.retries}/{task.max_retries})")
def get_stats(self) -> Dict[str, int]:
"""Obtenir les statistiques de la file."""
return {
'pending': len(self.pending),
'processing': len(self.processing),
'completed': len(self.completed),
'failed': len(self.failed)
}
queue-system-py/src/producer.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from queue import Queue
queue = Queue()
class ProducerHandler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path == '/task':
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode())
task_type = data.get('type')
payload = data.get('payload')
if not task_type or not payload:
self.send_error(400, 'type and payload required')
return
task_id = queue.enqueue(task_type, payload)
response = json.dumps({
'taskId': task_id,
'message': 'Task enqueued',
'stats': queue.get_stats()
})
self.send_response(202) # Accepted
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(response.encode())
except json.JSONDecodeError:
self.send_error(400, 'Invalid JSON')
def do_GET(self):
if self.path == '/stats':
response = json.dumps(queue.get_stats())
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(response.encode())
def log_message(self, format, *args):
pass # Supprimer la journalisation par défaut
if __name__ == '__main__':
import os
port = int(os.environ.get('PORT', 3000))
server = HTTPServer(('0.0.0.0', port), ProducerHandler)
print(f"Producer API listening on port {port}")
server.serve_forever()
queue-system-py/src/worker.py
import os
import time
import random
import requests
from typing import Optional, Dict, Any
from queue import Task
# Simuler le traitement de tâches
def process_task(task: Task) -> Any:
print(f"[Worker] Processing task {task.id} ({task.type})")
# Simuler le travail
time.sleep(1 + random.random() * 2)
# Simuler des échecs occasionnels (20% de chance)
if random.random() < 0.2:
raise Exception('Random processing error')
# Traiter en fonction du type de tâche
if task.type == 'email':
return {'sent': True, 'to': task.payload.get('to')}
elif task.type == 'image':
return {'processed': True, 'url': task.payload.get('url')}
elif task.type == 'report':
return {'generated': True, 'format': 'pdf'}
else:
return {'result': 'processed'}
class Worker:
def __init__(self, worker_id: str, queue_url: str):
self.id = worker_id
self.queue_url = queue_url
self.running = False
def start(self):
"""Démarrer la boucle du worker."""
self.running = True
print(f"[Worker {self.id}] Started")
while self.running:
try:
self.process_next_task()
except Exception as e:
print(f"[Worker {self.id}] Error: {e}")
time.sleep(1)
def process_next_task(self):
"""Récupérer et traiter la prochaine tâche."""
task = self.fetch_task()
if not task:
time.sleep(1) # Pas de tâche, attendre
return
try:
result = process_task(task)
self.complete_task(task['id'], result)
except Exception as e:
self.fail_task(task['id'], str(e))
def fetch_task(self) -> Optional[Dict]:
"""Récupérer la prochaine tâche de la file."""
try:
response = requests.get(f"{self.queue_url}/dequeue", timeout=5)
if response.status_code == 204:
return None # Aucune tâche
return response.json()
except requests.RequestException:
return None
def complete_task(self, task_id: str, result: Any):
"""Marquer la tâche comme terminée."""
requests.post(
f"{self.queue_url}/complete/{task_id}",
json={'result': result},
timeout=5
)
def fail_task(self, task_id: str, error: str):
"""Marquer la tâche comme échouée."""
requests.post(
f"{self.queue_url}/fail/{task_id}",
json={'error': error},
timeout=5
)
def stop(self):
"""Arrêter le worker."""
self.running = False
if __name__ == '__main__':
worker_id = os.environ.get('WORKER_ID', 'worker-1')
queue_url = os.environ.get('QUEUE_URL', 'http://localhost:3000')
worker = Worker(worker_id, queue_url)
worker.start()
Configuration Docker Compose
Version TypeScript (docker-compose.yml)
version: '3.8'
services:
producer:
build: ./src
ports:
- "3000:3000"
environment:
- PORT=3000
volumes:
- ./src:/app/src
command: npm run start:producer
worker-1:
build: ./src
environment:
- WORKER_ID=worker-1
depends_on:
- producer
volumes:
- ./src:/app/src
command: npm run start:worker
worker-2:
build: ./src
environment:
- WORKER_ID=worker-2
depends_on:
- producer
volumes:
- ./src:/app/src
command: npm run start:worker
worker-3:
build: ./src
environment:
- WORKER_ID=worker-3
depends_on:
- producer
volumes:
- ./src:/app/src
command: npm run start:worker
Dockerfile TypeScript
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["npm", "run", "start:producer"]
Version Python (docker-compose.yml)
version: '3.8'
services:
producer:
build: ./src
ports:
- "3000:3000"
environment:
- PORT=3000
volumes:
- ./src:/app/src
command: python src/producer.py
worker-1:
build: ./src
environment:
- WORKER_ID=worker-1
- QUEUE_URL=http://producer:3000
depends_on:
- producer
volumes:
- ./src:/app/src
command: python src/worker.py
worker-2:
build: ./src
environment:
- WORKER_ID=worker-2
- QUEUE_URL=http://producer:3000
depends_on:
- producer
volumes:
- ./src:/app/src
command: python src/worker.py
worker-3:
build: ./src
environment:
- WORKER_ID=worker-3
- QUEUE_URL=http://producer:3000
depends_on:
- producer
volumes:
- ./src:/app/src
command: python src/worker.py
Dockerfile Python
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "src/producer.py"]
Exécution de l’Exemple
Étape 1 : Démarrer le Système
cd examples/01-queue
docker-compose up --build
Vous devriez voir une sortie comme :
producer | Producer API listening on port 3000
worker-1 | [Worker worker-1] Started
worker-2 | [Worker worker-2] Started
worker-3 | [Worker worker-3] Started
Étape 2 : Soumettre des Tâches
Ouvrez un nouveau terminal et soumettez quelques tâches :
# Soumettre une tâche email
curl -X POST http://localhost:3000/task \
-H "Content-Type: application/json" \
-d '{"type": "email", "payload": {"to": "user@example.com", "subject": "Hello"}}'
# Soumettre une tâche de traitement d'image
curl -X POST http://localhost:3000/task \
-H "Content-Type: application/json" \
-d '{"type": "image", "payload": {"url": "https://example.com/image.jpg"}}'
# Soumettre plusieurs tâches
for i in {1..10}; do
curl -X POST http://localhost:3000/task \
-H "Content-Type: application/json" \
-d "{\"type\": \"report\", \"payload\": {\"id\": $i}}"
done
Étape 3 : Observer le Traitement
Dans les journaux Docker, vous verrez :
worker-2 | [Queue] Dequeued task task-1234567890-abc123
worker-2 | [Worker] Processing task task-1234567890-abc123 (report)
worker-2 | [Queue] Completed task task-1234567890-abc123
Étape 4 : Vérifier les Statistiques
curl http://localhost:3000/stats
Réponse :
{
"pending": 5,
"processing": 3,
"completed": 12,
"failed": 0
}
Étape 5 : Tester la Tolérance aux Pannes
Arrêtez un worker :
docker-compose stop worker-1
Les tâches continuent d’être traitées par les workers restants. La file gère automatiquement la redistribution de la charge.
Exercices
Exercice 1 : Ajouter le Support des Priorités
Modifiez la file pour prendre en charge les tâches de priorité haute/normale/basse :
- Ajoutez un champ
priorityau modèle de Tâche - Modifiez
enqueue()pour trier les tâches en attente par priorité - Testez avec des tâches de priorité mixte
Exercice 2 : Implémenter une File des Lettres Mortes
Créez une file séparée pour les tâches définitivement échouées :
- Ajoutez une file
dead_letterpour stocker les tâches échouées - Ajoutez un point de terminaison API pour inspecter/réessayer les tâches de lettres mortes
- Journalisez les tâches échouées dans un fichier pour inspection manuelle
Exercice 3 : Ajouter la Planification de Tâches
Implémentez l’exécution différée des tâches :
- Ajoutez un horodatage
executeAtaux tâches - Modifiez les workers pour ignorer les tâches planifiées dans le futur
- Utilisez une minuterie/planificateur pour déplacer les tâches planifiées vers la file en attente
Résumé
Points Clés à Retenir
- Modèle producteur-consommateur découple la création de tâches du traitement
- Les files mettent en tampon les tâches et gèrent les pics de trafic
- Les workers évoluent indépendamment des producteurs
- La logique de nouvelle tentative fournit une tolérance aux pannes
- Docker Compose permet un déploiement local facile
Vérifiez Votre Compréhension
- Comment la file gère-t-elle les défaillances de workers ?
- Que se passe-t-il lorsqu’une tâche échoue et que le nombre max de nouvelles tentatives est atteint ?
- Pourquoi la file est-elle utile pour gérer les pics de trafic ?
- Comment ajouteriez-vous un nouveau type de worker (par exemple, un worker qui traite uniquement les emails) ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Suite
Maintenant que nous avons construit un système de file, explorons comment partitionner les données sur plusieurs nœuds : Partitionnement des Données
Partitionnement des Données
Session 3, Partie 1 - 25 minutes
Objectifs d’Apprentissage
- Comprendre ce qu’est le partitionnement des données (sharding)
- Comparer le partitionnement basé sur le hachage vs par plage
- Apprendre comment le partitionnement affecte les performances des requêtes
- Reconnaître les compromis des différentes stratégies de partitionnement
Qu’est-ce que le Partitionnement ?
Le partitionnement des données (aussi appelé sharding) est le processus de répartition de vos données sur plusieurs nœuds basé sur une clé de partitionnement. Chaque nœud contient un sous-ensemble des données totales.
graph TB
subgraph "Vue de l'Application"
App["Votre Application"]
Data[("Toutes les Données")]
App --> Data
end
subgraph "Réalité : Stockage Partitionné"
Node1["Nœud 1<br/>Clés : user_1<br/>user_4<br/>user_7"]
Node2["Nœud 2<br/>Clés : user_2<br/>user_5<br/>user_8"]
Node3["Nœud 3<br/>Clés : user_3<br/>user_6<br/>user_9"]
end
App -->|"lecture/écriture"| Node1
App -->|"lecture/écriture"| Node2
App -->|"lecture/écriture"| Node3
style Node1 fill:#e1f5fe
style Node2 fill:#e1f5fe
style Node3 fill:#e1f5fe
Pourquoi Partitionner les Données ?
| Avantage | Description |
|---|---|
| Mise à l’échelle | Stocker plus de données que ce qui tient sur une seule machine |
| Performance | Distribuer la charge sur plusieurs nœuds |
| Disponibilité | La défaillance d’une partition n’affecte pas les autres |
Le Défi du Partitionnement
La question clé est : Comment décider quelles données vont sur quel nœud ?
graph LR
Key["user:12345"] --> Router{Fonction de<br/>Partitionnement}
Router -->|"hash(clé) % N"| N1[Nœud 1]
Router --> N2[Nœud 2]
Router --> N3[Nœud 3]
style Router fill:#ff9,stroke:#333,stroke-width:3px
Stratégies de Partitionnement
1. Partitionnement Basé sur le Hachage
Appliquer une fonction de hachage à la clé, puis modulo le nombre de nœuds :
nœud = hash(clé) % nombre_de_nœuds
graph TB
subgraph "Partitionnement Basé sur le Hachage (3 nœuds)"
Key1["user:alice"] --> H1["hash() % 3"]
Key2["user:bob"] --> H2["hash() % 3"]
Key3["user:carol"] --> H3["hash() % 3"]
H1 -->|"= 1"| N1[Nœud 1]
H2 -->|"= 2"| N2[Nœud 2]
H3 -->|"= 0"| N0[Nœud 0]
style N1 fill:#c8e6c9
style N2 fill:#c8e6c9
style N0 fill:#c8e6c9
end
Exemple TypeScript :
function getNode(key: string, totalNodes: number): number {
// Fonction de hachage simple
let hash = 0;
for (let i = 0; i < key.length; i++) {
hash = ((hash << 5) - hash) + key.charCodeAt(i);
hash = hash & hash; // Convertir en entier 32bit
}
return Math.abs(hash) % totalNodes;
}
// Exemples
console.log(getNode('user:alice', 3)); // => 1
console.log(getNode('user:bob', 3)); // => 2
console.log(getNode('user:carol', 3)); // => 0
Exemple Python :
def get_node(key: str, total_nodes: int) -> int:
"""Déterminer quel nœud doit stocker cette clé."""
hash_value = hash(key) # Fonction de hachage intégrée
return abs(hash_value) % total_nodes
# Exemples
print(get_node('user:alice', 3)) # => 1
print(get_node('user:bob', 3)) # => 2
print(get_node('user:carol', 3)) # => 0
Avantages :
- ✅ Distribution uniforme des données
- ✅ Simple à implémenter
- ✅ Pas de points chauds (en supposant une bonne fonction de hachage)
Désavantages :
- ❌ Ne permet pas des requêtes de plage efficaces
- ❌ Le rééquilibrage est coûteux lors de l’ajout/suppression de nœuds
2. Partitionnement Basé sur la Plage
Assigner des plages de clés à chaque nœud :
graph TB
subgraph "Partitionnement Basé sur la Plage (3 nœuds)"
R1["Nœud 1<br/>a-m"]
R2["Nœud 2<br/>n-S"]
R3["Nœud 3<br/>t-Z"]
Key1["alice"] --> R1
Key2["bob"] --> R1
Key3["nancy"] --> R2
Key4["steve"] --> R2
Key5["tom"] --> R3
Key6["zoe"] --> R3
style R1 fill:#c8e6c9
style R2 fill:#c8e6c9
style R3 fill:#c8e6c9
end
Exemple TypeScript :
interface Range {
start: string;
end: string;
node: number;
}
const ranges: Range[] = [
{ start: 'a', end: 'm', node: 1 },
{ start: 'n', end: 's', node: 2 },
{ start: 't', end: 'z', node: 3 }
];
function getNodeByRange(key: string): number {
for (const range of ranges) {
if (key >= range.start && key <= range.end) {
return range.node;
}
}
throw new Error(`Aucune plage trouvée pour la clé : ${key}`);
}
// Exemples
console.log(getNodeByRange('alice')); // => 1
console.log(getNodeByRange('nancy')); // => 2
console.log(getNodeByRange('tom')); // => 3
Exemple Python :
from typing import List, Tuple
ranges: List[Tuple[str, str, int]] = [
('a', 'm', 1),
('n', 's', 2),
('t', 'z', 3)
]
def get_node_by_range(key: str) -> int:
"""Déterminer quel nœud basé sur la plage de clés."""
for start, end, node in ranges:
if start <= key <= end:
return node
raise ValueError(f"Aucune plage trouvée pour la clé : {key}")
# Exemples
print(get_node_by_range('alice')) # => 1
print(get_node_by_range('nancy')) # => 2
print(get_node_by_range('tom')) # => 3
Avantages :
- ✅ Requêtes de plage efficaces
- ✅ Peut optimiser pour les modèles d’accès aux données
Désavantages :
- ❌ Distribution inégale (points chauds)
- ❌ Complexe à équilibrer la charge
Le Problème du Rééquilibrage
Que se passe-t-il lorsque vous ajoutez ou supprimez des nœuds ?
stateDiagram-v2
[*] --> Stable: 3 Nœuds
Stable --> Rééquilibrage: Ajouter Nœud 4
Rééquilibrage --> Stable: Déplacer 25% des données
Stable --> Rééquilibrage: Supprimer Nœud 2
Rééquilibrage --> Stable: Redistribuer les données
Problème du Hachage Modulo Simple
Avec hash(clé) % N, changer N de 3 à 4 signifie que la plupart des clés se déplacent vers différents nœuds :
| Clé | hash % 3 | hash % 4 | Déplacée ? |
|---|---|---|---|
| user:1 | 1 | 1 | ❌ |
| user:2 | 2 | 2 | ❌ |
| user:3 | 0 | 3 | ✅ |
| user:4 | 1 | 0 | ✅ |
| user:5 | 2 | 1 | ✅ |
| user:6 | 0 | 2 | ✅ |
75% des clés se sont déplacées !
Hachage Cohérent (Avancé)
Une technique pour minimiser le déplacement de données lorsque les nœuds changent :
graph TB
subgraph "Anneau de Hachage"
Ring["Anneau Virtuel (0 - 2^32)"]
N1["Nœud 1<br/>position : 100"]
N2["Nœud 2<br/>position : 500"]
N3["Nœud 3<br/>position : 900"]
K1["Clé A<br/>hash : 150"]
K2["Clé B<br/>hash : 600"]
K3["Clé C<br/>hash : 950"]
end
Ring --> N1
Ring --> N2
Ring --> N3
K1 -->|"sens horaire"| N2
K2 -->|"sens horaire"| N3
K3 -->|"sens horaire"| N1
style Ring fill:#f9f,stroke:#333,stroke-width:2px
Idée Clé : Chaque clé est assignée au premier nœud dans le sens horaire à partir de sa position de hachage.
Lors de l’ajout/suppression d’un nœud, seules les clés dans la plage de ce nœud se déplacent.
Modèles de Requêtes et Partitionnement
Vos modèles de requêtes devraient influencer votre stratégie de partitionnement :
Modèles de Requêtes Courants
| Type de Requête | Meilleur Partitionnement | Exemple |
|---|---|---|
| Recherches clé-valeur | Basé sur le hachage | Obtenir un utilisateur par ID |
| Analyses de plage | Basé sur la plage | Utilisateurs inscrits la semaine dernière |
| Accès multi-clés | Hachage composite | Commandes par client |
| Requêtes géographiques | Basé sur la localisation | Restaurants proches |
Exemple : Partitionnement des Données Utilisateur
graph TB
subgraph "Application : Réseau Social"
Query1["Obtenir le Profil Utilisateur<br/>SELECT * FROM users WHERE id = ?"]
Query2["Lister les Amis<br/>SELECT * FROM friends WHERE user_id = ?"]
Query3["Publications de Timeline<br/>SELECT * FROM posts WHERE created_at > ?"]
end
subgraph "Décision de Partitionnement"
Query1 -->|"hash(user_id)"| Hash[Hachage]
Query2 -->|"hash(user_id)"| Hash
Query3 -->|"range(created_at)"| Range[Plage]
end
subgraph "Résultat"
Hash --> H["Données utilisateur & amis<br/>partitionnées par user_id"]
Range --> R["Publications partitionnées<br/>par plage de dates"]
end
Résumé des Compromis
| Stratégie | Distribution | Requêtes de Plage | Rééquilibrage | Complexité |
|---|---|---|---|---|
| Basé sur le hachage | Uniforme | Pauvre | Coûteux | Faible |
| Basé sur la plage | Potentiellement inégale | Excellent | Modéré | Moyen |
| Hachage cohérent | Uniforme | Pauvre | Minimal | Élevé |
Exemples Réels
| Système | Stratégie de Partitionnement | Notes |
|---|---|---|
| Redis Cluster | Slots de hachage (16384 slots) | Hachage cohérent |
| Cassandra | Sensible aux jetons (anneau de hachage) | Partitionneur configurable |
| MongoDB | Plages de clés de sharding | Basé sur la plage sur la clé de sharding |
| DynamoDB | Hachage + plage (composite) | Supporte les clés composites |
| PostgreSQL | Pas natif | Utiliser des extensions comme Citus |
Résumé
Points Clés à Retenir
- Le partitionnement divise les données sur plusieurs nœuds pour la mise à l’échelle
- Le hachage donne une distribution uniforme mais de mauvaises requêtes de plage
- La plage permet les analyses de plage mais peut créer des points chauds
- Le rééquilibrage est un défi clé lorsque les nœuds changent
- Les modèles de requêtes devraient dicter votre stratégie de partitionnement
Vérifiez Votre Compréhension
- Pourquoi le partitionnement basé sur le hachage est-il meilleur pour une distribution uniforme ?
- Quand choisiriez-vous le partitionnement par plage plutôt que par hachage ?
- Qu’arrive-t-il au placement des données lorsque vous ajoutez un nouveau nœud avec le hachage modulo simple ?
- Comment le hachage cohérent minimise-t-il le déplacement de données ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront toute lacune dans vos connaissances.
Et Ensuite
Maintenant que nous comprenons comment partitionner les données, explorons les compromis fondamentaux dans les systèmes de données distribués : Théorème CAP
Théorème CAP
Session 3, Partie 2 - 30 minutes
Objectifs d’Apprentissage
- Comprendre le théorème CAP et ses trois composantes
- Explorer les compromis entre Cohérence, Disponibilité et Tolérance aux Partitions
- Identifier les systèmes réels et leurs choix CAP
- Apprendre à appliquer la pensée CAP à la conception de systèmes
Qu’est-ce que le Théorème CAP ?
Le théorème CAP stipule qu’un magasin de données distribué ne peut fournir que deux des trois garanties suivantes :
graph TB
subgraph "Triangle CAP - Choisissez-en Deux"
C["Cohérence<br/>Chaque lecture reçoit<br/>l'écriture la plus récente"]
A["Disponibilité<br/>Chaque requête reçoit<br/>une réponse"]
P["Tolérance aux Partitions<br/>Le système opère<br/>malgré les défaillances réseau"]
end
C <--> A
A <--> P
P <--> C
style C fill:#ffcdd2
style A fill:#c8e6c9
style P fill:#bbdefb
Les Trois Composantes
1. Cohérence (C)
Chaque lecture reçoit l’écriture la plus récente ou une erreur.
Tous les nœuds voient les mêmes données au même moment. Si vous écrivez une valeur et la lisez immédiatement, vous obtenez la valeur que vous venez d’écrire.
sequenceDiagram
participant C as Client
participant N1 as Nœud 1
participant N2 as Nœud 2
participant N3 as Nœud 3
C->>N1: Écrire X = 10
N1->>N2: Répliquer X
N1->>N3: Répliquer X
N2-->>N1: Ack
N3-->>N1: Ack
N1-->>C: Écriture confirmée
Note over C,N3: Avant lecture...
C->>N2: Lire X
N2-->>C: X = 10 (plus récent)
Note over C,N3: Tous les nœuds sont d'accord !
Exemple : Un système bancaire où votre solde doit être précis sur toutes les agences.
2. Disponibilité (A)
Chaque requête reçoit une réponse (non-erreur), sans garantie qu’elle contient l’écriture la plus récente.
Le système reste opérationnel même lorsque certains nœuds échouent. Vous pouvez toujours lire et écrire, même si les données peuvent être obsolètes.
sequenceDiagram
participant C as Client
participant N1 as Nœud 1 (en vie)
participant N2 as Nœud 2 (mort)
C->>N1: Écrire X = 10
N1-->>C: Écriture confirmée
Note over C,N2: N2 est en panne mais N1 répond...
C->>N1: Lire X
N1-->>C: X = 10
Note over C,N2: Le système reste disponible !
Exemple : Un fil d’actualités sociales où montrer un contenu légèrement ancien est acceptable.
3. Tolérance aux Partitions (P)
Le système continue à opérer malgré un nombre arbitraire de messages étant abandonnés ou retardés par le réseau entre les nœuds.
Les partitions réseau sont inévitables dans les systèmes distribués. Le système doit les gérer avec grâce.
graph TB
subgraph "Partition Réseau"
N1["Nœud 1<br/>Ne peut atteindre N2, N3"]
N2["Nœud 2<br/>Ne peut atteindre N1"]
N3["Nœud 3<br/>Ne peut atteindre N1"]
end
N1 -.->|"🔴 Partition Réseau"| N2
N1 -.->|"🔴 Partition Réseau"| N3
N2 <--> N3
N2 <--> N3
style N1 fill:#ffcdd2
style N2 fill:#c8e6c9
style N3 fill:#c8e6c9
Aperçu Clé : Dans les systèmes distribués, P n’est pas optionnel — les partitions réseau ARRIVERONT.
Les Compromis
Puisque les partitions sont inévitables dans les systèmes distribués, le vrai choix est entre C et A pendant une partition :
stateDiagram-v2
[*] --> Normal
Normal --> Partitionné: Division Réseau
Partitionné --> CP: Choisir Cohérence
Partitionné --> AP: Choisir Disponibilité
CP --> Normal: Partition guérie
AP --> Normal: Partition guérie
note right of CP
Rejeter les écritures/lectures
jusqu'à la synchronisation des données
end note
note right of AP
Accepter les écritures/lectures
les données peuvent être obsolètes
end note
CP : Cohérence + Tolérance aux Partitions
Sacrifier la Disponibilité
Pendant une partition, le système retourne des erreurs ou bloque jusqu’à ce que la cohérence puisse être garantie.
sequenceDiagram
participant C as Client
participant N1 as Nœud 1 (primaire)
participant N2 as Nœud 2 (isolé)
Note over N1,N2: 🔴 Partition Réseau
C->>N1: Écrire X = 10
N1-->>C: ❌ Erreur : Impossible de répliquer
C->>N2: Lire X
N2-->>C: ❌ Erreur : Données indisponibles
Note over C,N2: Le système bloque plutôt<br/>que de retourner des données obsolètes
Exemples :
- MongoDB (avec souci d’écriture majoritaire)
- HBase
- Redis (avec configuration appropriée)
- SGBD traditionnels avec réplication synchrone
Utiliser lorsque : La précision des données est critique (systèmes financiers, inventaire)
AP : Disponibilité + Tolérance aux Partitions
Sacrifier la Cohérence
Pendant une partition, le système accepte les lectures et écritures, pouvant retourner des données obsolètes.
sequenceDiagram
participant C as Client
participant N1 as Nœud 1 (accepte écritures)
participant N2 as Nœud 2 (a anciennes données)
Note over N1,N2: 🔴 Partition Réseau
C->>N1: Écrire X = 10
N1-->>C: ✅ OK (écrit sur N1 seulement)
C->>N2: Lire X
N2-->>C: ✅ X = 5 (obsolète !)
Note over C,N2: Le système accepte les requêtes<br/>mais les données sont incohérentes
Exemples :
- Cassandra
- DynamoDB
- CouchDB
- Riak
Utiliser lorsque : Toujours répondre est plus important que la cohérence immédiate (médias sociaux, mise en cache, analyses)
CA : Cohérence + Disponibilité
Possible uniquement dans les systèmes à nœud unique
Sans partitions réseau (nœud unique ou réseau parfaitement fiable), vous pouvez avoir à la fois C et A.
graph TB
Single["Base de Données à Nœud Unique"]
Client["Client"]
Client --> Single
Single <--> Client
Note1[Pas de réseau = Pas de partitions]
Note --> Single
style Single fill:#fff9c4
Exemples :
- PostgreSQL à nœud unique
- MongoDB à nœud unique
- SGBD traditionnels sur un serveur
Réalité : Dans les systèmes distribués, CA n’est pas achievable car les réseaux ne sont pas parfaitement fiables.
Exemples CAP Réels
| Système | Choix CAP | Notes |
|---|---|---|
| Google Spanner | CP | Cohérence externe, toujours cohérent |
| Amazon DynamoDB | AP | Cohérence configurable |
| Cassandra | AP | Toujours inscriptible, cohérence ajustable |
| MongoDB | CP (par défaut) | Configurable en AP |
| Redis Cluster | AP | Réplication asynchrone |
| PostgreSQL | CA | Mode nœud unique |
| CockroachDB | CP | Sérialisabilité, gère les partitions |
| Couchbase | AP | Réplication Inter-Centres de Données |
Modèles de Cohérence
La “Cohérence” du théorème CAP est en fait la linéarisabilité (cohérence forte). Il existe plusieurs modèles de cohérence :
graph TB
subgraph "Spectre de Cohérence"
Strong["Cohérence Forte<br/>Linéarisabilité"]
Weak["Cohérence Faible<br/>Cohérence Finale"]
Strong --> S1["Cohérence<br/>Séquentielle"]
S1 --> S2["Cohérence<br/>Causale"]
S2 --> S3["Cohérence de<br/>Session"]
S3 --> S4["Lire Vos<br/>Écritures"]
S4 --> Weak
end
Modèles de Cohérence Forte
| Modèle | Description | Exemple |
|---|---|---|
| Linéarisable | Lecture la plus récente garantie | Transferts bancaires |
| Séquentielle | Les opérations apparaissent dans un certain ordre | Contrôle de version |
| Causale | Opérations causalement liées ordonnées | Applications de chat |
Modèles de Cohérence Faible
| Modèle | Description | Exemple |
|---|---|---|
| Lire Vos Écritures | L’utilisateur voit ses propres écritures | Profil de médias sociaux |
| Cohérence de Session | Cohérence dans une session | Panier d’achat |
| Cohérence Finale | Le système converge au fil du temps | DNS, CDN |
Exemple Pratique : Panier d’Achat
Voyons comment différents choix CAP affectent un système de panier d’achat :
Approche CP (Bloquer sur Partition)
sequenceDiagram
participant U as Utilisateur
participant S as Service
Note over U,S: 🔴 Partition réseau détectée
U->>S: Ajouter article au panier
S-->>U: ❌ Erreur : Service indisponible
Note over U,S: Utilisateur frustré,<br/>mais panier est toujours précis
Compromis : Ventes perdues, panier précis
Approche AP (Accepter Écritures)
sequenceDiagram
participant U as Utilisateur
participant S as Service
Note over U,S: 🔴 Partition réseau détectée
U->>S: Ajouter article au panier
S-->>U: ✅ OK (écrit localement)
Note over U,S: Utilisateur satisfait,<br/>mais panier peut être en conflit
Compromis : Utilisateurs satisfaits, conflits de fusion possibles ultérieurement
La Simplification “2 sur 3”
Le théorème CAP est souvent mal compris. La réalité est plus nuancée :
graph TB
subgraph "Réalité CAP"
CAP["Théorème CAP"]
CAP --> Malcompréhension["Vous devez choisir<br/>exactement 2"]
CAP --> Réalité["Vous pouvez avoir les 3<br/>en opération normale"]
CAP --> Vérité["Pendant partition,<br/>choisir C ou A"]
end
Aperçus Clés :
- P est obligatoire dans les systèmes distribués
- Pendant l’opération normale, vous pouvez avoir C + A + P
- Pendant une partition, vous choisissez entre C et A
- Plusieurs systèmes sont configurables (par exemple, DynamoDB)
Directives de Conception
Choisir CP Lorsque :
- ✅ Transactions financières
- ✅ Gestion d’inventaire
- ✅ Authentification/autorisation
- ✅ Tout système où les données obsolètes sont inacceptables
Choisir AP Lorsque :
- ✅ Fils d’actualités sociaux
- ✅ Recommandations de produits
- ✅ Analyses et journalisation
- ✅ Tout système où la disponibilité est critique
Techniques pour Équilibrer C et A :
| Technique | Description | Exemple |
|---|---|---|
| Lectures/écritures de quorum | Nécessite une reconnaissance majoritaire | DynamoDB |
| Cohérence ajustable | Laisser le client choisir par opération | Cassandra |
| Dégradation gracieuse | Changer de modes pendant partition | Plusieurs systèmes |
| Résolution de conflits | Fusionner les données divergentes ultérieurement | CRDTs |
Résumé
Points Clés à Retenir
- Théorème CAP : Vous ne pouvez pas avoir les trois dans une partition
- La tolérance aux partitions est obligatoire dans les systèmes distribués
- Le vrai choix : Cohérence vs Disponibilité pendant partition
- Plusieurs systèmes offrent des niveaux de cohérence ajustables
- Votre cas d’utilisation détermine le bon compromis
Vérifiez Votre Compréhension
- Pourquoi la tolérance aux partitions n’est-elle pas optionnelle dans les systèmes distribués ?
- Donnez un exemple où vous choisiriez CP plutôt que AP
- Qu’arrive-t-il à un système AP pendant une partition réseau ?
- Comment les lectures/écritures de quorum peuvent-elles aider à équilibrer C et A ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront toute lacune dans vos connaissances.
Et Ensuite
Maintenant que nous comprenons les compromis CAP, construisons un simple magasin clé-valeur : Bases du Magasin
Bases du Système de Magasin
Session 3, Partie 3 - 35 minutes (démo de codage + pratique)
Objectifs d’Apprentissage
- Comprendre le modèle de données clé-valeur
- Construire un magasin clé-valeur à nœud unique en TypeScript
- Construire le même magasin en Python
- Déployer et tester le magasin en utilisant Docker Compose
- Effectuer des opérations de lecture/écriture de base via HTTP
Qu’est-ce qu’un Magasin Clé-Valeur ?
Un magasin clé-valeur est le type le plus simple de base de données :
graph LR
subgraph "Magasin Clé-Valeur"
KV[("Magasin de Données")]
K1["nom"] --> V1[""Alice""]
K2["âge"] --> V2["30"]
K3["ville"] --> V3[""NYC""]
K4["actif"] --> V4["true"]
K1 --> KV
K2 --> KV
K3 --> KV
K4 --> KV
end
Caractéristiques Clés :
- Modèle de données simple : clé → valeur
- Recherches rapides par clé
- Pas de requêtes complexes
- Sans schéma
Opérations de Base
| Opération | Description | Exemple |
|---|---|---|
| SET | Stocker une valeur pour une clé | SET user:1 Alice |
| GET | Récupérer une valeur par clé | GET user:1 → “Alice” |
| DELETE | Supprimer une clé | DELETE user:1 |
stateDiagram-v2
[*] --> NonExistant
NonExistant --> Existant: SET clé
Existant --> Existant: SET clé (mise à jour)
Existant --> NonExistant: DELETE clé
Existant --> Existant: GET clé (lecture)
NonExistant --> [*]: GET clé (null)
Implémentation
Nous allons construire un simple magasin clé-valeur basé sur HTTP avec des points de terminaison API REST.
Conception de l’API
GET /key/{clé} - Obtenir la valeur par clé
PUT /key/{clé} - Définir la valeur pour la clé
DELETE /key/{clé} - Supprimer la clé
GET /keys - Lister toutes les clés
Implémentation TypeScript
Structure du Projet
store-basics-ts/
├── package.json
├── tsconfig.json
├── Dockerfile
└── src/
└── store.ts # Implémentation complète du magasin
Code TypeScript Complet
store-basics-ts/src/store.ts
import http from 'http';
/**
* Magasin clé-valeur simple en mémoire
*/
class KeyValueStore {
private data: Map<string, any> = new Map();
/**
* Définir une paire clé-valeur
*/
set(key: string, value: any): void {
this.data.set(key, value);
console.log(`[Store] SET ${key} = ${JSON.stringify(value)}`);
}
/**
* Obtenir une valeur par clé
*/
get(key: string): any {
const value = this.data.get(key);
console.log(`[Store] GET ${key} => ${value !== undefined ? JSON.stringify(value) : 'null'}`);
return value;
}
/**
* Supprimer une clé
*/
delete(key: string): boolean {
const existed = this.data.delete(key);
console.log(`[Store] DELETE ${key} => ${existed ? 'succès' : 'non trouvé'}`);
return existed;
}
/**
* Obtenir toutes les clés
*/
keys(): string[] {
return Array.from(this.data.keys());
}
/**
* Obtenir les statistiques du magasin
*/
stats() {
return {
totalKeys: this.data.size,
keys: this.keys()
};
}
}
// Créer l'instance du magasin
const store = new KeyValueStore();
/**
* Serveur HTTP avec API clé-valeur
*/
const server = http.createServer((req, res) => {
// Activer CORS
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, PUT, DELETE, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
if (req.method === 'OPTIONS') {
res.writeHead(200);
res.end();
return;
}
// Analyser l'URL
const url = new URL(req.url || '', `http://${req.headers.host}`);
// Route : GET /keys - Lister toutes les clés
if (req.method === 'GET' && url.pathname === '/keys') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(store.stats()));
return;
}
// Route : GET /key/{clé} - Obtenir la valeur
if (req.method === 'GET' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5); // Retirer '/key/'
const value = store.get(key);
if (value !== undefined) {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ key, value }));
} else {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Key not found', key }));
}
return;
}
// Route : PUT /key/{clé} - Définir la valeur
if (req.method === 'PUT' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5); // Retirer '/key/'
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const value = JSON.parse(body);
store.set(key, value);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: true, key, value }));
} catch (error) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid JSON' }));
}
});
return;
}
// Route : DELETE /key/{clé} - Supprimer la clé
if (req.method === 'DELETE' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5); // Retirer '/key/'
const existed = store.delete(key);
if (existed) {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: true, key }));
} else {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Key not found', key }));
}
return;
}
// 404 - Non trouvé
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
});
const PORT = process.env.PORT || 4000;
server.listen(PORT, () => {
console.log(`Key-Value Store listening on port ${PORT}`);
console.log(`\nAvailable endpoints:`);
console.log(` GET /key/{key} - Get value by key`);
console.log(` PUT /key/{key} - Set value for key`);
console.log(` DELETE /key/{key} - Delete key`);
console.log(` GET /keys - List all keys`);
});
store-basics-ts/package.json
{
"name": "store-basics-ts",
"version": "1.0.0",
"description": "Simple key-value store in TypeScript",
"main": "dist/store.js",
"scripts": {
"build": "tsc",
"start": "node dist/store.js",
"dev": "ts-node src/store.ts"
},
"dependencies": {},
"devDependencies": {
"@types/node": "^20.0.0",
"typescript": "^5.0.0",
"ts-node": "^10.9.0"
}
}
store-basics-ts/tsconfig.json
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true
},
"include": ["src/**/*"]
}
store-basics-ts/Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 4000
CMD ["npm", "start"]
Implémentation Python
Structure du Projet
store-basics-py/
├── requirements.txt
├── Dockerfile
└── src/
└── store.py # Implémentation complète du magasin
Code Python Complet
store-basics-py/src/store.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from typing import Any, Dict
from urllib.parse import urlparse
class KeyValueStore:
"""Magasin clé-valeur simple en mémoire."""
def __init__(self):
self.data: Dict[str, Any] = {}
def set(self, key: str, value: Any) -> None:
"""Stocker une paire clé-valeur."""
self.data[key] = value
print(f"[Store] SET {key} = {json.dumps(value)}")
def get(self, key: str) -> Any:
"""Obtenir la valeur par clé."""
value = self.data.get(key)
print(f"[Store] GET {key} => {json.dumps(value) if value is not None else 'null'}")
return value
def delete(self, key: str) -> bool:
"""Supprimer une clé."""
existed = key in self.data
if existed:
del self.data[key]
print(f"[Store] DELETE {key} => {'success' if existed else 'not found'}")
return existed
def keys(self) -> list:
"""Obtenir toutes les clés."""
return list(self.data.keys())
def stats(self) -> dict:
"""Obtenir les statistiques du magasin."""
return {
'totalKeys': len(self.data),
'keys': self.keys()
}
# Créer l'instance du magasin
store = KeyValueStore()
class StoreHandler(BaseHTTPRequestHandler):
"""Gestionnaire de requêtes HTTP pour le magasin clé-valeur."""
def send_json_response(self, status: int, data: dict):
"""Envoyer une réponse JSON."""
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_OPTIONS(self):
"""Gérer les requêtes préalables CORS."""
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_GET(self):
"""Gérer les requêtes GET."""
parsed = urlparse(self.path)
# GET /keys - Lister toutes les clés
if parsed.path == '/keys':
self.send_json_response(200, store.stats())
return
# GET /key/{clé} - Obtenir la valeur
if parsed.path.startswith('/key/'):
key = parsed.path[5:] # Retirer '/key/'
value = store.get(key)
if value is not None:
self.send_json_response(200, {'key': key, 'value': value})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
# 404
self.send_json_response(404, {'error': 'Not found'})
def do_PUT(self):
"""Gérer les requêtes PUT (définir valeur)."""
parsed = urlparse(self.path)
# PUT /key/{clé} - Définir la valeur
if parsed.path.startswith('/key/'):
key = parsed.path[5:] # Retirer '/key/'
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
value = json.loads(body)
store.set(key, value)
self.send_json_response(200, {'success': True, 'key': key, 'value': value})
except json.JSONDecodeError:
self.send_json_response(400, {'error': 'Invalid JSON'})
return
# 404
self.send_json_response(404, {'error': 'Not found'})
def do_DELETE(self):
"""Gérer les requêtes DELETE."""
parsed = urlparse(self.path)
# DELETE /key/{clé} - Supprimer la clé
if parsed.path.startswith('/key/'):
key = parsed.path[5:] # Retirer '/key/'
existed = store.delete(key)
if existed:
self.send_json_response(200, {'success': True, 'key': key})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
# 404
self.send_json_response(404, {'error': 'Not found'})
def log_message(self, format, *args):
"""Supprimer la journalisation par défaut."""
pass
def run_server(port: int = 4000):
"""Démarrer le serveur HTTP."""
server_address = ('', port)
httpd = HTTPServer(server_address, StoreHandler)
print(f"Key-Value Store listening on port {port}")
print(f"\nAvailable endpoints:")
print(f" GET /key/{{key}} - Get value by key")
print(f" PUT /key/{{key}} - Set value for key")
print(f" DELETE /key/{{key}} - Delete key")
print(f" GET /keys - List all keys")
httpd.serve_forever()
if __name__ == '__main__':
import os
port = int(os.environ.get('PORT', 4000))
run_server(port)
store-basics-py/requirements.txt
# Aucune dépendance externe requise - utilise uniquement la bibliothèque standard
store-basics-py/Dockerfile
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 4000
CMD ["python", "src/store.py"]
Configuration Docker Compose
Version TypeScript
examples/02-store/ts/docker-compose.yml
version: '3.8'
services:
store:
build: .
ports:
- "4000:4000"
environment:
- PORT=4000
volumes:
- ./src:/app/src
Version Python
examples/02-store/py/docker-compose.yml
version: '3.8'
services:
store:
build: .
ports:
- "4000:4000"
environment:
- PORT=4000
volumes:
- ./src:/app/src
Exécution de l’Exemple
Étape 1 : Démarrer le Magasin
TypeScript :
cd examples/02-store/ts
docker-compose up --build
Python :
cd examples/02-store/py
docker-compose up --build
Vous devriez voir :
store | Key-Value Store listening on port 4000
store |
store | Available endpoints:
store | GET /key/{key} - Get value by key
store | PUT /key/{key} - Set value for key
store | DELETE /key/{key} - Delete key
store | GET /keys - List all keys
Étape 2 : Stocker Quelques Valeurs
# Stocker une chaîne
curl -X PUT http://localhost:4000/key/name \
-H "Content-Type: application/json" \
-d '"Alice"'
# Stocker un nombre
curl -X PUT http://localhost:4000/key/age \
-H "Content-Type: application/json" \
-d '30'
# Stocker un objet
curl -X PUT http://localhost:4000/key/user:1 \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "age": 30, "city": "NYC"}'
# Stocker une liste
curl -X PUT http://localhost:4000/key/tags \
-H "Content-Type: application/json" \
-d '["distributed", "systems", "course"]'
Étape 3 : Récupérer les Valeurs
# Obtenir une chaîne
curl http://localhost:4000/key/name
# Response: {"key":"name","value":"Alice"}
# Obtenir un nombre
curl http://localhost:4000/key/age
# Response: {"key":"age","value":30}
# Obtenir un objet
curl http://localhost:4000/key/user:1
# Response: {"key":"user:1","value":{"name":"Alice","age":30,"city":"NYC"}}
# Obtenir une liste
curl http://localhost:4000/key/tags
# Response: {"key":"tags","value":["distributed","systems","course"]}
# Essayer d'obtenir une clé inexistante
curl http://localhost:4000/key/nonexistent
# Response: {"error":"Key not found","key":"nonexistent"}
Étape 4 : Lister Toutes les Clés
curl http://localhost:4000/keys
# Response: {"totalKeys":4,"keys":["name","age","user:1","tags"]}
Étape 5 : Supprimer une Clé
# Supprimer une clé
curl -X DELETE http://localhost:4000/key/age
# Response: {"success":true,"key":"age"}
# Vérifier qu'elle a disparu
curl http://localhost:4000/key/age
# Response: {"error":"Key not found","key":"age"}
# Vérifier les clés restantes
curl http://localhost:4000/keys
# Response: {"totalKeys":3,"keys":["name","user:1","tags"]}
Architecture du Système
graph TB
subgraph "Magasin Clé-Valeur à Nœud Unique"
Client["Applications Clientes"]
API["API HTTP"]
Store[("Données en<br/>Mémoire")]
Client -->|"GET/PUT/DELETE"| API
API --> Store
end
style Store fill:#f9f,stroke:#333,stroke-width:3px
Exercices
Exercice 1 : Ajouter le Support TTL (Time-To-Live)
Modifier le magasin pour expirer automatiquement les clés après un temps spécifié :
- Ajouter un paramètre
ttloptionnel à l’opération SET - Suivre quand chaque clé devrait expirer
- Retourner null pour les clés expirées
- Implémenter un mécanisme de nettoyage
Indice : Stocker les métadonnées alongside les valeurs, ou utiliser une carte d’expiration séparée.
Exercice 2 : Ajouter des Motifs de Clés
Ajouter le support des caractères génériques pour les recherches de clés :
- Implémenter
GET /keys?pattern=user:*pour lister les clés correspondantes - Supporter les correspondances avec caractère générique
*simple - Tester avec des motifs comme
user:*,*:admin, etc.
Exercice 3 : Ajouter la Persistance des Données
Actuellement les données sont perdues lorsque le serveur redémarre. Ajouter la persistance :
- Sauvegarder les données dans un fichier JSON à chaque écriture
- Charger les données depuis le fichier au démarrage
- Gérer les écritures simultanées en toute sécurité
Résumé
Points Clés à Retenir
- Les magasins clé-valeur sont des systèmes de stockage de données simples mais puissants
- Opérations de base : SET, GET, DELETE
- L’API HTTP fournit une interface simple pour l’accès à distance
- Les magasins à nœud unique sont CA (Cohérent + Disponible) selon la perspective CAP
- Prochaines étapes : Ajouter la réplication pour la tolérance aux pannes (Session 4)
Vérifiez Votre Compréhension
- Quelles sont les quatre opérations de base que nous avons implémentées ?
- Comment notre magasin gère-t-il les requêtes pour les clés inexistantes ?
- Qu’arrive-t-il aux données lorsque le conteneur Docker s’arrête ?
- Pourquoi ce magasin à nœud unique est-il “CA” selon les termes CAP ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront toute lacune dans vos connaissances.
Et Ensuite
Notre simple magasin fonctionne, mais qu’arrive-t-il lorsqu’un nœud échoue ? Ajoutons la réplication : Réplication (Session 4)
Replication et Election de Leader
Session 4 - Session complète
Objectifs d’Apprentissage
- Comprendre pourquoi nous répliquons les données
- Apprendre la réplication à leader unique vs multi-leader
- Implémenter la réplication basée sur un leader
- Construire un mécanisme simple d’élection de leader
- Déployer un magasin répliqué à 3 nœuds
Pourquoi Répliquer les Données ?
Dans notre magasin à nœud unique de la Session 3, que se passe-t-il lorsque le nœud tombe en panne ?
Réponse : Toutes les données sont perdues et le système devient indisponible.
graph LR
subgraph "Nœud Unique - Pas de Tolérance aux Pannes"
C[Clients] --> N[Node 1]
N1[Node 1<br/>❌ FAILED]
style N1 fill:#f66,stroke:#333,stroke-width:3px
end
La réplication résout ce problème en gardant des copies des données sur plusieurs nœuds :
graph TB
subgraph "Magasin Répliqué - Tolérant aux Pannes"
C[Clients]
L[Leader<br/>Node 1]
F1[Suiveur<br/>Node 2]
F2[Suiveur<br/>Node 3]
C --> L
L -->|"réplique"| F1
L -->|"réplique"| F2
end
style L fill:#6f6,stroke:#333,stroke-width:3px
Avantages de la Réplication :
- Tolérance aux pannes : Si un nœud tombe en panne, les autres ont les données
- Mise à l’échelle des lectures : Les clients peuvent lire depuis n’importe quel réplica
- Faible latence : Placer les répliques plus près des utilisateurs
- Haute disponibilité : Le système continue pendant les pannes de nœuds
Stratégies de Réplication
Réplication à Leader Unique
Également appelée : primaire-réplique, maître-esclave, actif-passif
sequenceDiagram
participant C as Client
participant L as Leader
participant F1 as Suiveur 1
participant F2 as Suiveur 2
Note over C,F2: Opération d'Écriture
C->>L: PUT /key/name "Alice"
L->>L: Écrire dans le stockage local
L->>F1: Répliquer : SET name = "Alice"
L->>F2: Répliquer : SET name = "Alice"
F1->>L: ACK
F2->>L: ACK
L->>C: Réponse : Success
Note over C,F2: Opération de Lecture
C->>L: GET /key/name
L->>C: Réponse : "Alice"
Note over C,F2: Ou lire depuis le suiveur
C->>F1: GET /key/name
F1->>C: Réponse : "Alice"
Caractéristiques :
- Le Leader gère toutes les écritures
- Les Suiveurs se répliquent depuis le leader
- Les Lectures peuvent aller vers le leader ou les suiveurs
- Modèle de cohérence simple
Réplication Multi-Leader
Également appelée : multi-maître, actif-actif
graph TB
subgraph "Réplication Multi-Leader"
C1[Client 1]
C2[Client 2]
L1[Leader 1<br/>Datacenter A]
L2[Leader 2<br/>Datacenter B]
F1[Suiveur 1]
F2[Suiveur 2]
C1 --> L1
C2 --> L2
L1 <-->|"résoudre les conflits"| L2
L1 --> F1
L2 --> F2
end
style L1 fill:#6f6,stroke:#333,stroke-width:3px
style L2 fill:#6f6,stroke:#333,stroke-width:3px
Caractéristiques :
- Plusieurs nœuds acceptent les écritures
- Résolution de conflits plus complexe
- Mieux pour les configurations géo-distribuées
- Nous ne l’implémenterons pas (sujet avancé)
Réplication Synchrone vs Asynchrone
sequenceDiagram
participant C as Client
participant L as Leader
par Réplication Synchrone
L->>F: Répliquer l'écriture
F->>L: ACK (doit attendre)
L->>C: Success (après confirmation des répliques)
and Réplication Asynchrone
L->>C: Success (immédiatement)
L--xF: Répliquer en arrière-plan
end
participant F as Suiveur
| Stratégie | Avantages | Inconvénients |
|---|---|---|
| Synchrone | Cohérence forte, aucune perte de données | Écritures plus lentes, bloquant |
| Asynchrone | Écritures rapides, non-bloquant | Perte de données en cas de panne du leader, lectures périmées |
Pour ce cours, nous utiliserons la réplication asynchrone pour simplifier.
Élection de Leader
Lorsque le leader tombe en panne, les suiveurs doivent élire un nouveau leader :
stateDiagram-v2
[*] --> Suiveur: Le nœud démarre
Suiveur --> Candidat: Pas de heartbeat du leader
Candidat --> Leader: Gagne l'élection (majorité des votes)
Candidat --> Suiveur: Perd l'élection
Leader --> Suiveur: Détecte un terme/nœud supérieur
Suiveur --> [*]: Le nœud s'arrête
L’Algorithme du Bully
Un algorithme simple d’élection de leader :
- Détecter la panne du leader : Pas de heartbeat pendant la période de timeout
- Démarrer l’élection : Le nœud avec l’ID le plus élevé devient candidat leader
- Voter : Les nœuds avec des numéros inférieurs votent pour le candidat
- Devenir leader : Le candidat devient leader si la majorité est d’accord
sequenceDiagram
participant N1 as Nœud 1<br/>(Leader)
participant N2 as Nœud 2
participant N3 as Nœud 3
Note over N1,N3: Fonctionnement Normal
N1->>N2: Heartbeat
N1->>N3: Heartbeat
Note over N1,N3: Panne du Leader
N1--xN2: Heartbeat timeout !
N1--xN3: Heartbeat timeout !
Note over N2,N3: Début de l'Élection
N2->>N3: Demande de vote (ID=2)
N3->>N2: Voter pour N2 (2 > 3 ? Non, attendre)
Note over N2,N3: En fait, N3 a un ID plus élevé
N3->>N2: Demande de vote (ID=3)
N2->>N3: Voter pour N3 (3 > 2, oui !)
Note over N2,N3: N3 Devient Leader
N3->>N2: Je suis le leader
N3->>N2: Heartbeat
Pour simplifier, nous utiliserons une approche plus simple :
- Le nœud avec l’ID le plus bas devient leader
- Si le leader tombe en panne, le prochain plus bas devient leader
- Pas de vote, juste une sélection basée sur l’ordre
Implémentation
Implémentation TypeScript
Structure du Projet :
replicated-store-ts/
├── package.json
├── tsconfig.json
├── Dockerfile
├── docker-compose.yml
└── src/
└── node.ts # Nœud répliqué avec élection de leader
replicated-store-ts/src/node.ts
import http from 'http';
/**
* Configuration du nœud
*/
const config = {
nodeId: process.env.NODE_ID || 'node-1',
port: parseInt(process.env.PORT || '4000'),
peers: (process.env.PEERS || '').split(',').filter(Boolean),
heartbeatInterval: 2000, // ms
electionTimeout: 6000, // ms
};
type NodeRole = 'leader' | 'follower' | 'candidate';
/**
* Nœud de Magasin Répliqué
*/
class StoreNode {
public nodeId: string;
public role: NodeRole;
public term: number;
public data: Map<string, any>;
public peers: string[];
private leaderId: string | null;
private lastHeartbeat: number;
private heartbeatTimer?: NodeJS.Timeout;
private electionTimer?: NodeJS.Timeout;
constructor(nodeId: string, peers: string[]) {
this.nodeId = nodeId;
this.role = 'follower';
this.term = 0;
this.data = new Map();
this.peers = peers;
this.leaderId = null;
this.lastHeartbeat = Date.now();
this.startElectionTimer();
this.startHeartbeat();
}
/**
* Démarrer le timer de timeout d'élection
*/
private startElectionTimer() {
this.electionTimer = setTimeout(() => {
const timeSinceHeartbeat = Date.now() - this.lastHeartbeat;
if (timeSinceHeartbeat > config.electionTimeout && this.role !== 'leader') {
console.log(`[${this.nodeId}] Election timeout ! Démarrage de l'élection...`);
this.startElection();
}
this.startElectionTimer();
}, config.electionTimeout);
}
/**
* Démarrer l'élection de leader (simplifié : l'ID le plus bas gagne)
*/
private startElection() {
this.term++;
this.role = 'candidate';
// Stratégie simple : le nœud avec l'ID le plus bas devient leader
const allNodes = [this.nodeId, ...this.peers].sort();
const lowestNode = allNodes[0];
if (this.nodeId === lowestNode) {
this.becomeLeader();
} else {
this.role = 'follower';
this.leaderId = lowestNode;
console.log(`[${this.nodeId}] En attente de ${lowestNode} pour devenir leader`);
}
}
/**
* Devenir le leader
*/
private becomeLeader() {
this.role = 'leader';
this.leaderId = this.nodeId;
console.log(`[${this.nodeId}] 👑 Devenu LEADER pour le terme ${this.term}`);
// Répliquer immédiatement aux suiveurs
this.replicateToFollowers();
}
/**
* Démarrer le heartbeat vers les suiveurs
*/
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.role === 'leader') {
this.sendHeartbeat();
}
}, config.heartbeatInterval);
}
/**
* Envoyer le heartbeat à tous les suiveurs
*/
private sendHeartbeat() {
const heartbeat = {
type: 'heartbeat',
leaderId: this.nodeId,
term: this.term,
timestamp: Date.now(),
};
this.peers.forEach(peerUrl => {
this.sendToPeer(peerUrl, '/internal/heartbeat', heartbeat)
.catch(err => console.log(`[${this.nodeId}] Échec de l'envoi du heartbeat à ${peerUrl}:`, err.message));
});
}
/**
* Répliquer les données à tous les suiveurs
*/
private replicateToFollowers() {
// Convertir Map en objet pour la réplication
const dataObj = Object.fromEntries(this.data);
this.peers.forEach(peerUrl => {
this.sendToPeer(peerUrl, '/internal/replicate', {
type: 'replicate',
leaderId: this.nodeId,
term: this.term,
data: dataObj,
}).catch(err => console.log(`[${this.nodeId}] Réplication échouée vers ${peerUrl}:`, err.message));
});
}
/**
* Gérer le heartbeat du leader
*/
handleHeartbeat(heartbeat: any) {
if (heartbeat.term >= this.term) {
this.term = heartbeat.term;
this.lastHeartbeat = Date.now();
this.leaderId = heartbeat.leaderId;
this.role = 'follower';
if (this.role !== 'follower') {
console.log(`[${this.nodeId}] Rétrogradation en suiveur, terme ${this.term}`);
}
}
}
/**
* Gérer la réplication du leader
*/
handleReplication(message: any) {
if (message.term >= this.term) {
this.term = message.term;
this.leaderId = message.leaderId;
this.role = 'follower';
this.lastHeartbeat = Date.now();
// Fusionner les données répliquées
Object.entries(message.data).forEach(([key, value]) => {
this.data.set(key, value);
});
console.log(`[${this.nodeId}] ${Object.keys(message.data).length} clés répliquées depuis le leader`);
}
}
/**
* Envoyer des données à un nœud pair
*/
private async sendToPeer(peerUrl: string, path: string, data: any): Promise<void> {
return new Promise((resolve, reject) => {
const url = new URL(path, peerUrl);
const options = {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
};
const req = http.request(url, options, (res) => {
if (res.statusCode === 200) {
resolve();
} else {
reject(new Error(`Status ${res.statusCode}`));
}
});
req.on('error', reject);
req.write(JSON.stringify(data));
req.end();
});
}
/**
* Définir une paire clé-valeur (seulement sur le leader)
*/
set(key: string, value: any): boolean {
if (this.role !== 'leader') {
return false;
}
this.data.set(key, value);
console.log(`[${this.nodeId}] SET ${key} = ${JSON.stringify(value)}`);
// Répliquer aux suiveurs
this.replicateToFollowers();
return true;
}
/**
* Obtenir une valeur par clé
*/
get(key: string): any {
const value = this.data.get(key);
console.log(`[${this.nodeId}] GET ${key} => ${value !== undefined ? JSON.stringify(value) : 'null'}`);
return value;
}
/**
* Supprimer une clé
*/
delete(key: string): boolean {
if (this.role !== 'leader') {
return false;
}
const existed = this.data.delete(key);
console.log(`[${this.nodeId}] DELETE ${key} => ${existed ? 'success' : 'not found'}`);
// Répliquer aux suiveurs
this.replicateToFollowers();
return existed;
}
/**
* Obtenir le statut du nœud
*/
getStatus() {
return {
nodeId: this.nodeId,
role: this.role,
term: this.term,
leaderId: this.leaderId,
totalKeys: this.data.size,
keys: Array.from(this.data.keys()),
};
}
}
// Créer le nœud
const node = new StoreNode(config.nodeId, config.peers);
/**
* Serveur HTTP
*/
const server = http.createServer((req, res) => {
res.setHeader('Content-Type', 'application/json');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
if (req.method === 'OPTIONS') {
res.writeHead(200);
res.end();
return;
}
const url = new URL(req.url || '', `http://${req.headers.host}`);
// Route : POST /internal/heartbeat - Heartbeat du leader
if (req.method === 'POST' && url.pathname === '/internal/heartbeat') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const heartbeat = JSON.parse(body);
node.handleHeartbeat(heartbeat);
res.writeHead(200);
res.end(JSON.stringify({ success: true }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid request' }));
}
});
return;
}
// Route : POST /internal/replicate - Réplication du leader
if (req.method === 'POST' && url.pathname === '/internal/replicate') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const message = JSON.parse(body);
node.handleReplication(message);
res.writeHead(200);
res.end(JSON.stringify({ success: true }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid request' }));
}
});
return;
}
// Route : GET /status - Statut du nœud
if (req.method === 'GET' && url.pathname === '/status') {
res.writeHead(200);
res.end(JSON.stringify(node.getStatus()));
return;
}
// Route : GET /key/{key} - Obtenir une valeur
if (req.method === 'GET' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
const value = node.get(key);
if (value !== undefined) {
res.writeHead(200);
res.end(JSON.stringify({ key, value, nodeRole: node.role }));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Key not found', key }));
}
return;
}
// Route : PUT /key/{key} - Définir une valeur (leader uniquement)
if (req.method === 'PUT' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
if (node.role !== 'leader') {
res.writeHead(503);
res.end(JSON.stringify({
error: 'Not the leader',
currentRole: node.role,
leaderId: node.leaderId || 'Unknown',
}));
return;
}
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const value = JSON.parse(body);
node.set(key, value);
res.writeHead(200);
res.end(JSON.stringify({ success: true, key, value, leaderId: node.nodeId }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid JSON' }));
}
});
return;
}
// Route : DELETE /key/{key} - Supprimer une clé (leader uniquement)
if (req.method === 'DELETE' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
if (node.role !== 'leader') {
res.writeHead(503);
res.end(JSON.stringify({
error: 'Not the leader',
currentRole: node.role,
leaderId: node.leaderId || 'Unknown',
}));
return;
}
const existed = node.delete(key);
if (existed) {
res.writeHead(200);
res.end(JSON.stringify({ success: true, key, leaderId: node.nodeId }));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Key not found', key }));
}
return;
}
// 404
res.writeHead(404);
res.end(JSON.stringify({ error: 'Not found' }));
});
server.listen(config.port, () => {
console.log(`[${config.nodeId}] Store Node écoutant sur le port ${config.port}`);
console.log(`[${config.nodeId}] Pairs : ${config.peers.join(', ') || 'none'}`);
console.log(`[${config.nodeId}] Points de terminaison disponibles :`);
console.log(` GET /status - Statut et rôle du nœud`);
console.log(` GET /key/{key} - Obtenir une valeur`);
console.log(` PUT /key/{key} - Définir une valeur (leader uniquement)`);
console.log(` DEL /key/{key} - Supprimer une clé (leader uniquement)`);
});
replicated-store-ts/package.json
{
"name": "replicated-store-ts",
"version": "1.0.0",
"description": "Replicated key-value store with leader election in TypeScript",
"main": "dist/node.js",
"scripts": {
"build": "tsc",
"start": "node dist/node.js",
"dev": "ts-node src/node.ts"
},
"dependencies": {},
"devDependencies": {
"@types/node": "^20.0.0",
"typescript": "^5.0.0",
"ts-node": "^10.9.0"
}
}
replicated-store-ts/tsconfig.json
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true
},
"include": ["src/**/*"]
}
replicated-store-ts/Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 4000
CMD ["npm", "start"]
Implémentation Python
replicated-store-py/src/node.py
import os
import json
import time
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, parse_qs
from urllib.request import Request, urlopen
from urllib.error import URLError
class StoreNode:
"""Nœud de magasin répliqué avec élection de leader."""
def __init__(self, node_id: str, peers: List[str]):
self.node_id = node_id
self.role: str = 'follower' # leader, follower, candidate
self.term = 0
self.data: Dict[str, Any] = {}
self.peers = peers
self.leader_id: Optional[str] = None
self.last_heartbeat = time.time()
# Configuration
self.heartbeat_interval = 2.0 # secondes
self.election_timeout = 6.0 # secondes
# Démarrer le timer d'élection
self.start_election_timer()
# Démarrer le thread de heartbeat
self.start_heartbeat_thread()
def start_election_timer(self):
"""Démarrer le timer de timeout d'élection."""
def election_timer():
while True:
time.sleep(1)
time_since = time.time() - self.last_heartbeat
if time_since > self.election_timeout and self.role != 'leader':
print(f"[{self.node_id}] Election timeout ! Démarrage de l'élection...")
self.start_election()
thread = threading.Thread(target=election_timer, daemon=True)
thread.start()
def start_election(self):
"""Démarrer l'élection de leader (le plus simple : l'ID le plus bas gagne)."""
self.term += 1
self.role = 'candidate'
# Stratégie simple : le nœud avec l'ID le plus bas devient leader
all_nodes = sorted([self.node_id] + self.peers)
lowest_node = all_nodes[0]
if self.node_id == lowest_node:
self.become_leader()
else:
self.role = 'follower'
self.leader_id = lowest_node
print(f"[{self.node_id}] En attente de {lowest_node} pour devenir leader")
def become_leader(self):
"""Devenir le leader."""
self.role = 'leader'
self.leader_id = self.node_id
print(f"[{self.node_id}] 👑 Devenu LEADER pour le terme {self.term}")
# Répliquer immédiatement aux suiveurs
self.replicate_to_followers()
def start_heartbeat_thread(self):
"""Démarrer le heartbeat vers les suiveurs."""
def heartbeat_loop():
while True:
time.sleep(self.heartbeat_interval)
if self.role == 'leader':
self.send_heartbeat()
thread = threading.Thread(target=heartbeat_loop, daemon=True)
thread.start()
def send_heartbeat(self):
"""Envoyer le heartbeat à tous les suiveurs."""
heartbeat = {
'type': 'heartbeat',
'leader_id': self.node_id,
'term': self.term,
'timestamp': int(time.time() * 1000),
}
for peer in self.peers:
try:
self.send_to_peer(peer, '/internal/heartbeat', heartbeat)
except Exception as e:
print(f"[{self.node_id}] Échec de l'envoi du heartbeat à {peer} : {e}")
def replicate_to_followers(self):
"""Répliquer les données à tous les suiveurs."""
message = {
'type': 'replicate',
'leader_id': self.node_id,
'term': self.term,
'data': self.data,
}
for peer in self.peers:
try:
self.send_to_peer(peer, '/internal/replicate', message)
except Exception as e:
print(f"[{self.node_id}] Réplication échouée vers {peer} : {e}")
def handle_heartbeat(self, heartbeat: dict):
"""Gérer le heartbeat du leader."""
if heartbeat['term'] >= self.term:
self.term = heartbeat['term']
self.last_heartbeat = time.time()
self.leader_id = heartbeat['leader_id']
if self.role != 'follower':
print(f"[{self.node_id}] Rétrogradation en suiveur, terme {self.term}")
self.role = 'follower'
def handle_replication(self, message: dict):
"""Gérer la réplication du leader."""
if message['term'] >= self.term:
self.term = message['term']
self.leader_id = message['leader_id']
self.role = 'follower'
self.last_heartbeat = time.time()
# Fusionner les données répliquées
self.data.update(message['data'])
print(f"[{self.node_id}] {len(message['data'])} clés répliquées depuis le leader")
def send_to_peer(self, peer_url: str, path: str, data: dict) -> None:
"""Envoyer des données à un nœud pair."""
url = f"{peer_url}{path}"
body = json.dumps(data).encode('utf-8')
req = Request(url, data=body, headers={'Content-Type': 'application/json'}, method='POST')
with urlopen(req, timeout=1) as response:
if response.status != 200:
raise Exception(f"Status {response.status}")
def set(self, key: str, value: Any) -> bool:
"""Définir une paire clé-valeur (seulement sur le leader)."""
if self.role != 'leader':
return False
self.data[key] = value
print(f"[{self.node_id}] SET {key} = {json.dumps(value)}")
# Répliquer aux suiveurs
self.replicate_to_followers()
return True
def get(self, key: str) -> Any:
"""Obtenir une valeur par clé."""
value = self.data.get(key)
print(f"[{self.node_id}] GET {key} => {json.dumps(value) if value is not None else 'null'}")
return value
def delete(self, key: str) -> bool:
"""Supprimer une clé (seulement sur le leader)."""
if self.role != 'leader':
return False
existed = key in self.data
if existed:
del self.data[key]
print(f"[{self.node_id}] DELETE {key} => {'success' if existed else 'not found'}")
# Répliquer aux suiveurs
self.replicate_to_followers()
return existed
def get_status(self) -> dict:
"""Obtenir le statut du nœud."""
return {
'node_id': self.node_id,
'role': self.role,
'term': self.term,
'leader_id': self.leader_id,
'total_keys': len(self.data),
'keys': list(self.data.keys()),
}
# Créer le nœud
config = {
'node_id': os.environ.get('NODE_ID', 'node-1'),
'port': int(os.environ.get('PORT', '4000')),
'peers': [p for p in os.environ.get('PEERS', '').split(',') if p],
}
node = StoreNode(config['node_id'], config['peers'])
class NodeHandler(BaseHTTPRequestHandler):
"""Gestionnaire de requêtes HTTP pour le nœud de magasin."""
def send_json_response(self, status: int, data: dict):
"""Envoyer une réponse JSON."""
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_OPTIONS(self):
"""Gérer le pré-vol CORS."""
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_POST(self):
"""Gérer les requêtes POST."""
parsed = urlparse(self.path)
# POST /internal/heartbeat
if parsed.path == '/internal/heartbeat':
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
heartbeat = json.loads(body)
node.handle_heartbeat(heartbeat)
self.send_json_response(200, {'success': True})
except (json.JSONDecodeError, KeyError):
self.send_json_response(400, {'error': 'Invalid request'})
return
# POST /internal/replicate
if parsed.path == '/internal/replicate':
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
message = json.loads(body)
node.handle_replication(message)
self.send_json_response(200, {'success': True})
except (json.JSONDecodeError, KeyError):
self.send_json_response(400, {'error': 'Invalid request'})
return
self.send_json_response(404, {'error': 'Not found'})
def do_GET(self):
"""Gérer les requêtes GET."""
parsed = urlparse(self.path)
# GET /status
if parsed.path == '/status':
self.send_json_response(200, node.get_status())
return
# GET /key/{key}
if parsed.path.startswith('/key/'):
key = parsed.path[5:] # Retirer '/key/'
value = node.get(key)
if value is not None:
self.send_json_response(200, {'key': key, 'value': value, 'node_role': node.role})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
self.send_json_response(404, {'error': 'Not found'})
def do_PUT(self):
"""Gérer les requêtes POST (définir une valeur)."""
parsed = urlparse(self.path)
# PUT /key/{key}
if parsed.path.startswith('/key/'):
key = parsed.path[5:]
if node.role != 'leader':
self.send_json_response(503, {
'error': 'Not the leader',
'current_role': node.role,
'leader_id': node.leader_id or 'Unknown',
})
return
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
value = json.loads(body)
node.set(key, value)
self.send_json_response(200, {'success': True, 'key': key, 'value': value, 'leader_id': node.node_id})
except json.JSONDecodeError:
self.send_json_response(400, {'error': 'Invalid JSON'})
return
self.send_json_response(404, {'error': 'Not found'})
def do_DELETE(self):
"""Gérer les requêtes DELETE."""
parsed = urlparse(self.path)
# DELETE /key/{key}
if parsed.path.startswith('/key/'):
key = parsed.path[5:]
if node.role != 'leader':
self.send_json_response(503, {
'error': 'Not the leader',
'current_role': node.role,
'leader_id': node.leader_id or 'Unknown',
})
return
existed = node.delete(key)
if existed:
self.send_json_response(200, {'success': True, 'key': key, 'leader_id': node.node_id})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
self.send_json_response(404, {'error': 'Not found'})
def log_message(self, format, *args):
"""Supprimer la journalisation par défaut."""
pass
def run_server(port: int):
"""Démarrer le serveur HTTP."""
server_address = ('', port)
httpd = HTTPServer(server_address, NodeHandler)
print(f"[{config['node_id']}] Store Node écoutant sur le port {port}")
print(f"[{config['node_id']}] Pairs : {', '.join(config['peers']) or 'none'}")
print(f"[{config['node_id']}] Points de terminaison disponibles :")
print(f" GET /status - Statut et rôle du nœud")
print(f" GET /key/{{key}} - Obtenir une valeur")
print(f" PUT /key/{{key}} - Définir une valeur (leader uniquement)")
print(f" DEL /key/{{key}} - Supprimer une clé (leader uniquement)")
httpd.serve_forever()
if __name__ == '__main__':
run_server(config['port'])
replicated-store-py/requirements.txt
# Pas de dépendances externes - utilise uniquement la bibliothèque standard
replicated-store-py/Dockerfile
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 4000
CMD ["python", "src/node.py"]
Configuration Docker Compose
Version TypeScript
examples/02-store/ts/docker-compose.yml
version: '3.8'
services:
node1:
build: .
container_name: store-ts-node1
ports:
- "4001:4000"
environment:
- NODE_ID=node-1
- PORT=4000
- PEERS=http://node2:4000,http://node3:4000
networks:
- store-network
node2:
build: .
container_name: store-ts-node2
ports:
- "4002:4000"
environment:
- NODE_ID=node-2
- PORT=4000
- PEERS=http://node1:4000,http://node3:4000
networks:
- store-network
node3:
build: .
container_name: store-ts-node3
ports:
- "4003:4000"
environment:
- NODE_ID=node-3
- PORT=4000
- PEERS=http://node1:4000,http://node2:4000
networks:
- store-network
networks:
store-network:
driver: bridge
Version Python
examples/02-store/py/docker-compose.yml
version: '3.8'
services:
node1:
build: .
container_name: store-py-node1
ports:
- "4001:4000"
environment:
- NODE_ID=node-1
- PORT=4000
- PEERS=http://node2:4000,http://node3:4000
networks:
- store-network
node2:
build: .
container_name: store-py-node2
ports:
- "4002:4000"
environment:
- NODE_ID=node-2
- PORT=4000
- PEERS=http://node1:4000,http://node3:4000
networks:
- store-network
node3:
build: .
container_name: store-py-node3
ports:
- "4003:4000"
environment:
- NODE_ID=node-3
- PORT=4000
- PEERS=http://node1:4000,http://node2:4000
networks:
- store-network
networks:
store-network:
driver: bridge
Exécution de l’Exemple
Étape 1 : Démarrer le Cluster à 3 Nœuds
TypeScript :
cd distributed-systems-course/examples/02-store/ts
docker-compose up --build
Python :
cd distributed-systems-course/examples/02-store/py
docker-compose up --build
Vous devriez voir l’élection de leader se produire automatiquement :
store-ts-node1 | [node-1] Store Node écoutant sur le port 4000
store-ts-node2 | [node-2] Store Node écoutant sur le port 4000
store-ts-node3 | [node-3] Store Node écoutant sur le port 4000
store-ts-node1 | [node-1] 👑 Devenu LEADER pour le terme 1
store-ts-node2 | [node-2] En attente de node-1 pour devenir leader
store-ts-node3 | [node-3] En attente de node-1 pour devenir leader
Étape 2 : Vérifier le Statut des Nœuds
# Vérifier tous les nœuds
curl http://localhost:4001/status
curl http://localhost:4002/status
curl http://localhost:4003/status
Réponse du node-1 (leader) :
{
"nodeId": "node-1",
"role": "leader",
"term": 1,
"leaderId": "node-1",
"totalKeys": 0,
"keys": []
}
Réponse du node-2 (suiveur) :
{
"nodeId": "node-2",
"role": "follower",
"term": 1,
"leaderId": "node-1",
"totalKeys": 0,
"keys": []
}
Étape 3 : Écrire au Leader
# Écrire au leader (node-1)
curl -X PUT http://localhost:4001/key/name \
-H "Content-Type: application/json" \
-d '"Alice"'
curl -X PUT http://localhost:4001/key/age \
-H "Content-Type: application/json" \
-d '30'
curl -X PUT http://localhost:4001/key/city \
-H "Content-Type: application/json" \
-d '"NYC"'
Réponse :
{
"success": true,
"key": "name",
"value": "Alice",
"leaderId": "node-1"
}
Étape 4 : Lire depuis les Suiveurs
Les données devraient être répliquées à tous les suiveurs :
curl http://localhost:4002/key/name
curl http://localhost:4003/key/city
Réponse :
{
"key": "name",
"value": "Alice",
"nodeRole": "follower"
}
Étape 5 : Essayer d’Écrire à un Suiveur (Devrait Échouer)
curl -X PUT http://localhost:4002/key/test \
-H "Content-Type: application/json" \
-d '"should fail"'
Réponse :
{
"error": "Not the leader",
"currentRole": "follower",
"leaderId": "node-1"
}
Étape 6 : Simuler une Panne de Leader
# Dans un terminal séparé, arrêter le leader
docker-compose stop node1
# Vérifier le statut de node-2 - devrait devenir le nouveau leader
curl http://localhost:4002/status
Après quelques secondes :
store-ts-node2 | [node-2] Election timeout ! Démarrage de l'élection...
store-ts-node2 | [node-2] 👑 Devenu LEADER pour le terme 2
store-ts-node3 | [node-3] En attente de node-2 pour devenir leader
Étape 7 : Écrire au Nouveau Leader
# Maintenant node-2 est le leader
curl -X PUT http://localhost:4002/key/newleader \
-H "Content-Type: application/json" \
-d '"node-2"'
Étape 8 : Redémarrer l’Ancien Leader
# Redémarrer node-1
docker-compose start node1
# Vérifier le statut - devrait devenir suiveur
curl http://localhost:4001/status
Réponse :
{
"nodeId": "node-1",
"role": "follower",
"term": 2,
"leaderId": "node-2",
...
}
Architecture du Système
graph TB
subgraph "Magasin Répliqué à 3 Nœuds"
Clients["Clients"]
N1["Node 1<br/>👑 Leader"]
N2["Node 2<br/>Suiveur"]
N3["Node 3<br/>Suiveur"]
Clients -->|"Write"| N1
Clients -->|"Read"| N1
Clients -->|"Read"| N2
Clients -->|"Read"| N3
N1 <-->|"Heartbeat<br/>Réplication"| N2
N1 <-->|"Heartbeat<br/>Réplication"| N3
end
style N1 fill:#6f6,stroke:#333,stroke-width:3px
Exercices
Exercice 1 : Tester la Tolérance aux Pannes
- Démarrer le cluster et écrire quelques données
- Arrêter différents nœuds un par un
- Vérifier que le système continue de fonctionner
- Que se passe-t-il lorsque vous arrêtez 2 nœuds sur 3 ?
Exercice 2 : Observer le Délai de Réplication
- Ajouter un petit délai (par ex. 100ms) à la réplication
- Écrire des données au leader
- Lire immédiatement depuis un suiveur
- Que voyez-vous ? Cela démontre la cohérence événementielle.
Exercice 3 : Améliorer l’Élection de Leader
L’élection actuelle est très simple. Essayez de l’améliorer :
- Ajouter des timeouts d’élection aléatoires (comme Raft)
- Implémenter un vrai vote (pas seulement le plus petit ID)
- Ajouter un pré-vote pour éviter de perturber le leader actuel
Résumé
Points Clés à Retenir
- La Réplication copie les données sur plusieurs nœuds pour la tolérance aux pannes
- La Réplication à leader unique est simple mais toutes les écritures passent par le leader
- L’élection de leader assure qu’un nouveau leader est choisi quand le leader actuel tombe en panne
- La Réplication asynchrone est rapide mais peut perdre des données en cas de panne du leader
- La Cohérence lecture-après-écriture n’est PAS garantie lors de la lecture depuis les suiveurs
Compromis
| Approche | Avantages | Inconvénients |
|---|---|---|
| Leader unique | Simple, cohérence forte | Le leader est un goulot d’étranglement, point de défaillance unique |
| Multi-leader | Pas de goulot d’étranglement, écritures n’importe où | Résolution de conflits complexe |
| Réplication synchrone | Aucune perte de données | Écritures lentes, bloquant |
| Réplication asynchrone | Écritures rapides | Perte de données possible, lectures périmées |
Vérifiez Votre Compréhension
- Pourquoi répliquons-nous les données ?
- Quelle est la différence entre leader et suiveur ?
- Que se passe-t-il lorsqu’un client essaie d’écrire à un suiveur ?
- Comment fonctionne l’élection de leader dans notre implémentation ?
- Quel est le compromis entre la réplication synchrone et asynchrone ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront toute lacune dans vos connaissances.
Suite
Nous avons une réplication fonctionnelle, mais notre modèle de cohérence est basique. Explorons les niveaux de cohérence : Modèles de Cohérence (Session 5)
Modèles de Cohérence
Session 5 - Session complète
Objectifs d’Apprentissage
- Comprendre les différents modèles de cohérence dans les systèmes distribués
- Apprendre les compromis entre la cohérence forte et la cohérence événementielle
- Implémenter des niveaux de cohérence configurables dans un magasin répliqué
- Expérimenter les effets des niveaux de cohérence à travers des exercices pratiques
Qu’est-ce que la Cohérence ?
Dans un magasin répliqué, la cohérence définit les garanties que vous avez sur les données que vous lisez. Lorsque les données sont copiées sur plusieurs nœuds, vous ne verrez pas toujours l’écriture la plus récente immédiatement.
graph TB
subgraph "Une Écriture se Produit"
C[Client]
L[Leader]
L -->|Write "name = Alice"| L
end
subgraph "Mais Que Lisez-Vous ?"
F1[Suiveur 1<br/>name = Alice]
F2[Suiveur 2<br/>name = ???]
F3[Suiveur 3<br/>name = ???]
C -->|Read| F1
C -->|Read| F2
C -->|Read| F3
end
La question : Si vous lisez depuis un suiveur, verrez-vous “Alice” ou l’ancienne valeur ?
La réponse dépend de votre modèle de cohérence.
Spectre de Cohérence
Les modèles de cohérence existent sur un spectre du plus fort au plus faible :
graph LR
A[Cohérence<br/>Forte]
B[Read Your Writes]
C[Lectures Monotones]
D[Cohérence Causale]
E[Cohérence<br/>Événementielle]
A ====> B ====> C ====> D ====> E
style A fill:#6f6
style B fill:#9f6
style C fill:#cf6
style D fill:#ff6
style E fill:#f96
Cohérence Forte
Définition : Chaque lecture reçoit l’écriture la plus récente ou une erreur.
sequenceDiagram
participant C as Client
participant L as Leader
participant F as Suiveur
Note over C,F: Le temps s'écoule vers le bas
C->>L: SET name = "Alice"
L->>L: Écriture confirmée
Note over C,F: La cohérence forte nécessite :
Note over C,F: Attendre la réplication...
L->>F: Répliquer : name = "Alice"
F->>L: ACK
L->>C: Réponse : Success
C->>F: GET name
F->>C: "Alice" (toujours à jour !)
Caractéristiques :
- Les lecteurs voient toujours les données les plus récentes
- Aucune lecture périmée possible
- Performances plus lentes (doit attendre la réplication)
- Modèle mental simple
Quand l’utiliser : Transactions financières, gestion des stocks, opérations critiques
Cohérence Événementielle
Définition : Si aucune nouvelle mise à jour n’est faite, éventuellement tous les accès retourneront la dernière valeur mise à jour.
sequenceDiagram
participant C as Client
participant L as Leader
participant F1 as Suiveur 1
participant F2 as Suiveur 2
Note over C,F2: Le temps s'écoule vers le bas
C->>L: SET name = "Alice"
L->>C: Réponse : Success (immédiatement !)
Note over C,F2: Le leader n'a pas encore répliqué...
C->>F1: GET name
F1->>C: "Alice" (répliqué !)
C->>F2: GET name
F2->>C: "Bob" (périmé !)
Note over C,F2: Un moment plus tard...
L->>F2: Répliquer : name = "Alice"
C->>F2: GET name
F2->>C: "Alice" (mis à jour !)
Caractéristiques :
- Les lectures sont rapides (pas d’attente de réplication)
- Vous pouvez voir des données périmées
- Éventuellement, tous les nœuds convergent
- Modèle mental plus complexe
Quand l’utiliser : Flux de médias sociaux, recommandations de produits, analyses
Cohérence Read-Your-Writes
Un terrain d’entente : vous voyez toujours vos propres écritures, mais ne voyez pas forcément les écritures des autres immédiatement.
sequenceDiagram
participant C1 as Client 1
participant C2 as Client 2
participant L as Leader
participant F as Suiveur
C1->>L: SET name = "Alice"
L->>C1: Success
C1->>F: GET name
Note over C1,F: Read-your-writes:<br/>C1 voit "Alice"
F->>C1: "Alice"
C2->>F: GET name
Note over C2,F: Peut voir des données périmées
F->>C2: "Bob" (périmé !)
Le Théorème CAP Réexaminé
Vous avez appris CAP dans la Session 4. Relions-le à la cohérence :
| Combinaison | Modèle de Cohérence | Systèmes Exemple |
|---|---|---|
| CP | Cohérence forte | ZooKeeper, etcd, MongoDB (avec w:majority) |
| AP | Cohérence événementielle | Cassandra, DynamoDB, CouchDB |
| CA (impossible à grande échelle) | Cohérence forte | Bases de données à nœud unique (SGBDR) |
Cohérence Basée sur Quorum
Un moyen pratique de contrôler la cohérence est d’utiliser des quorums. Un quorum est une majorité de nœuds.
graph TB
subgraph "Cluster à 3 Nœuds"
N1[Nœud 1]
N2[Nœud 2]
N3[Nœud 3]
Q[Quorum = 2<br/>⌈3/2⌉ = 2]
end
N1 -.-> Q
N2 -.-> Q
N3 -.-> Q
style Q fill:#6f6,stroke:#333,stroke-width:3px
Quorum d’Écriture (W)
Nombre de nœuds qui doivent acquitter une écriture :
W > N/2 → Cohérence forte (majorité)
W = 1 → Rapide mais cohérence faible
W = N → La plus forte mais la plus lente
Quorum de Lecture (R)
Nombre de nœuds à interroger et comparer pour une lecture :
R + W > N → Cohérence forte garantie
R + W ≤ N → Cohérence événementielle
Niveaux de Cohérence
| R + W | Cohérence | Performance | Cas d’Usage |
|---|---|---|---|
| N + 1 > N (impossible) | La plus forte | Lente | Données critiques |
| R + W > N | Forte | Moyenne | Banque, stocks |
| R + W ≤ N | Événementielle | Rapide | Médias sociaux, cache |
Implémentation
Nous allons étendre notre magasin répliqué de la Session 4 pour supporter des niveaux de cohérence configurables.
Implémentation TypeScript
Structure du Projet :
consistent-store-ts/
├── package.json
├── tsconfig.json
├── Dockerfile
├── docker-compose.yml
└── src/
└── node.ts # Nœud avec cohérence configurable
consistent-store-ts/src/node.ts
import http from 'http';
/**
* Configuration du nœud
*/
const config = {
nodeId: process.env.NODE_ID || 'node-1',
port: parseInt(process.env.PORT || '4000'),
peers: (process.env.PEERS || '').split(',').filter(Boolean),
heartbeatInterval: 2000,
electionTimeout: 6000,
// Paramètres de cohérence
writeQuorum: parseInt(process.env.WRITE_QUORUM || '2'), // W
readQuorum: parseInt(process.env.READ_QUORUM || '1'), // R
};
type NodeRole = 'leader' | 'follower' | 'candidate';
type ConsistencyLevel = 'strong' | 'eventual' | 'read_your_writes';
/**
* Nœud de Magasin Répliqué avec Cohérence Configurable
*/
class StoreNode {
public nodeId: string;
public role: NodeRole;
public term: number;
public data: Map<string, any>;
public peers: string[];
private leaderId: string | null;
private lastHeartbeat: number;
private heartbeatTimer?: NodeJS.Timeout;
private electionTimer?: NodeJS.Timeout;
private pendingWrites: Map<string, any[]>; // Pour read-your-writes
constructor(nodeId: string, peers: string[]) {
this.nodeId = nodeId;
this.role = 'follower';
this.term = 0;
this.data = new Map();
this.peers = peers;
this.leaderId = null;
this.lastHeartbeat = Date.now();
this.pendingWrites = new Map();
this.startElectionTimer();
this.startHeartbeat();
}
/**
* Démarrer le timer de timeout d'élection
*/
private startElectionTimer() {
this.electionTimer = setTimeout(() => {
const timeSinceHeartbeat = Date.now() - this.lastHeartbeat;
if (timeSinceHeartbeat > config.electionTimeout && this.role !== 'leader') {
console.log(`[${this.nodeId}] Election timeout ! Démarrage de l'élection...`);
this.startElection();
}
this.startElectionTimer();
}, config.electionTimeout);
}
/**
* Démarrer l'élection de leader
*/
private startElection() {
this.term++;
this.role = 'candidate';
const allNodes = [this.nodeId, ...this.peers].sort();
const lowestNode = allNodes[0];
if (this.nodeId === lowestNode) {
this.becomeLeader();
} else {
this.role = 'follower';
this.leaderId = lowestNode;
console.log(`[${this.nodeId}] En attente de ${lowestNode} pour devenir leader`);
}
}
/**
* Devenir leader
*/
private becomeLeader() {
this.role = 'leader';
this.leaderId = this.nodeId;
console.log(`[${this.nodeId}] 👑 Devenu LEADER pour le terme ${this.term}`);
this.replicateToFollowers();
}
/**
* Démarrer le heartbeat vers les suiveurs
*/
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.role === 'leader') {
this.sendHeartbeat();
}
}, config.heartbeatInterval);
}
/**
* Envoyer le heartbeat à tous les suiveurs
*/
private sendHeartbeat() {
const heartbeat = {
type: 'heartbeat',
leaderId: this.nodeId,
term: this.term,
timestamp: Date.now(),
};
this.peers.forEach(peerUrl => {
this.sendToPeer(peerUrl, '/internal/heartbeat', heartbeat)
.catch(err => console.log(`[${this.nodeId}] Échec du heartbeat vers ${peerUrl}`));
});
}
/**
* Répliquer les données aux suiveurs avec accusé de réception du quorum
*/
private async replicateToFollowers(): Promise<boolean> {
const dataObj = Object.fromEntries(this.data);
// Envoyer à tous les suiveurs en parallèle
const promises = this.peers.map(peerUrl =>
this.sendToPeer(peerUrl, '/internal/replicate', {
type: 'replicate',
leaderId: this.nodeId,
term: this.term,
data: dataObj,
}).catch(err => {
console.log(`[${this.nodeId}] Réplication échouée vers ${peerUrl}`);
return false;
})
);
// Attendre que tous se terminent
const results = await Promise.all(promises);
// Compter les succès (ce nœud compte comme 1)
const successes = results.filter(r => r !== false).length + 1;
// Vérifier si nous avons atteint le quorum d'écriture
const achievedQuorum = successes >= config.writeQuorum;
console.log(`[${this.nodeId}] Réplication : ${successes}/${this.peers.length + 1} nœuds (W=${config.writeQuorum})`);
return achievedQuorum;
}
/**
* Gérer le heartbeat du leader
*/
handleHeartbeat(heartbeat: any) {
if (heartbeat.term >= this.term) {
this.term = heartbeat.term;
this.lastHeartbeat = Date.now();
this.leaderId = heartbeat.leaderId;
if (this.role !== 'follower') {
this.role = 'follower';
}
}
}
/**
* Gérer la réplication du leader
*/
handleReplication(message: any) {
if (message.term >= this.term) {
this.term = message.term;
this.leaderId = message.leaderId;
this.role = 'follower';
this.lastHeartbeat = Date.now();
Object.entries(message.data).forEach(([key, value]) => {
this.data.set(key, value);
});
}
}
/**
* Envoyer des données à un nœud pair
*/
private async sendToPeer(peerUrl: string, path: string, data: any): Promise<void> {
return new Promise((resolve, reject) => {
const url = new URL(path, peerUrl);
const options = {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
};
const req = http.request(url, options, (res) => {
if (res.statusCode === 200) {
resolve();
} else {
reject(new Error(`Status ${res.statusCode}`));
}
});
req.on('error', reject);
req.write(JSON.stringify(data));
req.end();
});
}
/**
* Définir une paire clé-valeur avec accusé de réception du quorum
*/
async set(key: string, value: any): Promise<{ success: boolean; achievedQuorum: boolean }> {
if (this.role !== 'leader') {
return { success: false, achievedQuorum: false };
}
this.data.set(key, value);
console.log(`[${this.nodeId}] SET ${key} = ${JSON.stringify(value)}`);
// Répliquer aux suiveurs
const achievedQuorum = await this.replicateToFollowers();
return { success: true, achievedQuorum };
}
/**
* Obtenir une valeur avec cohérence configurable
*/
async get(key: string, consistency: ConsistencyLevel = 'eventual'): Promise<any> {
const localValue = this.data.get(key);
// Pour la cohérence événementielle, retourner la valeur locale immédiatement
if (consistency === 'eventual') {
console.log(`[${this.nodeId}] GET ${key} => ${JSON.stringify(localValue)} (événementielle)`);
return localValue;
}
// Pour la cohérence forte, interroger un quorum de nœuds
if (consistency === 'strong') {
const values = await this.getFromQuorum(key);
console.log(`[${this.nodeId}] GET ${key} => ${JSON.stringify(values.latest)} (forte depuis ${values.responses} nœuds)`);
return values.latest;
}
// Pour read-your-writes, vérifier les écritures en attente
if (consistency === 'read_your_writes') {
const pending = this.pendingWrites.get(key);
const valueToReturn = pending && pending.length > 0 ? pending[pending.length - 1] : localValue;
console.log(`[${this.nodeId}] GET ${key} => ${JSON.stringify(valueToReturn)} (read-your-writes)`);
return valueToReturn;
}
return localValue;
}
/**
* Interroger un quorum de nœuds et retourner la valeur la plus récente
*/
private async getFromQuorum(key: string): Promise<{ latest: any; responses: number }> {
// Interroger tous les pairs
const promises = this.peers.map(peerUrl =>
this.queryPeer(peerUrl, '/internal/get', { key })
.then(result => ({ success: true, value: result.value, version: result.version || 0 }))
.catch(err => {
console.log(`[${this.nodeId}] Query échouée vers ${peerUrl}`);
return { success: false, value: null, version: 0 };
})
);
const results = await Promise.all(promises);
// Ajouter la valeur locale
results.push({
success: true,
value: this.data.get(key),
version: this.data.has(key) ? 1 : 0,
});
// Compter les réponses réussies
const successful = results.filter(r => r.success);
// Retourner si nous avons le quorum de lecture
if (successful.length >= config.readQuorum) {
// Retourner la valeur la plus récente (version simple : première non-nulle)
const latest = successful.find(r => r.value !== undefined)?.value;
return { latest, responses: successful.length };
}
// Retour à la valeur locale
return { latest: this.data.get(key), responses: successful.length };
}
/**
* Interroger un pair pour une clé
*/
private async queryPeer(peerUrl: string, path: string, data: any): Promise<any> {
return new Promise((resolve, reject) => {
const url = new URL(path, peerUrl);
const options = {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
};
const req = http.request(url, options, (res) => {
let body = '';
res.on('data', chunk => body += chunk);
res.on('end', () => {
if (res.statusCode === 200) {
resolve(JSON.parse(body));
} else {
reject(new Error(`Status ${res.statusCode}`));
}
});
});
req.on('error', reject);
req.write(JSON.stringify(data));
req.end();
});
}
/**
* Supprimer une clé
*/
async delete(key: string): Promise<{ success: boolean; achievedQuorum: boolean }> {
if (this.role !== 'leader') {
return { success: false, achievedQuorum: false };
}
const existed = this.data.delete(key);
console.log(`[${this.nodeId}] DELETE ${key}`);
await this.replicateToFollowers();
return { success: existed, achievedQuorum: true };
}
/**
* Obtenir le statut du nœud
*/
getStatus() {
return {
nodeId: this.nodeId,
role: this.role,
term: this.term,
leaderId: this.leaderId,
totalKeys: this.data.size,
keys: Array.from(this.data.keys()),
config: {
writeQuorum: config.writeQuorum,
readQuorum: config.readQuorum,
totalNodes: this.peers.length + 1,
},
};
}
}
// Créer le nœud
const node = new StoreNode(config.nodeId, config.peers);
/**
* Serveur HTTP
*/
const server = http.createServer((req, res) => {
res.setHeader('Content-Type', 'application/json');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
if (req.method === 'OPTIONS') {
res.writeHead(200);
res.end();
return;
}
const url = new URL(req.url || '', `http://${req.headers.host}`);
// Route : POST /internal/heartbeat
if (req.method === 'POST' && url.pathname === '/internal/heartbeat') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const heartbeat = JSON.parse(body);
node.handleHeartbeat(heartbeat);
res.writeHead(200);
res.end(JSON.stringify({ success: true }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid request' }));
}
});
return;
}
// Route : POST /internal/replicate
if (req.method === 'POST' && url.pathname === '/internal/replicate') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const message = JSON.parse(body);
node.handleReplication(message);
res.writeHead(200);
res.end(JSON.stringify({ success: true }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid request' }));
}
});
return;
}
// Route : POST /internal/get - Requête interne pour les lectures de quorum
if (req.method === 'POST' && url.pathname === '/internal/get') {
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const { key } = JSON.parse(body);
const value = node.data.get(key);
res.writeHead(200);
res.end(JSON.stringify({ value, version: value !== undefined ? 1 : 0 }));
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid request' }));
}
});
return;
}
// Route : GET /status
if (req.method === 'GET' && url.pathname === '/status') {
res.writeHead(200);
res.end(JSON.stringify(node.getStatus()));
return;
}
// Route : GET /key/{key}?consistency=strong|eventual|read_your_writes
if (req.method === 'GET' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
const consistency = (url.searchParams.get('consistency') || 'eventual') as ConsistencyLevel;
node.get(key, consistency).then(value => {
if (value !== undefined) {
res.writeHead(200);
res.end(JSON.stringify({ key, value, nodeRole: node.role, consistency }));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Key not found', key }));
}
});
return;
}
// Route : PUT /key/{key}
if (req.method === 'PUT' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
if (node.role !== 'leader') {
res.writeHead(503);
res.end(JSON.stringify({
error: 'Not the leader',
currentRole: node.role,
leaderId: node.leaderId || 'Unknown',
}));
return;
}
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
try {
const value = JSON.parse(body);
node.set(key, value).then(result => {
res.writeHead(200);
res.end(JSON.stringify({
success: result.success,
key,
value,
leaderId: node.nodeId,
achievedQuorum: result.achievedQuorum,
writeQuorum: config.writeQuorum,
}));
});
} catch (error) {
res.writeHead(400);
res.end(JSON.stringify({ error: 'Invalid JSON' }));
}
});
return;
}
// Route : DELETE /key/{key}
if (req.method === 'DELETE' && url.pathname.startsWith('/key/')) {
const key = url.pathname.slice(5);
if (node.role !== 'leader') {
res.writeHead(503);
res.end(JSON.stringify({
error: 'Not the leader',
currentRole: node.role,
leaderId: node.leaderId || 'Unknown',
}));
return;
}
node.delete(key).then(result => {
if (result.success) {
res.writeHead(200);
res.end(JSON.stringify({ success: true, key, leaderId: node.nodeId }));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Key not found', key }));
}
});
return;
}
// 404
res.writeHead(404);
res.end(JSON.stringify({ error: 'Not found' }));
});
server.listen(config.port, () => {
console.log(`[${config.nodeId}] Consistent Store écoutant sur le port ${config.port}`);
console.log(`[${config.nodeId}] Quorum d'Écriture (W) : ${config.writeQuorum}, Quorum de Lecture (R) : ${config.readQuorum}`);
console.log(`[${config.nodeId}] Pairs : ${config.peers.join(', ') || 'none'}`);
console.log(`[${config.nodeId}] Points de terminaison disponibles :`);
console.log(` GET /status - Statut du nœud`);
console.log(` GET /key/{key}?consistency=level - Obtenir avec niveau de cohérence`);
console.log(` PUT /key/{key} - Définir une valeur (leader uniquement)`);
console.log(` DEL /key/{key} - Supprimer une clé (leader uniquement)`);
});
consistent-store-ts/package.json
{
"name": "consistent-store-ts",
"version": "1.0.0",
"description": "Replicated key-value store with configurable consistency",
"main": "dist/node.js",
"scripts": {
"build": "tsc",
"start": "node dist/node.js",
"dev": "ts-node src/node.ts"
},
"dependencies": {},
"devDependencies": {
"@types/node": "^20.0.0",
"typescript": "^5.0.0",
"ts-node": "^10.9.0"
}
}
consistent-store-ts/tsconfig.json
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true
},
"include": ["src/**/*"]
}
consistent-store-ts/Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 4000
CMD ["npm", "start"]
Implémentation Python
consistent-store-py/src/node.py
import os
import json
import time
import threading
import asyncio
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional, Literal
from urllib.parse import urlparse, parse_qs
from urllib.request import Request, urlopen
from urllib.error import URLError
ConsistencyLevel = Literal['strong', 'eventual', 'read_your_writes']
class StoreNode:
"""Nœud de magasin répliqué avec cohérence configurable."""
def __init__(self, node_id: str, peers: List[str]):
self.node_id = node_id
self.role: str = 'follower'
self.term = 0
self.data: Dict[str, Any] = {}
self.peers = peers
self.leader_id: Optional[str] = None
self.last_heartbeat = time.time()
self.pending_writes: Dict[str, List[Any]] = {}
# Configuration
self.heartbeat_interval = 2.0
self.election_timeout = 6.0
self.write_quorum = int(os.environ.get('WRITE_QUORUM', '2'))
self.read_quorum = int(os.environ.get('READ_QUORUM', '1'))
# Démarrer les timers
self.start_election_timer()
self.start_heartbeat_thread()
def start_election_timer(self):
"""Démarrer le timer de timeout d'élection."""
def election_timer():
while True:
time.sleep(1)
time_since = time.time() - self.last_heartbeat
if time_since > self.election_timeout and self.role != 'leader':
print(f"[{self.node_id}] Election timeout ! Démarrage de l'élection...")
self.start_election()
thread = threading.Thread(target=election_timer, daemon=True)
thread.start()
def start_election(self):
"""Démarrer l'élection de leader."""
self.term += 1
self.role = 'candidate'
all_nodes = sorted([self.node_id] + self.peers)
lowest_node = all_nodes[0]
if self.node_id == lowest_node:
self.become_leader()
else:
self.role = 'follower'
self.leader_id = lowest_node
print(f"[{self.node_id}] En attente de {lowest_node} pour devenir leader")
def become_leader(self):
"""Devenir leader."""
self.role = 'leader'
self.leader_id = self.node_id
print(f"[{self.node_id}] 👑 Devenu LEADER pour le terme {self.term}")
self.replicate_to_followers()
def start_heartbeat_thread(self):
"""Démarrer le heartbeat vers les suiveurs."""
def heartbeat_loop():
while True:
time.sleep(self.heartbeat_interval)
if self.role == 'leader':
self.send_heartbeat()
thread = threading.Thread(target=heartbeat_loop, daemon=True)
thread.start()
def send_heartbeat(self):
"""Envoyer le heartbeat à tous les suiveurs."""
heartbeat = {
'type': 'heartbeat',
'leader_id': self.node_id,
'term': self.term,
'timestamp': int(time.time() * 1000),
}
for peer in self.peers:
try:
self.send_to_peer(peer, '/internal/heartbeat', heartbeat)
except Exception as e:
print(f"[{self.node_id}] Échec du heartbeat vers {peer} : {e}")
def replicate_to_followers(self) -> bool:
"""Répliquer les données aux suiveurs et vérifier le quorum."""
message = {
'type': 'replicate',
'leader_id': self.node_id,
'term': self.term,
'data': self.data,
}
successes = 1 # Ce nœud compte
for peer in self.peers:
try:
self.send_to_peer(peer, '/internal/replicate', message)
successes += 1
except Exception as e:
print(f"[{self.node_id}] Réplication échouée vers {peer} : {e}")
achieved_quorum = successes >= self.write_quorum
print(f"[{self.node_id}] Réplication : {successes}/{len(self.peers) + 1} nœuds (W={self.write_quorum})")
return achieved_quorum
def handle_heartbeat(self, heartbeat: dict):
"""Gérer le heartbeat du leader."""
if heartbeat['term'] >= self.term:
self.term = heartbeat['term']
self.last_heartbeat = time.time()
self.leader_id = heartbeat['leader_id']
if self.role != 'follower':
self.role = 'follower'
def handle_replication(self, message: dict):
"""Gérer la réplication du leader."""
if message['term'] >= self.term:
self.term = message['term']
self.leader_id = message['leader_id']
self.role = 'follower'
self.last_heartbeat = time.time()
self.data.update(message['data'])
def send_to_peer(self, peer_url: str, path: str, data: dict) -> None:
"""Envoyer des données à un nœud pair."""
url = f"{peer_url}{path}"
body = json.dumps(data).encode('utf-8')
req = Request(url, data=body, headers={'Content-Type': 'application/json'}, method='POST')
with urlopen(req, timeout=1) as response:
if response.status != 200:
raise Exception(f"Status {response.status}")
def set(self, key: str, value: Any) -> Dict[str, Any]:
"""Définir une paire clé-valeur avec accusé de réception du quorum."""
if self.role != 'leader':
return {'success': False, 'achieved_quorum': False}
self.data[key] = value
print(f"[{self.node_id}] SET {key} = {json.dumps(value)}")
achieved_quorum = self.replicate_to_followers()
return {'success': True, 'achieved_quorum': achieved_quorum}
def get(self, key: str, consistency: ConsistencyLevel = 'eventual') -> Any:
"""Obtenir une valeur avec cohérence configurable."""
local_value = self.data.get(key)
if consistency == 'eventual':
print(f"[{self.node_id}] GET {key} => {json.dumps(local_value)} (événementielle)")
return local_value
if consistency == 'strong':
latest, responses = self.get_from_quorum(key)
print(f"[{self.node_id}] GET {key} => {json.dumps(latest)} (forte depuis {responses} nœuds)")
return latest
if consistency == 'read_your_writes':
pending = self.pending_writes.get(key, [])
value_to_return = pending[-1] if pending else local_value
print(f"[{self.node_id}] GET {key} => {json.dumps(value_to_return)} (read-your-writes)")
return value_to_return
return local_value
def get_from_quorum(self, key: str) -> tuple:
"""Interroger un quorum de nœuds et retourner la valeur la plus récente."""
results = []
# Interroger tous les pairs
for peer in self.peers:
try:
result = self.query_peer(peer, '/internal/get', {'key': key})
results.append({
'success': True,
'value': result.get('value'),
'version': result.get('version', 0),
})
except Exception as e:
print(f"[{self.node_id}] Query échouée vers {peer} : {e}")
results.append({'success': False, 'value': None, 'version': 0})
# Ajouter la valeur locale
results.append({
'success': True,
'value': self.data.get(key),
'version': 1 if key in self.data else 0,
})
# Filtrer les réponses réussies
successful = [r for r in results if r['success']]
if len(successful) >= self.read_quorum:
# Retourner la première valeur non-nulle
for r in successful:
if r['value'] is not None:
return r['value'], len(successful)
return self.data.get(key), len(successful)
def query_peer(self, peer_url: str, path: str, data: dict) -> dict:
"""Interroger un pair pour une clé."""
url = f"{peer_url}{path}"
body = json.dumps(data).encode('utf-8')
req = Request(url, data=body, headers={'Content-Type': 'application/json'}, method='POST')
with urlopen(req, timeout=1) as response:
if response.status == 200:
return json.loads(response.read().decode('utf-8'))
raise Exception(f"Status {response.status}")
def delete(self, key: str) -> Dict[str, Any]:
"""Supprimer une clé."""
if self.role != 'leader':
return {'success': False, 'achieved_quorum': False}
existed = key in self.data
if existed:
del self.data[key]
print(f"[{self.node_id}] DELETE {key}")
self.replicate_to_followers()
return {'success': existed, 'achieved_quorum': True}
def get_status(self) -> dict:
"""Obtenir le statut du nœud."""
return {
'node_id': self.node_id,
'role': self.role,
'term': self.term,
'leader_id': self.leader_id,
'total_keys': len(self.data),
'keys': list(self.data.keys()),
'config': {
'write_quorum': self.write_quorum,
'read_quorum': self.read_quorum,
'total_nodes': len(self.peers) + 1,
},
}
# Créer le nœud
config = {
'node_id': os.environ.get('NODE_ID', 'node-1'),
'port': int(os.environ.get('PORT', '4000')),
'peers': [p for p in os.environ.get('PEERS', '').split(',') if p],
}
node = StoreNode(config['node_id'], config['peers'])
class NodeHandler(BaseHTTPRequestHandler):
"""Gestionnaire de requêtes HTTP pour le nœud de magasin."""
def send_json_response(self, status: int, data: dict):
"""Envoyer une réponse JSON."""
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_OPTIONS(self):
"""Gérer le pré-vol CORS."""
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_POST(self):
"""Gérer les requêtes POST."""
parsed = urlparse(self.path)
if parsed.path == '/internal/heartbeat':
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
heartbeat = json.loads(body)
node.handle_heartbeat(heartbeat)
self.send_json_response(200, {'success': True})
except (json.JSONDecodeError, KeyError):
self.send_json_response(400, {'error': 'Invalid request'})
return
if parsed.path == '/internal/replicate':
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
message = json.loads(body)
node.handle_replication(message)
self.send_json_response(200, {'success': True})
except (json.JSONDecodeError, KeyError):
self.send_json_response(400, {'error': 'Invalid request'})
return
if parsed.path == '/internal/get':
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
data = json.loads(body)
key = data.get('key')
value = node.data.get(key)
self.send_json_response(200, {'value': value, 'version': 1 if value is not None else 0})
except (json.JSONDecodeError, KeyError):
self.send_json_response(400, {'error': 'Invalid request'})
return
self.send_json_response(404, {'error': 'Not found'})
def do_GET(self):
"""Gérer les requêtes GET."""
parsed = urlparse(self.path)
if parsed.path == '/status':
self.send_json_response(200, node.get_status())
return
if parsed.path.startswith('/key/'):
key = parsed.path[5:]
consistency = parsed.query.split('=')[-1] if '=' in parsed.query else 'eventual'
if consistency not in ['strong', 'eventual', 'read_your_writes']:
consistency = 'eventual'
value = node.get(key, consistency)
if value is not None:
self.send_json_response(200, {
'key': key,
'value': value,
'node_role': node.role,
'consistency': consistency,
})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
self.send_json_response(404, {'error': 'Not found'})
def do_PUT(self):
"""Gérer les requêtes PUT."""
parsed = urlparse(self.path)
if parsed.path.startswith('/key/'):
key = parsed.path[5:]
if node.role != 'leader':
self.send_json_response(503, {
'error': 'Not the leader',
'current_role': node.role,
'leader_id': node.leader_id or 'Unknown',
})
return
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length).decode('utf-8')
try:
value = json.loads(body)
result = node.set(key, value)
self.send_json_response(200, {
'success': result['success'],
'key': key,
'value': value,
'leader_id': node.node_id,
'achieved_quorum': result['achieved_quorum'],
'write_quorum': node.write_quorum,
})
except json.JSONDecodeError:
self.send_json_response(400, {'error': 'Invalid JSON'})
return
self.send_json_response(404, {'error': 'Not found'})
def do_DELETE(self):
"""Gérer les requêtes DELETE."""
parsed = urlparse(self.path)
if parsed.path.startswith('/key/'):
key = parsed.path[5:]
if node.role != 'leader':
self.send_json_response(503, {
'error': 'Not the leader',
'current_role': node.role,
'leader_id': node.leader_id or 'Unknown',
})
return
result = node.delete(key)
if result['success']:
self.send_json_response(200, {'success': True, 'key': key, 'leader_id': node.node_id})
else:
self.send_json_response(404, {'error': 'Key not found', 'key': key})
return
self.send_json_response(404, {'error': 'Not found'})
def log_message(self, format, *args):
"""Supprimer la journalisation par défaut."""
pass
def run_server(port: int):
"""Démarrer le serveur HTTP."""
server_address = ('', port)
httpd = HTTPServer(server_address, NodeHandler)
print(f"[{config['node_id']}] Consistent Store écoutant sur le port {port}")
print(f"[{config['node_id']}] Quorum d'Écriture (W) : {node.write_quorum}, Quorum de Lecture (R) : {node.read_quorum}")
print(f"[{config['node_id']}] Pairs : {', '.join(config['peers']) or 'none'}")
print(f"[{config['node_id']}] Points de terminaison disponibles :")
print(f" GET /status - Statut du nœud")
print(f" GET /key/{{key}}?consistency=level - Obtenir avec niveau de cohérence")
print(f" PUT /key/{{key}} - Définir une valeur (leader uniquement)")
print(f" DEL /key/{{key}} - Supprimer une clé (leader uniquement)")
httpd.serve_forever()
if __name__ == '__main__':
run_server(config['port'])
consistent-store-py/requirements.txt
# Pas de dépendances externes - utilise uniquement la bibliothèque standard
consistent-store-py/Dockerfile
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 4000
CMD ["python", "src/node.py"]
Configuration Docker Compose
Version TypeScript
examples/03-consistent-store/ts/docker-compose.yml
version: '3.8'
services:
node1:
build: .
container_name: consistent-ts-node1
ports:
- "4001:4000"
environment:
- NODE_ID=node-1
- PORT=4000
- PEERS=http://node2:4000,http://node3:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
node2:
build: .
container_name: consistent-ts-node2
ports:
- "4002:4000"
environment:
- NODE_ID=node-2
- PORT=4000
- PEERS=http://node1:4000,http://node3:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
node3:
build: .
container_name: consistent-ts-node3
ports:
- "4003:4000"
environment:
- NODE_ID=node-3
- PORT=4000
- PEERS=http://node1:4000,http://node2:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
networks:
consistent-network:
driver: bridge
Version Python
examples/03-consistent-store/py/docker-compose.yml
version: '3.8'
services:
node1:
build: .
container_name: consistent-py-node1
ports:
- "4001:4000"
environment:
- NODE_ID=node-1
- PORT=4000
- PEERS=http://node2:4000,http://node3:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
node2:
build: .
container_name: consistent-py-node2
ports:
- "4002:4000"
environment:
- NODE_ID=node-2
- PORT=4000
- PEERS=http://node1:4000,http://node3:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
node3:
build: .
container_name: consistent-py-node3
ports:
- "4003:4000"
environment:
- NODE_ID=node-3
- PORT=4000
- PEERS=http://node1:4000,http://node2:4000
- WRITE_QUORUM=2
- READ_QUORUM=1
networks:
- consistent-network
networks:
consistent-network:
driver: bridge
Exécution de l’Exemple
Étape 1 : Démarrer le Cluster
TypeScript :
cd distributed-systems-course/examples/03-consistent-store/ts
docker-compose up --build
Python :
cd distributed-systems-course/examples/03-consistent-store/py
docker-compose up --build
Vous devriez voir :
consistent-ts-node1 | [node-1] 👑 Devenu LEADER pour le terme 1
consistent-ts-node1 | [node-1] Quorum d'Écriture (W) : 2, Quorum de Lecture (R) : 1
consistent-ts-node2 | [node-2] En attente de node-1 pour devenir leader
consistent-ts-node3 | [node-3] En attente de node-1 pour devenir leader
Étape 2 : Tester la Cohérence Événementielle (Défaut)
# Écrire au leader
curl -X PUT http://localhost:4001/key/name \
-H "Content-Type: application/json" \
-d '"Alice"'
# Lire immédiatement depuis le suiveur (cohérence événementielle)
curl http://localhost:4002/key/name
Vous pourriez voir :
- Immédiatement après l’écriture :
null(le suiveur n’a pas encore reçu la réplication) - Un moment plus tard :
"Alice"(le suiveur a récupéré)
Étape 3 : Tester la Cohérence Forte
# Lire avec cohérence forte (attend le quorum)
curl "http://localhost:4002/key/name?consistency=strong"
Cela interroge plusieurs nœuds et retourne la valeur confirmée la plus récente.
Étape 4 : Observer le Comportement du Quorum
Vérifiez le statut pour voir vos paramètres de quorum :
curl http://localhost:4001/status
Réponse :
{
"nodeId": "node-1",
"role": "leader",
"config": {
"writeQuorum": 2,
"readQuorum": 1,
"totalNodes": 3
}
}
Étape 5 : Tester Différents Paramètres de Quorum
Arrêtez docker-compose et modifiez les variables d’environnement :
Essayer W=3 (Le plus fort) :
environment:
- WRITE_QUORUM=3
- READ_QUORUM=1
Essayer W=1 (Le plus faible) :
environment:
- WRITE_QUORUM=1
- READ_QUORUM=1
Observez comment le système se comporte différemment avec chaque paramètre.
Comparaison de Cohérence
graph TB
subgraph "Mêmes Données, Différents Niveaux de Cohérence"
W[Write : name = Alice]
subgraph "Cohérence Forte<br/>Lente mais Précise"
S1[Nœud 1 : Alice]
S2[Nœud 2 : Alice]
S3[Nœud 3 : Alice]
R1[Lire → Alice]
end
subgraph "Cohérence Événementielle<br/>Rapide mais Possiblement Périmée"
E1[Nœud 1 : Alice]
E2[Nœud 2 : Bob]
E3[Nœud 3 : ???]
R2[Lire → Bob ou ???]
end
end
W --> S1
W --> S2
W --> S3
W --> E1
W -.->|retardé| E2
W -.->|retardé| E3
style R1 fill:#6f6
style R2 fill:#f96
Exercices
Exercice 1 : Expérimenter la Cohérence Événementielle
- Démarrer le cluster
- Écrire une valeur au leader
- Immédiatement lire depuis un suiveur (dans les 100ms)
- Que voyez-vous ? Est-ce la nouvelle valeur ou l’ancienne ?
Exercice 2 : Comparer les Niveaux de Cohérence
Écrivez un script qui :
- Définit une clé à une nouvelle valeur
- Lit immédiatement avec
consistency=eventual - Lit immédiatement avec
consistency=strong - Compare les résultats
Exercice 3 : Ajuster le Quorum pour Différents Cas d’Usage
Pour chaque scénario, quels paramètres de quorum choisiriez-vous ?
| Scénario | W (Écriture) | R (Lecture) | R + W | Cohérence | Pourquoi ? |
|---|---|---|---|---|---|
| Transfert de solde bancaire | ? | ? | ? | ? | |
| J’aime sur les médias sociaux | ? | ? | ? | ? | |
| Panier d’achat | ? | ? | ? | ? | |
| Vue du profil utilisateur | ? | ? | ? | ? |
Exercice 4 : Implémenter la Réparation de Lecture
Lorsqu’une lecture périmée est détectée, mettre à jour le nœud périmé avec la valeur la plus récente. Indice : Dans la lecture forte, si vous trouvez une valeur plus récente sur un nœud, envoyez-la aux nœuds avec des valeurs plus anciennes.
Résumé
Points Clés à Retenir
- La cohérence est un spectre de la forte à l’événementielle
- Cohérence forte = toujours voir les données les plus récentes, mais plus lent
- Cohérence événementielle = lectures rapides, mais peut voir des données périmées
- Configuration du quorum (W + R) contrôle le niveau de cohérence :
R + W > N→ Cohérence forteR + W ≤ N→ Cohérence événementielle
- Compromis : Vous ne pouvez pas avoir à la fois la cohérence forte ET la haute disponibilité (théorème CAP)
Arbre de Décision de Cohérence
Besoin de lire les données les plus récentes immédiatement ?
├─ Oui → Utiliser la cohérence forte (R + W > N)
│ └─ Accepter des performances plus lentes
└─ Non → Utiliser la cohérence événementielle (R + W ≤ N)
└─ Obtenir des lectures plus rapides, accepter un certain péremé
Exemples du Monde Réel
| Système | Cohérence par Défaut | Configurable ? |
|---|---|---|
| DynamoDB | Cohérence événementielle | Oui (paramètre ConsistentRead) |
| Cassandra | Cohérence événementielle | Oui (niveau CONSISTENCY) |
| MongoDB | Forte (w:majority) | Oui (writeConcern, readConcern) |
| CouchDB | Cohérence événementielle | Oui (paramètres r, w) |
| etcd | Forte | Non (toujours forte) |
Vérifiez Votre Compréhension
- Quelle est la différence entre la cohérence forte et événementielle ?
- Comment la configuration du quorum (R, W) affecte-t-elle la cohérence ?
- Quand choisiriez-vous la cohérence événementielle plutôt que forte ?
- Que garantit
R + W > N? - Pourquoi ne pouvons-nous pas avoir à la fois la cohérence forte et la haute disponibilité pendant les partitions ?
🧠 Quiz du Chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront toute lacune dans vos connaissances.
Suite
Nous avons construit un magasin répliqué avec cohérence configurable. Ajoutons maintenant la communication en temps réel : WebSockets (Session 6)
WebSockets
Session 6, Partie 1 - 20 minutes
Objectifs d’apprentissage
- Comprendre le protocole WebSocket et ses avantages par rapport à HTTP
- Apprendre le cycle de vie d’une connexion WebSocket
- Implémenter des serveurs et clients WebSocket en TypeScript et Python
- Gérer la gestion des connexions et les scénarios d’erreur
Introduction
Dans les sessions précédentes, nous avons construit des systèmes utilisant HTTP - un protocole requête-réponse. Le client demande, le serveur répond. Mais que se passe-t-il si nous avons besoin d’une communication en temps réel, bidirectionnelle ?
Voici les WebSockets : un protocole qui permet la communication full-duplex sur une seule connexion TCP.
sequenceDiagram
participant Client
participant Server
Note over Client,Server: HTTP Requête-Réponse (Traditionnel)
Client->>Server: GET /data
Server-->>Client: Response
Client->>Server: GET /data
Server-->>Client: Response
Note over Client,Server: WebSocket (Temps Réel)
Client->>Server: HTTP Upgrade Request
Server-->>Client: 101 Switching Protocols
Client->>Server: Message 1
Server-->>Client: Message 2
Client->>Server: Message 3
Server-->>Client: Message 4
Client->>Server: Message 5
WebSocket vs HTTP
| Aspect | HTTP | WebSocket |
|---|---|---|
| Communication | Half-duplex (requête-réponse) | Full-duplex (bidirectionnelle) |
| Connexion | Nouvelle connexion par requête | Connexion persistante |
| Latence | Plus élevée (surcharge HTTP) | Plus faible (trames, non paquets) |
| État | Sans état (stateless) | Connexion avec état (stateful) |
| Push serveur | Nécessite polling/SSE | Support natif du push |
Quand utiliser les WebSockets
Idéal pour :
- Les applications de chat
- La collaboration en temps réel (édition, jeux)
- Les tableaux de bord et monitoring en direct
- Les jeux multijoueurs
Pas idéal pour :
- Les opérations CRUD simples (utiliser REST)
- La récupération de données unique
- L’accès aux ressources sans état
Le protocole WebSocket
Poignée de main (Handshake)
Les WebSockets commencent par HTTP, puis effectuent une mise à niveau (upgrade) vers le protocole WebSocket :
stateDiagram-v2
[*] --> HTTP: Client envoie requête HTTP
HTTP --> Handshake: Serveur reçoit
Handshake --> WebSocket: 101 Switching Protocols
WebSocket --> Connected: Full-duplex établi
Connected --> Messaging: Envoyer/recevoir trames
Messaging --> Closing: Trame de fermeture envoyée
Closing --> [*]: Connexion terminée
Requête HTTP (mise à niveau) :
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Réponse HTTP (acceptation) :
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Structure des trames
Les messages WebSocket sont envoyés sous forme de trames, non de paquets HTTP :
+--------+--------+--------+--------+ +--------+
| FIN | RSV1-3 | Opcode | Mask | ... | Payload|
| 1 bit | 3 bits | 4 bits | 1 bit | | |
+--------+--------+--------+--------+ +--------+
Opcodes courants :
- 0x1: Trame de texte
- 0x2: Trame binaire
- 0x8: Fermer la connexion
- 0x9: Ping
- 0xA: Pong
Cycle de vie WebSocket
stateDiagram-v2
[*] --> Connecting: ws://localhost:8080
Connecting --> Open: Handshake terminé (101)
Open --> Message: Envoyer/Recevoir des données
Message --> Open: Continuer
Open --> Closing: Fermeture normale ou erreur
Closing --> Closed: Connexion TCP terminée
Closed --> [*]
note right of Connecting
Le client envoie HTTP Upgrade
Le serveur répond avec 101
end note
note right of Message
Messagerie full-duplex
Aucune surcharge par message
end note
note right of Closing
Échange de trames de fermeture
Arrêt gracieux
end note
Implémentation : TypeScript
Nous utiliserons la bibliothèque ws - le standard de facto pour WebSockets dans Node.js.
Implémentation du serveur
// examples/03-chat/ts/ws-server.ts
import { WebSocketServer, WebSocket } from 'ws';
interface ChatMessage {
type: 'message' | 'join' | 'leave';
username: string;
content: string;
timestamp: number;
}
const wss = new WebSocketServer({ port: 8080 });
const clients = new Map<WebSocket, string>();
console.log('WebSocket server running on ws://localhost:8080');
wss.on('connection', (ws: WebSocket) => {
console.log('New client connected');
// Message de bienvenue
ws.send(JSON.stringify({
type: 'message',
username: 'System',
content: 'Welcome! Please identify yourself.',
timestamp: Date.now()
} as ChatMessage));
// Gérer les messages entrants
ws.on('message', (data: Buffer) => {
try {
const message: ChatMessage = JSON.parse(data.toString());
if (message.type === 'join') {
// Enregistrer le nom d'utilisateur
clients.set(ws, message.username);
console.log(`${message.username} joined`);
// Diffuser à tous les clients
broadcast({
type: 'message',
username: 'System',
content: `${message.username} has joined the chat`,
timestamp: Date.now()
});
} else if (message.type === 'message') {
const username = clients.get(ws) || 'Anonymous';
console.log(`${username}: ${message.content}`);
// Diffuser le message
broadcast({
type: 'message',
username,
content: message.content,
timestamp: Date.now()
});
}
} catch (error) {
console.error('Invalid message:', error);
}
});
// Gérer la déconnexion
ws.on('close', () => {
const username = clients.get(ws);
if (username) {
console.log(`${username} disconnected`);
clients.delete(ws);
broadcast({
type: 'message',
username: 'System',
content: `${username} has left the chat`,
timestamp: Date.now()
});
}
});
// Gérer les erreurs
ws.on('error', (error) => {
console.error('WebSocket error:', error);
});
});
function broadcast(message: ChatMessage): void {
const data = JSON.stringify(message);
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
// Heartbeat pour détecter les connexions obsolètes
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on('close', () => {
clearInterval(interval);
});
Implémentation du client
// examples/03-chat/ts/ws-client.ts
import { WebSocket } from 'ws';
interface ChatMessage {
type: 'message' | 'join' | 'leave';
username: string;
content: string;
timestamp: number;
}
class ChatClient {
private ws: WebSocket;
private username: string;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
constructor(url: string, username: string) {
this.username = username;
this.ws = this.connect(url);
}
private connect(url: string): WebSocket {
const ws = new WebSocket(url);
ws.on('open', () => {
console.log('Connected to chat server');
this.reconnectAttempts = 0;
// Nous identifier
this.send({
type: 'join',
username: this.username,
content: '',
timestamp: Date.now()
});
});
ws.on('message', (data: Buffer) => {
const message: ChatMessage = JSON.parse(data.toString());
this.displayMessage(message);
});
ws.on('close', () => {
console.log('Disconnected from server');
// Tenter la reconnexion
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.ws = this.connect(url);
}, delay);
}
});
ws.on('error', (error) => {
console.error('WebSocket error:', error.message);
});
// Répondre aux pings
ws.on('ping', () => {
ws.pong();
});
return ws;
}
public send(message: ChatMessage): void {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
console.error('Cannot send message: connection not open');
}
}
public sendMessage(content: string): void {
this.send({
type: 'message',
username: this.username,
content,
timestamp: Date.now()
});
}
private displayMessage(message: ChatMessage): void {
const time = new Date(message.timestamp).toLocaleTimeString();
console.log(`[${time}] ${message.username}: ${message.content}`);
}
public close(): void {
this.ws.close();
}
}
// Interface CLI
const username = process.argv[2] || `User${Math.floor(Math.random() * 1000)}`;
const client = new ChatClient('ws://localhost:8080', username);
console.log(`You are logged in as: ${username}`);
console.log('Type a message and press Enter to send. Press Ctrl+C to exit.');
// Lire depuis stdin
process.stdin.setEncoding('utf8');
process.stdin.on('data', (chunk: Buffer) => {
const text = chunk.toString().trim();
if (text) {
client.sendMessage(text);
}
});
// Gérer l'arrêt gracieux
process.on('SIGINT', () => {
console.log('\nShutting down...');
client.close();
process.exit(0);
});
Configuration du package
// examples/03-chat/ts/package.json
{
"name": "chat-websocket-example",
"version": "1.0.0",
"type": "module",
"scripts": {
"server": "node --loader ts-node/esm ws-server.ts",
"client": "node --loader ts-node/esm ws-client.ts"
},
"dependencies": {
"ws": "^8.18.0"
},
"devDependencies": {
"@types/ws": "^8.5.12",
"ts-node": "^10.9.2",
"typescript": "^5.6.3"
}
}
Implémentation : Python
Nous utiliserons la bibliothèque websockets - une implémentation WebSocket entièrement conforme.
Implémentation du serveur
# examples/03-chat/py/ws_server.py
import asyncio
import json
import logging
from datetime import datetime
from typing import Set
import websockets
from websockets.server import WebSocketServerProtocol
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Suivre les clients connectés
clients: Set[WebSocketServerProtocol] = set()
usernames: dict[WebSocketServerProtocol, str] = {}
async def broadcast(message: dict) -> None:
"""Envoyer un message à tous les clients connectés."""
if clients:
await asyncio.gather(
*[client.send(json.dumps(message)) for client in clients if client.open],
return_exceptions=True
)
async def handle_client(websocket: WebSocketServerProtocol) -> None:
"""Gérer une connexion client."""
clients.add(websocket)
logger.info(f"New client connected. Total clients: {len(clients)}")
try:
# Envoyer un message de bienvenue
welcome_msg = {
"type": "message",
"username": "System",
"content": "Welcome! Please identify yourself.",
"timestamp": datetime.now().timestamp()
}
await websocket.send(json.dumps(welcome_msg))
# Gérer les messages
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "join":
# Enregistrer le nom d'utilisateur
username = data.get("username", "Anonymous")
usernames[websocket] = username
logger.info(f"{username} joined")
# Diffuser la notification de rejoindre
await broadcast({
"type": "message",
"username": "System",
"content": f"{username} has joined the chat",
"timestamp": datetime.now().timestamp()
})
elif data.get("type") == "message":
# Diffuser le message
username = usernames.get(websocket, "Anonymous")
content = data.get("content", "")
logger.info(f"{username}: {content}")
await broadcast({
"type": "message",
"username": username,
"content": content,
"timestamp": datetime.now().timestamp()
})
except json.JSONDecodeError:
logger.error("Invalid JSON received")
except Exception as e:
logger.error(f"Error handling message: {e}")
except websockets.exceptions.ConnectionClosed:
logger.info("Client disconnected unexpectedly")
finally:
# Nettoyage
username = usernames.get(websocket)
if username:
del usernames[websocket]
await broadcast({
"type": "message",
"username": "System",
"content": f"{username} has left the chat",
"timestamp": datetime.now().timestamp()
})
clients.discard(websocket)
logger.info(f"Client removed. Total clients: {len(clients)}")
async def main():
"""Démarrer le serveur WebSocket."""
async with websockets.serve(handle_client, "localhost", 8080):
logger.info("WebSocket server running on ws://localhost:8080")
await asyncio.Future() # Run forever
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Server stopped")
Implémentation du client
# examples/03-chat/py/ws_client.py
import asyncio
import json
import sys
from datetime import datetime
import websockets
from websockets.client import WebSocketClientProtocol
class ChatClient:
def __init__(self, url: str, username: str):
self.url = url
self.username = username
self.websocket: WebSocketClientProtocol | None = None
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
async def connect(self) -> None:
"""Connecter au serveur WebSocket."""
backoff = 1
while self.reconnect_attempts < self.max_reconnect_attempts:
try:
async with websockets.connect(self.url) as ws:
self.websocket = ws
self.reconnect_attempts = 0
print(f"Connected to {self.url}")
# Nous identifier
await self.send({
"type": "join",
"username": self.username,
"content": "",
"timestamp": datetime.now().timestamp()
})
# Commencer à recevoir des messages
receive_task = asyncio.create_task(self.receive_messages())
# Attendre la fermeture de la connexion
await ws.wait_closed()
# Annuler la tâche de réception
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
print("Disconnected from server")
except (ConnectionRefusedError, OSError) as e:
self.reconnect_attempts += 1
print(f"Connection failed: {e}")
print(f"Reconnecting in {backoff}s... (attempt {self.reconnect_attempts})")
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 30)
print("Max reconnection attempts reached. Giving up.")
async def receive_messages(self) -> None:
"""Recevoir et afficher les messages du serveur."""
if not self.websocket:
return
try:
async for message in self.websocket:
data = json.loads(message)
self.display_message(data)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Error receiving message: {e}")
async def send(self, message: dict) -> None:
"""Envoyer un message au serveur."""
if self.websocket and not self.websocket.closed:
await self.websocket.send(json.dumps(message))
else:
print("Cannot send message: connection not open")
def display_message(self, message: dict) -> None:
"""Afficher un message reçu."""
timestamp = datetime.fromtimestamp(message["timestamp"]).strftime("%H:%M:%S")
print(f"[{timestamp}] {message['username']}: {message['content']}")
async def stdin_reader(client: ChatClient):
"""Lire depuis stdin et envoyer des messages."""
loop = asyncio.get_event_loop()
while True:
line = await loop.run_in_executor(None, sys.stdin.readline)
text = line.strip()
if text:
await client.send({
"type": "message",
"username": client.username,
"content": text,
"timestamp": datetime.now().timestamp()
})
async def main():
"""Exécuter le client de chat."""
username = sys.argv[1] if len(sys.argv) > 1 else f"User{asyncio.get_event_loop().time() % 1000:.0f}"
client = ChatClient("ws://localhost:8080", username)
print(f"You are logged in as: {username}")
print("Type a message and press Enter to send. Press Ctrl+C to exit.")
# Exécuter la connexion et le lecteur stdin simultanément
connect_task = asyncio.create_task(client.connect())
# Donner du temps à la connexion pour s'établir
await asyncio.sleep(0.5)
stdin_task = asyncio.create_task(stdin_reader(client))
try:
await asyncio.gather(connect_task, stdin_task)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
connect_task.cancel()
stdin_task.cancel()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Configuration requise
# examples/03-chat/py/requirements.txt
websockets==13.1
Configuration Docker Compose
Version TypeScript
# examples/03-chat/ts/docker-compose.yml
version: '3.8'
services:
server:
build:
context: .
dockerfile: Dockerfile
ports:
- "8080:8080"
environment:
- NODE_ENV=production
restart: unless-stopped
# examples/03-chat/ts/Dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm ci --only=production
COPY . .
RUN npx tsc
EXPOSE 8080
CMD ["node", "dist/ws-server.js"]
Version Python
# examples/03-chat/py/docker-compose.yml
version: '3.8'
services:
server:
build:
context: .
dockerfile: Dockerfile
ports:
- "8080:8080"
restart: unless-stopped
# examples/03-chat/py/Dockerfile
FROM python:3.12-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["python", "ws_server.py"]
Exécution des exemples
TypeScript
# Installer les dépendances
cd examples/03-chat/ts
npm install
# Démarrer le serveur
npm run server
# Dans un autre terminal, démarrer un client
npm run client Alice
# Dans un autre terminal, démarrer un autre client
npm run client Bob
Python
# Installer les dépendances
cd examples/03-chat/py
pip install -r requirements.txt
# Démarrer le serveur
python ws_server.py
# Dans un autre terminal, démarrer un client
python ws_client.py Alice
# Dans un autre terminal, démarrer un autre client
python ws_client.py Bob
Avec Docker
# Démarrer le serveur
docker-compose up -d
# Vérifier les logs
docker-compose logs -f
# Se connecter avec un client (exécuter depuis l'hôte)
npm run client Alice # ou python ws_client.py Alice
Bonnes pratiques de gestion des connexions
1. Heartbeat/Ping-Pong
Détecter les connexions obsolètes avant qu’elles ne causent des problèmes :
// Le serveur envoie un ping toutes les 30 secondes
setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping();
});
}, 30000);
// Le client répond automatiquement
ws.on('ping', () => ws.pong());
2. Reconnexion avec backoff exponentiel
Ne pas surcharger le serveur lorsqu’il est en panne :
function reconnect(attempts: number) {
const delay = Math.min(1000 * Math.pow(2, attempts), 30000);
setTimeout(() => connect(), delay);
}
3. Arrêt gracieux
// Envoyer une trame de fermeture avant de terminer
ws.close(1000, 'Normal closure');
// Attendre l'accusé de réception de la trame de fermeture
ws.on('close', () => {
console.log('Connection closed cleanly');
});
4. Sérialisation des messages
Toujours valider les messages entrants :
function safeParse(data: string): Message | null {
try {
const msg = JSON.parse(data);
if (msg.type && msg.username) {
return msg;
}
} catch {}
return null;
}
Pièges courants
| Piège | Symptôme | Solution |
|---|---|---|
| Pas de gestion de la reconnexion | Le client cesse de fonctionner sur une coupure réseau | Implémenter la reconnexion avec backoff exponentiel |
Ignorer l’événement close | Fuites de mémoire des clients obsolètes | Toujours nettoyer à la déconnexion |
| Blocage de la boucle d’événements | Messages retardés | Utiliser async/await correctement, éviter le travail CPU intensif |
- Heartbeat manquant | Les connexions obsolètes restent | Implémenter ping/pong |
- Pas de validation des messages | Plantages sur des données malformées | Toujours essayer/attraper l’analyse JSON |
Test de votre implémentation WebSocket
# Utiliser websocat (comme curl pour WebSockets)
# Installation : cargo install websocat
# Connecter et envoyer/recevoir des messages
echo '{"type":"join","username":"TestUser","content":"","timestamp":123456}' | \
websocat ws://localhost:8080
# Mode interactif
websocat ws://localhost:8080
Résumé
Les WebSockets permettent la communication en temps réel bidirectionnelle entre clients et serveurs :
- Protocole : Poignée de main HTTP avec mise à niveau → connexion TCP persistante
- Communication : Messagerie full-duplex avec une surcharge minimale
- Cycle de vie : Connecting → Open → Messaging → Closing → Closed
- Bonnes pratiques : Heartbeats, arrêt gracieux, gestion de la reconnexion
Dans la section suivante, nous développerons cette base pour implémenter la messagerie pub/sub pour les systèmes de chat multi-salles.
Exercices
Exercice 1 : Ajouter la messagerie privée
Étendre le système de chat pour prendre en charge les messages privés entre utilisateurs :
// Format de message pour les messages privés
{
type: 'private',
from: 'Alice',
to: 'Bob',
content: 'Hey Bob, are you there?',
timestamp: 1234567890
}
Exigences :
- Ajouter un type de message
private - Acheminer les messages privés uniquement au destinataire prévu
- Afficher un indicateur de “message privé” dans l’interface
Exercice 2 : Indicateurs de frappe
Afficher quand un utilisateur est en train de taper :
// Message d'indicateur de frappe
{
type: 'typing',
username: 'Alice',
isTyping: true,
timestamp: 1234567890
}
Exigences :
- Envoyer
typing.startlorsque l’utilisateur commence à taper - Envoyer
typing.stopaprès 2 secondes d’inactivité - Afficher “Alice est en train de taper…” aux autres utilisateurs
Exercice 3 : État de connexion
Afficher l’état de connexion en temps réel à l’utilisateur :
Exigences :
- Afficher : Connecting → Connected → Disconnected → Reconnecting
- Utiliser des indicateurs visuels (point vert, point rouge, spinner)
- Afficher la latence ping/pong en millisecondes
Exercice 4 : Historique des messages avec reconnexion
Lorsqu’un client se reconnecte, lui envoyer les messages qu’il a manqués :
Exigences :
- Stocker les 100 derniers messages sur le serveur
- Lors de la reconnexion du client, envoyer les messages depuis son dernier horodatage
- Dédupliquer les messages que le client possède déjà
🧠 Quiz du chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Messagerie Pub/Sub et Ordonnancement des Messages
Session 7, Partie 1 - 45 minutes
Objectifs d’apprentissage
- Comprendre le modèle de messagerie publish-subscribe
- Apprendre le routage basé sur les sujets et le routage basé sur le contenu
- Implémenter le suivi de présence et les abonnements
- Comprendre les défis de l’ordonnancement des messages dans les systèmes distribués
- Implémenter des numéros de séquence pour l’ordonnancement causal
Qu’est-ce que Pub/Sub ?
Le modèle publish-subscribe est un modèle de messagerie où les expéditeurs (publishers) envoient des messages à un système intermédiaire, et le système achemine les messages aux récepteurs intéressés (subscribers). Les publishers et subscribers sont découplés - ils ne se connaissent pas.
Avantages clés
- Découplage : Les publishers n’ont pas besoin de savoir qui s’abonne
- Extensibilité : Ajouter des subscribers sans modifier les publishers
- Flexibilité : Gestion dynamique des abonnements
- Asynchronie : Les publishers envoient et continuent ; les subscribers traitent quand ils sont prêts
Pub/Sub vs Messagerie directe
graph TB
subgraph "Messagerie directe"
P1[Producer] -->|Direct| C1[Consumer 1]
P1 -->|Direct| C2[Consumer 2]
P1 -->|Direct| C3[Consumer 3]
end
subgraph "Messagerie Pub/Sub"
P2[Publisher] -->|Publish| B[Broker]
S1[Subscriber 1] -->|Subscribe| B
S2[Subscriber 2] -->|Subscribe| B
S3[Subscriber 3] -->|Subscribe| B
end
| Aspect | Messagerie directe | Pub/Sub |
|---|---|---|
| Couplage | Fort (le producteur connaît les consumers) | Faible (le producteur ne connaît pas les consumers) |
| Flexibilité | Faible (les changements affectent le producteur) | Élevée (abonnements dynamiques) |
| Complexité | Simple | Modérée (nécessite un broker) |
| Cas d’usage | Point-à-point, requête-réponse | Diffusion, événements, notifications |
Modèles Pub/Sub
1. Routage basé sur les sujets
Les subscribers expriment leur intérêt pour des sujets (channels). Les messages sont acheminés en fonction du sujet auquel ils sont publiés.
sequenceDiagram
participant S1 as Subscriber 1
participant S2 as Subscriber 2
participant S3 as Subscriber 3
participant B as Broker
participant P as Publisher
Note over S1,S3: Phase d'abonnement
S1->>B: subscribe("sports")
S2->>B: subscribe("sports")
S3->>B: subscribe("news")
Note over S1,S3: Phase de publication
P->>B: publish("sports", "Game starts!")
B->>S1: deliver("Game starts!")
B->>S2: deliver("Game starts!")
P->>B: publish("news", "Breaking story!")
B->>S3: deliver("Breaking story!")
Cas d’usage : Salles de chat, catégories de notifications, flux d’événements
2. Routage basé sur le contenu
Les subscribers spécifient des critères de filtrage. Les messages sont acheminés en fonction de leur contenu.
graph LR
P[Publisher] -->|{"type": "order", "value": >100}| B[Content Router]
B -->|Matches filter| S1[High-Value Handler]
B -->|Matches filter| S2[Order Logger]
B -.->|No match| S3[Low-Value Handler]
Cas d’usage : Filtrage d’événements, règles de routage complexes, données de capteurs IoT
3. Suivi de présence
Dans les systèmes en temps réel, savoir qui est en ligne (presence) est essentiel pour :
- Afficher le statut en ligne/hors ligne
- Livrer les messages uniquement aux utilisateurs actifs
- Gérer les connexions et reconnexions
- Gérer gracieusement les déconnexions utilisateurs
stateDiagram-v2
[*] --> Offline: Utilisateur créé
Offline --> Connecting: Demande de connexion
Connecting --> Online: Auth réussie
Connecting --> Offline: Auth échouée
Online --> Away: Pas d'activité
Online --> Offline: Déconnexion
Away --> Online: Activité détectée
Online --> [*]: Utilisateur supprimé
Ordonnancement des messages
Le problème de l’ordonnancement
Dans les systèmes distribués, les messages peuvent arriver dans le désordre en raison de :
- Variations de latence réseau
- Serveurs multiples traitant des messages
- Nouvelles tentatives et retransmissions de messages
- Publishers simultanés
Types d’ordonnancement
| Type d’ordonnancement | Description | Difficulté |
|---|---|---|
| FIFO | Les messages du même expéditeur arrivent dans l’ordre d’envoi | Facile |
| Causal | Les messages causalement liés sont ordonnés | Modérée |
| Total | Tous les messages sont ordonnés globalement | Difficile |
Pourquoi l’ordonnancement est important
Considérons une application de chat :
sequenceDiagram
participant A as Alice
participant S as Server
participant B as Bob
Note over A,B: Sans ordonnancement - confusion !
A->>S: "Let's meet at 5pm"
A->>S: "Never mind, 6pm instead"
S--xB: "Never mind, 6pm instead"
S--xB: "Let's meet at 5pm"
Note over B: Bob voit les messages dans le désordre !
Avec un ordonnancement approprié utilisant des numéros de séquence :
sequenceDiagram
participant A as Alice
participant S as Server
participant B as Bob
Note over A,B: Avec numéros de séquence - correct !
A->>S: [msg#1] "Let's meet at 5pm"
A->>S: [msg#2] "Never mind, 6pm instead"
S--xB: [msg#1] "Let's meet at 5pm"
S--xB: [msg#2] "Never mind, 6pm instead"
Note over B: Bob livre dans l'ordre par numéro de séquence
Implémentation : Chat Pub/Sub avec ordonnancement
Construisons un système de chat pub/sub avec :
- Routage basé sur les sujets (salles de chat)
- Suivi de présence
- Ordonnancement des messages avec numéros de séquence
Implémentation TypeScript
pubsub-server.ts - Serveur Pub/Sub avec ordonnancement :
// src: examples/03-chat/ts/pubsub-server.ts
interface Message {
id: string;
room: string;
sender: string;
content: string;
sequence: number;
timestamp: number;
}
interface Subscriber {
id: string;
userId: string;
rooms: Set<string>;
ws: WebSocket;
}
class PubSubServer {
private subscribers: Map<string, Subscriber> = new Map();
private roomSequences: Map<string, number> = new Map();
private messageHistory: Map<string, Message[]> = new Map();
private server: WebSocketServer;
constructor(port: number = 8080) {
this.server = new WebSocketServer({ port });
this.setupHandlers();
console.log(`Pub/Sub server running on port ${port}`);
}
private setupHandlers() {
this.server.on('connection', (ws: WebSocket) => {
const subscriberId = this.generateId();
ws.on('message', (data: string) => {
try {
const msg = JSON.parse(data.toString());
this.handleMessage(subscriberId, msg, ws);
} catch (err) {
ws.send(JSON.stringify({ error: 'Invalid message format' }));
}
});
ws.on('close', () => {
this.handleDisconnect(subscriberId);
});
});
}
private handleMessage(subscriberId: string, msg: any, ws: WebSocket) {
switch (msg.type) {
case 'subscribe':
this.handleSubscribe(subscriberId, msg.room, msg.userId, ws);
break;
case 'unsubscribe':
this.handleUnsubscribe(subscriberId, msg.room);
break;
case 'publish':
this.handlePublish(msg);
break;
case 'get_history':
this.handleGetHistory(msg.room, ws);
break;
}
}
private handleSubscribe(
subscriberId: string,
room: string,
userId: string,
ws: WebSocket
) {
if (!this.subscribers.has(subscriberId)) {
this.subscribers.set(subscriberId, {
id: subscriberId,
userId,
rooms: new Set(),
ws,
});
}
const subscriber = this.subscribers.get(subscriberId)!;
subscriber.rooms.add(room);
// Initialiser l'état de la salle si nécessaire
if (!this.roomSequences.has(room)) {
this.roomSequences.set(room, 0);
this.messageHistory.set(room, []);
}
// Envoyer une notification de présence
this.broadcast(room, {
type: 'presence',
userId,
action: 'join',
timestamp: Date.now(),
});
// Envoyer le numéro de séquence actuel
ws.send(JSON.stringify({
type: 'subscribed',
room,
sequence: this.roomSequences.get(room),
}));
console.log(`${userId} subscribed to ${room}`);
}
private handleUnsubscribe(subscriberId: string, room: string) {
const subscriber = this.subscribers.get(subscriberId);
if (subscriber) {
subscriber.rooms.delete(room);
// Envoyer une notification de présence
this.broadcast(room, {
type: 'presence',
userId: subscriber.userId,
action: 'leave',
timestamp: Date.now(),
});
}
}
private handlePublish(msg: any) {
const { room, sender, content } = msg;
const sequence = (this.roomSequences.get(room) || 0) + 1;
this.roomSequences.set(room, sequence);
const message: Message = {
id: this.generateId(),
room,
sender,
content,
sequence,
timestamp: Date.now(),
};
// Stocker dans l'historique
const history = this.messageHistory.get(room) || [];
history.push(message);
this.messageHistory.set(room, history.slice(-100)); // Garder les 100 derniers
// Diffuser à tous les subscribers
this.broadcast(room, {
type: 'message',
...message,
});
}
private handleGetHistory(room: string, ws: WebSocket) {
const history = this.messageHistory.get(room) || [];
ws.send(JSON.stringify({
type: 'history',
room,
messages: history,
}));
}
private broadcast(room: string, payload: any) {
const payloadStr = JSON.stringify(payload);
for (const [_, subscriber] of this.subscribers) {
if (subscriber.rooms.has(room) && subscriber.ws.readyState === WebSocket.OPEN) {
subscriber.ws.send(payloadStr);
}
}
}
private handleDisconnect(subscriberId: string) {
const subscriber = this.subscribers.get(subscriberId);
if (subscriber) {
// Notifier toutes les salles où l'utilisateur était
for (const room of subscriber.rooms) {
this.broadcast(room, {
type: 'presence',
userId: subscriber.userId,
action: 'leave',
timestamp: Date.now(),
});
}
this.subscribers.delete(subscriberId);
}
}
private generateId(): string {
return Math.random().toString(36).substring(2, 15);
}
}
const PORT = parseInt(process.env.PORT || '8080');
new PubSubServer(PORT);
pubsub-client.ts - Client avec tampon d’ordonnancement :
// src: examples/03-chat/ts/pubsub-client.ts
interface ClientMessage {
type: string;
sequence?: number;
[key: string]: any;
}
class PubSubClient {
private ws: WebSocket | null = null;
private userId: string;
private messageBuffer: Map<string, Map<number, ClientMessage>> = new Map();
private expectedSequence: Map<string, number> = new Map();
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
constructor(
private url: string,
userId?: string
) {
this.userId = userId || `user-${Math.random().toString(36).substring(7)}`;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.on('open', () => {
console.log(`Connected as ${this.userId}`);
this.reconnectAttempts = 0;
});
this.ws.on('message', (data: string) => {
const msg: ClientMessage = JSON.parse(data.toString());
this.handleMessage(msg);
});
this.ws.on('close', () => {
console.log('Disconnected. Attempting to reconnect...');
this.reconnect();
});
this.ws.on('error', (err) => {
console.error('WebSocket error:', err);
});
}
private handleMessage(msg: ClientMessage) {
switch (msg.type) {
case 'subscribed':
this.expectedSequence.set(msg.room, (msg.sequence || 0) + 1);
console.log(`Subscribed to ${msg.room} at sequence ${msg.sequence}`);
break;
case 'message':
this.handleOrderedMessage(msg.room, msg);
break;
case 'presence':
console.log(`${msg.userId} ${msg.action}ed`);
break;
case 'history':
console.log(`Received ${msg.messages.length} historical messages`);
msg.messages.forEach((m: ClientMessage) => this.displayMessage(m));
break;
}
}
private handleOrderedMessage(room: string, msg: ClientMessage) {
const seq = msg.sequence!;
// Initialiser le tampon si nécessaire
if (!this.messageBuffer.has(room)) {
this.messageBuffer.set(room, new Map());
}
const buffer = this.messageBuffer.get(room)!;
const expected = this.expectedSequence.get(room) || 1;
if (seq === expected) {
// Message attendu - livrer immédiatement
this.displayMessage(msg);
this.expectedSequence.set(room, seq + 1);
// Vérifier le tampon pour les messages suivants
this.deliverBufferedMessages(room);
} else if (seq > expected) {
// Message futur - le mettre en tampon
buffer.set(seq, msg);
console.log(`Buffered message ${seq} (expecting ${expected})`);
}
// seq < expected: ancien message, ignorer
}
private deliverBufferedMessages(room: string) {
const buffer = this.messageBuffer.get(room);
if (!buffer) return;
const expected = this.expectedSequence.get(room) || 1;
while (buffer.has(expected)) {
const msg = buffer.get(expected)!;
this.displayMessage(msg);
buffer.delete(expected);
this.expectedSequence.set(room, expected + 1);
}
}
private displayMessage(msg: ClientMessage) {
console.log(`[${msg.sequence}] ${msg.sender}: ${msg.content}`);
}
subscribe(room: string) {
this.send({ type: 'subscribe', room, userId: this.userId });
}
unsubscribe(room: string) {
this.send({ type: 'unsubscribe', room });
}
publish(room: string, content: string) {
this.send({
type: 'publish',
room,
sender: this.userId,
content,
});
}
getHistory(room: string) {
this.send({ type: 'get_history', room });
}
private send(payload: any) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(payload));
} else {
console.error('WebSocket not connected');
}
}
private reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
setTimeout(() => this.connect(), delay);
} else {
console.error('Max reconnection attempts reached');
}
}
}
// Usage en CLI
const args = process.argv.slice(2);
const url = args[0] || 'ws://localhost:8080';
const client = new PubSubClient(url);
client.connect();
// Interface readline simple
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
console.log('Commands: /join <room>, /leave <room>, /history <room>, /quit');
console.log('Any other input will be sent to the current room');
let currentRoom = '';
const showPrompt = () => {
if (currentRoom) {
rl.question(`[${currentRoom}]> `, (input) => {
if (input === '/quit') {
client.ws?.close();
rl.close();
process.exit(0);
} else if (input.startsWith('/join ')) {
currentRoom = input.substring(6);
client.subscribe(currentRoom);
} else if (input.startsWith('/leave ')) {
const room = input.substring(7);
client.unsubscribe(room);
if (room === currentRoom) currentRoom = '';
} else if (input.startsWith('/history ')) {
const room = input.substring(9);
client.getHistory(room);
} else if (input && currentRoom) {
client.publish(currentRoom, input);
}
showPrompt();
});
} else {
rl.question('(no room)> ', (input) => {
if (input.startsWith('/join ')) {
currentRoom = input.substring(6);
client.subscribe(currentRoom);
}
showPrompt();
});
}
};
showPrompt();
Implémentation Python
pubsub_server.py - Serveur Pub/Sub avec ordonnancement :
# src: examples/03-chat/py/pubsub_server.py
import asyncio
import json
import time
from typing import Dict, Set, List
from dataclasses import dataclass, asdict
import websockets
from websockets.server import WebSocketServerProtocol
@dataclass
class Message:
id: str
room: str
sender: str
content: str
sequence: int
timestamp: int
class PubSubServer:
def __init__(self, port: int = 8080):
self.port = port
self.subscribers: Dict[str, dict] = {}
self.room_sequences: Dict[str, int] = {}
self.message_history: Dict[str, List[Message]] = {}
async def handle_connection(self, ws: WebSocketServerProtocol):
subscriber_id = self._generate_id()
try:
async for message in ws:
try:
data = json.loads(message)
await self.handle_message(subscriber_id, data, ws)
except json.JSONDecodeError:
await ws.send(json.dumps({"error": "Invalid message format"}))
finally:
await self.handle_disconnect(subscriber_id)
async def handle_message(self, subscriber_id: str, msg: dict, ws: WebSocketServerProtocol):
msg_type = msg.get("type")
if msg_type == "subscribe":
await self.handle_subscribe(subscriber_id, msg["room"], msg["userId"], ws)
elif msg_type == "unsubscribe":
await self.handle_unsubscribe(subscriber_id, msg["room"])
elif msg_type == "publish":
await self.handle_publish(msg)
elif msg_type == "get_history":
await self.handle_get_history(msg["room"], ws)
async def handle_subscribe(
self, subscriber_id: str, room: str, user_id: str, ws: WebSocketServerProtocol
):
if subscriber_id not in self.subscribers:
self.subscribers[subscriber_id] = {
"id": subscriber_id,
"userId": user_id,
"rooms": set(),
"ws": ws,
}
subscriber = self.subscribers[subscriber_id]
subscriber["rooms"].add(room)
# Initialiser l'état de la salle
if room not in self.room_sequences:
self.room_sequences[room] = 0
self.message_history[room] = []
# Envoyer une notification de présence
await self.broadcast(room, {
"type": "presence",
"userId": user_id,
"action": "join",
"timestamp": int(time.time() * 1000),
})
# Envoyer le numéro de séquence actuel
await ws.send(json.dumps({
"type": "subscribed",
"room": room,
"sequence": self.room_sequences[room],
}))
print(f"{user_id} subscribed to {room}")
async def handle_unsubscribe(self, subscriber_id: str, room: str):
subscriber = self.subscribers.get(subscriber_id)
if subscriber:
subscriber["rooms"].discard(room)
await self.broadcast(room, {
"type": "presence",
"userId": subscriber["userId"],
"action": "leave",
"timestamp": int(time.time() * 1000),
})
async def handle_publish(self, msg: dict):
room = msg["room"]
sender = msg["sender"]
content = msg["content"]
sequence = self.room_sequences.get(room, 0) + 1
self.room_sequences[room] = sequence
message = Message(
id=self._generate_id(),
room=room,
sender=sender,
content=content,
sequence=sequence,
timestamp=int(time.time() * 1000),
)
# Stocker dans l'historique
history = self.message_history[room]
history.append(message)
self.message_history[room] = history[-100:] # Garder les 100 derniers
# Diffuser
await self.broadcast(room, {
"type": "message",
**asdict(message),
})
async def handle_get_history(self, room: str, ws: WebSocketServerProtocol):
history = self.message_history.get(room, [])
await ws.send(json.dumps({
"type": "history",
"room": room,
"messages": [asdict(m) for m in history],
}))
async def broadcast(self, room: str, payload: dict):
payload_str = json.dumps(payload)
tasks = []
for subscriber in self.subscribers.values():
if room in subscriber["rooms"]:
ws = subscriber["ws"]
if not ws.closed:
tasks.append(ws.send(payload_str))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def handle_disconnect(self, subscriber_id: str):
subscriber = self.subscribers.get(subscriber_id)
if subscriber:
# Notifier toutes les salles
for room in list(subscriber["rooms"]):
await self.broadcast(room, {
"type": "presence",
"userId": subscriber["userId"],
"action": "leave",
"timestamp": int(time.time() * 1000),
})
del self.subscribers[subscriber_id]
def _generate_id(self) -> str:
import random
import string
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))
async def start(self):
print(f"Pub/Sub server running on port {self.port}")
async with websockets.serve(self.handle_connection, "", self.port):
await asyncio.Future() # Run forever
if __name__ == "__main__":
import os
port = int(os.environ.get("PORT", "8080"))
server = PubSubServer(port)
asyncio.run(server.start())
pubsub_client.py - Client avec tampon d’ordonnancement :
# src: examples/03-chat/py/pubsub_client.py
import asyncio
import json
import time
from typing import Dict, Optional
import websockets
from websockets.client import WebSocketClientProtocol
class PubSubClient:
def __init__(self, url: str, user_id: Optional[str] = None):
self.url = url
self.user_id = user_id or f"user-{int(time.time())}"
self.ws: Optional[WebSocketClientProtocol] = None
self.message_buffer: Dict[str, Dict[int, dict]] = {}
self.expected_sequence: Dict[str, int] = {}
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
async def connect(self):
try:
self.ws = await websockets.connect(self.url)
print(f"Connected as {self.user_id}")
self.reconnect_attempts = 0
asyncio.create_task(self.listen())
except Exception as e:
print(f"Connection failed: {e}")
await self.reconnect()
async def listen(self):
if not self.ws:
return
try:
async for message in self.ws:
data = json.loads(message)
await self.handle_message(data)
except websockets.exceptions.ConnectionClosed:
print("Disconnected. Attempting to reconnect...")
await self.reconnect()
async def handle_message(self, msg: dict):
msg_type = msg.get("type")
if msg_type == "subscribed":
room = msg["room"]
self.expected_sequence[room] = msg.get("sequence", 0) + 1
print(f"Subscribed to {room} at sequence {msg.get('sequence', 0)}")
elif msg_type == "message":
await self.handle_ordered_message(msg["room"], msg)
elif msg_type == "presence":
print(f"{msg['userId']} {msg['action']}ed")
elif msg_type == "history":
print(f"Received {len(msg['messages'])} historical messages")
for m in msg["messages"]:
self.display_message(m)
async def handle_ordered_message(self, room: str, msg: dict):
seq = msg["sequence"]
if room not in self.message_buffer:
self.message_buffer[room] = {}
buffer = self.message_buffer[room]
expected = self.expected_sequence.get(room, 1)
if seq == expected:
# Message attendu - livrer immédiatement
self.display_message(msg)
self.expected_sequence[room] = seq + 1
# Vérifier le tampon pour les messages suivants
await self.deliver_buffered_messages(room)
elif seq > expected:
# Message futur - le mettre en tampon
buffer[seq] = msg
print(f"Buffered message {seq} (expecting {expected})")
async def deliver_buffered_messages(self, room: str):
buffer = self.message_buffer.get(room, {})
expected = self.expected_sequence.get(room, 1)
while expected in buffer:
msg = buffer[expected]
self.display_message(msg)
del buffer[expected]
self.expected_sequence[room] = expected + 1
expected += 1
def display_message(self, msg: dict):
print(f"[{msg['sequence']}] {msg['sender']}: {msg['content']}")
async def subscribe(self, room: str):
await self.send({"type": "subscribe", "room": room, "userId": self.user_id})
async def unsubscribe(self, room: str):
await self.send({"type": "unsubscribe", "room": room})
async def publish(self, room: str, content: str):
await self.send({
"type": "publish",
"room": room,
"sender": self.user_id,
"content": content,
})
async def get_history(self, room: str):
await self.send({"type": "get_history", "room": room})
async def send(self, payload: dict):
if self.ws and not self.ws.closed:
await self.ws.send(json.dumps(payload))
else:
print("WebSocket not connected")
async def reconnect(self):
if self.reconnect_attempts < self.max_reconnect_attempts:
self.reconnect_attempts += 1
delay = min(1000 * (2 ** self.reconnect_attempts), 30000) / 1000
await asyncio.sleep(delay)
await self.connect()
else:
print("Max reconnection attempts reached")
async def main():
import sys
url = sys.argv[1] if len(sys.argv) > 1 else "ws://localhost:8080"
client = PubSubClient(url)
await client.connect()
# CLI simple
current_room = ""
print('Commands: /join <room>, /leave <room>, /history <room>, /quit')
while True:
try:
prompt = f"[{current_room}]> " if current_room else "(no room)> "
line = await asyncio.get_event_loop().run_in_executor(None, input, prompt)
if line == "/quit":
break
elif line.startswith("/join "):
current_room = line[6:]
await client.subscribe(current_room)
elif line.startswith("/leave "):
room = line[7:]
await client.unsubscribe(room)
if room == current_room:
current_room = ""
elif line.startswith("/history "):
room = line[9:]
await client.get_history(room)
elif line and current_room:
await client.publish(current_room, line)
except EOFError:
break
if client.ws:
await client.ws.close()
if __name__ == "__main__":
asyncio.run(main())
Exécution des exemples
Version TypeScript
cd distributed-systems-course/examples/03-chat/ts
# Installer les dépendances
npm install
# Démarrer le serveur
PORT=8080 npx ts-node pubsub-server.ts
# Dans un autre terminal, démarrer un client
npx ts-node pubsub-client.ts
Version Python
cd distributed-systems-course/examples/03-chat/py
# Installer les dépendances
pip install -r requirements.txt
# Démarrer le serveur
PORT=8080 python pubsub_server.py
# Dans un autre terminal, démarrer un client
python pubsub_client.py
Docker Compose
docker-compose.yml (TypeScript) :
services:
pubsub-server:
build: .
ports:
- "8080:8080"
environment:
- PORT=8080
docker-compose up
Test du système Pub/Sub
Test 1 : Pub/Sub de base
- Démarrer trois clients dans des terminaux séparés
- Client 1 :
/join general - Client 2 :
/join general - Client 1 :
Hello everyone! - Le client 2 devrait recevoir le message
- Client 3 :
/join general - Client 3 :
/history general- devrait voir les messages précédents
Test 2 : Salles multiples
- Client 1 :
/join sports - Client 2 :
/join news - Client 1 :
Game starting!(uniquement dans sports) - Client 2 :
Breaking news!(uniquement dans news) - Client 3 :
/join sportset/join news(reçoit les deux)
Test 3 : Ordonnancement des messages
- Démarrer un client et rejoindre une salle
- Envoyer des messages rapidement :
msg1,msg2,msg3 - Observer les numéros de séquence :
[1],[2],[3] - Noter que l’ordre est préservé
Test 4 : Suivi de présence
- Démarrer deux clients
- Les deux rejoignent la même salle
- Observer les notifications de présence (utilisateur rejoint/parti)
- Déconnecter un client (Ctrl+C)
- L’autre client reçoit la notification de départ
Exercices
Exercice 1 : Implémenter le cache des derniers messages
Ajouter une fonctionnalité pour stocker uniquement les derniers N messages par salle (déjà implémenté comme 100 dans le code).
Tâches :
- Rendre la taille de l’historique configurable via une variable d’environnement
- Ajouter une commande
/clear_historypour les administrateurs - Ajouter un TTL (time-to-live) pour les anciens messages
Exercice 2 : Implémenter les messages privés
Étendre le système pour prendre en charge les messages directs entre utilisateurs.
Exigences :
- Les messages privés ne doivent être livrés qu’au destinataire
- Utiliser un format de sujet spécial :
@username - Inclure l’authentification de l’expéditeur
Indice : Vous devrez modifier la méthode handlePublish pour vérifier le préfixe @.
Exercice 3 : Ajouter les accusés de réception de messages
Implémenter des accusés de réception pour garantir la livraison des messages.
Exigences :
- Les clients doivent ACK les messages reçus
- Le serveur suit les messages non accusés
- À la reconnexion, le serveur renvoie les messages non accusés
Indice : Ajoutez un type de message ack et suivez les messages en attente par subscriber.
Pièges courants
| Piège | Symptôme | Solution |
|---|---|---|
| Désynchronisation des numéros de séquence | Messages non affichés | Se réabonner pour réinitialiser la séquence |
| Fuite de mémoire de l’historique | Utilisation mémoire croissante | Implémenter des limites de taille d’historique |
| Mises à jour de présence manquantes | Statut en ligne obsolète | Ajouter des messages heartbeat/ping |
| Conditions de course | Messages perdus lors de la reconnexion | Mettre en tampon les messages pendant la déconnexion |
Exemples réels
| Système | Implémentation Pub/Sub | Stratégie d’ordonnancement |
|---|---|---|
| Redis Pub/Sub | Canaux basés sur des sujets | Aucune garantie d’ordonnancement |
| Apache Kafka | Sujets partitionnés | Ordonnancement par partition |
| Google Cloud Pub/Sub | Sujets avec abonnements | Livraison exactement une fois |
| AWS SNS | Diffusion basée sur des sujets | Ordonnancement au mieux (best-effort) |
| RabbitMQ | Liaison exchange/queue | FIFO dans la file |
Résumé
- Pub/Sub découple les publishers des subscribers via un broker intermédiaire
- Le routage basé sur les sujets est le modèle le plus simple et le plus courant
- Le suivi de présence permet le statut en ligne/hors ligne dans les systèmes temps réel
- L’ordonnancement des messages nécessite des numéros de séquence et la mise en tampon
- L’ordonnancement causal est réalisable avec une complexité modeste
- L’ordonnancement total est coûteux et souvent inutile
Suivant : Implémentation du système de chat →
🧠 Quiz du chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Implémentation du Système de Chat
Session 7 - Session complète (90 minutes)
Objectifs d’apprentissage
- Construire un système de chat en temps réel complet avec WebSockets
- Implémenter l’ordonnancement des messages avec numéros de séquence
- Gérer la gestion de présence (utilisateurs en ligne/hors ligne)
- Ajouter la persistance des messages pour l’historique
- Déployer plusieurs salles de chat avec Docker Compose
Architecture du système
Notre système de chat rassemble tous les concepts des sessions 6-7 :
graph TB
subgraph "Clients"
C1[Navigateur Utilisateur 1]
C2[Navigateur Utilisateur 2]
C3[Navigateur Utilisateur 3]
end
subgraph "Serveur de Chat"
WS[Gestionnaire WebSocket]
PS[Moteur Pub/Sub]
SM[Gestionnaire de Séquence]
PM[Gestionnaire de Présence]
MS[Stockage de Messages]
WS --> PS
WS --> SM
WS --> PM
PS --> SM
SM --> MS
end
C1 -->|WebSocket| WS
C2 -->|WebSocket| WS
C3 -->|WebSocket| WS
subgraph "Persistance"
DB[(Base de Messages)]
end
MS --> DB
style WS fill:#e3f2fd
style PS fill:#fff3e0
style SM fill:#f3e5f5
Composants clés
| Composant | Responsabilité |
|---|---|
| Gestionnaire WebSocket | Gère les connexions client, envoie/reçoit les messages |
| Moteur Pub/Sub | Achemine les messages vers les salles, gère les abonnements |
| Gestionnaire de Séquence | Attribue des numéros de séquence, assure l’ordonnancement |
| Gestionnaire de Présence | Suit le statut en ligne/hors ligne, heartbeat |
| Stockage de Messages | Persiste les messages pour l’historique et la relecture |
Flux des messages
sequenceDiagram
participant U1 as Utilisateur 1
participant WS as Gestionnaire WebSocket
participant PS as Pub/Sub
participant SM as Séquenceur
participant DB as Stockage de Messages
participant U2 as Utilisateur 2
U1->>WS: CONNECT("general")
WS->>PS: subscribe("general", U1)
WS->>PM: mark_online(U1)
PS->>U2: BROADCAST("Utilisateur 1 a rejoint")
Note over U1,U2: Envoi d'un message
U1->>WS: SEND("general", "Bonjour!")
WS->>PS: publish("general", msg)
PS->>SM: get_sequence(msg)
SM->>DB: save(msg, seq=1)
SM->>PS: return seq=1
PS->>U1: BROADCAST(msg, seq=1)
PS->>U2: BROADCAST(msg, seq=1)
Note over U1,U2: Utilisateur 2 se reconnecte
U2->>WS: CONNECT("general", last_seq=0)
WS->>DB: get_messages(since=0)
DB->>U2: REPLAY([msg1, msg2, ...])
Implémentation TypeScript
Structure du projet
chat-system/
├── package.json
├── tsconfig.json
├── src/
│ ├── types.ts # Définitions de type
│ ├── pub-sub.ts # Moteur Pub/Sub
│ ├── sequencer.ts # Gestionnaire de numéros de séquence
│ ├── presence.ts # Gestion de présence
│ ├── store.ts # Persistance des messages
│ ├── server.ts # Serveur WebSocket
│ └── index.ts # Point d'entrée
├── public/
│ └── client.html # Client de démo
├── Dockerfile
└── docker-compose.yml
1. Définitions de type
// src/types.ts
export interface Message {
id: string;
room: string;
user: string;
content: string;
sequence: number;
timestamp: number;
}
export interface Client {
id: string;
user: string;
rooms: Set<string>;
ws: WebSocket;
lastSeen: number;
}
export interface Presence {
user: string;
status: 'online' | 'offline' | 'away';
lastSeen: number;
}
export type MessageHandler = (client: Client, message: Message) => void;
2. Moteur Pub/Sub
// src/pub-sub.ts
import { Message, Client, MessageHandler } from './types';
export class PubSub {
private subscriptions: Map<string, Set<Client>> = new Map();
private handlers: Map<string, MessageHandler[]> = new Map();
subscribe(room: string, client: Client): void {
if (!this.subscriptions.has(room)) {
this.subscriptions.set(room, new Set());
}
this.subscriptions.get(room)!.add(client);
client.rooms.add(room);
}
unsubscribe(room: string, client: Client): void {
const subs = this.subscriptions.get(room);
if (subs) {
subs.delete(client);
if (subs.size === 0) {
this.subscriptions.delete(room);
}
}
client.rooms.delete(room);
}
publish(room: string, message: Message): void {
const subs = this.subscriptions.get(room);
if (subs) {
for (const client of subs) {
this.sendToClient(client, message);
}
}
this.emit('message', message);
}
on(event: string, handler: MessageHandler): void {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event)!.push(handler);
}
private emit(event: string, message: Message): void {
const handlers = this.handlers.get(event) || [];
handlers.forEach(h => h(null!, message));
}
private sendToClient(client: Client, message: Message): void {
if (client.ws.readyState === client.ws.OPEN) {
client.ws.send(JSON.stringify({
type: 'message',
data: message
}));
}
}
getSubscribers(room: string): Client[] {
return Array.from(this.subscriptions.get(room) || []);
}
getRooms(): string[] {
return Array.from(this.subscriptions.keys());
}
}
3. Gestionnaire de séquence
// src/sequencer.ts
import { Message } from './types';
export class Sequencer {
private sequences: Map<string, number> = new Map();
getNext(room: string): number {
const current = this.sequences.get(room) || 0;
const next = current + 1;
this.sequences.set(room, next);
return next;
}
setCurrent(room: string, sequence: number): void {
this.sequences.set(room, sequence);
}
getCurrent(room: string): number {
return this.sequences.get(room) || 0;
}
sequenceMessage(message: Message): Message {
const seq = this.getNext(message.room);
return { ...message, sequence: seq };
}
}
4. Gestionnaire de présence
// src/presence.ts
import { Client, Presence } from './types';
const HEARTBEAT_INTERVAL = 30000; // 30 secondes
const OFFLINE_TIMEOUT = 60000; // 60 secondes
export class PresenceManager {
private users: Map<string, Presence> = new Map();
private clients: Map<string, Client> = new Map();
private intervals: Map<string, NodeJS.Timeout> = new Map();
register(client: Client): void {
this.clients.set(client.id, client);
this.updatePresence(client.user, 'online');
this.startHeartbeat(client);
}
unregister(client: Client): void {
this.stopHeartbeat(client);
this.clients.delete(client.id);
this.updatePresence(client.user, 'offline');
}
updatePresence(user: string, status: 'online' | 'offline' | 'away'): void {
this.users.set(user, {
user,
status,
lastSeen: Date.now()
});
}
getPresence(user: string): Presence | undefined {
return this.users.get(user);
}
getOnlineUsers(): string[] {
const now = Date.now();
return Array.from(this.users.values())
.filter(p => p.status === 'online' && (now - p.lastSeen) < OFFLINE_TIMEOUT)
.map(p => p.user);
}
getPresenceInRoom(room: string): Presence[] {
const now = Date.now();
const usersInRoom = new Set<string>();
for (const client of this.clients.values()) {
if (client.rooms.has(room)) {
usersInRoom.add(client.user);
}
}
return Array.from(usersInRoom)
.map(user => this.users.get(user)!)
.filter(p => p && (now - p.lastSeen) < OFFLINE_TIMEOUT);
}
private startHeartbeat(client: Client): void {
const interval = setInterval(() => {
if (client.ws.readyState === client.ws.OPEN) {
client.ws.send(JSON.stringify({ type: 'heartbeat' }));
this.updatePresence(client.user, 'online');
}
}, HEARTBEAT_INTERVAL);
this.intervals.set(client.id, interval);
}
private stopHeartbeat(client: Client): void {
const interval = this.intervals.get(client.id);
if (interval) {
clearInterval(interval);
this.intervals.delete(client.id);
}
}
cleanup(): void {
for (const interval of this.intervals.values()) {
clearInterval(interval);
}
this.intervals.clear();
}
}
5. Stockage de messages
// src/store.ts
import { Message } from './types';
import fs from 'fs/promises';
import path from 'path';
export class MessageStore {
private basePath: string;
constructor(basePath: string = './data/messages') {
this.basePath = basePath;
}
async save(message: Message): Promise<void> {
const roomPath = path.join(this.basePath, message.room);
await fs.mkdir(roomPath, { recursive: true });
const filename = path.join(roomPath, `${message.sequence}.json`);
await fs.writeFile(filename, JSON.stringify(message, null, 2));
}
async getMessages(room: string, since: number = 0, limit: number = 100): Promise<Message[]> {
const roomPath = path.join(this.basePath, room);
const messages: Message[] = [];
try {
const files = await fs.readdir(roomPath);
const jsonFiles = files
.filter(f => f.endsWith('.json'))
.map(f => parseInt(f.replace('.json', '')))
.filter(seq => seq > since)
.sort((a, b) => a - b)
.slice(0, limit);
for (const seq of jsonFiles) {
const content = await fs.readFile(path.join(roomPath, `${seq}.json`), 'utf-8');
messages.push(JSON.parse(content));
}
} catch (err) {
// La salle n'existe pas encore
}
return messages;
}
async getLastSequence(room: string): Promise<number> {
const roomPath = path.join(this.basePath, room);
try {
const files = await fs.readdir(roomPath);
const sequences = files
.filter(f => f.endsWith('.json'))
.map(f => parseInt(f.replace('.json', '')));
return sequences.length > 0 ? Math.max(...sequences) : 0;
} catch {
return 0;
}
}
}
6. Serveur WebSocket
// src/server.ts
import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';
import { v4 as uuidv4 } from 'uuid';
import { PubSub } from './pub-sub';
import { Sequencer } from './sequencer';
import { PresenceManager } from './presence';
import { MessageStore } from './store';
import { Client, Message } from './types';
const PORT = process.env.PORT || 8080;
export class ChatServer {
private wss: WebSocketServer;
private pubSub: PubSub;
private sequencer: Sequencer;
private presence: PresenceManager;
private store: MessageStore;
constructor() {
const server = createServer();
this.wss = new WebSocketServer({ server });
this.pubSub = new PubSub();
this.sequencer = new Sequencer();
this.presence = new PresenceManager();
this.store = new MessageStore();
this.setupHandlers();
}
private setupHandlers(): void {
this.wss.on('connection', (ws: WebSocket) => {
const clientId = uuidv4();
const client: Client = {
id: clientId,
user: `user_${clientId.slice(0, 8)}`,
rooms: new Set(),
ws,
lastSeen: Date.now()
};
console.log(`Client connected: ${client.id}`);
ws.on('message', async (data: string) => {
try {
const msg = JSON.parse(data);
await this.handleMessage(client, msg);
} catch (err) {
console.error('Error handling message:', err);
}
});
ws.on('close', () => {
console.log(`Client disconnected: ${client.id}`);
for (const room of client.rooms) {
this.pubSub.publish(room, {
id: uuidv4(),
room,
user: 'system',
content: `${client.user} left the room`,
sequence: this.sequencer.getCurrent(room),
timestamp: Date.now()
});
this.pubSub.unsubscribe(room, client);
}
this.presence.unregister(client);
});
// Envoyer un message de bienvenue
this.sendToClient(client, {
type: 'connected',
data: { clientId: client.id, user: client.user }
});
this.presence.register(client);
});
}
private async handleMessage(client: Client, msg: any): Promise<void> {
switch (msg.type) {
case 'join':
await this.handleJoin(client, msg.room);
break;
case 'leave':
this.handleLeave(client, msg.room);
break;
case 'message':
await this.handleChatMessage(client, msg.data);
break;
case 'presence':
this.handlePresenceRequest(client, msg.room);
break;
case 'history':
await this.handleHistoryRequest(client, msg.room, msg.since);
break;
default:
console.log('Unknown message type:', msg.type);
}
}
private async handleJoin(client: Client, room: string): Promise<void> {
console.log(`${client.user} joining room: ${room}`);
// S'abonner à la salle
this.pubSub.subscribe(room, client);
// Envoyer la présence actuelle
const presence = this.presence.getPresenceInRoom(room);
this.sendToClient(client, {
type: 'presence',
data: { room, users: presence }
});
// Annoncer le rejoindre
this.pubSub.publish(room, {
id: uuidv4(),
room,
user: 'system',
content: `${client.user} joined the room`,
sequence: this.sequencer.getCurrent(room),
timestamp: Date.now()
});
// Envoyer les messages récents
const history = await this.store.getMessages(room, 0, 50);
if (history.length > 0) {
this.sendToClient(client, {
type: 'history',
data: { room, messages: history }
});
}
}
private handleLeave(client: Client, room: string): void {
console.log(`${client.user} leaving room: ${room}`);
this.pubSub.unsubscribe(room, client);
this.pubSub.publish(room, {
id: uuidv4(),
room,
user: 'system',
content: `${client.user} left the room`,
sequence: this.sequencer.getCurrent(room),
timestamp: Date.now()
});
}
private async handleChatMessage(client: Client, data: any): Promise<void> {
const { room, content } = data;
if (!client.rooms.has(room)) {
this.sendError(client, 'Not subscribed to room');
return;
}
const message: Message = {
id: uuidv4(),
room,
user: client.user,
content,
sequence: 0, // Sera assigné
timestamp: Date.now()
};
// Assigner un numéro de séquence
const sequenced = this.sequencer.sequenceMessage(message);
// Sauvegarder dans le stockage
await this.store.save(sequenced);
// Publier à tous les subscribers
this.pubSub.publish(room, sequenced);
console.log(`[${room}] ${client.user}: ${content} (seq: ${sequenced.sequence})`);
}
private handlePresenceRequest(client: Client, room: string): void {
const presence = this.presence.getPresenceInRoom(room);
this.sendToClient(client, {
type: 'presence',
data: { room, users: presence }
});
}
private async handleHistoryRequest(client: Client, room: string, since: number = 0): Promise<void> {
const messages = await this.store.getMessages(room, since);
this.sendToClient(client, {
type: 'history',
data: { room, messages }
});
}
private sendToClient(client: Client, data: any): void {
if (client.ws.readyState === client.ws.OPEN) {
client.ws.send(JSON.stringify(data));
}
}
private sendError(client: Client, message: string): void {
this.sendToClient(client, {
type: 'error',
data: { message }
});
}
listen(): void {
const server = this.wss.server!;
server.listen(PORT, () => {
console.log(`Chat server listening on port ${PORT}`);
});
}
}
7. Point d’entrée
// src/index.ts
import { ChatServer } from './server';
const server = new ChatServer();
server.listen();
8. Package.json
{
"name": "chat-system",
"version": "1.0.0",
"description": "Real-time chat system with WebSockets",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"dev": "ts-node src/index.ts"
},
"dependencies": {
"ws": "^8.18.0",
"uuid": "^11.0.3"
},
"devDependencies": {
"@types/node": "^22.10.2",
"@types/ws": "^8.5.13",
"@types/uuid": "^10.0.0",
"ts-node": "^10.9.2",
"typescript": "^5.7.2"
}
}
9. Dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY . .
RUN npm run build
EXPOSE 8080
CMD ["npm", "start"]
10. Docker Compose
version: '3.8'
services:
chat:
build: .
ports:
- "8080:8080"
volumes:
- ./data:/app/data
environment:
- PORT=8080
restart: unless-stopped
Implémentation Python
Structure du projet
chat-system/
├── requirements.txt
├── src/
│ ├── __init__.py
│ ├── types.py
│ ├── pub_sub.py
│ ├── sequencer.py
│ ├── presence.py
│ ├── store.py
│ ├── server.py
│ └── main.py
├── public/
│ └── client.html
├── Dockerfile
└── docker-compose.yml
1. Définitions de type
# src/types.py
from dataclasses import dataclass, field
from typing import Set
import websockets.server
import datetime
@dataclass
class Message:
id: str
room: str
user: str
content: str
sequence: int
timestamp: float
@dataclass
class Client:
id: str
user: str
rooms: Set[str] = field(default_factory=set)
websocket: websockets.server.WebSocketServerProtocol = None
last_seen: float = field(default_factory=lambda: datetime.datetime.now().timestamp())
@dataclass
class Presence:
user: str
status: str # 'online', 'offline', 'away'
last_seen: float
2. Moteur Pub/Sub
# src/pub_sub.py
from typing import Dict, Set, List, Callable, Any
from .types import Message, Client
class PubSub:
def __init__(self):
self.subscriptions: Dict[str, Set[Client]] = {}
self.handlers: Dict[str, List[Callable]] = {}
def subscribe(self, room: str, client: Client) -> None:
if room not in self.subscriptions:
self.subscriptions[room] = set()
self.subscriptions[room].add(client)
client.rooms.add(room)
def unsubscribe(self, room: str, client: Client) -> None:
if room in self.subscriptions:
self.subscriptions[room].discard(client)
if not self.subscriptions[room]:
del self.subscriptions[room]
client.rooms.discard(room)
async def publish(self, room: str, message: Message) -> None:
if room in self.subscriptions:
for client in self.subscriptions[room]:
await self._send_to_client(client, message)
await self._emit('message', message)
async def _send_to_client(self, client: Client, message: Message) -> None:
if client.websocket and not client.websocket.closed:
import json
await client.websocket.send(json.dumps({
'type': 'message',
'data': message.__dict__
}))
async def _emit(self, event: str, message: Message) -> None:
handlers = self.handlers.get(event, [])
for handler in handlers:
await handler(None, message)
def get_subscribers(self, room: str) -> List[Client]:
return list(self.subscriptions.get(room, set()))
def get_rooms(self) -> List[str]:
return list(self.subscriptions.keys())
3. Gestionnaire de séquence
# src/sequencer.py
from typing import Dict
from .types import Message
class Sequencer:
def __init__(self):
self.sequences: Dict[str, int] = {}
def get_next(self, room: str) -> int:
current = self.sequences.get(room, 0)
next_seq = current + 1
self.sequences[room] = next_seq
return next_seq
def set_current(self, room: str, sequence: int) -> None:
self.sequences[room] = sequence
def get_current(self, room: str) -> int:
return self.sequences.get(room, 0)
def sequence_message(self, message: Message) -> Message:
seq = self.get_next(message.room)
message.sequence = seq
return message
4. Gestionnaire de présence
# src/presence.py
import asyncio
import datetime
from typing import Dict, List, Set
from .types import Client, Presence
HEARTBEAT_INTERVAL = 30 # secondes
OFFLINE_TIMEOUT = 60 # secondes
class PresenceManager:
def __init__(self):
self.users: Dict[str, Presence] = {}
self.clients: Dict[str, Client] = {}
self.tasks: Dict[str, asyncio.Task] = {}
def register(self, client: Client) -> None:
self.clients[client.id] = client
self.update_presence(client.user, 'online')
self.tasks[client.id] = asyncio.create_task(self._heartbeat(client))
def unregister(self, client: Client) -> None:
if client.id in self.tasks:
self.tasks[client.id].cancel()
del self.tasks[client.id]
if client.id in self.clients:
del self.clients[client.id]
self.update_presence(client.user, 'offline')
def update_presence(self, user: str, status: str) -> None:
self.users[user] = Presence(
user=user,
status=status,
last_seen=datetime.datetime.now().timestamp()
)
def get_presence(self, user: str) -> Presence | None:
return self.users.get(user)
def get_online_users(self) -> List[str]:
now = datetime.datetime.now().timestamp()
return [
p.user for p in self.users.values()
if p.status == 'online' and (now - p.last_seen) < OFFLINE_TIMEOUT
]
def get_presence_in_room(self, room: str) -> List[Presence]:
now = datetime.datetime.now().timestamp()
users_in_room = set()
for client in self.clients.values():
if room in client.rooms:
users_in_room.add(client.user)
return [
self.users.get(user)
for user in users_in_room
if user in self.users and (now - self.users[user].last_seen) < OFFLINE_TIMEOUT
]
async def _heartbeat(self, client: Client) -> None:
import json
while True:
try:
if client.websocket and not client.websocket.closed:
await client.websocket.send(json.dumps({'type': 'heartbeat'}))
self.update_presence(client.user, 'online')
except asyncio.CancelledError:
break
except Exception:
pass
await asyncio.sleep(HEARTBEAT_INTERVAL)
def cleanup(self) -> None:
for task in self.tasks.values():
task.cancel()
self.tasks.clear()
5. Stockage de messages
# src/store.py
import os
import json
import asyncio
from pathlib import Path
from typing import List
from .types import Message
class MessageStore:
def __init__(self, base_path: str = './data/messages'):
self.base_path = Path(base_path)
async def save(self, message: Message) -> None:
room_path = self.base_path / message.room
room_path.mkdir(parents=True, exist_ok=True)
filename = room_path / f'{message.sequence}.json'
with open(filename, 'w') as f:
json.dump(message.__dict__, f, indent=2)
async def get_messages(self, room: str, since: int = 0, limit: int = 100) -> List[Message]:
room_path = self.base_path / room
messages = []
if not room_path.exists():
return messages
try:
files = [f for f in os.listdir(room_path) if f.endswith('.json')]
sequences = sorted([
int(f.replace('.json', ''))
for f in files
if int(f.replace('.json', '')) > since
])[:limit]
for seq in sequences:
with open(room_path / f'{seq}.json', 'r') as f:
data = json.load(f)
messages.append(Message(**data))
except FileNotFoundError:
pass
return messages
async def get_last_sequence(self, room: str) -> int:
room_path = self.base_path / room
if not room_path.exists():
return 0
try:
files = [f for f in os.listdir(room_path) if f.endswith('.json')]
sequences = [int(f.replace('.json', '')) for f in files]
return max(sequences) if sequences else 0
except FileNotFoundError:
return 0
6. Serveur WebSocket
# src/server.py
import websockets
import json
import uuid
import asyncio
from typing import Any
from .pub_sub import PubSub
from .sequencer import Sequencer
from .presence import PresenceManager
from .store import MessageStore
from .types import Client, Message
PORT = int(os.getenv('PORT', 8080))
class ChatServer:
def __init__(self):
self.pub_sub = PubSub()
self.sequencer = Sequencer()
self.presence = PresenceManager()
self.store = MessageStore()
async def handle_client(self, websocket, path):
client_id = str(uuid.uuid4())
client = Client(
id=client_id,
user=f"user_{client_id[:8]}",
websocket=websocket,
rooms=set()
)
print(f"Client connected: {client.id}")
await self._send_to_client(client, {
'type': 'connected',
'data': {'clientId': client.id, 'user': client.user}
})
self.presence.register(client)
try:
async for message in websocket:
msg = json.loads(message)
await self.handle_message(client, msg)
except websockets.exceptions.ConnectionClosed:
print(f"Client disconnected: {client.id}")
finally:
for room in list(client.rooms):
await self.pub_sub.publish(room, Message(
id=str(uuid.uuid4()),
room=room,
user='system',
content=f"{client.user} left the room",
sequence=self.sequencer.get_current(room),
timestamp=asyncio.get_event_loop().time()
))
self.pub_sub.unsubscribe(room, client)
self.presence.unregister(client)
async def handle_message(self, client: Client, msg: Any) -> None:
handlers = {
'join': self.handle_join,
'leave': self.handle_leave,
'message': self.handle_chat_message,
'presence': self.handle_presence_request,
'history': self.handle_history_request
}
handler = handlers.get(msg.get('type'))
if handler:
await handler(client, msg)
else:
print(f"Unknown message type: {msg.get('type')}")
async def handle_join(self, client: Client, msg: Any) -> None:
room = msg.get('room')
print(f"{client.user} joining room: {room}")
self.pub_sub.subscribe(room, client)
presence = self.presence.get_presence_in_room(room)
await self._send_to_client(client, {
'type': 'presence',
'data': {'room': room, 'users': [p.__dict__ for p in presence]}
})
await self.pub_sub.publish(room, Message(
id=str(uuid.uuid4()),
room=room,
user='system',
content=f"{client.user} joined the room",
sequence=self.sequencer.get_current(room),
timestamp=asyncio.get_event_loop().time()
))
history = await self.store.get_messages(room, 0, 50)
if history:
await self._send_to_client(client, {
'type': 'history',
'data': {'room': room, 'messages': [m.__dict__ for m in history]}
})
def handle_leave(self, client: Client, msg: Any) -> None:
room = msg.get('room')
print(f"{client.user} leaving room: {room}")
self.pub_sub.unsubscribe(room, client)
async def handle_chat_message(self, client: Client, msg: Any) -> None:
data = msg.get('data', {})
room = data.get('room')
if room not in client.rooms:
await self._send_error(client, 'Not subscribed to room')
return
message = Message(
id=str(uuid.uuid4()),
room=room,
user=client.user,
content=data.get('content', ''),
sequence=0,
timestamp=asyncio.get_event_loop().time()
)
sequenced = self.sequencer.sequence_message(message)
await self.store.save(sequenced)
await self.pub_sub.publish(room, sequenced)
print(f"[{room}] {client.user}: {sequenced.content} (seq: {sequenced.sequence})")
async def handle_presence_request(self, client: Client, msg: Any) -> None:
room = msg.get('room')
presence = self.presence.get_presence_in_room(room)
await self._send_to_client(client, {
'type': 'presence',
'data': {'room': room, 'users': [p.__dict__ for p in presence]}
})
async def handle_history_request(self, client: Client, msg: Any) -> None:
room = msg.get('room')
since = msg.get('since', 0)
messages = await self.store.get_messages(room, since)
await self._send_to_client(client, {
'type': 'history',
'data': {'room': room, 'messages': [m.__dict__ for m in messages]}
})
async def _send_to_client(self, client: Client, data: Any) -> None:
if client.websocket and not client.websocket.closed:
await client.websocket.send(json.dumps(data))
async def _send_error(self, client: Client, message: str) -> None:
await self._send_to_client(client, {
'type': 'error',
'data': {'message': message}
})
async def start(self):
print(f"Chat server listening on port {PORT}")
async with websockets.serve(self.handle_client, "", PORT):
await asyncio.Future() # Run forever
7. Point d’entrée
# src/main.py
import asyncio
import os
from server import ChatServer
async def main():
server = ChatServer()
await server.start()
if __name__ == '__main__':
asyncio.run(main())
8. Configuration requise
websockets==13.1
aiofiles==24.1.0
9. Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["python", "src/main.py"]
10. Docker Compose
version: '3.8'
services:
chat:
build: .
ports:
- "8080:8080"
volumes:
- ./data:/app/data
environment:
- PORT=8080
restart: unless-stopped
Exécution du système de chat
TypeScript
# Installer les dépendances
npm install
# Compiler
npm run build
# Démarrer le serveur
npm start
# Avec Docker Compose
docker-compose up
Python
# Installer les dépendances
pip install -r requirements.txt
# Démarrer le serveur
python src/main.py
# Avec Docker Compose
docker-compose up
Exercices
Exercice 1 : Opérations de chat de base
- Démarrer le serveur de chat
- Connecter deux clients WebSocket
- Rejoindre la même salle
- Envoyer des messages et vérifier que les deux clients les reçoivent
- Quitter la salle et vérifier la diffusion
Exercice 2 : Ordonnancement des messages
- Envoyer plusieurs messages rapidement depuis différents clients
- Vérifier que tous les messages ont des numéros de séquence uniques et séquentiels
- Déconnecter et reconnecter un client
- Demander l’historique des messages et vérifier que l’ordonnancement est préservé
Exercice 3 : Gestion de la présence
- Connecter plusieurs clients à différentes salles
- Rejoindre une salle et vérifier les diffusions de présence
- Simuler une défaillance réseau (tuer un client sans partir correctement)
- Vérifier que la détection hors ligne intervient après le délai d’attente
Exercice 4 : Persistance des messages
- Envoyer des messages à une salle
- Arrêter le serveur
- Vérifier que les messages sont sauvegardés sur disque
- Redémarrer le serveur
- Connecter un nouveau client et vérifier qu’il reçoit l’historique des messages
Pièges courants
| Problème | Cause | Solution |
|---|---|---|
| Messages non ordonnés | Numéros de séquence manquants | Toujours séquencer avant de publier |
| Anciens messages non reçus | Pas demander l’historique lors de la jointure | Implémenter la relecture à la connexion |
| La présence affiche hors ligne | Heartbeat non envoyé | S’assurer que la boucle heartbeat fonctionne |
| Messages en double | Republication des messages sauvegardés | Publier uniquement les nouveaux messages, pas l’historique |
Points clés à retenir
- Pub/Sub permet la communication multi-salle extensible
- Les numéros de séquence garantissent l’ordonnancement des messages sur tous les clients
- La gestion de présence nécessite à la fois des heartbeats actifs et une détection de délai d’attente passive
- La persistance des messages permet aux clients de se reconnecter et de recevoir l’historique
- Docker Compose simplifie le déploiement et les tests du système complet
🧠 Quiz du chapitre
Testez votre maîtrise de ces concepts ! Ces questions mettront au défi votre compréhension et révéleront les lacunes dans vos connaissances.
Le Problème du Consensus
Chapitre 11 — Comment faire s’accorder plusieurs machines sur une même décision ?
Histoire : La Cuisine Chaotique
Imagine une grande cuisine de restaurant. Il y a trois chefs : Alice, Bob et Carole. Chacun reçoit les commandes directement des serveurs, sans se concerter avec les autres.
Un soir de rush, le serveur dit à Alice : « Table 5, une pizza. » Un autre serveur dit à Bob : « Table 5, des pâtes. » Et un troisième dit à Carole : « Table 5, une salade. »
Que se passe-t-il ? Trois plats différents sortent pour la même table. Le client reçoit une pizza, des pâtes ET une salade — il n’en avait commandé qu’un seul. Le cuisinier est confus, le client est mécontent, et tout le monde perd du temps.
La semaine suivante, le restaurant change d’organisation. Maintenant, il y a un chef principal. Tous les serveurs transmettent les commandes au chef principal, qui décide de l’ordre et distribue les tâches : « Alice, tu fais la pizza. Bob, les pâtes. Carole, la salade. » Tout le monde est d’accord. Le service se déroule sans accroc.
Mieux encore : si le chef principal doit s’absenter, les autres chefs se mettent d’accord pour en choisir un nouveau. La cuisine continue de fonctionner. C’est ça, la robustesse d’un bon système de consensus.
C’est exactement le problème du consensus : comment un groupe peut-il se mettre d’accord sur une décision unique, même quand chacun reçoit des informations différentes au départ ?
Le Même Problème, en Informatique
Remplaçons nos chefs par des ordinateurs — des nœuds dans un système distribué. Trois machines reçoivent des requêtes en même temps. Si elles ne se coordonnent pas, chaque machine peut avoir une vision différente de ce qui s’est passé.
Imagine une base de données où le nœud A croit que le solde d’un compte est 100€, le nœud B croit qu’il est 200€, et le nœud C croit qu’il est 50€. Lequel a raison ? Si un client retire de l’argent, quelle valeur utiliser ? C’est le chaos — exactement comme notre cuisine sans chef principal.
Voici ce qui arrive quand les nœuds ne s’accordent pas. On appelle ça un split-brain (cerveau divisé) :
graph LR
subgraph "Sans Consensus — Split-Brain"
N1["Nœud A<br/>valeur = 1"]
N2["Nœud B<br/>valeur = 2"]
N3["Nœud C<br/>valeur = 3"]
end
N1 ~~~ N2 ~~~ N3
Problem["Laquelle est correcte ?"]
Et voici ce qui se passe quand les nœuds parviennent à un consensus :
graph LR
subgraph "Avec Consensus"
A1["Nœud A<br/>valeur = 2"]
A2["Nœud B<br/>valeur = 2"]
A3["Nœud C<br/>valeur = 2"]
end
A1 ~~~ A2 ~~~ A3
Solved["Tous d'accord !"]
La différence est simple : sans consensus, chaque nœud décide tout seul. Avec consensus, tous les nœuds choisissent la même valeur.
Ce problème apparaît partout dans les systèmes distribués : élection d’un leader, réplication de données, verrous distribués, configuration partagée… Dès que plusieurs machines doivent se coordonner, on a besoin de consensus.
Ce Dont Nous Avons Besoin
Pour qu’un consensus fonctionne correctement, il doit respecter quatre propriétés. Pas besoin de formules mathématiques — c’est du bon sens. Tu peux les retenir comme les règles d’un bon vote démocratique :
« Tous les nœuds sont d’accord » (Accord) Si le nœud A décide « valeur = 5 », alors le nœud B doit aussi décider « valeur = 5 ». Personne ne peut décider quelque chose de différent. Sinon, à quoi bon ?
« La valeur choisie a vraiment été proposée » (Validité) Le groupe ne peut pas inventer une valeur qui n’a jamais été proposée par personne. Si personne n’a suggéré « 7 », on ne peut pas décider « 7 ». C’est comme un vote : on ne peut élire que quelqu’un qui s’est présenté.
« On finit par décider, on ne tourne pas en boucle » (Terminaison) L’algorithme doit s’arrêter un jour. On ne peut pas voter éternellement sans jamais aboutir. Le consensus doit être atteint en un temps fini — pas dans un million d’années.
« Personne ne change d’avis » (Intégrité) Une fois qu’un nœud a pris sa décision, elle est définitive. Pas de retour en arrière. C’est comme signer un contrat : une fois signé, on ne peut pas annuler sa signature.
Les trois premières (accord, validité, intégrité) sont des garanties de sécurité — rien de mauvais ne se produit. La terminaison est une garantie de vitalité — le système finit par avancer.
Ces quatre propriétés forment un tout. Si une seule manque, le consensus ne fonctionne pas. Imagine un vote sans garantie de fin : les électeurs attendraient indéfiniment.
Pourquoi C’est Si Difficile
Tu pourrais te dire : « C’est simple, on vote et c’est tout ! Trois nœuds, majorité de deux, fini. » Oui, dans un monde parfait. Mais dans un vrai réseau, trois problèmes rendent tout beaucoup plus compliqué qu’un simple vote.
Pas d’horloge globale
Chaque ordinateur a sa propre horloge interne, et elles ne sont jamais parfaitement synchronisées. Même une différence de quelques millisecondes pose problème, car dans un système distribué, l’ordre des événements est crucial. On ne peut pas dire avec certitude « l’événement X s’est produit avant l’événement Y » :
sequenceDiagram
participant A as "Nœud A (10:00:01)"
participant B as "Nœud B (10:00:05)"
participant C as "Nœud C (10:00:03)"
Note over A: A propose valeur = 1
A->>B: envoi(valeur = 1)
Note over B: B reçoit à 10:00:07
Note over C: C propose valeur = 2
C->>B: envoi(valeur = 2)
Note over B: B reçoit à 10:00:08
Note over B: Qui a proposé en premier ?<br/>Impossible à dire !
Sans horloge commune, deux nœuds peuvent ordonner les mêmes événements différemment. C’est comme si deux personnes regardaient la même course mais chacune avec sa propre montre, légèrement décalée.
Pour résoudre ce problème, les systèmes distribués utilisent des horloges logiques (comme les horodatages de Lamport) au lieu d’horloges réelles. Mais ça, c’est une autre histoire.
Messages perdus ou retardés
Sur internet, un message peut disparaître, arriver en retard, ou même arriver dans le désordre. Ce n’est pas de la mauvaise volonté — c’est la nature des réseaux informatiques. Un câble coupé, un routeur surchargé, une interférence wifi… et pouf, ton message disparaît :
stateDiagram-v2
[*] --> Envoyé: "Nœud envoie un message"
Envoyé --> Livré: "Arrive normalement"
Envoyé --> Perdu: "Disparaît dans le réseau"
Envoyé --> Retardé: "Réseau lent"
Retardé --> Livré: "Arrive en retard"
Perdu --> [*]
Livré --> [*]
Pire encore : si un message n’arrive pas, on ne peut pas savoir s’il a été perdu ou s’il est simplement en retard. Peut-être qu’il arrivera dans 5 secondes, peut-être jamais. On ne peut pas attendre éternellement.
Ce problème est fondamental : impossible de distinguer un nœud lent d’un nœud en panne. C’est d’ailleurs la base du résultat FLP que nous allons voir juste après.
Les nœuds peuvent tomber en panne
Un ordinateur peut planter à n’importe quel moment — y compris juste après avoir reçu une information critique mais avant de l’avoir partagée avec les autres :
graph TB
N1["Nœud 1 : en vie ✓"]
N2["Nœud 2 : PANNE ✗<br/>données non validées"]
N3["Nœud 3 : en vie ✓"]
N1 --- N2 --- N3
Q["Que deviennent les données<br/>que le Nœud 2 n'a pas<br/>eu le temps de partager ?"]
C’est comme si un membre d’un jury s’évanouissait juste après avoir entendu un témoignage crucial, avant d’avoir pu le raconter aux autres jurés. L’information est perdue.
Et ce n’est pas tout : un nœud peut aussi redémarrer après une panne, en ayant oublié tout ce qui s’est passé avant. On appelle ça un crash avec perte de mémoire. Le système doit être capable de fonctionner même quand certains participants disparaissent temporairement.
Ces trois problèmes sont inévitables dans tout réseau réel. Un bon algorithme de consensus doit fonctionner malgré eux.
FLP (Version Simple)
En 1985, trois chercheurs — Fischer, Lynch et Paterson — ont prouvé quelque chose de déconcertant. Leur résultat, appelé FLP, dit ceci :
Dans un réseau asynchrone (où les messages peuvent prendre un temps arbitraire), même avec un seul nœud qui peut tomber en panne, aucun algorithme déterministe ne peut garantir le consensus à coup sûr.
En d’autres termes : en théorie, le consensus parfait est impossible. Pas « difficile » — impossible.
Mais ne t’inquiète pas ! En pratique, ce n’est pas un problème bloquant. Les systèmes réels contournent FLP de deux façons :
- Ils utilisent des délais d’attente (timeouts) — si un message n’arrive pas dans un délai raisonnable, on suppose qu’il est perdu et on agit sans lui.
- Ils utilisent de l’aléatoire (randomisation) — au lieu de décider de façon déterministe, on introduit du hasard pour éviter les blocages.
Le réseau n’est jamais vraiment asynchrone pour toujours. Tôt ou tard, les messages finissent par arriver.
En résumé : FLP dit « c’est impossible en théorie », mais la pratique dit « on s’en rapproche très bien avec des astuces simples ». C’est un peu comme voler — théoriquement impossible pour les humains, mais avec les bons outils (des avions), on y arrive très bien.
La Bonne Nouvelle : Raft
Heureusement, en 2014, Diego Ongaro et John Ousterhout ont créé Raft — un algorithme conçu exprès pour être compréhensible.
L’ancêtre, Paxos (inventé par Leslie Lamport en 1998), fonctionne correctement et offre les mêmes garanties, mais il est notoirement difficile à comprendre. Même les experts s’arrachent les cheveux avec Paxos. Son créateur lui-même a écrit un papier intitulé « Paxos Made Simple »… que personne n’a trouvé simple.
Raft décompose le consensus en deux phases claires :
graph TB
Election["Phase 1 : Élection<br/>Choisir un chef parmi les nœuds"]
Replication["Phase 2 : Réplication<br/>Le chef décide, les autres copient"]
Election --> Replication
Replication --> Result["Tous les nœuds<br/>sont d'accord"]
C’est exactement comme notre cuisine : d’abord on choisit un chef principal (élection), puis le chef principal distribue les tâches (réplication). Simple, non ?
Raft ne sacrifie aucune des quatre propriétés du consensus (accord, validité, terminaison, intégrité). Il offre les mêmes garanties que Paxos, mais avec une conception que l’on peut expliquer sur un tableau blanc en 10 minutes.
Dans les prochains chapitres, nous verrons en détail comment Raft gère l’élection du chef, puis comment il réplique les décisions pour que tous les nœuds restent synchronisés. Prépare-toi — on entre dans le cœur du moteur !
Résumé
- Le consensus est le problème consistant à faire s’accorder plusieurs machines sur une même décision
- Il faut quatre propriétés : accord (tous d’accord), validité (la valeur a été proposée), terminaison (on finit par décider) et intégrité (pas de changement d’avis)
- C’est difficile car il n’y a pas d’horloge globale, les messages se perdent et les nœuds tombent en panne
- Le résultat FLP dit que le consensus parfait est théoriquement impossible en réseau asynchrone
- En pratique, on utilise des délais d’attente et de l’aléatoire pour contourner cette impossibilité
- Raft (2014) est un algorithme de consensus conçu pour être compréhensible, en deux phases : élection puis réplication
Exercices
-
Où est le consensus ? Donne un exemple d’un système qui a besoin de consensus (par exemple, une base de données distribuée) et un qui n’en a pas besoin (par exemple, un cache CDN). Explique pourquoi dans chaque cas.
-
Panne en cours de route Imagine trois nœuds qui essaient de se mettre d’accord sur une valeur. Le nœud 3 tombe en panne au milieu du vote. Les deux autres peuvent-ils quand même prendre une décision ? Que se passe-t-il quand le nœud 3 revient ?
-
Accord vs Terminaison Si un système garantit l’accord (tous les nœuds décident la même chose) mais pas la terminaison, que peut-il se passer de pire ? Et si c’est l’inverse — terminaison garantie mais pas l’accord ?
Quiz
Élire un Chef
Chapitre 12 — Comment un groupe de machines choisit-il son coordinateur ?
Histoire : L’Élection du Délégué
Imagine une classe de 30 élèves. Le professeur demande : « On a besoin d’un délégué. Levez la main si vous êtes candidat ! »
Trois élèves lèvent la main : Alice, Bob et Carole. Chacun explique pourquoi il ou elle serait un bon délégué. Puis tout le monde vote — mais il faut la majorité pour gagner. Si personne ne l’obtient, on recommence. Alice reçoit 16 voix sur 30. Elle devient déléguée.
Son rôle ? Coordonner les demandes, transmettre les messages au professeur, s’assurer que tout le monde est sur la même longueur d’onde. Si Alice déménage, la classe organise une nouvelle élection. C’est exactement comme ça que Raft choisit son chef (leader).
Dans Raft, le chef est le nœud qui coordonne tout. Les autres nœuds sont des suiveurs (followers). Si le chef tombe en panne, les suiveurs organisent une nouvelle élection automatiquement.
Pourquoi un Chef ?
Sans chef, tous les nœuds parlent en même temps. Chacun essaie de coordonner les autres, personne ne s’écoute, et le résultat est le chaos :
graph LR
subgraph "Sans Chef"
A["Nœud A<br/>« Moi chef ! »"]
B["Nœud B<br/>« Non, moi ! »"]
C["Nœud C<br/>« Vous avez tort ! »"]
end
Chaos["Résultat : personne ne s'entend"]
Avec un chef, un seul nœud prend les décisions et les autres suivent. C’est plus simple, plus rapide, et plus fiable :
graph LR
subgraph "Avec un Chef"
L["Chef<br/>« Voici la décision »"]
F1["Suiveur 1<br/>« Compris ! »"]
F2["Suiveur 2<br/>« Compris ! »"]
L --> F1
L --> F2
end
OK["Résultat : tout le monde est d'accord"]
Raft utilise un leader fort : toutes les décisions passent par le chef. Ça simplifie énormément les choses, car un seul nœud est responsable de la coordination.
Les Trois Rôles
Chaque nœud Raft peut jouer trois rôles, exactement comme dans notre élection de délégué :
- Suiveur (follower) : comme un élève qui attend les instructions du délégué. Il écoute, obéit, et ne prend pas d’initiative.
- Candidat (candidate) : comme un élève qui lève la main pour se présenter. Il demande des votes pour devenir chef.
- Chef (leader) : comme le délégué élu. Il coordonne tout le groupe et prend les décisions.
Voici comment un nœud passe d’un rôle à l’autre :
stateDiagram-v2
[*] --> Suiveur: Démarrage
Suiveur --> Candidat: Délai d'attente expiré<br/>pas de nouvelles du chef
Candidat --> Chef: Majorité de votes obtenue
Candidat --> Suiveur: Mandat supérieur découvert
Chef --> Suiveur: Mandat supérieur découvert
Suiveur --> Suiveur: Battement de cœur reçu du chef
Au démarrage, tous les nœuds sont des suiveurs. Si un suiveur n’entend pas le chef pendant un certain temps, il devient candidat et lance une élection.
Les Mandats (Terms)
Raft organise le temps en mandats (terms) — comme des mandats présidentiels. Chaque élection démarre un nouveau mandat, et les numéros de mandat ne font qu’augmenter. Jamais de retour en arrière.
Voici à quoi ressemble une succession de mandats :
timeline
title Les Mandats dans Raft
Mandat 1 : Chef A élu
: Fonctionnement normal
: Le chef A tombe en panne
Mandat 2 : Élection en cours
: Vote partagé — personne n'a la majorité
: Nouvelle élection...
Mandat 3 : Chef B élu
: Fonctionnement normal
Trois règles importantes :
- Les mandats ne font qu’augmenter — jamais de retour en arrière.
- Si un nœud découvre un mandat supérieur au sien, il redescend immédiatement au rang de suiveur.
- Un seul chef par mandat — c’est garanti par les règles de vote.
Si l’ancien chef revient après une panne et essaie de donner des ordres avec un vieux mandat, les autres nœuds l’ignorent. Il doit redevenir suiveur et attendre les instructions du nouveau chef.
L’Élection, Étape par Étape
Voici comment se déroule une élection, du début à la fin.
Étape 1 : Un suiveur s’impatiente. Le chef n’a pas envoyé de message depuis un moment. Le délai d’attente (timeout) du suiveur expire — il suppose que le chef est en panne.
Étape 2 : Il devient candidat. Le suiveur incrémente le numéro de mandat et vote pour lui-même.
Étape 3 : Il demande des votes. Il envoie un message RequestVote à tous les nœuds du cluster.
Étape 4 : Les votes arrivent. Chaque nœud décide d’accorder ou refuser son vote selon des règles strictes.
Étape 5 : Il devient chef. S’il obtient la majorité, il envoie des battements de cœur (heartbeats) pour annoncer sa victoire et empêcher de nouvelles élections.
Ce diagramme montre les 5 étapes en action :
sequenceDiagram
participant S as Suiveur → Candidat
participant A as Nœud A
participant B as Nœud B
Note over S: Étape 1 : délai expiré !
Note over S: Étape 2 : mandat = mandat + 1<br/>vote pour soi-même
S->>A: Étape 3 : RequestVote(mandat=3)
S->>B: RequestVote(mandat=3)
A-->>S: Étape 4 : Vote OUI
B-->>S: Vote OUI
Note over S: Majorité obtenue !
Note over S: Étape 5 : devient chef
S->>A: Battement de cœur
S->>B: Battement de cœur
Le candidat vote toujours pour lui-même. Avec 5 nœuds, il lui faut au moins 3 votes (lui-même + 2 autres) pour gagner.
Et Si Deux Candidats Se Présentent ?
Parfois, deux suiveurs s’impatientent en même temps. Les deux deviennent candidats, et les votes se divisent. Personne n’obtient la majorité — c’est un vote partagé (split vote).
Voici ce qui arrive avec des délais identiques :
graph LR
subgraph "Délais fixes — vote partagé"
A1["A : candidat à t=150ms"]
B1["B : candidat à t=150ms"]
C1["C : candidat à t=150ms"]
end
Result1["1 vote chacun — personne ne gagne !"]
La solution ? Des délais aléatoires. Chaque nœud choisit un temps d’attente différent dans une plage (par exemple 150-300ms) :
gantt
title Délais aléatoires : un seul candidat démarre en premier
dateFormat x
axisFormat %L
Nœud A (180ms) :a1, 0, 180
Nœud B (280ms) :b1, 0, 280
Nœud C (200ms) :c1, 0, 200
C devient candidat :milestone, m1, 200, 0s
Le Nœud C atteint son délai en premier (200ms) et devient candidat. Les Nœuds A et B reçoivent sa demande de vote et réinitialisent leur délai. Résultat : C obtient la majorité avant que les autres ne deviennent candidats.
Les délais aléatoires sont la clé qui permet à Raft de contourner le résultat FLP vu au chapitre précédent. C’est l’astuce qui rend le consensus possible en pratique.
Les Règles de Vote (en pseudocode)
Quand un nœud reçoit une demande de vote, il suit trois règles strictes :
fonction voteDemande(mandat_candidat, id_candidat, journal_candidat):
si mandat_candidat < mon_mandat → NON
si j'ai déjà voté ce mandat → NON
si journal_candidat pas à jour → NON
sinon → OUI
- Règle 1 : Un mandat inférieur ? Refusé. Ce candidat est en retard sur l’état actuel du cluster.
- Règle 2 : Déjà voté dans ce mandat ? Refusé. Un seul vote par nœud et par mandat — ça garantit qu’un seul chef peut être élu.
- Règle 3 : Le journal (log) du candidat doit être à jour. On ne veut pas élire un chef qui a manqué des informations. On compare d’abord le mandat de la dernière entrée, puis l’index.
La règle du journal est cruciale : elle garantit qu’un chef élu possède toutes les informations validées par les mandats précédents. C’est ce qui assure la sécurité du consensus.
Résumé
- Raft utilise un chef (leader) unique pour coordonner toutes les décisions du cluster
- Chaque nœud peut jouer trois rôles : suiveur, candidat ou chef
- Le temps est divisé en mandats (terms) qui ne font qu’augmenter — un seul chef par mandat
- L’élection se déclenche quand un suiveur n’entend plus le chef : il devient candidat et demande des votes
- Les délais aléatoires empêchent les votes partagés et garantissent qu’une élection aboutit rapidement
- Les trois règles de vote assurent qu’un seul chef est élu par mandat, avec un journal complet
Exercices
-
Timeouts en action : Nœud A (délai 150ms), B (délai 280ms), C (délai 200ms). Qui devient candidat en premier ? Que se passe-t-il si A et C tombent en panne juste avant l’élection ?
-
Le journal du candidat : Le Nœud A a un journal avec 10 entrées au mandat 3. Le Nœud B a 12 entrées au mandat 2. Si B demande le vote de A, A doit-il accepter ? Pourquoi ?
-
Mandat obsolète : Un chef envoie un ordre avec le mandat 3, mais les autres nœuds sont au mandat 5. Que se passe-t-il ? Et si le chef envoie avec le mandat 6 ?
Quiz
Le Journal Partagé
Chapitre 13 — Comment le chef fait-il pour que tous les nœuds gardent la même trace des décisions ?
Histoire : Le Cahier de Décisions
Imagine une équipe de cinq personnes qui gèrent un projet ensemble. Chaque jour, l’équipière principale — appelons-la Alice — note toutes les décisions du jour dans un grand cahier : « Lundi : on utilise React. Mardi : la base sera PostgreSQL. Mercredi : on déploie vendredi. »
Chaque membre de l’équipe possède son propre cahier et copie scrupuleusement les décisions d’Alice. À la fin de chaque journée, tous les cahiers sont identiques. Si jamais il y a un doute, il suffit de comparer les pages : « Page 3, on a la même chose ? Oui ? Alors tout va bien. »
Mais que se passe-t-il si Bob était absent mardi ? Pas de panique : le mercredi, Alice lui montre ce qu’il a manqué. « Regarde, mardi on a décidé PostgreSQL, c’est la page 2 de mon cahier. Recopie ça dans ton cahier à la bonne page. » Bob recopie, et son cahier est à jour.
Et si un jour Alice est malade ? L’équipe élit une nouvelle équipière principale. Celle-ci utilise son cahier — qui est à jour car elle a tout copié — pour continuer à noter les décisions. Personne ne perd aucune information, le projet continue sans interruption.
Dans Raft, ce cahier de décisions s’appelle le journal (log). Le chef écrit les décisions, les suiveurs les recopient. Si le chef tombe en panne, le nouveau chef possède un journal complet et peut continuer. Tout est dans le cahier.
Qu’est-ce qu’un Journal ?
Un journal est une liste ordonnée d’entrées, comme les pages d’un cahier. Chaque entrée contient trois choses :
- un index (le numéro de page — 1, 2, 3…)
- un mandat (quel était le numéro de mandat quand cette entrée a été ajoutée)
- une commande (la décision elle-même, comme « SET x = 5 »)
Dans notre cahier d’équipe, l’entrée numéro 3 ressemblerait à ça : page 3, décision prise sous le mandat d’Alice, commande « SET y = 2 ». Chaque nœud du cluster possède son propre journal et s’assure qu’il correspond à celui du chef. L’index augmente toujours (1, 2, 3, …), mais le mandat peut rester le même ou changer quand un nouveau chef est élu.
Voici trois nœuds avec leurs journaux. Certains sont en avance, d’autres en retard. La notation [index, mandat] signifie « entrée à cet index, ajoutée pendant ce mandat » :
graph LR
subgraph "Journaux des trois nœuds"
L["Chef<br/>[1,1] [2,1] [3,1] [4,2] [5,2]"]
F1["Suiveur A<br/>[1,1] [2,1] [3,1] [4,2]"]
F2["Suiveur B<br/>[1,1] [2,1]"]
end
Le chef a 5 entrées, le Suiveur A en a 4 (il lui manque la dernière), le Suiveur B seulement 2 (il a pris du retard). Le chef va aider les suiveurs à se mettre à jour en leur envoyant les entrées manquantes — c’est la réplication.
Tu remarques que les mandats changent en cours de journal : les entrées 1-3 sont du mandat 1, les entrées 4-5 du mandat 2. Ça arrive quand un nouveau chef est élu — il continue le journal là où l’ancien chef s’est arrêté, mais avec son propre numéro de mandat.
Le journal est la mémoire du cluster. Tout ce qui a été décidé s’y trouve, dans l’ordre exact où ça a été décidé. C’est la source de vérité commune de tout le système.
La Règle d’Or
Raft repose sur une propriété fondamentale appelée propriété de correspondance du journal (log matching property). Elle est si importante qu’on peut l’appeler la Règle d’Or de Raft :
Si deux journaux possèdent une entrée avec le même index et le même mandat, alors toutes les entrées précédentes sont identiques.
C’est comme nos cahiers d’équipe : si ton cahier et le mien sont d’accord sur la page 5, alors nous sommes forcément d’accord sur les pages 1 à 4. Pourquoi ? Parce que chaque nouvelle page dépend de la précédente — on ne peut pas avoir la bonne page 5 sans avoir eu la bonne page 4 avant.
graph LR
subgraph "Propriété de correspondance"
L["Chef : [1,1] [2,1] [3,2] [4,2] ✓"]
F["Suiveur : [1,1] [2,1] [3,2] [4,2] ✓"]
end
Match["Même entrée à l'index 4<br/>→ tout ce qui précède est identique"]
Cette propriété est garantie par la façon dont Raft ajoute les entrées : le chef envoie toujours l’index et le mandat de l’entrée précédente avec chaque nouvelle entrée. Si le suiveur ne trouve pas cette entrée précédente à la bonne place, il refuse d’ajouter la nouvelle. C’est un mécanisme de vérification en cascade : chaque entrée confirme la précédente, qui confirme la sienne, et ainsi de suite.
Résultat : les journaux ne peuvent diverger qu’à partir de la fin. Les entrées déjà validées ne sont jamais remises en question — elles sont gravées dans le marbre.
La Réplication en 5 Étapes
Voici comment une commande client voyage du chef jusqu’à tous les journaux. Chaque étape est essentielle pour garantir que personne n’est laissé pour compte. C’est le cœur battant de Raft.
Étape 1 : Le client envoie une commande. Un client demande au chef : « SET x = 5 ». Seul le chef reçoit les demandes des clients — c’est son rôle de coordinateur.
Étape 2 : Le chef ajoute à son journal. Le chef crée une nouvelle entrée avec le prochain index disponible, le mandat courant, et la commande. Il l’ajoute à la fin de son journal, mais ne la valide pas encore. Pour l’instant, c’est provisoire.
Étape 3 : Le chef envoie AppendEntries. Le chef envoie un message AppendEntries à tous les suiveurs en parallèle. Ce message contient la nouvelle entrée + l’index et le mandat de l’entrée précédente (c’est ça qui garantit la propriété de correspondance vue plus haut). Ce message sert aussi de battement de cœur (heartbeat) — même s’il n’y a pas de nouvelle entrée, le chef envoie des AppendEntries vides régulièrement pour signaler « je suis toujours là ».
Étape 4 : Les suiveurs ajoutent et répondent. Chaque suiveur vérifie d’abord que l’entrée précédente correspond bien à ce qu’il a dans son journal. Si oui, il ajoute la nouvelle entrée et répond « OK ». Si non, il refuse — le chef devra revenir à une étape précédente.
Étape 5 : Majorité → validé ! Quand le chef reçoit des réponses positives de la majorité des nœuds (lui-même inclus), l’entrée est validée (committed). Il applique alors la commande à la machine à états et répond au client. La commande est maintenant définitive — elle ne sera jamais annulée.
sequenceDiagram
participant C as Client
participant Chef as Chef
participant A as Suiveur A
participant B as Suiveur B
C->>Chef: Étape 1 : SET x = 5
Note over Chef: Étape 2 : ajout au journal<br/>[6, 3, SET x=5]
Chef->>A: Étape 3 : AppendEntries
Chef->>B: AppendEntries
Note over A: Vérifie entrée précédente
Note over B: Vérifie entrée précédente
A-->>Chef: Étape 4 : OK
B-->>Chef: OK
Note over Chef: Étape 5 : majorité ! Validé
Chef-->>C: Résultat : x = 5
Le client attend que l’entrée soit validée avant de recevoir une réponse. Ça garantit qu’il ne reçoit un « OK » que si la majorité du cluster a enregistré sa commande. Pas de faux espoirs : si le client reçoit un accusé de réception, c’est que c’est définitif.
Note que le chef n’attend pas tous les suiveurs — juste la majorité. Dans un cluster de 5, si 3 nœuds (le chef + 2 suiveurs) ont l’entrée, c’est suffisant. Les deux autres suiveurs seront mis à jour plus tard, à leur rythme. C’est ce qui rend Raft rapide : il avance à la vitesse de la majorité, pas du plus lent.
Et Si les Journaux Divergent ?
Parfois, un suiveur a des entrées que le chef n’a pas — par exemple, si un ancien chef (d’un mandat précédent) avait commencé à répliquer des entrées avant de tomber en panne. Ces entrées n’ont jamais été validées par la majorité, donc elles ne sont pas définitives. Le nouveau chef doit les corriger.
C’est comme si un ancien équipier principal avait noté des décisions provisoires dans son cahier avant de partir. Le nouvel équipier principal les efface et les remplace par les bonnes. Seules les pages validées (signées par la majorité de l’équipe) sont protégées.
Voici un exemple concret de divergence :
graph LR
subgraph "Avant correction"
L1["Chef : [1,1] [2,2] [3,2]"]
F1["Suiveur : [1,1] [2,1] [3,1] [4,3]"]
end
Les journaux divergent à l’index 2 : le chef a [2,2] mais le suiveur a [2,1]. Le suiveur a même une entrée supplémentaire [4,3] d’un ancien mandat qui n’a jamais été validée.
Le chef résout ça en reculant étape par étape, comme quelqu’un qui chercherait la dernière page commune entre deux cahiers :
- Le chef envoie
AppendEntriespour l’index 4 avec l’entrée précédente[3,2]. - Le suiveur vérifie : son entrée à l’index 3 est
[3,1]— ça ne correspond pas. - Le suiveur répond « NON ». Le chef recule d’un cran et essaie l’index 3.
- Même problème : le suiveur a
[2,1], le chef attend[2,2]. Encore « NON ». - Le chef essaie l’index 1 : les deux ont
[1,1]— correspondance trouvée ! - À partir de là, le chef envoie les entrées correctes (index 2, puis 3). Le suiveur écrase ses vieilles entrées incorrectes.
Ce processus est entièrement automatique — le chef et le suiveur n’ont pas besoin d’intervention humaine. Le chef persévère jusqu’à trouver le point de divergence, puis corrige. C’est méthodique et fiable.
graph LR
subgraph "Après correction"
L2["Chef : [1,1] [2,2] [3,2]"]
F2["Suiveur : [1,1] [2,2] [3,2]"]
end
OK["Journaux synchronisés !"]
Les entrées non validées d’un ancien mandat sont simplement écrasées. Seules les entrées validées (commises par la majorité) sont garanties de ne jamais être perdues. C’est pour ça que la distinction entre « présent dans le journal » et « validé » est si importante.
La Validation (Commit)
Jusqu’ici, on a vu comment les entrées sont ajoutées aux journaux. Mais il y a une différence cruciale entre « présent dans le journal » et « définitivement accepté ». C’est la validation.
Une entrée est validée (committed) quand le chef sait qu’elle est stockée sur la majorité des nœuds. À partir de ce moment, elle ne sera jamais perdue — même si la moitié du cluster tombe en panne juste après.
La règle est simple : une entrée est validée dès que la majorité des nœuds l’ont dans leur journal. Dans un cluster de 5, il suffit que 3 nœuds (le chef + 2 suiveurs) l’aient stockée. Pas besoin d’unanimité — la majorité suffit.
Mais il y a une subtilité importante. Le chef ne valide une entrée d’un ancien mandat que lorsqu’au moins une entrée du mandat courant est stockée sur la majorité des nœuds. Pourquoi ? Parce que c’est l’élection du nouveau chef qui prouve indirectement que les anciennes entrées étaient bien répliquées. Ça garantit qu’une entrée validée ne sera jamais écrasée par un futur chef. C’est un mécanisme indirect mais très élégant.
Pense à la validation comme un contrat signé : avant la signature, tout peut changer. Après, c’est définitif. Le client ne reçoit une réponse qu’après la validation — il n’a jamais à s’inquiéter qu’une décision soit annulée.
En pratique, le chef inclut son index de validation dans chaque
AppendEntries. Ça permet aux suiveurs de savoir quelles entrées sont définitives et de les appliquer à leur machine à états. C’est un mécanisme simple mais puissant : une seule valeur qui dit « jusque-là, c’est sûr ».
La Machine à États
Le journal contient les décisions, mais qui les exécute réellement ? C’est le rôle de la machine à états (state machine). Chaque nœud possède sa propre machine à états qui lit les entrées du journal, dans l’ordre, et les applique une par une.
Pense à la machine à états comme un interprète : elle prend chaque commande du journal et l’exécute pour construire la base de données finale.
Si le journal dit « SET x=1, puis SET y=3, puis SET x=5 », la machine à états exécute ces trois commandes dans l’ordre et arrive à l’état final x=5, y=3.
graph LR
J["Journal<br/>[SET x=1] [SET y=3] [SET x=5]"] --> SM["Machine à états"]
SM --> KV["Résultat :<br/>x = 5, y = 3"]
L’analogie est simple : le journal est la recette, la machine à états est le plat cuisiné. La recette dit « ajoute un œuf, puis du sucre, puis de la farine ». La machine à états suit la recette dans l’ordre et produit le gâteau. Si tu suis la même recette, tu obtiens le même gâteau — garanti.
Et c’est là que tout s’assemble. Souviens-toi du chapitre 11 : on voulait que tous les nœuds soient d’accord. Le journal garantit que tous les nœuds voient les mêmes commandes. La machine à états garantit qu’ils les exécutent dans le même ordre. Donc ils arrivent tous au même résultat. Le consensus est atteint, commande par commande, entrée par entrée.
Chaque nœud applique les entrées validées seulement — jamais les entrées provisoires. Comme tous les journaux sont identiques jusqu’à l’index validé (merci la propriété de correspondance), toutes les machines à états produisent le même résultat. C’est comme ça que Raft garantit que tous les nœuds sont d’accord sur l’état du système.
La machine à états est déterministe : les mêmes entrées dans le même ordre produisent toujours le même état. C’est ce qui permet à chaque nœud de construire une copie identique de la base de données, indépendamment et sans communication supplémentaire.
Le jour où un nœud redémarre après une panne, il lui suffit de relire son journal depuis le début et de rejouer chaque commande dans la machine à états. En quelques secondes, il retrouve son état complet. Le journal est la mémoire permanente, la machine à états est reconstruite à la volée.
Résumé
- Le journal (log) est la liste ordonnée de toutes les décisions du cluster — chaque entrée a un index, un mandat et une commande
- La propriété de correspondance garantit que si deux journaux sont identiques à un index donné, tout ce qui précède est aussi identique
- La réplication se fait en 5 étapes : client → chef ajoute → AppendEntries → suiveurs vérifient et ajoutent → majorité = validé
- Quand les journaux divergent, le chef recule jusqu’à trouver la dernière correspondance, puis écrase les entrées incorrectes
- Une entrée est validée (committed) quand la majorité l’a stockée — à partir de ce moment, elle ne sera jamais perdue
- La machine à états applique les entrées validées dans l’ordre pour construire l’état final — tous les nœuds obtiennent le même résultat
- Le journal et la machine à états ensemble réalisent le consensus : mêmes commandes, même ordre, même résultat
Exercices
-
Réplication en action : Le chef a le journal
[SET x=1, SET y=2, SET z=3]. Le suiveur a[SET x=1]. Que doit envoyer le chef au suiveur pour le rattraper ? Quels index et mandats le suiveur vérifie-t-il avant d’accepter les nouvelles entrées ? Que se passe-t-il si le suiveur refuse ? -
Entrée validée : Un cluster de 5 nœuds. Le chef envoie
AppendEntriesà 4 suiveurs. 3 répondent OK (plus le chef lui-même = 4 sur 5). L’entrée est-elle validée ? Que se passerait-il si seulement 1 suiveur répondait OK ? -
Divergence de journal : Le chef a
[1,1] [2,2] [3,2]. Le suiveur a[1,1] [2,1] [3,1]. À quel index les journaux divergent-ils ? Combien d’étapes le chef devra-t-il reculer pour trouver la dernière correspondance ? Que se passe-t-il après ?
Quiz
Raft en Action
Chapitre 14 — Tu as construit le moteur. Maintenant, conduisons — et voyons ce qui se passe quand les choses cassent.
Tu connais maintenant les trois piliers de Raft : le consensus (chapitre 11), l’élection du chef (chapitre 12) et la réplication du journal (chapitre 13). Chaque pièce du puzzle est en place. Mais un système distribué ne vit pas dans un monde parfait. Les pannes arrivent, le réseau se coupe, et les nœuds reviennent avec des informations obsolètes.
Ce chapitre met tout ensemble. On va regarder le système complet en action, puis simuler trois scénarios de panne réels. Accroche-toi — c’est là que Raft montre sa force.
Le Système Complet
Avant de casser quoi que ce soit, voici l’architecture complète de Raft. Tu as déjà vu chaque élément individuellement. Maintenant, regarde comment ils s’emboîtent :
graph TB
subgraph "Clients"
C1["Client 1<br/>SET x = 5"]
C2["Client 2<br/>GET x"]
end
subgraph "Cluster Raft"
L["Chef<br/>Journal : 1 2 3<br/>Machine à états"]
F1["Suiveur A<br/>Journal : 1 2 3"]
F2["Suiveur B<br/>Journal : 1 2 3"]
F3["Suiveur C<br/>Journal : 1 2"]
F4["Suiveur D<br/>Journal : 1 2 3"]
end
C1 -->|"commande"| L
C2 -->|"lecture"| L
L -->|"AppendEntries"| F1
L -->|"AppendEntries"| F2
L -->|"AppendEntries"| F3
L -->|"AppendEntries"| F4
F1 -->|"OK"| L
F2 -->|"OK"| L
F3 -->|"OK"| L
F4 -->|"OK"| L
L -->|"résultat"| C1
L -->|"valeur"| C2
Rappel rapide des rôles : le chef reçoit les commandes des clients, les ajoute à son journal (log), et les envoie aux suiveurs (followers) via AppendEntries. Quand la majorité a confirmé, l’entrée est validée (committed) et appliquée à la machine à états (state machine). Le client reçoit alors sa réponse.
Raft fonctionne en deux phases : élire un chef (chapitre 12), puis répliquer les décisions via le journal (chapitre 13). Tout le reste — les pannes, les réseaux coupés, les retours en arrière — est géré automatiquement.
Scénario 1 : Le Chef Tombe en Panne
Le cluster tourne normalement. Le chef envoie ses battements de cœur (heartbeats) réguliers. Soudain, le chef plante — plus de réponse, plus de battements de cœur. Que se passe-t-il ?
Voici le scénario étape par étape :
sequenceDiagram
participant C as Client
participant L as Chef (mandat 3)
participant F1 as Suiveur A
participant F2 as Suiveur B
participant F3 as Suiveur C
C->>L: SET x = 10
L->>L: Ajoute au journal
L->>F1: AppendEntries
L->>F2: AppendEntries
Note over L: 💥 Le chef tombe en panne
Note over F1,F3: Délai d'attente expire...
F1->>F1: Devient candidate (mandat 4)
F1->>F2: RequestVote
F1->>F3: RequestVote
F2->>F1: Vote OUI
F3->>F1: Vote OUI
Note over F1: Majorité obtenue !
F1->>F2: Battement de cœur
F1->>F3: Battement de cœur
C->>F1: SET y = 20
F1->>F2: AppendEntries (SET y = 20)
F1->>F3: AppendEntries (SET y = 20)
Pas de panique — tout est automatique :
- Les suiveurs attendent les battements de cœur du chef. Quand leur délai d’attente (timeout) expire, ils savent que le chef est injoignable.
- Le premier suiveur dont le délai expire devient candidat et lance une élection (chapitre 12).
- S’il obtient la majorité, il devient le nouveau chef et commence à envoyer des battements de cœur.
- Le nouveau chef utilise son journal — qui est à jour car il a tout répliqué — pour continuer. Les suiveurs en retard se mettent à jour via AppendEntries.
La transition est invisible pour le client. Le client réessaie sa commande auprès du nouveau chef. Dans un système réel, cette transition prend quelques dizaines de millisecondes.
Scénario 2 : Le Réseau Se Coupe
Cinq nœuds fonctionnent en harmonie. Puis le réseau se coupe : trois nœuds (A, B, C) restent connectés entre eux, et deux nœuds (D, E) sont isolés. C’est une partition réseau (network partition).
graph LR
subgraph "Côté majorité (3 nœuds)"
A["Nœud A<br/>Chef, mandat 5"]
B["Nœud B<br/>Suiveur"]
C["Nœud C<br/>Suiveur"]
end
subgraph "Côté minorité (2 nœuds)"
D["Nœud D<br/>Suiveur isolé"]
E["Nœud E<br/>Suiveur isolé"]
end
A ---|"OK"| B
A ---|"OK"| C
D -.-|"✗ Pas de connexion"| A
E -.-|"✗ Pas de connexion"| A
D --- E
Que se passe-t-il de chaque côté ?
Côté majorité (3 nœuds sur 5) : le chef A continue d’envoyer des battements de cœur à B et C. Il a toujours la majorité (3 sur 5), donc il peut valider de nouvelles entrées. Le système continue de fonctionner normalement.
Côté minorité (2 nœuds sur 5) : les suiveurs D et E ne reçoivent plus de battements de cœur. Leurs délais d’attente expirent. L’un d’eux devient candidat et demande des votes. Mais avec seulement 2 nœuds, impossible d’obtenir la majorité (il faut 3 sur 5). Les élections échouent, encore et encore. Aucune entrée ne peut être validée de ce côté.
C’est la sécurité de Raft : le côté minorité ne peut rien valider. S’il le pouvait, on aurait deux clusters indépendants avec des données différentes — un split-brain. La règle de majorité empêche ça.
Quand le réseau se répare, D et E reçoivent des AppendEntries du chef A (mandat 5). Ils constatent que le mandat d’A est plus élevé que le leur. Ils acceptent immédiatement de redevenir suiveurs et se synchronisent avec le journal d’A. Tout rentre dans l’ordre.
Scénario 3 : L’Ancien Chef Revient
Le chef du mandat 3 plante. Une élection a lieu, et le nœud B devient le nouveau chef (mandat 4). Le système continue normalement. Puis, le nœud A — l’ancien chef — redémarre et revient dans le cluster.
sequenceDiagram
participant A as Ancien chef (mandat 3)
participant B as Nouveau chef (mandat 4)
participant C as Suiveur C
Note over A: Redémarre, croit être chef (mandat 3)
A->>B: AppendEntries (mandat 3)
B->>A: "Mon mandat est 4, pas 3"
Note over A: Mandat reçu (4) > mon mandat (3)
A->>A: Redescend au rang de suiveur
A->>B: Accepte le mandat 4
B->>A: AppendEntries (met à jour le journal)
Note over A: Synchronisé ! Tout est en ordre.
L’ancien chef pense encore être le chef du mandat 3. Il essaie d’envoyer des commandes aux autres nœuds. Mais dès qu’il reçoit un message avec le mandat 4 (supérieur au sien), la règle est claire :
Si tu reçois un message avec un mandat supérieur au tien, tu redescends immédiatement au rang de suiveur. Pas de débat, pas de négociation.
C’est la beauté des mandats (terms) : ils ne font qu’augmenter. Un ancien chef ne peut jamais « voler » le leadership. Dès qu’il contacte le cluster, il découvre le nouveau mandat et se soumet automatiquement. Son journal est ensuite corrigé par le nouveau chef via AppendEntries.
Quand Avez-Vous Besoin du Consensus ?
Tu n’as pas besoin de consensus partout. Voici un guide pour décider :
| Situation | Consensus nécessaire ? | Pourquoi |
|---|---|---|
| Base de données distribuée | Oui | Tous les nœuds doivent être d’accord sur les données |
| Cache CDN | Non | La cohérence éventuelle (eventual consistency) suffit |
| Verrou distribué | Oui | Un seul nœud doit détenir le verrou à la fois |
| Élection de leader | Oui | Un seul leader à la fois, pas de split-brain |
| Système en lecture seule | Non | Pas de décision à prendre ensemble |
| File de messages | Ça dépend | Si l’ordre exact compte, oui. Sinon, non. |
La règle est simple : si plusieurs nœuds doivent être strictement d’accord sur l’état → consensus. Si la cohérence éventuelle suffit → pas besoin. Le consensus a un coût (plus lent, plus complexe), donc ne l’utilise que quand c’est nécessaire.
Paxos vs Raft (En Bref)
Tu as peut-être entendu parler de Paxos, l’ancêtre des algorithmes de consensus. Il a été publié en 1998 et est correct mathématiquement, mais il est tellement complexe que même des chercheurs expérimentés ont du mal à le comprendre — et encore plus à l’implémenter correctement.
Raft a été créé en 2014 avec un objectif différent : être compréhensible. Il offre les mêmes garanties que Paxos, mais sa structure est beaucoup plus claire.
| Paxos | Raft | |
|---|---|---|
| Année | 1998 | 2014 |
| Philosophie | Correct mais opaque | Correct et compréhensible |
| Leader | Pas toujours clair | Toujours un chef unique |
| Journal | Non structuré | Structuré et ordonné |
| Facilité d’implémentation | Très difficile | Beaucoup plus simple |
Si Paxos est la théorie, Raft en est la pratique. Les deux garantissent la même chose — mais Raft est celui que tu peux expliquer à un collègue autour d’un café.
Dans le Vrai Monde
Raft n’est pas qu’une théorie. Voici des systèmes que tu utilises peut-être tous les jours :
- etcd — Le magasin clé-valeur derrière Kubernetes. Chaque cluster Kubernetes utilise etcd pour stocker sa configuration, et etcd utilise Raft pour rester cohérent.
- Consul (HashiCorp) — Un outil de découverte de services et de configuration. Utilise Raft pour garantir que tous les nœuds voient la même configuration.
- CockroachDB — Une base de données SQL distribuée. Utilise une variante de Raft pour répliquer les données à travers plusieurs datacenters.
Résumé du Parcours
Bravo — tu viens de traverser les quatre chapitres sur le consensus. Voici le voyage complet :
graph LR
C11["Chapitre 11<br/>Le Problème<br/>« Pourquoi s'accorder ? »"] --> C12["Chapitre 12<br/>Élire un Chef<br/>« Qui décide ? »"]
C12 --> C13["Chapitre 13<br/>Le Journal<br/>« Comment répliquer ? »"]
C13 --> C14["Chapitre 14<br/>Raft en Action<br/>« Et si ça casse ? »"]
Ce que tu as appris :
- Chapitre 11 : Le consensus est le problème fondamental — faire s’accorder plusieurs machines malgré les pannes et les retards.
- Chapitre 12 : Raft élit un chef avec des mandats, des votes et des délais aléatoires. Un seul chef par mandat.
- Chapitre 13 : Le chef réplique son journal via AppendEntries. La majorité valide. Les conflits se résolvent automatiquement.
- Chapitre 14 : Le système complet gère les pannes de chef, les partitions réseau et les retours d’anciens chefs. La règle de majorité empêche les split-brain.
Raft n’est pas parfait — il a des limites (latence, coût de la majorité). Mais c’est l’un des algorithmes les plus élégants pour résoudre l’un des problèmes les plus difficiles de l’informatique distribuée. Et maintenant, tu le comprends.
Exercices
-
Partition réseau : Un cluster de 5 nœuds se coupe en deux groupes (3 et 2). Le côté 3 peut-il valider une entrée ? Le côté 2 peut-il ? Que se passe-t-il quand le réseau se répare ?
-
Ancien chef : Le chef du mandat 7 plante. Un nouveau chef est élu (mandat 8). L’ancien chef redémarre et envoie un AppendEntries avec mandat 7 au nouveau chef. Que se passe-t-il ?
-
Choisir le bon outil : Tu dois construire un système où 3 serveurs gèrent un compteur partagé. Chaque incrémentation doit être exacte — pas de doublon, pas de perte. As-tu besoin de consensus ? Pourquoi ?
Quiz
Configuration Docker
Ce guide couvre l’installation de Docker et Docker Compose pour exécuter les exemples du cours.
Installation de Docker
Linux
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker $USER
macOS
Téléchargez Docker Desktop depuis docker.com
Windows
Téléchargez Docker Desktop depuis docker.com
Vérifier l’Installation
docker --version
docker-compose --version
Exécuter les Exemples du Cours
Chaque chapitre inclut un fichier Docker Compose :
cd examples/01-queue
docker-compose up
Commandes Courantes
# Démarrer les services
docker-compose up
# Démarrer en arrière-plan
docker-compose up -d
# Voir les journaux
docker-compose logs
# Arrêter les services
docker-compose down
# Reconstruire après des changements de code
docker-compose up --build
Dépannage
Voir Dépannage pour les problèmes courants.
Dépannage
Problèmes courants et solutions lors de l’utilisation des exemples du cours.
Problèmes Docker
Port Déjà Utilisé
Error: bind: address already in use
Solution : Changez le port dans docker-compose.yml ou arrêtez le service en conflit.
Permission Refusée
Error: permission denied while trying to connect to the Docker daemon
Solution : Ajoutez votre utilisateur au groupe docker :
sudo usermod -aG docker $USER
newgrp docker
Problèmes de Build
TypeScript : Module Non Trouvé
Solution : Installez les dépendances :
npm install
Python : Module Non Trouvé
Solution : Installez les dépendances :
pip install -r requirements.txt
Problèmes d’Exécution
Connexion Refusée
Solution : Vérifiez que tous les services sont en cours d’exécution :
docker-compose ps
Le Nœud ne Peut pas se Connecter aux Pairs
Solution : Vérifiez la configuration réseau dans docker-compose.yml. Assurez-vous que tous les nœuds sont sur le même réseau.
Obtenir de l’Aide
Si vous rencontrez des problèmes non couverts ici :
- Consultez les journaux Docker :
docker-compose logs - Vérifiez votre installation Docker :
docker --version - Voir Pour Aller Plus Loin pour des ressources supplémentaires
Pour Aller Plus Loin
Ressources pour approfondir votre compréhension des systèmes distribués.
Livres
| Titre | Auteur | Focus |
|---|---|---|
| Designing Data-Intensive Applications | Martin Kleppmann | Conception moderne de bases de données et systèmes distribués |
| Distributed Systems: Principles and Paradigms | Tanenbaum & van Steen | Fondements académiques |
| Introduction to Reliable Distributed Programming | Cachin, Guerraoui, Rodrigues | Fondements formels |
Articles
Fondamentaux
- Brewer, E. A. (2000). “Towards robust distributed systems”
- Gilbert, S. & Lynch, N. (2002). “Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services”
- Fischer, M. J., Lynch, N. A., & Paterson, M. S. (1985). “Impossibility of distributed consensus with one faulty process”
Consensus
- Ongaro, D. & Ousterhout, J. (2014). “In Search of an Understandable Consensus Algorithm (Raft)”
- Lamport, L. (2001). “Paxos Made Simple”
Ressources en Ligne
- The Raft Consensus Algorithm
- Jepsen: Distributed Systems Safety Analysis
- Distributed Systems Reading List
Cours Vidéo
- MIT 6.824: Distributed Systems
- Stanford CS247: Advanced Distributed Systems
Pratique
- Construisez votre propre système distribué à partir de zéro
- Contribuez à des bases de données distribuées open source
- Participez à des hackathons de systèmes distribués