[RTMP] select, IO 멀티플렉싱
IO 멀티플렉싱 기반 서버
멀티플렉싱은 ‘하나의 통신채널을 통해서 둘 이상의 데이터를 전송하는데 사용되는 기술’입니다. 물리적 장치의 효율성을 높이기 위해서 최소한의 물리적 요소만 사용해서 최대한의 데이터를 전달하기 위해 사용되는 기술입니다. 여러 명이 동시에 말할 때 이를 구분할 수 있는 방법에는 시분할과 주파수 분할이 있습니다.
시분할, 주파수 분할에 상관없이 하나의 프로세서가 여러 명의 클라이언트의 요청에 응답할 수 있다는 점이 중요합니다. 대신 클라이언트의 요청에 대응하기 전에 클라이언트가 요청을 보냈는지(데이터가 수신된 소켓이 있는지) 확인하는 작업이 필요합니다.
select() 함수의 이해와 서버의 구현
select 함수는 한 곳에 여러 개의 파일 디스크립터를 모아서 동시에 이들을 관찰합니다.
- 수신한 데이터를 가지고 있는 소켓이 있는가?
 - 블로킹되지 않고 데이터의 전송이 가능한 소켓이 무엇인가?
 - 예외상황이 발생한 소켓은 무엇인가?
 
여기서 말하는 소켓은 서버가 accept()이후 리턴된 클라이언트와 연결된 소켓을 의미합니다. 이 소켓은 위에서 말한 3가지 이벤트에 대해 관찰을 합니다. 소켓들을 select가 관찰하는 순서는 다음과 같습니다.
| 단계 | 목적 | 코드 | 
|---|---|---|
| 1-1 | 파일 디스크립터의 설정 | |
| 1-2 | 검사의 범위 지정 | |
| 1-3 | 타임아웃의 설정 | |
| 2 | select 함수의 호출 | |
| 3 | 호출결과 확인 | 
파일 디스크립터의 설정
select는 여러 개의 파일 디스크립터(fd)를 관찰할 수 있습니다. fd의 관찰은 소켓을 관찰하는 것으로 해석할 수 있습니다. 여러 개의 fd를 동시에 관찰할 때는 관찰항목의 수신/전송/예외에 따라서 구분해서 모아야합니다. fd를 모을 때 사용되는 것이 fd_set형 변수입니다. fd_set은 0과 1로 일어진 비트열이고 다음과 같은 형식입니다.
- 기본 값 : 0 0 0 0
 - 관찰중인 fd : fd0 fd1 fd2 fd3
 - 현재 값 : 0 1 0 1
 - 관심 이벤트 : 수신 / 전송 / 예외 중 하나
 
