[RTMP] select, IO 멀티플렉싱
IO 멀티플렉싱 기반 서버
멀티플렉싱은 ‘하나의 통신채널을 통해서 둘 이상의 데이터를 전송하는데 사용되는 기술’입니다. 물리적 장치의 효율성을 높이기 위해서 최소한의 물리적 요소만 사용해서 최대한의 데이터를 전달하기 위해 사용되는 기술입니다. 여러 명이 동시에 말할 때 이를 구분할 수 있는 방법에는 시분할과 주파수 분할이 있습니다.
시분할, 주파수 분할에 상관없이 하나의 프로세서가 여러 명의 클라이언트의 요청에 응답할 수 있다는 점이 중요합니다. 대신 클라이언트의 요청에 대응하기 전에 클라이언트가 요청을 보냈는지(데이터가 수신된 소켓이 있는지) 확인하는 작업이 필요합니다.
select() 함수의 이해와 서버의 구현
select 함수는 한 곳에 여러 개의 파일 디스크립터를 모아서 동시에 이들을 관찰합니다.
- 수신한 데이터를 가지고 있는 소켓이 있는가?
- 블로킹되지 않고 데이터의 전송이 가능한 소켓이 무엇인가?
- 예외상황이 발생한 소켓은 무엇인가?
여기서 말하는 소켓은 서버가 accept()이후 리턴된 클라이언트와 연결된 소켓을 의미합니다. 이 소켓은 위에서 말한 3가지 이벤트에 대해 관찰을 합니다. 소켓들을 select가 관찰하는 순서는 다음과 같습니다.
단계 | 목적 | 코드 |
---|---|---|
1-1 | 파일 디스크립터의 설정 | |
1-2 | 검사의 범위 지정 | |
1-3 | 타임아웃의 설정 | |
2 | select 함수의 호출 | |
3 | 호출결과 확인 |
파일 디스크립터의 설정
select는 여러 개의 파일 디스크립터(fd)를 관찰할 수 있습니다. fd의 관찰은 소켓을 관찰하는 것으로 해석할 수 있습니다. 여러 개의 fd를 동시에 관찰할 때는 관찰항목의 수신/전송/예외에 따라서 구분해서 모아야합니다. fd를 모을 때 사용되는 것이 fd_set형 변수입니다. fd_set은 0과 1로 일어진 비트열이고 다음과 같은 형식입니다.
- 기본 값 : 0 0 0 0
- 관찰중인 fd : fd0 fd1 fd2 fd3
- 현재 값 : 0 1 0 1
- 관심 이벤트 : 수신 / 전송 / 예외 중 하나
위의 fd_set형가 수신 이벤트를 관찰하는 fd_set이라면 fd1과 fd3에 대해서 수신 이벤트가 있을 때를 알려주는 역할을 합니다. fd_set형이 비트형으로 데이터를 저장하기 때문에 관심 fd를 등록하기 위한 메서드를 제공합니다.
FD_ZERO(fd_set * fdset); : 인자로 전달된 주소의 fd_set형 변수의 모든 비트를 0으로 초기화
FD_SET(int fd, fd_set * fdset); : 매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보 등록
FD_CLR(int fd, fd_set * fdset); : 매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보 삭세
FD_ISSET(int fd, fd_set * fdset); : 매개변수 fdset으로 전달된 주소의 변수에 매개변수 fd로 전달된 파일 디스크립터 정보가 있으면 양수 반환
fd_set set;
FD_ZERO(&set); : 0 0 0 0
FD_SET(1, &set); : 0 1 0 0
FD_SET(2, &set); : 0 1 1 0
FD_CLR(2, &set); : 0 1 0 0
검사의 범위 지정과 타임아웃의 설정
#include <sys/select.h>
#include <sys/time.h>
/**
* @param maxfd 검사 대상이 되는 파일 디스크립터의 수
* @param readset fd_set형 변수에 수신된 데이터의 존재여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
* @param writeset fd_set형 변수에 블로킹없는 데이터 전송의 가능 여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
* @param fd_set형 fd_set형 변수에 예외상황의 발생여부에 관심 있는 파일 디스크립터 정보를 모두 등록해서 그 변수의 주소 값을 전달
* @param timeout select 호출 이후에 무한정 블로킹에 빠지지 않도록 타임아웃을 설정하기 위한 인자 전달
* @return 오류발생시 -1. 타임아웃 반환시 0. otherwise 변화가 발생한 fd의 수(양수)
*
*/
int select(
int maxfd, fd_set * readset, fd_Set *writeset, fd_set * exceptset, const struct timeval * timeout);
)
/**
* select 함수는 관찰 중인 파일 디스크립터에 변화가 생겼을 때 반환을 한다. 하지만 변화가 생기지 않으면 무한저 블로킹 상태에 머문다.
* 초단위 정보와 마이크로 초 단위 정보를 지정하고 이 시간이 지나면 select()가 반환이 된다. 타임아웃에 의한 반환인 것은 리턴값이 0인지
* 확인해서 알 수 있다.
*
* 타임아웃을 지정하고 싶지 않을 때는 NULL을 전달한다.
*
*/
struct timeval{
long tv_sec; //seconds
long tv_usec; //microseconds
}
fd_set 변수에 0 1 1 1을 지정해서 select()에 전달했다고 가정해보겠습니다. 그리고 select()가 반환된 이후 리턴값은 2이고, fd_set의 값은 0 1 0 1로 변했습니다. 이 경우 fd1과 fd3에 관심있는 이벤트가 발생했음을 알 수 있습니다. 만약 같은 조건으로 계속 이벤트를 기다리고 싶다면 fd_set의 변수를 다시 0 1 1 1로 바꾸어서 select()에 전달해야 합니다. 그래서 보통 select()에 넣기 전에 fd_set값을 copy해둡니다.
$ git clone https://github.com/niklasjang/cpp-rtmp-relay
$ git checkout feature/select
fd_set reads, temps;
int result, str_len;
char buf[BUF_SIZE];
struct timeval timeout;
FD_ZERO(&reads);
FD_SET(0, &reads); // 0 is standard input(console)
/*
timeout.tv_sec=5;
timeout.tv_usec=5000;
*/
while(1)
{
temps=reads;
timeout.tv_sec=5;
timeout.tv_usec=0;
result=select(1, &temps, 0, 0, &timeout);
if(result==-1)
{
puts("select() error!");
break;
}
else if(result==0)
{
puts("Time-out!");
}
else
{
if(FD_ISSET(0, &temps))
{
str_len=read(0, buf, BUF_SIZE);
buf[str_len]=0;
printf("message from console: %s", buf);
}
}
}
echo select server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>
#define BUF_SIZE 100
void error_handling(char *buf);
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
struct timeval timeout;
fd_set reads, cpy_reads;
socklen_t adr_sz;
int fd_max, str_len, fd_num, i;
char buf[BUF_SIZE];
if(argc!=2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
serv_sock=socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_adr.sin_port=htons(atoi(argv[1]));
if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
error_handling("bind() error");
if(listen(serv_sock, 5)==-1)
error_handling("listen() error");
FD_ZERO(&reads);
FD_SET(serv_sock, &reads);
fd_max=serv_sock;
while(1)
{
cpy_reads=reads;
timeout.tv_sec=5;
timeout.tv_usec=5000;
if((fd_num=select(fd_max+1, &cpy_reads, 0, 0, &timeout))==-1)
break;
if(fd_num==0)
continue;
for(i=0; i<fd_max+1; i++)
{
if(FD_ISSET(i, &cpy_reads))
{
if(i==serv_sock) // connection request!
{
adr_sz=sizeof(clnt_adr);
clnt_sock=
accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
FD_SET(clnt_sock, &reads);
if(fd_max<clnt_sock)
fd_max=clnt_sock;
printf("connected client: %d \n", clnt_sock);
}
else // read message!
{
str_len=read(i, buf, BUF_SIZE);
if(str_len==0) // close request!
{
FD_CLR(i, &reads);
close(i);
printf("closed client: %d \n", i);
}
else
{
write(i, buf, str_len); // echo!
}
}
}
}
}
close(serv_sock);
return 0;
}
void error_handling(char *buf)
{
fputs(buf, stderr);
fputc('\n', stderr);
exit(1);
}