졸업 작품/서버

메세지 큐 구현

장형이 2019. 7. 10. 14:32

IO <-> Logic 스레드 간의 통신을 위해서 MPSQ 큐를 사용하고 있었다. (여기 참고)

근데 이 Atomic 연산이 코드로는 빨라보여도 전혀 빠르지 않을 수도 있다는 말을 선배한테 들었다.

보다 더 쉽고 나은 방법에 대해서 알려주셨다. 내용은 아래와 같다.

 

기존에 사용하던 MPSQ 큐

 

위의 큐를 다음과 같이 바꾸는 방법에 대해서 알게 되었다.

 

 

Enqueue/Dequeue

일단 큐를 두개를 만들고, 입력부분에는 Mutex를 사용하여 Enqueue 시키도록 한다.

어짜피 출력이 병목이기 때문에, 입력 부분은 동기화 렉이 있어도 괜찮다.

 

Dequeue쪽은 Mutex없이 그냥 큐 쓰듯이 쓴다. 어짜피 싱글 스레드니까..

 

메시지 큐를 비웠다면

 

만약 Dequeue를 끝내고 메시지큐가 비었다면?

 

 

메시지 큐 교체

 

메시지 큐를 교체해준다.

 

이렇게 구현하면 입력은 Mutex로 인해서 동기화가 일어나 느려질 수는 있지만, 중요한 출력에서는 아무런 제약없이 메시지를 왕창 가져 갈 수 있게 된다!!

 

소스는 간단하다.

 

// MessageQueue.h

#pragma once
#include <queue>
#include <mutex>

typedef std::lock_guard<std::mutex> locker;

template<typename T>
class CMessageQueue {
public:
	typedef std::queue<T>* QueueType;

	CMessageQueue() {
		_messageQueue = new std::queue<T>();
	}
	~CMessageQueue() {
		delete _messageQueue;
	}

	// 교체 큐를 뽑아준다.
	std::queue<T>* CreateQueue() {
		return new std::queue<T>();
	}

	// 큐에 집어 넣는다.
	void Enqueue(const T& data) {
		locker mt(_mutex);
		_messageQueue->push(data);
	}

	// 큐를 교체한다.
	void UpdateQueue(std::queue<T>*& queue) {
		locker mt(_mutex);
		std::queue<T>* currentQueue = _messageQueue;
		_messageQueue = queue;
		queue = currentQueue;
	}

private:
	std::queue<T>* _messageQueue;
	std::mutex _mutex;
};

 

사용한 부분은 다음과 같다.

 

// 헤더쪽 선언부 *********************
	// 메세지 큐
    CMessageQueue<FGameProcessorMessage*>* _messageQueue;
    
    // 현재 사용중인 큐
    CMessageQueue<FGameProcessorMessage*>::QueueType _currentQueue;
    
// 큐를 생성하는 부분 *********************
	// 메세지 큐를 생성한다.
	_messageQueue = new CMessageQueue<FGameProcessorMessage*>();
    // 교체할 큐도 생성해준다.
	_currentQueue = _messageQueue->CreateQueue();
    
    
// 큐에 입력하는 부분 *********************
	// 일반 큐에 넣듯이 넣으면 된다.
	_messageQueue->Enqueue((FGameProcessorMessage*)acceptMessage);
    
// 큐를 출력하는 부분 *********************
	// 큐를 교체한다.
	_messageQueue->UpdateQueue(_currentQueue);
    
	// 큐가 빌때까지
	while (!_currentQueue->empty()) {
		// 맨 앞의 메세지를 사용한다.
		msg = _currentQueue->front();
		_currentQueue->pop();
        
        	...msg 처리...

 

이 방법은 매우 쉽고 효과적인 방법인것 같다!!

한번에 왕창 가져가는 생각을 왜 못했을까 ㅠㅜ