2025. 04

RxJS, 비동기 함수형 프로그래밍의 매력

시간을 세련되게 다루기

  • JS
  • 비동기
  • RxJS

함수형 프로그래밍의 매력

자바스크립트를 공부하면서 으레 쓰게 되는 배열 메서드들이 있습니다. 그 중 slice, map, filter, reduce를 통해 간단히 RxJS에 대한 감을 잡아보도록 하겠습니다.

Typescript
const nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

nums 중 5 이하인 숫자를 골라 모두 더해야 하는 문제 상황을 살펴봅시다. 아래와 같이 풀 수 있을 텐데요,

Typescript
function solution(nums: number[]):number {
	let sum:number = 0
	
	for (let i=0; i<nums.length; i++) {
		if (nums[i] <= 5) sum += nums[i]
	}
	
	return sum
}

이걸 조금 더 함수형 느낌으로 풀어보면 다음과 같이도 풀 수 있습니다.

Typescript
function solution(nums: number[]):number {
	let sum: number = 0
	nums.forEach(n => sum += n)
	return sum
}

function solution(nums: number[]):number {
	return nums.reduce((prev, curr) => prev + curr, 0)
}

좀더 복잡한 문제를 살펴보겠습니다.

Typescript
const points = [[1,2], [-1,5], [5,5], [3,3], [5,-7], [10,16], [6,8]]

이 점들 중 1사분면에 있는 점을 뒤에서부터 3개만 골라 그 거리의 합을 구해야 하는 상황을 살펴봅시다.

