Event-Driven Server Architectures
: alternative to synchronous blocking I/O
: the mapping of a single thread to multiple connections
: new events are queued and the thread executes a so-called event loop--dequeuing events from the queue, processing the event, then taking the next event or waiting for new events to be pushed.
- thread-based vs. event-driven
Table 4.2: Main differences between thread-based and event-driven server architectures.thread-based event-driven connection/request state thread context state machine/continuation main I/O model synchronous/blocking asynchronous/non-blocking activity flow thread-per-connection events and associated handlers primary scheduling strategy preemptive (OS) cooperative scheduling component scheduler (OS) event loop calling semantics blocking dispatching/awaiting events Non-blocking I/O Multiplexing Patterns
: event-based I/O multiplexing
- Reactor Pattern
: 주로 리눅스의 구현
: select, epoll, kqueue, Node.js, Ruby EventMachine, Scala Akka’s IO 모듈 등
: synchronous, non-blocking I/O handling and relies on an event notification interface.
: event나 socket 같은 리소스들의 집합을 등록해놓고, 각 리소스들을 콜백 또 후킹하는 적절한 event handler가 필요하다.
: 핵심 구성요소인 synchronous event demultiplexer는 blocking event notification interface를 사용하여 리소스의 이벤트를 기다린다.
: synchronous event demultiplexer가 받는 이벤트는 언제든지 dispatcher로 알리고, 다음 event를 기다린다.
: dispatcher는 관련 event handler 선택하고, callback/hook execution을 triggering함으로써 event를 처리한다.
: 리엑터 패턴은 event handling 과 multiplexing 분리한다. 이 패턴은 싱글 스레드를 사용하므로 blocking 동작은 전체적은 application의 동작을 지연시킬 수 있다. 그래서 다양한 Reactor Pattern들이 event handler에 대한 thread poll(thread 개수는 CPU 개수*2 + 1)을 사용한다.*장점
: 리엑터 패턴은 리엑터 구현에서 애플리케이션에 관련된 코드가 완벽하게 분리됨. 그래서 애플리케이션 컴포넌트들이 모듈화되고 재사용 가능한 부분들로 분리될 수 있음. 시스템을 멀티스레드로 인한 복잡성에서 보호해 줌.*단점
: 디버깅하기 어려움. 요청 핸들러를 동기적으로 호출하기 때문에 최대 동시성에 제한이 있음. 요청 핸들러 뿐만 아니라 디멀티플렉서에 의해서도 확장성이 제한됨. (OS 스케줄링 이용 불가, event queue에 대한 성능 이슈)
■ 1_epoll
: select, poll의 문제점 해결1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h> // epoll api
#include <arpa/inet.h>
#include <netinet/in.h>
// select, poll 문제점
// 1) 관찰 대상 디스크립터의 정보를 매번 운영체제에 전달해야 한다.
// 2) 1024 개 이상의 디스크립터를 등록하는 것이 불가능하다.
// epoll
// - 2.5.44 부터 사용 가능하다.
// - $ cat /proc/sys/kernel/osrelease
// epoll api 3가지
// 1) epoll_create : 파일 디스크립터 저장소 생성
// 2) epoll_ctl : 디스크립터 등록 & 삭제
// 3) epoll_wait : 파일 디스크립터의 변화를 감지하는 함수
int
main()
{
int
sock = socket(PF_INET, SOCK_STREAM, 0);
struct
sockaddr_in saddr = {0, };
saddr.sin_family = AF_INET;
saddr.sin_port = htons(4000);
saddr.sin_addr.s_addr = INADDR_ANY;
// 0.0.0.0
int
option =
true
;
socklen_t optlen =
sizeof
(option);
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
bind(sock, (
struct
sockaddr*)&saddr,
sizeof
saddr);
listen(sock, 5);
// 1. 초기화
const
int
EPOLL_SIZE = 1024;
int
efd = epoll_create(EPOLL_SIZE);
// 최신 커널에서는 사이즈는 무시됩니다.
// 2. 이벤트 등록
struct
epoll_event events[EPOLL_SIZE];
struct
epoll_event event;
event.events = EPOLLIN;
event.data.fd = sock;
epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event);
while
(1)
{
int
count = epoll_wait(efd, events, EPOLL_SIZE, -1);
for
(
int
i = 0 ; i < count ; ++i)
{
if
(events[i].data.fd == sock)
{
struct
sockaddr_in caddr = {0, };
socklen_t clen =
sizeof
(caddr);
int
csock = accept(sock, (
struct
sockaddr*)&caddr, &clen);
char
* cip = inet_ntoa(caddr.sin_addr);
printf
(
"Connected from %s\n"
, cip);
// 새로운 연결을 등록해주어야 한다.
event.events = EPOLLIN;
event.data.fd = csock;
epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event);
}
else
{
int
csock = events[i].data.fd;
char
buf[1024];
int
n = read(csock, buf,
sizeof
buf);
if
(n <= 0)
{
printf
(
"연결 종료!!\n"
);
close(csock);
// 종료된 디스크립터를 등록 해지해야 한다.
epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
}
else
write(csock, buf, n);
}
}
}
close(sock);
}
■ 2_epoll
: Level Triggering vs. Edge Triggering123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h> // fcntl()
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h> // epoll api
// epoll 이벤트 처리 방식
// Level Triggering - Default
// : 수신 버퍼에 데이터가 있다면, 이벤트가 발생
// Edge Triggering
// : 수신 버퍼에 데이터가 없다가, 존재하는 시점에 이벤트가 발생
// 핵심 : 데이터를 처리하는 디스크립터를 Non-blokcing 모드로 설정해야 한다.
// ET 장점
// 1. 데이터를 처리하기 위해 커널에 진입하는 횟수가 적다.
// 2. 데이터를 수신하는 로직과 데이터를 처리하는 로직을 분리할 수 있다.
// - 서버의 구조화에 유리하다.
void
setNonBlokcing(
int
fd)
{
// 아래 코드는 문제점이 있습니다.
// fcntl(fd, F_SETFL, O_NONBLOCK);
// Edge Triggering은 연결에 대한 데이터가 들어오는 시점에서 발생하므로 다시 데이터가 올라올때까지 발생하지 않는다.
// ex) aaaa -> aaaa, aaaab -> aaaa
// -> blocking이 생길 수 있다. -> read를 non-blocking 되도록 해야한다.
// -> Edge trigger + non-blocking -> fcntl
int
flags = fcntl(fd, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
int
main()
{
int
sock = socket(PF_INET, SOCK_STREAM, 0);
struct
sockaddr_in saddr = {0, };
saddr.sin_family = AF_INET;
saddr.sin_port = htons(4000);
saddr.sin_addr.s_addr = INADDR_ANY;
// 0.0.0.0
int
option =
true
;
socklen_t optlen =
sizeof
(option);
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
bind(sock, (
struct
sockaddr*)&saddr,
sizeof
saddr);
listen(sock, 5);
// 1. 초기화
const
int
EPOLL_SIZE = 1024;
int
efd = epoll_create(EPOLL_SIZE);
// 최신 커널에서는 사이즈는 무시됩니다.
// 2. 이벤트 등록
struct
epoll_event events[EPOLL_SIZE];
struct
epoll_event event;
event.events = EPOLLIN;
event.data.fd = sock;
epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event);
while
(1)
{
int
count = epoll_wait(efd, events, EPOLL_SIZE, -1);
printf
(
"epoll_wait()\n"
);
for
(
int
i = 0 ; i < count ; ++i)
{
if
(events[i].data.fd == sock)
{
struct
sockaddr_in caddr = {0, };
socklen_t clen =
sizeof
(caddr);
int
csock = accept(sock, (
struct
sockaddr*)&caddr, &clen);
char
* cip = inet_ntoa(caddr.sin_addr);
printf
(
"Connected from %s\n"
, cip);
// Non-Blocking 으로 설정
setNonBlokcing(csock);
// 새로운 연결을 등록해주어야 한다.
event.events = EPOLLIN | EPOLLET;
event.data.fd = csock;
epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event);
}
else
{
int
csock = events[i].data.fd;
// char buf[1024];
char
buf[4];
while
(1)
{
int
n = read(csock, buf,
sizeof
buf);
if
(n == 0)
{
printf
(
"연결 종료!!\n"
);
close(csock);
// 종료된 디스크립터를 등록 해지해야 한다.
epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
}
else
if
(n == -1)
{
int
err =
errno
;
if
(err == EAGAIN)
// 데이터가 존재하지 않는다.
break
;
else
{
close(csock);
epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
}
}
else
{
printf
(
"read() : %d\n"
, n);
write(csock, buf, n);
}
}
}
}
}
close(sock);
}
- Proactor Pattern
: 주로 윈도우의 구현
: POSIX AIO, Boost.Asio (C++), Adaptive Communication Environment (C++), libbitcoin (C++), RJR (Ruby), win32 IOCP
: asynchronous, non-blocking I/O operations
: Reactor pattern의 asynchronous 방식으로 보기도 함
: blocking event notification interfaces 대신에 completion events
: proactive initiator는 main application thread로 asynchronous I/O 동작들을 초기화하는 역할
: 동작이 이슈될때 항상 completion handler와 completion dispatcher에 등록된다.
: 비동기적인 동작의 수행은 asynchronous operation processor의해 관장된다. 이 프로세스는 OS에 의해 구현된다.
: I/O 작업이 완료되었을때, completion dispatcher로 notifiy 되고, 다음으로 completion handler는 그 결과를 처리한다.- 1_blocking_queue
: mutex는 커널단 컨텍스트 스위칭이므로 오버헤드가 큼 -> atomic operation1234567891011121314151617181920212223242526272829303132333435363738#include <unistd.h>
#include <mutex>
#include <condition_variable> // Event
#include <deque>
#include <thread>
#include <iostream>
using
namespace
std;
template
<
typename
T>
class
blocking_queue
{
deque<T> d_queue;
condition_variable d_condition;
mutex d_mutex;
public
:
void
push(
const
T& value)
{
{
unique_lock<mutex> lock(d_mutex);
d_queue.push_front(value);
}
d_condition.notify_one();
}
T pop()
{
unique_lock<mutex> lock(d_mutex);
d_condition.wait(lock, [=] {
return
!d_queue.empty(); } );
T rc(std::move(d_queue.back()));
// move fast
d_queue.pop_back();
return
rc;
}
};
- 2_lockfree_queue
: boost를 이용한 생산자 소비자 패턴12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364#include <iostream>
#include <boost/lockfree/queue.hpp>
// h + cpp = hpp
#include <boost/thread/thread.hpp>
#include <boost/atomic.hpp>
using
namespace
std;
using
namespace
boost;
atomic_int producer_count(0);
atomic_int consumer_count(0);
lockfree::queue<
int
> queue(128);
const
int
iterations = 100000000;
const
int
producer_thread_count = 4;
const
int
consumer_thread_count = 4;
void
producer(
void
)
{
for
(
int
i = 0 ; i < iterations ; ++i) {
int
value = ++producer_count;
while
(!queue.push(value))
;
}
}
atomic<
bool
> done(
false
);
void
consumer(
void
)
{
int
value;
while
(!done)
{
while
(queue.pop(value))
++consumer_count;
}
while
(queue.pop(value))
++consumer_count;
}
int
main()
{
cout <<
"queue is "
;
if
(!queue.is_lock_free())
cout <<
"not "
;
cout <<
"lockfree"
<< endl;
thread_group producer_threads, consumer_threads;
for
(
int
i = 0 ; i < producer_thread_count ; ++i)
producer_threads.create_thread(producer);
for
(
int
i = 0 ; i < consumer_thread_count ; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done =
true
;
consumer_threads.join_all();
cout <<
"produced "
<< producer_count << endl;
cout <<
"consumed "
<< consumer_count << endl;
- 4_asio_echo_server
: block에 대한 개념이 존재하지 않음
: boost asio에서 핵심은 io_service
: 각각의 연산 자체가 비동기로 이루어짐1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889#include <boost/asio.hpp>
using
namespace
boost;
#include <iostream>
#include <memory> // shared_ptr<>
#include <utility>
using
namespace
std;
using
boost::asio::ip::tcp;
class
Session :
public
std::enable_shared_from_this<Session>
{
public
:
Session(tcp::socket s) : socket(std::move(s)) {}
void
start() { doRead(); }
void
doRead() {
auto self(shared_from_this());
// 참조 계수 증가, self를 참조안하면 객체가 바로 파괴되버림
// 1. async_read : readn
// 2. async_read_some : read
socket.async_read_some(
asio::buffer(data, 1024),
[
this
, self](
system
::error_code ec,
size_t
length) {
if
(!ec)
doWrite(length);
});
}
void
doWrite(
size_t
length)
{
auto self(shared_from_this());
asio::async_write(
socket, asio::buffer(data, length),
[
this
, self](
system
::error_code ec,
size_t
) {
if
(!ec)
doRead();
});
}
private
:
tcp::socket socket;
char
data[1024];
};
class
Server
{
public
:
Server(asio::io_service& io,
short
port)
: acceptor(io, tcp::endpoint(tcp::v4(), port)),
socket(io) {
doAccept();
}
void
doAccept()
{
acceptor.async_accept(socket, [
this
](
system
::error_code ec) {
// 클라이언트의 접속 요청이 완료된 시점에 호출되는 핸들러
if
(!ec) {
// socket...
make_shared<Session>(std::move(socket))->start();
}
});
}
private
:
tcp::acceptor acceptor;
tcp::socket socket;
};
int
main()
{
try
{
asio::io_service io_service;
Server s(io_service, 4000);
io_service.run();
}
catch
(std::exception& e) {
cerr << e.what() << endl;
}
}
Reactor, Proactor 간단한 도식화
- Reactor
- Proactor
참고: http://berb.github.io/diploma-thesis/original/042_serverarch.html, http://ozt88.tistory.com/25
'Programing > Server Model' 카테고리의 다른 글
서버 모델 - 윈도우 Overlapped I/O (0) | 2016.02.26 |
---|---|
서버 모델 - 윈도우 소켓 프로그래밍 (0) | 2016.02.26 |
서버 모델 - I/O multiplexing (0) | 2016.02.22 |
서버 모델 - Synchronous Blocking I/O (0) | 2016.02.22 |
서버 모델 - I/O 모델 개요 (0) | 2016.02.22 |