•  Event-Driven Server Architectures
    : alternative to synchronous blocking I/O
    the mapping of a single thread to multiple connections
    new events are queued and the thread executes a so-called event loop--dequeuing events from the queue, processing the event, then taking the next event or waiting for new events to be pushed.

    - thread-based vs. event-driven

    thread-basedevent-driven
    connection/request statethread contextstate machine/continuation
    main I/O modelsynchronous/blockingasynchronous/non-blocking
    activity flowthread-per-connectionevents and associated handlers
    primary scheduling strategypreemptive (OS)cooperative
    scheduling componentscheduler (OS)event loop
    calling semanticsblockingdispatching/awaiting events
    Table 4.2: Main differences between thread-based and event-driven server architectures.


  • Non-blocking I/O Multiplexing Patterns
    event-based I/O multiplexing

    - Reactor Pattern
     : 주로 리눅스의 구현
     : select, epoll, kqueue, Node.js, Ruby EventMachine, Scala Akka’s IO 모듈 등
     : synchronous, non-blocking I/O handling and relies on an event notification interface.
     : event나 socket 같은 리소스들의 집합을 등록해놓고, 각 리소스들을 콜백 또 후킹하는 적절한 event handler가 필요하다.
     : 핵심 구성요소인 synchronous event demultiplexer blocking event notification interface를 사용하여 리소스의 이벤트를 기다린다.
     : synchronous event demultiplexer가 받는 이벤트는 언제든지 dispatcher로 알리고, 다음 event를 기다린다.
     : dispatcher는 관련 event handler 선택하고, callback/hook execution을 triggering함으로써 event를 처리한다.
     : 리엑터 패턴은 event handling 과 multiplexing 분리한다. 이 패턴은 싱글 스레드를 사용하므로 blocking 동작은 전체적은 application의 동작을 지연시킬 수 있다. 그래서 다양한 Reactor Pattern들이 event handler에 대한 thread poll(thread 개수는 CPU 개수*2 + 1)을 사용한다.

    *장점
    : 리엑터 패턴은 리엑터 구현에서 애플리케이션에 관련된 코드가 완벽하게 분리됨. 그래서 애플리케이션 컴포넌트들이 모듈화되고 재사용 가능한 부분들로 분리될 수 있음. 시스템을 멀티스레드로 인한 복잡성에서 보호해 줌.

    *단점
    : 디버깅하기 어려움. 요청 핸들러를 동기적으로 호출하기 때문에 최대 동시성에 제한이 있음. 요청 핸들러 뿐만 아니라 디멀티플렉서에 의해서도 확장성이 제한됨. (OS 스케줄링 이용 불가, event queue에 대한 성능 이슈)




      ■ 1_epoll
       : select, poll의 문제점 해결

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    #include <stdio.h>
     
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
     
    #include <arpa/inet.h>
    #include <netinet/in.h>
     
    // select, poll 문제점
    // 1) 관찰 대상 디스크립터의 정보를 매번 운영체제에 전달해야 한다.
    // 2) 1024 개 이상의 디스크립터를 등록하는 것이 불가능하다.
     
    // epoll
    // - 2.5.44 부터 사용 가능하다.
    // - $ cat /proc/sys/kernel/osrelease
     
    // epoll api 3가지
    // 1) epoll_create : 파일 디스크립터 저장소 생성
    // 2) epoll_ctl    : 디스크립터 등록 & 삭제
    // 3) epoll_wait   : 파일 디스크립터의 변화를 감지하는 함수
     
    int main()
    {
        int sock = socket(PF_INET, SOCK_STREAM, 0);
     
        struct sockaddr_in saddr = {0, };
        saddr.sin_family = AF_INET;
        saddr.sin_port = htons(4000);
        saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
     
        int option = true;
        socklen_t optlen = sizeof(option);
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
     
        bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
        listen(sock, 5);
     
     
        // 1. 초기화
        const int EPOLL_SIZE = 1024;
        int efd = epoll_create(EPOLL_SIZE);
        // 최신 커널에서는 사이즈는 무시됩니다.
     
        // 2. 이벤트 등록
        struct epoll_event events[EPOLL_SIZE];
     
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = sock;
        epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event);
     
        while (1)
        {
            int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
     
            for (int i = 0 ; i < count ; ++i)
            {
                if (events[i].data.fd == sock)
                {
                    struct sockaddr_in caddr = {0, };
                    socklen_t clen = sizeof(caddr);
                    int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
     
                    char* cip = inet_ntoa(caddr.sin_addr);
                    printf("Connected from %s\n", cip);
     
                    // 새로운 연결을 등록해주어야 한다.
                    event.events = EPOLLIN;
                    event.data.fd = csock;
                    epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event);
                }
                else
                {
                    int csock = events[i].data.fd;
                    char buf[1024];
                    int n = read(csock, buf, sizeof buf);
     
                    if (n <= 0)
                    {
                        printf("연결 종료!!\n");
                        close(csock);
     
                        // 종료된 디스크립터를 등록 해지해야 한다.
                        epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
                    }
                    else
                        write(csock, buf, n);
                }
            }
        }
     
        close(sock);
    }


      ■ 2_epoll
       : Level Triggering vs. Edge Triggering

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    #include <stdio.h>
     
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>       // fcntl()
     
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
     
    // epoll 이벤트 처리 방식
    // Level Triggering - Default
    // : 수신 버퍼에 데이터가 있다면, 이벤트가 발생
    // Edge Triggering
    // : 수신 버퍼에 데이터가 없다가, 존재하는 시점에 이벤트가 발생
    // 핵심 : 데이터를 처리하는 디스크립터를 Non-blokcing 모드로 설정해야 한다.
     
     
    // ET 장점
    // 1. 데이터를 처리하기 위해 커널에 진입하는 횟수가 적다.
    // 2. 데이터를 수신하는 로직과 데이터를 처리하는 로직을 분리할 수 있다.
    //   - 서버의 구조화에 유리하다.
     
     
    void setNonBlokcing(int fd)
    {
        // 아래 코드는 문제점이 있습니다.
        // fcntl(fd, F_SETFL, O_NONBLOCK);
     
        // Edge Triggering은 연결에 대한 데이터가 들어오는 시점에서 발생하므로 다시 데이터가 올라올때까지 발생하지 않는다.
        // ex) aaaa -> aaaa, aaaab -> aaaa
        // -> blocking이 생길 수 있다. -> read를 non-blocking 되도록 해야한다.
        // -> Edge trigger + non-blocking -> fcntl
     
        int flags = fcntl(fd, F_GETFL, 0);
        flags |= O_NONBLOCK;
     
        fcntl(fd, F_SETFL, flags);
    }
     
     
    int main()
    {
        int sock = socket(PF_INET, SOCK_STREAM, 0);
     
        struct sockaddr_in saddr = {0, };
        saddr.sin_family = AF_INET;
        saddr.sin_port = htons(4000);
        saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
     
        int option = true;
        socklen_t optlen = sizeof(option);
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
     
        bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
        listen(sock, 5);
     
     
        // 1. 초기화
        const int EPOLL_SIZE = 1024;
        int efd = epoll_create(EPOLL_SIZE);
        // 최신 커널에서는 사이즈는 무시됩니다.
     
        // 2. 이벤트 등록
        struct epoll_event events[EPOLL_SIZE];
     
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = sock;
        epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event);
     
        while (1)
        {
            int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
            printf("epoll_wait()\n");
     
            for (int i = 0 ; i < count ; ++i)
            {
                if (events[i].data.fd == sock)
                {
                    struct sockaddr_in caddr = {0, };
                    socklen_t clen = sizeof(caddr);
                    int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
     
                    char* cip = inet_ntoa(caddr.sin_addr);
                    printf("Connected from %s\n", cip);
     
                    // Non-Blocking 으로 설정 
                    setNonBlokcing(csock);
     
                    // 새로운 연결을 등록해주어야 한다.
                    event.events = EPOLLIN | EPOLLET;
                    event.data.fd = csock;
                    epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event);
                }
                else
                {
                    int csock = events[i].data.fd;
                    // char buf[1024];
                    char buf[4];
                    while (1)
                    {
                        int n = read(csock, buf, sizeof buf);
                        if (n == 0)
                        {
                            printf("연결 종료!!\n");
                            close(csock);
     
                            // 종료된 디스크립터를 등록 해지해야 한다.
                            epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
                        }
                        else if (n == -1)
                        {
                            int err = errno;
                            if (err == EAGAIN)   // 데이터가 존재하지 않는다.
                                break;
                            else {
                                close(csock);
     
                                epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
                            }
     
                        }
                        else {
                            printf("read() : %d\n", n);
                            write(csock, buf, n);
                        }
                    }
                }
            }
        }
     
        close(sock);
    }


    - Proactor Pattern
     : 주로 윈도우의 구현
     : POSIX AIO, Boost.Asio (C++), 
    Adaptive Communication Environment (C++), libbitcoin (C++), RJR (Ruby), win32 IOCP
     : asynchronous, non-blocking I/O operations
     : Reactor pattern의 asynchronous 방식으로 보기도 함
     : blocking event notification interfaces 대신에 completion events
     : proactive initiator는 main application thread로 
    asynchronous I/O 동작들을 초기화하는 역할
     : 동작이 이슈될때 항상 completion handler와 completion dispatcher에 등록된다.
     : 비동기적인 동작의 수행은 asynchronous operation processor의해 관장된다. 이 프로세스는 OS에 의해 구현된다. 
     : I/O 작업이 완료되었을때, completion dispatcher로 notifiy 되고, 다음으로 completion handler는 그 결과를 처리한다.

     




    - 1_blocking_queue
     : mutex는 커널단 컨텍스트 스위칭이므로 오버헤드가 큼 -> atomic operation

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    #include <unistd.h>
     
    #include <mutex>
    #include <condition_variable> // Event
    #include <deque>
     
    #include <thread>
    #include <iostream>
    using namespace std;
     
    template <typename T>
    class blocking_queue
    {
        deque<T> d_queue;
        condition_variable d_condition;
        mutex d_mutex;
     
    public:
        void push(const T& value)
        {
            {
                unique_lock<mutex> lock(d_mutex);
                d_queue.push_front(value);     
            }
            d_condition.notify_one();
        }
         
        T pop()
        {
            unique_lock<mutex> lock(d_mutex);
            d_condition.wait(lock, [=] { return !d_queue.empty(); } );
     
            T rc(std::move(d_queue.back())); // move fast
            d_queue.pop_back();
     
            return rc;
        }
    };

    - 2_lockfree_queue
     : boost를 이용한 생산자 소비자 패턴
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    #include <iostream>
     
    #include <boost/lockfree/queue.hpp>
    // h + cpp = hpp
     
    #include <boost/thread/thread.hpp>
    #include <boost/atomic.hpp>
     
    using namespace std;
    using namespace boost;
     
    atomic_int producer_count(0);
    atomic_int consumer_count(0);
     
    lockfree::queue<int> queue(128);
     
    const int iterations = 100000000;
    const int producer_thread_count = 4;
    const int consumer_thread_count = 4;
     
    void producer(void)
    {
        for (int i = 0 ; i < iterations ; ++i) {
            int value = ++producer_count;
            while (!queue.push(value))
                ;
        }
    }
     
    atomic<bool> done(false);
    void consumer(void)
    {
        int value;
        while (!done)
        {
            while (queue.pop(value))
                ++consumer_count;
        }
     
        while (queue.pop(value))
            ++consumer_count;
    }
     
    int main()
    {
        cout << "queue is ";
        if (!queue.is_lock_free())
            cout << "not ";
        cout << "lockfree" << endl;
     
        thread_group producer_threads, consumer_threads;
        for (int i = 0 ; i < producer_thread_count ; ++i)
            producer_threads.create_thread(producer);
     
        for (int i = 0 ; i < consumer_thread_count ; ++i)
            consumer_threads.create_thread(consumer);
     
        producer_threads.join_all();
        done = true;
     
        consumer_threads.join_all();
     
        cout << "produced " << producer_count << endl;
        cout << "consumed " << consumer_count << endl;


    - 4_asio_echo_server
     : block에 대한 개념이 존재하지 않음
     : boost asio에서 핵심은 io_service
     : 각각의 연산 자체가 비동기로 이루어짐

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    #include <boost/asio.hpp>
    using namespace boost;
     
    #include <iostream>
    #include <memory>    // shared_ptr<>
    #include <utility>
    using namespace std;
     
    using boost::asio::ip::tcp;
    class Session : public std::enable_shared_from_this<Session>
    {
    public:
        Session(tcp::socket s) : socket(std::move(s)) {}
     
        void start() { doRead(); }
     
        void doRead() {
            auto self(shared_from_this());  // 참조 계수 증가, self를 참조안하면 객체가 바로 파괴되버림
            // 1. async_read      : readn
            // 2. async_read_some : read
     
            socket.async_read_some(
                    asio::buffer(data, 1024),
                    [this, self](system::error_code ec, size_t length) {
                     
                    if (!ec)
                        doWrite(length);
            });
        }
     
        void doWrite(size_t length)
        {
            auto self(shared_from_this());
     
            asio::async_write(
                    socket, asio::buffer(data, length),
                    [this, self](system::error_code ec, size_t) {
                        if (!ec)
                            doRead();
                    });
        }
     
     
    private:
        tcp::socket socket;
        char data[1024];
    };
     
     
    class Server
    {
    public:
        Server(asio::io_service& io, short port)
            : acceptor(io, tcp::endpoint(tcp::v4(), port)),
                socket(io) {
             
            doAccept();
        }
     
        void doAccept()
        {
            acceptor.async_accept(socket, [this](system::error_code ec) {
                // 클라이언트의 접속 요청이 완료된 시점에 호출되는 핸들러  
                if (!ec) {
                    // socket...
                    make_shared<Session>(std::move(socket))->start();
                }
            });
     
        }
     
    private:
        tcp::acceptor acceptor;
        tcp::socket socket;
         
    };
     
     
    int main()
    {
        try {
            asio::io_service io_service;
            Server s(io_service, 4000);
            io_service.run();
     
        } catch (std::exception& e) {
            cerr << e.what() << endl;
        }
    }

  • Reactor, Proactor 간단한 도식화
    - Reactor


    - Proactor




참고: http://berb.github.io/diploma-thesis/original/042_serverarch.html, http://ozt88.tistory.com/25

+ Recent posts