Observables et Operators RxJS
01/05/2025Environ 5 minCours
Observables et Operators RxJS en Angular
📚 Table des matières
- Qu'est-ce qu'un Observable ?
- Observables vs Promises
- Créer des Observables
- Souscrire aux Observables
- Les Operators (Pipes) RxJS
- Operators de Transformation
- Operators de Filtrage
- Operators de Combinaison
- Gestion des erreurs
- Observables dans Angular
- Exemples pratiques
🔍 Qu'est-ce qu'un Observable ?
Un Observable est un flux de données qui peut émettre des valeurs au fil du temps. C'est comme un tuyau d'eau où les données coulent.
Concepts clés :
- Stream : Flux de données
- Observer : Ce qui "écoute" le flux
- Subscription : La connexion entre Observable et Observer
- Operators : Fonctions qui transforment le flux
// Analogie simple
// Observable = Chaîne YouTube
// Observer = Abonné qui regarde
// Subscription = Abonnement
// Operators = Filtres/Effets sur les vidéos
🆚 Observables vs Promises
Promises | Observables |
---|---|
Une seule valeur | Multiples valeurs |
Eager (s'exécute immédiatement) | Lazy (s'exécute à la souscription) |
Non annulable | Annulable |
.then() / .catch() | .pipe() avec operators |
// Promise - Une seule valeur
const promise = fetch('/api/user/1')
.then(response => response.json());
// Observable - Flux de valeurs
const observable$ = new Observable(observer => {
observer.next('Valeur 1');
observer.next('Valeur 2');
observer.next('Valeur 3');
observer.complete();
});
🏗️ Créer des Observables
1. Observable basique
import { Observable } from 'rxjs';
const basicObservable$ = new Observable<string>(observer => {
observer.next('Hello');
observer.next('World');
observer.complete();
});
2. Avec of()
- Valeurs statiques
import { of } from 'rxjs';
const numbers$ = of(1, 2, 3, 4, 5);
const users$ = of(
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
);
3. Avec from()
- Arrays/Promises
import { from } from 'rxjs';
const fromArray$ = from([1, 2, 3, 4, 5]);
const fromPromise$ = from(fetch('/api/data'));
4. Avec interval()
- Émissions périodiques
import { interval } from 'rxjs';
const timer$ = interval(1000); // Émet 0, 1, 2, 3... chaque seconde
5. Avec fromEvent()
- Événements DOM
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
const keyups$ = fromEvent(document, 'keyup');
👂 Souscrire aux Observables
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';
@Component({...})
export class ExampleComponent implements OnInit, OnDestroy {
private subscription: Subscription = new Subscription();
ngOnInit() {
// Souscription simple
const sub1 = numbers$.subscribe({
next: value => console.log('Reçu:', value),
error: err => console.error('Erreur:', err),
complete: () => console.log('Terminé!')
});
// Ajouter à la subscription principale
this.subscription.add(sub1);
}
ngOnDestroy() {
// IMPORTANT: Se désabonner pour éviter les fuites mémoire
this.subscription.unsubscribe();
}
}
🔧 Les Operators (Pipes) RxJS
Les operators sont des fonctions qui transforment, filtrent ou combinent les flux de données.
Syntaxe de base :
observable$.pipe(
operator1(),
operator2(),
operator3()
).subscribe(result => console.log(result));
🔄 Operators de Transformation
1. map()
- Transformer chaque valeur
import { map } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6, 8, 10
2. pluck()
- Extraire une propriété
import { pluck } from 'rxjs/operators';
const users$ = of(
{ id: 1, name: 'Alice', age: 25 },
{ id: 2, name: 'Bob', age: 30 }
);
users$.pipe(
pluck('name')
).subscribe(console.log); // 'Alice', 'Bob'
3. switchMap()
- Changer d'Observable
import { switchMap } from 'rxjs/operators';
const userId$ = of(1, 2, 3);
userId$.pipe(
switchMap(id => this.http.get(`/api/users/${id}`))
).subscribe(user => console.log(user));
4. mergeMap()
- Fusionner plusieurs Observables
import { mergeMap } from 'rxjs/operators';
const urls$ = of('/api/users', '/api/posts', '/api/comments');
urls$.pipe(
mergeMap(url => this.http.get(url))
).subscribe(data => console.log(data));
🔍 Operators de Filtrage
1. filter()
- Filtrer les valeurs
import { filter } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers$.pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4, 6, 8, 10
2. take()
- Prendre X valeurs
import { take } from 'rxjs/operators';
interval(1000).pipe(
take(5)
).subscribe(console.log); // 0, 1, 2, 3, 4 puis se termine
3. takeUntil()
- Prendre jusqu'à un signal
import { takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';
const destroy$ = new Subject();
interval(1000).pipe(
takeUntil(destroy$)
).subscribe(console.log);
// Arrêter après 5 secondes
setTimeout(() => destroy$.next(), 5000);
4. distinctUntilChanged()
- Éviter les doublons consécutifs
import { distinctUntilChanged } from 'rxjs/operators';
const values$ = of(1, 1, 2, 2, 2, 3, 3, 1);
values$.pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 1
5. debounceTime()
- Attendre un délai
import { debounceTime } from 'rxjs/operators';
const searchInput$ = fromEvent(inputElement, 'input');
searchInput$.pipe(
debounceTime(300), // Attendre 300ms après la dernière frappe
map(event => event.target.value)
).subscribe(searchTerm => {
// Faire la recherche
console.log('Recherche:', searchTerm);
});
🤝 Operators de Combinaison
1. combineLatest()
- Combiner les dernières valeurs
import { combineLatest } from 'rxjs';
const firstName$ = of('John');
const lastName$ = of('Doe');
const age$ = of(30);
combineLatest([firstName$, lastName$, age$]).pipe(
map(([first, last, age]) => ({ fullName: `${first} ${last}`, age }))
).subscribe(console.log); // { fullName: 'John Doe', age: 30 }
2. merge()
- Fusionner plusieurs flux
import { merge } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
const keyups$ = fromEvent(document, 'keyup');
merge(clicks$, keyups$).subscribe(event => {
console.log('Événement:', event.type);
});
3. concat()
- Concaténer en séquence
import { concat } from 'rxjs';
const first$ = of(1, 2, 3);
const second$ = of(4, 5, 6);
concat(first$, second$).subscribe(console.log); // 1, 2, 3, 4, 5, 6
❌ Gestion des erreurs
1. catchError()
- Capturer les erreurs
import { catchError } from 'rxjs/operators';
import { of } from 'rxjs';
this.http.get('/api/data').pipe(
catchError(error => {
console.error('Erreur API:', error);
return of([]); // Valeur par défaut
})
).subscribe(data => console.log(data));
2. retry()
- Réessayer automatiquement
import { retry } from 'rxjs/operators';
this.http.get('/api/data').pipe(
retry(3), // Réessayer 3 fois
catchError(error => of([]))
).subscribe(data => console.log(data));
3. retryWhen()
- Réessayer avec condition
import { retryWhen, delay, take } from 'rxjs/operators';
this.http.get('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
delay(1000), // Attendre 1 seconde
take(3) // Maximum 3 tentatives
)
)
).subscribe(data => console.log(data));
🅰️ Observables dans Angular
1. HttpClient
import { HttpClient } from '@angular/common/http';
@Injectable()
export class UserService {
constructor(private http: HttpClient) {}
getUsers(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
map(users => users.filter(user => user.active)),
catchError(error => of([]))
);
}
}
2. Forms reactifs
import { FormControl } from '@angular/forms';
const searchControl = new FormControl();
searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
).subscribe(results => {
this.searchResults = results;
});
3. Router
import { ActivatedRoute } from '@angular/router';
constructor(private route: ActivatedRoute) {}
ngOnInit() {
this.route.params.pipe(
map(params => params['id']),
switchMap(id => this.userService.getUser(id))
).subscribe(user => {
this.user = user;
});
}
💡 Exemples pratiques
1. Recherche en temps réel
@Component({
template: `
<input [formControl]="searchControl" placeholder="Rechercher...">
<div *ngFor="let result of searchResults$ | async">
{{ result.name }}
</div>
`
})
export class SearchComponent implements OnInit {
searchControl = new FormControl();
searchResults$: Observable<any[]>;
constructor(private searchService: SearchService) {}
ngOnInit() {
this.searchResults$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(query => query.length > 2),
switchMap(query => this.searchService.search(query)),
catchError(() => of([]))
);
}
}
2. Auto-refresh de données
@Component({...})
export class DashboardComponent implements OnInit, OnDestroy {
data$: Observable<any>;
private destroy$ = new Subject();
ngOnInit() {
this.data$ = interval(30000).pipe( // Refresh toutes les 30 secondes
startWith(0), // Commencer immédiatement
switchMap(() => this.dataService.getData()),
takeUntil(this.destroy$),
shareReplay(1) // Partager le résultat
);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
3. Gestion d'état avec BehaviorSubject
@Injectable()
export class StateService {
private stateSubject = new BehaviorSubject<any>(initialState);
public state$ = this.stateSubject.asObservable();
updateState(newState: any) {
this.stateSubject.next({ ...this.stateSubject.value, ...newState });
}
getState(): any {
return this.stateSubject.value;
}
}
4. Combinaison de plusieurs API
@Component({...})
export class ProfileComponent implements OnInit {
profile$: Observable<any>;
ngOnInit() {
const userId = this.route.snapshot.params['id'];
this.profile$ = combineLatest([
this.userService.getUser(userId),
this.postsService.getUserPosts(userId),
this.friendsService.getUserFriends(userId)
]).pipe(
map(([user, posts, friends]) => ({
user,
posts,
friends,
totalPosts: posts.length,
totalFriends: friends.length
}))
);
}
}
5. WebSocket avec Observables
@Injectable()
export class WebSocketService {
private socket$ = new WebSocketSubject('ws://localhost:8080');
messages$ = this.socket$.asObservable();
sendMessage(message: any) {
this.socket$.next(message);
}
connect(): Observable<any> {
return this.socket$.pipe(
retry(3),
catchError(error => {
console.error('Erreur WebSocket:', error);
return EMPTY;
})
);
}
}
🎯 Bonnes pratiques
1. Toujours se désabonner
// ✅ Bon
export class Component implements OnDestroy {
private destroy$ = new Subject();
ngOnInit() {
this.service.getData().pipe(
takeUntil(this.destroy$)
).subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
2. Utiliser async
pipe quand possible
// ✅ Bon - pas besoin de se désabonner manuellement
@Component({
template: `<div *ngFor="let item of items$ | async">{{ item.name }}</div>`
})
export class Component {
items$ = this.service.getItems();
}
3. Gérer les erreurs
// ✅ Bon
this.service.getData().pipe(
catchError(error => {
this.handleError(error);
return of([]); // Valeur par défaut
})
).subscribe();
4. Éviter les souscriptions imbriquées
// ❌ Mauvais
this.service1.getData().subscribe(data1 => {
this.service2.getData(data1.id).subscribe(data2 => {
// ...
});
});
// ✅ Bon
this.service1.getData().pipe(
switchMap(data1 => this.service2.getData(data1.id))
).subscribe(data2 => {
// ...
});
📚 Résumé des Operators essentiels
Operator | Usage | Exemple |
---|---|---|
map | Transformer | map(x => x * 2) |
filter | Filtrer | filter(x => x > 5) |
switchMap | Changer d'Observable | switchMap(id => getUser(id)) |
debounceTime | Attendre un délai | debounceTime(300) |
distinctUntilChanged | Éviter doublons | distinctUntilChanged() |
takeUntil | Arrêter à un signal | takeUntil(destroy$) |
catchError | Gérer erreurs | catchError(() => of([])) |
combineLatest | Combiner flux | combineLatest([a$, b$]) |
startWith | Valeur initiale | startWith(0) |
shareReplay | Partager résultat | shareReplay(1) |
🎉 Conclusion
Les Observables et les operators RxJS sont des outils puissants pour :
- Gérer les flux de données asynchrones
- Transformer et filtrer les données facilement
- Combiner plusieurs sources de données
- Gérer les erreurs de manière élégante
- Créer des interfaces réactives et performantes
Maîtriser ces concepts est essentiel pour développer des applications Angular modernes et efficaces ! 🚀