博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用用户级线程提高多线程应用的性能
阅读量:5890 次
发布时间:2019-06-19

本文共 29347 字,大约阅读时间需要 97 分钟。

随着处理器往多核的发展,多线程被越来越多的应用到软件的开发中。但是如果没有正确的使用多线程,反而可能会导致软件性能的下降。

多线程程序中一个影响程序性能的因素就是同步。对于windows系统来说,最快的同步方案就是critical_section,critical_section基本上可以被认为是一个用户态的同步机制(特别是设定了spincount,只有在自旋超过了spincount次之后任然不能获得锁,才会切入核心态并把当前线程阻塞).但即使是这样,如果在临界区中的代码如果处理时间比较长,任然会导致处理器浪费在自旋上。如果我们可以让线程在无法获得锁的时候就切换线程(当然是在用户态,切换核心态线程的代价很大,除了进入核心态的开销,还有因为线程切换而导致缓存失效带来的代价)那么就可以把浪费在自旋上的cpu时间用来做有用的工作了。

 

下面介绍一种利用用户态线程的多线程解决方案,首先,创建跟cpu数量一致的线程,每个线程上将会运行一个用户级线程调度器。

所有的业务处理都交给用户级线程处理,每当用户级线程无法获得锁时,就将自己阻塞并回到调度器中,由调度器选择另一个用户级线程来运行。当一个用户级线程释放锁的时候,会唤醒一个阻塞在这个锁上的用户级线程。当然,因为用户级线程是没有时间片控制的,如果在里面处理耗时的代码,将会导致在同一调度器上运行的其它用户级线程无法运行。

 

下面是代码:

首先是一个lockfree队列,队列是线程安全的,并且无需任何锁

lockfree_queue.h

#ifndef _LOCKFREE_QUEUE_H #define _LOCKFREE_QUEUE_H template 
struct _node {
T val; _node
*_next; }; template
class LockFreeQueue {
public: LockFreeQueue():_head(0){} //在队列头插入一个节点 void push(_node
*newnode) {
while(1) {
_node
*lhead = _head;//本地保存 newnode->_next = lhead; //成功就退出,失败就重做 if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,newnode,lhead) == lhead) break; } } //从队列头弹出一个节点 _node
* pop() { while(1) { _node
*lhead = _head;//本地保存 if(!lhead) return NULL; _node
*ret = _head; if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,_head->_next,lhead) == lhead) { ret->_next = NULL; return ret; } } } private: _node
*_head; }; #endif

用户级线程

uthread.h

