RxJS in Angular

Question 1: What is RxJS and why is it used in Angular?

Answer: RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables. In Angular, it’s used for:

  • Handling asynchronous data streams
  • Event handling
  • HTTP requests
  • State management
  • Component communication
  • Side effect management

Question 2: What are the key RxJS operators and how do you use them?

Answer: Here are essential RxJS operators and their use cases:

import { 
  map, filter, switchMap, debounceTime, 
  distinctUntilChanged, catchError, tap,
  mergeMap, concatMap, takeUntil
} from 'rxjs/operators';
import { 
  of, from, Subject, BehaviorSubject, 
  combineLatest, forkJoin
} from 'rxjs';

@Component({
  selector: 'app-search',
  template: `
    <input [formControl]="searchControl">
    <div *ngFor="let result of results$ | async">
      {{ result.title }}
    </div>
  `
})
export class SearchComponent implements OnInit {
  private searchService = inject(SearchService);
  private destroy$ = new Subject<void>();
  
  searchControl = new FormControl('');
  
  // Transform and filter
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),  // Wait for user to stop typing
    distinctUntilChanged(),  // Only emit if value changed
    filter(term => term.length >= 2),  // Min 2 characters
    switchMap(term => this.searchService.search(term).pipe(
      catchError(error => {
        console.error('Search failed:', error);
        return of([]);  // Return empty results on error
      })
    )),
    takeUntil(this.destroy$)  // Cleanup on destroy
  );
  
  // Combining streams
  data$ = combineLatest([
    this.userService.getUser(),
    this.configService.getConfig()
  ]).pipe(
    map(([user, config]) => ({
      username: user.name,
      theme: config.theme
    }))
  );
  
  // Parallel requests
  loadAll$ = forkJoin({
    users: this.userService.getUsers(),
    posts: this.postService.getPosts(),
    comments: this.commentService.getComments()
  });
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Question 3: How do you handle errors and retries with RxJS?

Answer: Here’s how to implement error handling and retries:

@Injectable({
  providedIn: 'root'
})
export class DataService {
  private http = inject(HttpClient);
  private errorHandler = inject(ErrorHandlerService);
  
  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data').pipe(
      // Retry failed requests
      retry({
        count: 3,
        delay: (error, retryCount) => {
          // Exponential backoff
          const delay = Math.pow(2, retryCount) * 1000;
          console.log(`Retrying after ${delay}ms`);
          return timer(delay);
        }
      }),
      
      // Handle errors
      catchError(error => {
        if (error.status === 404) {
          return of([]); // Return empty array for 404
        }
        this.errorHandler.handle(error);
        throw error; // Re-throw other errors
      }),
      
      // Timeout if request takes too long
      timeout(5000),
      
      // Log success/failure
      tap({
        next: data => console.log('Data received', data),
        error: error => console.error('Error fetching data', error),
        complete: () => console.log('Data stream completed')
      })
    );
  }
}

Question 4: How do you manage state with RxJS?

Answer: Here’s an example of state management using RxJS:

@Injectable({
  providedIn: 'root'
})
export class StoreService {
  // State interface
  interface State {
    users: User[];
    loading: boolean;
    error: string | null;
  }
  
  // Initial state
  private initialState: State = {
    users: [],
    loading: false,
    error: null
  };
  
  // State subject
  private state$ = new BehaviorSubject<State>(this.initialState);
  
  // Selectors
  users$ = this.state$.pipe(
    map(state => state.users),
    distinctUntilChanged()
  );
  
  loading$ = this.state$.pipe(
    map(state => state.loading),
    distinctUntilChanged()
  );
  
  error$ = this.state$.pipe(
    map(state => state.error),
    distinctUntilChanged()
  );
  
  // Actions
  loadUsers() {
    this.updateState({ loading: true });
    
    this.http.get<User[]>('/api/users').pipe(
      tap(users => this.updateState({ 
        users, 
        loading: false,
        error: null
      })),
      catchError(error => {
        this.updateState({ 
          loading: false,
          error: error.message
        });
        return throwError(() => error);
      })
    ).subscribe();
  }
  
  private updateState(partial: Partial<State>) {
    this.state$.next({
      ...this.state$.value,
      ...partial
    });
  }
}

Interview Tips 💡

  1. Common RxJS Patterns

    // Caching
    private cache$ = new ReplaySubject<Data>(1);
    
    getData() {
      return this.cache$.pipe(
        take(1),
        mergeMap(cached => {
          if (cached) return of(cached);
          return this.fetchFresh();
        })
      );
    }
    
    // Auto-refresh
    private refresh$ = new Subject<void>();
    
    data$ = this.refresh$.pipe(
      startWith(void 0),
      switchMap(() => this.getData()),
      shareReplay(1)
    );
  2. Performance Optimization

    // Share expensive operations
    expensiveData$ = this.getData().pipe(
      shareReplay({
        bufferSize: 1,
        refCount: true
      })
    );
    
    // Cancel previous requests
    search$ = this.searchTerm$.pipe(
      switchMap(term => this.search(term))
    );
  3. Testing RxJS

    describe('DataService', () => {
      it('should handle errors', () => {
        const service = TestBed.inject(DataService);
        const error = new Error('Network error');
        
        service.getData().pipe(
          catchError(err => {
            expect(err).toBe(error);
            return EMPTY;
          })
        ).subscribe();
      });
    });
  4. Memory Management

    export class Component implements OnDestroy {
      private destroy$ = new Subject<void>();
      
      ngOnInit() {
        this.data$.pipe(
          takeUntil(this.destroy$)
        ).subscribe();
      }
      
      ngOnDestroy() {
        this.destroy$.next();
        this.destroy$.complete();
      }
    }
  5. Error Recovery

    getData() {
      return this.http.get('/api/data').pipe(
        retryWhen(errors => 
          errors.pipe(
            mergeMap((error, index) => {
              if (index > 3) throw error;
              return timer(1000 * Math.pow(2, index));
            })
          )
        )
      );
    }
  6. Custom Operators

    function debugOperator<T>(tag: string) {
      return tap<T>({
        next: value => console.log(`${tag}:`, value),
        error: err => console.error(`${tag} error:`, err),
        complete: () => console.log(`${tag} complete`)
      });
    }
    
    // Usage
    data$.pipe(
      debugOperator('DataStream')
    );

Remember: In interviews, focus on:

  • Understanding reactive programming concepts
  • Knowledge of common operators
  • Error handling strategies
  • State management patterns
  • Memory leak prevention
  • Testing observables
  • Performance optimization
  • Real-world use cases

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your Angular Knowledge

Ready to put your skills to the test? Take our interactive Angular quiz and get instant feedback on your answers.