/*
** 2012-01-23
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** Utilities used to help multiple LSM clients to coexist within the
** same process space.
*/
#include "lsmInt.h"

/*
** Global data. All global variables used by code in this file are grouped
** into the following structure instance.
**
** pDatabase:
**   Linked list of all Database objects allocated within this process.
**   This list may not be traversed without holding the global mutex (see
**   functions enterGlobalMutex() and leaveGlobalMutex()).
*/
static struct SharedData {
  Database *pDatabase;            /* Linked list of all Database objects */
} gShared;

/*
** Database structure. There is one such structure for each distinct
** database accessed by this process. They are stored in the singly linked
** list starting at global variable gShared.pDatabase. Database objects are
** reference counted. Once the number of connections to the associated
** database drops to zero, they are removed from the linked list and deleted.
**
** pFile:
**   In multi-process mode, this file descriptor is used to obtain locks
**   and to access shared-memory. In single process mode, its only job is
**   to hold the exclusive lock on the file.
**
*/
struct Database {
  /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */
  char *zName;                    /* Canonical path to database file */
  int nName;                      /* strlen(zName) */
  int nDbRef;                     /* Number of associated lsm_db handles */
  Database *pDbNext;              /* Next Database structure in global list */

  /* Protected by the local mutex (pClientMutex) */
  int bReadonly;                  /* True if Database.pFile is read-only */
  int bMultiProc;                 /* True if running in multi-process mode */
  lsm_file *pFile;                /* Used for locks/shm in multi-proc mode */
  LsmFile *pLsmFile;              /* List of deferred closes */
  lsm_mutex *pClientMutex;        /* Protects the apShmChunk[] and pConn */
  int nShmChunk;                  /* Number of entries in apShmChunk[] array */
  void **apShmChunk;              /* Array of "shared" memory regions */
  lsm_db *pConn;                  /* List of connections to this db. */
};

/*
** Functions to enter and leave the global mutex. This mutex is used
** to protect the global linked-list headed at gShared.pDatabase.
*/
static int enterGlobalMutex(lsm_env *pEnv){
  lsm_mutex *p;
  int rc = lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
  if( rc==LSM_OK ) lsmMutexEnter(pEnv, p);
  return rc;
}
static void leaveGlobalMutex(lsm_env *pEnv){
  lsm_mutex *p;
  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
  lsmMutexLeave(pEnv, p);
}

#ifdef LSM_DEBUG
static int holdingGlobalMutex(lsm_env *pEnv){
  lsm_mutex *p;
  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
  return lsmMutexHeld(pEnv, p);
}
#endif

#if 0
static void assertNotInFreelist(Freelist *p, int iBlk){
  int i;
  for(i=0; i<p->nEntry; i++){
    assert( p->aEntry[i].iBlk!=iBlk );
  }
}
#else
# define assertNotInFreelist(x,y)
#endif

/*
** Append an entry to the free-list. If (iId==-1), this is a delete.
*/
int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
  lsm_env *pEnv = db->pEnv;
  Freelist *p;
  int i;

  assert( iId==-1 || iId>=0 );
  p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;

  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
    int nNew;
    int nByte;
    FreelistEntry *aNew;

    nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2);
    nByte = sizeof(FreelistEntry) * nNew;
    aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte);
    if( !aNew ) return LSM_NOMEM_BKPT;
    p->nAlloc = nNew;
    p->aEntry = aNew;
  }

  for(i=0; i<p->nEntry; i++){
    assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk );
    if( p->aEntry[i].iBlk>=iBlk ) break;
  }

  if( i<p->nEntry && p->aEntry[i].iBlk==iBlk ){
    /* Clobber an existing entry */
    p->aEntry[i].iId = iId;
  }else{
    /* Insert a new entry into the list */
    int nByte = sizeof(FreelistEntry)*(p->nEntry-i);
    memmove(&p->aEntry[i+1], &p->aEntry[i], nByte);
    p->aEntry[i].iBlk = iBlk;
    p->aEntry[i].iId = iId;
    p->nEntry++;
  }

  return LSM_OK;
}
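
/*
** In outline, the iId values passed to freelistAppend() by the routines in
** this file follow a small set of conventions (see the call sites below):
**
**   freelistAppend(db, iBlk, p->iId)  - lsmBlockFree(): block freed by
**                                       snapshot p->iId; it may be reused
**                                       only after the next checkpoint.
**   freelistAppend(db, iBlk, 0)       - lsmBlockRefree(): block was never
**                                       written; it may be reused at once.
**   freelistAppend(db, iBlk, -1)      - lsmBlockAllocate()/dbTruncate():
**                                       remove the entry for block iBlk.
*/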

/*
** This function frees all resources held by the Database structure passed
** as the only argument.
*/
static void freeDatabase(lsm_env *pEnv, Database *p){
  assert( holdingGlobalMutex(pEnv) );
  if( p ){
    /* Free the mutexes */
    lsmMutexDel(pEnv, p->pClientMutex);

    if( p->pFile ){
      lsmEnvClose(pEnv, p->pFile);
    }

    /* Free the array of shm pointers */
    lsmFree(pEnv, p->apShmChunk);

    /* Free the memory allocated for the Database struct itself */
    lsmFree(pEnv, p);
  }
}

typedef struct DbTruncateCtx DbTruncateCtx;
struct DbTruncateCtx {
  int nBlock;
  i64 iInUse;
};

static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
  if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
  p->nBlock--;
  return 0;
}

static int dbTruncate(lsm_db *pDb, i64 iInUse){
  int rc = LSM_OK;
#if 0
  int i;
  DbTruncateCtx ctx;

  assert( pDb->pWorker );
  ctx.nBlock = pDb->pWorker->nBlock;
  ctx.iInUse = iInUse;

  rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
  for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
    rc = freelistAppend(pDb, i, -1);
  }

  if( rc==LSM_OK ){
#ifdef LSM_LOG_FREELIST
    if( ctx.nBlock!=pDb->pWorker->nBlock ){
      lsmLogMessage(pDb, 0,
          "dbTruncate(): truncated db to %d blocks", ctx.nBlock
      );
    }
#endif
    pDb->pWorker->nBlock = ctx.nBlock;
  }
#endif
  return rc;
}

/*
** This function is called during database shutdown (when the number of
** connections drops from one to zero). It truncates the database file
** to as small a size as possible without truncating away any blocks that
** contain data.
*/
static int dbTruncateFile(lsm_db *pDb){
  int rc;

  assert( pDb->pWorker==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );
  rc = lsmCheckpointLoadWorker(pDb);

  if( rc==LSM_OK ){
    DbTruncateCtx ctx;

    /* Walk the database free-block-list in reverse order. Set ctx.nBlock
    ** to the block number of the last block in the database that actually
    ** contains data. */
    ctx.nBlock = pDb->pWorker->nBlock;
    ctx.iInUse = -1;
    rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);

    /* If the last block that contains data is not already the last block in
    ** the database file, truncate the database file so that it is. */
    if( rc==LSM_OK ){
      rc = lsmFsTruncateDb(
          pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
      );
    }
  }

  lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
  pDb->pWorker = 0;
  return rc;
}

static void doDbDisconnect(lsm_db *pDb){
  int rc;

  if( pDb->bReadonly ){
    lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
  }else{
    /* Block for an exclusive lock on DMS1. This lock serializes all calls
    ** to doDbConnect() and doDbDisconnect() across all processes. */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc==LSM_OK ){

      lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);

      /* Try an exclusive lock on DMS2. If successful, this is the last
      ** connection to the database. In this case flush the contents of the
      ** in-memory tree to disk and write a checkpoint. */
      rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
      if( rc==LSM_OK ){
        rc = lsmShmTestLock(pDb, LSM_LOCK_CHECKPOINTER, 1, LSM_LOCK_EXCL);
      }
      if( rc==LSM_OK ){
        int bReadonly = 0;        /* True if there exist read-only conns. */

        /* Flush the in-memory tree, if required. If there is data to flush,
        ** this will create a new client snapshot in Database.pClient. The
        ** checkpoint (serialization) of this snapshot may be written to disk
        ** by the following block.
        **
        ** There is no need to take a WRITER lock here. That there are no
        ** other locks on DMS2 guarantees that there are no other read-write
        ** connections at this time (and the lock on DMS1 guarantees that
        ** no new ones may appear).
        */
        rc = lsmTreeLoadHeader(pDb, 0);
        if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
          rc = lsmFlushTreeToDisk(pDb);
        }

        /* Now check if there are any read-only connections. If there are,
        ** then do not truncate the db file or unlink the shared-memory
        ** region. */
        if( rc==LSM_OK ){
          rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
          if( rc==LSM_BUSY ){
            bReadonly = 1;
            rc = LSM_OK;
          }
        }

        /* Write a checkpoint to disk. */
        if( rc==LSM_OK ){
          rc = lsmCheckpointWrite(pDb, 0);
        }

        /* If the checkpoint was written successfully, delete the log file
        ** and, if possible, truncate the database file. */
        if( rc==LSM_OK ){
          int bRotrans = 0;
          Database *p = pDb->pDatabase;

          /* The log file may only be deleted if there are no read-only
          ** clients running rotrans transactions. */
          rc = lsmDetectRoTrans(pDb, &bRotrans);
          if( rc==LSM_OK && bRotrans==0 ){
            lsmFsCloseAndDeleteLog(pDb->pFS);
          }

          /* The database may only be truncated if there exist no read-only
          ** clients - either connected or running rotrans transactions. */
          if( bReadonly==0 && bRotrans==0 ){
            lsmFsUnmap(pDb->pFS);
            dbTruncateFile(pDb);
            if( p->pFile && p->bMultiProc ){
              lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
            }
          }
        }
      }
    }

    if( pDb->iRwclient>=0 ){
      lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
      pDb->iRwclient = -1;
    }

    lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  }
  pDb->pShmhdr = 0;
}

static int doDbConnect(lsm_db *pDb){
  const int nUsMax = 100000;      /* Max value for nUs */
  int nUs = 1000;                 /* us to wait between DMS1 attempts */
  int rc;

  /* Obtain a pointer to the shared-memory header */
  assert( pDb->pShmhdr==0 );
  assert( pDb->bReadonly==0 );

  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes. */
  while( 1 ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc!=LSM_BUSY ) break;
    lsmEnvSleep(pDb->pEnv, nUs);
    nUs = nUs * 2;
    if( nUs>nUsMax ) nUs = nUsMax;
  }
  if( rc==LSM_OK ){
    rc = lsmShmCacheChunks(pDb, 1);
  }
  if( rc!=LSM_OK ) return rc;
  pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];

  /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first
  ** and only connection to the database. In this case initialize the
  ** shared-memory and run log file recovery. */
  assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
  rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
  if( rc==LSM_OK ){
    memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
    rc = lsmCheckpointRecover(pDb);
    if( rc==LSM_OK ){
      rc = lsmLogRecover(pDb);
    }
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
      pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
    }
  }else if( rc==LSM_BUSY ){
    rc = LSM_OK;
  }

  /* Take a shared lock on DMS2. In multi-process mode this lock "cannot"
  ** fail, as connections may only hold an exclusive lock on DMS2 if they
  ** first hold an exclusive lock on DMS1. And this connection is currently
  ** holding the exclusive lock on DMS1.
  **
  ** However, if some other connection has the database open in single-process
  ** mode, this operation will fail. In this case, return the error to the
  ** caller - the attempt to connect to the db has failed.
  */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
  }

  /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
  ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
  if( rc!=LSM_OK ){
    pDb->pShmhdr = 0;
  }else{
    int i;
    for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
      int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
      if( rc2==LSM_OK ) pDb->iRwclient = i;
      if( rc2!=LSM_BUSY ){
        rc = rc2;
        break;
      }
    }
  }
  lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);

  return rc;
}
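
/*
** In rough outline, the DMS locks are used as follows by the code in this
** file:
**
**   DMS1: Held EXCLUSIVE for the duration of doDbConnect() and
**         doDbDisconnect(), serializing connects and disconnects across
**         all processes.
**   DMS2: Held SHARED by each connected read-write client. A successful
**         EXCLUSIVE test on DMS2 therefore means "no read-write client
**         is currently connected".
**   DMS3: Held SHARED by each connected read-only client. A successful
**         EXCLUSIVE test on DMS3 means "no read-only client is currently
**         connected".
*/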

static int dbOpenSharedFd(lsm_env *pEnv, Database *p, int bRoOk){
  int rc;

  rc = lsmEnvOpen(pEnv, p->zName, 0, &p->pFile);
  if( rc==LSM_IOERR && bRoOk ){
    rc = lsmEnvOpen(pEnv, p->zName, LSM_OPEN_READONLY, &p->pFile);
    p->bReadonly = 1;
  }

  return rc;
}

/*
** Return a reference to the shared Database handle for the database
** identified by canonical path zName. If this is the first connection to
** the named database, a new Database object is allocated. Otherwise, a
** pointer to an existing object is returned.
**
** If successful, pDb->pDatabase is set to point to the shared Database
** structure and LSM_OK is returned. Otherwise, pDb->pDatabase is set to
** NULL and an LSM error code is returned.
**
** Each successful call to this function should be (eventually) matched
** by a call to lsmDbDatabaseRelease().
*/
int lsmDbDatabaseConnect(
  lsm_db *pDb,                    /* Database handle */
  const char *zName               /* Full-path to db file */
){
  lsm_env *pEnv = pDb->pEnv;
  int rc;                         /* Return code */
  Database *p = 0;                /* Pointer installed in pDb->pDatabase */
  int nName = lsmStrlen(zName);

  assert( pDb->pDatabase==0 );
  rc = enterGlobalMutex(pEnv);
  if( rc==LSM_OK ){

    /* Search the global list for an existing object. TODO: Need something
    ** better than the memcmp() below to figure out if a given Database
    ** object represents the requested file. */
    for(p=gShared.pDatabase; p; p=p->pDbNext){
      if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break;
    }

    /* If no suitable Database object was found, allocate a new one. */
    if( p==0 ){
      p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc);

      /* If the allocation was successful, fill in other fields and
      ** allocate the client mutex. */
      if( rc==LSM_OK ){
        p->bMultiProc = pDb->bMultiProc;
        p->zName = (char *)&p[1];
        p->nName = nName;
        memcpy((void *)p->zName, zName, nName+1);
        rc = lsmMutexNew(pEnv, &p->pClientMutex);
      }

      /* If nothing has gone wrong so far, open the shared fd. And if that
      ** succeeds and this connection requested single-process mode,
      ** attempt to take the exclusive lock on DMS2. */
      if( rc==LSM_OK ){
        int bReadonly = (pDb->bReadonly && pDb->bMultiProc);
        rc = dbOpenSharedFd(pDb->pEnv, p, bReadonly);
      }

      if( rc==LSM_OK && p->bMultiProc==0 ){
        /* Hold an exclusive lock on DMS1 while grabbing DMS2. This ensures
        ** that any ongoing call to doDbDisconnect() (even one in another
        ** process) is finished before proceeding. */
        assert( p->bReadonly==0 );
        rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_EXCL);
        if( rc==LSM_OK ){
          rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL);
          lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK);
        }
      }

      if( rc==LSM_OK ){
        p->pDbNext = gShared.pDatabase;
        gShared.pDatabase = p;
      }else{
        freeDatabase(pEnv, p);
        p = 0;
      }
    }

    if( p ){
      p->nDbRef++;
    }
    leaveGlobalMutex(pEnv);

    if( p ){
      lsmMutexEnter(pDb->pEnv, p->pClientMutex);
      pDb->pNext = p->pConn;
      p->pConn = pDb;
      lsmMutexLeave(pDb->pEnv, p->pClientMutex);
    }
  }

  pDb->pDatabase = p;
  if( rc==LSM_OK ){
    assert( p );
    rc = lsmFsOpen(pDb, zName, p->bReadonly);
  }

  /* If the db handle is read-write, then connect to the system now. Run
  ** recovery as necessary. Or, if this is a read-only database handle,
  ** defer attempting to connect to the system until a read-transaction
  ** is opened. */
  if( rc==LSM_OK ){
    rc = lsmFsConfigure(pDb);
  }
  if( rc==LSM_OK && pDb->bReadonly==0 ){
    rc = doDbConnect(pDb);
  }

  return rc;
}

static void dbDeferClose(lsm_db *pDb){
  if( pDb->pFS ){
    LsmFile *pLsmFile;
    Database *p = pDb->pDatabase;
    pLsmFile = lsmFsDeferClose(pDb->pFS);
    pLsmFile->pNext = p->pLsmFile;
    p->pLsmFile = pLsmFile;
  }
}

LsmFile *lsmDbRecycleFd(lsm_db *db){
  LsmFile *pRet;
  Database *p = db->pDatabase;
  lsmMutexEnter(db->pEnv, p->pClientMutex);
  if( (pRet = p->pLsmFile)!=0 ){
    p->pLsmFile = pRet->pNext;
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);
  return pRet;
}

/*
** Release a reference to a Database object obtained from
** lsmDbDatabaseConnect(). There should be exactly one call to this function
** for each successful call to lsmDbDatabaseConnect().
*/
void lsmDbDatabaseRelease(lsm_db *pDb){
  Database *p = pDb->pDatabase;
  if( p ){
    lsm_db **ppDb;

    if( pDb->pShmhdr ){
      doDbDisconnect(pDb);
    }

    lsmFsUnmap(pDb->pFS);
    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
    *ppDb = pDb->pNext;
    dbDeferClose(pDb);
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    enterGlobalMutex(pDb->pEnv);
    p->nDbRef--;
    if( p->nDbRef==0 ){
      LsmFile *pIter;
      LsmFile *pNext;
      Database **pp;

      /* Remove the Database structure from the linked list. */
      for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
      *pp = p->pDbNext;

      /* If they were allocated from the heap, free the shared memory chunks */
      if( p->bMultiProc==0 ){
        int i;
        for(i=0; i<p->nShmChunk; i++){
          lsmFree(pDb->pEnv, p->apShmChunk[i]);
        }
      }

      /* Close any outstanding file descriptors */
      for(pIter=p->pLsmFile; pIter; pIter=pNext){
        pNext = pIter->pNext;
        lsmEnvClose(pDb->pEnv, pIter->pFile);
        lsmFree(pDb->pEnv, pIter);
      }
      freeDatabase(pDb->pEnv, p);
    }
    leaveGlobalMutex(pDb->pEnv);
  }
}

Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
  return pSnapshot->pLevel;
}

void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
  pSnap->pLevel = pLevel;
}

/* TODO: Shuffle things around to get rid of this */
static int firstSnapshotInUse(lsm_db *, i64 *);

/*
** Context object used by the lsmWalkFreelist() utility.
*/
typedef struct WalkFreelistCtx WalkFreelistCtx;
struct WalkFreelistCtx {
  lsm_db *pDb;
  int bReverse;
  Freelist *pFreelist;
  int iFree;
  int (*xUsr)(void *, int, i64);  /* User callback function */
  void *pUsrctx;                  /* User callback context */
  int bDone;                      /* Set to true after xUsr() returns true */
};

/*
** Callback used by lsmWalkFreelist().
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  const int iDir = (p->bReverse ? -1 : 1);
  Freelist *pFree = p->pFreelist;

  assert( p->bDone==0 );
  assert( iBlk>=0 );
  if( pFree ){
    while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
       || (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
      ){
        break;
      }else{
        p->iFree += iDir;
        if( pEntry->iId>=0
         && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId)
        ){
          p->bDone = 1;
          return 1;
        }
        if( pEntry->iBlk==(u32)iBlk ) return 0;
      }
    }
  }

  if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
    p->bDone = 1;
    return 1;
  }
  return 0;
}

/*
** The database handle passed as the first argument must be the worker
** connection. This function iterates through the contents of the current
** free block list, invoking the supplied callback once for each list
** element.
**
** The difference between this function and lsmSortedWalkFreelist() is
** that lsmSortedWalkFreelist() only considers those free-list elements
** stored within the LSM. This function also merges in any in-memory
** elements.
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int bReverse,                   /* True to iterate from largest to smallest */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  const int iDir = (bReverse ? -1 : 1);
  int rc;
  int iCtx;
  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;
  ctx[0].bReverse = bReverse;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  if( ctx[0].pFreelist && bReverse ){
    ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
  }else{
    ctx[0].iFree = 0;
  }
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];
  ctx[0].bDone = 0;

  ctx[1].pDb = pDb;
  ctx[1].bReverse = bReverse;
  ctx[1].pFreelist = pDb->pFreelist;
  if( ctx[1].pFreelist && bReverse ){
    ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
  }else{
    ctx[1].iFree = 0;
  }
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;
  ctx[1].bDone = 0;

  rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);

  if( ctx[0].bDone==0 ){
    for(iCtx=0; iCtx<2; iCtx++){
      int i;
      WalkFreelistCtx *p = &ctx[iCtx];
      for(i=p->iFree;
          p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
          i += iDir
      ){
        FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
        if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
          return LSM_OK;
        }
      }
    }
  }

  return rc;
}
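
/*
** For example (an illustrative sketch only - not code used by this file),
** a worker connection that wanted to count the current number of free
** blocks could supply a callback like the following and invoke
** lsmWalkFreelist() with bReverse==0. Returning non-zero from the callback
** halts the iteration early.
**
**   static int countFreeCb(void *pCtx, int iBlk, i64 iSnapshot){
**     (*(int *)pCtx)++;
**     return 0;
**   }
**
**   int nFree = 0;
**   rc = lsmWalkFreelist(pDb, 0, countFreeCb, (void *)&nFree);
*/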

typedef struct FindFreeblockCtx FindFreeblockCtx;
struct FindFreeblockCtx {
  i64 iInUse;
  int iRet;
  int bNotOne;
};

static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
  FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
  if( iSnapshot<p->iInUse && (iBlk!=1 || p->bNotOne==0) ){
    p->iRet = iBlk;
    return 1;
  }
  return 0;
}

static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
  int rc;                         /* Return code */
  FindFreeblockCtx ctx;           /* Context object */

  ctx.iInUse = iInUse;
  ctx.iRet = 0;
  ctx.bNotOne = bNotOne;
  rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx);
  *piRet = ctx.iRet;

  return rc;
}

/*
** Allocate a new database file block to write data to, either by extending
** the database file or by recycling a free-list entry. The worker snapshot
** must be held in order to call this function.
**
** If successful, *piBlk is set to the block number allocated and LSM_OK is
** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
*/
int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
  Snapshot *p = pDb->pWorker;
  int iRet = 0;                   /* Block number of allocated block */
  int rc = LSM_OK;
  i64 iInUse = 0;                 /* Snapshot id still in use */
  i64 iSynced = 0;                /* Snapshot id synced to disk */

  assert( p );

#ifdef LSM_LOG_FREELIST
  {
    static int nCall = 0;
    char *zFree = 0;
    nCall++;
    rc = lsmInfoFreelist(pDb, &zFree);
    if( rc!=LSM_OK ) return rc;
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
    lsmFree(pDb->pEnv, zFree);
  }
#endif

  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
  if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
  iInUse = iSynced;
  if( rc==LSM_OK && pDb->iReader>=0 ){
    assert( pDb->pClient );
    iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
  }
  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

#ifdef LSM_LOG_FREELIST
  {
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
        "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
        iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
    );
  }
#endif

  /* Unless there exists a read-only transaction (which prevents us from
  ** recycling any blocks regardless), query the free block list for a
  ** suitable block to reuse.
  **
  ** It might seem more natural to check for a read-only transaction at
  ** the start of this function. However, it is better to wait until after
  ** the call to lsmCheckpointSynced() to do so.
  */
  if( rc==LSM_OK ){
    int bRotrans;
    rc = lsmDetectRoTrans(pDb, &bRotrans);

    if( rc==LSM_OK && bRotrans==0 ){
      rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
    }
  }

  if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
    iRet = 0;

  }else if( rc==LSM_OK ){
    /* If a block was found in the free block list, use it and remove it from
    ** the list. Otherwise, if no suitable block was found, allocate one from
    ** the end of the file. */
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0,
          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
      rc = freelistAppend(pDb, iRet, -1);
      if( rc==LSM_OK ){
        rc = dbTruncate(pDb, iInUse);
      }
    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
    }
  }

  assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
  *piBlk = iRet;
  return rc;
}

/*
** Free a database block. The worker snapshot must be held in order to call
** this function.
**
** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g.
** LSM_NOMEM) is returned.
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *p = pDb->pWorker;
  assert( lsmShmAssertWorker(pDb) );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
#endif

  return freelistAppend(pDb, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call
** this function.
**
** Refreeing is required when a block is allocated using lsmBlockAllocate()
** but then not used. This function is used to push the block back onto
** the freelist. Refreeing a block is different from freeing it, as a refreed
** block may be reused immediately, whereas a freed block cannot be reused
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif

  rc = freelistAppend(pDb, iBlk, 0);
  return rc;
}
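
/*
** A typical block lifecycle, sketched in terms of the three routines above
** (illustrative only - error handling omitted):
**
**   int iBlk = 0;
**   rc = lsmBlockAllocate(pDb, 0, &iBlk);    - obtain a block to write to
**
**   rc = lsmBlockFree(pDb, iBlk);            - block was used and is now
**                                              unreferenced; reusable only
**                                              after the next checkpoint
**
**   rc = lsmBlockRefree(pDb, iBlk);          - or: block was never used;
**                                              reusable immediately
*/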

/*
** If required, copy a database checkpoint from shared memory into the
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
*/
int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
  int rc;                         /* Return code */
  u32 nWrite = 0;

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb, 0);
  if( rc==LSM_OK ){
    int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true. */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
      u8 *aData;                  /* Meta-page data buffer */
      int nData;                  /* Size of aData[] in bytes */
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk = 0;              /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);
        nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS, nBlock);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS, 0);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
      lsmLogMessage(pDb, 0, "finish checkpoint %d",
          (int)lsmCheckpointId(pDb->aSnapshot, 0)
      );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
  return rc;
}
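
/*
** The two database meta pages are written alternately. The expression
** (pShm->iMetaPage % 2) + 1 above maps meta page 1 to 2, and meta page 2
** (or an unset value of 0) to 1. For example, if the previous checkpoint
** was stored on meta page 2, the next is written to meta page 1.
** pShm->iMetaPage is only updated once the checkpoint has been stored and
** (if required) synced, so a failure mid-write leaves the previously
** stored checkpoint intact.
*/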

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */
  rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);

  /* Deserialize the current worker snapshot */
  if( rc==LSM_OK ){
    rc = lsmCheckpointLoadWorker(pDb);
  }
  return rc;
}

void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
  if( p ){
    lsmSortedFreeLevel(pEnv, p->pLevel);
    lsmFree(pEnv, p->freelist.aEntry);
    lsmFree(pEnv, p->redirect.a);
    lsmFree(pEnv, p);
  }
}

/*
** Attempt to populate one of the read-lock slots to contain lock values
** iLsm/iShm. Or, if such a slot exists already, this function is a no-op.
**
** It is not an error if no slot can be populated because the write-lock
** cannot be obtained. If any other error occurs, return an LSM error code.
** Otherwise, LSM_OK.
**
** This function is called at various points to try to ensure that there
** always exists at least one read-lock slot that can be used by a read-only
** client. And so that, in the usual case, there is an "exact match" available
** whenever a read transaction is opened by any client. At present this
** function is called when:
**
**   * A write transaction that called lsmTreeDiscardOld() is committed, and
**   * Whenever the working snapshot is updated (i.e. lsmFinishWork()).
*/
static int dbSetReadLock(lsm_db *db, i64 iLsm, u32 iShm){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  /* Check if there is already a slot containing the required values. */
  for(i=0; i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShm ) return LSM_OK;
  }

  /* Iterate through all read-lock slots, attempting to take a write-lock
  ** on each of them. If a write-lock succeeds, populate the locked slot
  ** with the required values and break out of the loop. */
  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShm;
      lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
      break;
    }
  }

  return rc;
}
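
/*
** To illustrate the slot mechanism: each ShmReader slot in shared memory
** pairs a snapshot id with a shared-memory chunk sequence id, roughly:
**
**   pShm->aReader[i].iLsmId  = iLsm;    - snapshot that must be preserved
**   pShm->aReader[i].iTreeId = iShm;    - oldest shm sequence still needed
**
** A reader advertises the database version it is using by holding a SHARED
** lock on LSM_LOCK_READER(i) for a slot with suitable values (see
** lsmReadlock() below). Workers consult the populated slots, via isInUse()
** and firstSnapshotInUse(), before recycling database blocks or
** shared-memory chunks.
*/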

/*
** Release the read-lock currently held by connection db.
*/
int dbReleaseReadlock(lsm_db *db){
  int rc = LSM_OK;
  if( db->iReader>=0 ){
    rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
    db->iReader = -1;
  }
  db->bRoTrans = 0;
  return rc;
}

/*
** Argument bFlush is true if the contents of the in-memory tree have just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  int rc = *pRc;
  assert( rc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory. */
    if( rc==LSM_OK ){
      rc = lsmSaveWorker(pDb, bFlush);
    }

    /* Assuming no error has occurred, update a read lock slot with the
    ** new snapshot id (see comments above function dbSetReadLock()). */
    if( rc==LSM_OK ){
      if( pDb->iReader<0 ){
        rc = lsmTreeLoadHeader(pDb, 0);
      }
      if( rc==LSM_OK ){
        rc = dbSetReadLock(pDb, pDb->pWorker->iId, pDb->treehdr.iUsedShmid);
      }
    }

    /* Free the snapshot object. */
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
  *pRc = rc;
}

/*
** Called when recovery is finished.
*/
int lsmFinishRecovery(lsm_db *pDb){
  lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}

/*
** Check if the currently configured compression functions
** (LSM_CONFIG_SET_COMPRESSION) are compatible with a database that has its
** compression id set to iReq. Compression routines are compatible if iReq
** is zero (indicating the database is empty), or if it is equal to the
** compression id of the configured compression routines.
**
** If the check shows that the current compression routines are incompatible
** and there is a compression factory registered, give it a chance to install
** new compression routines.
**
** If, after any registered factory is invoked, the compression functions
** are still incompatible, return LSM_MISMATCH. Otherwise, LSM_OK.
*/
int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
  if( iReq!=LSM_COMPRESSION_EMPTY && pDb->compress.iId!=iReq ){
    if( pDb->factory.xFactory ){
      pDb->bInFactory = 1;
      pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
      pDb->bInFactory = 0;
    }
    if( pDb->compress.iId!=iReq ){
      /* Incompatible */
      return LSM_MISMATCH;
    }
  }
  /* Compatible */
  return LSM_OK;
}
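
/*
** A compression factory, if one is registered, is expected to recognise
** iReq and install a matching set of compression hooks before returning.
** A rough sketch of such a callback (illustrative only; "myHooks" is
** assumed to be an lsm_compress structure whose iId field equals
** MY_COMPRESSION_ID):
**
**   static void xMyFactory(void *pCtx, lsm_db *db, unsigned int iReq){
**     if( iReq==MY_COMPRESSION_ID ){
**       lsm_config(db, LSM_CONFIG_SET_COMPRESSION, &myHooks);
**     }
**   }
**
** If the factory does not (or cannot) install hooks with id iReq, the check
** above fails and LSM_MISMATCH is returned to the caller.
*/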

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;
  const int nMaxAttempt = (pDb->bRoTrans ? 1 : MAX_READLOCK_ATTEMPTS);

  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );

  while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<nMaxAttempt ){
    int iTreehdr = 0;
    int iSnap = 0;
    assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

    /* Load the in-memory tree header. */
    rc = lsmTreeLoadHeader(pDb, &iTreehdr);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      if( lsmCheckpointClientCacheOk(pDb)==0 ){
        lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
        pDb->pClient = 0;
        lsmMCursorFreeCache(pDb);
        lsmFsPurgeCache(pDb->pFS);
        rc = lsmCheckpointLoad(pDb, &iSnap);
      }else{
        iSnap = 1;
      }
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header). */
    if( rc==LSM_OK ){
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-LSM_MAX_SHMCHUNKS;
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          /* Read lock has been successfully obtained. Deserialize the
          ** checkpoint just loaded. TODO: This will be removed after
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot. */
          if( pDb->pClient==0 ){
            rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          }
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH. */
          if( rc==LSM_OK ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          rc = dbReleaseReadlock(pDb);
        }
      }

      if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
#if 0
    if( rc==LSM_OK && pDb->pClient ){
      fprintf(stderr,
          "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
          (void *)pDb,
          (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid,
          (int)pDb->treehdr.root.iTransId,
          (int)pDb->treehdr.iOldShmid
      );
    }
#endif
  }

  if( rc==LSM_OK ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }
  if( rc!=LSM_OK ){
    dbReleaseReadlock(pDb);
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  return rc;
}

/*
** This function is used by a read-write connection to determine if there
** are currently one or more read-only transactions open on the database
** (in this context a read-only transaction is one opened by a read-only
** connection on a non-live database).
**
** If no error occurs, LSM_OK is returned and *pbExist is set to true if
** some other connection has a read-only transaction open, or false
** otherwise. If an error occurs an LSM error code is returned and the final
** value of *pbExist is undefined.
*/
int lsmDetectRoTrans(lsm_db *db, int *pbExist){
  int rc;

  /* Only a read-write connection may use this function. */
  assert( db->bReadonly==0 );

  rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
  if( rc==LSM_BUSY ){
    *pbExist = 1;
    rc = LSM_OK;
  }else{
    *pbExist = 0;
  }

  return rc;
}

/*
** db is a read-only database handle in the disconnected state. This function
** attempts to open a read-transaction on the database. This may involve
** connecting to the database system (opening shared memory etc.).
*/
int lsmBeginRoTrans(lsm_db *db){
  int rc = LSM_OK;

  assert( db->bReadonly && db->pShmhdr==0 );
  assert( db->iReader<0 );

  if( db->bRoTrans==0 ){

    /* Attempt a shared-lock on DMS1. */
    rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
    if( rc!=LSM_OK ) return rc;

    rc = lsmShmTestLock(
        db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
    );
    if( rc==LSM_OK ){
      /* System is not live. Take a SHARED lock on the ROTRANS byte and
      ** release DMS1. Locking ROTRANS tells all read-write clients that they
      ** may not recycle any disk space from within the database or log files,
      ** as a read-only client may be using it. */
      rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);

      if( rc==LSM_OK ){
        db->bRoTrans = 1;
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
          memset(db->pShmhdr, 0, sizeof(ShmHeader));
          rc = lsmCheckpointRecover(db);
          if( rc==LSM_OK ){
            rc = lsmLogRecover(db);
          }
        }
      }
    }else if( rc==LSM_BUSY ){
      /* System is live! */
      rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
      if( rc==LSM_OK ){
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
        }
      }
    }

    /* In lsm_open(), the page and block sizes of the file-system object are
    ** not updated for read-only connections, because db->pShmhdr is still a
    ** NULL pointer at that point and so no checkpoint can be loaded. Now
    ** that the system is live and db->pShmhdr is set, both values can be
    ** updated here.
    **
    ** Configure the file-system connection with the page-size and block-size
    ** of this database. Even if the database file is zero bytes in size
    ** on disk, these values have been set in shared-memory by now, and so
    ** are guaranteed not to change during the lifetime of this connection. */
    if( LSM_OK==rc
     && 0==lsmCheckpointClientCacheOk(db)
     && LSM_OK==(rc=lsmCheckpointLoad(db, 0))
    ){
      lsmFsSetPageSize(db->pFS, lsmCheckpointPgsz(db->aSnapshot));
      lsmFsSetBlockSize(db->pFS, lsmCheckpointBlksz(db->aSnapshot));
    }

    if( rc==LSM_OK ){
      rc = lsmBeginReadTrans(db);
    }
  }

  return rc;
}

/*
** Close the currently open read transaction.
*/
void lsmFinishReadTrans(lsm_db *pDb){

  /* Worker connections should not be closing read transactions. And
  ** read transactions should only be closed after all cursors and write
  ** transactions have been closed. Finally pClient should be non-NULL
  ** if and only if pDb->iReader>=0. */
  assert( pDb->pWorker==0 );
  assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

  if( pDb->bRoTrans ){
    int i;
    for(i=0; i<pDb->nShm; i++){
      lsmFree(pDb->pEnv, pDb->apShm[i]);
    }
    lsmFree(pDb->pEnv, pDb->apShm);
    pDb->apShm = 0;
    pDb->nShm = 0;
    pDb->pShmhdr = 0;

    lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
  }
  dbReleaseReadlock(pDb);
}

/*
** Open a write transaction.
*/
int lsmBeginWriteTrans(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  ShmHeader *pShm = pDb->pShmhdr; /* Shared memory header */

  assert( pDb->nTransOpen==0 );
  assert( pDb->bDiscardOld==0 );
  assert( pDb->bReadonly==0 );

  /* If there is no read-transaction open, open one now. */
  if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    rc = lsmTreeRepair(pDb);
    if( rc==LSM_OK ) pShm->bWriter = 0;
  }

  /* Check that this connection is currently reading from the most recent
  ** version of the database. If not, return LSM_BUSY. */
  if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
    rc = LSM_BUSY;
  }

  if( rc==LSM_OK ){
    rc = lsmLogBegin(pDb);
  }

  /* If everything was successful, set the "transaction-in-progress" flag
  ** and return LSM_OK. Otherwise, if some error occurred, relinquish the
  ** WRITER lock and return an error code. */
  if( rc==LSM_OK ){
    TreeHeader *p = &pDb->treehdr;
    pShm->bWriter = 1;
    p->root.iTransId++;
    if( lsmTreeHasOld(pDb) && p->iOldLog==pDb->pClient->iLogOff ){
      lsmTreeDiscardOld(pDb);
      pDb->bDiscardOld = 1;
    }
  }else{
    lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
    if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
  }
  return rc;
}
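
/*
** The public lsm_begin()/lsm_commit() interface declared in lsm.h is built
** on top of the routines above. A rough sketch of the sequence from an
** application's point of view (illustrative only; error handling omitted):
**
**   rc = lsm_begin(db, 1);            - opens a write transaction
**                                       (lsmBeginWriteTrans())
**   rc = lsm_insert(db, pKey, nKey, pVal, nVal);
**   rc = lsm_commit(db, 0);           - writes the commit record to the log,
**                                       then lsmFinishWriteTrans() releases
**                                       the WRITER lock
*/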
  1247. /*
  1248. ** End the current write transaction. The connection is left with an open
  1249. ** read transaction. It is an error to call this if there is no open write
  1250. ** transaction.
  1251. **
  1252. ** If the transaction was committed, then a commit record has already been
  1253. ** written into the log file when this function is called. Or, if the
  1254. ** transaction was rolled back, both the log file and in-memory tree
  1255. ** structure have already been restored. In either case, this function
  1256. ** merely releases locks and other resources held by the write-transaction.
  1257. **
  1258. ** LSM_OK is returned if successful, or an LSM error code otherwise.
  1259. */
  1260. int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  1261. int rc = LSM_OK;
  1262. int bFlush = 0;
  1263. lsmLogEnd(pDb, bCommit);
  1264. if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
  1265. bFlush = 1;
  1266. lsmTreeMakeOld(pDb);
  1267. }
  1268. lsmTreeEndTransaction(pDb, bCommit);
  1269. if( rc==LSM_OK ){
  1270. if( bFlush && pDb->bAutowork ){
  1271. rc = lsmSortedAutoWork(pDb, 1);
  1272. }else if( bCommit && pDb->bDiscardOld ){
  1273. rc = dbSetReadLock(pDb, pDb->pClient->iId, pDb->treehdr.iUsedShmid);
  1274. }
  1275. }
  1276. pDb->bDiscardOld = 0;
  1277. lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  1278. if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
  1279. pDb->xWork(pDb, pDb->pWorkCtx);
  1280. }
  1281. return rc;
  1282. }
/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
int lsmHoldingClientMutex(lsm_db *pDb){
  return lsmMutexHeld(pDb->pEnv, pDb->pDatabase->pClientMutex);
}
#endif

static int slotIsUsable(ShmReader *p, i64 iLsm, u32 iShmMin, u32 iShmMax){
  return(
      p->iLsmId && p->iLsmId<=iLsm
   && shm_sequence_ge(iShmMax, p->iTreeId)
   && shm_sequence_ge(p->iTreeId, iShmMin)
  );
}

/*
** Obtain a read-lock on the database version identified by snapshot id
** iLsm and an in-memory tree with a shared-memory sequence id between
** iShmMin and iShmMax. Return LSM_OK if successful, or an LSM error code
** otherwise.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  assert( db->iReader<0 );
  assert( shm_sequence_ge(iShmMax, iShmMin) );

  /* This is a no-op if the read-only transaction flag is set. */
  if( db->bRoTrans ){
    db->iReader = 0;
    return LSM_OK;
  }

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iShmMax ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  /* Try to obtain a write-lock on each slot, in order. If successful, set
  ** the slot values to iLsm/iShmMax. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShmMax;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      assert( rc!=LSM_BUSY );
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}
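
/*
** Illustrative sketch only (not part of the library, excluded from the
** build): the "check, lock, re-check" pattern lsmReadlock() uses above to
** reuse an existing reader slot. The slot contents are only trusted if
** they still match after the SHARED lock has been obtained, because
** another connection may overwrite the slot at any point before the lock
** is held. ExampleSlot and the exampleLock()/exampleUnlock() callbacks
** are hypothetical stand-ins for ShmReader and lsmShmLock().
*/
#if 0
typedef struct ExampleSlot ExampleSlot;
struct ExampleSlot {
  long long iLsmId;               /* Snapshot id pinned by this slot */
  unsigned iTreeId;               /* Shared-memory sequence id pinned */
};

static int exampleReuseSlot(
  ExampleSlot *p,                 /* Candidate slot in shared memory */
  long long iLsm,                 /* Snapshot id required by the reader */
  unsigned iShm,                  /* Sequence id required by the reader */
  int (*exampleLock)(void),       /* Take SHARED lock; 0==ok, non-zero==busy */
  void (*exampleUnlock)(void)     /* Release the SHARED lock */
){
  if( p->iLsmId!=iLsm || p->iTreeId!=iShm ) return 0;  /* Optimistic check */
  if( exampleLock() ) return 0;                        /* Lock the slot */
  if( p->iLsmId==iLsm && p->iTreeId==iShm ) return 1;  /* Re-check: usable */
  exampleUnlock();                                     /* Slot has changed */
  return 0;
}
#endif
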
/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file.
**
** If iLsmId is non-zero, then it is a snapshot id. If there exists a
** read-lock using this snapshot or newer, set *pbInUse to true. Or,
** if there is no such read-lock, set it to false.
**
** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id.
** Search for a read-lock using this sequence id or newer, and set
** *pbInUse in the same way.
*/
static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId)
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
  }
  *pbInUse = 0;
  return rc;
}

/*
** This function is called by worker connections to determine the smallest
** snapshot id that is currently in use by a database client. The worker
** connection uses this result to determine whether or not it is safe to
** recycle a database block.
*/
static int firstSnapshotInUse(
  lsm_db *db,                     /* Database handle */
  i64 *piInUse                    /* IN/OUT: Smallest snapshot id in use */
){
  ShmHeader *pShm = db->pShmhdr;
  i64 iInUse = *piInUse;
  int i;

  assert( iInUse>0 );
  for(i=0; i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      i64 iThis = p->iLsmId;
      if( iThis!=0 && iInUse>iThis ){
        int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }else if( rc==LSM_BUSY ){
          iInUse = iThis;
        }else{
          /* Some error other than LSM_BUSY. Return the error code to
          ** the caller in this case. */
          return rc;
        }
      }
    }
  }

  *piInUse = iInUse;
  return LSM_OK;
}

int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  if( db->treehdr.iUsedShmid==iShmid ){
    *pbInUse = 1;
    return LSM_OK;
  }
  return isInUse(db, 0, iShmid, pbInUse);
}

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
  if( db->pClient && db->pClient->iId<=iLsmId ){
    *pbInUse = 1;
    return LSM_OK;
  }
  return isInUse(db, iLsmId, 0, pbInUse);
}

/*
** This function may only be called after a successful call to
** lsmDbDatabaseConnect(). It returns true if the connection is in
** multi-process mode, or false otherwise.
*/
int lsmDbMultiProc(lsm_db *pDb){
  return pDb->pDatabase && pDb->pDatabase->bMultiProc;
}

/*************************************************************************
**************************************************************************
**************************************************************************
**************************************************************************
**************************************************************************
*************************************************************************/

/*
** Ensure that database connection db has cached pointers to at least the
** first nChunk chunks of shared memory.
*/
int lsmShmCacheChunks(lsm_db *db, int nChunk){
  int rc = LSM_OK;
  if( nChunk>db->nShm ){
    static const int NINCR = 16;
    Database *p = db->pDatabase;
    lsm_env *pEnv = db->pEnv;
    int nAlloc;
    int i;

    /* Ensure that the db->apShm[] array is large enough. If an attempt to
    ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
    ** is always extended in multiples of 16 entries - so the actual allocated
    ** size can be inferred from nShm. */
    nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
    while( nChunk>=nAlloc ){
      void **apShm;
      nAlloc += NINCR;
      apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
      if( !apShm ) return LSM_NOMEM_BKPT;
      db->apShm = apShm;
    }

    if( db->bRoTrans ){
      for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
        db->apShm[i] = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
        db->nShm++;
      }
    }else{
      /* Enter the client mutex */
      lsmMutexEnter(pEnv, p->pClientMutex);

      /* Extend the Database object's apShmChunk[] array if necessary, using
      ** the same pattern as for the lsm_db.apShm[] array above. */
      nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
      while( nChunk>=nAlloc ){
        void **apShm;
        nAlloc += NINCR;
        apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
        if( !apShm ){
          rc = LSM_NOMEM_BKPT;
          break;
        }
        p->apShmChunk = apShm;
      }

      for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
        if( i>=p->nShmChunk ){
          void *pChunk = 0;
          if( p->bMultiProc==0 ){
            /* Single process mode */
            pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
          }else{
            /* Multi-process mode */
            rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
          }
          if( rc==LSM_OK ){
            p->apShmChunk[i] = pChunk;
            p->nShmChunk++;
          }
        }
        if( rc==LSM_OK ){
          db->apShm[i] = p->apShmChunk[i];
          db->nShm++;
        }
      }

      /* Release the client mutex */
      lsmMutexLeave(pEnv, p->pClientMutex);
    }
  }

  return rc;
}
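
/*
** Illustrative sketch only (not part of the library, excluded from the
** build): the rounding arithmetic used above to size the apShm[] and
** apShmChunk[] arrays. Both arrays are always extended in multiples of
** NINCR (16) entries, so the currently allocated size can be recomputed
** from the number of entries in use. The helper name exampleRoundUp is
** hypothetical.
*/
#if 0
static int exampleRoundUp(int nUsed){
  static const int NINCR = 16;
  return ((nUsed + NINCR - 1) / NINCR) * NINCR;   /* 5 -> 16, 17 -> 32 */
}
#endif
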
static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
  int rc = LSM_OK;
  if( p->bMultiProc ){
    rc = lsmEnvLock(pEnv, p->pFile, iLock, eOp);
  }
  return rc;
}

/*
** Test if it would be possible for connection db to obtain a lock of type
** eOp on the nLock locks starting at iLock. If so, return LSM_OK. If it
** would not be possible to obtain the lock due to a lock held by another
** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the
** lsm_env.xTestLock function), return some other LSM error code.
**
** Note that this function never actually locks the database - it merely
** queries the system to see if there exists a lock that would prevent
** it from doing so.
*/
int lsmShmTestLock(
  lsm_db *db,
  int iLock,
  int nLock,
  int eOp
){
  int rc = LSM_OK;
  lsm_db *pIter;
  Database *p = db->pDatabase;
  int i;
  u64 mask = 0;

  for(i=iLock; i<(iLock+nLock); i++){
    mask |= ((u64)1 << (i-1));
    if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (i+32-1));
  }

  lsmMutexEnter(db->pEnv, p->pClientMutex);
  for(pIter=p->pConn; pIter; pIter=pIter->pNext){
    if( pIter!=db && (pIter->mLock & mask) ){
      assert( pIter!=db );
      break;
    }
  }

  if( pIter ){
    rc = LSM_BUSY;
  }else if( p->bMultiProc ){
    rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
  }

  lsmMutexLeave(db->pEnv, p->pClientMutex);
  return rc;
}

/*
** Attempt to obtain the lock identified by the iLock and eOp parameters.
** If successful, return LSM_OK. If the lock cannot be obtained because
** there exists some other conflicting lock, return LSM_BUSY. If some other
** error occurs, return an LSM error code.
**
** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
** or else a value returned by the LSM_LOCK_READER macro.
*/
int lsmShmLock(
  lsm_db *db,
  int iLock,
  int eOp,                        /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
  int bBlock                      /* True for a blocking lock */
){
  lsm_db *pIter;
  const u64 me = ((u64)1 << (iLock-1));
  const u64 ms = ((u64)1 << (iLock+32-1));
  int rc = LSM_OK;
  Database *p = db->pDatabase;

  assert( eOp!=LSM_LOCK_EXCL || p->bReadonly==0 );
  assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) );
  assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  /* Check for a no-op. Proceed only if this is not one of those. */
  if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0)
   || (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms)
   || (eOp==LSM_LOCK_EXCL && (db->mLock & me)==0)
  ){
    int nExcl = 0;                /* Number of connections holding EXCLUSIVE */
    int nShared = 0;              /* Number of connections holding SHARED */
    lsmMutexEnter(db->pEnv, p->pClientMutex);

    /* Figure out the locks currently held by this process on iLock, not
    ** including any held by connection db. */
    for(pIter=p->pConn; pIter; pIter=pIter->pNext){
      assert( (pIter->mLock & me)==0 || (pIter->mLock & ms)!=0 );
      if( pIter!=db ){
        if( pIter->mLock & me ){
          nExcl++;
        }else if( pIter->mLock & ms ){
          nShared++;
        }
      }
    }
    assert( nExcl==0 || nExcl==1 );
    assert( nExcl==0 || nShared==0 );
    assert( nExcl==0 || (db->mLock & (me|ms))==0 );

    switch( eOp ){
      case LSM_LOCK_UNLOCK:
        if( nShared==0 ){
          lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK);
        }
        db->mLock &= ~(me|ms);
        break;

      case LSM_LOCK_SHARED:
        if( nExcl ){
          rc = LSM_BUSY;
        }else{
          if( nShared==0 ){
            rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED);
          }
          if( rc==LSM_OK ){
            db->mLock |= ms;
            db->mLock &= ~me;
          }
        }
        break;

      default:
        assert( eOp==LSM_LOCK_EXCL );
        if( nExcl || nShared ){
          rc = LSM_BUSY;
        }else{
          rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL);
          if( rc==LSM_OK ){
            db->mLock |= (me|ms);
          }
        }
        break;
    }

    lsmMutexLeave(db->pEnv, p->pClientMutex);
  }

  return rc;
}
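
/*
** Illustrative sketch only (not part of the library, excluded from the
** build): the bit layout lsmShmLock() uses above to record this
** connection's locks in lsm_db.mLock. For lock index iLock, bit (iLock-1)
** records an EXCLUSIVE lock and bit (iLock+31) records a SHARED lock. The
** value 3 used for iLock and the function name exampleLockBits are
** examples only.
*/
#if 0
static void exampleLockBits(void){
  int iLock = 3;                                  /* Example 1-based index */
  u64 me = ((u64)1 << (iLock-1));                 /* 0x4: EXCLUSIVE bit */
  u64 ms = ((u64)1 << (iLock+32-1));              /* 0x400000000: SHARED bit */
  u64 mLock = 0;

  mLock |= ms;                  /* After taking a SHARED lock on iLock */
  mLock |= (me|ms);             /* After upgrading to an EXCLUSIVE lock */
  mLock &= ~(me|ms);            /* After unlocking iLock */
  (void)mLock;
}
#endif
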
#ifdef LSM_DEBUG
int shmLockType(lsm_db *db, int iLock){
  const u64 me = ((u64)1 << (iLock-1));
  const u64 ms = ((u64)1 << (iLock+32-1));

  if( db->mLock & me ) return LSM_LOCK_EXCL;
  if( db->mLock & ms ) return LSM_LOCK_SHARED;
  return LSM_LOCK_UNLOCK;
}

/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock
** this function returns true if the specified connection already holds
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  int ret = 0;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);

  switch( eOp ){
    case LSM_LOCK_UNLOCK:
      ret = (eHave==LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_SHARED:
      ret = (eHave!=LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_EXCL:
      ret = (eHave==LSM_LOCK_EXCL);
      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}

/*
** This function does not contribute to library functionality, and is not
** included in release builds. It is intended to be called from within
** an interactive debugger.
**
** When called, this function prints a single line of human readable output
** to stdout describing the locks currently held by the connection. For
** example:
**
**     (gdb) call print_db_locks(pDb)
**     (shared on dms2) (exclusive on writer)
*/
void print_db_locks(lsm_db *db){
  int iLock;
  int bOne = 0;
  for(iLock=1; iLock<16; iLock++){
    const char *azLock[] = {0, "shared", "exclusive"};
    const char *azName[] = {
      0, "dms1", "dms2", "writer", "worker", "checkpointer",
      "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
    };
    int eHave = shmLockType(db, iLock);
    if( azLock[eHave] ){
      printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
      bOne = 1;
    }
  }
  printf("\n");
}

void print_all_db_locks(lsm_db *db){
  lsm_db *p;
  for(p=db->pDatabase->pConn; p; p=p->pNext){
    printf("%s connection %p ", ((p==db)?"*":""), p);
    print_db_locks(p);
  }
}
#endif

void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}

int lsm_checkpoint(lsm_db *pDb, int *pnKB){
  int rc;                         /* Return code */
  u32 nWrite = 0;                 /* Number of pages checkpointed */

  /* Attempt the checkpoint. If successful, nWrite is set to the number of
  ** pages written between this and the previous checkpoint. */
  rc = lsmCheckpointWrite(pDb, &nWrite);

  /* If required, calculate the output variable (KB of data checkpointed).
  ** Set it to zero if an error occurred. */
  if( pnKB ){
    int nKB = 0;
    if( rc==LSM_OK && nWrite ){
      nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;
    }
    *pnKB = nKB;
  }

  return rc;
}
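
/*
** Illustrative sketch only (not part of the library, excluded from the
** build): how an application might call lsm_checkpoint() on an open
** handle, and the rounding used above to convert pages into KB. With a
** 4096 byte page size, checkpointing 3 pages reports
** ((3*4096)+1023)/1024 = 12 KB. The function name exampleCheckpoint is
** hypothetical.
*/
#if 0
static int exampleCheckpoint(lsm_db *db){
  int nKB = 0;                    /* OUT: KB written by the checkpoint */
  int rc = lsm_checkpoint(db, &nKB);
  if( rc==LSM_OK ){
    printf("checkpoint wrote %d KB\n", nKB);
  }
  return rc;
}
#endif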