#ifndef _UTHREAD_H #define _UTHREAD_H #include 
#include "lockfree_queue.h" //#include "luaWrapper.h" enum {
NONE, WAIT4EVENT = 1, //等待某事件的来临 DEAD, //纤程已死亡 ACTIVED, //可运行的 UNACTIVED, //不可被添加到调度队列中 YIELD, SLEEP, }; enum {
BS_MOV = 0, BS_ATK, BS_OTHER, BS_WAIT4LOCK, BS_END, }; //阻塞结构 class BlockStruct {
public: BlockStruct(uChar type=BS_OTHER):bs_type(type){} //返回true纤程将从block中返回 virtual bool WakeUp() = 0; uChar bs_type; }; typedef int uthread_t; class uthread; class runnable {
public: virtual void main_routine() = 0; }; class uthread; struct st_timeout {
st_timeout(uthread *ut):ut(ut),_timeout(0),index(0){} bool operator < (st_timeout &r) {
return _timeout < r._timeout; } uLong _timeout; uthread *ut; int index;//在超时队列中的下标 private: st_timeout & operator = (const st_timeout &other); st_timeout(const st_timeout &other); }; class Scheduler; class uthread; struct ulstruct {
void *lock_addr; uthread *ut; }; //纤程 class uthread {
public: uthread(Scheduler *sc):m_runnable(0),m_bs(0),uthread_id(-1),m_status(NONE),_st_timeout(this),p_uthreadContext(0),m_next(0),wakeuptick(0),m_scheduler(sc) {
m_unlockevent = (_node
*)_aligned_malloc(sizeof(*m_unlockevent),4); m_locknode = (_node
*)_aligned_malloc(sizeof(*m_locknode),4); m_locknode->val = this; m_unlockevent->val.ut = this; } static void WINAPI fiber_routine(LPVOID pvParam); Scheduler *GetScheduler() {
return m_scheduler; } //有事件到达,尝试唤醒block的纤程 void Signal(); void SetStatus(unsigned char st) {
m_status = st; } unsigned char GetStatus() {
return m_status; } PVOID GetUContext() {
return p_uthreadContext; } void SetUContext(PVOID uct) {
p_uthreadContext = uct; } void SetBs(BlockStruct *bs) {
m_bs = bs; } BlockStruct *GetBs() {
return m_bs; } void SetRunnable(runnable *ra) {
m_runnable = ra; } runnable *GetRunnable() {
return m_runnable; } st_timeout &GetTimeoutSt() {
return _st_timeout; } uthread_t GetUid() {
return uthread_id; } void SetUid(uthread_t uid) {
uthread_id = uid; } uthread *Next() {
return m_next; } void SetNext(uthread *ut) {
m_next = ut; } uLong wakeuptick; _node
* GetUnlockEvent() {
return m_unlockevent; } _node
* GetLockNode() {
return m_locknode; } private: unsigned char m_status; uthread_t uthread_id; PVOID p_uthreadContext; BlockStruct *m_bs; runnable *m_runnable; uthread *m_next; st_timeout _st_timeout; _node
* m_unlockevent; _node
* m_locknode; Scheduler *m_scheduler; }; #endif

uthread.cpp

