RxJS in Angular
Question 1: What is RxJS and why is it used in Angular?
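Answer: RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables. Angular's own APIs — HttpClient, reactive forms (valueChanges), and Router events — expose observables, so RxJS is the standard way to compose asynchronous work in an Angular app. In Angular, it’s used for: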
- Handling asynchronous data streams
- Event handling
- HTTP requests
- State management
- Component communication
- Side effect management
Question 2: What are the key RxJS operators and how do you use them?
Answer: Here are essential RxJS operators and their use cases:
import {
map, filter, switchMap, debounceTime,
distinctUntilChanged, catchError, tap,
mergeMap, concatMap, takeUntil
} from 'rxjs/operators';
import {
of, from, Subject, BehaviorSubject,
combineLatest, forkJoin
} from 'rxjs';
/**
 * Type-ahead search box that also demonstrates combining and
 * parallelising observable streams.
 *
 * - `results$` drives the template through the `async` pipe.
 * - `data$` merges the latest user and config values into one view model.
 * - `loadAll$` fires three requests in parallel and emits once, when all
 *   of them have completed.
 */
@Component({
  selector: 'app-search',
  template: `
<input [formControl]="searchControl">
<div *ngFor="let result of results$ | async">
{{ result.title }}
</div>
`
})
export class SearchComponent implements OnDestroy {
  private searchService = inject(SearchService);
  // NOTE(review): the four service class names below are assumed from the
  // field names the streams reference — confirm against the real providers.
  private userService = inject(UserService);
  private configService = inject(ConfigService);
  private postService = inject(PostService);
  private commentService = inject(CommentService);

  /** Emits once in ngOnDestroy() to end streams piped through takeUntil(). */
  private destroy$ = new Subject<void>();

  searchControl = new FormControl('');

  // Transform and filter
  results$ = this.searchControl.valueChanges.pipe(
    // The control can emit null (e.g. after reset()); normalise to '' so the
    // length check below cannot throw.
    map(term => term ?? ''),
    debounceTime(300), // Wait for user to stop typing
    distinctUntilChanged(), // Only emit if value changed
    filter(term => term.length >= 2), // Min 2 characters
    // switchMap unsubscribes from the previous search when a new term arrives.
    switchMap(term => this.searchService.search(term).pipe(
      // Catch inside the inner stream so one failed request does not
      // terminate the outer valueChanges pipeline.
      catchError(error => {
        console.error('Search failed:', error);
        return of([]); // Return empty results on error
      })
    )),
    takeUntil(this.destroy$) // Cleanup on destroy
  );

  // Combining streams: emits whenever either source emits, once both have
  // emitted at least one value.
  data$ = combineLatest([
    this.userService.getUser(),
    this.configService.getConfig()
  ]).pipe(
    map(([user, config]) => ({
      username: user.name,
      theme: config.theme
    }))
  );

  // Parallel requests: forkJoin emits a single object with the last value of
  // each source after every source has completed.
  loadAll$ = forkJoin({
    users: this.userService.getUsers(),
    posts: this.postService.getPosts(),
    comments: this.commentService.getComments()
  });

  /** Signals destroy$ and completes it so takeUntil() subscribers are released. */
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Question 3: How do you handle errors and retries with RxJS?
Answer: Here’s how to implement error handling and retries:
/**
 * Fetches `/api/data` with a per-attempt timeout, exponential-backoff
 * retries, and centralised error handling.
 */
@Injectable({
  providedIn: 'root'
})
export class DataService {
  private http = inject(HttpClient);
  private errorHandler = inject(ErrorHandlerService);

  /**
   * Requests the data list.
   *
   * @returns An observable that emits the data array, or an empty array when
   *   the server answers 404. Any other failure is passed to the error
   *   handler and then re-thrown to the subscriber once retries are exhausted.
   */
  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data').pipe(
      // Timeout if request takes too long. This sits *before* retry so that it
      // bounds each individual attempt; placed after retry it would also count
      // the backoff delays and fire in the middle of the retry sequence.
      timeout(5000),
      // Retry failed requests
      retry({
        count: 3,
        delay: (error, retryCount) => {
          // A 404 is mapped to "no data" below, so retrying it only adds
          // latency. Throwing here ends the retry loop and forwards the error.
          if (error.status === 404) {
            throw error;
          }
          // Exponential backoff: 2s, 4s, 8s
          const delay = Math.pow(2, retryCount) * 1000;
          console.log(`Retrying after ${delay}ms`);
          return timer(delay);
        }
      }),
      // Handle errors
      catchError(error => {
        if (error.status === 404) {
          return of([]); // Return empty array for 404
        }
        this.errorHandler.handle(error);
        throw error; // Re-throw other errors
      }),
      // Log success/failure
      tap({
        next: data => console.log('Data received', data),
        error: error => console.error('Error fetching data', error),
        complete: () => console.log('Data stream completed')
      })
    );
  }
}
Question 4: How do you manage state with RxJS?
Answer: Here’s an example of state management using RxJS:
/** Shape of the state held by StoreService. */
// Declared at module level: TypeScript does not allow an interface
// declaration inside a class body.
interface State {
  users: User[];
  loading: boolean;
  error: string | null;
}

/**
 * Minimal observable store: a BehaviorSubject holds the current State,
 * selectors expose slices of it, and actions replace it immutably.
 */
@Injectable({
  providedIn: 'root'
})
export class StoreService {
  private http = inject(HttpClient);

  // Initial state
  private readonly initialState: State = {
    users: [],
    loading: false,
    error: null
  };

  // State subject
  private readonly state$ = new BehaviorSubject<State>(this.initialState);

  // Selectors — each re-emits only when its own slice changes.
  users$ = this.state$.pipe(
    map(state => state.users),
    distinctUntilChanged()
  );

  loading$ = this.state$.pipe(
    map(state => state.loading),
    distinctUntilChanged()
  );

  error$ = this.state$.pipe(
    map(state => state.error),
    distinctUntilChanged()
  );

  // Actions

  /**
   * Loads users from `/api/users` and records the outcome in state:
   * `loading` is set while the request is in flight, then either `users`
   * is replaced or `error` is set to the failure message.
   */
  loadUsers() {
    this.updateState({ loading: true });
    // Handle both outcomes in the observer. The failure is already captured
    // in state, so it is not re-thrown into a subscription that has no
    // error handler.
    this.http.get<User[]>('/api/users').subscribe({
      next: users => this.updateState({
        users,
        loading: false,
        error: null
      }),
      error: error => this.updateState({
        loading: false,
        error: error?.message ?? 'Failed to load users'
      })
    });
  }

  /** Merges `partial` over the current state and emits the new object. */
  private updateState(partial: Partial<State>) {
    this.state$.next({
      ...this.state$.value,
      ...partial
    });
  }
}
Interview Tips 💡
Common RxJS Patterns
// Caching
// Holds the shared request once getData() has been called. A bare
// ReplaySubject that nothing feeds never emits, so reading it with take(1)
// would wait forever and never reach fetchFresh().
private cache$: Observable<Data> | null = null;

/**
 * Returns the cached data stream, creating it on first use.
 * shareReplay(1) replays the latest value to later subscribers, so
 * fetchFresh() is not subscribed again for each caller.
 */
getData(): Observable<Data> {
  if (this.cache$ === null) {
    this.cache$ = this.fetchFresh().pipe(shareReplay(1));
  }
  return this.cache$;
}

// Auto-refresh
private refresh$ = new Subject<void>();

// Loads once immediately (startWith) and again on every refresh$ emission;
// switchMap drops a load that is still in flight when a new one starts.
// NOTE(review): if this lives in the same class as the cached getData()
// above, a refresh re-emits the cached value — point it at fetchFresh() if a
// real reload is intended.
data$ = this.refresh$.pipe(
  startWith(void 0),
  switchMap(() => this.getData()),
  shareReplay(1)
);
Performance Optimization
// Share expensive operations
// One upstream subscription is shared by all subscribers and the latest value
// is replayed to late ones; refCount: true releases the upstream subscription
// when the last subscriber leaves.
expensiveData$ = this.getData().pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

// Cancel previous requests
// switchMap unsubscribes from the previous inner search each time a new term
// arrives, so only the most recent search can emit.
search$ = this.searchTerm$.pipe(
  switchMap(query => this.search(query))
);
Testing RxJS
describe('DataService', () => {
  it('should handle errors', fakeAsync(() => {
    const error = new Error('Network error');

    // Make the request actually fail: without a failing HttpClient the error
    // path is never exercised and the test passes without asserting anything.
    TestBed.configureTestingModule({
      providers: [
        { provide: HttpClient, useValue: { get: () => throwError(() => error) } },
        { provide: ErrorHandlerService, useValue: { handle: () => {} } }
      ]
    });
    const service = TestBed.inject(DataService);

    let caught: unknown;
    service.getData().subscribe({
      next: () => fail('expected getData() to error'),
      error: err => (caught = err)
    });

    // Run any pending retry/backoff timers to completion in virtual time.
    flush();

    // Assert outside the stream so the test fails if no error is delivered.
    // NOTE(review): which error instance arrives depends on the operator
    // order inside getData(); tighten this to toBe(error) if the original
    // error is guaranteed to propagate.
    expect(caught instanceof Error).toBe(true);
  }));
});
Memory Management
/**
 * Demonstrates the takeUntil teardown pattern: every subscription made in
 * ngOnInit() is ended when destroy$ emits in ngOnDestroy().
 */
// NOTE(review): data$ is not declared in this snippet — it is assumed to be
// an observable member defined elsewhere on the class.
export class Component implements OnInit, OnDestroy {
  /** Emits once on destroy to end subscriptions piped through takeUntil(). */
  private readonly destroy$ = new Subject<void>();

  /** Subscribes to data$ for the lifetime of the component. */
  ngOnInit() {
    this.data$.pipe(
      // Keep takeUntil last so nothing downstream of it can outlive destroy$.
      takeUntil(this.destroy$)
    ).subscribe();
  }

  /** Signals destroy$, then completes it so it holds no subscribers. */
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Error Recovery
/**
 * Requests `/api/data`, retrying failed attempts with exponential backoff.
 *
 * Up to 4 retries are made, waiting 1s, 2s, 4s and 8s respectively; if the
 * request still fails after the last retry, that error is emitted to the
 * subscriber.
 */
getData() {
  return this.http.get('/api/data').pipe(
    // retry({ count, delay }) replaces the deprecated retryWhen() while
    // keeping the same schedule. retryCount starts at 1, hence the "- 1".
    retry({
      count: 4,
      delay: (_error, retryCount) => timer(1000 * Math.pow(2, retryCount - 1))
    })
  );
}
Custom Operators
/**
 * Builds a pass-through operator that logs every notification of the stream
 * it is applied to, prefixed with `tag`. Values are not modified.
 *
 * @param tag Label prepended to each log line.
 * @returns A `tap` operator that logs next, error and complete notifications.
 */
function debugOperator<T>(tag: string) {
  const logger = {
    next: (emitted: T) => console.log(`${tag}:`, emitted),
    error: (failure: unknown) => console.error(`${tag} error:`, failure),
    complete: () => console.log(`${tag} complete`)
  };
  return tap<T>(logger);
}

// Usage
data$.pipe(
  debugOperator('DataStream')
);
Remember: In interviews, focus on:
- Understanding reactive programming concepts
- Knowledge of common operators
- Error handling strategies
- State management patterns
- Memory leak prevention
- Testing observables
- Performance optimization
- Real-world use cases