随着处理器向多核方向发展,多线程被越来越多地应用到软件的开发中。但是如果没有正确地使用多线程,反而可能会导致软件性能的下降。
多线程程序中一个影响程序性能的因素就是同步。对于windows系统来说,最快的同步方案就是critical_section,critical_section基本上可以被认为是一个用户态的同步机制(特别是设定了spincount之后,只有在自旋超过了spincount次之后仍然不能获得锁,才会切入核心态并把当前线程阻塞)。但即使是这样,如果临界区中的代码处理时间比较长,仍然会导致处理器时间浪费在自旋上。如果我们可以让线程在无法获得锁的时候就切换线程(当然是在用户态切换;切换核心态线程的代价很大,除了进入核心态的开销,还有因为线程切换而导致缓存失效带来的代价),那么就可以把浪费在自旋上的cpu时间用来做有用的工作了。
下面介绍一种利用用户态线程的多线程解决方案,首先,创建跟cpu数量一致的线程,每个线程上将会运行一个用户级线程调度器。
所有的业务处理都交给用户级线程处理,每当用户级线程无法获得锁时,就将自己阻塞并回到调度器中,由调度器选择另一个用户级线程来运行。当一个用户级线程释放锁的时候,会唤醒一个阻塞在这个锁上的用户级线程。当然,因为用户级线程是没有时间片控制的,如果在里面处理耗时的代码,将会导致在同一调度器上运行的其它用户级线程无法运行。
下面是代码:
首先是一个lockfree队列(push和pop都在队列头进行,因此实际上是后进先出的栈),它是线程安全的,并且无需任何锁。
lockfree_queue.h
#ifndef _LOCKFREE_QUEUE_H #define _LOCKFREE_QUEUE_H templatestruct _node { T val; _node *_next; }; template class LockFreeQueue { public: LockFreeQueue():_head(0){} //在队列头插入一个节点 void push(_node *newnode) { while(1) { _node *lhead = _head;//本地保存 newnode->_next = lhead; //成功就退出,失败就重做 if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,newnode,lhead) == lhead) break; } } //从队列头弹出一个节点 _node * pop() { while(1) { _node *lhead = _head;//本地保存 if(!lhead) return NULL; _node *ret = _head; if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,_head->_next,lhead) == lhead) { ret->_next = NULL; return ret; } } } private: _node *_head; }; #endif
用户级线程
uthread.h
#ifndef _UTHREAD_H #define _UTHREAD_H #include#include "lockfree_queue.h" //#include "luaWrapper.h" enum { NONE, WAIT4EVENT = 1, //等待某事件的来临 DEAD, //纤程已死亡 ACTIVED, //可运行的 UNACTIVED, //不可被添加到调度队列中 YIELD, SLEEP, }; enum { BS_MOV = 0, BS_ATK, BS_OTHER, BS_WAIT4LOCK, BS_END, }; //阻塞结构 class BlockStruct { public: BlockStruct(uChar type=BS_OTHER):bs_type(type){} //返回true纤程将从block中返回 virtual bool WakeUp() = 0; uChar bs_type; }; typedef int uthread_t; class uthread; class runnable { public: virtual void main_routine() = 0; }; class uthread; struct st_timeout { st_timeout(uthread *ut):ut(ut),_timeout(0),index(0){} bool operator < (st_timeout &r) { return _timeout < r._timeout; } uLong _timeout; uthread *ut; int index;//在超时队列中的下标 private: st_timeout & operator = (const st_timeout &other); st_timeout(const st_timeout &other); }; class Scheduler; class uthread; struct ulstruct { void *lock_addr; uthread *ut; }; //纤程 class uthread { public: uthread(Scheduler *sc):m_runnable(0),m_bs(0),uthread_id(-1),m_status(NONE),_st_timeout(this),p_uthreadContext(0),m_next(0),wakeuptick(0),m_scheduler(sc) { m_unlockevent = (_node *)_aligned_malloc(sizeof(*m_unlockevent),4); m_locknode = (_node *)_aligned_malloc(sizeof(*m_locknode),4); m_locknode->val = this; m_unlockevent->val.ut = this; } static void WINAPI fiber_routine(LPVOID pvParam); Scheduler *GetScheduler() { return m_scheduler; } //有事件到达,尝试唤醒block的纤程 void Signal(); void SetStatus(unsigned char st) { m_status = st; } unsigned char GetStatus() { return m_status; } PVOID GetUContext() { return p_uthreadContext; } void SetUContext(PVOID uct) { p_uthreadContext = uct; } void SetBs(BlockStruct *bs) { m_bs = bs; } BlockStruct *GetBs() { return m_bs; } void SetRunnable(runnable *ra) { m_runnable = ra; } runnable *GetRunnable() { return m_runnable; } st_timeout &GetTimeoutSt() { return _st_timeout; } uthread_t GetUid() { return uthread_id; } void SetUid(uthread_t uid) { uthread_id = uid; } uthread *Next() { return m_next; } void SetNext(uthread 
*ut) { m_next = ut; } uLong wakeuptick; _node * GetUnlockEvent() { return m_unlockevent; } _node * GetLockNode() { return m_locknode; } private: unsigned char m_status; uthread_t uthread_id; PVOID p_uthreadContext; BlockStruct *m_bs; runnable *m_runnable; uthread *m_next; st_timeout _st_timeout; _node * m_unlockevent; _node * m_locknode; Scheduler *m_scheduler; }; #endif
uthread.cpp
#include "stdafx.h"
#include "uthread.h"
#include "fiberApi.h"
// NOTE(review): two angle-bracket include targets were lost when the article
// was extracted. They are restored as the headers this file needs (assert and
// std::cout); confirm against the original project.
#include <cassert>
#include <iostream>
#include "ulock.h"

