[RTMP] select, IO 멀티플렉싱

4 minute read

IO 멀티플렉싱 기반 서버

멀티플렉싱은 ‘하나의 통신채널을 통해서 둘 이상의 데이터를 전송하는데 사용되는 기술’입니다. 물리적 장치의 효율성을 높이기 위해서 최소한의 물리적 요소만 사용해서 최대한의 데이터를 전달하기 위해 사용되는 기술입니다. 여러 명이 동시에 말할 때 이를 구분할 수 있는 방법에는 시분할과 주파수 분할이 있습니다.

시분할, 주파수 분할에 상관없이 하나의 프로세서가 여러 명의 클라이언트의 요청에 응답할 수 있다는 점이 중요합니다. 대신 클라이언트의 요청에 대응하기 전에 클라이언트가 요청을 보냈는지(데이터가 수신된 소켓이 있는지) 확인하는 작업이 필요합니다.

select() 함수의 이해와 서버의 구현

select 함수는 한 곳에 여러 개의 파일 디스크립터를 모아서 동시에 이들을 관찰합니다.

  1. 수신한 데이터를 가지고 있는 소켓이 있는가?
  2. 블로킹되지 않고 데이터의 전송이 가능한 소켓이 무엇인가?
  3. 예외상황이 발생한 소켓은 무엇인가?

여기서 말하는 소켓은 서버가 accept()이후 리턴된 클라이언트와 연결된 소켓을 의미합니다. 이 소켓은 위에서 말한 3가지 이벤트에 대해 관찰을 합니다. 소켓들을 select가 관찰하는 순서는 다음과 같습니다.

단계 목적 코드
1-1 파일 디스크립터의 설정  
1-2 검사의 범위 지정  
1-3 타임아웃의 설정  
2 select 함수의 호출  
3 호출결과 확인  

파일 디스크립터의 설정

select는 여러 개의 파일 디스크립터(fd)를 관찰할 수 있습니다. fd의 관찰은 소켓을 관찰하는 것으로 해석할 수 있습니다. 여러 개의 fd를 동시에 관찰할 때는 관찰항목의 수신/전송/예외에 따라서 구분해서 모아야합니다. fd를 모을 때 사용되는 것이 fd_set형 변수입니다. fd_set은 0과 1로 일어진 비트열이고 다음과 같은 형식입니다.

  • 기본 값 : 0 0 0 0
  • 관찰중인 fd : fd0 fd1 fd2 fd3
  • 현재 값 : 0 1 0 1
  • 관심 이벤트 : 수신 / 전송 / 예외 중 하나

위의 fd_set형가 수신 이벤트를 관찰하는 fd_set이라면 fd1과 fd3에 대해서 수신 이벤트가 있을 때를 알려주는 역할을 합니다. fd_set형이 비트형으로 데이터를 저장하기 때문에 관심 fd를 등록하기 위한 메서드를 제공합니다.

FD_ZERO(fd_set * fdset); : 인자로 전달된 주소의 fd_set형 변수의 모든 비트를 0으로 초기화

FD_SET(int fd, fd_set * fdset);    : 매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd 전달된 파일 디스크립터 정보 등록 
FD_CLR(int fd, fd_set * fdset);    :  매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd 전달된 파일 디스크립터 정보 삭세
FD_ISSET(int fd, fd_set * fdset);  :  매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd 전달된 파일 디스크립터 정보가 있으면 양수 반환 

fd_set set;
FD_ZERO(&set);      : 0 0 0 0
FD_SET(1, &set);    : 0 1 0 0 
FD_SET(2, &set);    : 0 1 1 0
FD_CLR(2,  &set);   : 0 1 0 0 

검사의 범위 지정과 타임아웃의 설정

#include <sys/select.h>
#include <sys/time.h>

/**
 * @param maxfd 검사 대상이 되는 파일 디스크립터의 수
 * @param readset fd_set형 변수에 수신된 데이터의 존재여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param writeset fd_set형 변수에 블로킹없는 데이터 전송의 가능 여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param fd_set형 fd_set형 변수에 예외상황의 발생여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param timeout select 호출 이후에 무한정 블로킹에 빠지지 않도록 타임아웃을 설정하기 위한 인자 전달
 * @return 오류발생시 -1. 타임아웃 반환시 0. otherwise 변화가 발생한 fd의 수(양수)
 * 
*/