위의 fd_set형가 수신 이벤트를 관찰하는 fd_set이라면 fd1과 fd3에 대해서 수신 이벤트가 있을 때를 알려주는 역할을 합니다. fd_set형이 비트형으로 데이터를 저장하기 때문에 관심 fd를 등록하기 위한 메서드를 제공합니다.
FD_ZERO(fd_set * fdset); : 인자로 전달된 주소의 fd_set형 변수의 모든 비트를 0으로 초기화
FD_SET(int fd, fd_set * fdset);    : 매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보 등록 
FD_CLR(int fd, fd_set * fdset);    :  매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보 삭세
FD_ISSET(int fd, fd_set * fdset);  :  매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보가 있으면 양수 반환 
fd_set set;
FD_ZERO(&set);      : 0 0 0 0
FD_SET(1, &set);    : 0 1 0 0 
FD_SET(2, &set);    : 0 1 1 0
FD_CLR(2,  &set);   : 0 1 0 0 
검사의 범위 지정과 타임아웃의 설정
#include <sys/select.h>
#include <sys/time.h>
/**
 * @param maxfd 검사 대상이 되는 파일 디스크립터의 수
 * @param readset fd_set형 변수에 수신된 데이터의 존재여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param writeset fd_set형 변수에 블로킹없는 데이터 전송의 가능 여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param fd_set형 fd_set형 변수에 예외상황의 발생여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
 * @param timeout select 호출 이후에 무한정 블로킹에 빠지지 않도록 타임아웃을 설정하기 위한 인자 전달
 * @return 오류발생시 -1. 타임아웃 반환시 0. otherwise 변화가 발생한 fd의 수(양수)
 * 
*/
int select(
	int maxfd, fd_set * readset, fd_Set *writeset, fd_set * exceptset, const struct timeval * timeout);
)
/**
 * select 함수는 관찰 중인 파일 디스크립터에 변화가 생겼을 때 반환을 한다. 하지만 변화가 생기지 않으면 무한저 블로킹 상태에 머문다.
 * 초단위 정보와 마이크로 초 단위 정보를 지정하고 이 시간이 지나면 select()가 반환이 된다. 타임아웃에 의한 반환인 것은 리턴값이 0인지
 * 확인해서 알 수 있다.
 * 
 * 타임아웃을 지정하고 싶지 않을 때는 NULL을 전달한다. 
 * 
*/
struct timeval{
	long tv_sec;  //seconds
	long tv_usec; //microseconds
}
fd_set 변수에 0 1 1 1을 지정해서 select()에 전달했다고 가정해보겠습니다. 그리고 select()가 반환된 이후 리턴값은 2이고, fd_set의 값은 0 1 0 1로 변했습니다. 이 경우 fd1과 fd3에 관심있는 이벤트가 발생했음을 알 수 있습니다. 만약 같은 조건으로 계속 이벤트를 기다리고 싶다면 fd_set의 변수를 다시 0 1 1 1로 바꾸어서 select()에 전달해야 합니다. 그래서 보통 select()에 넣기 전에 fd_set값을 copy해둡니다.
$ git clone https://github.com/niklasjang/cpp-rtmp-relay
$ git checkout feature/select
fd_set reads, temps;
int result, str_len;
char buf[BUF_SIZE];
struct timeval timeout;
FD_ZERO(&reads);
FD_SET(0, &reads); // 0 is standard input(console)
/*
timeout.tv_sec=5;
timeout.tv_usec=5000;
*/
while(1)
{
	temps=reads;
	timeout.tv_sec=5;
	timeout.tv_usec=0;
	result=select(1, &temps, 0, 0, &timeout);
	if(result==-1)
	{
		puts("select() error!");
		break;
	}
	else if(result==0)
	{
		puts("Time-out!");
	}
	else 
	{
		if(FD_ISSET(0, &temps)) 
		{
			str_len=read(0, buf, BUF_SIZE);
			buf[str_len]=0;
			printf("message from console: %s", buf);
		}
	}
}
echo select server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>
#define BUF_SIZE 100
void error_handling(char *buf);
int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	struct timeval timeout;
	fd_set reads, cpy_reads;
	socklen_t adr_sz;
	int fd_max, str_len, fd_num, i;
	char buf[BUF_SIZE];
	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}
	serv_sock=socket(PF_INET, SOCK_STREAM, 0);
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET;
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
	serv_adr.sin_port=htons(atoi(argv[1]));
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)
		error_handling("listen() error");
	FD_ZERO(&reads);
	FD_SET(serv_sock, &reads);
	fd_max=serv_sock;
	while(1)
	{
		cpy_reads=reads;
		timeout.tv_sec=5;
		timeout.tv_usec=5000;
		if((fd_num=select(fd_max+1, &cpy_reads, 0, 0, &timeout))==-1)
			break;
		
		if(fd_num==0)
			continue;
		for(i=0; i<fd_max+1; i++)
		{
			if(FD_ISSET(i, &cpy_reads))
			{
				if(i==serv_sock)     // connection request!
				{
					adr_sz=sizeof(clnt_adr);
					clnt_sock=
						accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
					FD_SET(clnt_sock, &reads);
					if(fd_max<clnt_sock)
						fd_max=clnt_sock;
					printf("connected client: %d \n", clnt_sock);
				}
				else    // read message!
				{
					str_len=read(i, buf, BUF_SIZE);
					if(str_len==0)    // close request!
					{
						FD_CLR(i, &reads);
						close(i);
						printf("closed client: %d \n", i);
					}
					else
					{
						write(i, buf, str_len);    // echo!
					}
				}
			}
		}
	}
	close(serv_sock);
	return 0;
}
void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}