// Fiber entry point. pvParam is the uthread that owns this fiber.
// Runs the assigned runnable, hands the uthread back to its scheduler, then
// parks until the fiber is given new work. The loop never exits.
void WINAPI uthread::fiber_routine(LPVOID pvParam)
{
    uthread *_uthread = (uthread*)pvParam;
    while(1)
    {
        assert(_uthread->m_runnable);
        std::cout << "Ai Start,threadid :" << _uthread->uthread_id << std::endl;
        _uthread->m_runnable->main_routine();
        std::cout << "Ai Stop" << std::endl;
        _uthread->m_runnable = 0;
        // Give the uthread back to the scheduler and leave the run queue.
        ReleaseCurrentUthread();
        _Yield(UNACTIVED);
    }
    // Unreachable while the loop above is unconditional. Kept from the
    // original to record the intent: a fiber routine must never simply return
    // (that would terminate the hosting thread); it hands control back to the
    // scheduler, which is responsible for deleting the fiber.
    _Yield(DEAD);
}

// The awaited event arrived: if the block condition agrees, put the fiber back
// on the run queue.
void uthread::Signal()
{
    // Fibers parked by Wait4Lock are in WAIT4EVENT without a BlockStruct, so
    // m_bs may be null here; the original dereferenced it unconditionally.
    if(!m_bs)
        return;
    if(m_bs->WakeUp())
    {
        // Wake-up condition met: mark the fiber runnable and queue it.
        Add2Active(this);
        m_bs = 0;
        wakeuptick = 0;
    }
}
然后是用户态的锁
uLock.h
#ifndef _ULOCK_H #define _ULOCK_H #pragma pack(push) #pragma pack(4) #include "fiberApi.h" #include "lockfree_queue.h" class Scheduler; //纤程间使用的用户级锁 struct umutex { friend class Scheduler; public: umutex():flag(0){} void Lock() { if(InterlockedCompareExchange(&flag,1,0) == 1) { uthread *currentUThread = GetCurrentUThread(); _node*tmp = currentUThread->GetLockNode(); m_blockthread.push(tmp); //加锁失败,阻塞当前纤程 Wait4Lock(); } } void UnLock() { if(InterlockedCompareExchange(&flag,0,1) == 0) { //没有lock return; } //已经解锁,唤醒阻塞在这个锁上的纤程 _node *tmp = m_blockthread.pop(); if(tmp) { NotifyUnLock(this,tmp->val); } } private: bool _Lock(uthread *ut) { bool ret = InterlockedCompareExchange(&flag,1,0) == 0; if(!ret) { //uthread *currentUThread = GetCurrentUThread(); _node *tmp = ut->GetLockNode(); m_blockthread.push(tmp); } return ret; } private: volatile long flag;//如果被持有则置1,否则置0 LockFreeQueue m_blockthread;//阻塞在这个锁上的纤程 }; #pragma pack(pop) #endif
调度器
scheduler.h
#ifndef _SCHEDULER_H #define _SCHEDULER_H #include#include "uthread.h" #include
scheduler.cpp
#include "stdafx.h"
//#include "Scheduler.h"
// NOTE(review): two angle-bracket include targets were lost when the article
// was extracted. <assert.h> is needed for assert(); <time.h> is presumed from
// the commented-out time(NULL) calls. Confirm against the original project.
#include <assert.h>
#include <time.h>
#include "fiberApi.h"
#include "uLock.h"

