• 이벤트 모델
    - 1_event
    // 현재 프로젝트는 GUI 프로그램이 아니라고 링커에게 알려주는것
    #pragma comment(linker, "/subsystem:console")
    #include <stdio.h>
    #include <Windows.h>
    #include <conio.h>
    
    // 윈도우 "event"개념은 linux 의 "conditional variable"가 거의
    // 유사합니다. 
    
    // 실제로 포팅할때 서로 대응되는 개념입니다. 
    
    
    DWORD __stdcall foo( void* p )
    {
    	HANDLE h = (HANDLE)p;
    
    	// 이벤트이 signal 상태 조사하기
    	DWORD ret = WaitForSingleObject( h, 0);
    
    	if ( ret == WAIT_OBJECT_0)
    		printf("시그널 상태(1) 입니다.\n");
    	else
    		printf("시그널이 아닙니다.\n");
    
    	// 시그널 될때까지 대기하는 방법
    	ret = WaitForSingleObject( h, INFINITE); // 시간(무한)
    
    	if ( ret == WAIT_OBJECT_0)
    		printf("시그널 상태(1) 입니다.\n");
    	else
    		printf("시그널이 아닙니다.\n");
    
    	return 0;
    }
    
    int main()
    {
    	HANDLE hEvent = CreateEvent( 0, 0, FALSE, // 초기 signal을
    								0);			  // 0으로
    
    	CreateThread( 0, 0, foo, (void*)hEvent, 0, 0);
    	getch();
    
    	SetEvent( hEvent); // event의 signal을 1로 변경한다.
    
    	getch();
    }
    


    - 2_event
      
    WSAAsyncSelect
       : non-blocking, 비동기 통지방식 (통지 방식으로 윈도우 메시지를 사용하므로 윈도우 프로시저에서만 사용할 수 있다.)
       : 커널에게 미리 등록만 해두면 유저는 따로 커널에게 확인하여 동기화하지 않더라도 알아서 메시지가 날아온다.
       : 내부적으로 따로 체크하는 것이 아니라 운영체제가 I/O 상황이 될때 인터럽트를 사용하는 방식으로 구현되기 때문에 운영체제 수준에서도 연산량이 많이 줄어든다. 대신 다른 운영체제에서 지원하지 않는 기능이기 때문에, 다른 구동환경에서 같은 프로세스를 사용할 수가 없다는 단점은 있다.

     ■  WSAEventSelect
       이 통지방식은 다소 애매한 구석이 있다. 우선 이벤트 오브젝트를 사용하여 signal을 체크한다는 점이 동기랑 비슷한 점이 있어보인다. 그리고 wait함수로 이벤트가 발생할 때까지 대기한다는점이 blocking같기도 하다. 하지만 select나 epoll처럼 유저가 리소스를 사용하여 체크하는 것이 아니라 Wait함수를 사용하여 이벤트가 발생할 때 활성화 된다는 점이 비동기 방식에 가깝다고 생각한다.
       :
    WSAWaitForMultipleEvent() 함수에서 timeout 옵션이 있기 때문에 select나 epoll 처럼 어떻게 사용하느냐에 따라 blocking 방식으로 사용할 수도 non-blocking 방식으로 사용할 수도 있다.
       : 하지만 다른 점이 있다면, blocking 방식을 사용해도 멀티플렉싱이 가능하다는 점이다. 하나의 Wait에서 여러개의 I/O를 동시에 감지하고 있기 때문에, 쓰레드 하나만 wait를 통해 blocking한 상태에서 대기시키면 멀티플렉싱이 가능하다. 결과적으로는 Blocking 이면서 Non-Blocking이기도 하다.

    #pragma comment(linker,"/subsystem:console")
    
    #define WIN32_LEAN_AND_MEAN  
    #include <stdio.h>
    #include <WinSock2.h>
    #include <Windows.h>
    #include <conio.h>
    #pragma comment(lib, "ws2_32.lib") 
    
    int main()
    {
    	WSADATA w;
    	int ret = WSAStartup( MAKEWORD(2,2), &w);    
    
    	int listen_sock = socket( PF_INET,  
    						SOCK_STREAM, 
    						0);
    
    	SOCKADDR_IN addr; 
    	addr.sin_family = AF_INET;
    	addr.sin_port   = htons(4000);
    	addr.sin_addr.s_addr = INADDR_ANY; 
    	
    	bind( listen_sock, (struct sockaddr*)&addr, sizeof addr);
    
    	listen(listen_sock,  5); 
    	//------------------
    	WSAEVENT ev = WSACreateEvent(); // CreateEvent()의 네트워크
    									// 버전.
    	// 소켓을 비동기 소켓으로 변경한다.
    	WSAEventSelect( listen_sock, ev, FD_ACCEPT);
    	//-----------------------------
    
    	// 모든 소켓 핸들과 이벤트 핸들을 배열에 보관해야 합니다.
    	int sockArr[256] = { listen_sock };
    	WSAEVENT evArr[256] = { ev };
    
    	int cnt = 1;
    
    	while( 1 )
    	{
    		int pos = WSAWaitForMultipleEvents( cnt, evArr,  // event배열이름
    						FALSE, // 하나라도 signal 되면
    						WSA_INFINITE,0);
    
    
    		// 배열의 몇번째 요소인지 조사한다
    		int idx = pos - WSA_WAIT_EVENT_0;
    
    		printf("%d 번째 이벤트가 signal\n", idx);
    
    
    		WSANETWORKEVENTS netEv;
    		WSAEnumNetworkEvents( sockArr[idx], evArr[idx], &netEv);
    
    		
    		if ( netEv.lNetworkEvents & FD_READ )
    		{
    			char s[1024] = {0};
    			int n = recv( sockArr[idx], s, 1024, 0);
    			printf("도착한 data : %s\n", s);
    			strcat( s, " from Server");
    			send( sockArr[idx], s, 1024, 0);
    
    		}
    
    
    		if ( netEv.lNetworkEvents & FD_CLOSE )
    		{
    			closesocket( sockArr[idx]);
    			WSACloseEvent( evArr[idx]);
    
    			// 배열에서 제거 해야 합니다.
    			sockArr[idx] = sockArr[cnt-1];
    			evArr[idx] = evArr[cnt-1];
    
    			--cnt;
    			printf("Client 접속 끊어짐\n");
    			continue;
    		}
    
    
    
    
    		if ( netEv.lNetworkEvents & FD_ACCEPT)
    		{
    			printf("접속요청\n");
    
    
    			SOCKADDR_IN caddr;
    			int sz = sizeof caddr;
    
    			int link_sock = accept( listen_sock, 
    							(SOCKADDR*)&caddr, &sz);
    
    			WSAEVENT ev2 = WSACreateEvent();
    			WSAEventSelect( link_sock, ev2, 
    								FD_READ | FD_CLOSE);
    			
    			sockArr[cnt] = link_sock;
    			evArr[cnt] = ev2;
    			++cnt;
    
    			printf("클라이언트 접속 : %s\n", 
    						inet_ntoa( caddr.sin_addr));
    		}
    	}
    	//------------------------------
    	WSACleanup(); 
    }
    

  • Overlapped IO
    : 중첩 입출력 모델
    : non-blocking, asynchronous I/O (비동기적 완료 통보)
    : 데이터 입출력이 진행되는 동안에도 다른일을 할 수 있다.
    : 원리는 리눅스의 I/O multiplexing, RTS과 비슷하다.

     
    : 리눅스의 I/O multiplexing, RTS와 다른점은 모드 변환이 발생하지 않는다. (user <-> kernel 여러번 데이터 복사 x)


    #pragma comment(linker, "/subsystem:console")
    
    #define WIN32_LEAN_AND_MEAN  
    #include <stdio.h>
    #include <WinSock2.h> 
    #include <Windows.h>  
    
    #pragma comment(lib, "ws2_32.lib") 
    
    int main()
    {
    	WSADATA w;
    	int ret = WSAStartup( MAKEWORD(2,2), &w);    
    	
    	//-------------------------------------------
    	// 1. Overlapped io를 위한 소켓 생성
    	// 표준 C 네트워크 함수 : 소문자()
    	// WSAxxxx() 함수들 : windows socket 2.0 부터 지원되는
    	//					windows만의 개념들..
    	int listen_sock = WSASocket( PF_INET, SOCK_STREAM,
    								0, 0, 0,
    								WSA_FLAG_OVERLAPPED);
    								
    
    
    
    	// 2. 소켓에 주소 지정(bind)
    	SOCKADDR_IN addr; 
    
    	addr.sin_family = AF_INET;
    	addr.sin_port   = htons(4000);
    	addr.sin_addr.s_addr = INADDR_ANY; 
    	
    	bind( listen_sock, (struct sockaddr*)&addr, sizeof addr);
    
    
    
    	listen(listen_sock,  5); 
    	struct sockaddr_in addr2;
    	int sz = sizeof addr2;
    
    
    	int link_sock = accept( listen_sock, 
    						  (struct sockaddr*)&addr2, &sz);
    
    	printf("클라이언트가 접속되었습니다\n");
    
    
    	closesocket(listen_sock); // 대기 소켓은 이제 필요없다.
    
    	// Overlapped로 수신 하는 코드
    	WSAEVENT ev = WSACreateEvent();
    	WSAOVERLAPPED ov = {0};
    	ov.hEvent = ev;
    
    
    	// 수신 버퍼..
    	char s[1024] = {0};
    	WSABUF buf;
    	buf.buf = s;
    	buf.len = 1024;
    
    	DWORD flag = 0;
    	DWORD recvBytes = 0;
    
    	int n = WSARecv( link_sock,
    					 &buf, 1,  // 버퍼와 버퍼 갯수
    					 &recvBytes, // 받을 data 크기
    					 &flag,
    					 &ov, // overlapped 구조체
    					 0);
    
    	if ( n == SOCKET_ERROR ) // -1 
    	{
    		if ( WSAGetLastError() == WSA_IO_PENDING )
    		{
    			printf("data 를 수신하고 있는 중입니다.\n");
    
    			// 비동기의 종료를 대기 해야 합니다.
    			// 비동기 IO의 종료시에는 overlapped구조체 안에 있는
    			// event가 signal 됩니다.
    			WSAWaitForMultipleEvents( 1, &ev, TRUE, 
    								WSA_INFINITE, 0);
    
    			printf("비동기 IO로 수신 완료\n");
    		}
    	}
    	else 
    		printf("data를 동기적으로 수신 : %d\n", n); 
    
    	printf("수신된 data : %s\n", buf.buf);
    
    	closesocket( link_sock);
    	//------------------------------
    	WSACleanup(); 
    }

