TypeScript 및 F#의 반응형 + 함수형 UI 패턴: 실시간 앱을 위한 RxJS, Signals 및 Elmish | Sudhir Mangla


typescript rxjs #Signals elmish reactivepatterns #FunctionalProgramming

Reactive + Functional UI Patterns in TypeScript and F#: RxJS, Signals, and Elmish for Real-Time Apps

핵심 요약

실시간 앱 구축의 새로운 표준: RxJS 옵저버블(비동기 스트림), Signals(동기 상태 추적), 그리고 Elmish MVU(순수 함수형 상태 관리)를 결합하여 지연 시간, 동시성, 일관성을 동시에 해결합니다. WebSocket/SSE 백프레셔 관리부터 .NET 백엔드의 채널 기반 흐름 제어까지, 이 세 계층의 패러다임이 혼돈 없이 예측 가능한 실시간 아키텍처를 만듭니다. 거래 대시보드, 협업 에디터, IoT 텔레메트리 같은 고부하 실시간 시스템에서 메모리 누수, 경합 조건, 상태 동기화 실패를 원칙적으로 배제할 수 있습니다.


상세 요약

1. 문제 정의: 실시간 앱의 도전 과제

1.1 실시간 시스템의 특성

  • 사용자 기대: 대시보드는 즉시 새로고침, 협업 편집은 무결음, 데이터 UI는 부드러운 반응
  • 대표 사례:
    • 거래 대시보드: 초당 수천 개의 시장 틱 처리, UI 버벅 방지 필요
    • 협업 편집기: 로컬 및 원격 편집 충돌 해결, 결정론적 일관성 유지
    • IoT 모니터링: 수천 장치의 연속 텔레메트리, 동적 구독 관리
    • 라이브 채트: 메시지, 타이핑 표시기, 온라인 상태 - 모두 비동기이면서 상호 의존

1.2 공통 실패 패턴 및 해결책

문제 증상 해결책
공유 가변 상태 시간적 경합 버그 불변 업데이트 + 순수 변환
경합 조건 오래된 데이터가 새 상태 덮어씀 취소 의미론(cancellation semantics)
누수 구독 메모리 누적, 컴포넌트 생명주기 초과 결정론적 정리(takeUntil, 자동 정리)
중복 부작용 불필요한 재렌더에 의한 네트워크 중복 선언적 효과 격리(Elmish Cmd)
버벅 재렌더 전체 컴포넌트 트리 재칠 세밀한 변경 감지(Signals)
무제한 큐 메모리 팽창 또는 UI 동결 백프레셔 인식 스트림(bounded channels)

2. 세 가지 기둥: 패러다임 분리

2.1 Observables (RxJS / Rx.NET) - 비동기 변화를 담당

개념: 시간에 따라 변하는 값의 스트림. 구독자에게 값을 푸시
주요 이점:
  - 내장 취소 및 백프레셔 인식
  - 선언적 연산자 조합 (map, merge, switchMap, etc.)
  - 결정론적 정리 (unsubscribe, takeUntil)

사용 사례: WebSocket 프레임, 디바운스된 키입력, API 재시도, 텔레메트리 버스트

콜드 vs 핫 옵저버블:

  • 콜드: 구독하면 생성 시작 (fetch 결과)
  • : 이미 공유 소스 방출 중 (마우스 이동, WebSocket)
  • 중복 작업 방지: share() 또는 shareReplay() 멀티캐스트

백프레셔 전략:

  • 손실 전략: 샘플링 또는 드롭 (sampleTime(200))
  • 무손실 전략: 버퍼링 또는 윈도우 (bufferTime(1000))

2.2 Signals - 동기 상태 의존성 추적

개념: 신호가 변하면 의존하는 것들만 재계산 (풀 기반, 동기식)
생명주기:
  - Signal: 값 저장 및 의존자 알림
  - Computed: 의존성 변경 시 자동 재평가
  - Effect: 상태 변경에 따른 부작용

이점: 불필요한 재렌더 제거, 의존성 자동 추적 (DAG)

잘못된 vs 올바른 사용:

  • :cross_mark: 신호로 비동기 이벤트 처리 (경합 위험)
  • ✓ 옵저버블로 스트림 받고 toSignal() 로 최신값 투영