// Binds a runnable to a free fiber and queues the fiber for execution.
// Returns the fiber id, or -1 when no fiber is free.
uthread_t Scheduler::FiberStartRun(runnable *param)
{
    uthread_t uid = GetFreeUthread();
    if(uid != -1)
    {
        m_uthreads[uid]->SetRunnable(param);
        Add2Active(m_uthreads[uid]);
    }
    return uid;
}

// One scheduling pass: hand released locks to waiting fibers, merge pending
// fibers into the run list, run every fiber on the run list once, then wake
// fibers whose timeout has expired.
void Scheduler::Schedule()
{
    // 1. Unlock notifications: try to take each released lock on behalf of
    //    the notified fiber.
    //    NOTE(review): the node type's template argument was lost in
    //    extraction; ulstruct is reconstructed from the val.lock_addr /
    //    val.ut accesses.
    {
        _node<ulstruct> *tmp = NULL;
        while((tmp = m_unlockevent.pop()) != NULL)
        {
            umutex *um = (umutex*)tmp->val.lock_addr;
            uthread *ut = tmp->val.ut;
            if(um->_Lock(ut))
            {
                // Lock acquired for the fiber: make it runnable.
                Add2Active(ut);
            }
            // On failure _Lock() has already put the fiber back on the
            // lock's wait list.
        }
    }

    // 2. Append every fiber waiting in m_pendingAdd to the run list.
    {
        for(unsigned int i = 0; i < pending_index; ++i)
        {
            uthread *ut = m_uthreads[m_pendingAdd[i]];
            ut->SetNext(0);
            if(m_active_tail)
            {
                m_active_tail->SetNext(ut);
                m_active_tail = ut;
            }
            else
            {
                m_active_head = m_active_tail = ut;
            }
        }
        pending_index = 0;
    }

    // 3. Run each fiber on the run list once.
    uthread *cur = m_active_head;
    uthread *pre = NULL;
    while(cur)
    {
        m_curuid = cur->GetUid();
        SwitchToFiber(cur->GetUContext());
        m_curuid = -1;
        unsigned char status = cur->GetStatus();
        // A fiber that came back in any of these states leaves the run list.
        if(status == DEAD || status == SLEEP || status == WAIT4EVENT
            || status == UNACTIVED || status == YIELD)
        {
            if(cur == m_active_head)
            {
                // Removing the first element (possibly also the last).
                if(cur == m_active_tail)
                    m_active_head = m_active_tail = NULL;
                else
                    m_active_head = cur->Next();
            }
            else if(cur == m_active_tail)
            {
                pre->SetNext(NULL);
                m_active_tail = pre;
            }
            else
                pre->SetNext(cur->Next());

            uthread *tmp = cur;
            cur = cur->Next();
            tmp->SetNext(0);
            // A fiber that merely yielded is queued again for the next pass.
            if(status == YIELD)
                Add2Active(tmp);
        }
        else
        {
            pre = cur;
            cur = cur->Next();
        }
    }

    // 4. Wake fibers whose timeout has expired.
    //    NOTE(review): nothing in this file removes a timeout entry when its
    //    fiber is woken early, so a stale entry could wake a fiber that has
    //    since parked in Wait4Lock (also WAIT4EVENT). Verify against the
    //    timeout list implementation.
    {
        uLong now = GetTickCount();
        while(m_timeoutlist.Min() != 0 && m_timeoutlist.Min() <= now)
        {
            st_timeout *timeout = m_timeoutlist.PopMin();
            if(timeout->ut->GetStatus() == WAIT4EVENT || timeout->ut->GetStatus() == SLEEP)
            {
                timeout->ut->wakeuptick = timeout->_timeout;
                Add2Active(timeout->ut);
            }
        }
    }
}