Typescript
function solution(points: number[][]):number {
	let count:number = 3
	let sum:number = 0
	
	for (let i = points.length - 1; i >= 0; i--) {
		const [X,Y] = points[i]
		if (X > 0 && Y > 0 && count > 0) {
			count--
			sum += Math.sqrt(X*X + Y*Y)
	}
	
	return sum
}

이렇게도 풀 수 있지만,

Typescript
function solution(points: number[][]):number {
	
	let count = 0
	const result = points
			.filter(([X,Y]) => (X > 0 && Y > 0)) //1사분면에 있는 점을 골라
			.slice(-3) //뒤에서 3개를 잘라낸 뒤
			.map(([X,Y]) => Math.sqrt(X*X + Y*Y)) //거리를 구해서
			.reduce((prev,curr) => prev+curr, 0) //모두 더한다
						
	return result
}

이렇게 풀 수도 있습니다. 한 줄에 로직 하나가 작성된 모습을 볼 수 있는데요, 이렇게 함수형 프로그래밍의 매력은 각 함수가 무슨 동작(What)을 하는지에 대해 직관적으로 알기 쉽다는 것이라고 생각합니다. 이런 특징을 “선언적이다” 라고 합니다.

다만 모든 함수형 프로그래밍이 선언적이라고 할 수는 없습니다. 선언적 프로그래밍의 반대 개념은 “명령형 프로그래밍”이고, 이건 상대적인 개념이기 때문입니다. 함수형 체인을 부분적으로 쓴다고 해도 What보다 How가 서술되어 있고, 구체적인 알고리즘이 보인다면 그건 명령형에 가까운 코드입니다.

RxJS?

문제의식

그러면 함수형 프로그래밍의 매력을 살펴봤으니, 선언적 프로그래밍으로 바꾸고 싶은 “명령형 프로그래밍”이 부득이하게 많이 쓰이는 곳을 살펴봅시다. 옛날에 진행했던 라이브 경매 프로젝트에서 다른 동료가 작성한 웹소켓 코드를 가져왔어요.

Typescript

...
  io.on('connection', socket => {

		//경매가 시작되면
    socket.on('start', async ({ roomId }) => {
		...
			//db에서 진행중인 경매 정보를 가져와
      const redisCli = app.get('redisCli');
      const itemId = await getRedis(redisCli, `auction:${roomId}:onLiveItem:itemId`);
      const price = await getRedis(redisCli, `auction:${roomId}:onLiveItem:startPrice`);
      //클라이언트에 전송하고 
        io.to(roomId).emit('start', { itemId: Number(itemId), price: Number(price), askingPrice: Number(price) });
      //타이머를 시작합니다.
        startCountdownTimer(app, roomId);
    });
  });
...

소켓에서 이벤트가 들어오면 어떤 동작을 해야 하는지 구체적으로 명시하고 있습니다. 이런 패턴을 많이 본 적이 있을 겁니다. 프론트엔드에서 쓰이는 이벤트 핸들러라든지, 비동기적인 이벤트를 다룰 때 이런 식으로 핸들러를 등록하게 됩니다.

이 코드를 선언적으로 바꾼다는 건 어떻게 한다는 걸까요? 아까 1사분면에 있는 점의 거리를 구했던 문제처럼, 이벤트들이 비동기적이지만 배열로 존재한다고 생각해 봅시다.

Typescript
const events = [(connect), (enterRoom), (start), (...)]

const 경매_시작 = (roomId) => events
				.map((roomId) => getRedis(...))
				.forEach(txt => io.to(roomId).emit('start', {}))
				.then(() => startCountdownTimer(app, roomId))

이벤트를 가지고 이런 식의 코드를 작성할 수는 없는 것일까요? 이런 문제의식을 가지고 탄생한 라이브러리가 바로 RxJS입니다.

RxJS, Observable

RxJS

https://rxjs.dev/guide/overview

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array methods (mapfilterreduceevery, etc) to allow handling asynchronous events as collections.

요약하면 Observable이라는 타입을 가지고, 이벤트를 비롯한 비동기 프로그래밍을 배열 다루듯 할 수 있도록 해 주는 라이브러리라는 뜻인데요, 비동기의 뜻이 모호한데 저는 “타이밍이 정해지지 않은 것들” 정도로 이해하고 있습니다.

즉 돌려 말하면 Observable은 배열과 Promise의 특성을 모두 가지고 있다는 뜻이 됩니다.

예시를 통해 살펴보겠습니다. 매 1초마다 데이터를 요청해 갱신하는 코드(인터벌 폴링)를 구현한다고 생각해 봅시다. setInterval과 Promise같은 것들을 통해 잘 구현할 수 있는데요, 그런데 여기에 실패 시 재시도, exponential backup같은 추가 요구사항들이 들어가면 코드가 지저분해질 생각에 머리가 아파집니다. 그런데 RxJS를 사용하면 아래처럼 쉽게 구현할 수 있었어요.

Typescript
const request$ = interval(1000) //1초에 한 번씩 시도하는 Observable 생성
	.pipe( 
		switchMap(() => getData()), //fetch(...) 를 통해 데이터를 가져오는 함수
		timeout(3000), //3초간 응답 없으면 에러
		catchError(err => console.error(err)), //에러 처리
		retry({count: 5, delay: (_, tries) => timer(500 * 2 ** tries)})
		//5번까지 재시도, 0.5초부터 시작해 시행횟수당 2배씩 간격 늘림
	)

또 다른 예시를 살펴보겠습니다. 트리플 클릭(탭)인데요, 이게 생각보다 구현이 까다롭습니다. 클릭과 더블클릭은 이벤트를 제공하지만 트리플 클릭은 진짜로 이벤트 자체를 구현해야 하기 때문인데, setTimeOut과 더블클릭, 클릭을 조합해서 트리플 클릭을 구현해 보려고 하면 진짜 좀 막막합니다. 그런데 RxJS를 사용하면…

Typescript
const tripleTab$ = fromEvent(window, "click")
	.pipe(
	  bufferTime(300), //0.3초 동안 버퍼에 이벤트를 모읍니다
	  filter(clicks => clicks.length === 3), //클릭이 3개 모이면 TRUE
	).subscribe(event => { //do something })

헉! 처음 보는데도 생각보다 덜 어지럽죠(?)

RxJS and React

Observable은 Promise과 유사한 성질를 가지기 때문에, subscribe 모델이 Promise memoization과도 개념적으로 유사한 측면이 있지 않을까 하는 생각을 했습니다. 이게 무슨 말인가 하면 Promise를 작은 캐시처럼 사용할 수 있듯이, RxJS를 사용해서 비슷한 일을 간단하게 할 수 있을 것 같다는 발상입니다. 그게 가능하다면 전역 상태로 쓸 수 있지 않을까 하는 (거의 비약 수준의) 상상의 나래를 펼쳐 봤습니다.

그래서 검색을 좀 해 보았는데, BehaviorSubject를 React 상태와 연동시키는 것으로 간단하게 전역 상태화시킬 수 있었습니다.

Typescript
export const state$ = new BehaviorSubject([])

export const setState$ = (data) => {
  const current = state$.getValue()
  state$.next([...current, ...data])
}
Typescript
export function useObservable(observable) {
  const [state, setState] = useState()
  
  useEffect(() => {
    const subscription = observable.subscribe(setState)
    return () => subscription.unsubscribe()
  }, [observable])
  
  return state
}
Typescript
const state = useObservable(state$)

헉! 이걸로 무언가 해볼 수 있을지도?

마무리

어떻게 좀… 매력을 느끼셨나요? 저는 SSE를 서버와 클라이언트 양 쪽에서 다룰 일이 있어서 공부해서 적용해 보았는데, 숙련되었다고 말할 수 있는 수준까지는 아니지만 꽤나 유용하게 프로젝트에 적용했습니다. Next.js를 사용하느라 서버와 클라이언트에서 같은 자료를 주고받는 것을 모두 RxJS로 해볼 수 있었는데 참 재미있는 경험이었다고 생각합니다. 공부하기에 참 어렵고 난해하고, 아직도 많이 어렵지만 학습할 가치는 충분히 있다고 생각합니다.