2.3 Elmish MVU (F#) - 순수 함수형 상태 관리

Model = 불변 도메인 상태
Msg = 메시지 판별 연합
update = 순수 함수: (Msg × Model) → (Model × Cmd<Msg>)
type Model = { Connected : bool; Messages : string list }
type Msg = Connect | Disconnect | Receive of string

let update msg model =
    match msg with
    | Connect -> { model with Connected = true }, Cmd.none
    | Disconnect -> { model with Connected = false }, Cmd.none
    | Receive text -> { model with Messages = text :: model.Messages }, Cmd.none

Elmish의 강점:

  • 모든 전환이 순수하고 예측 가능
  • 부작용을 Cmd를 통해 명시적으로 분리
  • 상태 전환을 모델 업데이트 없이 테스트 가능
  • 시간 여행 디버깅 가능
  • 복잡한 거래 대시보드/텔레메트리에서 비동기 간 교차 채팅 방지

2.4 도구 선택 매트릭스

문제 권장 도구 이유
UI 값 도출 (합계, 필터, 계산 필드) Signal 동기 전파, 최소 재렌더
연속 비동기 업데이트 (WebSocket, SSE, DOM 이벤트) Observable 푸시 기반, 취소 가능
모듈 수준 상태 전환 (사용자 작업, 효과) Elmish MVU 순수 업데이트, 명시적 효과
교차 계층 동기화 (UI ↔ 백엔드) RxJS + Elmish 예측 가능한 메시징 및 재시도
비동기 데이터의 최신 스냅샷 렌더 Observable → Signal 안정적 뷰 바인딩

3. RxJS로 실시간 스트림 모델링

3.1 SSE (Server-Sent Events) vs WebSocket

특성 SSE WebSocket
방향 서버 → 클라이언트 양방향
사용 사례 텔레메트리, 피드 업데이트 거래, 채팅
재연결 자동 수동
HTTP 친화적 ✓ 캐싱/프록시 통과

SSE 장점: 경량, 자동 재연결, 방화벽 친화적, 단방향 시나리오에 최적

WebSocket 장점: 저지연 양방향, 바이너리 프레임 가능

관리형 인프라 옵션: Azure Web PubSub, AWS API Gateway WebSocket, Ably, Pusher

3.2 RxJS 기초

콜드 옵저버블로 트랜스포트 래핑:

function webSocket$(url: string): Observable<MessageEvent> {
  return new Observable(observer => {
    const ws = new WebSocket(url);
    ws.onmessage = e => observer.next(e);
    ws.onerror = e => observer.error(e);
    ws.onclose = () => observer.complete();
    return () => ws.close(); // 정리
  });
}

고차 매핑 연산자 선택:

연산자 동작 사용 사례
switchMap 새 방출 시 이전 내부 옵저버블 취소 재연결 또는 검색 입력
concatMap 순차 실행, 이전 완료 대기 순서 보장 필요 명령
mergeMap 모두 동시 처리 병렬 네트워크 요청
exhaustMap 활성 중 새 방출 무시 중복 제출 방지

재연결 로직:

const reconnect$ = interval(5000).pipe(
  switchMap(() => webSocket$('wss://feed').pipe(retry({ delay: 2000 })))
);

3.3 백프레셔 및 속도 제어

시간 기반 스로틀링:

feed$.pipe(throttleTime(200));   // 먼저 방출, 침묵
feed$.pipe(auditTime(200));      // 조용한 후 마지막만
feed$.pipe(sampleTime(200));     // 주기적 샘플링
feed$.pipe(debounceTime(300));   // 유휴 대기

용도별 선택:

  • throttleTime: 드래그/이동 이벤트
  • debounceTime: 텍스트 입력
  • auditTime: 재칠 전 최신값
  • sampleTime: 모니터링 대시보드

카운트 기반 버퍼링 (손실 없이 배치 처리):

feed$.pipe(bufferCount(100)).subscribe(batch => sendToServer(batch));
feed$.pipe(bufferTime(1000)).subscribe(batch => updateUI(batch.length));

손실 vs 무손실 전략:

  • 손실: 중간값 드롭 (스로틀, 샘플) - CPU 사용량 높은 그래프에 적합
  • 무손실: 모든 데이터 보존 (버퍼, 윈도우) - 금융 거래에 적합

3.4 에러 처리 및 재시도 (취소 지원)

간단한 재시도:

feed$.pipe(retry({ delay: 2000 })).subscribe();

제어된 재시도 및 지수 백오프:

feed$.pipe(
  retryWhen(errors => errors.pipe(delay(1000))),
  takeUntil(stop$)
).subscribe();

AbortController 통합:

function fetch$(url: string, signal: AbortSignal) {
  return from(fetch(url, { signal })).pipe(switchMap(r => r.json()));
}

const controller = new AbortController();
fetch$('https://api/data', controller.signal)
  .pipe(retry({ delay: 1000 }))
  .subscribe(console.log);

controller.abort(); // 정리

멱등성 보장 (재시도 안전):

interface Command { id: string; payload: any }
function sendCommand$(cmd: Command) {
  return ajax.post('/api/command', cmd);
}
// 백엔드가 id로 중복 제거

3.5 메모리 안전 패턴

takeUntil로 생명주기 바인딩:

const destroy$ = new Subject<void>();

feed$.pipe(takeUntil(destroy$)).subscribe(updateUI);

// 컴포넌트 정리 시
destroy$.next();
destroy$.complete();

Angular의 takeUntilDestroyed (자동):

@Component({...})
export class Dashboard {
  constructor(destroyRef: DestroyRef) {
    feed$.pipe(takeUntilDestroyed(destroyRef)).subscribe(updateUI);
  }
}

중첩 구독 방지 (메모리 누수 원인):

  • :cross_mark: feed$.subscribe(f => another$.subscribe(...))
  • feed$.pipe(switchMap(f => another$)).subscribe(...)

3.6 프로덕션급 예제

백오프와 toSignal을 통한 탄력적 SSE 소비:

const feed$ = defer(() => eventSource$('/api/feed')).pipe(
  retry({ delay: 2000 }),     // 재연결
  map(e => JSON.parse(e.data)),
  shareReplay(1)
);

const latestData = toSignal(feed$, { initialValue: { price: 0 } });
// Angular 템플릿은 latestData() 변경 시 자동 재렌더

순서 보장 멱등 전송을 위한 WebSocket 명령 스트림:

const trade$ = new Subject<TradeCmd>();

const send$ = trade$.pipe(
  concatMap(cmd =>
    ajax.post('/api/trade', cmd).pipe(
      catchError(err => of({ ...cmd, error: true }))
    )
  ),
  shareReplay(1)
);

trade$.next({ id: crypto.randomUUID(), symbol: 'AAPL', qty: 50 });
// concatMap은 각 명령 완료 후 다음 시작 → 재시도 중 순서 보존

4. Signals - 현대 UI 프레임워크의 세밀한 반응성

4.1 프레임워크별 Signal 수렴

Angular Signals:

const count = signal(0);
const doubled = computed(() => count() * 2);
effect(() => console.log('Doubled:', doubled()));

SolidJS Signals (가상 DOM 없음, 진정한 세밀한 반응성):

const [count, setCount] = createSignal(0);
createEffect(() => console.log(count()));

Preact Signals (React 호환):

const count = signal(0);
effect(() => console.log('Count', count.value));

수렴:

  • 동기식, 의존성 추적 값
  • 자동 파생 (computed)
  • 변경 시 부작용 (effect)
  • 최소한의 업데이트로 예측 가능

4.2 Angular 실용적 상호운용 패턴

toSignal - 최신 값 렌더링 (Observable → Signal):

@Component({
  template: `<div>Latest price: {{ price() }}</div>`
})
export class PriceTicker {
  price = toSignal(price$, { initialValue: 0 });
}

effect() - 상태 변경에 따른 부작용:

const filter = signal('AAPL');

effect(() => {
  console.log('Filter changed:', filter());
  socket.send(JSON.stringify({ type: 'subscribe', symbol: filter() }));
});

toObservable - 하위 제어 흐름 (Signal → Observable):

const query = signal('');

toObservable(query)
  .pipe(
    debounceTime(300),
    switchMap(q => http.get(`/api/search?q=${q}`))
  )
  .subscribe(renderResults);

4.3 Signal에서 발생하는 미묘한 경합 (비동기 페칭)

경합 문제:

// ❌ 문제: 여러 fetch 중복, 느린 응답이 새것 덮음
effect(() => {
  fetch(`/api/search?q=${query()}`)
    .then(r => r.json())
    .then(renderResults);
});

RxJS switchMap으로 복구:

// ✓ 새 쿼리 도착 시 이전 요청 취소
toObservable(query)
  .pipe(
    debounceTime(300),
    switchMap(q => from(fetch(`/api/search?q=${q}`)).pipe(switchMap(r => r.json())))
  )
  .subscribe(renderResults);

핵심 통찰: Signal은 렌더 프레임 내 일관성 보장, 비동기 경계 아님. 비동기는 항상 RxJS에 위임 (취소, 재시도, 순서).

4.4 Preact Signals - React 앱에서 vs React Query/Zustand

useState 대체 (세밀한 반응성):

const counter = signal(0);

function Counter() {
  return <button onClick={() => counter.value++}>{counter.value}</button>;
}
// useState와 달리 counter.value++ 는 컴포넌트 재렌더 트리거 안 함, signal 소비자만 업데이트

역할 분담:

  • React Query: 원격 비동기 상태 (데이터 페칭, 캐싱)
  • Zustand: 중앙 공유 상태
  • Signals: 세밀한 로컬 반응성 - UI 계산 또는 인터컴포넌트 파생 상태

베스트 프랙티스:

const userSignal = signal<User | null>(null);

useEffect(() => {
  fetchUser().then(u => (userSignal.value = u));
}, []);

스트림을 모서리에 유지:

  1. RxJS(WebSocket, SSE)로 비동기 데이터 스트림
  2. Signal로 현재 스냅샷 변환
  3. Computed signal로 UI 상태 파생
  4. 새로운 비동기 흐름 오케스트레이션 시에만 Observable로 역변환

5. Elmish (F#) - 예측 가능한 비동기 오케스트레이션

5.1 MVU 기초: Model, Messages, Update, Commands

최소 예제:

type Model = { Count: int; Loading: bool }

type Msg =
    | Increment
    | Decrement
    | Load
    | Loaded of int

let update msg model =
    match msg with
    | Increment -> { model with Count = model.Count + 1 }, Cmd.none
    | Decrement -> { model with Count = model.Count - 1 }, Cmd.none
    | Load -> { model with Loading = true }, Cmd.OfAsync.perform loadData () Loaded
    | Loaded value -> { model with Loading = false; Count = value }, Cmd.none

순수 함수 테스트:

let initial = { Count = 0; Loading = false }
let model', _ = update Increment initial
assert (model'.Count = 1)
// 목(mock), DI 없음 - 투명한 흐름

선언적 효과:

  • Cmd 값이 부작용 명시적 분리
  • 비동기 행동을 선언적, 테스트 가능, 재시도 가능 유닛으로 변환
  • 명령형 콜백 지옥 배제

5.2 비동기 흐름: Cmd.OfAsync, 재시도, 에러 라우팅

조합자:

  • Cmd.OfAsync.perform: 성공 시 메시지 발송, 에러 무시
  • Cmd.OfAsync.either: 성공 및 에러 메시지 모두 발송
  • Cmd.OfPromise.perform: JavaScript 프로미스용 (Fable)

재시도 및 에러 처리 포함 안전 데이터 페칭:

type Msg =
    | Load
    | Loaded of Data
    | Error of exn

let update msg model =
    match msg with
    | Load ->
        let cmd =
            Cmd.OfAsync.either
                (fun () -> api.FetchData()) () Loaded Error
        { model with Loading = true }, cmd

    | Loaded data ->
        { model with Loading = false; Data = Some data }, Cmd.none

    | Error e ->
        { model with Loading = false; Error = Some e.Message }, Cmd.none

낙관적 업데이트 (트랜잭션 UI 업데이트):

| Submit order ->
    let newModel = { model with Pending = true; Orders = order :: model.Orders }
    newModel, Cmd.OfAsync.either api.Submit order Submitted SubmitFailed
// API 실패 시 롤백: 낙관적 항목 제거 - 가변 부작용 없음

5.3 구독: 타이머, 소켓, DOM 이벤트를 메시지 루프에 연결

타이머 구독 (1초마다 Tick 발송):

let timerSub dispatch =
    let rec loop () = async {
        do! Async.Sleep 1000
        dispatch Tick
        return! loop ()
    }
    Async.StartImmediate(loop())

let subscriptions model =
    if model.Active then Cmd.ofSub timerSub else Cmd.none

WebSocket 구독 (메시지 수신 시 발송):

let socketSub dispatch =
    let ws = new WebSocket("wss://feed")
    ws.onmessage <- fun e -> dispatch (Received e.data)
    ws.onclose <- fun _ -> dispatch Disconnected

생명주기 제어: 구독은 Cmd<Msg> 값, Elmish 런타임이 자동 attach/detach → 소켓 누수 없고 메모리 안정

5.4 Elmish.Bridge vs SAFE Stack

Bridge (폐쇄 생태계 용):

type ServerMsg = PriceUpdate of float | TradeAck of string
type ClientMsg = PlaceTrade of Trade | Ping

Program.mkProgram init update view
|> Program.withBridge
    (Bridge.endpoint "/socket"
        |> Bridge.withServerHub<ServerMsg, ClientMsg>())
|> Program.run

Bridge 사용 시점:

  • 모든 클라이언트가 Elmish/Fable 기반
  • 메시지 타입 통합 원함
  • ASP.NET SignalR 호환

피할 상황: 열린 또는 혼합 프론트엔드 - 일반 WebSocket/SSE가 더 간단

SAFE Stack: Elmish 구독을 WebSocket 엔드포인트에 직접 연결, 명령을 통해 재연결 재시도 → TypeScript 클라이언트와 호환

5.5 Fable/TypeScript 프론트엔드와 상호운용: 공유 메시지 계약, JSON 코덱

공유 스키마 (F# 정의):

type Trade =
    { Id: Guid; Symbol: string; Qty: int; Price: decimal }

type ClientToServer =
    | PlaceTrade of Trade
    | Cancel of Guid

type ServerToClient =
    | TradeUpdate of Trade
    | Acknowledged of Guid

JSON 코덱 (Thoth.Json):

let encodeTrade t = 
  Encode.object [ 
    "Id", Encode.guid t.Id; 
    "Symbol", Encode.string t.Symbol 
  ]

TypeScript 상호운용:

interface Trade { id: string; symbol: string; qty: number; price: number }

백프레셔 조정: 고빈도 Elmish 업데이트 시 .NET 측 Channel/스로틀, Elmish는 Cmd.OfAsync.either + Async.Sleep 간격 제어 → 계층별 처리량 제약 존중

6. .NET 백엔드 실시간 + 정확성

6.1 트랜스포트 선택: SignalR, WebSocket, SSE

트랜스포트 방향 이상적 사용 비고
SignalR 양방향 채팅, 협업 프로토콜 추상화, 자동 재연결
WebSocket 양방향 저지연 거래 전체 제어, 바이너리 프레임
SSE 서버 → 클라이언트 시장 데이터, 텔레메트리 간결, HTTP 친화, 자동 재연결

SignalR 예제:

public class TradeHub : Hub
{
    public async Task SendTrade(Trade trade)
        => await Clients.All.SendAsync("TradeUpdate", trade);
}

SSE 엔드포인트 예제:

app.MapGet("/feed", async res =>
{
    res.Headers.Add("Content-Type", "text/event-stream");
    await foreach (var evt in FeedService.Stream())
    {
        await res.WriteAsync($"data: {JsonSerializer.Serialize(evt)}\n\n");
        await res.Body.FlushAsync();
    }
});

6.2 백프레셔 및 흐름 제어

System.Threading.Channels (유계 큐):

var channel = Channel.CreateBounded<Trade>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
});
// 제작자가 공간 대기 → 자연스러운 백프레셔

소비 패턴:

await foreach (var trade in channel.Reader.ReadAllAsync())
{
    await ProcessTrade(trade);
}

TPL Dataflow 대안 (풍부한 파이프라인 또는 병렬성):

var block = new TransformBlock<Trade, Order>(
    t => Transform(t),
    new ExecutionDataflowBlockOptions { 
      BoundedCapacity = 500, 
      MaxDegreeOfParallelism = 4 
    });

6.3 분산 시스템의 멱등성

멱등성 키 패턴 (중복 처리 방지):

private readonly ISet<Guid> _processed = new HashSet<Guid>();

public async Task HandleAsync(Command cmd)
{
    if (_processed.Contains(cmd.Id)) return;
    _processed.Add(cmd.Id);
    await Process(cmd);
}

프로덕션: 트랜잭션 데이터베이스 또는 Redis 중복 제거 테이블

아웃박스 패턴 (메시지 손실 방지):

await db.SaveChangesAsync();
await outbox.WriteAsync(new OutboxEvent(evt));
// 백그라운드 워커가 아웃박스 드레인 → 게시 전 영속성

미들웨어 통합 (MediatR + Polly):

builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(IdempotencyBehavior<,>));
builder.Services.AddResiliencePipeline("default", p => 
  p.AddRetry(3).AddCircuitBreaker(5));

6.4 Rx.NET 서비스에서 이벤트 조합

집계, 조인, 시간 기반 분석 (제한 범위 내):

var feed = market.ObservePrices()
    .Buffer(TimeSpan.FromSeconds(1))
    .Select(batch => batch.Average(x => x.Price))
    .Subscribe(avg => Console.WriteLine($"Avg: {avg}"));

용도: 제한 범위 내 텔레메트리/분석. 교차 서비스: Channel이 관찰성/제어 더 우수

6.5 관리형 vs DIY: Azure Web PubSub

관리형 (Azure Web PubSub):

  • 투명 확장, 내장 인증, 지리 배포
  • 글로벌 대시보드/채팅에 이상적

자체 호스팅 (SignalR/YARP):

  • 전체 제어, 지연 중요 시
  • 채널 용량 수동 튜닝

하이브리드: 외부 클라이언트 관리형 PubSub, 내부 오케스트레이션 자체 호스팅

6.6 관찰성: 큐 깊이, 드롭 vs 처리, 테넌트별 속도 제한

채널 메트릭:

var gauge = meter.CreateObservableGauge("channel_depth", () => channel.Reader.Count);

속도 제한 (.NET의 RateLimiter 미들웨어):

app.UseRateLimiter(new RateLimiterOptions
{
    GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(_ =>
        RateLimitPartition.GetFixedWindowLimiter("default", _ => new FixedWindowRateLimiterOptions
        {
            PermitLimit = 100,
            Window = TimeSpan.FromSeconds(1)
        }))
});

트레이싱 및 상관 (ActivitySource):

각 요청이 상관 ID 전파 → 사용자 작업 RxJS → 백엔드 명령 추적
ILogger.BeginScope로 동일 ID 로그 → 엔드투엔드 지연 진단

7. 엔드투엔드 구현: 라이브 주문 장부 및 거래

7.1 요구사항

  • 실시간 시장 데이터 스트리밍 (가격, 주문 장부)
  • 사용자 거래 명령 수용
  • 멱등성 및 백프레셔 안전성
  • 네트워크 중단 후 우아한 복구

7.2 백엔드 (ASP.NET Core)

데이터 파이프라인:

var feed = Channel.CreateBounded<Trade>(5000);
_ = Task.Run(async () =>
{
    await foreach (var trade in source.StreamAsync())
        await feed.Writer.WriteAsync(trade);
});

SignalR + SSE 팬아웃:

app.MapHub<TradeHub>("/ws");
app.MapGet("/sse", async res =>
{
    res.Headers.ContentType = "text/event-stream";
    await foreach (var t in feed.Reader.ReadAllAsync())
        await res.WriteAsync($"data:{JsonSerializer.Serialize(t)}\n\n");
});

명령 엔드포인트 (멱등성 포함):

app.MapPost("/trade", async (TradeCommand cmd, ITradeService svc) =>
{
    await svc.HandleAsync(cmd);
    return Results.Ok();
});

7.3 UI (TypeScript)

네트워크 계층:

const trades$ = eventSource$('/sse').pipe(
  retry({ delay: 2000 }),
  map(e => JSON.parse(e.data)),
  shareReplay(1)
);

const trades = toSignal(trades$, { initialValue: [] });

거래 제출:

const tradeCmd$ = new Subject<Trade>();

tradeCmd$
  .pipe(concatMap(cmd => ajax.post('/trade', cmd)))
  .subscribe();

속도 제한 (UI 페인트 3Hz):

trades$
  .pipe(auditTime(300))
  .subscribe(renderOrderBook);

7.4 Elmish (F#) 모듈

type Msg =
    | NewPrice of float
    | SubmitTrade of Trade
    | Confirmed of Trade
    | Error of exn

let update msg model =
    match msg with
    | NewPrice p -> { model with Price = p }, Cmd.none
    | SubmitTrade t -> { model with Pending = true }, Cmd.OfAsync.either api.Submit t Confirmed Error
    | Confirmed t -> { model with Pending = false; Trades = t :: model.Trades }, Cmd.none
    | Error e -> { model with Error = Some e.Message }, Cmd.none

7.5 교차 스택 백프레셔

  1. 브라우저: auditTime(300) - DOM 업데이트 평활
  2. 백엔드: BoundedChannels - 과부하 방지
  3. Elmish: 명령 스로틀 - 동시 제출 제한

엔드투엔드 흐름 제어 체인

7.6 장애 복구 드릴

  • 서버 재시작: SSE Last-Event-ID 헤더 → 재개 가능 피드
  • 중복 명령: 서버 중복 제거 테이블 → 반복 거부
  • 클라이언트 재연결: RxJS switchMap → 자동 스트림 재초기화

→ 모든 복구 선언적, 측정 가능

7.7 배포 및 인프라

Docker:

FROM mcr.microsoft.com/dotnet/aspnet:8.0
COPY ./out /app
ENTRYPOINT ["dotnet", "TradeFeed.dll"]

환경 토글:

{ "Transport": "SSE" }

CI: 백프레셔 및 재연결 검증 통합 테스트

→ 결과: 구성 가능, 반응적, 수학적으로 검증 가능한 고부하 실시간 시스템

8. 프로덕션 체크리스트: 성능, 누수, 운영

8.1 RxJS 인체공학 및 성능

연산자 선택 매트릭스:

시나리오 선호 연산자 이유
검색 제안 switchMap 오래된 요청 취소, 최신 승리
순차 명령 concatMap 순서 보존, 겹침 없음
백그라운드 작업 mergeMap 최대 동시성, 논블로킹
탭투페이 exhaustMap 활성 중 중복 무시

속도 제한 선택:

throttleTime(200);   // 드래그/스크롤: 첫 값
debounceTime(300);   // 입력 자동완성: 마지막 값
auditTime(300);      // UI 페인트: 마지막 후 발송
sampleTime(200);     // 메트릭 대시보드: 주기적 스냅샷

중복 계산 방지:

price$.pipe(distinctUntilChanged()).subscribe(renderPrice);

8.2 메모리 누수 방지

통합 정리:

const destroy$ = new Subject<void>();

feed$.pipe(takeUntil(destroy$)).subscribe(render);

destroy$.next();
destroy$.complete();

Angular 자동 정리:

constructor(destroyRef: DestroyRef) {
  this.feed$.pipe(takeUntilDestroyed(destroyRef)).subscribe(render);
}

템플릿에서 async 파이프 또는 Signal (수동 구독 회피):

<div *ngIf="price$ | async as price">Price: {{ price }}</div>

Signal 메모리 가드: Signal을 동적으로 effect 내에서 생성하지 말 것 (지수 의존성 그래프)

8.3 Signal 베스트 프랙티스

역할 분리:

  • Signal: 로컬 상태 (폼 필드, 계산 합계)
  • Observable: 비동기 I/O (WebSocket, SSE, 타이머)
  • 모서리에서만 브리지 (toSignal, toObservable)

파생 Signal과 메모이제이션:

const avgPrice = computed(() =>
  trades().length
    ? trades().reduce((s, t) => s + t.price, 0) / trades().length
    : 0
);

제어된 Effect (데이터 페칭 X):

effect(() => console.log('New trade count:', trades().length));
// 모든 부작용 RxJS로 위임

8.4 Elmish 강화

불변성 강제:

// ❌ model.Count <- model.Count + 1
// ✓ { model with Count = model.Count + 1 }

철저한 패턴 매칭 (컴파일러 경고):

match msg with
| Increment -> ...
| Decrement -> ...
| _ -> model, Cmd.none

부작용 격리 (Cmd/Sub만 실행):

// 로직 → 헬퍼 모듈, update 내부 아님
let update msg model = ...

Update 테스트 (스냅샷):

let model, cmd = update (Receive "Hello") initialModel
Assert.Equal(["Hello"], model.Messages)

8.5 .NET 백엔드 처리량/지연

유계 Channel 우선:

var channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
});

