Event-Driven Server Architectures
: alternative to synchronous blocking I/O
: the mapping of a single thread to multiple connections
: new events are queued and the thread executes a so-called event loop--dequeuing events from the queue, processing the event, then taking the next event or waiting for new events to be pushed.
- thread-based vs. event-driven
Table 4.2: Main differences between thread-based and event-driven server architectures.thread-based event-driven connection/request state thread context state machine/continuation main I/O model synchronous/blocking asynchronous/non-blocking activity flow thread-per-connection events and associated handlers primary scheduling strategy preemptive (OS) cooperative scheduling component scheduler (OS) event loop calling semantics blocking dispatching/awaiting events Non-blocking I/O Multiplexing Patterns
: event-based I/O multiplexing
- Reactor Pattern
: 주로 리눅스의 구현
: select, epoll, kqueue, Node.js, Ruby EventMachine, Scala Akka’s IO 모듈 등
: synchronous, non-blocking I/O handling and relies on an event notification interface.
: event나 socket 같은 리소스들의 집합을 등록해놓고, 각 리소스들을 콜백 또 후킹하는 적절한 event handler가 필요하다.
: 핵심 구성요소인 synchronous event demultiplexer는 blocking event notification interface를 사용하여 리소스의 이벤트를 기다린다.
: synchronous event demultiplexer가 받는 이벤트는 언제든지 dispatcher로 알리고, 다음 event를 기다린다.
: dispatcher는 관련 event handler 선택하고, callback/hook execution을 triggering함으로써 event를 처리한다.
: 리엑터 패턴은 event handling 과 multiplexing 분리한다. 이 패턴은 싱글 스레드를 사용하므로 blocking 동작은 전체적은 application의 동작을 지연시킬 수 있다. 그래서 다양한 Reactor Pattern들이 event handler에 대한 thread poll(thread 개수는 CPU 개수*2 + 1)을 사용한다.*장점
: 리엑터 패턴은 리엑터 구현에서 애플리케이션에 관련된 코드가 완벽하게 분리됨. 그래서 애플리케이션 컴포넌트들이 모듈화되고 재사용 가능한 부분들로 분리될 수 있음. 시스템을 멀티스레드로 인한 복잡성에서 보호해 줌.*단점
: 디버깅하기 어려움. 요청 핸들러를 동기적으로 호출하기 때문에 최대 동시성에 제한이 있음. 요청 핸들러 뿐만 아니라 디멀티플렉서에 의해서도 확장성이 제한됨. (OS 스케줄링 이용 불가, event queue에 대한 성능 이슈)
■ 1_epoll
: select, poll의 문제점 해결#include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> // epoll api #include <arpa/inet.h> #include <netinet/in.h> // select, poll 문제점 // 1) 관찰 대상 디스크립터의 정보를 매번 운영체제에 전달해야 한다. // 2) 1024 개 이상의 디스크립터를 등록하는 것이 불가능하다. // epoll // - 2.5.44 부터 사용 가능하다. // - $ cat /proc/sys/kernel/osrelease // epoll api 3가지 // 1) epoll_create : 파일 디스크립터 저장소 생성 // 2) epoll_ctl : 디스크립터 등록 & 삭제 // 3) epoll_wait : 파일 디스크립터의 변화를 감지하는 함수 int main() { int sock = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in saddr = {0, }; saddr.sin_family = AF_INET; saddr.sin_port = htons(4000); saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0 int option = true; socklen_t optlen = sizeof(option); setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen); bind(sock, (struct sockaddr*)&saddr, sizeof saddr); listen(sock, 5); // 1. 초기화 const int EPOLL_SIZE = 1024; int efd = epoll_create(EPOLL_SIZE); // 최신 커널에서는 사이즈는 무시됩니다. // 2. 이벤트 등록 struct epoll_event events[EPOLL_SIZE]; struct epoll_event event; event.events = EPOLLIN; event.data.fd = sock; epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); while (1) { int count = epoll_wait(efd, events, EPOLL_SIZE, -1); for (int i = 0 ; i < count ; ++i) { if (events[i].data.fd == sock) { struct sockaddr_in caddr = {0, }; socklen_t clen = sizeof(caddr); int csock = accept(sock, (struct sockaddr*)&caddr, &clen); char* cip = inet_ntoa(caddr.sin_addr); printf("Connected from %s\n", cip); // 새로운 연결을 등록해주어야 한다. event.events = EPOLLIN; event.data.fd = csock; epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); } else { int csock = events[i].data.fd; char buf[1024]; int n = read(csock, buf, sizeof buf); if (n <= 0) { printf("연결 종료!!\n"); close(csock); // 종료된 디스크립터를 등록 해지해야 한다. epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0); } else write(csock, buf, n); } } } close(sock); }
■ 2_epoll
: Level Triggering vs. Edge Triggering#include <stdio.h> #include <unistd.h> #include <errno.h> #include <fcntl.h> // fcntl() #include <arpa/inet.h> #include <netinet/in.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> // epoll api // epoll 이벤트 처리 방식 // Level Triggering - Default // : 수신 버퍼에 데이터가 있다면, 이벤트가 발생 // Edge Triggering // : 수신 버퍼에 데이터가 없다가, 존재하는 시점에 이벤트가 발생 // 핵심 : 데이터를 처리하는 디스크립터를 Non-blokcing 모드로 설정해야 한다. // ET 장점 // 1. 데이터를 처리하기 위해 커널에 진입하는 횟수가 적다. // 2. 데이터를 수신하는 로직과 데이터를 처리하는 로직을 분리할 수 있다. // - 서버의 구조화에 유리하다. void setNonBlokcing(int fd) { // 아래 코드는 문제점이 있습니다. // fcntl(fd, F_SETFL, O_NONBLOCK); // Edge Triggering은 연결에 대한 데이터가 들어오는 시점에서 발생하므로 다시 데이터가 올라올때까지 발생하지 않는다. // ex) aaaa -> aaaa, aaaab -> aaaa // -> blocking이 생길 수 있다. -> read를 non-blocking 되도록 해야한다. // -> Edge trigger + non-blocking -> fcntl int flags = fcntl(fd, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(fd, F_SETFL, flags); } int main() { int sock = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in saddr = {0, }; saddr.sin_family = AF_INET; saddr.sin_port = htons(4000); saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0 int option = true; socklen_t optlen = sizeof(option); setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen); bind(sock, (struct sockaddr*)&saddr, sizeof saddr); listen(sock, 5); // 1. 초기화 const int EPOLL_SIZE = 1024; int efd = epoll_create(EPOLL_SIZE); // 최신 커널에서는 사이즈는 무시됩니다. // 2. 이벤트 등록 struct epoll_event events[EPOLL_SIZE]; struct epoll_event event; event.events = EPOLLIN; event.data.fd = sock; epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); while (1) { int count = epoll_wait(efd, events, EPOLL_SIZE, -1); printf("epoll_wait()\n"); for (int i = 0 ; i < count ; ++i) { if (events[i].data.fd == sock) { struct sockaddr_in caddr = {0, }; socklen_t clen = sizeof(caddr); int csock = accept(sock, (struct sockaddr*)&caddr, &clen); char* cip = inet_ntoa(caddr.sin_addr); printf("Connected from %s\n", cip); // Non-Blocking 으로 설정 setNonBlokcing(csock); // 새로운 연결을 등록해주어야 한다. event.events = EPOLLIN | EPOLLET; event.data.fd = csock; epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); } else { int csock = events[i].data.fd; // char buf[1024]; char buf[4]; while (1) { int n = read(csock, buf, sizeof buf); if (n == 0) { printf("연결 종료!!\n"); close(csock); // 종료된 디스크립터를 등록 해지해야 한다. epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0); } else if (n == -1) { int err = errno; if (err == EAGAIN) // 데이터가 존재하지 않는다. break; else { close(csock); epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0); } } else { printf("read() : %d\n", n); write(csock, buf, n); } } } } } close(sock); }
- Proactor Pattern
: 주로 윈도우의 구현
: POSIX AIO, Boost.Asio (C++), Adaptive Communication Environment (C++), libbitcoin (C++), RJR (Ruby), win32 IOCP
: asynchronous, non-blocking I/O operations
: Reactor pattern의 asynchronous 방식으로 보기도 함
: blocking event notification interfaces 대신에 completion events
: proactive initiator는 main application thread로 asynchronous I/O 동작들을 초기화하는 역할
: 동작이 이슈될때 항상 completion handler와 completion dispatcher에 등록된다.
: 비동기적인 동작의 수행은 asynchronous operation processor의해 관장된다. 이 프로세스는 OS에 의해 구현된다.
: I/O 작업이 완료되었을때, completion dispatcher로 notifiy 되고, 다음으로 completion handler는 그 결과를 처리한다.- 1_blocking_queue
: mutex는 커널단 컨텍스트 스위칭이므로 오버헤드가 큼 -> atomic operation#include <unistd.h> #include <mutex> #include <condition_variable> // Event #include <deque> #include <thread> #include <iostream> using namespace std; template <typename T> class blocking_queue { deque<T> d_queue; condition_variable d_condition; mutex d_mutex; public: void push(const T& value) { { unique_lock<mutex> lock(d_mutex); d_queue.push_front(value); } d_condition.notify_one(); } T pop() { unique_lock<mutex> lock(d_mutex); d_condition.wait(lock, [=] { return !d_queue.empty(); } ); T rc(std::move(d_queue.back())); // move fast d_queue.pop_back(); return rc; } };
- 2_lockfree_queue
: boost를 이용한 생산자 소비자 패턴#include <iostream> #include <boost/lockfree/queue.hpp> // h + cpp = hpp #include <boost/thread/thread.hpp> #include <boost/atomic.hpp> using namespace std; using namespace boost; atomic_int producer_count(0); atomic_int consumer_count(0); lockfree::queue<int> queue(128); const int iterations = 100000000; const int producer_thread_count = 4; const int consumer_thread_count = 4; void producer(void) { for (int i = 0 ; i < iterations ; ++i) { int value = ++producer_count; while (!queue.push(value)) ; } } atomic<bool> done(false); void consumer(void) { int value; while (!done) { while (queue.pop(value)) ++consumer_count; } while (queue.pop(value)) ++consumer_count; } int main() { cout << "queue is "; if (!queue.is_lock_free()) cout << "not "; cout << "lockfree" << endl; thread_group producer_threads, consumer_threads; for (int i = 0 ; i < producer_thread_count ; ++i) producer_threads.create_thread(producer); for (int i = 0 ; i < consumer_thread_count ; ++i) consumer_threads.create_thread(consumer); producer_threads.join_all(); done = true; consumer_threads.join_all(); cout << "produced " << producer_count << endl; cout << "consumed " << consumer_count << endl;
- 4_asio_echo_server
: block에 대한 개념이 존재하지 않음
: boost asio에서 핵심은 io_service
: 각각의 연산 자체가 비동기로 이루어짐#include <boost/asio.hpp> using namespace boost; #include <iostream> #include <memory> // shared_ptr<> #include <utility> using namespace std; using boost::asio::ip::tcp; class Session : public std::enable_shared_from_this<Session> { public: Session(tcp::socket s) : socket(std::move(s)) {} void start() { doRead(); } void doRead() { auto self(shared_from_this()); // 참조 계수 증가, self를 참조안하면 객체가 바로 파괴되버림 // 1. async_read : readn // 2. async_read_some : read socket.async_read_some( asio::buffer(data, 1024), [this, self](system::error_code ec, size_t length) { if (!ec) doWrite(length); }); } void doWrite(size_t length) { auto self(shared_from_this()); asio::async_write( socket, asio::buffer(data, length), [this, self](system::error_code ec, size_t) { if (!ec) doRead(); }); } private: tcp::socket socket; char data[1024]; }; class Server { public: Server(asio::io_service& io, short port) : acceptor(io, tcp::endpoint(tcp::v4(), port)), socket(io) { doAccept(); } void doAccept() { acceptor.async_accept(socket, [this](system::error_code ec) { // 클라이언트의 접속 요청이 완료된 시점에 호출되는 핸들러 if (!ec) { // socket... make_shared<Session>(std::move(socket))->start(); } }); } private: tcp::acceptor acceptor; tcp::socket socket; }; int main() { try { asio::io_service io_service; Server s(io_service, 4000); io_service.run(); } catch (std::exception& e) { cerr << e.what() << endl; } }
Reactor, Proactor 간단한 도식화
- Reactor
- Proactor
참고: http://berb.github.io/diploma-thesis/original/042_serverarch.html, http://ozt88.tistory.com/25
'Programing > Server Model' 카테고리의 다른 글
서버 모델 - 윈도우 Overlapped I/O (0) | 2016.02.26 |
---|---|
서버 모델 - 윈도우 소켓 프로그래밍 (0) | 2016.02.26 |
서버 모델 - I/O multiplexing (0) | 2016.02.22 |
서버 모델 - Synchronous Blocking I/O (0) | 2016.02.22 |
서버 모델 - I/O 모델 개요 (0) | 2016.02.22 |