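RxJS Basics #

1. RxJS Overview #

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming: it represents events and asynchronous values as observable streams that can be transformed and combined with operators.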

text
Traditional programming          Reactive programming
┌─────────────────┐              ┌─────────────────┐
│  var = value    │              │  Observable     │
│  single value   │              │  data stream    │
│  synchronous    │              │  asynchronous   │
│  pull           │              │  push           │
└─────────────────┘              └─────────────────┘
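The pull/push contrast in the diagram can be sketched without any library. This is a minimal illustration, not how RxJS is implemented: `makePullSource` and `pushSource` are hypothetical names, and the push delivery here happens to be synchronous, although an Observable may deliver values synchronously or at a later time.

```typescript
// Pull: the consumer asks for each value when it wants one.
function makePullSource(data: number[]): () => number | undefined {
  let index = 0;
  return () => (index < data.length ? data[index++] : undefined);
}

const pulledValues: number[] = [];
const nextValue = makePullSource([1, 2, 3]);
let currentValue = nextValue();
while (currentValue !== undefined) {
  pulledValues.push(currentValue);
  currentValue = nextValue(); // the consumer drives the pace
}

// Push: the producer calls the consumer's callback whenever a value is ready.
// `pushSource` is a hypothetical stand-in for what subscribing to an Observable does.
function pushSource(data: number[], onNext: (value: number) => void): void {
  data.forEach(value => onNext(value)); // the producer drives the pace
}

const pushedValues: number[] = [];
pushSource([1, 2, 3], value => {
  pushedValues.push(value);
});

console.log(pulledValues, pushedValues); // both contain 1, 2, 3
```

In both cases the consumer ends up with the same values; what differs is which side decides when the next value is handed over.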

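2. Core Concepts #

2.1 Observable #

An Observable represents a stream of values delivered over time. An Observable created with new Observable is lazy: its producer function runs only when an observer subscribes.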

typescript
import { Observable } from 'rxjs';

const observable = new Observable<string>(subscriber => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.complete();
});

observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('done')
});

2.2 Observer #

An Observer is the consumer of the values an Observable delivers: an object with optional next, error, and complete callbacks.

typescript
const observer = {
  next: (value: any) => console.log('value:', value),
  error: (err: any) => console.error('error:', err),
  complete: () => console.log('done')
};

observable.subscribe(observer);

2.3 Subscription #

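A Subscription represents an active subscription. Calling unsubscribe() on it stops delivery and releases the resources held by the producer.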

typescript
const subscription = observable.subscribe(value => {
  console.log(value);
});

subscription.unsubscribe();
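To make concrete what unsubscribing does, here is a library-free sketch under simplifying assumptions. `tinySubscribe` and `TinySubscription` are hypothetical helpers, not RxJS APIs: the producer hands back a teardown function, and `unsubscribe()` runs it once and drops any later values.

```typescript
interface TinySubscription {
  unsubscribe(): void;
}

// Hypothetical helper: starts a producer and returns a handle that can stop it.
function tinySubscribe(
  producer: (next: (value: number) => void) => () => void,
  onNext: (value: number) => void
): TinySubscription {
  let isClosed = false;
  const teardown = producer(value => {
    if (!isClosed) onNext(value); // values after unsubscribe are dropped
  });
  return {
    unsubscribe(): void {
      if (isClosed) return; // unsubscribing twice is a no-op
      isClosed = true;
      teardown();
    }
  };
}

const eventLog: string[] = [];
let emitValue: (value: number) => void = () => {};

const tinySubscription = tinySubscribe(
  next => {
    emitValue = next; // expose the emitter so the example can drive it by hand
    return () => {
      eventLog.push('teardown');
    };
  },
  value => {
    eventLog.push('value ' + value);
  }
);

emitValue(1);
tinySubscription.unsubscribe();
emitValue(2); // ignored: the subscription is closed

console.log(eventLog); // ['value 1', 'teardown']
```

The same idea explains why long-lived sources such as timers or DOM events should be unsubscribed: the teardown is where the producer stops its timer or removes its event listener.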

3. Creating Observables #

3.1 new Observable #

typescript
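import { Observable } from 'rxjs';

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();

  // Teardown: runs when the subscription ends (complete, error, or unsubscribe)
  return () => {
    console.log('cleaning up resources');
  };
});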

3.2 of #

typescript
import { of } from 'rxjs';

const observable = of(1, 2, 3, 4, 5);
observable.subscribe(value => console.log(value));

3.3 from #

typescript
import { from } from 'rxjs';

// From an array
from([1, 2, 3]).subscribe(console.log);

// From a Promise
from(fetch('/api/data')).subscribe(console.log);

// From a string (emits one character at a time)
from('Hello').subscribe(console.log);

3.4 interval #

typescript
import { interval } from 'rxjs';

const observable = interval(1000);
observable.subscribe(value => console.log(value));

3.5 timer #

typescript
import { timer } from 'rxjs';

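// Emit once after 2 seconds, then complete
timer(2000).subscribe(() => console.log('runs after 2 seconds'));
// Wait 1 second, then emit 0, 1, 2, ... every 500 ms
timer(1000, 500).subscribe(value => console.log(value));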

3.6 fromEvent #

typescript
import { fromEvent } from 'rxjs';

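fromEvent(document, 'click').subscribe(event => {
  console.log('click event:', event);
});

// getElementById can return null, so guard before subscribing
const input = document.getElementById('input');
if (input) {
  fromEvent(input, 'input').subscribe(event => {
    console.log('input event:', event);
  });
}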

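4. Subject #

4.1 Subject #

A Subject is both an Observable and an Observer, so it can multicast: every value passed to next() is delivered to all current subscribers.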

typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(value => console.log('Observer A:', value));
subject.subscribe(value => console.log('Observer B:', value));

// Each value is delivered to both observers
subject.next(1);
subject.next(2);
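The multicast behaviour can be modelled in a few lines. The sketch below is a simplified, hypothetical `TinySubject` (not the RxJS implementation, and it leaves out error and complete notifications): it keeps a list of listeners and forwards each value to all of them, which also shows why a subscriber that arrives late misses earlier values.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, simplified model of a Subject.
class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  // Observable side: register a listener and get back an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Observer side: push one value to every current listener.
  next(value: T): void {
    this.listeners.slice().forEach(listener => listener(value));
  }
}

const receivedBySubject: string[] = [];
const tinySubject = new TinySubject<number>();

tinySubject.subscribe(value => {
  receivedBySubject.push('A:' + value);
});
tinySubject.next(1); // only A is subscribed at this point

tinySubject.subscribe(value => {
  receivedBySubject.push('B:' + value);
});
tinySubject.next(2); // A and B both receive 2

console.log(receivedBySubject); // ['A:1', 'A:2', 'B:2']
```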

4.2 BehaviorSubject #

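A BehaviorSubject requires an initial value and always holds the latest value; each new subscriber immediately receives that current value.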

typescript
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<number>(0);

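// Observer A immediately receives the initial value 0
subject.subscribe(value => console.log('Observer A:', value));

subject.next(1);
subject.next(2);

// Observer B subscribes late and immediately receives the latest value, 2
subject.subscribe(value => console.log('Observer B:', value));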
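What "remembers the latest value" means can be shown with a small model. `TinyBehaviorSubject` below is a hypothetical, simplified class written for illustration (not the RxJS implementation, and without error or complete handling); it replays the same sequence as the example above.

```typescript
// Hypothetical, simplified model of a BehaviorSubject.
class TinyBehaviorSubject<T> {
  private currentValue: T;
  private listeners: Array<(value: T) => void> = [];

  constructor(initialValue: T) {
    this.currentValue = initialValue;
  }

  // New subscribers get the current value immediately, then all later values.
  subscribe(listener: (value: T) => void): void {
    listener(this.currentValue);
    this.listeners.push(listener);
  }

  // Store the value first so that later subscribers see it, then broadcast it.
  next(value: T): void {
    this.currentValue = value;
    this.listeners.slice().forEach(listener => listener(value));
  }

  // Synchronous read of the latest value.
  getValue(): T {
    return this.currentValue;
  }
}

const behaviorLog: string[] = [];
const counter = new TinyBehaviorSubject<number>(0);

counter.subscribe(value => {
  behaviorLog.push('A:' + value); // first call records A:0
});
counter.next(1);
counter.next(2);
counter.subscribe(value => {
  behaviorLog.push('B:' + value); // first call records B:2
});

console.log(behaviorLog, counter.getValue()); // ['A:0', 'A:1', 'A:2', 'B:2'] 2
```

The RxJS BehaviorSubject likewise exposes the latest value synchronously through getValue() or its value property.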

4.3 ReplaySubject #

A ReplaySubject buffers a specified number of the most recent values and replays them to each new subscriber.

typescript
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(2);

subject.next(1);
subject.next(2);
subject.next(3);

// With a buffer size of 2, the new observer receives 2 and 3 (1 has been dropped)
subject.subscribe(value => console.log('New observer:', value));
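The buffering rule can be sketched as a bounded array. `TinyReplaySubject` is a hypothetical, simplified model for illustration (not the RxJS implementation; the real class can also limit the buffer by time, which is omitted here).

```typescript
// Hypothetical, simplified model of a ReplaySubject with a size-limited buffer.
class TinyReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Array<(value: T) => void> = [];
  private bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value once the buffer is full
    }
    this.listeners.slice().forEach(listener => listener(value));
  }

  subscribe(listener: (value: T) => void): void {
    this.buffer.forEach(value => listener(value)); // replay history first
    this.listeners.push(listener);
  }
}

const replayLog: number[] = [];
const recent = new TinyReplaySubject<number>(2);

recent.next(1);
recent.next(2);
recent.next(3);
recent.subscribe(value => {
  replayLog.push(value); // replays 2 and 3, then receives live values
});
recent.next(4);

console.log(replayLog); // [2, 3, 4]
```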

4.4 AsyncSubject #

An AsyncSubject emits only its last value, and only when it completes.

typescript
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe(value => console.log('value:', value));

subject.next(1);
subject.next(2);
subject.next(3);
// Nothing has been logged so far; complete() delivers the last value, 3
subject.complete();
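The "last value on completion" rule can be modelled as follows. `TinyAsyncSubject` is a hypothetical, simplified class for illustration (not the RxJS implementation, and without error handling). It also covers the case of a subscriber that arrives after completion, which receives the final value right away.

```typescript
// Hypothetical, simplified model of an AsyncSubject.
class TinyAsyncSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private lastValue: T | undefined = undefined;
  private hasValue = false;
  private isDone = false;

  subscribe(listener: (value: T) => void): void {
    if (!this.isDone) {
      this.listeners.push(listener); // wait for complete()
    } else if (this.hasValue) {
      listener(this.lastValue as T); // late subscriber: deliver the final value at once
    }
  }

  // Values are only remembered, never delivered directly.
  next(value: T): void {
    if (this.isDone) return;
    this.lastValue = value;
    this.hasValue = true;
  }

  complete(): void {
    if (this.isDone) return;
    this.isDone = true;
    if (this.hasValue) {
      const finalValue = this.lastValue as T;
      this.listeners.forEach(listener => listener(finalValue));
    }
    this.listeners = [];
  }
}

const asyncLog: string[] = [];
const lastOnly = new TinyAsyncSubject<number>();

lastOnly.subscribe(value => {
  asyncLog.push('early:' + value);
});
lastOnly.next(1);
lastOnly.next(2);
lastOnly.next(3);
const countBeforeComplete = asyncLog.length; // still 0: nothing delivered yet

lastOnly.complete();
lastOnly.subscribe(value => {
  asyncLog.push('late:' + value);
});

console.log(countBeforeComplete, asyncLog); // 0 ['early:3', 'late:3']
```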

5. Common Operators #

5.1 map #

typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(map(x => x * 2))
  .subscribe(console.log);

5.2 filter #

typescript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(filter(x => x % 2 === 0))
  .subscribe(console.log);

5.3 tap #

typescript
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(
    tap(x => console.log('original:', x)),
    map(x => x * 2),
    tap(x => console.log('processed:', x))
  )
  .subscribe(console.log);

5.4 catchError #

typescript
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError(() => new Error('something went wrong'))
  .pipe(
    catchError(error => {
      console.error('caught error:', error);
      // Replace the failed stream with a fallback value
      return of('default value');
    })
  )
  .subscribe(console.log);

5.5 retry #

typescript
import { of, throwError } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

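throwError(() => new Error('something went wrong'))
  .pipe(
    // Resubscribe up to 3 times after an error (4 attempts in total)
    retry(3),
    // Once the retries are used up, the error reaches catchError
    catchError(error => of('failed'))
  )
  .subscribe(console.log);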

5.6 debounceTime #

typescript
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

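// Assumes an <input id="search"> element exists in the page
const searchInput = document.getElementById('search') as HTMLInputElement;

fromEvent(searchInput, 'input')
  .pipe(
    // Wait until the user has stopped typing for 300 ms
    debounceTime(300),
    map(() => searchInput.value)
  )
  .subscribe(value => console.log('search:', value));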

5.7 distinctUntilChanged #

typescript
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 3, 3, 1, 1)
  .pipe(distinctUntilChanged())
  .subscribe(console.log);

5.8 switchMap #

typescript
import { fromEvent } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';

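// Assumes an <input id="search"> element exists in the page
const searchBox = document.getElementById('search') as HTMLInputElement;

fromEvent(searchBox, 'input')
  .pipe(
    debounceTime(300),
    // When a new value arrives, the result of any earlier request is ignored
    switchMap(() =>
      fetch(`/api/search?q=${encodeURIComponent(searchBox.value)}`)
    )
  )
  .subscribe(result => console.log(result));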

5.9 mergeMap #

typescript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(
    mergeMap(x => of(x * 2))
  )
  .subscribe(console.log);

5.10 combineLatest #

typescript
import { combineLatest, of } from 'rxjs';

const obs1 = of('a', 'b', 'c');
const obs2 = of(1, 2, 3);

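// obs1 emits all of its values synchronously before obs2 starts,
// so this logs ['c', 1], ['c', 2], ['c', 3]
combineLatest([obs1, obs2]).subscribe(console.log);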

5.11 forkJoin #

typescript
import { forkJoin, of } from 'rxjs';

forkJoin({
  users: of(['user1', 'user2']),
  products: of(['product1', 'product2'])
}).subscribe(result => {
  console.log(result.users);
  console.log(result.products);
});

5.12 take #

typescript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000)
  .pipe(take(3))
  .subscribe(console.log);

5.13 takeUntil #

typescript
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();

interval(1000)
  .pipe(takeUntil(destroy$))
  .subscribe(console.log);

setTimeout(() => {
  destroy$.next();
  destroy$.complete();
}, 5000);

6. Pipeline Operations #

6.1 pipe #

typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

// Operators run in order: keep even numbers, double them, then format
source.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 2),
  map(x => `value: ${x}`)
).subscribe(console.log);

6.2 Custom Operators #

typescript
import { Observable, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// A custom operator is a function that takes an Observable and returns an Observable
function log<T>(prefix: string) {
  return (source: Observable<T>) => {
    return source.pipe(
      tap(value => console.log(`${prefix}:`, value))
    );
  };
}

of(1, 2, 3)
  .pipe(
    log('original'),
    map(x => x * 2),
    log('processed')
  )
  .subscribe();

7. Error Handling #

7.1 catchError #

typescript
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError(() => new Error('error'))
  .pipe(
    catchError(error => {
      return of('recovery value');
    })
  )
  .subscribe(console.log);

7.2 throwError #

typescript
import { throwError } from 'rxjs';

throwError(() => new Error('something went wrong'))
  .subscribe({
    next: () => {},
    error: (err) => console.error(err)
  });

7.3 finalize #

typescript
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(
    finalize(() => console.log('cleaning up resources'))
  )
  .subscribe(console.log);

8. Using RxJS in Angular #

8.1 HTTP Requests #

typescript
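import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Assumed model for these examples; only `name` is used in this section
export interface User {
  name: string;
}

@Injectable({ providedIn: 'root' })
export class UserService {
  constructor(private http: HttpClient) {}

  getUsers(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      retry(3),
      catchError(error => {
        console.error(error);
        // catchError must return an Observable; `return []` would emit nothing,
        // so wrap the empty list in of() to emit it as the fallback value
        return of([] as User[]);
      })
    );
  }
}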

8.2 Subscribing in a Component #

typescript
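import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// Assumed path: adjust to wherever User and UserService are defined
import { User, UserService } from './user.service';

@Component({
  selector: 'app-user-list',
  template: '...'
})
export class UserListComponent implements OnInit, OnDestroy {
  users: User[] = [];
  // Emits once in ngOnDestroy to end every takeUntil(this.destroy$) subscription
  private destroy$ = new Subject<void>();

  constructor(private userService: UserService) {}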
  
  ngOnInit() {
    this.userService.getUsers()
      .pipe(takeUntil(this.destroy$))
      .subscribe(users => {
        this.users = users;
      });
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

8.3 Using the async Pipe #

typescript
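import { Component } from '@angular/core';
import { Observable } from 'rxjs';
// Assumed path: adjust to wherever User and UserService are defined
import { User, UserService } from './user.service';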

@Component({
  selector: 'app-user-list',
  template: `
    <ul>
      <li *ngFor="let user of users$ | async">
        {{ user.name }}
      </li>
    </ul>
  `
})
export class UserListComponent {
  users$: Observable<User[]>;
  
  constructor(private userService: UserService) {
    this.users$ = this.userService.getUsers();
  }
}

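9. Summary #

| Concept | Description |
| --- | --- |
| Observable | An observable object; a stream of data |
| Observer | The consumer that receives the data |
| Subscription | The subscription handle, used to unsubscribe |
| Subject | Both an Observable and an Observer |
| BehaviorSubject | A Subject with an initial value |
| ReplaySubject | A Subject that replays past values |
| Operators | Functions that process data streams |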

Next: NgRx State Management

Last updated: 2026-03-26