#include "stdafx.h" #include "uthread.h" #include "fiberApi.h" #include 
#include
#include "ulock.h" void WINAPI uthread::fiber_routine(LPVOID pvParam) {
uthread *_uthread = (uthread*)pvParam; while(1) {
assert(_uthread->m_runnable); std::cout << "Ai Start,threadid :" << _uthread->uthread_id << std::endl; _uthread->m_runnable->main_routine(); std::cout << "Ai Stop" << std::endl; _uthread->m_runnable = 0; //从可运行队列中删除 //Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = UNACTIVED; //SetCurrentUthreadState(UNACTIVED); //Scheduler::ReleaseUthread(Scheduler::m_curuid); ReleaseCurrentUthread(); //Scheduler::_Yield(UNACTIVED); _Yield(UNACTIVED); } //Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = DEAD; //SetCurrentUthreadState(DEAD); /*这里不能直接退出纤程运行函数,否则会导致运行线程的退出, * 正确的做法是把运行权交回给scheduler,由scheduler来删除 * 这个纤程 */ //Scheduler::_Yield(DEAD); _Yield(DEAD); } //等待的事件到达了,将纤程重新插入到可运行队列中 void uthread::Signal() {
if(m_bs->WakeUp()) {
//printf("满足唤醒条件 %d /n",this->GetUid()); //等待的条件满足了,把fiber置为可运行态并添加到运行队列中 //Scheduler::Add2Active(this); Add2Active(this); m_bs = 0; wakeuptick = 0; } }

然后是用户态的锁

uLock.h

#ifndef _ULOCK_H #define _ULOCK_H #pragma pack(push) #pragma pack(4) #include "fiberApi.h" #include "lockfree_queue.h" class Scheduler; //纤程间使用的用户级锁 struct umutex {
friend class Scheduler; public: umutex():flag(0){} void Lock() {
if(InterlockedCompareExchange(&flag,1,0) == 1) { uthread *currentUThread = GetCurrentUThread(); _node
*tmp = currentUThread->GetLockNode(); m_blockthread.push(tmp); //加锁失败,阻塞当前纤程 Wait4Lock(); } } void UnLock() {
if(InterlockedCompareExchange(&flag,0,1) == 0) {
//没有lock return; } //已经解锁,唤醒阻塞在这个锁上的纤程 _node
*tmp = m_blockthread.pop(); if(tmp) {
NotifyUnLock(this,tmp->val); } } private: bool _Lock(uthread *ut) {
bool ret = InterlockedCompareExchange(&flag,1,0) == 0; if(!ret) {
//uthread *currentUThread = GetCurrentUThread(); _node
*tmp = ut->GetLockNode(); m_blockthread.push(tmp); } return ret; } private: volatile long flag;//如果被持有则置1,否则置0 LockFreeQueue
m_blockthread;//阻塞在这个锁上的纤程 }; #pragma pack(pop) #endif

调度器

scheduler.h

#ifndef _SCHEDULER_H #define _SCHEDULER_H #include 
#include "uthread.h" #include
#include
#include
#include "minHeap.h" #include "lockfree_queue.h" #define MAX_FIBER 32 class Scheduler {
friend class uthread; friend void _Yield(uChar); friend void ReleaseUthread(int); friend void ReleaseCurrentUthread(); friend void SetCurrentUthreadState(uChar); friend void Add2Active(uthread*); friend uthread *GetCurrentUThread(); friend uthread_t GetCurrentUThreadId(); public: Scheduler():m_active_head(0),m_active_tail(0),m_count(0),m_curuid(-1),pending_index(0) {} //初始化纤程库 void Init(); void Destroy(); //将一个纤程加入到调度列表中以运行runnable uthread_t FiberStartRun(runnable *param); //选择一个纤程以进行调度 void Schedule(); void SwitchTo(uthread_t uid) {
SwitchToFiber(m_uthreads[uid]->GetUContext()); } void SwitchToBlock(uthread_t uid) {
if(m_uthreads[uid]->GetBs()) SwitchTo(uid); } void _Yield(uChar status = YIELD) {
m_uthreads[m_curuid]->SetStatus(status); SwitchToFiber(m_pUthreadContext); } //将一个纤程添加到可运行队列中 void Add2Active(uthread *ut); //阻塞纤程,直到wc得到满足 void Block(BlockStruct *bs,uLong ms); uthread_t GetFreeUthread() {
if(!m_uthreadpool.empty()) {
uthread_t ret = m_uthreadpool.front(); m_uthreadpool.pop_front(); return ret; } return -1; } void ReleaseUthread(uthread_t uid) {
if(uid < MAX_FIBER) {
m_uthreads[uid]->SetStatus(UNACTIVED); m_uthreadpool.push_back(uid); } } //尝试唤醒uid void TryWakeup(uthread_t uid) {
if(m_uthreads[uid]->GetBs()) m_uthreads[uid]->Signal(); } //强制唤醒纤程 void ForceWakeup(uthread_t uid) {
if(m_uthreads[uid]->GetStatus() != ACTIVED) {
//printf("强制唤醒/n"); Add2Active(m_uthreads[uid]); } } //强制唤醒阻塞在type条件上的纤程 void ForceWakeup(uthread_t uid,uChar type) {
if(m_uthreads[uid]->GetStatus() != ACTIVED && m_uthreads[uid]->GetBs()->bs_type == type) {
//printf("强制唤醒/n"); Add2Active(m_uthreads[uid]); } } //清空activelist,和pendingadd void ClearActiveList(); void ClearTimeOut() {
m_timeoutlist.Clear(); } void Sleep(uLong ms); void NotifyUnlock(_node
*nn) {
m_unlockevent.push(nn); } void Wait4Lock(); private: uthread *m_active_head; uthread *m_active_tail; //也可以不使用m_pendingAdd,根据测试结果决定 uthread_t m_pendingAdd[MAX_FIBER]; unsigned int pending_index; minheap
m_timeoutlist; PVOID m_pUthreadContext;//调度器所在纤程的上下文 uthread *m_uthreads[MAX_FIBER]; LockFreeQueue
m_unlockevent; int m_count; int m_curuid; //当前正在运行的纤程的uid,==-1表示在scheduler中运行 std::list
m_uthreadpool;//fiber池 //std::map
> m_wait4lock; //std::list
m_wait4lock; static const int reservesize = 65536; static const int commitsize = 8192; }; #endif

scheduler.cpp

#include "stdafx.h" //#include "Scheduler.h" #include 
#include
#include "fiberApi.h" #include "uLock.h" //extern umutex *g_lock; uthread_t Scheduler::FiberStartRun(runnable *param) {
uthread_t uid = GetFreeUthread(); if(uid != -1) {
m_uthreads[uid]->SetRunnable(param); Add2Active(m_uthreads[uid]); } return uid; } void Scheduler::Schedule() {
{
//看看是否有可以获取锁的纤程 _node
*tmp = NULL; while(tmp = m_unlockevent.pop()) {
umutex *um = (umutex*)tmp->val.lock_addr; uthread *ut = tmp->val.ut; if(um->_Lock(ut)) {
//加锁成功,将纤程从等待队列中删除并投入到可运行队列中 Add2Active(ut); } //std::map
>::iterator it = m_wait4lock.find(tmp->val); //if(it != m_wait4lock.end()) //{
//尝试加锁 /*if(!it->second.empty()) {
umutex *um = (umutex*)it->first; uthread *ut = it->second.front(); if(um->_Lock(ut)) {
//加锁成功,将纤程从等待队列中删除并投入到可运行队列中 it->second.pop_front(); Add2Active(ut); } }*/ //} //else //{
//在Wait4Lock调用完成前,其它线程的解锁可能已经调用过NotifyUnLock了, //所以这里把解锁消息重新放回队列中,再次尝试 // m_unlockevent.push(tmp); //} } } //将所有等待添加到m_activeList中的纤程都添加进去 {
for(unsigned int i = 0; i < pending_index; ++i) {
uthread *ut = m_uthreads[m_pendingAdd[i]]; ut->SetNext(0); if(m_active_tail) {
m_active_tail->SetNext(ut); m_active_tail = ut; } else {
m_active_head = m_active_tail = ut; } } pending_index = 0; } uthread *cur = m_active_head; uthread *pre = NULL; while(cur) {
m_curuid = cur->GetUid(); SwitchToFiber(cur->GetUContext()); m_curuid = -1; unsigned char status = cur->GetStatus(); //当纤程处于以下状态时需要从可运行队列中移除 if(status == DEAD || status == SLEEP || status == WAIT4EVENT || status == UNACTIVED || status == YIELD) {
//删除首元素 if(cur == m_active_head) {
//同时也是尾元素 if(cur == m_active_tail) m_active_head = m_active_tail = NULL; else m_active_head = cur->Next(); } else if(cur == m_active_tail) {
pre->SetNext(NULL); m_active_tail = pre; } else pre->SetNext(cur->Next()); uthread *tmp = cur; cur = cur->Next(); tmp->SetNext(0); //如果仅仅是让出处理器,需要重新投入到可运行队列中 if(status == YIELD) Add2Active(tmp); } else {
pre = cur; cur = cur->Next(); } } //看看有没有timeout的纤程 {
uLong now = GetTickCount(); while(m_timeoutlist.Min() !=0 && m_timeoutlist.Min() <= now) {
st_timeout *timeout = m_timeoutlist.PopMin(); if(timeout->ut->GetStatus() == WAIT4EVENT || timeout->ut->GetStatus() == SLEEP) {
timeout->ut->wakeuptick = timeout->_timeout; Add2Active(timeout->ut); } } } } void Scheduler::Destroy() {
for(int i = 0; i < MAX_FIBER; ++i) {
if(m_uthreads[i]) {
DeleteFiber(m_uthreads[i]->GetUContext()); delete m_uthreads[i]; } } ConvertFiberToThread(); } void Scheduler::Block(BlockStruct *bs,uLong ms) {
if(ms > 0) {
st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt(); _st_timeout._timeout = GetTickCount() + ms;//time(NULL) + timeout; if(!_st_timeout.index) {
m_timeoutlist.Insert(&_st_timeout); } else {
m_timeoutlist.Change(&_st_timeout); } } m_uthreads[m_curuid]->SetBs(bs); m_uthreads[m_curuid]->SetStatus(WAIT4EVENT); SwitchToFiber(m_pUthreadContext); m_uthreads[m_curuid]->SetBs(0); } void Scheduler::Init() {
m_pUthreadContext = ConvertThreadToFiber(NULL); //创建fiber池 for(int i = 0 ; i < MAX_FIBER; ++i) { uthread *nthread = new uthread(this); PVOID uthreadcontext = CreateFiberEx(commitsize,reservesize,0,uthread::fiber_routine,nthread); assert(uthreadcontext); nthread->SetUContext(uthreadcontext); m_uthreads[i] = nthread; nthread->SetUid(i); m_uthreadpool.push_back(i); } } //将一个纤程添加到可运行队列中 void Scheduler::Add2Active(uthread *ut) {
//如果已经在active中了则不能再次添加 if(ut->GetStatus() != ACTIVED) {
ut->SetStatus(ACTIVED); m_pendingAdd[pending_index++] = ut->GetUid(); } } void Scheduler::ClearActiveList() {
pending_index = 0; uthread *cur = m_active_head; while(cur) {
uthread *next = cur->Next(); cur->SetNext(0); cur = next; } m_active_head = m_active_tail = NULL; } void Scheduler::Sleep(uLong ms) {
if(ms > 0) {
st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt(); _st_timeout._timeout = GetTickCount() + ms;//time(NULL) + seconds; if(!_st_timeout.index) {
m_timeoutlist.Insert(&_st_timeout); } else {
m_timeoutlist.Change(&_st_timeout); } m_uthreads[m_curuid]->SetStatus(SLEEP); } SwitchToFiber(m_pUthreadContext); } //纤程在等待lock_addr锁,需要将纤程移出可运行队列,并记等待信息 void Scheduler::Wait4Lock() {
/*std::map
>::iterator it = m_wait4lock.find(lock_addr); uthread *current_uthread = m_uthreads[m_curuid]; if(it == m_wait4lock.end()) m_wait4lock.insert(std::make_pair(lock_addr,std::list
(1,current_uthread))); else it->second.push_back(current_uthread); */ uthread *current_uthread = m_uthreads[m_curuid]; //m_wait4lock.push_back(current_uthread); current_uthread->SetStatus(WAIT4EVENT); //切换回调度器 SwitchToFiber(m_pUthreadContext); }

