1. Reactive Programming là gì?

Reactive Programming là mô hình lập trình tập trung vào luồng dữ liệu (data streams) và lan truyền thay đổi (propagation of change). Mọi thứ đều có thể được xem như một luồng dữ liệu: biến, user input, properties, cache, data structures, etc.

Ghi chú cho người đọc 2025: Ví dụ trong bài dùng RxJS 7+ (hiện tại 7.8, bản 8 đang alpha). Thay đổi quan trọng so với RxJS 6:

  • Operator import trực tiếp từ "rxjs", không còn "rxjs/operators" (vẫn chạy nhưng deprecated, sẽ bỏ ở v9).
  • observable.toPromise() đã bỏ, dùng firstValueFrom(obs$) / lastValueFrom(obs$).
  • subscribe(next, error, complete) (positional) deprecated, ưu tiên object form subscribe({ next, error, complete }).
  • retryWhen, throwError(() => ...), connectable() là API thay thế mới.
import { fromEvent, map, debounceTime } from "rxjs";

// Tạo stream từ sự kiện input
const input = document.querySelector("input");
const inputStream = fromEvent(input, "input").pipe(
  map((event) => event.target.value),
  debounceTime(300)
);

// Subscribe để xử lý thay đổi (object form, khuyến nghị từ RxJS 7)
inputStream.subscribe({
  next: (value) => console.log("Search term:", value),
  error: (err) => console.error(err),
});

2. Các Khái Niệm Cơ Bản

2.1 Observable và Observer

import { Observable, Observer } from "rxjs";

// Observable - nguồn phát dữ liệu
const numberStream = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Observer - đối tượng lắng nghe
const observer: Observer<number> = {
  next: (value) => console.log("Next:", value),
  error: (error) => console.error("Error:", error),
  complete: () => console.log("Complete!"),
};

// Subscription
numberStream.subscribe(observer);

2.2 Subjects

import { Subject, BehaviorSubject } from "rxjs";

// Subject - vừa là Observable vừa là Observer
const subject = new Subject<string>();
subject.subscribe((value) => console.log("Observer 1:", value));
subject.next("Hello");
subject.subscribe((value) => console.log("Observer 2:", value));

// BehaviorSubject - giữ giá trị hiện tại
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe((value) => console.log("Initial value:", value));
behaviorSubject.next(1);
behaviorSubject.subscribe((value) => console.log("Late subscriber:", value));

3. Triển khai trong JavaScript

3.1 Event Streams

import { fromEvent, merge, map, filter } from "rxjs";

// Mouse events
const clicks = fromEvent(document, "click");
const moves = fromEvent(document, "mousemove");
const ups = fromEvent(document, "mouseup");

// Combine streams
const mouseEvents = merge(
  clicks.pipe(map((event) => ({ type: "click", event }))),
  moves.pipe(map((event) => ({ type: "move", event }))),
  ups.pipe(map((event) => ({ type: "up", event })))
);

// Filter and handle events
mouseEvents
  .pipe(filter(({ type }) => type === "click"))
  .subscribe(({ event }) => {
    console.log("Click coordinates:", event.clientX, event.clientY);
  });

3.2 Data Transformation

import { interval, map, filter, scan } from "rxjs";

// Generate numbers every second
const numbers = interval(1000);

// Transform stream
const transformed = numbers.pipe(
  map((x) => x * 2),
  filter((x) => x % 4 === 0),
  scan((acc, curr) => acc + curr, 0)
);

// Handle results
transformed.subscribe((result) => {
  console.log("Sum of even doubles:", result);
});

4. Triển khai trong TypeScript

4.1 Type-safe Observables

interface User {
  id: number;
  name: string;
  email: string;
}

interface UserState {
  users: User[];
  loading: boolean;
  error: Error | null;
}

class UserService {
  private state = new BehaviorSubject<UserState>({
    users: [],
    loading: false,
    error: null,
  });

  getState(): Observable<UserState> {
    return this.state.asObservable();
  }

  async fetchUsers(): Promise<void> {
    this.state.next({ ...this.state.value, loading: true });

    try {
      const response = await fetch("/api/users");
      const users = await response.json();
      this.state.next({
        users,
        loading: false,
        error: null,
      });
    } catch (error) {
      this.state.next({
        users: [],
        loading: false,
        error: error instanceof Error ? error : new Error(String(error)),
      });
    }
  }
}

4.2 Custom Operators

import { Observable, pipe, filter, map } from "rxjs";

interface DataEvent<T> {
  type: string;
  payload: T;
}

// Custom operator
function filterByType<T>(eventType: string) {
  return pipe(
    filter((event: DataEvent<T>) => event.type === eventType),
    map((event: DataEvent<T>) => event.payload)
  );
}

// Usage
const events = new Observable<DataEvent<string>>((subscriber) => {
  subscriber.next({ type: "message", payload: "Hello" });
  subscriber.next({ type: "error", payload: "Failed" });
  subscriber.next({ type: "message", payload: "World" });
});

const messages = events.pipe(filterByType<string>("message"));
messages.subscribe((message) => console.log(message));