int select(
	int maxfd, fd_set * readset, fd_Set *writeset, fd_set * exceptset, const struct timeval * timeout);
)


/**
 * select 함수는 관찰 중인 파일 디스크립터에 변화가 생겼을 때 반환을 한다. 하지만 변화가 생기지 않으면 무한저 블로킹 상태에 머문다.
 * 초단위 정보와 마이크로 초 단위 정보를 지정하고 이 시간이 지나면 select()가 반환이 된다. 타임아웃에 의한 반환인 것은 리턴값이 0인지
 * 확인해서 알 수 있다.
 * 
 * 타임아웃을 지정하고 싶지 않을 때는 NULL을 전달한다. 
 * 
*/
struct timeval{
	long tv_sec;  //seconds
	long tv_usec; //microseconds
}

fd_set 변수에 0 1 1 1을 지정해서 select()에 전달했다고 가정해보겠습니다. 그리고 select()가 반환된 이후 리턴값은 2이고, fd_set의 값은 0 1 0 1로 변했습니다. 이 경우 fd1과 fd3에 관심있는 이벤트가 발생했음을 알 수 있습니다. 만약 같은 조건으로 계속 이벤트를 기다리고 싶다면 fd_set의 변수를 다시 0 1 1 1로 바꾸어서 select()에 전달해야 합니다. 그래서 보통 select()에 넣기 전에 fd_set값을 copy해둡니다.

$ git clone https://github.com/niklasjang/cpp-rtmp-relay
$ git checkout feature/select
fd_set reads, temps;
int result, str_len;
char buf[BUF_SIZE];
struct timeval timeout;

FD_ZERO(&reads);
FD_SET(0, &reads); // 0 is standard input(console)

/*
timeout.tv_sec=5;
timeout.tv_usec=5000;
*/

while(1)
{
	temps=reads;
	timeout.tv_sec=5;
	timeout.tv_usec=0;
	result=select(1, &temps, 0, 0, &timeout);
	if(result==-1)
	{
		puts("select() error!");
		break;
	}
	else if(result==0)
	{
		puts("Time-out!");
	}
	else 
	{
		if(FD_ISSET(0, &temps)) 
		{
			str_len=read(0, buf, BUF_SIZE);
			buf[str_len]=0;
			printf("message from console: %s", buf);
		}
	}
}

echo select server

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>

#define BUF_SIZE 100
void error_handling(char *buf);

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	struct timeval timeout;
	fd_set reads, cpy_reads;

	socklen_t adr_sz;
	int fd_max, str_len, fd_num, i;
	char buf[BUF_SIZE];
	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}

	serv_sock=socket(PF_INET, SOCK_STREAM, 0);
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET;
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
	serv_adr.sin_port=htons(atoi(argv[1]));
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)
		error_handling("listen() error");

	FD_ZERO(&reads);
	FD_SET(serv_sock, &reads);
	fd_max=serv_sock;

	while(1)
	{
		cpy_reads=reads;
		timeout.tv_sec=5;
		timeout.tv_usec=5000;

		if((fd_num=select(fd_max+1, &cpy_reads, 0, 0, &timeout))==-1)
			break;
		
		if(fd_num==0)
			continue;

		for(i=0; i<fd_max+1; i++)
		{
			if(FD_ISSET(i, &cpy_reads))
			{
				if(i==serv_sock)     // connection request!
				{
					adr_sz=sizeof(clnt_adr);
					clnt_sock=
						accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
					FD_SET(clnt_sock, &reads);
					if(fd_max<clnt_sock)
						fd_max=clnt_sock;
					printf("connected client: %d \n", clnt_sock);
				}
				else    // read message!
				{
					str_len=read(i, buf, BUF_SIZE);
					if(str_len==0)    // close request!
					{
						FD_CLR(i, &reads);
						close(i);
						printf("closed client: %d \n", i);
					}
					else
					{
						write(i, buf, str_len);    // echo!
					}
				}
			}
		}
	}
	close(serv_sock);
	return 0;
}

void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}

Tags:

Categories:

Updated: