typescript rxjs #Signals elmish reactivepatterns #FunctionalProgramming
Reactive + Functional UI Patterns in TypeScript and F#: RxJS, Signals, and Elmish for Real-Time Apps
핵심 요약
실시간 앱 구축의 새로운 표준: RxJS 옵저버블(비동기 스트림), Signals(동기 상태 추적), 그리고 Elmish MVU(순수 함수형 상태 관리)를 결합하여 지연 시간, 동시성, 일관성을 동시에 해결합니다. WebSocket/SSE 백프레셔 관리부터 .NET 백엔드의 채널 기반 흐름 제어까지, 이 세 계층의 패러다임이 혼돈 없이 예측 가능한 실시간 아키텍처를 만듭니다. 거래 대시보드, 협업 에디터, IoT 텔레메트리 같은 고부하 실시간 시스템에서 메모리 누수, 경합 조건, 상태 동기화 실패를 원칙적으로 배제할 수 있습니다.
상세 요약
1. 문제 정의: 실시간 앱의 도전 과제
1.1 실시간 시스템의 특성
- 사용자 기대: 대시보드는 즉시 새로고침, 협업 편집은 무결음, 데이터 UI는 부드러운 반응
- 대표 사례:
- 거래 대시보드: 초당 수천 개의 시장 틱 처리, UI 버벅 방지 필요
- 협업 편집기: 로컬 및 원격 편집 충돌 해결, 결정론적 일관성 유지
- IoT 모니터링: 수천 장치의 연속 텔레메트리, 동적 구독 관리
- 라이브 채트: 메시지, 타이핑 표시기, 온라인 상태 - 모두 비동기이면서 상호 의존
1.2 공통 실패 패턴 및 해결책
| 문제 | 증상 | 해결책 |
|---|---|---|
| 공유 가변 상태 | 시간적 경합 버그 | 불변 업데이트 + 순수 변환 |
| 경합 조건 | 오래된 데이터가 새 상태 덮어씀 | 취소 의미론(cancellation semantics) |
| 누수 구독 | 메모리 누적, 컴포넌트 생명주기 초과 | 결정론적 정리(takeUntil, 자동 정리) |
| 중복 부작용 | 불필요한 재렌더에 의한 네트워크 중복 | 선언적 효과 격리(Elmish Cmd) |
| 버벅 재렌더 | 전체 컴포넌트 트리 재칠 | 세밀한 변경 감지(Signals) |
| 무제한 큐 | 메모리 팽창 또는 UI 동결 | 백프레셔 인식 스트림(bounded channels) |
2. 세 가지 기둥: 패러다임 분리
2.1 Observables (RxJS / Rx.NET) - 비동기 변화를 담당
개념: 시간에 따라 변하는 값의 스트림. 구독자에게 값을 푸시
주요 이점:
- 내장 취소 및 백프레셔 인식
- 선언적 연산자 조합 (map, merge, switchMap, etc.)
- 결정론적 정리 (unsubscribe, takeUntil)
사용 사례: WebSocket 프레임, 디바운스된 키입력, API 재시도, 텔레메트리 버스트
콜드 vs 핫 옵저버블:
- 콜드: 구독하면 생성 시작 (fetch 결과)
- 핫: 이미 공유 소스 방출 중 (마우스 이동, WebSocket)
- 중복 작업 방지:
share()또는shareReplay()멀티캐스트
백프레셔 전략:
- 손실 전략: 샘플링 또는 드롭 (
sampleTime(200)) - 무손실 전략: 버퍼링 또는 윈도우 (
bufferTime(1000))
2.2 Signals - 동기 상태 의존성 추적
개념: 신호가 변하면 의존하는 것들만 재계산 (풀 기반, 동기식)
생명주기:
- Signal: 값 저장 및 의존자 알림
- Computed: 의존성 변경 시 자동 재평가
- Effect: 상태 변경에 따른 부작용
이점: 불필요한 재렌더 제거, 의존성 자동 추적 (DAG)
잘못된 vs 올바른 사용:
신호로 비동기 이벤트 처리 (경합 위험)- ✓ 옵저버블로 스트림 받고
toSignal()로 최신값 투영
2.3 Elmish MVU (F#) - 순수 함수형 상태 관리
Model = 불변 도메인 상태
Msg = 메시지 판별 연합
update = 순수 함수: (Msg × Model) → (Model × Cmd<Msg>)
type Model = { Connected : bool; Messages : string list }
type Msg = Connect | Disconnect | Receive of string
let update msg model =
match msg with
| Connect -> { model with Connected = true }, Cmd.none
| Disconnect -> { model with Connected = false }, Cmd.none
| Receive text -> { model with Messages = text :: model.Messages }, Cmd.none
Elmish의 강점:
- 모든 전환이 순수하고 예측 가능
- 부작용을
Cmd를 통해 명시적으로 분리 - 상태 전환을 모델 업데이트 없이 테스트 가능
- 시간 여행 디버깅 가능
- 복잡한 거래 대시보드/텔레메트리에서 비동기 간 교차 채팅 방지
2.4 도구 선택 매트릭스
| 문제 | 권장 도구 | 이유 |
|---|---|---|
| UI 값 도출 (합계, 필터, 계산 필드) | Signal | 동기 전파, 최소 재렌더 |
| 연속 비동기 업데이트 (WebSocket, SSE, DOM 이벤트) | Observable | 푸시 기반, 취소 가능 |
| 모듈 수준 상태 전환 (사용자 작업, 효과) | Elmish MVU | 순수 업데이트, 명시적 효과 |
| 교차 계층 동기화 (UI ↔ 백엔드) | RxJS + Elmish | 예측 가능한 메시징 및 재시도 |
| 비동기 데이터의 최신 스냅샷 렌더 | Observable → Signal | 안정적 뷰 바인딩 |
3. RxJS로 실시간 스트림 모델링
3.1 SSE (Server-Sent Events) vs WebSocket
| 특성 | SSE | WebSocket |
|---|---|---|
| 방향 | 서버 → 클라이언트 | 양방향 |
| 사용 사례 | 텔레메트리, 피드 업데이트 | 거래, 채팅 |
| 재연결 | 자동 | 수동 |
| HTTP 친화적 | ✓ 캐싱/프록시 통과 | ✗ |
SSE 장점: 경량, 자동 재연결, 방화벽 친화적, 단방향 시나리오에 최적
WebSocket 장점: 저지연 양방향, 바이너리 프레임 가능
관리형 인프라 옵션: Azure Web PubSub, AWS API Gateway WebSocket, Ably, Pusher
3.2 RxJS 기초
콜드 옵저버블로 트랜스포트 래핑:
function webSocket$(url: string): Observable<MessageEvent> {
return new Observable(observer => {
const ws = new WebSocket(url);
ws.onmessage = e => observer.next(e);
ws.onerror = e => observer.error(e);
ws.onclose = () => observer.complete();
return () => ws.close(); // 정리
});
}
고차 매핑 연산자 선택:
| 연산자 | 동작 | 사용 사례 |
|---|---|---|
| switchMap | 새 방출 시 이전 내부 옵저버블 취소 | 재연결 또는 검색 입력 |
| concatMap | 순차 실행, 이전 완료 대기 | 순서 보장 필요 명령 |
| mergeMap | 모두 동시 처리 | 병렬 네트워크 요청 |
| exhaustMap | 활성 중 새 방출 무시 | 중복 제출 방지 |
재연결 로직:
const reconnect$ = interval(5000).pipe(
switchMap(() => webSocket$('wss://feed').pipe(retry({ delay: 2000 })))
);
3.3 백프레셔 및 속도 제어
시간 기반 스로틀링:
feed$.pipe(throttleTime(200)); // 먼저 방출, 침묵
feed$.pipe(auditTime(200)); // 조용한 후 마지막만
feed$.pipe(sampleTime(200)); // 주기적 샘플링
feed$.pipe(debounceTime(300)); // 유휴 대기
용도별 선택:
- throttleTime: 드래그/이동 이벤트
- debounceTime: 텍스트 입력
- auditTime: 재칠 전 최신값
- sampleTime: 모니터링 대시보드
카운트 기반 버퍼링 (손실 없이 배치 처리):
feed$.pipe(bufferCount(100)).subscribe(batch => sendToServer(batch));
feed$.pipe(bufferTime(1000)).subscribe(batch => updateUI(batch.length));
손실 vs 무손실 전략:
- 손실: 중간값 드롭 (스로틀, 샘플) - CPU 사용량 높은 그래프에 적합
- 무손실: 모든 데이터 보존 (버퍼, 윈도우) - 금융 거래에 적합
3.4 에러 처리 및 재시도 (취소 지원)
간단한 재시도:
feed$.pipe(retry({ delay: 2000 })).subscribe();
제어된 재시도 및 지수 백오프:
feed$.pipe(
retryWhen(errors => errors.pipe(delay(1000))),
takeUntil(stop$)
).subscribe();
AbortController 통합:
function fetch$(url: string, signal: AbortSignal) {
return from(fetch(url, { signal })).pipe(switchMap(r => r.json()));
}
const controller = new AbortController();
fetch$('https://api/data', controller.signal)
.pipe(retry({ delay: 1000 }))
.subscribe(console.log);
controller.abort(); // 정리
멱등성 보장 (재시도 안전):
interface Command { id: string; payload: any }
function sendCommand$(cmd: Command) {
return ajax.post('/api/command', cmd);
}
// 백엔드가 id로 중복 제거
3.5 메모리 안전 패턴
takeUntil로 생명주기 바인딩:
const destroy$ = new Subject<void>();
feed$.pipe(takeUntil(destroy$)).subscribe(updateUI);
// 컴포넌트 정리 시
destroy$.next();
destroy$.complete();
Angular의 takeUntilDestroyed (자동):
@Component({...})
export class Dashboard {
constructor(destroyRef: DestroyRef) {
feed$.pipe(takeUntilDestroyed(destroyRef)).subscribe(updateUI);
}
}
중첩 구독 방지 (메모리 누수 원인):
feed$.subscribe(f => another$.subscribe(...))- ✓
feed$.pipe(switchMap(f => another$)).subscribe(...)
3.6 프로덕션급 예제
백오프와 toSignal을 통한 탄력적 SSE 소비:
const feed$ = defer(() => eventSource$('/api/feed')).pipe(
retry({ delay: 2000 }), // 재연결
map(e => JSON.parse(e.data)),
shareReplay(1)
);
const latestData = toSignal(feed$, { initialValue: { price: 0 } });
// Angular 템플릿은 latestData() 변경 시 자동 재렌더
순서 보장 멱등 전송을 위한 WebSocket 명령 스트림:
const trade$ = new Subject<TradeCmd>();
const send$ = trade$.pipe(
concatMap(cmd =>
ajax.post('/api/trade', cmd).pipe(
catchError(err => of({ ...cmd, error: true }))
)
),
shareReplay(1)
);
trade$.next({ id: crypto.randomUUID(), symbol: 'AAPL', qty: 50 });
// concatMap은 각 명령 완료 후 다음 시작 → 재시도 중 순서 보존
4. Signals - 현대 UI 프레임워크의 세밀한 반응성
4.1 프레임워크별 Signal 수렴
Angular Signals:
const count = signal(0);
const doubled = computed(() => count() * 2);
effect(() => console.log('Doubled:', doubled()));
SolidJS Signals (가상 DOM 없음, 진정한 세밀한 반응성):
const [count, setCount] = createSignal(0);
createEffect(() => console.log(count()));
Preact Signals (React 호환):
const count = signal(0);
effect(() => console.log('Count', count.value));
수렴:
- 동기식, 의존성 추적 값
- 자동 파생 (computed)
- 변경 시 부작용 (effect)
- 최소한의 업데이트로 예측 가능
4.2 Angular 실용적 상호운용 패턴
toSignal - 최신 값 렌더링 (Observable → Signal):
@Component({
template: `<div>Latest price: {{ price() }}</div>`
})
export class PriceTicker {
price = toSignal(price$, { initialValue: 0 });
}
effect() - 상태 변경에 따른 부작용:
const filter = signal('AAPL');
effect(() => {
console.log('Filter changed:', filter());
socket.send(JSON.stringify({ type: 'subscribe', symbol: filter() }));
});
toObservable - 하위 제어 흐름 (Signal → Observable):
const query = signal('');
toObservable(query)
.pipe(
debounceTime(300),
switchMap(q => http.get(`/api/search?q=${q}`))
)
.subscribe(renderResults);
4.3 Signal에서 발생하는 미묘한 경합 (비동기 페칭)
경합 문제:
// ❌ 문제: 여러 fetch 중복, 느린 응답이 새것 덮음
effect(() => {
fetch(`/api/search?q=${query()}`)
.then(r => r.json())
.then(renderResults);
});
RxJS switchMap으로 복구:
// ✓ 새 쿼리 도착 시 이전 요청 취소
toObservable(query)
.pipe(
debounceTime(300),
switchMap(q => from(fetch(`/api/search?q=${q}`)).pipe(switchMap(r => r.json())))
)
.subscribe(renderResults);
핵심 통찰: Signal은 렌더 프레임 내 일관성 보장, 비동기 경계 아님. 비동기는 항상 RxJS에 위임 (취소, 재시도, 순서).
4.4 Preact Signals - React 앱에서 vs React Query/Zustand
useState 대체 (세밀한 반응성):
const counter = signal(0);
function Counter() {
return <button onClick={() => counter.value++}>{counter.value}</button>;
}
// useState와 달리 counter.value++ 는 컴포넌트 재렌더 트리거 안 함, signal 소비자만 업데이트
역할 분담:
- React Query: 원격 비동기 상태 (데이터 페칭, 캐싱)
- Zustand: 중앙 공유 상태
- Signals: 세밀한 로컬 반응성 - UI 계산 또는 인터컴포넌트 파생 상태
베스트 프랙티스:
const userSignal = signal<User | null>(null);
useEffect(() => {
fetchUser().then(u => (userSignal.value = u));
}, []);
스트림을 모서리에 유지:
- RxJS(WebSocket, SSE)로 비동기 데이터 스트림
- Signal로 현재 스냅샷 변환
- Computed signal로 UI 상태 파생
- 새로운 비동기 흐름 오케스트레이션 시에만 Observable로 역변환
5. Elmish (F#) - 예측 가능한 비동기 오케스트레이션
5.1 MVU 기초: Model, Messages, Update, Commands
최소 예제:
type Model = { Count: int; Loading: bool }
type Msg =
| Increment
| Decrement
| Load
| Loaded of int
let update msg model =
match msg with
| Increment -> { model with Count = model.Count + 1 }, Cmd.none
| Decrement -> { model with Count = model.Count - 1 }, Cmd.none
| Load -> { model with Loading = true }, Cmd.OfAsync.perform loadData () Loaded
| Loaded value -> { model with Loading = false; Count = value }, Cmd.none
순수 함수 테스트:
let initial = { Count = 0; Loading = false }
let model', _ = update Increment initial
assert (model'.Count = 1)
// 목(mock), DI 없음 - 투명한 흐름
선언적 효과:
Cmd값이 부작용 명시적 분리- 비동기 행동을 선언적, 테스트 가능, 재시도 가능 유닛으로 변환
- 명령형 콜백 지옥 배제
5.2 비동기 흐름: Cmd.OfAsync, 재시도, 에러 라우팅
조합자:
Cmd.OfAsync.perform: 성공 시 메시지 발송, 에러 무시Cmd.OfAsync.either: 성공 및 에러 메시지 모두 발송Cmd.OfPromise.perform: JavaScript 프로미스용 (Fable)
재시도 및 에러 처리 포함 안전 데이터 페칭:
type Msg =
| Load
| Loaded of Data
| Error of exn
let update msg model =
match msg with
| Load ->
let cmd =
Cmd.OfAsync.either
(fun () -> api.FetchData()) () Loaded Error
{ model with Loading = true }, cmd
| Loaded data ->
{ model with Loading = false; Data = Some data }, Cmd.none
| Error e ->
{ model with Loading = false; Error = Some e.Message }, Cmd.none
낙관적 업데이트 (트랜잭션 UI 업데이트):
| Submit order ->
let newModel = { model with Pending = true; Orders = order :: model.Orders }
newModel, Cmd.OfAsync.either api.Submit order Submitted SubmitFailed
// API 실패 시 롤백: 낙관적 항목 제거 - 가변 부작용 없음
5.3 구독: 타이머, 소켓, DOM 이벤트를 메시지 루프에 연결
타이머 구독 (1초마다 Tick 발송):
let timerSub dispatch =
let rec loop () = async {
do! Async.Sleep 1000
dispatch Tick
return! loop ()
}
Async.StartImmediate(loop())
let subscriptions model =
if model.Active then Cmd.ofSub timerSub else Cmd.none
WebSocket 구독 (메시지 수신 시 발송):
let socketSub dispatch =
let ws = new WebSocket("wss://feed")
ws.onmessage <- fun e -> dispatch (Received e.data)
ws.onclose <- fun _ -> dispatch Disconnected
생명주기 제어: 구독은 Cmd<Msg> 값, Elmish 런타임이 자동 attach/detach → 소켓 누수 없고 메모리 안정
5.4 Elmish.Bridge vs SAFE Stack
Bridge (폐쇄 생태계 용):
type ServerMsg = PriceUpdate of float | TradeAck of string
type ClientMsg = PlaceTrade of Trade | Ping
Program.mkProgram init update view
|> Program.withBridge
(Bridge.endpoint "/socket"
|> Bridge.withServerHub<ServerMsg, ClientMsg>())
|> Program.run
Bridge 사용 시점:
- 모든 클라이언트가 Elmish/Fable 기반
- 메시지 타입 통합 원함
- ASP.NET SignalR 호환
피할 상황: 열린 또는 혼합 프론트엔드 - 일반 WebSocket/SSE가 더 간단
SAFE Stack: Elmish 구독을 WebSocket 엔드포인트에 직접 연결, 명령을 통해 재연결 재시도 → TypeScript 클라이언트와 호환
5.5 Fable/TypeScript 프론트엔드와 상호운용: 공유 메시지 계약, JSON 코덱
공유 스키마 (F# 정의):
type Trade =
{ Id: Guid; Symbol: string; Qty: int; Price: decimal }
type ClientToServer =
| PlaceTrade of Trade
| Cancel of Guid
type ServerToClient =
| TradeUpdate of Trade
| Acknowledged of Guid
JSON 코덱 (Thoth.Json):
let encodeTrade t =
Encode.object [
"Id", Encode.guid t.Id;
"Symbol", Encode.string t.Symbol
]
TypeScript 상호운용:
interface Trade { id: string; symbol: string; qty: number; price: number }
백프레셔 조정: 고빈도 Elmish 업데이트 시 .NET 측 Channel/스로틀, Elmish는 Cmd.OfAsync.either + Async.Sleep 간격 제어 → 계층별 처리량 제약 존중
6. .NET 백엔드 실시간 + 정확성
6.1 트랜스포트 선택: SignalR, WebSocket, SSE
| 트랜스포트 | 방향 | 이상적 사용 | 비고 |
|---|---|---|---|
| SignalR | 양방향 | 채팅, 협업 | 프로토콜 추상화, 자동 재연결 |
| WebSocket | 양방향 | 저지연 거래 | 전체 제어, 바이너리 프레임 |
| SSE | 서버 → 클라이언트 | 시장 데이터, 텔레메트리 | 간결, HTTP 친화, 자동 재연결 |
SignalR 예제:
public class TradeHub : Hub
{
public async Task SendTrade(Trade trade)
=> await Clients.All.SendAsync("TradeUpdate", trade);
}
SSE 엔드포인트 예제:
app.MapGet("/feed", async res =>
{
res.Headers.Add("Content-Type", "text/event-stream");
await foreach (var evt in FeedService.Stream())
{
await res.WriteAsync($"data: {JsonSerializer.Serialize(evt)}\n\n");
await res.Body.FlushAsync();
}
});
6.2 백프레셔 및 흐름 제어
System.Threading.Channels (유계 큐):
var channel = Channel.CreateBounded<Trade>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// 제작자가 공간 대기 → 자연스러운 백프레셔
소비 패턴:
await foreach (var trade in channel.Reader.ReadAllAsync())
{
await ProcessTrade(trade);
}
TPL Dataflow 대안 (풍부한 파이프라인 또는 병렬성):
var block = new TransformBlock<Trade, Order>(
t => Transform(t),
new ExecutionDataflowBlockOptions {
BoundedCapacity = 500,
MaxDegreeOfParallelism = 4
});
6.3 분산 시스템의 멱등성
멱등성 키 패턴 (중복 처리 방지):
private readonly ISet<Guid> _processed = new HashSet<Guid>();
public async Task HandleAsync(Command cmd)
{
if (_processed.Contains(cmd.Id)) return;
_processed.Add(cmd.Id);
await Process(cmd);
}
프로덕션: 트랜잭션 데이터베이스 또는 Redis 중복 제거 테이블
아웃박스 패턴 (메시지 손실 방지):
await db.SaveChangesAsync();
await outbox.WriteAsync(new OutboxEvent(evt));
// 백그라운드 워커가 아웃박스 드레인 → 게시 전 영속성
미들웨어 통합 (MediatR + Polly):
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(IdempotencyBehavior<,>));
builder.Services.AddResiliencePipeline("default", p =>
p.AddRetry(3).AddCircuitBreaker(5));
6.4 Rx.NET 서비스에서 이벤트 조합
집계, 조인, 시간 기반 분석 (제한 범위 내):
var feed = market.ObservePrices()
.Buffer(TimeSpan.FromSeconds(1))
.Select(batch => batch.Average(x => x.Price))
.Subscribe(avg => Console.WriteLine($"Avg: {avg}"));
용도: 제한 범위 내 텔레메트리/분석. 교차 서비스: Channel이 관찰성/제어 더 우수
6.5 관리형 vs DIY: Azure Web PubSub
관리형 (Azure Web PubSub):
- 투명 확장, 내장 인증, 지리 배포
- 글로벌 대시보드/채팅에 이상적
자체 호스팅 (SignalR/YARP):
- 전체 제어, 지연 중요 시
- 채널 용량 수동 튜닝
하이브리드: 외부 클라이언트 관리형 PubSub, 내부 오케스트레이션 자체 호스팅
6.6 관찰성: 큐 깊이, 드롭 vs 처리, 테넌트별 속도 제한
채널 메트릭:
var gauge = meter.CreateObservableGauge("channel_depth", () => channel.Reader.Count);
속도 제한 (.NET의 RateLimiter 미들웨어):
app.UseRateLimiter(new RateLimiterOptions
{
GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(_ =>
RateLimitPartition.GetFixedWindowLimiter("default", _ => new FixedWindowRateLimiterOptions
{
PermitLimit = 100,
Window = TimeSpan.FromSeconds(1)
}))
});
트레이싱 및 상관 (ActivitySource):
각 요청이 상관 ID 전파 → 사용자 작업 RxJS → 백엔드 명령 추적
ILogger.BeginScope로 동일 ID 로그 → 엔드투엔드 지연 진단
7. 엔드투엔드 구현: 라이브 주문 장부 및 거래
7.1 요구사항
- 실시간 시장 데이터 스트리밍 (가격, 주문 장부)
- 사용자 거래 명령 수용
- 멱등성 및 백프레셔 안전성
- 네트워크 중단 후 우아한 복구
7.2 백엔드 (ASP.NET Core)
데이터 파이프라인:
var feed = Channel.CreateBounded<Trade>(5000);
_ = Task.Run(async () =>
{
await foreach (var trade in source.StreamAsync())
await feed.Writer.WriteAsync(trade);
});
SignalR + SSE 팬아웃:
app.MapHub<TradeHub>("/ws");
app.MapGet("/sse", async res =>
{
res.Headers.ContentType = "text/event-stream";
await foreach (var t in feed.Reader.ReadAllAsync())
await res.WriteAsync($"data:{JsonSerializer.Serialize(t)}\n\n");
});
명령 엔드포인트 (멱등성 포함):
app.MapPost("/trade", async (TradeCommand cmd, ITradeService svc) =>
{
await svc.HandleAsync(cmd);
return Results.Ok();
});
7.3 UI (TypeScript)
네트워크 계층:
const trades$ = eventSource$('/sse').pipe(
retry({ delay: 2000 }),
map(e => JSON.parse(e.data)),
shareReplay(1)
);
const trades = toSignal(trades$, { initialValue: [] });
거래 제출:
const tradeCmd$ = new Subject<Trade>();
tradeCmd$
.pipe(concatMap(cmd => ajax.post('/trade', cmd)))
.subscribe();
속도 제한 (UI 페인트 3Hz):
trades$
.pipe(auditTime(300))
.subscribe(renderOrderBook);
7.4 Elmish (F#) 모듈
type Msg =
| NewPrice of float
| SubmitTrade of Trade
| Confirmed of Trade
| Error of exn
let update msg model =
match msg with
| NewPrice p -> { model with Price = p }, Cmd.none
| SubmitTrade t -> { model with Pending = true }, Cmd.OfAsync.either api.Submit t Confirmed Error
| Confirmed t -> { model with Pending = false; Trades = t :: model.Trades }, Cmd.none
| Error e -> { model with Error = Some e.Message }, Cmd.none
7.5 교차 스택 백프레셔
- 브라우저:
auditTime(300)- DOM 업데이트 평활 - 백엔드:
BoundedChannels- 과부하 방지 - Elmish: 명령 스로틀 - 동시 제출 제한
→ 엔드투엔드 흐름 제어 체인
7.6 장애 복구 드릴
- 서버 재시작: SSE Last-Event-ID 헤더 → 재개 가능 피드
- 중복 명령: 서버 중복 제거 테이블 → 반복 거부
- 클라이언트 재연결: RxJS switchMap → 자동 스트림 재초기화
→ 모든 복구 선언적, 측정 가능
7.7 배포 및 인프라
Docker:
FROM mcr.microsoft.com/dotnet/aspnet:8.0
COPY ./out /app
ENTRYPOINT ["dotnet", "TradeFeed.dll"]
환경 토글:
{ "Transport": "SSE" }
CI: 백프레셔 및 재연결 검증 통합 테스트
→ 결과: 구성 가능, 반응적, 수학적으로 검증 가능한 고부하 실시간 시스템
8. 프로덕션 체크리스트: 성능, 누수, 운영
8.1 RxJS 인체공학 및 성능
연산자 선택 매트릭스:
| 시나리오 | 선호 연산자 | 이유 |
|---|---|---|
| 검색 제안 | switchMap | 오래된 요청 취소, 최신 승리 |
| 순차 명령 | concatMap | 순서 보존, 겹침 없음 |
| 백그라운드 작업 | mergeMap | 최대 동시성, 논블로킹 |
| 탭투페이 | exhaustMap | 활성 중 중복 무시 |
속도 제한 선택:
throttleTime(200); // 드래그/스크롤: 첫 값
debounceTime(300); // 입력 자동완성: 마지막 값
auditTime(300); // UI 페인트: 마지막 후 발송
sampleTime(200); // 메트릭 대시보드: 주기적 스냅샷
중복 계산 방지:
price$.pipe(distinctUntilChanged()).subscribe(renderPrice);
8.2 메모리 누수 방지
통합 정리:
const destroy$ = new Subject<void>();
feed$.pipe(takeUntil(destroy$)).subscribe(render);
destroy$.next();
destroy$.complete();
Angular 자동 정리:
constructor(destroyRef: DestroyRef) {
this.feed$.pipe(takeUntilDestroyed(destroyRef)).subscribe(render);
}
템플릿에서 async 파이프 또는 Signal (수동 구독 회피):
<div *ngIf="price$ | async as price">Price: {{ price }}</div>
Signal 메모리 가드: Signal을 동적으로 effect 내에서 생성하지 말 것 (지수 의존성 그래프)
8.3 Signal 베스트 프랙티스
역할 분리:
- Signal: 로컬 상태 (폼 필드, 계산 합계)
- Observable: 비동기 I/O (WebSocket, SSE, 타이머)
- 모서리에서만 브리지 (toSignal, toObservable)
파생 Signal과 메모이제이션:
const avgPrice = computed(() =>
trades().length
? trades().reduce((s, t) => s + t.price, 0) / trades().length
: 0
);
제어된 Effect (데이터 페칭 X):
effect(() => console.log('New trade count:', trades().length));
// 모든 부작용 RxJS로 위임
8.4 Elmish 강화
불변성 강제:
// ❌ model.Count <- model.Count + 1
// ✓ { model with Count = model.Count + 1 }
철저한 패턴 매칭 (컴파일러 경고):
match msg with
| Increment -> ...
| Decrement -> ...
| _ -> model, Cmd.none
부작용 격리 (Cmd/Sub만 실행):
// 로직 → 헬퍼 모듈, update 내부 아님
let update msg model = ...
Update 테스트 (스냅샷):
let model, cmd = update (Receive "Hello") initialModel
Assert.Equal(["Hello"], model.Messages)
8.5 .NET 백엔드 처리량/지연
유계 Channel 우선:
var channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
포화도 측정:
metrics.Gauge("queue_depth", () => channel.Reader.Count);
Dataflow 사용 시점:
- 병렬 제어 필요
- 팬인/팬아웃 파이프라인
- 배칭 및 스로틀 내장
8.6 멱등성 및 재시도
명령 멱등성 키:
public record PlaceOrder(Guid Id, string Symbol, decimal Qty);
if (await db.Orders.AnyAsync(o => o.Id == cmd.Id)) return;
결정론적 핸들러 (재플레이 가능):
// 시스템 시간/난수 X → 모든 입력 명시적
Polly 재시도 + 차단기:
var policy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)))
.WrapAsync(Policy.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));
8.7 관찰성 및 SLO
핵심 메트릭:
- 이벤트 지연 (enqueue → emit)
- 큐 깊이
- 드롭 vs 처리
- 재구독 횟수
추적 및 상관:
meter.CreateObservableGauge("feed_lag_ms", () => MeasureLag());
// 각 요청: X-Correlation-Id 전파
// 백엔드: ILogger.BeginScope(id) → 엔드투엔드 타이밍
SLO 정의:
- 피드 업데이트 99번째 백분위수 < 250ms
- 99.9% 성공 재연결 < 5s
- 무제한 큐 0개
8.8 보안 및 복원력
WebSocket/SSE 인증 (JWT):
options.Events = new JwtBearerEvents
{
OnMessageReceived = ctx =>
{
var accessToken = ctx.Request.Query["access_token"];
if (!string.IsNullOrEmpty(accessToken))
ctx.Token = accessToken;
return Task.CompletedTask;
}
};
테넌트별 쿼터:
if (client.EventCount > 1000) await client.DisconnectAsync("Rate limit exceeded");
서버 측 필터링 (다중 테넌트 전송 피함):
필터 + 변환 → 서버 측 완료 (클라이언트 측 X)
복구 드릴: 네트워크 드롭, DB 스로틀, 재연결 시뮬레이션 → 백프레셔 + 재시도 결정론적 확인
인프라 위생: 롤링 재시작, 연결 드레인, 유휴 타임아웃, 버전 태깅 (전면 호환)
실용적 팁 및 주의사항
팁
- Observable은 “시간이 있는 것”, Signal은 “현재값”: 시간 관련 작업은 Observable, 현재 상태 렌더링은 Signal로 엄격히 분리
- toSignal 첫 가입자는 초기값 필수: 콜드 옵저버블은
initialValue없으면undefined렌더링 위험 - switchMap은 자동 취소 제공: 검색/필터링에서 이전 요청 자동 정리 → 메모리 누수 예방
- BackpressureStrategy는 도메인마다 다름: 금융(무손실), 센서(손실 허용) - 용도에 맞춰 선택
- Cmd는 테스트 가능 명령어 목록: Elmish 순수성을 활용해 메시지 흐름 시뮬레이션 → 버그 조기 적발
주의사항
- 중첩 구독 금지:
subscribe(x => subscribe(...))형태는 메모리 누수,switchMap/mergeMap사용 - Async fetch inside effect는 위험: 여러 fetch 동시 실행 → 오래된 응답이 최신 덮음. RxJS
switchMap위임 - Signal에서 신규 Signal 생성 금지: 동적 signal 생성 → 지수 의존성 그래프 → 성능 악화
- 취소 없는 retry는 무한 루프 위험:
takeUntil(stop$)또는 시간 제한 설정 필수 - 멱등성 키 없는 재시도는 중복 처리: 매 명령마다 고유 ID 필수, 백엔드 중복 제거 구현
- SSE Last-Event-ID 미사용 시 피드 손실: 유휴 후 재연결 시 과거 메시지 손실 → 헤더 구현 필수
- 관찰성 없는 실시간 시스템은 암흑: 큐 깊이, 드롭율, 지연 메트릭 필수 → SLO 모니터링
학습 리소스 및 참고 자료
공식 문서
- RxJS: https://rxjs.dev - 연산자 레퍼런스, 마블 다이어그램
- Angular Signals: https://angular.io/guide/signals - toSignal, toObservable 상호운용
- SolidJS: https://docs.solidjs.com - 세밀한 반응성 아키텍처
- Preact Signals: https://preactjs.com/guide/signals - React 호환 구현
Elmish 및 F#
- Elmish 공식: https://elmish.github.io - MVU 패턴, Cmd/Sub
- SAFE Stack: https://safe-stack.github.io - Suave + Fable + Elmish 통합
- Fable: https://fable.io - F#-to-JavaScript 컴파일러
.NET 실시간
- SignalR 문서: https://learn.microsoft.com/en-us/aspnet/core/signalr
- System.Threading.Channels: https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels
- TPL Dataflow: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
고급 주제
- Azure Web PubSub: https://learn.microsoft.com/en-us/azure/azure-web-pubsub
- Polly (재시도/차단기): https://github.com/App-vNext/Polly
- Thoth.Json (F#-TS 직렬화): https://thoth-org.github.io/Thoth.Json
마블 다이어그램 시각화
- RxMarbles: https://rxmarbles.com - 연산자 시뮬레이션