lsm_main.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009
  1. /*
  2. ** 2011-08-18
  3. **
  4. ** The author disclaims copyright to this source code. In place of
  5. ** a legal notice, here is a blessing:
  6. **
  7. ** May you do good and not evil.
  8. ** May you find forgiveness for yourself and forgive others.
  9. ** May you share freely, never taking more than you give.
  10. **
  11. *************************************************************************
  12. **
  13. ** The main interface to the LSM module.
  14. */
  15. #include "lsmInt.h"
  16. #ifdef LSM_DEBUG
  /*
  ** Debugging hook: hand the error code passed as the only argument straight
  ** back to the caller.
  **
  ** In LSM_DEBUG builds every freshly generated (as opposed to propagated)
  ** error code is routed through this function. If the library unexpectedly
  ** returns, say, LSM_IOERR, a breakpoint placed here shows where the error
  ** first originated.
  */
  int lsmErrorBkpt(int rc){
    int rcOut = rc;               /* Set breakpoint here! */
    return rcOut;
  }
  29. /*
  30. ** This function contains various assert() statements that test that the
  31. ** lsm_db structure passed as an argument is internally consistent.
  32. */
  33. static void assert_db_state(lsm_db *pDb){
  34. /* If there is at least one cursor or a write transaction open, the database
  35. ** handle must be holding a pointer to a client snapshot. And the reverse
  36. ** - if there are no open cursors and no write transactions then there must
  37. ** not be a client snapshot. */
  38. assert( (pDb->pCsr!=0||pDb->nTransOpen>0)==(pDb->iReader>=0||pDb->bRoTrans) );
  39. assert( (pDb->iReader<0 && pDb->bRoTrans==0) || pDb->pClient!=0 );
  40. assert( pDb->nTransOpen>=0 );
  41. }
  42. #else
  43. # define assert_db_state(x)
  44. #endif
  /*
  ** The default key-compare function.
  **
  ** Keys are ordered by a bytewise memcmp() over their common prefix. If the
  ** common prefix is identical, the shorter key sorts first. The return
  ** value is negative, zero or positive according to whether key (p1,n1) is
  ** less than, equal to or greater than key (p2,n2).
  **
  ** memcmp() is only invoked when the common prefix is non-empty. Passing
  ** a NULL pointer to memcmp() is undefined behaviour even when the length
  ** is zero, so this keeps zero-length keys supplied as NULL pointers safe.
  */
  static int xCmp(void *p1, int n1, void *p2, int n2){
    int nCommon = (n1<n2 ? n1 : n2);    /* Bytes present in both keys */
    int res = 0;
    if( nCommon>0 ){
      res = memcmp(p1, p2, (size_t)nCommon);
    }
    if( res==0 ) res = (n1-n2);
    return res;
  }
  /*
  ** Log callback that writes message z, followed by a newline, to stderr
  ** and then flushes the stream. The pCtx and rc arguments are ignored.
  */
  static void xLog(void *pCtx, int rc, const char *z){
    (void)pCtx;
    (void)rc;
    fprintf(stderr, "%s\n", z);
    fflush(stderr);
  }
  60. /*
  61. ** Allocate a new db handle.
  62. */
  63. int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
  64. lsm_db *pDb;
  65. /* If the user did not provide an environment, use the default. */
  66. if( pEnv==0 ) pEnv = lsm_default_env();
  67. assert( pEnv );
  68. /* Allocate the new database handle */
  69. *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
  70. if( pDb==0 ) return LSM_NOMEM_BKPT;
  71. /* Initialize the new object */
  72. pDb->pEnv = pEnv;
  73. pDb->nTreeLimit = LSM_DFLT_AUTOFLUSH;
  74. pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
  75. pDb->bAutowork = LSM_DFLT_AUTOWORK;
  76. pDb->eSafety = LSM_DFLT_SAFETY;
  77. pDb->xCmp = xCmp;
  78. pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
  79. pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  80. pDb->nMerge = LSM_DFLT_AUTOMERGE;
  81. pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  82. pDb->bUseLog = LSM_DFLT_USE_LOG;
  83. pDb->iReader = -1;
  84. pDb->iRwclient = -1;
  85. pDb->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES;
  86. pDb->iMmap = LSM_DFLT_MMAP;
  87. pDb->xLog = xLog;
  88. pDb->compress.iId = LSM_COMPRESSION_NONE;
  89. return LSM_OK;
  90. }
  91. lsm_env *lsm_get_env(lsm_db *pDb){
  92. assert( pDb->pEnv );
  93. return pDb->pEnv;
  94. }
  95. /*
  96. ** If database handle pDb is currently holding a client snapshot, but does
  97. ** not have any open cursors or write transactions, release it.
  98. */
  99. static void dbReleaseClientSnapshot(lsm_db *pDb){
  100. if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
  101. lsmFinishReadTrans(pDb);
  102. }
  103. }
  104. static int getFullpathname(
  105. lsm_env *pEnv,
  106. const char *zRel,
  107. char **pzAbs
  108. ){
  109. int nAlloc = 0;
  110. char *zAlloc = 0;
  111. int nReq = 0;
  112. int rc;
  113. do{
  114. nAlloc = nReq;
  115. rc = pEnv->xFullpath(pEnv, zRel, zAlloc, &nReq);
  116. if( nReq>nAlloc ){
  117. zAlloc = lsmReallocOrFreeRc(pEnv, zAlloc, nReq, &rc);
  118. }
  119. }while( nReq>nAlloc && rc==LSM_OK );
  120. if( rc!=LSM_OK ){
  121. lsmFree(pEnv, zAlloc);
  122. zAlloc = 0;
  123. }
  124. *pzAbs = zAlloc;
  125. return rc;
  126. }
  127. /*
  128. ** Check that the bits in the db->mLock mask are consistent with the
  129. ** value stored in db->iRwclient. An assert shall fail otherwise.
  130. */
  131. static void assertRwclientLockValue(lsm_db *db){
  132. #ifndef NDEBUG
  133. u64 msk; /* Mask of mLock bits for RWCLIENT locks */
  134. u64 rwclient = 0; /* Bit corresponding to db->iRwclient */
  135. if( db->iRwclient>=0 ){
  136. rwclient = ((u64)1 << (LSM_LOCK_RWCLIENT(db->iRwclient)-1));
  137. }
  138. msk = ((u64)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1)) - 1;
  139. msk -= (((u64)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1);
  140. assert( (db->mLock & msk)==rwclient );
  141. #endif
  142. }
  143. /*
  144. ** Open a new connection to database zFilename.
  145. */
  146. int lsm_open(lsm_db *pDb, const char *zFilename){
  147. int rc;
  148. if( pDb->pDatabase ){
  149. rc = LSM_MISUSE;
  150. }else{
  151. char *zFull;
  152. /* Translate the possibly relative pathname supplied by the user into
  153. ** an absolute pathname. This is required because the supplied path
  154. ** is used (either directly or with "-log" appended to it) for more
  155. ** than one purpose - to open both the database and log files, and
  156. ** perhaps to unlink the log file during disconnection. An absolute
  157. ** path is required to ensure that the correct files are operated
  158. ** on even if the application changes the cwd. */
  159. rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
  160. assert( rc==LSM_OK || zFull==0 );
  161. /* Connect to the database. */
  162. if( rc==LSM_OK ){
  163. rc = lsmDbDatabaseConnect(pDb, zFull);
  164. }
  165. if( pDb->bReadonly==0 ){
  166. /* Configure the file-system connection with the page-size and block-size
  167. ** of this database. Even if the database file is zero bytes in size
  168. ** on disk, these values have been set in shared-memory by now, and so
  169. ** are guaranteed not to change during the lifetime of this connection.
  170. */
  171. if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){
  172. lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
  173. lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
  174. }
  175. }
  176. lsmFree(pDb->pEnv, zFull);
  177. assertRwclientLockValue(pDb);
  178. }
  179. assert( pDb->bReadonly==0 || pDb->bReadonly==1 );
  180. assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) );
  181. return rc;
  182. }
  183. int lsm_close(lsm_db *pDb){
  184. int rc = LSM_OK;
  185. if( pDb ){
  186. assert_db_state(pDb);
  187. if( pDb->pCsr || pDb->nTransOpen ){
  188. rc = LSM_MISUSE_BKPT;
  189. }else{
  190. lsmMCursorFreeCache(pDb);
  191. lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
  192. pDb->pClient = 0;
  193. assertRwclientLockValue(pDb);
  194. lsmDbDatabaseRelease(pDb);
  195. lsmLogClose(pDb);
  196. lsmFsClose(pDb->pFS);
  197. /* assert( pDb->mLock==0 ); */
  198. /* Invoke any destructors registered for the compression or
  199. ** compression factory callbacks. */
  200. if( pDb->factory.xFree ) pDb->factory.xFree(pDb->factory.pCtx);
  201. if( pDb->compress.xFree ) pDb->compress.xFree(pDb->compress.pCtx);
  202. lsmFree(pDb->pEnv, pDb->rollback.aArray);
  203. lsmFree(pDb->pEnv, pDb->aTrans);
  204. lsmFree(pDb->pEnv, pDb->apShm);
  205. lsmFree(pDb->pEnv, pDb);
  206. }
  207. }
  208. return rc;
  209. }
  210. int lsm_config(lsm_db *pDb, int eParam, ...){
  211. int rc = LSM_OK;
  212. va_list ap;
  213. va_start(ap, eParam);
  214. switch( eParam ){
  215. case LSM_CONFIG_AUTOFLUSH: {
  216. /* This parameter is read and written in KB. But all internal
  217. ** processing is done in bytes. */
  218. int *piVal = va_arg(ap, int *);
  219. int iVal = *piVal;
  220. if( iVal>=0 && iVal<=(1024*1024) ){
  221. pDb->nTreeLimit = iVal*1024;
  222. }
  223. *piVal = (pDb->nTreeLimit / 1024);
  224. break;
  225. }
  226. case LSM_CONFIG_AUTOWORK: {
  227. int *piVal = va_arg(ap, int *);
  228. if( *piVal>=0 ){
  229. pDb->bAutowork = *piVal;
  230. }
  231. *piVal = pDb->bAutowork;
  232. break;
  233. }
  234. case LSM_CONFIG_AUTOCHECKPOINT: {
  235. /* This parameter is read and written in KB. But all internal processing
  236. ** (including the lsm_db.nAutockpt variable) is done in bytes. */
  237. int *piVal = va_arg(ap, int *);
  238. if( *piVal>=0 ){
  239. int iVal = *piVal;
  240. pDb->nAutockpt = (i64)iVal * 1024;
  241. }
  242. *piVal = (int)(pDb->nAutockpt / 1024);
  243. break;
  244. }
  245. case LSM_CONFIG_PAGE_SIZE: {
  246. int *piVal = va_arg(ap, int *);
  247. if( pDb->pDatabase ){
  248. /* If lsm_open() has been called, this is a read-only parameter.
  249. ** Set the output variable to the page-size according to the
  250. ** FileSystem object. */
  251. *piVal = lsmFsPageSize(pDb->pFS);
  252. }else{
  253. if( *piVal>=256 && *piVal<=65536 && ((*piVal-1) & *piVal)==0 ){
  254. pDb->nDfltPgsz = *piVal;
  255. }else{
  256. *piVal = pDb->nDfltPgsz;
  257. }
  258. }
  259. break;
  260. }
  261. case LSM_CONFIG_BLOCK_SIZE: {
  262. /* This parameter is read and written in KB. But all internal
  263. ** processing is done in bytes. */
  264. int *piVal = va_arg(ap, int *);
  265. if( pDb->pDatabase ){
  266. /* If lsm_open() has been called, this is a read-only parameter.
  267. ** Set the output variable to the block-size in KB according to the
  268. ** FileSystem object. */
  269. *piVal = lsmFsBlockSize(pDb->pFS) / 1024;
  270. }else{
  271. int iVal = *piVal;
  272. if( iVal>=64 && iVal<=65536 && ((iVal-1) & iVal)==0 ){
  273. pDb->nDfltBlksz = iVal * 1024;
  274. }else{
  275. *piVal = pDb->nDfltBlksz / 1024;
  276. }
  277. }
  278. break;
  279. }
  280. case LSM_CONFIG_SAFETY: {
  281. int *piVal = va_arg(ap, int *);
  282. if( *piVal>=0 && *piVal<=2 ){
  283. pDb->eSafety = *piVal;
  284. }
  285. *piVal = pDb->eSafety;
  286. break;
  287. }
  288. case LSM_CONFIG_MMAP: {
  289. int *piVal = va_arg(ap, int *);
  290. if( pDb->iReader<0 && *piVal>=0 ){
  291. pDb->iMmap = *piVal;
  292. rc = lsmFsConfigure(pDb);
  293. }
  294. *piVal = pDb->iMmap;
  295. break;
  296. }
  297. case LSM_CONFIG_USE_LOG: {
  298. int *piVal = va_arg(ap, int *);
  299. if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){
  300. pDb->bUseLog = *piVal;
  301. }
  302. *piVal = pDb->bUseLog;
  303. break;
  304. }
  305. case LSM_CONFIG_AUTOMERGE: {
  306. int *piVal = va_arg(ap, int *);
  307. if( *piVal>1 ) pDb->nMerge = *piVal;
  308. *piVal = pDb->nMerge;
  309. break;
  310. }
  311. case LSM_CONFIG_MAX_FREELIST: {
  312. int *piVal = va_arg(ap, int *);
  313. if( *piVal>=2 && *piVal<=LSM_MAX_FREELIST_ENTRIES ){
  314. pDb->nMaxFreelist = *piVal;
  315. }
  316. *piVal = pDb->nMaxFreelist;
  317. break;
  318. }
  319. case LSM_CONFIG_MULTIPLE_PROCESSES: {
  320. int *piVal = va_arg(ap, int *);
  321. if( pDb->pDatabase ){
  322. /* If lsm_open() has been called, this is a read-only parameter.
  323. ** Set the output variable to true if this connection is currently
  324. ** in multi-process mode. */
  325. *piVal = lsmDbMultiProc(pDb);
  326. }else{
  327. pDb->bMultiProc = *piVal = (*piVal!=0);
  328. }
  329. break;
  330. }
  331. case LSM_CONFIG_READONLY: {
  332. int *piVal = va_arg(ap, int *);
  333. /* If lsm_open() has been called, this is a read-only parameter. */
  334. if( pDb->pDatabase==0 && *piVal>=0 ){
  335. pDb->bReadonly = *piVal = (*piVal!=0);
  336. }
  337. *piVal = pDb->bReadonly;
  338. break;
  339. }
  340. case LSM_CONFIG_SET_COMPRESSION: {
  341. lsm_compress *p = va_arg(ap, lsm_compress *);
  342. if( pDb->iReader>=0 && pDb->bInFactory==0 ){
  343. /* May not change compression schemes with an open transaction */
  344. rc = LSM_MISUSE_BKPT;
  345. }else{
  346. if( pDb->compress.xFree ){
  347. /* Invoke any destructor belonging to the current compression. */
  348. pDb->compress.xFree(pDb->compress.pCtx);
  349. }
  350. if( p->xBound==0 ){
  351. memset(&pDb->compress, 0, sizeof(lsm_compress));
  352. pDb->compress.iId = LSM_COMPRESSION_NONE;
  353. }else{
  354. memcpy(&pDb->compress, p, sizeof(lsm_compress));
  355. }
  356. rc = lsmFsConfigure(pDb);
  357. }
  358. break;
  359. }
  360. case LSM_CONFIG_SET_COMPRESSION_FACTORY: {
  361. lsm_compress_factory *p = va_arg(ap, lsm_compress_factory *);
  362. if( pDb->factory.xFree ){
  363. /* Invoke any destructor belonging to the current factory. */
  364. pDb->factory.xFree(pDb->factory.pCtx);
  365. }
  366. memcpy(&pDb->factory, p, sizeof(lsm_compress_factory));
  367. break;
  368. }
  369. case LSM_CONFIG_GET_COMPRESSION: {
  370. lsm_compress *p = va_arg(ap, lsm_compress *);
  371. memcpy(p, &pDb->compress, sizeof(lsm_compress));
  372. break;
  373. }
  374. default:
  375. rc = LSM_MISUSE;
  376. break;
  377. }
  378. va_end(ap);
  379. return rc;
  380. }
  381. void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  382. lsmStringAppendf(pStr, "%s{%lld %lld %lld %lld}", zPre,
  383. pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize
  384. );
  385. }
  386. static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  387. int rc = LSM_OK;
  388. assert( *pbUnlock==0 );
  389. if( !pDb->pWorker ){
  390. rc = lsmBeginWork(pDb);
  391. if( rc!=LSM_OK ) return rc;
  392. *pbUnlock = 1;
  393. }
  394. if( pp ) *pp = pDb->pWorker;
  395. return rc;
  396. }
  397. static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  398. if( bUnlock ){
  399. int rcdummy = LSM_BUSY;
  400. lsmFinishWork(pDb, 0, &rcdummy);
  401. }
  402. }
  403. int lsmStructList(
  404. lsm_db *pDb, /* Database handle */
  405. char **pzOut /* OUT: Nul-terminated string (tcl list) */
  406. ){
  407. Level *pTopLevel = 0; /* Top level of snapshot to report on */
  408. int rc = LSM_OK;
  409. Level *p;
  410. LsmString s;
  411. Snapshot *pWorker; /* Worker snapshot */
  412. int bUnlock = 0;
  413. /* Obtain the worker snapshot */
  414. rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  415. if( rc!=LSM_OK ) return rc;
  416. /* Format the contents of the snapshot as text */
  417. pTopLevel = lsmDbSnapshotLevel(pWorker);
  418. lsmStringInit(&s, pDb->pEnv);
  419. for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
  420. int i;
  421. lsmStringAppendf(&s, "%s{%d", (s.n ? " " : ""), (int)p->iAge);
  422. lsmAppendSegmentList(&s, " ", &p->lhs);
  423. for(i=0; rc==LSM_OK && i<p->nRight; i++){
  424. lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
  425. }
  426. lsmStringAppend(&s, "}", 1);
  427. }
  428. rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
  429. /* Release the snapshot and return */
  430. infoFreeWorker(pDb, bUnlock);
  431. *pzOut = s.z;
  432. return rc;
  433. }
  434. static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  435. LsmString *pStr = (LsmString *)pCtx;
  436. lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot);
  437. return 0;
  438. }
  439. int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  440. Snapshot *pWorker; /* Worker snapshot */
  441. int bUnlock = 0;
  442. LsmString s;
  443. int rc;
  444. /* Obtain the worker snapshot */
  445. rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  446. if( rc!=LSM_OK ) return rc;
  447. lsmStringInit(&s, pDb->pEnv);
  448. rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
  449. if( rc!=LSM_OK ){
  450. lsmFree(pDb->pEnv, s.z);
  451. }else{
  452. *pzOut = s.z;
  453. }
  454. /* Release the snapshot and return */
  455. infoFreeWorker(pDb, bUnlock);
  456. return rc;
  457. }
  458. static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  459. ShmHeader *pShm = db->pShmhdr;
  460. TreeHeader *p = &pShm->hdr1;
  461. /* The following code suffers from two race conditions, as it accesses and
  462. ** trusts the contents of shared memory without verifying checksums:
  463. **
  464. ** * The two values read - TreeHeader.root.nByte and oldroot.nByte - are
  465. ** 32-bit fields. It is assumed that reading from one of these
  466. ** is atomic - that it is not possible to read a partially written
  467. ** garbage value. However the two values may be mutually inconsistent.
  468. **
  469. ** * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset()
  470. ** reads a 64-bit value from a snapshot stored in shared memory. It
  471. ** is assumed that in each case it is possible to read a partially
  472. ** written garbage value. If this occurs, then the value returned
  473. ** for the size of the "old" tree may reflect the size of an "old"
  474. ** tree that was recently flushed to disk.
  475. **
  476. ** Given the context in which this function is called (as a result of an
  477. ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
  478. ** be problems.
  479. */
  480. *pnNewKB = ((int)p->root.nByte + 1023) / 1024;
  481. if( p->iOldShmid ){
  482. if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
  483. *pnOldKB = 0;
  484. }else{
  485. *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
  486. }
  487. }else{
  488. *pnOldKB = 0;
  489. }
  490. return LSM_OK;
  491. }
  492. int lsm_info(lsm_db *pDb, int eParam, ...){
  493. int rc = LSM_OK;
  494. va_list ap;
  495. va_start(ap, eParam);
  496. switch( eParam ){
  497. case LSM_INFO_NWRITE: {
  498. int *piVal = va_arg(ap, int *);
  499. *piVal = lsmFsNWrite(pDb->pFS);
  500. break;
  501. }
  502. case LSM_INFO_NREAD: {
  503. int *piVal = va_arg(ap, int *);
  504. *piVal = lsmFsNRead(pDb->pFS);
  505. break;
  506. }
  507. case LSM_INFO_DB_STRUCTURE: {
  508. char **pzVal = va_arg(ap, char **);
  509. rc = lsmStructList(pDb, pzVal);
  510. break;
  511. }
  512. case LSM_INFO_ARRAY_STRUCTURE: {
  513. LsmPgno pgno = va_arg(ap, LsmPgno);
  514. char **pzVal = va_arg(ap, char **);
  515. rc = lsmInfoArrayStructure(pDb, 0, pgno, pzVal);
  516. break;
  517. }
  518. case LSM_INFO_ARRAY_PAGES: {
  519. LsmPgno pgno = va_arg(ap, LsmPgno);
  520. char **pzVal = va_arg(ap, char **);
  521. rc = lsmInfoArrayPages(pDb, pgno, pzVal);
  522. break;
  523. }
  524. case LSM_INFO_PAGE_HEX_DUMP:
  525. case LSM_INFO_PAGE_ASCII_DUMP: {
  526. LsmPgno pgno = va_arg(ap, LsmPgno);
  527. char **pzVal = va_arg(ap, char **);
  528. int bUnlock = 0;
  529. rc = infoGetWorker(pDb, 0, &bUnlock);
  530. if( rc==LSM_OK ){
  531. int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP);
  532. rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal);
  533. }
  534. infoFreeWorker(pDb, bUnlock);
  535. break;
  536. }
  537. case LSM_INFO_LOG_STRUCTURE: {
  538. char **pzVal = va_arg(ap, char **);
  539. rc = lsmInfoLogStructure(pDb, pzVal);
  540. break;
  541. }
  542. case LSM_INFO_FREELIST: {
  543. char **pzVal = va_arg(ap, char **);
  544. rc = lsmInfoFreelist(pDb, pzVal);
  545. break;
  546. }
  547. case LSM_INFO_CHECKPOINT_SIZE: {
  548. int *pnKB = va_arg(ap, int *);
  549. rc = lsmCheckpointSize(pDb, pnKB);
  550. break;
  551. }
  552. case LSM_INFO_TREE_SIZE: {
  553. int *pnOld = va_arg(ap, int *);
  554. int *pnNew = va_arg(ap, int *);
  555. rc = infoTreeSize(pDb, pnOld, pnNew);
  556. break;
  557. }
  558. case LSM_INFO_COMPRESSION_ID: {
  559. unsigned int *piOut = va_arg(ap, unsigned int *);
  560. if( pDb->pClient ){
  561. *piOut = pDb->pClient->iCmpId;
  562. }else{
  563. rc = lsmInfoCompressionId(pDb, piOut);
  564. }
  565. break;
  566. }
  567. default:
  568. rc = LSM_MISUSE;
  569. break;
  570. }
  571. va_end(ap);
  572. return rc;
  573. }
  574. static int doWriteOp(
  575. lsm_db *pDb,
  576. int bDeleteRange,
  577. const void *pKey, int nKey, /* Key to write or delete */
  578. const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */
  579. ){
  580. int rc = LSM_OK; /* Return code */
  581. int bCommit = 0; /* True to commit before returning */
  582. if( pDb->nTransOpen==0 ){
  583. bCommit = 1;
  584. rc = lsm_begin(pDb, 1);
  585. }
  586. if( rc==LSM_OK ){
  587. int eType = (bDeleteRange ? LSM_DRANGE : (nVal>=0?LSM_WRITE:LSM_DELETE));
  588. rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
  589. }
  590. lsmSortedSaveTreeCursors(pDb);
  591. if( rc==LSM_OK ){
  592. int pgsz = lsmFsPageSize(pDb->pFS);
  593. int nQuant = LSM_AUTOWORK_QUANT * pgsz;
  594. int nBefore;
  595. int nAfter;
  596. int nDiff;
  597. if( nQuant>pDb->nTreeLimit ){
  598. nQuant = LSM_MAX(pDb->nTreeLimit, pgsz);
  599. }
  600. nBefore = lsmTreeSize(pDb);
  601. if( bDeleteRange ){
  602. rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
  603. }else{
  604. rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
  605. }
  606. nAfter = lsmTreeSize(pDb);
  607. nDiff = (nAfter/nQuant) - (nBefore/nQuant);
  608. if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
  609. rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
  610. }
  611. }
  612. /* If a transaction was opened at the start of this function, commit it.
  613. ** Or, if an error has occurred, roll it back. */
  614. if( bCommit ){
  615. if( rc==LSM_OK ){
  616. rc = lsm_commit(pDb, 0);
  617. }else{
  618. lsm_rollback(pDb, 0);
  619. }
  620. }
  621. return rc;
  622. }
  623. /*
  624. ** Write a new value into the database.
  625. */
  626. int lsm_insert(
  627. lsm_db *db, /* Database connection */
  628. const void *pKey, int nKey, /* Key to write or delete */
  629. const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */
  630. ){
  631. return doWriteOp(db, 0, pKey, nKey, pVal, nVal);
  632. }
  633. /*
  634. ** Delete a value from the database.
  635. */
  636. int lsm_delete(lsm_db *db, const void *pKey, int nKey){
  637. return doWriteOp(db, 0, pKey, nKey, 0, -1);
  638. }
  639. /*
  640. ** Delete a range of database keys.
  641. */
  642. int lsm_delete_range(
  643. lsm_db *db, /* Database handle */
  644. const void *pKey1, int nKey1, /* Lower bound of range to delete */
  645. const void *pKey2, int nKey2 /* Upper bound of range to delete */
  646. ){
  647. int rc = LSM_OK;
  648. if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){
  649. rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
  650. }
  651. return rc;
  652. }
  653. /*
  654. ** Open a new cursor handle.
  655. **
  656. ** If there are currently no other open cursor handles, and no open write
  657. ** transaction, open a read transaction here.
  658. */
  659. int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){
  660. int rc = LSM_OK; /* Return code */
  661. MultiCursor *pCsr = 0; /* New cursor object */
  662. /* Open a read transaction if one is not already open. */
  663. assert_db_state(pDb);
  664. if( pDb->pShmhdr==0 ){
  665. assert( pDb->bReadonly );
  666. rc = lsmBeginRoTrans(pDb);
  667. }else if( pDb->iReader<0 ){
  668. rc = lsmBeginReadTrans(pDb);
  669. }
  670. /* Allocate the multi-cursor. */
  671. if( rc==LSM_OK ){
  672. rc = lsmMCursorNew(pDb, &pCsr);
  673. }
  674. /* If an error has occured, set the output to NULL and delete any partially
  675. ** allocated cursor. If this means there are no open cursors, release the
  676. ** client snapshot. */
  677. if( rc!=LSM_OK ){
  678. lsmMCursorClose(pCsr, 0);
  679. dbReleaseClientSnapshot(pDb);
  680. }
  681. assert_db_state(pDb);
  682. *ppCsr = (lsm_cursor *)pCsr;
  683. return rc;
  684. }
  685. /*
  686. ** Close a cursor opened using lsm_csr_open().
  687. */
  688. int lsm_csr_close(lsm_cursor *p){
  689. if( p ){
  690. lsm_db *pDb = lsmMCursorDb((MultiCursor *)p);
  691. assert_db_state(pDb);
  692. lsmMCursorClose((MultiCursor *)p, 1);
  693. dbReleaseClientSnapshot(pDb);
  694. assert_db_state(pDb);
  695. }
  696. return LSM_OK;
  697. }
  698. /*
  699. ** Attempt to seek the cursor to the database entry specified by pKey/nKey.
  700. ** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
  701. ** Otherwise, return LSM_OK.
  702. */
  703. int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
  704. return lsmMCursorSeek((MultiCursor *)pCsr, 0, (void *)pKey, nKey, eSeek);
  705. }
  706. int lsm_csr_next(lsm_cursor *pCsr){
  707. return lsmMCursorNext((MultiCursor *)pCsr);
  708. }
  709. int lsm_csr_prev(lsm_cursor *pCsr){
  710. return lsmMCursorPrev((MultiCursor *)pCsr);
  711. }
  712. int lsm_csr_first(lsm_cursor *pCsr){
  713. return lsmMCursorFirst((MultiCursor *)pCsr);
  714. }
  715. int lsm_csr_last(lsm_cursor *pCsr){
  716. return lsmMCursorLast((MultiCursor *)pCsr);
  717. }
  718. int lsm_csr_valid(lsm_cursor *pCsr){
  719. return lsmMCursorValid((MultiCursor *)pCsr);
  720. }
  721. int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){
  722. return lsmMCursorKey((MultiCursor *)pCsr, (void **)ppKey, pnKey);
  723. }
  724. int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){
  725. return lsmMCursorValue((MultiCursor *)pCsr, (void **)ppVal, pnVal);
  726. }
  727. void lsm_config_log(
  728. lsm_db *pDb,
  729. void (*xLog)(void *, int, const char *),
  730. void *pCtx
  731. ){
  732. pDb->xLog = xLog;
  733. pDb->pLogCtx = pCtx;
  734. }
  735. void lsm_config_work_hook(
  736. lsm_db *pDb,
  737. void (*xWork)(lsm_db *, void *),
  738. void *pCtx
  739. ){
  740. pDb->xWork = xWork;
  741. pDb->pWorkCtx = pCtx;
  742. }
  743. void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){
  744. if( pDb->xLog ){
  745. LsmString s;
  746. va_list ap, ap2;
  747. lsmStringInit(&s, pDb->pEnv);
  748. va_start(ap, zFormat);
  749. va_start(ap2, zFormat);
  750. lsmStringVAppendf(&s, zFormat, ap, ap2);
  751. va_end(ap);
  752. va_end(ap2);
  753. pDb->xLog(pDb->pLogCtx, rc, s.z);
  754. lsmStringClear(&s);
  755. }
  756. }
  757. int lsm_begin(lsm_db *pDb, int iLevel){
  758. int rc;
  759. assert_db_state( pDb );
  760. rc = (pDb->bReadonly ? LSM_READONLY : LSM_OK);
  761. /* A value less than zero means open one more transaction. */
  762. if( iLevel<0 ) iLevel = pDb->nTransOpen + 1;
  763. if( iLevel>pDb->nTransOpen ){
  764. int i;
  765. /* Extend the pDb->aTrans[] array if required. */
  766. if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){
  767. TransMark *aNew; /* New allocation */
  768. int nByte = sizeof(TransMark) * (iLevel+1);
  769. aNew = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte);
  770. if( !aNew ){
  771. rc = LSM_NOMEM;
  772. }else{
  773. nByte = sizeof(TransMark) * (iLevel+1 - pDb->nTransAlloc);
  774. memset(&aNew[pDb->nTransAlloc], 0, nByte);
  775. pDb->nTransAlloc = iLevel+1;
  776. pDb->aTrans = aNew;
  777. }
  778. }
  779. if( rc==LSM_OK && pDb->nTransOpen==0 ){
  780. rc = lsmBeginWriteTrans(pDb);
  781. }
  782. if( rc==LSM_OK ){
  783. for(i=pDb->nTransOpen; i<iLevel; i++){
  784. lsmTreeMark(pDb, &pDb->aTrans[i].tree);
  785. lsmLogTell(pDb, &pDb->aTrans[i].log);
  786. }
  787. pDb->nTransOpen = iLevel;
  788. }
  789. }
  790. return rc;
  791. }
  792. int lsm_commit(lsm_db *pDb, int iLevel){
  793. int rc = LSM_OK;
  794. assert_db_state( pDb );
  795. /* A value less than zero means close the innermost nested transaction. */
  796. if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  797. if( iLevel<pDb->nTransOpen ){
  798. if( iLevel==0 ){
  799. int rc2;
  800. /* Commit the transaction to disk. */
  801. if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
  802. if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
  803. rc = lsmFsSyncLog(pDb->pFS);
  804. }
  805. rc2 = lsmFinishWriteTrans(pDb, (rc==LSM_OK));
  806. if( rc==LSM_OK ) rc = rc2;
  807. }
  808. pDb->nTransOpen = iLevel;
  809. }
  810. dbReleaseClientSnapshot(pDb);
  811. return rc;
  812. }
  813. int lsm_rollback(lsm_db *pDb, int iLevel){
  814. int rc = LSM_OK;
  815. assert_db_state( pDb );
  816. if( pDb->nTransOpen ){
  817. /* A value less than zero means close the innermost nested transaction. */
  818. if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  819. if( iLevel<=pDb->nTransOpen ){
  820. TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
  821. lsmTreeRollback(pDb, &pMark->tree);
  822. if( iLevel ) lsmLogSeek(pDb, &pMark->log);
  823. pDb->nTransOpen = iLevel;
  824. }
  825. if( pDb->nTransOpen==0 ){
  826. lsmFinishWriteTrans(pDb, 0);
  827. }
  828. dbReleaseClientSnapshot(pDb);
  829. }
  830. return rc;
  831. }
  832. int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){
  833. int rc = LSM_OK; /* Return code */
  834. /* Open a read transaction if one is not already open. */
  835. assert_db_state(pDb);
  836. if( pDb->pShmhdr==0 ){
  837. assert( pDb->bReadonly );
  838. rc = lsmBeginRoTrans(pDb);
  839. }else if( pDb->iReader<0 ){
  840. rc = lsmBeginReadTrans(pDb);
  841. }
  842. /* Allocate the multi-cursor. */
  843. if( rc==LSM_OK ){
  844. *piUsr = pDb->treehdr.iUsrVersion;
  845. }
  846. dbReleaseClientSnapshot(pDb);
  847. assert_db_state(pDb);
  848. return rc;
  849. }
  850. int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){
  851. int rc = LSM_OK; /* Return code */
  852. int bCommit = 0; /* True to commit before returning */
  853. if( pDb->nTransOpen==0 ){
  854. bCommit = 1;
  855. rc = lsm_begin(pDb, 1);
  856. }
  857. if( rc==LSM_OK ){
  858. pDb->treehdr.iUsrVersion = iUsr;
  859. }
  860. /* If a transaction was opened at the start of this function, commit it.
  861. ** Or, if an error has occurred, roll it back. */
  862. if( bCommit ){
  863. if( rc==LSM_OK ){
  864. rc = lsm_commit(pDb, 0);
  865. }else{
  866. lsm_rollback(pDb, 0);
  867. }
  868. }
  869. return rc;
  870. }