然后是一些API

fiberApi.h

#ifndef _FIBERAPI_H #define _FIBERAPI_H #include "Scheduler.h" #include  //与每个线程相关的纤程调度器 //extern std::map
g_tlssc; extern Scheduler* g_tlssc[1019]; void _Yield(uChar); void ReleaseUthread(int); void ReleaseCurrentUthread();//释放当前的纤程 void SetCurrentUthreadState(uChar);//设置当前纤程的状态 void Add2Active(uthread*); uthread *GetCurrentUThread(); uthread_t GetCurrentUThreadId(); void Wait4Lock(); void NotifyUnLock(void*,uthread*); #endif

fiberApi.cpp

#include "stdafx.h" #include "fiberApi.h" //std::map
g_tlssc; Scheduler* g_tlssc[1019]; void _Yield(uChar state) {
DWORD currenttrheadid = GetCurrentThreadId(); g_tlssc[currenttrheadid%512]->_Yield(state); } void ReleaseUthread(int uthreadid) {
DWORD currenttrheadid = GetCurrentThreadId(); g_tlssc[currenttrheadid%512]->ReleaseUthread(uthreadid); } void ReleaseCurrentUthread() {
DWORD currenttrheadid = GetCurrentThreadId(); Scheduler *sc = g_tlssc[currenttrheadid%512]; sc->ReleaseUthread(sc->m_curuid); } void SetCurrentUthreadState(uChar state) {
DWORD currenttrheadid = GetCurrentThreadId(); Scheduler *sc = g_tlssc[currenttrheadid%512]; sc->m_uthreads[sc->m_curuid]->SetStatus(state); } void Add2Active(uthread *ut) {
DWORD currenttrheadid = GetCurrentThreadId(); g_tlssc[currenttrheadid%512]->Add2Active(ut); } uthread *GetCurrentUThread() {
DWORD currenttrheadid = GetCurrentThreadId(); Scheduler *sc = g_tlssc[currenttrheadid%512]; return sc->m_uthreads[sc->m_curuid]; } uthread_t GetCurrentUThreadId() {
DWORD currenttrheadid = GetCurrentThreadId(); Scheduler *sc = g_tlssc[currenttrheadid%512]; return sc->m_curuid; } void Wait4Lock() {
DWORD currenttrheadid = GetCurrentThreadId(); g_tlssc[currenttrheadid%512]->Wait4Lock(); } void NotifyUnLock(void *lock_addr,uthread *ut) {
_node
*nn = ut->GetUnlockEvent(); nn->val.lock_addr = lock_addr; ut->GetScheduler()->NotifyUnlock(nn); //g_tlssc[threadid]->NotifyUnlock(lock_addr); //std::map
::iterator it = g_tlssc.begin(); //std::map
::iterator end = g_tlssc.end(); //for(; it != end; ++it) // it->second.NotifyUnlock(lock_addr); }

经过进一步测试,在ulock的lock和unlock中使用的无锁队列m_blockthread可能因为多线程操作导致解锁通告丢失。

因此,m_blockthread需要改为普通队列,并且在操作前暂时用自旋锁锁定(暂时使用,希望可以找到更好的方法)。

大致修改如下:

void Lock() {
if(InterlockedCompareExchange(&flag,1,0) == 1) {
uthread *currentUThread = GetCurrentUThread(); _node
tmp = currentUThread->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); push(tmp); InterlockedCompareExchange(&spinlock,0,1); Wait4Lock(); } } void UnLock() {
if(InterlockedCompareExchange(&flag,0,1) == 0) return; while(InterlockedCompareExchange(&spinlock,1,0) == 1); _node
*tmp = pop(); InterlockedCompareExchange(&spinlock,0,1); if(tmp) NotifyUnLock(this,tmp->val); }