포화도 측정:

metrics.Gauge("queue_depth", () => channel.Reader.Count);

Dataflow 사용 시점:

  • 병렬 제어 필요
  • 팬인/팬아웃 파이프라인
  • 배칭 및 스로틀 내장

8.6 멱등성 및 재시도

명령 멱등성 키:

public record PlaceOrder(Guid Id, string Symbol, decimal Qty);
if (await db.Orders.AnyAsync(o => o.Id == cmd.Id)) return;

결정론적 핸들러 (재플레이 가능):

// 시스템 시간/난수 X → 모든 입력 명시적

Polly 재시도 + 차단기:

var policy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)))
    .WrapAsync(Policy.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));

8.7 관찰성 및 SLO

핵심 메트릭:

  • 이벤트 지연 (enqueue → emit)
  • 큐 깊이
  • 드롭 vs 처리
  • 재구독 횟수

추적 및 상관:

meter.CreateObservableGauge("feed_lag_ms", () => MeasureLag());
// 각 요청: X-Correlation-Id 전파
// 백엔드: ILogger.BeginScope(id) → 엔드투엔드 타이밍

SLO 정의:

  • 피드 업데이트 99번째 백분위수 < 250ms
  • 99.9% 성공 재연결 < 5s
  • 무제한 큐 0개

8.8 보안 및 복원력

WebSocket/SSE 인증 (JWT):

options.Events = new JwtBearerEvents
{
    OnMessageReceived = ctx =>
    {
        var accessToken = ctx.Request.Query["access_token"];
        if (!string.IsNullOrEmpty(accessToken))
            ctx.Token = accessToken;
        return Task.CompletedTask;
    }
};

테넌트별 쿼터:

if (client.EventCount > 1000) await client.DisconnectAsync("Rate limit exceeded");

서버 측 필터링 (다중 테넌트 전송 피함):

필터 + 변환 → 서버 측 완료 (클라이언트 측 X)

복구 드릴: 네트워크 드롭, DB 스로틀, 재연결 시뮬레이션 → 백프레셔 + 재시도 결정론적 확인

인프라 위생: 롤링 재시작, 연결 드레인, 유휴 타임아웃, 버전 태깅 (전면 호환)


실용적 팁 및 주의사항

  1. Observable은 “시간이 있는 것”, Signal은 “현재값”: 시간 관련 작업은 Observable, 현재 상태 렌더링은 Signal로 엄격히 분리
  2. toSignal 첫 가입자는 초기값 필수: 콜드 옵저버블은 initialValue 없으면 undefined 렌더링 위험
  3. switchMap은 자동 취소 제공: 검색/필터링에서 이전 요청 자동 정리 → 메모리 누수 예방
  4. BackpressureStrategy는 도메인마다 다름: 금융(무손실), 센서(손실 허용) - 용도에 맞춰 선택
  5. Cmd는 테스트 가능 명령어 목록: Elmish 순수성을 활용해 메시지 흐름 시뮬레이션 → 버그 조기 적발

주의사항

  1. 중첩 구독 금지: subscribe(x => subscribe(...)) 형태는 메모리 누수, switchMap/mergeMap 사용
  2. Async fetch inside effect는 위험: 여러 fetch 동시 실행 → 오래된 응답이 최신 덮음. RxJS switchMap 위임
  3. Signal에서 신규 Signal 생성 금지: 동적 signal 생성 → 지수 의존성 그래프 → 성능 악화
  4. 취소 없는 retry는 무한 루프 위험: takeUntil(stop$) 또는 시간 제한 설정 필수
  5. 멱등성 키 없는 재시도는 중복 처리: 매 명령마다 고유 ID 필수, 백엔드 중복 제거 구현
  6. SSE Last-Event-ID 미사용 시 피드 손실: 유휴 후 재연결 시 과거 메시지 손실 → 헤더 구현 필수
  7. 관찰성 없는 실시간 시스템은 암흑: 큐 깊이, 드롭율, 지연 메트릭 필수 → SLO 모니터링

학습 리소스 및 참고 자료

공식 문서

Elmish 및 F#

.NET 실시간

고급 주제

마블 다이어그램 시각화

1개의 좋아요