// Deletes every fiber of the pool and converts the calling fiber back into a
// plain thread. Must be called on the thread that called Init().
void Scheduler::Destroy()
{
    for(int i = 0; i < MAX_FIBER; ++i)
    {
        if(m_uthreads[i])
        {
            DeleteFiber(m_uthreads[i]->GetUContext());
            delete m_uthreads[i];
            // Clear the slot so that a repeated Destroy() cannot double-free.
            m_uthreads[i] = NULL;
        }
    }
    ConvertFiberToThread();
}

// Blocks the current fiber on bs, optionally for at most ms milliseconds, and
// switches back to the scheduler. Returns once the fiber is scheduled again.
void Scheduler::Block(BlockStruct *bs,uLong ms)
{
    if(ms > 0)
    {
        st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
        _st_timeout._timeout = GetTickCount() + ms;
        // index == 0 is treated as "not in the timeout list yet".
        if(!_st_timeout.index)
        {
            m_timeoutlist.Insert(&_st_timeout);
        }
        else
        {
            m_timeoutlist.Change(&_st_timeout);
        }
    }
    m_uthreads[m_curuid]->SetBs(bs);
    m_uthreads[m_curuid]->SetStatus(WAIT4EVENT);
    SwitchToFiber(m_pUthreadContext);
    // Back on this fiber: Schedule() has set m_curuid to it again.
    m_uthreads[m_curuid]->SetBs(0);
}

// Converts the calling thread into the scheduler fiber and creates the pool
// of MAX_FIBER worker fibers.
void Scheduler::Init()
{
    m_pUthreadContext = ConvertThreadToFiber(NULL);
    // Create the fiber pool.
    for(int i = 0 ; i < MAX_FIBER; ++i)
    {
        uthread *nthread = new uthread(this);
        PVOID uthreadcontext = CreateFiberEx(commitsize,reservesize,0,uthread::fiber_routine,nthread);
        assert(uthreadcontext);
        nthread->SetUContext(uthreadcontext);
        m_uthreads[i] = nthread;
        nthread->SetUid(i);
        m_uthreadpool.push_back(i);
    }
}

// Queues a fiber for the run list; it is linked in by the next Schedule().
void Scheduler::Add2Active(uthread *ut)
{
    // A fiber that is already active must not be queued a second time.
    if(ut->GetStatus() != ACTIVED)
    {
        ut->SetStatus(ACTIVED);
        m_pendingAdd[pending_index++] = ut->GetUid();
    }
}

// Drops all pending additions and unlinks every fiber from the run list.
void Scheduler::ClearActiveList()
{
    pending_index = 0;
    uthread *cur = m_active_head;
    while(cur)
    {
        uthread *next = cur->Next();
        cur->SetNext(0);
        cur = next;
    }
    m_active_head = m_active_tail = NULL;
}

// Puts the current fiber to sleep for ms milliseconds. With ms == 0 the fiber
// only hands control to the scheduler and stays on the run list.
void Scheduler::Sleep(uLong ms)
{
    if(ms > 0)
    {
        st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
        _st_timeout._timeout = GetTickCount() + ms;
        if(!_st_timeout.index)
        {
            m_timeoutlist.Insert(&_st_timeout);
        }
        else
        {
            m_timeoutlist.Change(&_st_timeout);
        }
        m_uthreads[m_curuid]->SetStatus(SLEEP);
    }
    SwitchToFiber(m_pUthreadContext);
}

// The current fiber is waiting for a lock: take it off the run list (by
// status) and switch back to the scheduler. The lock itself records the
// waiting fiber.
void Scheduler::Wait4Lock()
{
    uthread *current_uthread = m_uthreads[m_curuid];
    current_uthread->SetStatus(WAIT4EVENT);
    // Switch back to the scheduler.
    SwitchToFiber(m_pUthreadContext);
}
然后是一些API
fiberApi.h
#ifndef _FIBERAPI_H #define _FIBERAPI_H #include "Scheduler.h" #include
fiberApi.cpp
#include "stdafx.h"
#include "fiberApi.h"

// Per-thread scheduler table, indexed by a hash of the OS thread id.
// (An earlier revision kept the schedulers in a std::map.)
Scheduler* g_tlssc[1019];