참고: http://ozt88.tistory.com/22

  • Server
    - 1_server ~ 3_server
     
    #define WIN32_LEAN_AND_MEAN
    #include <stdio.h>
    
    #include <Windows.h>
    
    #include <WinSock2.h>
    
    #pragma comment (lib, "ws2_32.lib")
    
    int main()
    {
    	WSADATA w;
    	WSAStartup(MAKEWORD(2, 2), &w);
    	//-----------------------------------
    
    	// 1. 소켓 생성
    	SOCKET sock = socket(PF_INET, SOCK_STREAM, 0);
    
    	// 2. 소켓에 주소 지정(bind)
    	SOCKADDR_IN addr = { 0, };
    	addr.sin_family = AF_INET;
    	addr.sin_port = htons(4000);
    	addr.sin_addr.s_addr = INADDR_ANY;
    
    	bind(sock, (SOCKADDR*)&addr, sizeof addr);
    
    	// 3. 소켓을 대기 상태로, 5라는 arg는 무시 됨
    	listen(sock, 5);
    
    	// 4. 클라이언트 요청을 수락
    	SOCKADDR_IN caddr = { 0, };
    	int sz = sizeof(caddr);
    
    	SOCKET csock = accept(sock, (SOCKADDR*)&caddr, &sz);
    
    	printf("클라이언트가 접속되었습니다.\n");
    
    
    	char buf[1024];
    	// 핵심 : read() 를 통해 수신하지 않습니다. 리턴값이 가장 중요하다.
    	int ret;
    	while (1) {
    		ret = recv(csock, buf, 1024, 0);
    		if (ret == 0) {
    			printf("연결이 정상적으로 종료되었습니다.\n");
    			break;
    		}
    		else if (ret == -1) {
    			printf("연결이 비정상적으로 종료되었습니다.\n");
    			break;
    		}
    
    		// ...
    		ret = send(csock, buf, ret, 0);
    		if (ret < 0)
    			break;
    	}
    
    	printf("클라이언트가 접속이 해지되었습니다.\n");
    
    	closesocket(sock);
    	closesocket(csock);
    
    	//-----------------------------------
    	WSACleanup();
    }
    

  • Client
    - 1_client
     : 초기 셋팅, 주의 사항

    // 1_client.cpp
    
    // 1. 윈도우즈의 모든 네트워크 관련 함수는
    //		winsock2.h에 있습니다. (헤더만)
    //		기존의 소켓 API를 확장한 형태
    // 2.모든 구현은 sw2_32.dll에 있습니다. 링킹이 필요
    // 3. Windows.h와 같이 사용할 경우 반드시 WinSock2.h를 먼저
    //	  포함해야 합니다.
    // 4. #define WIN32_LEAN_AND_MEAN
    // 5. 환경설정에서 링커 설정
    // 3~5 중 선택적으로 사용
    
    // Windows.h의 헤더에서 자주 사용하지 않는 것을 제외해달라.
    #define WIN32_LEAN_AND_MEAN
    
    #include <WinSock2.h>
    #pragma comment(lib, "ws2_32.lib") // 링킹
    
    #include <Windows.h> // 모든 시스템 함수가 헤더파일에 선언되어 있습니다.
    
    int main()
    {
    	return 0;
    }
    

    - 2_client

    #include <stdio.h>
    #include <WinSock2.h>
    #include <Windows.h> 
    
    #pragma comment(lib, "ws2_32.lib")
    
    // 1. Network 함수를 사용하기 전에 WSAStartup을 통해
    //	라이브러리 초기화를 수행해야 한다.
    
    // 2. WSA : Windows Socket API
    // 3. WSACleanup() 을 통해 자원을 정리해야 한다.
    
    int main()
    {
    	// 찰스 시모니의 헝가리안 표기법 이해 필요, 변수명에 타입에 대한 정보가 나타남
    	// JAVA의 m~은? 안드로이드 표준
    	WSADATA w;
    	int ret = WSAStartup(MAKEWORD(2, 2), &w);
    	// 0x0202 word가 2byte
    
    	if(ret != 0)
    		printf("소켓 라이브러리를 사용할 수 없다.\n");
    
    	// w에 있는 정보는 현재까지 정확하지 않습니다. 사용되지 않습니다.
    	printf("최대 소켓 갯수: %d\n", w.iMaxSockets);
    	printf("소켓 버전: %d.%d\n", w.wHighVersion, w.wVersion);
    
    	// 소켓 라이브러리가 사용하는 자원을 정리
    	WSACleanup();
    }
    

    - 3_client
    #include <stdio.h>
    #include <stdlib.h>
    
    #include <WinSock2.h>
    #include <Windows.h>
    
    #pragma comment(lib, "ws2_32.lib")
    
    #include <locale.h>
    
    int main()
    {
    	setlocale(LC_ALL, "korean"); // 유니코드 셋팅
    	
    	WSADATA w;
    	WSAStartup(MAKEWORD(2, 2), &w);
    	//--------------
    	// 1. 소켓 생성, 리눅스는 fd가 retrun, 윈도우는 SOCKET return
    	SOCKET sock = socket(PF_INET, SOCK_STREAM, 0); // int도 되지만 정확한 타입을 사용!
    
    	// 2. 서버 주소 지정
    	// struct sockaddr_in addr = { 0, };
    	SOCKADDR_IN addr = { 0, };
    	addr.sin_family = AF_INET;
    	addr.sin_port = htons(4000);
    	//addr.sin_port = htons(80);
    	addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    	//addr.sin_addr.s_addr = inet_addr("203.249.22.240");
    
    	// 3. 서버 접속
    	int ret = connect(sock, (SOCKADDR*)&addr, sizeof addr);
    	if (ret == 0)
    		printf("접속 성공\n");
    	else {
    		int err = WSAGetLastError(); // 함수 호출하면 
    		// 1. WSAGetLastError() / 도구 -> 오류 조희 
    		printf("접속 실패 : %d\n", err);
    
    		// 2. WSAGetLastError() 오류 번호를 저장하지 않고 다른 함수를 호출하면 오류 번호가 초기화 되버린다. 
    		WCHAR buf[512];
    		FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
    					  0, 
    					  err, 
    					  LANG_SYSTEM_DEFAULT,
    					  buf, 
    					  512, 0);
    		// L은 유니코드라는 표기
    		wprintf(L"에러 발생 : %s", buf);
    
    		// 3. Debug Mode - 조사식
    		// @err
    		// @err, hr
    
    		// 비정상종료 테스트, 자원해지(소멸자 등) 테스트
    		//Sleep(1000 * 10); 
    		//exit(0);
    	}
    
    	//--------------
    
    	closesocket(sock);	// close(sock);
    	
    	WSACleanup();
    }
    


  •  Event-Driven Server Architectures
    : alternative to synchronous blocking I/O
    the mapping of a single thread to multiple connections
    new events are queued and the thread executes a so-called event loop--dequeuing events from the queue, processing the event, then taking the next event or waiting for new events to be pushed.

    - thread-based vs. event-driven

    thread-basedevent-driven
    connection/request statethread contextstate machine/continuation
    main I/O modelsynchronous/blockingasynchronous/non-blocking
    activity flowthread-per-connectionevents and associated handlers
    primary scheduling strategypreemptive (OS)cooperative
    scheduling componentscheduler (OS)event loop
    calling semanticsblockingdispatching/awaiting events
    Table 4.2: Main differences between thread-based and event-driven server architectures.


  • Non-blocking I/O Multiplexing Patterns
    event-based I/O multiplexing

    - Reactor Pattern
     : 주로 리눅스의 구현
     : select, epoll, kqueue, Node.js, Ruby EventMachine, Scala Akka’s IO 모듈 등
     : synchronous, non-blocking I/O handling and relies on an event notification interface.
     : event나 socket 같은 리소스들의 집합을 등록해놓고, 각 리소스들을 콜백 또 후킹하는 적절한 event handler가 필요하다.
     : 핵심 구성요소인 synchronous event demultiplexer blocking event notification interface를 사용하여 리소스의 이벤트를 기다린다.
     : synchronous event demultiplexer가 받는 이벤트는 언제든지 dispatcher로 알리고, 다음 event를 기다린다.
     : dispatcher는 관련 event handler 선택하고, callback/hook execution을 triggering함으로써 event를 처리한다.
     : 리엑터 패턴은 event handling 과 multiplexing 분리한다. 이 패턴은 싱글 스레드를 사용하므로 blocking 동작은 전체적은 application의 동작을 지연시킬 수 있다. 그래서 다양한 Reactor Pattern들이 event handler에 대한 thread poll(thread 개수는 CPU 개수*2 + 1)을 사용한다.

    *장점
    : 리엑터 패턴은 리엑터 구현에서 애플리케이션에 관련된 코드가 완벽하게 분리됨. 그래서 애플리케이션 컴포넌트들이 모듈화되고 재사용 가능한 부분들로 분리될 수 있음. 시스템을 멀티스레드로 인한 복잡성에서 보호해 줌.

    *단점
    : 디버깅하기 어려움. 요청 핸들러를 동기적으로 호출하기 때문에 최대 동시성에 제한이 있음. 요청 핸들러 뿐만 아니라 디멀티플렉서에 의해서도 확장성이 제한됨. (OS 스케줄링 이용 불가, event queue에 대한 성능 이슈)




      ■ 1_epoll
       : select, poll의 문제점 해결

     
    #include <stdio.h>
    
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
    
    #include <arpa/inet.h>
    #include <netinet/in.h>
    
    // select, poll 문제점
    // 1) 관찰 대상 디스크립터의 정보를 매번 운영체제에 전달해야 한다.
    // 2) 1024 개 이상의 디스크립터를 등록하는 것이 불가능하다.
    
    // epoll
    // - 2.5.44 부터 사용 가능하다.
    // - $ cat /proc/sys/kernel/osrelease
    
    // epoll api 3가지
    // 1) epoll_create : 파일 디스크립터 저장소 생성
    // 2) epoll_ctl    : 디스크립터 등록 & 삭제
    // 3) epoll_wait   : 파일 디스크립터의 변화를 감지하는 함수
    
    int main()
    {
    	int sock = socket(PF_INET, SOCK_STREAM, 0);
    
    	struct sockaddr_in saddr = {0, };
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons(4000);
    	saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
    
    	int option = true;
    	socklen_t optlen = sizeof(option);
    	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
    
    	bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
    	listen(sock, 5);
    
    
    	// 1. 초기화
    	const int EPOLL_SIZE = 1024;
    	int efd = epoll_create(EPOLL_SIZE);
    	// 최신 커널에서는 사이즈는 무시됩니다.
    
    	// 2. 이벤트 등록
    	struct epoll_event events[EPOLL_SIZE];
    
    	struct epoll_event event;
    	event.events = EPOLLIN;
    	event.data.fd = sock;
    	epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); 
    
    	while (1) 
    	{
    		int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
    
    		for (int i = 0 ; i < count ; ++i)
    		{
    			if (events[i].data.fd == sock)
    			{
    				struct sockaddr_in caddr = {0, };
    				socklen_t clen = sizeof(caddr);
    				int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
    
    				char* cip = inet_ntoa(caddr.sin_addr);
    				printf("Connected from %s\n", cip);
    
    				// 새로운 연결을 등록해주어야 한다.
    				event.events = EPOLLIN;
    				event.data.fd = csock;
    				epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); 
    			} 
    			else
    			{
    				int csock = events[i].data.fd;
    				char buf[1024];
    				int n = read(csock, buf, sizeof buf);
    
    				if (n <= 0)
    				{
    					printf("연결 종료!!\n");
    					close(csock);
    
    					// 종료된 디스크립터를 등록 해지해야 한다.
    					epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    				} 
    				else
    					write(csock, buf, n);
    			}
    		}
    	}
    
    	close(sock);
    }
    


      ■ 2_epoll
       : Level Triggering vs. Edge Triggering

     
    #include <stdio.h>
    
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>       // fcntl()
    
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>   // epoll api
    
    // epoll 이벤트 처리 방식
    // Level Triggering - Default
    // : 수신 버퍼에 데이터가 있다면, 이벤트가 발생
    // Edge Triggering
    // : 수신 버퍼에 데이터가 없다가, 존재하는 시점에 이벤트가 발생
    // 핵심 : 데이터를 처리하는 디스크립터를 Non-blokcing 모드로 설정해야 한다.
    
    
    // ET 장점
    // 1. 데이터를 처리하기 위해 커널에 진입하는 횟수가 적다.
    // 2. 데이터를 수신하는 로직과 데이터를 처리하는 로직을 분리할 수 있다.
    //   - 서버의 구조화에 유리하다.
    
    
    void setNonBlokcing(int fd)
    {
    	// 아래 코드는 문제점이 있습니다.
    	// fcntl(fd, F_SETFL, O_NONBLOCK);
    
    	// Edge Triggering은 연결에 대한 데이터가 들어오는 시점에서 발생하므로 다시 데이터가 올라올때까지 발생하지 않는다. 
    	// ex) aaaa -> aaaa, aaaab -> aaaa
    	// -> blocking이 생길 수 있다. -> read를 non-blocking 되도록 해야한다.
    	// -> Edge trigger + non-blocking -> fcntl
    
    	int flags = fcntl(fd, F_GETFL, 0);
    	flags |= O_NONBLOCK;
    
    	fcntl(fd, F_SETFL, flags);
    }
    
    
    int main()
    {
    	int sock = socket(PF_INET, SOCK_STREAM, 0);
    
    	struct sockaddr_in saddr = {0, };
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons(4000);
    	saddr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
    
    	int option = true;
    	socklen_t optlen = sizeof(option);
    	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, optlen);
    
    	bind(sock, (struct sockaddr*)&saddr, sizeof saddr);
    	listen(sock, 5);
    
    
    	// 1. 초기화
    	const int EPOLL_SIZE = 1024;
    	int efd = epoll_create(EPOLL_SIZE);
    	// 최신 커널에서는 사이즈는 무시됩니다.
    
    	// 2. 이벤트 등록
    	struct epoll_event events[EPOLL_SIZE];
    
    	struct epoll_event event;
    	event.events = EPOLLIN;
    	event.data.fd = sock;
    	epoll_ctl(efd, EPOLL_CTL_ADD, sock, &event); 
    
    	while (1) 
    	{
    		int count = epoll_wait(efd, events, EPOLL_SIZE, -1);
    		printf("epoll_wait()\n");
    
    		for (int i = 0 ; i < count ; ++i)
    		{
    			if (events[i].data.fd == sock)
    			{
    				struct sockaddr_in caddr = {0, };
    				socklen_t clen = sizeof(caddr);
    				int csock = accept(sock, (struct sockaddr*)&caddr, &clen);
    
    				char* cip = inet_ntoa(caddr.sin_addr);
    				printf("Connected from %s\n", cip);
    
    				// Non-Blocking 으로 설정  
    				setNonBlokcing(csock);
    
    				// 새로운 연결을 등록해주어야 한다.
    				event.events = EPOLLIN | EPOLLET;
    				event.data.fd = csock;
    				epoll_ctl(efd, EPOLL_CTL_ADD, csock, &event); 
    			} 
    			else
    			{
    				int csock = events[i].data.fd;
    				// char buf[1024];
    				char buf[4];
    				while (1)
    				{
    					int n = read(csock, buf, sizeof buf);
    					if (n == 0)
    					{
    						printf("연결 종료!!\n");
    						close(csock);
    
    						// 종료된 디스크립터를 등록 해지해야 한다.
    						epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    					} 
    					else if (n == -1)
    					{
    						int err = errno;
    						if (err == EAGAIN)   // 데이터가 존재하지 않는다.
    							break;
    						else {
    							close(csock);
    
    							epoll_ctl(efd, EPOLL_CTL_DEL, csock, 0);
    						}
    
    					}
    					else {
    						printf("read() : %d\n", n);
    						write(csock, buf, n);
    					}
    				}
    			}
    		}
    	}
    
    	close(sock);
    }


    - Proactor Pattern
     : 주로 윈도우의 구현
     : POSIX AIO, Boost.Asio (C++), 
    Adaptive Communication Environment (C++), libbitcoin (C++), RJR (Ruby), win32 IOCP
     : asynchronous, non-blocking I/O operations
     : Reactor pattern의 asynchronous 방식으로 보기도 함
     : blocking event notification interfaces 대신에 completion events
     : proactive initiator는 main application thread로 
    asynchronous I/O 동작들을 초기화하는 역할
     : 동작이 이슈될때 항상 completion handler와 completion dispatcher에 등록된다.
     : 비동기적인 동작의 수행은 asynchronous operation processor의해 관장된다. 이 프로세스는 OS에 의해 구현된다. 
     : I/O 작업이 완료되었을때, completion dispatcher로 notifiy 되고, 다음으로 completion handler는 그 결과를 처리한다.

     




    - 1_blocking_queue
     : mutex는 커널단 컨텍스트 스위칭이므로 오버헤드가 큼 -> atomic operation

     
    #include <unistd.h>
    
    #include <mutex>
    #include <condition_variable> // Event
    #include <deque>
    
    #include <thread>
    #include <iostream>
    using namespace std;
    
    template <typename T>
    class blocking_queue
    {
    	deque<T> d_queue;
    	condition_variable d_condition;
    	mutex d_mutex;
    
    public:
    	void push(const T& value)
    	{
    		{
    			unique_lock<mutex> lock(d_mutex);
    			d_queue.push_front(value);		
    		}
    		d_condition.notify_one();
    	}
    	
    	T pop()
    	{
    		unique_lock<mutex> lock(d_mutex);
    		d_condition.wait(lock, [=] { return !d_queue.empty(); } );
    
    		T rc(std::move(d_queue.back())); // move fast
    		d_queue.pop_back();
    
    		return rc;
    	}
    };
    

    - 2_lockfree_queue
     : boost를 이용한 생산자 소비자 패턴
     
    #include <iostream>
    
    #include <boost/lockfree/queue.hpp>
    // h + cpp = hpp
    
    #include <boost/thread/thread.hpp>
    #include <boost/atomic.hpp>
    
    using namespace std;
    using namespace boost;
    
    atomic_int producer_count(0);
    atomic_int consumer_count(0);
    
    lockfree::queue<int> queue(128);
    
    const int iterations = 100000000;
    const int producer_thread_count = 4;
    const int consumer_thread_count = 4;
    
    void producer(void)
    {
    	for (int i = 0 ; i < iterations ; ++i) {
    		int value = ++producer_count;
    		while (!queue.push(value))
    			;
    	}
    }
    
    atomic<bool> done(false);
    void consumer(void)
    {
    	int value;
    	while (!done)
    	{
    		while (queue.pop(value))
    			++consumer_count;
    	}
    
    	while (queue.pop(value))
    		++consumer_count;
    }
    
    int main()
    {
    	cout << "queue is ";
    	if (!queue.is_lock_free())
    		cout << "not ";
    	cout << "lockfree" << endl;
    
    	thread_group producer_threads, consumer_threads;
    	for (int i = 0 ; i < producer_thread_count ; ++i)
    		producer_threads.create_thread(producer);
    
    	for (int i = 0 ; i < consumer_thread_count ; ++i)
    		consumer_threads.create_thread(consumer);
    
    	producer_threads.join_all();
    	done = true;
    
    	consumer_threads.join_all();
    
    	cout << "produced " << producer_count << endl;
    	cout << "consumed " << consumer_count << endl;
    


    - 4_asio_echo_server
     : block에 대한 개념이 존재하지 않음
     : boost asio에서 핵심은 io_service
     : 각각의 연산 자체가 비동기로 이루어짐

    #include <boost/asio.hpp>
    using namespace boost;
    
    #include <iostream>
    #include <memory>    // shared_ptr<>
    #include <utility>
    using namespace std;
    
    using boost::asio::ip::tcp;
    class Session : public std::enable_shared_from_this<Session>
    {
    public:
    	Session(tcp::socket s) : socket(std::move(s)) {}
    
    	void start() { doRead(); }
    
    	void doRead() {
    		auto self(shared_from_this());  // 참조 계수 증가, self를 참조안하면 객체가 바로 파괴되버림
    		// 1. async_read      : readn
    		// 2. async_read_some : read
    
    		socket.async_read_some(
    				asio::buffer(data, 1024),
    				[this, self](system::error_code ec, size_t length) {
    				
    				if (!ec) 
    					doWrite(length);
    		});
    	}
    
    	void doWrite(size_t length)
    	{
    		auto self(shared_from_this());
    
    		asio::async_write(
    				socket, asio::buffer(data, length),
    				[this, self](system::error_code ec, size_t) {
    					if (!ec)
    						doRead();
    				});
    	}
    
    
    private:
    	tcp::socket socket;
    	char data[1024];
    };
    
    
    class Server
    {
    public:
    	Server(asio::io_service& io, short port)
    		: acceptor(io, tcp::endpoint(tcp::v4(), port)),
    			socket(io) {
    		
    		doAccept();
    	}
    
    	void doAccept()
    	{
    		acceptor.async_accept(socket, [this](system::error_code ec) {
    			// 클라이언트의 접속 요청이 완료된 시점에 호출되는 핸들러	
    			if (!ec) {
    				// socket...
    				make_shared<Session>(std::move(socket))->start();
    			}
    		});
    
    	}
    
    private:
    	tcp::acceptor acceptor;
    	tcp::socket socket;
    	
    };
    
    
    int main()
    {
    	try {
    		asio::io_service io_service;
    		Server s(io_service, 4000);
    		io_service.run();
    
    	} catch (std::exception& e) {
    		cerr << e.what() << endl;
    	}
    }

  • Reactor, Proactor 간단한 도식화
    - Reactor


    - Proactor




참고: http://berb.github.io/diploma-thesis/original/042_serverarch.html, http://ozt88.tistory.com/25

+ Recent posts