5. Ví dụ Thực Tế: Real-time Search với Debounce

import {
  fromEvent,
  Observable,
  debounceTime,
  distinctUntilChanged,
  switchMap,
  map,
  catchError,
  from,
  of,
} from "rxjs";

interface SearchResult {
  items: any[];
  total: number;
}

class SearchService {
  private searchInput: HTMLInputElement;
  private resultsDiv: HTMLDivElement;
  private searchStream: Observable<string>;

  constructor() {
    this.searchInput = document.querySelector("#search-input")!;
    this.resultsDiv = document.querySelector("#search-results")!;
    this.setupSearch();
  }

  private async searchAPI(term: string): Promise<SearchResult> {
    const response = await fetch(`/api/search?q=${encodeURIComponent(term)}`);
    if (!response.ok) {
      throw new Error("Search failed");
    }
    return response.json();
  }

  private setupSearch(): void {
    // Stream tìm kiếm, ngắn hơn nhờ `from(promise)` + operator inline
    this.searchStream = fromEvent(this.searchInput, "input").pipe(
      map((event) => (event.target as HTMLInputElement).value),
      debounceTime(300),
      distinctUntilChanged(),
      switchMap((term) =>
        from(this.searchAPI(term)).pipe(
          catchError((error) => {
            console.error("Search error:", error);
            return of<SearchResult>({ items: [], total: 0 });
          })
        )
      )
    );

    // Object-form subscribe (khuyến nghị RxJS 7+)
    this.searchStream.subscribe({
      next: (result) => this.updateResults(result),
      error: (err) => console.error("Stream error:", err),
    });
  }

  private updateResults(result: SearchResult): void {
    this.resultsDiv.innerHTML = result.items
      .map((item) => `<div class="result-item">${item.title}</div>`)
      .join("");
  }
}

// Initialize search
new SearchService();

5.5 Bridging Observable ↔ Promise / AsyncIterator

5.5.1 firstValueFrom / lastValueFrom thay toPromise()

import { firstValueFrom, lastValueFrom, interval, take } from "rxjs";

// Lấy emit đầu tiên → Promise
const first = await firstValueFrom(interval(500)); // 0

// Lấy emit cuối cùng trước khi complete
const last = await lastValueFrom(interval(500).pipe(take(5))); // 4

// Default value khi stream complete mà chưa emit
const v = await firstValueFrom(interval(500).pipe(take(0)), {
  defaultValue: -1,
}); // -1

5.5.2 Huỷ Observable bằng AbortSignal

Từ RxJS 8 alpha, AbortSignal trực tiếp huỷ subscription. Với 7+, chỉ cần wrap:

function withAbort<T>(
  source: Observable<T>,
  signal: AbortSignal
): Observable<T> {
  return new Observable((subscriber) => {
    if (signal.aborted) return subscriber.complete();
    const sub = source.subscribe(subscriber);
    const onAbort = () => {
      sub.unsubscribe();
      subscriber.complete();
    };
    signal.addEventListener("abort", onAbort, { once: true });
    return () => signal.removeEventListener("abort", onAbort);
  });
}

5.5.3 Signals, lớp mới của reactive UI (2024+)

Signals đang trở thành mô hình reactive cho UI, Angular 17+ (stable signals), Vue ref/computed, SolidJS, Preact signals, TC39 Signals proposal (đang soạn). So với RxJS:

RxJS ObservableSignal
Mục đích chínhStream data theo thời gian, operator phức tạpState reactive, automatic dependency tracking
Giá trị hiện tạiKhông có (push-based)Luôn có (pull-based)
HọcKhóDễ
Use case mạnhWebSocket, debounce, combine nhiều source asyncUI state, derived value

Không cạnh tranh, hay dùng cùng nhau: Signal cho UI state, Observable cho stream phức tạp. Angular 17 đã có hàm toSignal(observable$)toObservable(signal) cầu hai thế giới.

6. Ưu điểm và Nhược điểm

6.1 Ưu điểm

  • Declarative: Code dễ đọc và maintain hơn
  • Data flow: Quản lý luồng dữ liệu hiệu quả
  • Composability: Dễ dàng kết hợp và biến đổi streams
  • Error handling: Xử lý lỗi nhất quán

6.2 Nhược điểm

  • Learning curve: Khó học và làm quen
  • Debugging: Khó debug khi có nhiều operators
  • Memory: Có thể gây memory leaks nếu không unsubscribe
  • Bundle size: Thư viện như RxJS có thể làm tăng kích thước bundle

7. Khi nào nên sử dụng?

Reactive Patterns phù hợp khi:

  • Xử lý nhiều sự kiện bất đồng bộ
  • Cần transform và combine dữ liệu từ nhiều nguồn
  • Làm việc với real-time data
  • Xây dựng ứng dụng phức tạp với nhiều state

8. Kết luận

Reactive Patterns cung cấp một cách mạnh mẽ để xử lý các sự kiện và dữ liệu bất đồng bộ. Mặc dù có learning curve cao, nhưng lợi ích mang lại là rất lớn, đặc biệt trong các ứng dụng phức tạp với nhiều tương tác và real-time data.