namespace
{
    // Modulus used to map a thread id onto a g_tlssc slot. The original
    // repeated the literal 512 in every function below.
    // NOTE(review): the code that registers schedulers indexes g_tlssc with
    // TLSSIZE; this value must equal TLSSIZE or lookups hit the wrong slot.
    // Confirm, and ideally use TLSSIZE here.
    const DWORD kSchedulerSlots = 512;

    // Returns the scheduler registered for the calling thread. No check is
    // made that the slot is populated or belongs to this thread.
    Scheduler *CurrentScheduler()
    {
        return g_tlssc[GetCurrentThreadId() % kSchedulerSlots];
    }
}

// Switches the current fiber back to its scheduler with the given state.
void _Yield(uChar state)
{
    CurrentScheduler()->_Yield(state);
}

// Returns the fiber with id uthreadid to the calling thread's scheduler.
void ReleaseUthread(int uthreadid)
{
    CurrentScheduler()->ReleaseUthread(uthreadid);
}

// Returns the currently running fiber to its scheduler.
void ReleaseCurrentUthread()
{
    Scheduler *sc = CurrentScheduler();
    sc->ReleaseUthread(sc->m_curuid);
}

// Sets the status of the currently running fiber.
void SetCurrentUthreadState(uChar state)
{
    Scheduler *sc = CurrentScheduler();
    sc->m_uthreads[sc->m_curuid]->SetStatus(state);
}

// Queues ut on the calling thread's scheduler.
void Add2Active(uthread *ut)
{
    CurrentScheduler()->Add2Active(ut);
}

// Returns the currently running fiber. Only meaningful while a fiber is
// running: it indexes m_uthreads with the scheduler's m_curuid.
uthread *GetCurrentUThread()
{
    Scheduler *sc = CurrentScheduler();
    return sc->m_uthreads[sc->m_curuid];
}

// Returns the id of the currently running fiber.
uthread_t GetCurrentUThreadId()
{
    return CurrentScheduler()->m_curuid;
}

// Blocks the current fiber until a lock is handed to it.
void Wait4Lock()
{
    CurrentScheduler()->Wait4Lock();
}

