rxnet 리액티브프로그래밍 #CancellationToken #TakeUntil #닷넷
Rx.NET 6.1의 새로운 TakeUntil(CancellationToken) 연산자 소개
개요
2025년 10월에 출시된 Rx.NET 버전 6.1은 호환성을 깨지 않으면서 새로운 기능들을 추가한 마이너 버전 업데이트입니다. 이번 릴리스에서는 세 가지 주요 기능이 추가되었으며, 그 중 TakeUntil(CancellationToken) 연산자가 핵심입니다.
Rx.NET 6.1의 새로운 기능
추가된 기능 목록
-
DisposedWith - 별도 영상에서 다룰 예정
-
TakeUntil(CancellationToken) - 본 문서의 주제
-
ResetExceptionDispatchState
향후 계획
장기간 지속되어 온 패키징 문제 해결은 v6.1에 포함되지 않았으며, v7.0에서 제공될 예정입니다.
커뮤니티 기여
개발 과정
이 기능은 커뮤니티의 협력을 통해 개발되었습니다:
-
Nils Aufschläger: 초기 제안 및 구현 작업 수행
-
Daniel Weber: 토론 중 완전히 다른 접근 방식 제안
초기 제안은 다른 형태의 기능이었으나, 논의 과정에서 설계가 변경되었습니다.
TakeUntil(CancellationToken) 연산자 이해하기
기존 TakeUntil 연산자
Rx에는 오랫동안 다양한 형태의 TakeUntil 연산자가 존재했습니다. 기본 개념은 특정 기준이 충족될 때까지 소스에서 모든 요소를 가져오는 것입니다.
새로운 기능의 목적
새로운 기능은 CancellationToken이 신호를 받을 때 요소 수신을 중단하고 소스를 완료할 수 있게 합니다.
주요 목적:
-
무한 소스를 제어 가능하게 만들기
-
자체적으로 완료되지 않는 소스 처리
무한 소스의 예시
-
Interval 타이머: 자체적으로 완료되지 않음
-
.NET 이벤트 소스: 일반 .NET 이벤트를 Rx 소스로 변환하면 자체적으로 완료되지 않음
이러한 이벤트 소스를 완료할 수 있게 하는 것이 이 기능의 원래 동기였습니다.
설계 변경 과정
초기 설계
원래 설계는 특수화된 disposable 구현을 만드는 것이었습니다.
설계 변경 이유
초기 접근 방식의 문제점:
-
disposable과 observable을 동시에 구현하는 다른 사례가 없음
-
두 가지를 모두 구현하는 방식이 Rx.NET의 기존 타입 예제와 잘 맞지 않음
Daniel Weber의 제안
TakeUntil의 새로운 오버로드를 도입하는 방식:
-
동일한 효과 달성 가능
-
동일한 문제 해결
-
더 유연함: CancellationToken을 생성하는 모든 것과 함께 작동
CancellationDisposable 활용
Rx.NET에는 이미 CancellationDisposable이 존재:
-
dispose 가능한 객체
-
dispose 시 CancellationToken 설정
-
이것이 채택된 해결 방식
실제 사용 예제
기본 Interval 예제
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => Console.WriteLine(x));
이 코드의 특징:
-
Rx에 오랫동안 내장된 Interval 연산자 사용
-
매초마다 이벤트 발생
-
0, 1, 2, 3… 순서로 숫자를 무한정 생성
-
절대 중단되지 않음: 구독 취소하거나 프로세스가 종료될 때까지 계속 실행
구독 취소의 한계
var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => Console.WriteLine(x));
subscription.Dispose(); // 구독 취소
구독 취소의 문제점:
-
실행을 중단시킬 수 있음
-
OnCompleted 이벤트가 전달되지 않음
-
Subscribe() 호출 중에 볼 수는 있지만 보장되지 않음
TakeUntil(CancellationToken) 사용하기
완료 이벤트를 확실히 보려면 소스 자체를 종료해야 하며, 이것이 새로운 연산자의 핵심입니다.
구현 코드
var cancellationDisposable = new CancellationDisposable();
Observable.Interval(TimeSpan.FromSeconds(1))
.TakeUntil(cancellationDisposable.Token)
.Subscribe(
x => Console.WriteLine(x),
() => Console.WriteLine("Completed")
);
Console.ReadLine();
cancellationDisposable.Dispose();
Console.WriteLine("Disposed");
Console.ReadLine();
코드 설명
-
CancellationDisposable 생성: Rx에 오랫동안 내장된 타입으로, CancellationToken 제공
-
TakeUntil 적용: CancellationToken이 설정될 때까지 항목 수신
-
Dispose 호출: 첫 번째 엔터 키 입력 시 CancellationDisposable을 dispose
-
Observable 종료: dispose로 인해 CancellationToken이 취소 상태로 전환
-
TakeUntil 반응: 연산자가 "완료되었다"고 판단
-
완료 이벤트 발생: 소스가 제대로 완료됨
실행 결과
-
숫자가 계속 생성됨
-
엔터 키를 누르면 dispose 발생
-
소스가 즉시 제대로 완료됨
-
구독이 소스 측에서 종료됨
-
구독 취소가 아닌 소스 자체가 완료됨
핵심 개념 정리
TakeUntil의 기능
이 새로운 TakeUntil 연산자 오버로드는 무한 소스를 원하는 시점에 완료시킬 수 있게 합니다.
사용 방법
-
System.Reactive v6.1 설치
-
CancellationToken 획득
- Rx의 CancellationDisposable 사용 가능
- 새로운 TakeUntil 오버로드에 토큰 전달
구독 취소 vs 소스 완료
구독 취소 (Unsubscribe):
-
관찰자가 더 이상 이벤트를 받지 않음
-
OnCompleted 이벤트 보장 안 됨
-
소스는 계속 실행될 수 있음
소스 완료 (TakeUntil):
-
소스 자체가 종료됨
-
OnCompleted 이벤트 보장됨
-
정상적인 종료 흐름 유지
참고 자료
공식 문서
https://introtorx.com 사이트에서 이 연산자에 대한 상세한 문서를 확인할 수 있습니다.
관련 타입
-
CancellationDisposable: Rx.NET에 내장된 타입
-
CancellationToken: .NET 표준 취소 메커니즘
-
IDisposable: 구독 관리 인터페이스
실용적 적용 시나리오
이벤트 기반 소스 관리
.NET 이벤트를 Rx 소스로 변환할 때:
-
이벤트는 자연적으로 완료되지 않음
-
TakeUntil(CancellationToken)으로 명시적 완료 제어
-
리소스 정리 보장
타이머 기반 작업
Interval이나 Timer를 사용할 때:
-
무한 실행 방지
-
애플리케이션 종료 시 정상 종료
-
메모리 누수 방지
장기 실행 스트림
장기 실행되는 Observable 스트림:
-
애플리케이션 수명 주기와 연동
-
사용자 입력에 따른 종료
-
비동기 작업 조정