其次,还有一个问题需要解决,就是各纤程获得锁的次数不平均,具体例子如下:在双核机器上启动两个线程,线程上各运行一个纤程,对testlist进行写入的时候

会发现,大部分的写入是由其中一个纤程完成的,而另外一个纤程则很少能获得写入的机会。

 

下面是修改后的uLock.h,解决了纤程获得锁不平均的问题,只要创建的调度线程不超过cpu的数量,基本保证了各纤程有均等的机会获得锁。

#ifndef _ULOCK_H #define _ULOCK_H #pragma pack(push) #pragma pack(4) #include "fiberApi.h" //#include "lockfree_queue.h" class Scheduler; //纤程间使用的用户级锁 struct umutex {
friend class Scheduler; public: umutex():flag(0),spinlock(0),m_head(0),m_tail(0) {
} void Lock() {
if(InterlockedCompareExchange(&flag,1,0) == 1) { uthread *currentUThread = GetCurrentUThread(); _node
*tmp = currentUThread->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); //再次尝试加锁 if(InterlockedCompareExchange(&flag,1,0) == 0) {
InterlockedCompareExchange(&spinlock,0,1); return; } push(tmp); InterlockedCompareExchange(&spinlock,0,1); //加锁失败,阻塞当前纤程 Wait4Lock(); } } void UnLock() {
if(InterlockedCompareExchange(&flag,0,1) == 0) {
//没有lock return; } //已经解锁,挑选一个纤程,并将它唤醒 while(InterlockedCompareExchange(&spinlock,1,0) == 1); _node
*tmp = pop(); InterlockedCompareExchange(&spinlock,0,1); if(tmp) {
NotifyUnLock(this,tmp->val); } } private: bool _Lock(uthread *ut) {
bool ret = InterlockedCompareExchange(&flag,1,0) == 0; if(!ret) {
uthread *currentUThread = GetCurrentUThread(); _node
*tmp = ut->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); //再次尝试加锁 if(InterlockedCompareExchange(&flag,1,0) == 0) {
InterlockedCompareExchange(&spinlock,0,1); return true; } push(tmp); InterlockedCompareExchange(&spinlock,0,1); } return ret; } void push(_node
*blockut) {
blockut->_next = NULL; if(NULL == m_tail) {
m_head = m_tail = blockut; } else {
m_tail->_next = blockut; m_tail = blockut; } } _node
*pop() {
if(NULL == m_head) return NULL; else {
_node
*ret = m_head; m_head = m_head->_next; if(m_head == NULL) m_tail = m_head; return ret; } } private: volatile long flag;//如果被持有则置1,否则置0 volatile long spinlock;//自旋锁,保护m_blockthread; //队列,记录阻塞在这个锁上的纤程 _node
*m_head; _node
*m_tail; }; #pragma pack(pop) #endif