// Tells ut's own scheduler that lock_addr was released and that ut should try
// to take it. May be called from any thread.
// NOTE(review): the node's template argument was lost in extraction; ulstruct
// is reconstructed from the val.lock_addr access.
void NotifyUnLock(void *lock_addr,uthread *ut)
{
    _node<ulstruct> *nn = ut->GetUnlockEvent();
    nn->val.lock_addr = lock_addr;
    ut->GetScheduler()->NotifyUnlock(nn);
}
经过进一步测试,在ulock的lock和unlock中使用的无锁队列m_blockthread可能因为多线程操作导致解锁通告丢失。
因此,m_blockthread需要改为普通队列,并且在操作前暂时用自旋锁锁定(暂时使用,希望可以找到更好的方法)。
大致修改如下:
void Lock() { if(InterlockedCompareExchange(&flag,1,0) == 1) { uthread *currentUThread = GetCurrentUThread(); _nodetmp = currentUThread->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); push(tmp); InterlockedCompareExchange(&spinlock,0,1); Wait4Lock(); } } void UnLock() { if(InterlockedCompareExchange(&flag,0,1) == 0) return; while(InterlockedCompareExchange(&spinlock,1,0) == 1); _node *tmp = pop(); InterlockedCompareExchange(&spinlock,0,1); if(tmp) NotifyUnLock(this,tmp->val); }
其次,还有一个问题需要解决,就是各纤程获得锁的次数不平均。具体例子如下:在双核机器上启动两个线程,每个线程上各运行一个纤程,对testlist进行写入的时候会发现,大部分的写入是由其中一个纤程完成的,而另外一个纤程则很少能获得写入的机会。
下面是修改后的uLock.h,解决了纤程获得锁不平均的问题,只要创建的调度线程不超过cpu的数量,基本保证了各纤程有均等的机会获得锁。
#ifndef _ULOCK_H #define _ULOCK_H #pragma pack(push) #pragma pack(4) #include "fiberApi.h" //#include "lockfree_queue.h" class Scheduler; //纤程间使用的用户级锁 struct umutex { friend class Scheduler; public: umutex():flag(0),spinlock(0),m_head(0),m_tail(0) { } void Lock() { if(InterlockedCompareExchange(&flag,1,0) == 1) { uthread *currentUThread = GetCurrentUThread(); _node*tmp = currentUThread->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); //再次尝试加锁 if(InterlockedCompareExchange(&flag,1,0) == 0) { InterlockedCompareExchange(&spinlock,0,1); return; } push(tmp); InterlockedCompareExchange(&spinlock,0,1); //加锁失败,阻塞当前纤程 Wait4Lock(); } } void UnLock() { if(InterlockedCompareExchange(&flag,0,1) == 0) { //没有lock return; } //已经解锁,挑选一个纤程,并将它唤醒 while(InterlockedCompareExchange(&spinlock,1,0) == 1); _node *tmp = pop(); InterlockedCompareExchange(&spinlock,0,1); if(tmp) { NotifyUnLock(this,tmp->val); } } private: bool _Lock(uthread *ut) { bool ret = InterlockedCompareExchange(&flag,1,0) == 0; if(!ret) { uthread *currentUThread = GetCurrentUThread(); _node *tmp = ut->GetLockNode(); while(InterlockedCompareExchange(&spinlock,1,0) == 1); //再次尝试加锁 if(InterlockedCompareExchange(&flag,1,0) == 0) { InterlockedCompareExchange(&spinlock,0,1); return true; } push(tmp); InterlockedCompareExchange(&spinlock,0,1); } return ret; } void push(_node *blockut) { blockut->_next = NULL; if(NULL == m_tail) { m_head = m_tail = blockut; } else { m_tail->_next = blockut; m_tail = blockut; } } _node *pop() { if(NULL == m_head) return NULL; else { _node *ret = m_head; m_head = m_head->_next; if(m_head == NULL) m_tail = m_head; return ret; } } private: volatile long flag;//如果被持有则置1,否则置0 volatile long spinlock;//自旋锁,保护m_blockthread; //队列,记录阻塞在这个锁上的纤程 _node *m_head; _node *m_tail; }; #pragma pack(pop) #endif
测试代码:
// fiberFramework.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "CThread.h" #include "fiberApi.h" #include "uLock.h" #include "CLock.h" #define TESTSIZE 1000000 int g_testlist[TESTSIZE]; int g_testlistcs[TESTSIZE]; int g_testmutex[TESTSIZE]; umutex *g_lock; zMutex *g_lockmutex; zLightMutex *g_lockcs; /*std::list g_testlist2; std::list g_testlistcs2; std::list g_testmutex2; umutex *g_lock2; zMutex *g_lockmutex2; zLightMutex *g_lockcs2; */ static volatile bool finish = false; static volatile long count = 0; zThreadGroup g_threadgroup; DWORD starttime = 0; DWORD endtime = 0; class uworker : public runnable { public: void main_routine() { while(1) { g_lock->Lock(); if(count==0) { starttime = GetTickCount(); } if(count == TESTSIZE) { endtime = GetTickCount(); finish = true; g_lock->UnLock(); return; } else { g_testlist[count] = GetCurrentThreadId()+GetCurrentUThreadId(); //InterlockedIncrement(&count); } ++count; g_lock->UnLock(); _Yield(YIELD); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; class CWorkerThread : public zThread,private Noncopyable { public: CWorkerThread(const std::string &name = std::string("zThread"),const bool joinable = true) :zThread(name,joinable){} ~CWorkerThread(){} void run() { Scheduler *sc = new Scheduler; sc->Init(); //g_tlssc.insert(std::make_pair(GetCurrentThreadId(),sc)); if(g_tlssc[GetCurrentThreadId()%TLSSIZE] != NULL) { printf("error here/n"); getchar(); exit(0); } g_tlssc[GetCurrentThreadId()%TLSSIZE] = sc; { uworker uw1; uworker uw2; uworker uw3; uworker uw4; sc->FiberStartRun(&uw1); sc->FiberStartRun(&uw2); sc->FiberStartRun(&uw3); sc->FiberStartRun(&uw4); } /*{ uworker uw1; uworker uw2; uworker uw3; uworker uw4; sc->FiberStartRun(&uw1); sc->FiberStartRun(&uw2); sc->FiberStartRun(&uw3); sc->FiberStartRun(&uw4); }*/ while(!finish) { sc->Schedule(); } } }; class CWorkerThreadCs : public zThread,private Noncopyable { public: CWorkerThreadCs(const std::string &name = std::string("zThread"),const bool 
joinable = true) :zThread(name,joinable){} ~CWorkerThreadCs(){} void run() { while(1) { g_lockcs->Lock(); if(count == 0) { starttime = GetTickCount();; } if(count == TESTSIZE) { endtime = GetTickCount(); g_lockcs->UnLock(); return; } else { g_testlistcs[count] = GetCurrentThreadId(); //InterlockedIncrement(&count); } ++count; g_lockcs->UnLock(); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; class CWorkerThreadMutex : public zThread,private Noncopyable { public: CWorkerThreadMutex(const std::string &name = std::string("zThread"),const bool joinable = true) :zThread(name,joinable){} ~CWorkerThreadMutex(){} void run() { while(1) { g_lockmutex->Lock(); if(count == 0) { starttime = GetTickCount();; } if(count == TESTSIZE) { endtime = GetTickCount(); g_lockmutex->UnLock(); //printf("finish/n"); return; } else { g_testmutex[count] = GetCurrentThreadId(); //InterlockedIncrement(&count); //printf("uthread:%d/n",GetCurrentThreadId()); } ++count; g_lockmutex->UnLock(); volatile int c = 0; for(volatile int cc = 0; cc < 100; ++cc) c++; } } }; struct TestCallback : public zThreadGroup::Callback { void exec(zThread *e) { e->start(); } ~TestCallback(){} }; void testfiber(int n) { void *buf = _aligned_malloc(sizeof(*g_lock),4); g_lock = new (buf)umutex; for(int i = 0; i < n; ++i) { CWorkerThread *cw1 = new CWorkerThread; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); g_threadgroup.joinAll(); printf("test fiber/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::mapstat; for(int i = 0; i < TESTSIZE; ++i) { std::map ::iterator it = stat.find(g_testlist[i]); if(it == stat.end()) { stat.insert(std::make_pair(g_testlist[i],1)); } else { stat[g_testlist[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map ::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } void testcs(int n) { g_lockcs = new zLightMutex; for(int i = 0; i < n; ++i) { 
CWorkerThreadCs *cw1 = new CWorkerThreadCs; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); //while(!_kbhit())//等待服务器终止 //{ // Sleep(10); //} g_threadgroup.joinAll(); printf("test cs/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::map stat; for(int i = 0; i < TESTSIZE; ++i) { std::map ::iterator it = stat.find(g_testlistcs[i]); if(it == stat.end()) { stat.insert(std::make_pair(g_testlistcs[i],1)); } else { stat[g_testlistcs[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map ::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } void testmutex(int n) { g_lockmutex = new zMutex; for(int i = 0; i < n; ++i) { CWorkerThreadMutex *cw1 = new CWorkerThreadMutex; g_threadgroup.add(cw1); } TestCallback CallBack; g_threadgroup.execAll(CallBack); g_threadgroup.joinAll(); printf("test mutex/n"); printf("count %d/n",count); printf("time %d/n",endtime - starttime); std::map stat; for(int i = 0; i < TESTSIZE; ++i) { std::map ::iterator it = stat.find(g_testmutex[i]); if(it == stat.end()) { stat.insert(std::make_pair(g_testmutex[i],1)); } else { stat[g_testmutex[i]]++; } } printf("stat size = %d/n",stat.size()); for(std::map ::iterator it = stat.begin(); it != stat.end(); ++it) { printf("id=%d,count=%d/n",it->first,it->second); } } int _tmain(int argc, _TCHAR* argv[]) { int n = _ttol(argv[1]); memset(g_tlssc,0,sizeof(g_tlssc)); count = 0; testfiber(n/4); count = 0; testcs(n); count = 0; testmutex(n); /*LockFreeQueue q; for(int i = 0; i < 5; ++i) { _node *pNode = new _node ; pNode->val = i; q.push(pNode); } //q.print(); for(int i = 0; i < 5; ++i) { _node *pNode = q.pop(); printf("%d/n",pNode->val); } _node *pNode = q.pop(); */ getchar(); return 0; }