학생이라 아직 잘 모릅니다.. 틀린 내용이 있으면 알려주시면 감사하겠습니다. ㅠㅜ
이 소스 안되는 것 같습니다. 검증을 충분히 거치고 사용해주세요. X(
더 나은 방법에 대해서는 여기에 기록했습니다.
개요
서버를 리뉴얼하려고 하면서 필요하게 된 자료구조는 다중 입력 싱글 출력이 가능한 큐였다.
그래서 MPSCQueue(Multi Produce Single Consume Queue)가 필요하게 됬는데, 너무 생소한 분야라 라이브러리를 쓸까? 생각했지만 그래도 시간도 있으니 한번 직접 구현해보자! 결정했다. (정확히 소스 분석...)
참고 소스는 여기 (https://github.com/mstump/queues)
분석하기
일단 풀소스는 다음과 같다.
네이밍은 필자 스타일에 맞게 조금 바꾸었다.
#include <atomic>
#include <cassert>
template<typename T>
class CMPSCQueue
{
public:
CMPSCQueue() :
_head(new FNode()),
_tail(_head.load(std::memory_order_relaxed))
{
FNode* front = _head.load(std::memory_order_relaxed);
front->next.store(NULL, std::memory_order_relaxed);
}
~CMPSCQueue()
{
T output;
while (this->Dequeue(output)) {}
FNode* front = _head.load(std::memory_order_relaxed);
delete front;
}
void Enqueue(const T& input)
{
FNode* node = new FNode();
node->node = input;
node->next.store(NULL, std::memory_order_relaxed);
FNode* prev_head = _head.exchange(node, std::memory_order_acq_rel);
prev_head->next.store(node, std::memory_order_release);
}
bool Dequeue(T& output)
{
FNode* tail = _tail.load(std::memory_order_relaxed);
FNode* next = tail->next.load(std::memory_order_acquire);
if (next == NULL) {
return false;
}
output = next->node;
_tail.store(next, std::memory_order_release);
delete tail;
return true;
}
private:
struct FNode
{
T node;
std::atomic<FNode*> next;
};
std::atomic<FNode*> _head;
std::atomic<FNode*> _tail;
};
먼저 std::memory 뭐시기 하는 이상한 애들이 되게 많은데 이 친구들은 std::atomic에서 사용하는 memory_order들이다 자세한 내용은 여기(https://en.cppreference.com/w/cpp/atomic/memory_order)를 보면 되고, 이 소스를 이해하기 위해서 필요한 내용들을 조금 간추려 보았다.
메인 메모리와 캐시 메모리 그리고 메모리 장벽/가시성
메모리의 정보는 메인 메모리에 있지만, 이 메모리의 정보를 읽고 쓰고 하는데는 시간이 많이 걸리므로 레지스터나 캐시 메모리에 저장해서 읽고 쓰다가 메인 메모리에 올리는 방식으로 동작한다.
하지만 여러 코어에서 같은 메모리의 정보를 사용한다면? 재앙신이 들이닥칠 것이다. 그래서 메인 메모리의 값을 사용하도록 강제하는 메모리 장벽(memory fense)이라는 개념이 등장했다. 메모리 장벽에 닿으면 바로 메인 메모리를 사용하는 것이다. 동기화 할때 흔히 사용하는 lock이 이런 장벽을 만들어 주는 역할도 한다. 이런 식으로 메인 메모리를 확보하는것을 메모리 가시성(memory visility)을 확보한다고 표현하기도 한다.
메모리 오더 타입
메모리 오더는 여러 곳에서 동시 메인 메모리 접근 요청이 왔을때 정렬하는 방법을 지정. 쉽게 말하면 값을 읽고 수정할때 우선순위를 정해서 요청하는 것. (내 글은 너무 복잡하니 헷갈리면 포스트 맨 아래의 참고 자료를 보는 게 더 좋음.)
memory_order_relaxed : 딱히 동기화 동작없이 수정하는 것. 순서가 바뀔 수 있음. fetch_add에서 사용하면 좋음. 더하기 작업은 순서가 상관없이 더해지기만 하면 되므로.
memory_order_release : 이전의 명령들이 이후로 재배치되는것을 금지. 수정(store)하기 전에 사용하면 좋음. 이전의 수정을 확정 지을 수 있기 때문.
memory_order_acqire : 이후의 명령들이 이전으로 재배치되는 것을 금지. 읽기(load)하기 전에 사용하면 좋음. 이번에 읽은 내용을 이전의 내용이라고 확정 지을 수 있기 때문.
생산-소비 관계에서 memory_order_release으로 적재(store), memory_order_acquire으로 읽기(load) : 이렇게 두 작업이 실행되었을 때 store가 모두 끝난 뒤에 load가 되게 된다.
memory_order_acq_rel : acqire과 relaese를 합친 명령. 바꾸고 읽는것을 확정 지을 때 사용하면 될 듯.
memory_order_seq_cast : 모든 스레드에서 동일한 연산 순서를 보장시킴. 제일 확실하고 제일 무거운 타입. (아무것도 안 쓰면 기본적으로 이 타입이다.)
필자는 메모리 오더를 이해하는데만 몇 시간이 걸렸다...
이제 진짜 소스를 분석해보자.
CMPSCQueue() :
_head(new FNode()),
_tail(_head.load(std::memory_order_relaxed))
{
FNode* front = _head.load(std::memory_order_relaxed);
front->next.store(NULL, std::memory_order_relaxed);
}
~CMPSCQueue()
{
T output;
while (this->Dequeue(output)) {}
FNode* front = _head.load(std::memory_order_relaxed);
delete front;
}
생성자와 소멸자에서는 크게 볼 것은 없고, 메모리 오더를 relaxed로 넘겨서 만들고 지우도록 했다.
어차피 생성자와 소멸자는 한 곳에서만 수행될 것이고 메모리 관련해서 문제가 생기지 않기 때문인 것 같다.
void Enqueue(const T& input)
{
FNode* node = new FNode();
node->node = input;
node->next.store(NULL, std::memory_order_relaxed);
FNode* prev_head = _head.exchange(node, std::memory_order_acq_rel);
prev_head->next.store(node, std::memory_order_release);
}
삽입 때부터 조금 힘겨워진다.
node를 만들 때는 딱히 어려운 내용이 없지만 다음 문단에서 _head를 node로 바꾸도록 명령하는데 이때 다른 스레드에서 마지막에 바뀐 결과를 가질 수 있도록 acq_rel를 사용한다.
즉 다른 스레드에게 읽기, 쓰기 동기화를 하는 것이다.
그다음에 next를 연결해준다. 쓰기 동기화를 위해서 release를 사용하였다.
bool Dequeue(T& output)
{
FNode* tail = _tail.load(std::memory_order_relaxed);
FNode* next = tail->next.load(std::memory_order_acquire);
if (next == NULL) {
return false;
}
output = next->node;
_tail.store(next, std::memory_order_release);
delete tail;
return true;
}
마지막 삭제.
삭제는 Single Consume Queue이기 때문에 크게 신경 쓸 거 없다. 심지어 삭제에서 쓰이는 tail은 Dequeue에서만 쓰인다!!
tail을 가져오는 부분은 딱히 중요하지 않기 때문에 relaxed로 가져오고, next부분은 head가 바뀌면서 바뀔 수 있으므로 acquire로 가져온다. 다음 노드가 없다면 끝점이므로 실패를 리턴하고, 다음 노드가 있다면 다음 노드로 넘어간 다음 그 노드의 값을 출력해주면 된다.
테스트
테스트 코드는 다음과 같다.
N개의 스레드를 돌면서 숫자를 Enqueue 하고,
Consumer에서 Dequeue 한 숫자를 토대로 개수를 세어서 스레드수만큼 받는 데 성공하면 끝나는 테스트이다.
#include <iostream>
#include <thread>
#include <vector>
#include "MPSCQueue.h"
CMPSCQueue<int> queue;
#define COUNT 100000
#define THREAD_COUNT 100
void Producer() {
for (int i = 0; i < COUNT; ++i) {
queue.Enqueue(i);
}
}
void Consumer() {
int checker[COUNT];
int counter = 0;
for (int i = 0; i < COUNT; ++i) {
checker[i] = 0;
}
while(1) {
int retval = -1;
queue.Dequeue(retval);
if (retval != -1) {
checker[retval] ++;
if (checker[retval] == THREAD_COUNT) {
++counter;
}
if (checker[retval] > THREAD_COUNT) {
printf("%d???\n", checker[retval]);
}
}
if (counter == COUNT) return;
}
}
void main() {
std::thread consumerThreads(Consumer);
std::vector<std::thread> producerThreads;
for (int i = 0; i < THREAD_COUNT; ++i) {
producerThreads.push_back(std::thread(Producer));
}
for (std::thread & th : producerThreads)
{
th.join();
}
consumerThreads.join();
}
잘되는 것을 테스팅한 것도 있지만, 나는 저런 방식으로 스레드를 테스팅할 수 있다는 것을 여기(https://en.cppreference.com/w/cpp/atomic/atomic_flag)서 처음 알았다.
사실 MPSCQueue에 메모리 오더링을 모두 Relaxed로 바꿔도 별 문제가 일어나지 않는다.
MS Vs는 기본적으로 acq_rel을 사용하고, Intel도 기본적으로 acq_rel을 사용하기 때문에 메모리 정렬이 드라마틱하게 일어나지 않는다고 한다...
코드는 되게 짧지만 정말 어려운 내용이었다..
꼭 이 모든 것을 이해하고 짤 수 있는 날이 오겠지... 흑
참고 자료 :
메모리 장벽
(https://en.wikipedia.org/wiki/Memory_barrier)
(http://blog.naver.com/PostView.nhn?blogId=jjoommnn&logNo=130037479493)
메모리 오더
Atomic
'졸업 작품 > 서버' 카테고리의 다른 글
1학기 결산 (0) | 2019.06.29 |
---|---|
쓰레드와 Sleep (0) | 2019.06.29 |
C++ 스핀락 구현 (0) | 2019.06.13 |
졸작서버 AWS 사용기 (0) | 2019.06.08 |
CMake 사용기 (0) | 2019.06.08 |