测试代码:

// fiberFramework.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "CThread.h" #include "fiberApi.h" #include "uLock.h" #include "CLock.h" #define TESTSIZE 1000000 int g_testlist[TESTSIZE]; int g_testlistcs[TESTSIZE]; int g_testmutex[TESTSIZE]; umutex *g_lock; zMutex *g_lockmutex; zLightMutex *g_lockcs; /*std::list
g_testlist2; std::list
g_testlistcs2; std::list
g_testmutex2; umutex *g_lock2; zMutex *g_lockmutex2; zLightMutex *g_lockcs2; */ static volatile bool finish = false; static volatile long count = 0; zThreadGroup g_threadgroup; DWORD starttime = 0; DWORD endtime = 0; class uworker : public runnable {
public: void main_routine() {
while(1) {
g_lock->Lock(); if(count==0) {
starttime = GetTickCount(); } if(count == TESTSIZE) {
endtime = GetTickCount(); finish = true; g_lock->UnLock(); return; } else {
g_testlist[count] = GetCurrentThreadId()+GetCurrentUThreadId(); //InterlockedIncrement(&count); } ++count; g_lock->UnLock(); _Yield(YIELD); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; class CWorkerThread : public zThread,private Noncopyable {
public: CWorkerThread(const std::string &name = std::string("zThread"),const bool joinable = true) :zThread(name,joinable){} ~CWorkerThread(){} void run() {
Scheduler *sc = new Scheduler; sc->Init(); //g_tlssc.insert(std::make_pair(GetCurrentThreadId(),sc)); if(g_tlssc[GetCurrentThreadId()%TLSSIZE] != NULL) {
printf("error here/n"); getchar(); exit(0); } g_tlssc[GetCurrentThreadId()%TLSSIZE] = sc; {
uworker uw1; uworker uw2; uworker uw3; uworker uw4; sc->FiberStartRun(&uw1); sc->FiberStartRun(&uw2); sc->FiberStartRun(&uw3); sc->FiberStartRun(&uw4); } /*{
uworker uw1; uworker uw2; uworker uw3; uworker uw4; sc->FiberStartRun(&uw1); sc->FiberStartRun(&uw2); sc->FiberStartRun(&uw3); sc->FiberStartRun(&uw4); }*/ while(!finish) {
sc->Schedule(); } } }; class CWorkerThreadCs : public zThread,private Noncopyable {
public: CWorkerThreadCs(const std::string &name = std::string("zThread"),const bool joinable = true) :zThread(name,joinable){} ~CWorkerThreadCs(){} void run() {
while(1) {
g_lockcs->Lock(); if(count == 0) {
starttime = GetTickCount();; } if(count == TESTSIZE) {
endtime = GetTickCount(); g_lockcs->UnLock(); return; } else {
g_testlistcs[count] = GetCurrentThreadId(); //InterlockedIncrement(&count); } ++count; g_lockcs->UnLock(); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; class CWorkerThreadMutex : public zThread,private Noncopyable {
public: CWorkerThreadMutex(const std::string &name = std::string("zThread"),const bool joinable = true) :zThread(name,joinable){} ~CWorkerThreadMutex(){} void run() {
while(1) {
g_lockmutex->Lock(); if(count == 0) {
starttime = GetTickCount();; } if(count == TESTSIZE) {
endtime = GetTickCount(); g_lockmutex->UnLock(); //printf("finish/n"); return; } else {
g_testmutex[count] = GetCurrentThreadId(); //InterlockedIncrement(&count); //printf("uthread:%d/n",GetCurrentThreadId()); } ++count; g_lockmutex->UnLock(); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; struct TestCallback : public zThreadGroup::Callback {
void exec(zThread *e) {
e->start(); } ~TestCallback(){} }; void testfiber(int n) {
void *buf = _aligned_malloc(sizeof(*g_lock),4); g_lock = new (buf)umutex; for(int i = 0; i < n; ++i) {
CWorkerThread *cw1 = new CWorkerThread; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); g_threadgroup.joinAll(); printf("test fiber/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::map
stat; for(int i = 0; i < TESTSIZE; ++i) {
std::map
::iterator it = stat.find(g_testlist[i]); if(it == stat.end()) {
stat.insert(std::make_pair(g_testlist[i],1)); } else {
stat[g_testlist[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map
::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } void testcs(int n) { g_lockcs = new zLightMutex; for(int i = 0; i < n; ++i) { CWorkerThreadCs *cw1 = new CWorkerThreadCs; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); //while(!_kbhit())//等待服务器终止 //{ // Sleep(10); //} g_threadgroup.joinAll(); printf("test cs/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::map
stat; for(int i = 0; i < TESTSIZE; ++i) { std::map
::iterator it = stat.find(g_testlistcs[i]); if(it == stat.end()) { stat.insert(std::make_pair(g_testlistcs[i],1)); } else { stat[g_testlistcs[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map
::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } void testmutex(int n) { g_lockmutex = new zMutex; for(int i = 0; i < n; ++i) { CWorkerThreadMutex *cw1 = new CWorkerThreadMutex; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); g_threadgroup.joinAll(); printf("test mutex/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::map
stat; for(int i = 0; i < TESTSIZE; ++i) { std::map
::iterator it = stat.find(g_testmutex[i]); if(it == stat.end()) { stat.insert(std::make_pair(g_testmutex[i],1)); } else { stat[g_testmutex[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map
::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } int _tmain(int argc, _TCHAR* argv[]) { int n = _ttol(argv[1]); memset(g_tlssc,0,sizeof(g_tlssc)); count = 0; testfiber(n/4); count = 0; testcs(n); count = 0; testmutex(n); /*LockFreeQueue
q; for(int i = 0; i < 5; ++i) { _node
*pNode = new _node
; pNode->val = i; q.push(pNode); } //q.print(); for(int i = 0; i < 5; ++i) { _node
*pNode = q.pop(); printf("%d/n",pNode->val); } _node
*pNode = q.pop(); */ getchar(); return 0; }

 

 

转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/02/2429634.html

你可能感兴趣的文章
Python3求m以内的素数、求m个数中最小的n个数
查看>>
GAN的原理入门
查看>>
You must reset your password using ALTER USER statement before executing this statement.
查看>>
gitbook简单安装和使用
查看>>
soft-nms
查看>>
[Jest] Use property matchers in snapshot tests with Jest
查看>>
4种用于构建嵌入式linux系统的工具_转
查看>>
033 Url中特殊字符的处理
查看>>
谈谈MySQL的do语句
查看>>
微服务SpringCloud容器化案例
查看>>
Python 学习书籍推荐
查看>>
Jmeter----属性和变量
查看>>
MySQL的SQL预处理(Prepared)
查看>>
webpack8--删除dist目录,压缩分离后的CSS
查看>>
微信小程序开发:http请求
查看>>
【netcore基础】.NET Core使用EPPlus实现MVC API里的Excel导出功能 配置中文表头
查看>>
对C++ templates类模板的几点补充(Traits类模板特化)
查看>>
VC++ .net 2005运行库解析
查看>>
转自CSDN:写给25岁以上单身男性的100条忠告
查看>>
经纬度坐标系与UTM MGRS坐标系之间的转换 c# 版本
查看>>