RxJS in Angular

RxJS in Angular

Question 1: What is RxJS and why is it used in Angular?

Answer: RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables. In Angular, it’s used for:

  • Handling asynchronous data streams
  • Event handling
  • HTTP requests
  • State management
  • Component communication
  • Side effect management

Question 2: What are the key RxJS operators and how do you use them?

Answer: Here are essential RxJS operators and their use cases:

import { 
  map, filter, switchMap, debounceTime, 
  distinctUntilChanged, catchError, tap,
  mergeMap, concatMap, takeUntil,
  retry, retryWhen, timeout, shareReplay,
  startWith, take
} from 'rxjs/operators';
import { 
  of, from, Subject, BehaviorSubject, 
  combineLatest, forkJoin,
  Observable, ReplaySubject, EMPTY,
  throwError, timer
} from 'rxjs';

/**
 * Type-ahead search component that demonstrates the most common RxJS
 * operators: transforming/filtering a stream, combining streams, and
 * running requests in parallel.
 */
@Component({
  selector: 'app-search',
  template: `
    <input [formControl]="searchControl">
    <div *ngFor="let result of results$ | async">
      {{ result.title }}
    </div>
  `
})
export class SearchComponent implements OnDestroy {
  // Every service read via `this.` below must be declared before the stream
  // fields, because class field initializers run in declaration order.
  private searchService = inject(SearchService);
  // NOTE(review): assumes injectable classes with these names exist — confirm
  // against the real service definitions.
  private userService = inject(UserService);
  private configService = inject(ConfigService);
  private postService = inject(PostService);
  private commentService = inject(CommentService);

  /** Emits once in ngOnDestroy to tear down streams piped through takeUntil. */
  private destroy$ = new Subject<void>();

  searchControl = new FormControl('');

  // Transform and filter
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),  // Wait for user to stop typing
    distinctUntilChanged(),  // Only emit if value changed
    // Min 2 characters. A form control can emit null (e.g. after reset()),
    // so guard against it before reading `.length`.
    filter((term): term is string => term !== null && term.length >= 2),
    // switchMap drops the previous in-flight search when a new term arrives.
    switchMap(term => this.searchService.search(term).pipe(
      // Handle the error on the inner observable so one failed request
      // does not terminate the outer valueChanges stream.
      catchError(error => {
        console.error('Search failed:', error);
        return of([]);  // Return empty results on error
      })
    )),
    takeUntil(this.destroy$)  // Cleanup on destroy; keep this operator last
  );

  // Combining streams: emits once both sources have emitted, then again
  // whenever either source emits.
  data$ = combineLatest([
    this.userService.getUser(),
    this.configService.getConfig()
  ]).pipe(
    map(([user, config]) => ({
      username: user.name,
      theme: config.theme
    }))
  );

  // Parallel requests: emits a single object once every source has completed.
  loadAll$ = forkJoin({
    users: this.userService.getUsers(),
    posts: this.postService.getPosts(),
    comments: this.commentService.getComments()
  });

  /** Signals destroy$ so every takeUntil(this.destroy$) stream completes. */
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Question 3: How do you handle errors and retries with RxJS?

Answer: Here’s how to implement error handling and retries:

/**
 * Fetches data over HTTP with a per-attempt timeout, retries with
 * exponential backoff, and centralized error handling.
 */
@Injectable({
  providedIn: 'root'
})
export class DataService {
  private http = inject(HttpClient);
  private errorHandler = inject(ErrorHandlerService);

  /**
   * Requests `/api/data`.
   *
   * @returns An observable of the data array. A 404 response is mapped to an
   *   empty array; any other failure is passed to the error handler and then
   *   re-emitted as an error after the retries are exhausted.
   */
  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data').pipe(
      // Timeout if a single attempt takes too long. This must sit upstream of
      // retry/catchError: placed after them, the 5s timer would also cover the
      // backoff delays (2s + 4s already exceeds it) and the resulting timeout
      // error would bypass both the retry and the error handler.
      timeout(5000),

