•  Event-Driven Server Architectures
    : alternative to synchronous blocking I/O
    the mapping of a single thread to multiple connections
    new events are queued and the thread executes a so-called event loop--dequeuing events from the queue, processing the event, then taking the next event or waiting for new events to be pushed.

    - thread-based vs. event-driven

    thread-basedevent-driven
    connection/request statethread contextstate machine/continuation
    main I/O modelsynchronous/blockingasynchronous/non-blocking
    activity flowthread-per-connectionevents and associated handlers
    primary scheduling strategypreemptive (OS)cooperative
    scheduling componentscheduler (OS)event loop
    calling semanticsblockingdispatching/awaiting events
    Table 4.2: Main differences between thread-based and event-driven server architectures.


  • Non-blocking I/O Multiplexing Patterns
    event-based I/O multiplexing

    - Reactor Pattern
     : 주로 리눅스의 구현
     : select, epoll, kqueue, Node.js, Ruby EventMachine, Scala Akka’s IO 모듈 등
     : synchronous, non-blocking I/O handling and relies on an event notification interface.
     : event나 socket 같은 리소스들의 집합을 등록해놓고, 각 리소스들을 콜백 또 후킹하는 적절한 event handler가 필요하다.
     : 핵심 구성요소인 synchronous event demultiplexer blocking event notification interface를 사용하여 리소스의 이벤트를 기다린다.
     : synchronous event demultiplexer가 받는 이벤트는 언제든지 dispatcher로 알리고, 다음 event를 기다린다.
     : dispatcher는 관련 event handler 선택하고, callback/hook execution을 triggering함으로써 event를 처리한다.
     : 리엑터 패턴은 event handling 과 multiplexing 분리한다. 이 패턴은 싱글 스레드를 사용하므로 blocking 동작은 전체적은 application의 동작을 지연시킬 수 있다. 그래서 다양한 Reactor Pattern들이 event handler에 대한 thread poll(thread 개수는 CPU 개수*2 + 1)을 사용한다.

    *장점
    : 리엑터 패턴은 리엑터 구현에서 애플리케이션에 관련된 코드가 완벽하게 분리됨. 그래서 애플리케이션 컴포넌트들이 모듈화되고 재사용 가능한 부분들로 분리될 수 있음. 시스템을 멀티스레드로 인한 복잡성에서 보호해 줌.

    *단점
    : 디버깅하기 어려움. 요청 핸들러를 동기적으로 호출하기 때문에 최대 동시성에 제한이 있음. 요청 핸들러 뿐만 아니라 디멀티플렉서에 의해서도 확장성이 제한됨. (OS 스케줄링 이용 불가, event queue에 대한 성능 이슈)




      ■ 1_epoll
       : select, poll의 문제점 해결

     
    #include <stdio.h>
    
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
    
    #include <arpa/inet.h>
    #include <netinet/in.h>
    
    // select, poll 문제점
    // 1) 관찰 대상 디스크립터의 정보를 매번 운영체제에 전달해야 한다.
    // 2) 1024 개 이상의 디스크립터를 등록하는 것이 불가능하다.
    
    // epoll
    // - 2.5.44 부터 사용 가능하다.
    // - $ cat /proc/sys/kernel/osrelease
    
    // epoll api 3가지
    // 1) epoll_create : 파일 디스크립터 저장소 생성
    // 2) epoll_ctl    : 디스크립터 등록 & 삭제
    // 3) epoll_wait   : 파일 디스크립터의 변화를 감지하는 함수
    
    int main()
    {
    	int sock = socket(PF_INET, SOCK_STREAM, 0);
    
    	struct sockaddr_in saddr = {0, };
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons(4000);
    	saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
    
    	int option = true;
    	socklen_t optlen = sizeof(option);
    	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
    
    	bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
    	listen(sock, 5);
    
    
    	// 1. 초기화
    	const int EPOLL_SIZE = 1024;
    	int efd = epoll_create(EPOLL_SIZE);
    	// 최신 커널에서는 사이즈는 무시됩니다.
    
    	// 2. 이벤트 등록
    	struct epoll_event events[EPOLL_SIZE];
    
    	struct epoll_event event;
    	event.events = EPOLLIN;
    	event.data.fd = sock;
    	epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); 
    
    	while (1) 
    	{
    		int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
    
    		for (int i = 0 ; i < count ; ++i)
    		{
    			if (events[i].data.fd == sock)
    			{
    				struct sockaddr_in caddr = {0, };
    				socklen_t clen = sizeof(caddr);
    				int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
    
    				char* cip = inet_ntoa(caddr.sin_addr);
    				printf("Connected from %s\n", cip);
    
    				// 새로운 연결을 등록해주어야 한다.
    				event.events = EPOLLIN;
    				event.data.fd = csock;
    				epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); 
    			} 
    			else
    			{
    				int csock = events[i].data.fd;
    				char buf[1024];
    				int n = read(csock, buf, sizeof buf);
    
    				if (n <= 0)
    				{
    					printf("연결 종료!!\n");
    					close(csock);
    
    					// 종료된 디스크립터를 등록 해지해야 한다.
    					epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    				} 
    				else
    					write(csock, buf, n);
    			}
    		}
    	}
    
    	close(sock);
    }
    


      ■ 2_epoll
       : Level Triggering vs. Edge Triggering

     
    #include <stdio.h>
    
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>       // fcntl()
    
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
    
    // epoll 이벤트 처리 방식
    // Level Triggering - Default
    // : 수신 버퍼에 데이터가 있다면, 이벤트가 발생
    // Edge Triggering
    // : 수신 버퍼에 데이터가 없다가, 존재하는 시점에 이벤트가 발생
    // 핵심 : 데이터를 처리하는 디스크립터를 Non-blokcing 모드로 설정해야 한다.
    
    
    // ET 장점
    // 1. 데이터를 처리하기 위해 커널에 진입하는 횟수가 적다.
    // 2. 데이터를 수신하는 로직과 데이터를 처리하는 로직을 분리할 수 있다.
    //   - 서버의 구조화에 유리하다.
    
    
    void setNonBlokcing(int fd)
    {
    	// 아래 코드는 문제점이 있습니다.
    	// fcntl(fd, F_SETFL, O_NONBLOCK);
    
    	// Edge Triggering은 연결에 대한 데이터가 들어오는 시점에서 발생하므로 다시 데이터가 올라올때까지 발생하지 않는다. 
    	// ex) aaaa -> aaaa, aaaab -> aaaa
    	// -> blocking이 생길 수 있다. -> read를 non-blocking 되도록 해야한다.
    	// -> Edge trigger + non-blocking -> fcntl
    
    	int flags = fcntl(fd, F_GETFL, 0);
    	flags |= O_NONBLOCK;
    
    	fcntl(fd, F_SETFL, flags);
    }
    
    
    int main()
    {
    	int sock = socket(PF_INET, SOCK_STREAM, 0);
    
    	struct sockaddr_in saddr = {0, };
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons(4000);
    	saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
    
    	int option = true;
    	socklen_t optlen = sizeof(option);
    	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
    
    	bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
    	listen(sock, 5);
    
    
    	// 1. 초기화
    	const int EPOLL_SIZE = 1024;
    	int efd = epoll_create(EPOLL_SIZE);
    	// 최신 커널에서는 사이즈는 무시됩니다.
    
    	// 2. 이벤트 등록
    	struct epoll_event events[EPOLL_SIZE];
    
    	struct epoll_event event;
    	event.events = EPOLLIN;
    	event.data.fd = sock;
    	epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); 
    
    	while (1) 
    	{
    		int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
    		printf("epoll_wait()\n");
    
    		for (int i = 0 ; i < count ; ++i)
    		{
    			if (events[i].data.fd == sock)
    			{
    				struct sockaddr_in caddr = {0, };
    				socklen_t clen = sizeof(caddr);
    				int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
    
    				char* cip = inet_ntoa(caddr.sin_addr);
    				printf("Connected from %s\n", cip);
    
    				// Non-Blocking 으로 설정  
    				setNonBlokcing(csock);
    
    				// 새로운 연결을 등록해주어야 한다.
    				event.events = EPOLLIN | EPOLLET;
    				event.data.fd = csock;
    				epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); 
    			} 
    			else
    			{
    				int csock = events[i].data.fd;
    				// char buf[1024];
    				char buf[4];
    				while (1)
    				{
    					int n = read(csock, buf, sizeof buf);
    					if (n == 0)
    					{
    						printf("연결 종료!!\n");
    						close(csock);
    
    						// 종료된 디스크립터를 등록 해지해야 한다.
    						epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    					} 
    					else if (n == -1)
    					{
    						int err = errno;
    						if (err == EAGAIN)   // 데이터가 존재하지 않는다.
    							break;
    						else {
    							close(csock);
    
    							epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    						}
    
    					}
    					else {
    						printf("read() : %d\n", n);
    						write(csock, buf, n);
    					}
    				}
    			}
    		}
    	}
    
    	close(sock);
    }


    - Proactor Pattern
     : 주로 윈도우의 구현
     : POSIX AIO, Boost.Asio (C++), 
    Adaptive Communication Environment (C++), libbitcoin (C++), RJR (Ruby), win32 IOCP
     : asynchronous, non-blocking I/O operations
     : Reactor pattern의 asynchronous 방식으로 보기도 함
     : blocking event notification interfaces 대신에 completion events
     : proactive initiator는 main application thread로 
    asynchronous I/O 동작들을 초기화하는 역할
     : 동작이 이슈될때 항상 completion handler와 completion dispatcher에 등록된다.
     : 비동기적인 동작의 수행은 asynchronous operation processor의해 관장된다. 이 프로세스는 OS에 의해 구현된다. 
     : I/O 작업이 완료되었을때, completion dispatcher로 notifiy 되고, 다음으로 completion handler는 그 결과를 처리한다.

     




    - 1_blocking_queue
     : mutex는 커널단 컨텍스트 스위칭이므로 오버헤드가 큼 -> atomic operation

     
    #include <unistd.h>
    
    #include <mutex>
    #include <condition_variable> // Event
    #include <deque>
    
    #include <thread>
    #include <iostream>
    using namespace std;
    
    template <typename T>
    class blocking_queue
    {
    	deque<T> d_queue;
    	condition_variable d_condition;
    	mutex d_mutex;
    
    public:
    	void push(const T& value)
    	{
    		{
    			unique_lock<mutex> lock(d_mutex);
    			d_queue.push_front(value);		
    		}
    		d_condition.notify_one();
    	}
    	
    	T pop()
    	{
    		unique_lock<mutex> lock(d_mutex);
    		d_condition.wait(lock, [=] { return !d_queue.empty(); } );
    
    		T rc(std::move(d_queue.back())); // move fast
    		d_queue.pop_back();
    
    		return rc;
    	}
    };
    

    - 2_lockfree_queue
     : boost를 이용한 생산자 소비자 패턴
     
    #include <iostream>
    
    #include <boost/lockfree/queue.hpp>
    // h + cpp = hpp
    
    #include <boost/thread/thread.hpp>
    #include <boost/atomic.hpp>
    
    using namespace std;
    using namespace boost;
    
    atomic_int producer_count(0);
    atomic_int consumer_count(0);
    
    lockfree::queue<int> queue(128);
    
    const int iterations = 100000000;
    const int producer_thread_count = 4;
    const int consumer_thread_count = 4;
    
    void producer(void)
    {
    	for (int i = 0 ; i < iterations ; ++i) {
    		int value = ++producer_count;
    		while (!queue.push(value))
    			;
    	}
    }
    
    atomic<bool> done(false);
    void consumer(void)
    {
    	int value;
    	while (!done)
    	{
    		while (queue.pop(value))
    			++consumer_count;
    	}
    
    	while (queue.pop(value))
    		++consumer_count;
    }
    
    int main()
    {
    	cout << "queue is ";
    	if (!queue.is_lock_free())
    		cout << "not ";
    	cout << "lockfree" << endl;
    
    	thread_group producer_threads, consumer_threads;
    	for (int i = 0 ; i < producer_thread_count ; ++i)
    		producer_threads.create_thread(producer);
    
    	for (int i = 0 ; i < consumer_thread_count ; ++i)
    		consumer_threads.create_thread(consumer);
    
    	producer_threads.join_all();
    	done = true;
    
    	consumer_threads.join_all();
    
    	cout << "produced " << producer_count << endl;
    	cout << "consumed " << consumer_count << endl;
    


    - 4_asio_echo_server
     : block에 대한 개념이 존재하지 않음
     : boost asio에서 핵심은 io_service
     : 각각의 연산 자체가 비동기로 이루어짐

    #include <boost/asio.hpp>
    using namespace boost;
    
    #include <iostream>
    #include <memory>    // shared_ptr<>
    #include <utility>
    using namespace std;
    
    using boost::asio::ip::tcp;
    class Session : public std::enable_shared_from_this<Session>
    {
    public:
    	Session(tcp::socket s) : socket(std::move(s)) {}
    
    	void start() { doRead(); }
    
    	void doRead() {
    		auto self(shared_from_this());  // 참조 계수 증가, self를 참조안하면 객체가 바로 파괴되버림
    		// 1. async_read      : readn
    		// 2. async_read_some : read
    
    		socket.async_read_some(
    				asio::buffer(data, 1024),
    				[this, self](system::error_code ec, size_t length) {
    				
    				if (!ec) 
    					doWrite(length);
    		});
    	}
    
    	void doWrite(size_t length)
    	{
    		auto self(shared_from_this());
    
    		asio::async_write(
    				socket, asio::buffer(data, length),
    				[this, self](system::error_code ec, size_t) {
    					if (!ec)
    						doRead();
    				});
    	}
    
    
    private:
    	tcp::socket socket;
    	char data[1024];
    };
    
    
    class Server
    {
    public:
    	Server(asio::io_service& io, short port)
    		: acceptor(io, tcp::endpoint(tcp::v4(), port)),
    			socket(io) {
    		
    		doAccept();
    	}
    
    	void doAccept()
    	{
    		acceptor.async_accept(socket, [this](system::error_code ec) {
    			// 클라이언트의 접속 요청이 완료된 시점에 호출되는 핸들러	
    			if (!ec) {
    				// socket...
    				make_shared<Session>(std::move(socket))->start();
    			}
    		});
    
    	}
    
    private:
    	tcp::acceptor acceptor;
    	tcp::socket socket;
    	
    };
    
    
    int main()
    {
    	try {
    		asio::io_service io_service;
    		Server s(io_service, 4000);
    		io_service.run();
    
    	} catch (std::exception& e) {
    		cerr << e.what() << endl;
    	}
    }

  • Reactor, Proactor 간단한 도식화
    - Reactor


    - Proactor




참고: http://berb.github.io/diploma-thesis/original/042_serverarch.html, http://ozt88.tistory.com/25

+ Recent posts