123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995 |
- /*
- ** 2012-01-23
- **
- ** The author disclaims copyright to this source code. In place of
- ** a legal notice, here is a blessing:
- **
- ** May you do good and not evil.
- ** May you find forgiveness for yourself and forgive others.
- ** May you share freely, never taking more than you give.
- **
- *************************************************************************
- **
- ** Utilities used to help multiple LSM clients to coexist within the
- ** same process space.
- */
- #include "lsmInt.h"
- /*
- ** Global data. All global variables used by code in this file are grouped
- ** into the following structure instance.
- **
- ** pDatabase:
- ** Linked list of all Database objects allocated within this process.
- ** This list may not be traversed without holding the global mutex (see
- ** functions enterGlobalMutex() and leaveGlobalMutex()).
- */
- static struct SharedData {
- Database *pDatabase; /* Linked list of all Database objects */
- } gShared;
- /*
- ** Database structure. There is one such structure for each distinct
- ** database accessed by this process. They are stored in the singly linked
- ** list starting at global variable gShared.pDatabase. Database objects are
- ** reference counted. Once the number of connections to the associated
- ** database drops to zero, they are removed from the linked list and deleted.
- **
- ** pFile:
- ** In multi-process mode, this file descriptor is used to obtain locks
- ** and to access shared-memory. In single process mode, its only job is
- ** to hold the exclusive lock on the file.
- **
- */
- struct Database {
- /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */
- char *zName; /* Canonical path to database file */
- int nName; /* strlen(zName) */
- int nDbRef; /* Number of associated lsm_db handles */
- Database *pDbNext; /* Next Database structure in global list */
- /* Protected by the local mutex (pClientMutex) */
- int bReadonly; /* True if Database.pFile is read-only */
- int bMultiProc; /* True if running in multi-process mode */
- lsm_file *pFile; /* Used for locks/shm in multi-proc mode */
- LsmFile *pLsmFile; /* List of deferred closes */
- lsm_mutex *pClientMutex; /* Protects the apShmChunk[] and pConn */
- int nShmChunk; /* Number of entries in apShmChunk[] array */
- void **apShmChunk; /* Array of "shared" memory regions */
- lsm_db *pConn; /* List of connections to this db. */
- };
- /*
- ** Functions to enter and leave the global mutex. This mutex is used
- ** to protect the global linked-list headed at gShared.pDatabase.
- */
- static int enterGlobalMutex(lsm_env *pEnv){
- lsm_mutex *p;
- int rc = lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
- if( rc==LSM_OK ) lsmMutexEnter(pEnv, p);
- return rc;
- }
- static void leaveGlobalMutex(lsm_env *pEnv){
- lsm_mutex *p;
- lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
- lsmMutexLeave(pEnv, p);
- }
- #ifdef LSM_DEBUG
- static int holdingGlobalMutex(lsm_env *pEnv){
- lsm_mutex *p;
- lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
- return lsmMutexHeld(pEnv, p);
- }
- #endif
- #if 0
- static void assertNotInFreelist(Freelist *p, int iBlk){
- int i;
- for(i=0; i<p->nEntry; i++){
- assert( p->aEntry[i].iBlk!=iBlk );
- }
- }
- #else
- # define assertNotInFreelist(x,y)
- #endif
- /*
- ** Append an entry to the free-list. If (iId==-1), this is a delete.
- */
- int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
- lsm_env *pEnv = db->pEnv;
- Freelist *p;
- int i;
- assert( iId==-1 || iId>=0 );
- p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;
- /* Extend the space allocated for the freelist, if required */
- assert( p->nAlloc>=p->nEntry );
- if( p->nAlloc==p->nEntry ){
- int nNew;
- int nByte;
- FreelistEntry *aNew;
- nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2);
- nByte = sizeof(FreelistEntry) * nNew;
- aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte);
- if( !aNew ) return LSM_NOMEM_BKPT;
- p->nAlloc = nNew;
- p->aEntry = aNew;
- }
- for(i=0; i<p->nEntry; i++){
- assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk );
- if( p->aEntry[i].iBlk>=iBlk ) break;
- }
- if( i<p->nEntry && p->aEntry[i].iBlk==iBlk ){
- /* Clobber an existing entry */
- p->aEntry[i].iId = iId;
- }else{
- /* Insert a new entry into the list */
- int nByte = sizeof(FreelistEntry)*(p->nEntry-i);
- memmove(&p->aEntry[i+1], &p->aEntry[i], nByte);
- p->aEntry[i].iBlk = iBlk;
- p->aEntry[i].iId = iId;
- p->nEntry++;
- }
- return LSM_OK;
- }
- /*
- ** This function frees all resources held by the Database structure passed
- ** as the only argument.
- */
- static void freeDatabase(lsm_env *pEnv, Database *p){
- assert( holdingGlobalMutex(pEnv) );
- if( p ){
- /* Free the mutexes */
- lsmMutexDel(pEnv, p->pClientMutex);
- if( p->pFile ){
- lsmEnvClose(pEnv, p->pFile);
- }
- /* Free the array of shm pointers */
- lsmFree(pEnv, p->apShmChunk);
- /* Free the memory allocated for the Database struct itself */
- lsmFree(pEnv, p);
- }
- }
- typedef struct DbTruncateCtx DbTruncateCtx;
- struct DbTruncateCtx {
- int nBlock;
- i64 iInUse;
- };
- static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
- DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
- if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
- p->nBlock--;
- return 0;
- }
- static int dbTruncate(lsm_db *pDb, i64 iInUse){
- int rc = LSM_OK;
- #if 0
- int i;
- DbTruncateCtx ctx;
- assert( pDb->pWorker );
- ctx.nBlock = pDb->pWorker->nBlock;
- ctx.iInUse = iInUse;
- rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
- for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
- rc = freelistAppend(pDb, i, -1);
- }
- if( rc==LSM_OK ){
- #ifdef LSM_LOG_FREELIST
- if( ctx.nBlock!=pDb->pWorker->nBlock ){
- lsmLogMessage(pDb, 0,
- "dbTruncate(): truncated db to %d blocks",ctx.nBlock
- );
- }
- #endif
- pDb->pWorker->nBlock = ctx.nBlock;
- }
- #endif
- return rc;
- }
- /*
- ** This function is called during database shutdown (when the number of
- ** connections drops from one to zero). It truncates the database file
- ** to as small a size as possible without truncating away any blocks that
- ** contain data.
- */
- static int dbTruncateFile(lsm_db *pDb){
- int rc;
- assert( pDb->pWorker==0 );
- assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );
- rc = lsmCheckpointLoadWorker(pDb);
- if( rc==LSM_OK ){
- DbTruncateCtx ctx;
- /* Walk the database free-block-list in reverse order. Set ctx.nBlock
- ** to the block number of the last block in the database that actually
- ** contains data. */
- ctx.nBlock = pDb->pWorker->nBlock;
- ctx.iInUse = -1;
- rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
- /* If the last block that contains data is not already the last block in
- ** the database file, truncate the database file so that it is. */
- if( rc==LSM_OK ){
- rc = lsmFsTruncateDb(
- pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
- );
- }
- }
- lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
- pDb->pWorker = 0;
- return rc;
- }
- static void doDbDisconnect(lsm_db *pDb){
- int rc;
- if( pDb->bReadonly ){
- lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
- }else{
- /* Block for an exclusive lock on DMS1. This lock serializes all calls
- ** to doDbConnect() and doDbDisconnect() across all processes. */
- rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
- if( rc==LSM_OK ){
- lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
- /* Try an exclusive lock on DMS2. If successful, this is the last
- ** connection to the database. In this case flush the contents of the
- ** in-memory tree to disk and write a checkpoint. */
- rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
- if( rc==LSM_OK ){
- rc = lsmShmTestLock(pDb, LSM_LOCK_CHECKPOINTER, 1, LSM_LOCK_EXCL);
- }
- if( rc==LSM_OK ){
- int bReadonly = 0; /* True if there exist read-only conns. */
- /* Flush the in-memory tree, if required. If there is data to flush,
- ** this will create a new client snapshot in Database.pClient. The
- ** checkpoint (serialization) of this snapshot may be written to disk
- ** by the following block.
- **
- ** There is no need to take a WRITER lock here. That there are no
- ** other locks on DMS2 guarantees that there are no other read-write
- ** connections at this time (and the lock on DMS1 guarantees that
- ** no new ones may appear).
- */
- rc = lsmTreeLoadHeader(pDb, 0);
- if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
- rc = lsmFlushTreeToDisk(pDb);
- }
- /* Now check if there are any read-only connections. If there are,
- ** then do not truncate the db file or unlink the shared-memory
- ** region. */
- if( rc==LSM_OK ){
- rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
- if( rc==LSM_BUSY ){
- bReadonly = 1;
- rc = LSM_OK;
- }
- }
- /* Write a checkpoint to disk. */
- if( rc==LSM_OK ){
- rc = lsmCheckpointWrite(pDb, 0);
- }
- /* If the checkpoint was written successfully, delete the log file
- ** and, if possible, truncate the database file. */
- if( rc==LSM_OK ){
- int bRotrans = 0;
- Database *p = pDb->pDatabase;
- /* The log file may only be deleted if there are no clients
- ** read-only clients running rotrans transactions. */
- rc = lsmDetectRoTrans(pDb, &bRotrans);
- if( rc==LSM_OK && bRotrans==0 ){
- lsmFsCloseAndDeleteLog(pDb->pFS);
- }
- /* The database may only be truncated if there exist no read-only
- ** clients - either connected or running rotrans transactions. */
- if( bReadonly==0 && bRotrans==0 ){
- lsmFsUnmap(pDb->pFS);
- dbTruncateFile(pDb);
- if( p->pFile && p->bMultiProc ){
- lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
- }
- }
- }
- }
- }
- if( pDb->iRwclient>=0 ){
- lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
- pDb->iRwclient = -1;
- }
- lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
- }
- pDb->pShmhdr = 0;
- }
- static int doDbConnect(lsm_db *pDb){
- const int nUsMax = 100000; /* Max value for nUs */
- int nUs = 1000; /* us to wait between DMS1 attempts */
- int rc;
- /* Obtain a pointer to the shared-memory header */
- assert( pDb->pShmhdr==0 );
- assert( pDb->bReadonly==0 );
- /* Block for an exclusive lock on DMS1. This lock serializes all calls
- ** to doDbConnect() and doDbDisconnect() across all processes. */
- while( 1 ){
- rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
- if( rc!=LSM_BUSY ) break;
- lsmEnvSleep(pDb->pEnv, nUs);
- nUs = nUs * 2;
- if( nUs>nUsMax ) nUs = nUsMax;
- }
- if( rc==LSM_OK ){
- rc = lsmShmCacheChunks(pDb, 1);
- }
- if( rc!=LSM_OK ) return rc;
- pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];
- /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first
- ** and only connection to the database. In this case initialize the
- ** shared-memory and run log file recovery. */
- assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
- rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
- if( rc==LSM_OK ){
- memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
- rc = lsmCheckpointRecover(pDb);
- if( rc==LSM_OK ){
- rc = lsmLogRecover(pDb);
- }
- if( rc==LSM_OK ){
- ShmHeader *pShm = pDb->pShmhdr;
- pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
- pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
- }
- }else if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }
- /* Take a shared lock on DMS2. In multi-process mode this lock "cannot"
- ** fail, as connections may only hold an exclusive lock on DMS2 if they
- ** first hold an exclusive lock on DMS1. And this connection is currently
- ** holding the exclusive lock on DSM1.
- **
- ** However, if some other connection has the database open in single-process
- ** mode, this operation will fail. In this case, return the error to the
- ** caller - the attempt to connect to the db has failed.
- */
- if( rc==LSM_OK ){
- rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
- }
- /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
- ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
- if( rc!=LSM_OK ){
- pDb->pShmhdr = 0;
- }else{
- int i;
- for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
- int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
- if( rc2==LSM_OK ) pDb->iRwclient = i;
- if( rc2!=LSM_BUSY ){
- rc = rc2;
- break;
- }
- }
- }
- lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
- return rc;
- }
- static int dbOpenSharedFd(lsm_env *pEnv, Database *p, int bRoOk){
- int rc;
- rc = lsmEnvOpen(pEnv, p->zName, 0, &p->pFile);
- if( rc==LSM_IOERR && bRoOk ){
- rc = lsmEnvOpen(pEnv, p->zName, LSM_OPEN_READONLY, &p->pFile);
- p->bReadonly = 1;
- }
- return rc;
- }
- /*
- ** Return a reference to the shared Database handle for the database
- ** identified by canonical path zName. If this is the first connection to
- ** the named database, a new Database object is allocated. Otherwise, a
- ** pointer to an existing object is returned.
- **
- ** If successful, *ppDatabase is set to point to the shared Database
- ** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
- ** and and LSM error code returned.
- **
- ** Each successful call to this function should be (eventually) matched
- ** by a call to lsmDbDatabaseRelease().
- */
- int lsmDbDatabaseConnect(
- lsm_db *pDb, /* Database handle */
- const char *zName /* Full-path to db file */
- ){
- lsm_env *pEnv = pDb->pEnv;
- int rc; /* Return code */
- Database *p = 0; /* Pointer returned via *ppDatabase */
- int nName = lsmStrlen(zName);
- assert( pDb->pDatabase==0 );
- rc = enterGlobalMutex(pEnv);
- if( rc==LSM_OK ){
- /* Search the global list for an existing object. TODO: Need something
- ** better than the memcmp() below to figure out if a given Database
- ** object represents the requested file. */
- for(p=gShared.pDatabase; p; p=p->pDbNext){
- if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break;
- }
- /* If no suitable Database object was found, allocate a new one. */
- if( p==0 ){
- p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc);
- /* If the allocation was successful, fill in other fields and
- ** allocate the client mutex. */
- if( rc==LSM_OK ){
- p->bMultiProc = pDb->bMultiProc;
- p->zName = (char *)&p[1];
- p->nName = nName;
- memcpy((void *)p->zName, zName, nName+1);
- rc = lsmMutexNew(pEnv, &p->pClientMutex);
- }
- /* If nothing has gone wrong so far, open the shared fd. And if that
- ** succeeds and this connection requested single-process mode,
- ** attempt to take the exclusive lock on DMS2. */
- if( rc==LSM_OK ){
- int bReadonly = (pDb->bReadonly && pDb->bMultiProc);
- rc = dbOpenSharedFd(pDb->pEnv, p, bReadonly);
- }
- if( rc==LSM_OK && p->bMultiProc==0 ){
- /* Hold an exclusive lock DMS1 while grabbing DMS2. This ensures
- ** that any ongoing call to doDbDisconnect() (even one in another
- ** process) is finished before proceeding. */
- assert( p->bReadonly==0 );
- rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_EXCL);
- if( rc==LSM_OK ){
- rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL);
- lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK);
- }
- }
- if( rc==LSM_OK ){
- p->pDbNext = gShared.pDatabase;
- gShared.pDatabase = p;
- }else{
- freeDatabase(pEnv, p);
- p = 0;
- }
- }
- if( p ){
- p->nDbRef++;
- }
- leaveGlobalMutex(pEnv);
- if( p ){
- lsmMutexEnter(pDb->pEnv, p->pClientMutex);
- pDb->pNext = p->pConn;
- p->pConn = pDb;
- lsmMutexLeave(pDb->pEnv, p->pClientMutex);
- }
- }
- pDb->pDatabase = p;
- if( rc==LSM_OK ){
- assert( p );
- rc = lsmFsOpen(pDb, zName, p->bReadonly);
- }
- /* If the db handle is read-write, then connect to the system now. Run
- ** recovery as necessary. Or, if this is a read-only database handle,
- ** defer attempting to connect to the system until a read-transaction
- ** is opened. */
- if( rc==LSM_OK ){
- rc = lsmFsConfigure(pDb);
- }
- if( rc==LSM_OK && pDb->bReadonly==0 ){
- rc = doDbConnect(pDb);
- }
- return rc;
- }
- static void dbDeferClose(lsm_db *pDb){
- if( pDb->pFS ){
- LsmFile *pLsmFile;
- Database *p = pDb->pDatabase;
- pLsmFile = lsmFsDeferClose(pDb->pFS);
- pLsmFile->pNext = p->pLsmFile;
- p->pLsmFile = pLsmFile;
- }
- }
- LsmFile *lsmDbRecycleFd(lsm_db *db){
- LsmFile *pRet;
- Database *p = db->pDatabase;
- lsmMutexEnter(db->pEnv, p->pClientMutex);
- if( (pRet = p->pLsmFile)!=0 ){
- p->pLsmFile = pRet->pNext;
- }
- lsmMutexLeave(db->pEnv, p->pClientMutex);
- return pRet;
- }
- /*
- ** Release a reference to a Database object obtained from
- ** lsmDbDatabaseConnect(). There should be exactly one call to this function
- ** for each successful call to Find().
- */
- void lsmDbDatabaseRelease(lsm_db *pDb){
- Database *p = pDb->pDatabase;
- if( p ){
- lsm_db **ppDb;
- if( pDb->pShmhdr ){
- doDbDisconnect(pDb);
- }
- lsmFsUnmap(pDb->pFS);
- lsmMutexEnter(pDb->pEnv, p->pClientMutex);
- for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
- *ppDb = pDb->pNext;
- dbDeferClose(pDb);
- lsmMutexLeave(pDb->pEnv, p->pClientMutex);
- enterGlobalMutex(pDb->pEnv);
- p->nDbRef--;
- if( p->nDbRef==0 ){
- LsmFile *pIter;
- LsmFile *pNext;
- Database **pp;
- /* Remove the Database structure from the linked list. */
- for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
- *pp = p->pDbNext;
- /* If they were allocated from the heap, free the shared memory chunks */
- if( p->bMultiProc==0 ){
- int i;
- for(i=0; i<p->nShmChunk; i++){
- lsmFree(pDb->pEnv, p->apShmChunk[i]);
- }
- }
- /* Close any outstanding file descriptors */
- for(pIter=p->pLsmFile; pIter; pIter=pNext){
- pNext = pIter->pNext;
- lsmEnvClose(pDb->pEnv, pIter->pFile);
- lsmFree(pDb->pEnv, pIter);
- }
- freeDatabase(pDb->pEnv, p);
- }
- leaveGlobalMutex(pDb->pEnv);
- }
- }
- Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
- return pSnapshot->pLevel;
- }
- void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
- pSnap->pLevel = pLevel;
- }
- /* TODO: Shuffle things around to get rid of this */
- static int firstSnapshotInUse(lsm_db *, i64 *);
- /*
- ** Context object used by the lsmWalkFreelist() utility.
- */
- typedef struct WalkFreelistCtx WalkFreelistCtx;
- struct WalkFreelistCtx {
- lsm_db *pDb;
- int bReverse;
- Freelist *pFreelist;
- int iFree;
- int (*xUsr)(void *, int, i64); /* User callback function */
- void *pUsrctx; /* User callback context */
- int bDone; /* Set to true after xUsr() returns true */
- };
- /*
- ** Callback used by lsmWalkFreelist().
- */
- static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
- WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
- const int iDir = (p->bReverse ? -1 : 1);
- Freelist *pFree = p->pFreelist;
- assert( p->bDone==0 );
- assert( iBlk>=0 );
- if( pFree ){
- while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
- FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
- if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
- || (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
- ){
- break;
- }else{
- p->iFree += iDir;
- if( pEntry->iId>=0
- && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId)
- ){
- p->bDone = 1;
- return 1;
- }
- if( pEntry->iBlk==(u32)iBlk ) return 0;
- }
- }
- }
- if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
- p->bDone = 1;
- return 1;
- }
- return 0;
- }
- /*
- ** The database handle passed as the first argument must be the worker
- ** connection. This function iterates through the contents of the current
- ** free block list, invoking the supplied callback once for each list
- ** element.
- **
- ** The difference between this function and lsmSortedWalkFreelist() is
- ** that lsmSortedWalkFreelist() only considers those free-list elements
- ** stored within the LSM. This function also merges in any in-memory
- ** elements.
- */
- int lsmWalkFreelist(
- lsm_db *pDb, /* Database handle (must be worker) */
- int bReverse, /* True to iterate from largest to smallest */
- int (*x)(void *, int, i64), /* Callback function */
- void *pCtx /* First argument to pass to callback */
- ){
- const int iDir = (bReverse ? -1 : 1);
- int rc;
- int iCtx;
- WalkFreelistCtx ctx[2];
- ctx[0].pDb = pDb;
- ctx[0].bReverse = bReverse;
- ctx[0].pFreelist = &pDb->pWorker->freelist;
- if( ctx[0].pFreelist && bReverse ){
- ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
- }else{
- ctx[0].iFree = 0;
- }
- ctx[0].xUsr = walkFreelistCb;
- ctx[0].pUsrctx = (void *)&ctx[1];
- ctx[0].bDone = 0;
- ctx[1].pDb = pDb;
- ctx[1].bReverse = bReverse;
- ctx[1].pFreelist = pDb->pFreelist;
- if( ctx[1].pFreelist && bReverse ){
- ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
- }else{
- ctx[1].iFree = 0;
- }
- ctx[1].xUsr = x;
- ctx[1].pUsrctx = pCtx;
- ctx[1].bDone = 0;
- rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);
- if( ctx[0].bDone==0 ){
- for(iCtx=0; iCtx<2; iCtx++){
- int i;
- WalkFreelistCtx *p = &ctx[iCtx];
- for(i=p->iFree;
- p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
- i += iDir
- ){
- FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
- if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
- return LSM_OK;
- }
- }
- }
- }
- return rc;
- }
- typedef struct FindFreeblockCtx FindFreeblockCtx;
- struct FindFreeblockCtx {
- i64 iInUse;
- int iRet;
- int bNotOne;
- };
- static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
- FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
- if( iSnapshot<p->iInUse && (iBlk!=1 || p->bNotOne==0) ){
- p->iRet = iBlk;
- return 1;
- }
- return 0;
- }
- static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
- int rc; /* Return code */
- FindFreeblockCtx ctx; /* Context object */
- ctx.iInUse = iInUse;
- ctx.iRet = 0;
- ctx.bNotOne = bNotOne;
- rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx);
- *piRet = ctx.iRet;
- return rc;
- }
- /*
- ** Allocate a new database file block to write data to, either by extending
- ** the database file or by recycling a free-list entry. The worker snapshot
- ** must be held in order to call this function.
- **
- ** If successful, *piBlk is set to the block number allocated and LSM_OK is
- ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
- */
- int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
- Snapshot *p = pDb->pWorker;
- int iRet = 0; /* Block number of allocated block */
- int rc = LSM_OK;
- i64 iInUse = 0; /* Snapshot id still in use */
- i64 iSynced = 0; /* Snapshot id synced to disk */
- assert( p );
- #ifdef LSM_LOG_FREELIST
- {
- static int nCall = 0;
- char *zFree = 0;
- nCall++;
- rc = lsmInfoFreelist(pDb, &zFree);
- if( rc!=LSM_OK ) return rc;
- lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
- lsmFree(pDb->pEnv, zFree);
- }
- #endif
- /* Set iInUse to the smallest snapshot id that is either:
- **
- ** * Currently in use by a database client,
- ** * May be used by a database client in the future, or
- ** * Is the most recently checkpointed snapshot (i.e. the one that will
- ** be used following recovery if a failure occurs at this point).
- */
- rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
- if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
- iInUse = iSynced;
- if( rc==LSM_OK && pDb->iReader>=0 ){
- assert( pDb->pClient );
- iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
- }
- if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
- #ifdef LSM_LOG_FREELIST
- {
- lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
- "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
- iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
- );
- }
- #endif
- /* Unless there exists a read-only transaction (which prevents us from
- ** recycling any blocks regardless, query the free block list for a
- ** suitable block to reuse.
- **
- ** It might seem more natural to check for a read-only transaction at
- ** the start of this function. However, it is better do wait until after
- ** the call to lsmCheckpointSynced() to do so.
- */
- if( rc==LSM_OK ){
- int bRotrans;
- rc = lsmDetectRoTrans(pDb, &bRotrans);
- if( rc==LSM_OK && bRotrans==0 ){
- rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
- }
- }
- if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
- iRet = 0;
- }else if( rc==LSM_OK ){
- /* If a block was found in the free block list, use it and remove it from
- ** the list. Otherwise, if no suitable block was found, allocate one from
- ** the end of the file. */
- if( iRet>0 ){
- #ifdef LSM_LOG_FREELIST
- lsmLogMessage(pDb, 0,
- "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
- #endif
- rc = freelistAppend(pDb, iRet, -1);
- if( rc==LSM_OK ){
- rc = dbTruncate(pDb, iInUse);
- }
- }else{
- iRet = ++(p->nBlock);
- #ifdef LSM_LOG_FREELIST
- lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
- #endif
- }
- }
- assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
- *piBlk = iRet;
- return rc;
- }
- /*
- ** Free a database block. The worker snapshot must be held in order to call
- ** this function.
- **
- ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g.
- ** LSM_NOMEM).
- */
- int lsmBlockFree(lsm_db *pDb, int iBlk){
- Snapshot *p = pDb->pWorker;
- assert( lsmShmAssertWorker(pDb) );
- #ifdef LSM_LOG_FREELIST
- lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
- #endif
- return freelistAppend(pDb, iBlk, p->iId);
- }
- /*
- ** Refree a database block. The worker snapshot must be held in order to call
- ** this function.
- **
- ** Refreeing is required when a block is allocated using lsmBlockAllocate()
- ** but then not used. This function is used to push the block back onto
- ** the freelist. Refreeing a block is different from freeing is, as a refreed
- ** block may be reused immediately. Whereas a freed block can not be reused
- ** until (at least) after the next checkpoint.
- */
- int lsmBlockRefree(lsm_db *pDb, int iBlk){
- int rc = LSM_OK; /* Return code */
- #ifdef LSM_LOG_FREELIST
- lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
- #endif
- rc = freelistAppend(pDb, iBlk, 0);
- return rc;
- }
- /*
- ** If required, copy a database checkpoint from shared memory into the
- ** database itself.
- **
- ** The WORKER lock must not be held when this is called. This is because
- ** this function may indirectly call fsync(). And the WORKER lock should
- ** not be held that long (in case it is required by a client flushing an
- ** in-memory tree to disk).
- */
- int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
- int rc; /* Return Code */
- u32 nWrite = 0;
- assert( pDb->pWorker==0 );
- assert( 1 || pDb->pClient==0 );
- assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );
- rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
- if( rc!=LSM_OK ) return rc;
- rc = lsmCheckpointLoad(pDb, 0);
- if( rc==LSM_OK ){
- int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
- ShmHeader *pShm = pDb->pShmhdr;
- int bDone = 0; /* True if checkpoint is already stored */
- /* Check if this checkpoint has already been written to the database
- ** file. If so, set variable bDone to true. */
- if( pShm->iMetaPage ){
- MetaPage *pPg; /* Meta page */
- u8 *aData; /* Meta-page data buffer */
- int nData; /* Size of aData[] in bytes */
- i64 iCkpt; /* Id of checkpoint just loaded */
- i64 iDisk = 0; /* Id of checkpoint already stored in db */
- iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
- rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
- if( rc==LSM_OK ){
- aData = lsmFsMetaPageData(pPg, &nData);
- iDisk = lsmCheckpointId((u32 *)aData, 1);
- nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
- lsmFsMetaPageRelease(pPg);
- }
- bDone = (iDisk>=iCkpt);
- }
- if( rc==LSM_OK && bDone==0 ){
- int iMeta = (pShm->iMetaPage % 2) + 1;
- if( pDb->eSafety!=LSM_SAFETY_OFF ){
- rc = lsmFsSyncDb(pDb->pFS, nBlock);
- }
- if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
- if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
- rc = lsmFsSyncDb(pDb->pFS, 0);
- }
- if( rc==LSM_OK ){
- pShm->iMetaPage = iMeta;
- nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
- }
- #ifdef LSM_LOG_WORK
- lsmLogMessage(pDb, 0, "finish checkpoint %d",
- (int)lsmCheckpointId(pDb->aSnapshot, 0)
- );
- #endif
- }
- }
- lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
- if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
- return rc;
- }
- int lsmBeginWork(lsm_db *pDb){
- int rc;
- /* Attempt to take the WORKER lock */
- rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
- /* Deserialize the current worker snapshot */
- if( rc==LSM_OK ){
- rc = lsmCheckpointLoadWorker(pDb);
- }
- return rc;
- }
- void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
- if( p ){
- lsmSortedFreeLevel(pEnv, p->pLevel);
- lsmFree(pEnv, p->freelist.aEntry);
- lsmFree(pEnv, p->redirect.a);
- lsmFree(pEnv, p);
- }
- }
- /*
- ** Attempt to populate one of the read-lock slots to contain lock values
- ** iLsm/iShm. Or, if such a slot exists already, this function is a no-op.
- **
- ** It is not an error if no slot can be populated because the write-lock
- ** cannot be obtained. If any other error occurs, return an LSM error code.
- ** Otherwise, LSM_OK.
- **
- ** This function is called at various points to try to ensure that there
- ** always exists at least one read-lock slot that can be used by a read-only
- ** client. And so that, in the usual case, there is an "exact match" available
- ** whenever a read transaction is opened by any client. At present this
- ** function is called when:
- **
- ** * A write transaction that called lsmTreeDiscardOld() is committed, and
- ** * Whenever the working snapshot is updated (i.e. lsmFinishWork()).
- */
- static int dbSetReadLock(lsm_db *db, i64 iLsm, u32 iShm){
- int rc = LSM_OK;
- ShmHeader *pShm = db->pShmhdr;
- int i;
- /* Check if there is already a slot containing the required values. */
- for(i=0; i<LSM_LOCK_NREADER; i++){
- ShmReader *p = &pShm->aReader[i];
- if( p->iLsmId==iLsm && p->iTreeId==iShm ) return LSM_OK;
- }
- /* Iterate through all read-lock slots, attempting to take a write-lock
- ** on each of them. If a write-lock succeeds, populate the locked slot
- ** with the required values and break out of the loop. */
- for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
- if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }else{
- ShmReader *p = &pShm->aReader[i];
- p->iLsmId = iLsm;
- p->iTreeId = iShm;
- lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
- break;
- }
- }
- return rc;
- }
- /*
- ** Release the read-lock currently held by connection db.
- */
- int dbReleaseReadlock(lsm_db *db){
- int rc = LSM_OK;
- if( db->iReader>=0 ){
- rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
- db->iReader = -1;
- }
- db->bRoTrans = 0;
- return rc;
- }
- /*
- ** Argument bFlush is true if the contents of the in-memory tree has just
- ** been flushed to disk. The significance of this is that once the snapshot
- ** created to hold the updated state of the database is synced to disk, log
- ** file space can be recycled.
- */
- void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
- int rc = *pRc;
- assert( rc!=0 || pDb->pWorker );
- if( pDb->pWorker ){
- /* If no error has occurred, serialize the worker snapshot and write
- ** it to shared memory. */
- if( rc==LSM_OK ){
- rc = lsmSaveWorker(pDb, bFlush);
- }
- /* Assuming no error has occurred, update a read lock slot with the
- ** new snapshot id (see comments above function dbSetReadLock()). */
- if( rc==LSM_OK ){
- if( pDb->iReader<0 ){
- rc = lsmTreeLoadHeader(pDb, 0);
- }
- if( rc==LSM_OK ){
- rc = dbSetReadLock(pDb, pDb->pWorker->iId, pDb->treehdr.iUsedShmid);
- }
- }
- /* Free the snapshot object. */
- lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
- pDb->pWorker = 0;
- }
- lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
- *pRc = rc;
- }
- /*
- ** Called when recovery is finished.
- */
- int lsmFinishRecovery(lsm_db *pDb){
- lsmTreeEndTransaction(pDb, 1);
- return LSM_OK;
- }
- /*
- ** Check if the currently configured compression functions
- ** (LSM_CONFIG_SET_COMPRESSION) are compatible with a database that has its
- ** compression id set to iReq. Compression routines are compatible if iReq
- ** is zero (indicating the database is empty), or if it is equal to the
- ** compression id of the configured compression routines.
- **
- ** If the check shows that the current compression are incompatible and there
- ** is a compression factory registered, give it a chance to install new
- ** compression routines.
- **
- ** If, after any registered factory is invoked, the compression functions
- ** are still incompatible, return LSM_MISMATCH. Otherwise, LSM_OK.
- */
- int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
- if( iReq!=LSM_COMPRESSION_EMPTY && pDb->compress.iId!=iReq ){
- if( pDb->factory.xFactory ){
- pDb->bInFactory = 1;
- pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
- pDb->bInFactory = 0;
- }
- if( pDb->compress.iId!=iReq ){
- /* Incompatible */
- return LSM_MISMATCH;
- }
- }
- /* Compatible */
- return LSM_OK;
- }
- /*
- ** Begin a read transaction. This function is a no-op if the connection
- ** passed as the only argument already has an open read transaction.
- */
- int lsmBeginReadTrans(lsm_db *pDb){
- const int MAX_READLOCK_ATTEMPTS = 10;
- const int nMaxAttempt = (pDb->bRoTrans ? 1 : MAX_READLOCK_ATTEMPTS);
- int rc = LSM_OK; /* Return code */
- int iAttempt = 0;
- assert( pDb->pWorker==0 );
- while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<nMaxAttempt ){
- int iTreehdr = 0;
- int iSnap = 0;
- assert( pDb->pCsr==0 && pDb->nTransOpen==0 );
- /* Load the in-memory tree header. */
- rc = lsmTreeLoadHeader(pDb, &iTreehdr);
- /* Load the database snapshot */
- if( rc==LSM_OK ){
- if( lsmCheckpointClientCacheOk(pDb)==0 ){
- lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
- pDb->pClient = 0;
- lsmMCursorFreeCache(pDb);
- lsmFsPurgeCache(pDb->pFS);
- rc = lsmCheckpointLoad(pDb, &iSnap);
- }else{
- iSnap = 1;
- }
- }
- /* Take a read-lock on the tree and snapshot just loaded. Then check
- ** that the shared-memory still contains the same values. If so, proceed.
- ** Otherwise, relinquish the read-lock and retry the whole procedure
- ** (starting with loading the in-memory tree header). */
- if( rc==LSM_OK ){
- u32 iShmMax = pDb->treehdr.iUsedShmid;
- u32 iShmMin = pDb->treehdr.iNextShmid+1-LSM_MAX_SHMCHUNKS;
- rc = lsmReadlock(
- pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
- );
- if( rc==LSM_OK ){
- if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
- && lsmCheckpointLoadOk(pDb, iSnap)
- ){
- /* Read lock has been successfully obtained. Deserialize the
- ** checkpoint just loaded. TODO: This will be removed after
- ** lsm_sorted.c is changed to work directly from the serialized
- ** version of the snapshot. */
- if( pDb->pClient==0 ){
- rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
- }
- assert( (rc==LSM_OK)==(pDb->pClient!=0) );
- assert( pDb->iReader>=0 );
- /* Check that the client has the right compression hooks loaded.
- ** If not, set rc to LSM_MISMATCH. */
- if( rc==LSM_OK ){
- rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
- }
- }else{
- rc = dbReleaseReadlock(pDb);
- }
- }
- if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }
- }
- #if 0
- if( rc==LSM_OK && pDb->pClient ){
- fprintf(stderr,
- "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
- (void *)pDb,
- (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid,
- (int)pDb->treehdr.root.iTransId,
- (int)pDb->treehdr.iOldShmid
- );
- }
- #endif
- }
- if( rc==LSM_OK ){
- rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
- }
- if( rc!=LSM_OK ){
- dbReleaseReadlock(pDb);
- }
- if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
- return rc;
- }
- /*
- ** This function is used by a read-write connection to determine if there
- ** are currently one or more read-only transactions open on the database
- ** (in this context a read-only transaction is one opened by a read-only
- ** connection on a non-live database).
- **
- ** If no error occurs, LSM_OK is returned and *pbExists is set to true if
- ** some other connection has a read-only transaction open, or false
- ** otherwise. If an error occurs an LSM error code is returned and the final
- ** value of *pbExist is undefined.
- */
- int lsmDetectRoTrans(lsm_db *db, int *pbExist){
- int rc;
- /* Only a read-write connection may use this function. */
- assert( db->bReadonly==0 );
- rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
- if( rc==LSM_BUSY ){
- *pbExist = 1;
- rc = LSM_OK;
- }else{
- *pbExist = 0;
- }
- return rc;
- }
- /*
- ** db is a read-only database handle in the disconnected state. This function
- ** attempts to open a read-transaction on the database. This may involve
- ** connecting to the database system (opening shared memory etc.).
- */
- int lsmBeginRoTrans(lsm_db *db){
- int rc = LSM_OK;
- assert( db->bReadonly && db->pShmhdr==0 );
- assert( db->iReader<0 );
- if( db->bRoTrans==0 ){
- /* Attempt a shared-lock on DMS1. */
- rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
- if( rc!=LSM_OK ) return rc;
- rc = lsmShmTestLock(
- db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
- );
- if( rc==LSM_OK ){
- /* System is not live. Take a SHARED lock on the ROTRANS byte and
- ** release DMS1. Locking ROTRANS tells all read-write clients that they
- ** may not recycle any disk space from within the database or log files,
- ** as a read-only client may be using it. */
- rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
- lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
- if( rc==LSM_OK ){
- db->bRoTrans = 1;
- rc = lsmShmCacheChunks(db, 1);
- if( rc==LSM_OK ){
- db->pShmhdr = (ShmHeader *)db->apShm[0];
- memset(db->pShmhdr, 0, sizeof(ShmHeader));
- rc = lsmCheckpointRecover(db);
- if( rc==LSM_OK ){
- rc = lsmLogRecover(db);
- }
- }
- }
- }else if( rc==LSM_BUSY ){
- /* System is live! */
- rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
- lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
- if( rc==LSM_OK ){
- rc = lsmShmCacheChunks(db, 1);
- if( rc==LSM_OK ){
- db->pShmhdr = (ShmHeader *)db->apShm[0];
- }
- }
- }
- /* In 'lsm_open()' we don't update the page and block sizes in the
- ** Filesystem for 'readonly' connection. Because member 'db->pShmhdr' is a
- ** nullpointer, this prevents loading a checkpoint. Now that the system is
- ** live this member should be set. So we can update both values in
- ** the Filesystem.
- **
- ** Configure the file-system connection with the page-size and block-size
- ** of this database. Even if the database file is zero bytes in size
- ** on disk, these values have been set in shared-memory by now, and so
- ** are guaranteed not to change during the lifetime of this connection. */
- if( LSM_OK==rc
- && 0==lsmCheckpointClientCacheOk(db)
- && LSM_OK==(rc=lsmCheckpointLoad(db, 0))
- ){
- lsmFsSetPageSize(db->pFS, lsmCheckpointPgsz(db->aSnapshot));
- lsmFsSetBlockSize(db->pFS, lsmCheckpointBlksz(db->aSnapshot));
- }
- if( rc==LSM_OK ){
- rc = lsmBeginReadTrans(db);
- }
- }
- return rc;
- }
- /*
- ** Close the currently open read transaction.
- */
- void lsmFinishReadTrans(lsm_db *pDb){
- /* Worker connections should not be closing read transactions. And
- ** read transactions should only be closed after all cursors and write
- ** transactions have been closed. Finally pClient should be non-NULL
- ** only iff pDb->iReader>=0. */
- assert( pDb->pWorker==0 );
- assert( pDb->pCsr==0 && pDb->nTransOpen==0 );
- if( pDb->bRoTrans ){
- int i;
- for(i=0; i<pDb->nShm; i++){
- lsmFree(pDb->pEnv, pDb->apShm[i]);
- }
- lsmFree(pDb->pEnv, pDb->apShm);
- pDb->apShm = 0;
- pDb->nShm = 0;
- pDb->pShmhdr = 0;
- lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
- }
- dbReleaseReadlock(pDb);
- }
- /*
- ** Open a write transaction.
- */
- int lsmBeginWriteTrans(lsm_db *pDb){
- int rc = LSM_OK; /* Return code */
- ShmHeader *pShm = pDb->pShmhdr; /* Shared memory header */
- assert( pDb->nTransOpen==0 );
- assert( pDb->bDiscardOld==0 );
- assert( pDb->bReadonly==0 );
- /* If there is no read-transaction open, open one now. */
- if( pDb->iReader<0 ){
- rc = lsmBeginReadTrans(pDb);
- }
- /* Attempt to take the WRITER lock */
- if( rc==LSM_OK ){
- rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
- }
- /* If the previous writer failed mid-transaction, run emergency rollback. */
- if( rc==LSM_OK && pShm->bWriter ){
- rc = lsmTreeRepair(pDb);
- if( rc==LSM_OK ) pShm->bWriter = 0;
- }
- /* Check that this connection is currently reading from the most recent
- ** version of the database. If not, return LSM_BUSY. */
- if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
- rc = LSM_BUSY;
- }
- if( rc==LSM_OK ){
- rc = lsmLogBegin(pDb);
- }
- /* If everything was successful, set the "transaction-in-progress" flag
- ** and return LSM_OK. Otherwise, if some error occurred, relinquish the
- ** WRITER lock and return an error code. */
- if( rc==LSM_OK ){
- TreeHeader *p = &pDb->treehdr;
- pShm->bWriter = 1;
- p->root.iTransId++;
- if( lsmTreeHasOld(pDb) && p->iOldLog==pDb->pClient->iLogOff ){
- lsmTreeDiscardOld(pDb);
- pDb->bDiscardOld = 1;
- }
- }else{
- lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
- if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
- }
- return rc;
- }
- /*
- ** End the current write transaction. The connection is left with an open
- ** read transaction. It is an error to call this if there is no open write
- ** transaction.
- **
- ** If the transaction was committed, then a commit record has already been
- ** written into the log file when this function is called. Or, if the
- ** transaction was rolled back, both the log file and in-memory tree
- ** structure have already been restored. In either case, this function
- ** merely releases locks and other resources held by the write-transaction.
- **
- ** LSM_OK is returned if successful, or an LSM error code otherwise.
- */
- int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
- int rc = LSM_OK;
- int bFlush = 0;
- lsmLogEnd(pDb, bCommit);
- if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
- bFlush = 1;
- lsmTreeMakeOld(pDb);
- }
- lsmTreeEndTransaction(pDb, bCommit);
- if( rc==LSM_OK ){
- if( bFlush && pDb->bAutowork ){
- rc = lsmSortedAutoWork(pDb, 1);
- }else if( bCommit && pDb->bDiscardOld ){
- rc = dbSetReadLock(pDb, pDb->pClient->iId, pDb->treehdr.iUsedShmid);
- }
- }
- pDb->bDiscardOld = 0;
- lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
- if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
- pDb->xWork(pDb, pDb->pWorkCtx);
- }
- return rc;
- }
- /*
- ** Return non-zero if the caller is holding the client mutex.
- */
- #ifdef LSM_DEBUG
- int lsmHoldingClientMutex(lsm_db *pDb){
- return lsmMutexHeld(pDb->pEnv, pDb->pDatabase->pClientMutex);
- }
- #endif
- static int slotIsUsable(ShmReader *p, i64 iLsm, u32 iShmMin, u32 iShmMax){
- return(
- p->iLsmId && p->iLsmId<=iLsm
- && shm_sequence_ge(iShmMax, p->iTreeId)
- && shm_sequence_ge(p->iTreeId, iShmMin)
- );
- }
- /*
- ** Obtain a read-lock on database version identified by the combination
- ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
- ** an LSM error code otherwise.
- */
- int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
- int rc = LSM_OK;
- ShmHeader *pShm = db->pShmhdr;
- int i;
- assert( db->iReader<0 );
- assert( shm_sequence_ge(iShmMax, iShmMin) );
- /* This is a no-op if the read-only transaction flag is set. */
- if( db->bRoTrans ){
- db->iReader = 0;
- return LSM_OK;
- }
- /* Search for an exact match. */
- for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
- ShmReader *p = &pShm->aReader[i];
- if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
- if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iShmMax ){
- db->iReader = i;
- }else if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }
- }
- }
- /* Try to obtain a write-lock on each slot, in order. If successful, set
- ** the slot values to iLsm/iTree. */
- for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
- if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }else{
- ShmReader *p = &pShm->aReader[i];
- p->iLsmId = iLsm;
- p->iTreeId = iShmMax;
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
- assert( rc!=LSM_BUSY );
- if( rc==LSM_OK ) db->iReader = i;
- }
- }
- /* Search for any usable slot */
- for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
- ShmReader *p = &pShm->aReader[i];
- if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
- if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
- db->iReader = i;
- }else if( rc==LSM_BUSY ){
- rc = LSM_OK;
- }
- }
- }
- if( rc==LSM_OK && db->iReader<0 ){
- rc = LSM_BUSY;
- }
- return rc;
- }
- /*
- ** This is used to check if there exists a read-lock locking a particular
- ** version of either the in-memory tree or database file.
- **
- ** If iLsmId is non-zero, then it is a snapshot id. If there exists a
- ** read-lock using this snapshot or newer, set *pbInUse to true. Or,
- ** if there is no such read-lock, set it to false.
- **
- ** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id.
- ** Search for a read-lock using this sequence id or newer. etc.
- */
- static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
- ShmHeader *pShm = db->pShmhdr;
- int i;
- int rc = LSM_OK;
- for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
- ShmReader *p = &pShm->aReader[i];
- if( p->iLsmId ){
- if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId)
- || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
- ){
- rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
- if( rc==LSM_OK ){
- p->iLsmId = 0;
- lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
- }
- }
- }
- }
- if( rc==LSM_BUSY ){
- *pbInUse = 1;
- return LSM_OK;
- }
- *pbInUse = 0;
- return rc;
- }
- /*
- ** This function is called by worker connections to determine the smallest
- ** snapshot id that is currently in use by a database client. The worker
- ** connection uses this result to determine whether or not it is safe to
- ** recycle a database block.
- */
- static int firstSnapshotInUse(
- lsm_db *db, /* Database handle */
- i64 *piInUse /* IN/OUT: Smallest snapshot id in use */
- ){
- ShmHeader *pShm = db->pShmhdr;
- i64 iInUse = *piInUse;
- int i;
- assert( iInUse>0 );
- for(i=0; i<LSM_LOCK_NREADER; i++){
- ShmReader *p = &pShm->aReader[i];
- if( p->iLsmId ){
- i64 iThis = p->iLsmId;
- if( iThis!=0 && iInUse>iThis ){
- int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
- if( rc==LSM_OK ){
- p->iLsmId = 0;
- lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
- }else if( rc==LSM_BUSY ){
- iInUse = iThis;
- }else{
- /* Some error other than LSM_BUSY. Return the error code to
- ** the caller in this case. */
- return rc;
- }
- }
- }
- }
- *piInUse = iInUse;
- return LSM_OK;
- }
- int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
- if( db->treehdr.iUsedShmid==iShmid ){
- *pbInUse = 1;
- return LSM_OK;
- }
- return isInUse(db, 0, iShmid, pbInUse);
- }
- int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
- if( db->pClient && db->pClient->iId<=iLsmId ){
- *pbInUse = 1;
- return LSM_OK;
- }
- return isInUse(db, iLsmId, 0, pbInUse);
- }
- /*
- ** This function may only be called after a successful call to
- ** lsmDbDatabaseConnect(). It returns true if the connection is in
- ** multi-process mode, or false otherwise.
- */
- int lsmDbMultiProc(lsm_db *pDb){
- return pDb->pDatabase && pDb->pDatabase->bMultiProc;
- }
- /*************************************************************************
- **************************************************************************
- **************************************************************************
- **************************************************************************
- **************************************************************************
- *************************************************************************/
- /*
- ** Ensure that database connection db has cached pointers to at least the
- ** first nChunk chunks of shared memory.
- */
- int lsmShmCacheChunks(lsm_db *db, int nChunk){
- int rc = LSM_OK;
- if( nChunk>db->nShm ){
- static const int NINCR = 16;
- Database *p = db->pDatabase;
- lsm_env *pEnv = db->pEnv;
- int nAlloc;
- int i;
- /* Ensure that the db->apShm[] array is large enough. If an attempt to
- ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
- ** is always extended in multiples of 16 entries - so the actual allocated
- ** size can be inferred from nShm. */
- nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
- while( nChunk>=nAlloc ){
- void **apShm;
- nAlloc += NINCR;
- apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
- if( !apShm ) return LSM_NOMEM_BKPT;
- db->apShm = apShm;
- }
- if( db->bRoTrans ){
- for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
- db->apShm[i] = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
- db->nShm++;
- }
- }else{
- /* Enter the client mutex */
- lsmMutexEnter(pEnv, p->pClientMutex);
- /* Extend the Database objects apShmChunk[] array if necessary. Using the
- ** same pattern as for the lsm_db.apShm[] array above. */
- nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
- while( nChunk>=nAlloc ){
- void **apShm;
- nAlloc += NINCR;
- apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
- if( !apShm ){
- rc = LSM_NOMEM_BKPT;
- break;
- }
- p->apShmChunk = apShm;
- }
- for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
- if( i>=p->nShmChunk ){
- void *pChunk = 0;
- if( p->bMultiProc==0 ){
- /* Single process mode */
- pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
- }else{
- /* Multi-process mode */
- rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
- }
- if( rc==LSM_OK ){
- p->apShmChunk[i] = pChunk;
- p->nShmChunk++;
- }
- }
- if( rc==LSM_OK ){
- db->apShm[i] = p->apShmChunk[i];
- db->nShm++;
- }
- }
- /* Release the client mutex */
- lsmMutexLeave(pEnv, p->pClientMutex);
- }
- }
- return rc;
- }
- static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
- int rc = LSM_OK;
- if( p->bMultiProc ){
- rc = lsmEnvLock(pEnv, p->pFile, iLock, eOp);
- }
- return rc;
- }
- /*
- ** Test if it would be possible for connection db to obtain a lock of type
- ** eType on the nLock locks starting at iLock. If so, return LSM_OK. If it
- ** would not be possible to obtain the lock due to a lock held by another
- ** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the
- ** lsm_env.xTestLock function), return some other LSM error code.
- **
- ** Note that this function never actually locks the database - it merely
- ** queries the system to see if there exists a lock that would prevent
- ** it from doing so.
- */
- int lsmShmTestLock(
- lsm_db *db,
- int iLock,
- int nLock,
- int eOp
- ){
- int rc = LSM_OK;
- lsm_db *pIter;
- Database *p = db->pDatabase;
- int i;
- u64 mask = 0;
- for(i=iLock; i<(iLock+nLock); i++){
- mask |= ((u64)1 << (iLock-1));
- if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (iLock+32-1));
- }
- lsmMutexEnter(db->pEnv, p->pClientMutex);
- for(pIter=p->pConn; pIter; pIter=pIter->pNext){
- if( pIter!=db && (pIter->mLock & mask) ){
- assert( pIter!=db );
- break;
- }
- }
- if( pIter ){
- rc = LSM_BUSY;
- }else if( p->bMultiProc ){
- rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
- }
- lsmMutexLeave(db->pEnv, p->pClientMutex);
- return rc;
- }
- /*
- ** Attempt to obtain the lock identified by the iLock and bExcl parameters.
- ** If successful, return LSM_OK. If the lock cannot be obtained because
- ** there exists some other conflicting lock, return LSM_BUSY. If some other
- ** error occurs, return an LSM error code.
- **
- ** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
- ** or else a value returned by the LSM_LOCK_READER macro.
- */
- int lsmShmLock(
- lsm_db *db,
- int iLock,
- int eOp, /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
- int bBlock /* True for a blocking lock */
- ){
- lsm_db *pIter;
- const u64 me = ((u64)1 << (iLock-1));
- const u64 ms = ((u64)1 << (iLock+32-1));
- int rc = LSM_OK;
- Database *p = db->pDatabase;
- assert( eOp!=LSM_LOCK_EXCL || p->bReadonly==0 );
- assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) );
- assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 );
- assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
- /* Check for a no-op. Proceed only if this is not one of those. */
- if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0)
- || (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms)
- || (eOp==LSM_LOCK_EXCL && (db->mLock & me)==0)
- ){
- int nExcl = 0; /* Number of connections holding EXCLUSIVE */
- int nShared = 0; /* Number of connections holding SHARED */
- lsmMutexEnter(db->pEnv, p->pClientMutex);
- /* Figure out the locks currently held by this process on iLock, not
- ** including any held by connection db. */
- for(pIter=p->pConn; pIter; pIter=pIter->pNext){
- assert( (pIter->mLock & me)==0 || (pIter->mLock & ms)!=0 );
- if( pIter!=db ){
- if( pIter->mLock & me ){
- nExcl++;
- }else if( pIter->mLock & ms ){
- nShared++;
- }
- }
- }
- assert( nExcl==0 || nExcl==1 );
- assert( nExcl==0 || nShared==0 );
- assert( nExcl==0 || (db->mLock & (me|ms))==0 );
- switch( eOp ){
- case LSM_LOCK_UNLOCK:
- if( nShared==0 ){
- lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK);
- }
- db->mLock &= ~(me|ms);
- break;
- case LSM_LOCK_SHARED:
- if( nExcl ){
- rc = LSM_BUSY;
- }else{
- if( nShared==0 ){
- rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED);
- }
- if( rc==LSM_OK ){
- db->mLock |= ms;
- db->mLock &= ~me;
- }
- }
- break;
- default:
- assert( eOp==LSM_LOCK_EXCL );
- if( nExcl || nShared ){
- rc = LSM_BUSY;
- }else{
- rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL);
- if( rc==LSM_OK ){
- db->mLock |= (me|ms);
- }
- }
- break;
- }
- lsmMutexLeave(db->pEnv, p->pClientMutex);
- }
- return rc;
- }
- #ifdef LSM_DEBUG
- int shmLockType(lsm_db *db, int iLock){
- const u64 me = ((u64)1 << (iLock-1));
- const u64 ms = ((u64)1 << (iLock+32-1));
- if( db->mLock & me ) return LSM_LOCK_EXCL;
- if( db->mLock & ms ) return LSM_LOCK_SHARED;
- return LSM_LOCK_UNLOCK;
- }
- /*
- ** The arguments passed to this function are similar to those passed to
- ** the lsmShmLock() function. However, instead of obtaining a new lock
- ** this function returns true if the specified connection already holds
- ** (or does not hold) such a lock, depending on the value of eOp. As
- ** follows:
- **
- ** (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
- ** (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
- ** (eOp==LSM_LOCK_EXCL) -> true if db has an EXCLUSIVE lock on iLock.
- */
- int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
- int ret = 0;
- int eHave;
- assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
- assert( iLock<=16 );
- assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
- eHave = shmLockType(db, iLock);
- switch( eOp ){
- case LSM_LOCK_UNLOCK:
- ret = (eHave==LSM_LOCK_UNLOCK);
- break;
- case LSM_LOCK_SHARED:
- ret = (eHave!=LSM_LOCK_UNLOCK);
- break;
- case LSM_LOCK_EXCL:
- ret = (eHave==LSM_LOCK_EXCL);
- break;
- default:
- assert( !"bad eOp value passed to lsmShmAssertLock()" );
- break;
- }
- return ret;
- }
- int lsmShmAssertWorker(lsm_db *db){
- return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
- }
- /*
- ** This function does not contribute to library functionality, and is not
- ** included in release builds. It is intended to be called from within
- ** an interactive debugger.
- **
- ** When called, this function prints a single line of human readable output
- ** to stdout describing the locks currently held by the connection. For
- ** example:
- **
- ** (gdb) call print_db_locks(pDb)
- ** (shared on dms2) (exclusive on writer)
- */
- void print_db_locks(lsm_db *db){
- int iLock;
- for(iLock=0; iLock<16; iLock++){
- int bOne = 0;
- const char *azLock[] = {0, "shared", "exclusive"};
- const char *azName[] = {
- 0, "dms1", "dms2", "writer", "worker", "checkpointer",
- "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
- };
- int eHave = shmLockType(db, iLock);
- if( azLock[eHave] ){
- printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
- bOne = 1;
- }
- }
- printf("\n");
- }
- void print_all_db_locks(lsm_db *db){
- lsm_db *p;
- for(p=db->pDatabase->pConn; p; p=p->pNext){
- printf("%s connection %p ", ((p==db)?"*":""), p);
- print_db_locks(p);
- }
- }
- #endif
- void lsmShmBarrier(lsm_db *db){
- lsmEnvShmBarrier(db->pEnv);
- }
- int lsm_checkpoint(lsm_db *pDb, int *pnKB){
- int rc; /* Return code */
- u32 nWrite = 0; /* Number of pages checkpointed */
- /* Attempt the checkpoint. If successful, nWrite is set to the number of
- ** pages written between this and the previous checkpoint. */
- rc = lsmCheckpointWrite(pDb, &nWrite);
- /* If required, calculate the output variable (KB of data checkpointed).
- ** Set it to zero if an error occured. */
- if( pnKB ){
- int nKB = 0;
- if( rc==LSM_OK && nWrite ){
- nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;
- }
- *pnKB = nKB;
- }
- return rc;
- }
|