      // Retry failed requests
      retry({
        count: 3,
        delay: (error, retryCount) => {
          // A 404 is answered with an empty array below, so retrying it only
          // delays that answer. Erroring the notifier aborts the retry.
          if (error.status === 404) {
            return throwError(() => error);
          }
          // Exponential backoff: retryCount starts at 1, giving 2s, 4s, 8s.
          const delayMs = Math.pow(2, retryCount) * 1000;
          console.log(`Retrying after ${delayMs}ms`);
          return timer(delayMs);
        }
      }),

      // Handle errors
      catchError(error => {
        if (error.status === 404) {
          return of([]); // Return empty array for 404
        }
        this.errorHandler.handle(error);
        return throwError(() => error); // Re-throw other errors
      }),

      // Log success/failure as the subscriber will observe it
      tap({
        next: data => console.log('Data received', data),
        error: error => console.error('Error fetching data', error),
        complete: () => console.log('Data stream completed')
      })
    );
  }
}

Question 4: How do you manage state with RxJS?

Answer: Here’s an example of state management using RxJS:

/**
 * Shape of the state held by StoreService.
 * Declared at module level: TypeScript does not allow an interface
 * declaration inside a class body.
 */
interface State {
  users: User[];
  loading: boolean;
  error: string | null;
}

/**
 * Minimal observable store: a BehaviorSubject holds the whole state,
 * selectors expose slices of it, and actions push updated copies.
 */
@Injectable({
  providedIn: 'root'
})
export class StoreService {
  private http = inject(HttpClient);

  // Initial state
  private initialState: State = {
    users: [],
    loading: false,
    error: null
  };

  // State subject
  private state$ = new BehaviorSubject<State>(this.initialState);

  // Selectors. distinctUntilChanged compares by reference, so each selector
  // only re-emits when its own slice was replaced.
  users$ = this.state$.pipe(
    map(state => state.users),
    distinctUntilChanged()
  );

  loading$ = this.state$.pipe(
    map(state => state.loading),
    distinctUntilChanged()
  );

  error$ = this.state$.pipe(
    map(state => state.error),
    distinctUntilChanged()
  );

  // Actions

  /**
   * Loads users from `/api/users` and records the outcome in the state.
   * Failures are reported through `error$` rather than thrown: the
   * subscription has an explicit error handler, so nothing surfaces as an
   * unhandled error.
   */
  loadUsers() {
    this.updateState({ loading: true });

    this.http.get<User[]>('/api/users').subscribe({
      next: users => this.updateState({
        users,
        loading: false,
        error: null
      }),
      error: error => this.updateState({
        loading: false,
        error: error.message
      })
    });
  }

  /** Merges `partial` over the current state and emits the new object. */
  private updateState(partial: Partial<State>) {
    this.state$.next({
      ...this.state$.value,
      ...partial
    });
  }
}

Interview Tips 💡

  1. Common RxJS Patterns

    // Caching
    // The first call starts the fetch; shareReplay(1) replays its latest
    // value to that subscriber and to every later one, so the request is
    // not repeated. (A ReplaySubject that nothing ever feeds would never
    // emit, leaving take(1) waiting forever.)
    private cache$?: Observable<Data>;

    /**
     * Returns the cached data stream, creating it on first use.
     * NOTE(review): assumes fetchFresh() returns an Observable<Data>; if it
     * returns a Promise, wrap the call in from().
     */
    getData(): Observable<Data> {
      this.cache$ ??= this.fetchFresh().pipe(shareReplay(1));
      return this.cache$;
    }
    
    // Auto-refresh
    // Calling refresh$.next() triggers a reload.
    private refresh$ = new Subject<void>();

    // Loads once immediately, reloads on every refresh$ emission, and
    // replays the latest result to each subscriber.
    data$ = this.refresh$.pipe(
      startWith(undefined),  // kick off the initial load without a refresh
      switchMap(() => this.getData()),  // a newer refresh replaces a pending load
      shareReplay({ bufferSize: 1, refCount: false })
    );
  2. Performance Optimization

    // Share expensive operations
    // All subscribers share one getData() subscription and receive the
    // latest value; refCount: true tears that subscription down once the
    // last subscriber leaves.
    expensiveData$ = this.getData().pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    );
    
    // Cancel previous requests
    // switchMap unsubscribes from the previous inner search whenever a new
    // term arrives, so only the latest search can emit.
    search$ = this.searchTerm$.pipe(
      switchMap(query => this.search(query))
    );
  3. Testing RxJS

    describe('DataService', () => {
      // fakeAsync and tick come from @angular/core/testing, like TestBed.
      // They let the test run through timer-based retry delays instantly.
      it('should handle errors', fakeAsync(() => {
        const error = new Error('Network error');

        // Force the request to fail; without a failing HttpClient the error
        // path would never run and the test would pass without asserting.
        TestBed.configureTestingModule({
          providers: [
            { provide: HttpClient, useValue: { get: () => throwError(() => error) } },
            { provide: ErrorHandlerService, useValue: { handle: () => undefined } }
          ]
        });
        const service = TestBed.inject(DataService);

        let caught: unknown;
        let emitted = false;
        service.getData().subscribe({
          next: () => (emitted = true),
          error: err => (caught = err)
        });

        // Advance virtual time past every retry delay.
        tick(60_000);

        // Assert outside the stream so the test fails if no error arrived.
        expect(emitted).toBe(false);
        expect(caught).toBe(error);
      }));
    });
  4. Memory Management

    /**
     * Demonstrates the takeUntil teardown pattern: every manual
     * subscription is piped through takeUntil(destroy$), and destroy$
     * emits once when the component is destroyed.
     */
    export class Component implements OnInit, OnDestroy {
      /** Emits once in ngOnDestroy to end all takeUntil(destroy$) streams. */
      private destroy$ = new Subject<void>();

      // NOTE(review): data$ is not declared in this snippet — presumably an
      // observable field of the component; confirm.
      ngOnInit() {
        this.data$.pipe(
          takeUntil(this.destroy$)  // keep last so nothing downstream outlives it
        ).subscribe();
      }

      ngOnDestroy() {
        this.destroy$.next();
        this.destroy$.complete();
      }
    }
  5. Error Recovery

    /**
     * Requests `/api/data`, retrying failures with exponential backoff.
     * Up to 4 retries are made, after waiting 1s, 2s, 4s and 8s; the next
     * failure is passed on to the subscriber.
     *
     * Uses retry({ count, delay }) because retryWhen is deprecated.
     */
    getData() {
      return this.http.get('/api/data').pipe(
        retry({
          count: 4,
          // retryCount starts at 1, so the first wait is 1000 * 2^0 ms.
          delay: (_error, retryCount) => timer(1000 * Math.pow(2, retryCount - 1))
        })
      );
    }
  6. Custom Operators

    /**
     * Builds a pass-through operator that logs every notification of the
     * stream it is applied to, prefixed with `tag`. Values are not altered.
     *
     * @param tag Label placed in front of each log line.
     * @returns A tap-based operator for use inside pipe().
     */
    function debugOperator<T>(tag: string) {
      const logNext = (value: T) => console.log(`${tag}:`, value);
      const logError = (err: unknown) => console.error(`${tag} error:`, err);
      const logComplete = () => console.log(`${tag} complete`);

      return tap<T>({ next: logNext, error: logError, complete: logComplete });
    }
    
    // Usage
    data$.pipe(debugOperator('DataStream'));

Remember: In interviews, focus on:

  • Understanding reactive programming concepts
  • Knowledge of common operators
  • Error handling strategies
  • State management patterns
  • Memory leak prevention
  • Testing observables
  • Performance optimization
  • Real-world use cases