lsm_tree.c 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466
  1. /*
  2. ** 2011-08-18
  3. **
  4. ** The author disclaims copyright to this source code. In place of
  5. ** a legal notice, here is a blessing:
  6. **
  7. ** May you do good and not evil.
  8. ** May you find forgiveness for yourself and forgive others.
  9. ** May you share freely, never taking more than you give.
  10. **
  11. *************************************************************************
  12. **
  13. ** This file contains the implementation of an in-memory tree structure.
  14. **
  15. ** Technically the tree is a B-tree of order 4 (in the Knuth sense - each
  16. ** node may have up to 4 children). Keys are stored within B-tree nodes by
  17. ** reference. This may be slightly slower than a conventional red-black
  18. ** tree, but it is simpler. It is also an easier structure to modify to
  19. ** create a version that supports nested transaction rollback.
  20. **
  21. ** This tree does not currently support a delete operation. One is not
  22. ** required. When LSM deletes a key from a database, it inserts a DELETE
  23. ** marker into the data structure. As a result, although the value associated
  24. ** with a key stored in the in-memory tree structure may be modified, no
  25. ** keys are ever removed.
  26. */
  27. /*
  28. ** MVCC NOTES
  29. **
  30. ** The in-memory tree structure supports SQLite-style MVCC. This means
  31. ** that while one client is writing to the tree structure, other clients
  32. ** may still be querying an older snapshot of the tree.
  33. **
  34. ** One way to implement this is to use an append-only b-tree. In this
  35. ** case instead of modifying nodes in-place, a copy of the node is made
  36. ** and the required modifications made to the copy. The parent of the
  37. ** node is then modified (to update the pointer so that it points to
  38. ** the new copy), which causes a copy of the parent to be made, and so on.
  39. ** This means that each time the tree is written to a new root node is
  40. ** created. A snapshot is identified by the root node that it uses.
  41. **
  42. ** The problem with the above is that each time the tree is written to,
  43. ** a copy of the node structure modified and all of its ancestor nodes
  44. ** is made. This may prove excessive with large tree structures.
  45. **
  46. ** To reduce this overhead, the data structure used for a tree node is
  47. ** designed so that it may be edited in place exactly once without
  48. ** affecting existing users. In other words, the node structure is capable
  49. ** of storing two separate versions of the node at the same time.
  50. ** When a node is to be edited, if the node structure already contains
  51. ** two versions, a copy is made as in the append-only approach. Or, if
  52. ** it only contains a single version, it is edited in place.
  53. **
  54. ** This reduces the overhead so that, roughly, one new node structure
  55. ** must be allocated for each write (on top of those allocations that
  56. ** would have been required by a non-MVCC tree). Logic: Assume that at
  57. ** any time, 50% of nodes in the tree already contain 2 versions. When
  58. ** a new entry is written to a node, there is a 50% chance that a copy
  59. ** of the node will be required. And a 25% chance that a copy of its
  60. ** parent is required. And so on.
  61. **
  62. ** ROLLBACK
  63. **
  64. ** The in-memory tree also supports transaction and sub-transaction
  65. ** rollback. In order to roll back to a point in time X, the following is
  66. ** necessary:
  67. **
  68. ** 1. All memory allocated since X must be freed, and
  69. ** 2. All "v2" data added to nodes that existed at X should be zeroed.
  70. ** 3. The root node must be restored to its X value.
  71. **
  72. ** The Mempool object used to allocate memory for the tree supports
  73. ** operation (1) - see the lsmPoolMark() and lsmPoolRevert() functions.
  74. **
  75. ** To support (2), all nodes that have v2 data are part of a singly linked
  76. ** list, sorted by the age of the v2 data (nodes that have had data added
  77. ** most recently are at the end of the list). So to zero all v2 data added
  78. ** since X, the linked list is traversed from the first node added following
  79. ** X onwards.
  80. **
  81. */
  82. #ifndef _LSM_INT_H
  83. # include "lsmInt.h"
  84. #endif
  85. #include <string.h>
  86. #define MAX_DEPTH 32
  87. typedef struct TreeKey TreeKey;
  88. typedef struct TreeNode TreeNode;
  89. typedef struct TreeLeaf TreeLeaf;
  90. typedef struct NodeVersion NodeVersion;
  91. struct TreeOld {
  92. u32 iShmid; /* Last shared-memory chunk in use by old */
  93. u32 iRoot; /* Offset of root node in shm file */
  94. u32 nHeight; /* Height of tree structure */
  95. };
  #if 0
  /*
  ** Sanity-check a TreeKey.flags value. The function always returns 1 so
  ** that it may itself be wrapped in an assert():
  **
  **     assert( lsmAssertFlagsOk(pTreeKey->flags) );
  */
  static int lsmAssertFlagsOk(u8 keyflags){
    const int bPoint  = (keyflags & LSM_POINT_DELETE)!=0;
    const int bInsert = (keyflags & LSM_INSERT)!=0;
    const int bStart  = (keyflags & LSM_START_DELETE)!=0;
    const int bEnd    = (keyflags & LSM_END_DELETE)!=0;

    /* A key that carries no flags at all has no reason to exist. */
    assert( keyflags!=0 );

    /* POINT_DELETE and INSERT are mutually exclusive. */
    assert( !(bPoint && bInsert) );

    /* END_DELETE, START_DELETE and POINT_DELETE may not all be set at
    ** the same time. */
    assert( !(bEnd && bStart && bPoint) );

    return 1;
  }
  #endif
  117. static int assert_delete_ranges_match(lsm_db *);
  118. static int treeCountEntries(lsm_db *db);
  119. /*
  120. ** Container for a key-value pair. Within the *-shm file, each key/value
  121. ** pair is stored in a single allocation (which may not actually be
  122. ** contiguous in memory). Layout is the TreeKey structure, followed by
  123. ** the nKey bytes of key blob, followed by the nValue bytes of value blob
  124. ** (if nValue is non-negative).
  125. */
  126. struct TreeKey {
  127. int nKey; /* Size of pKey in bytes */
  128. int nValue; /* Size of pValue. Or negative. */
  129. u8 flags; /* Various LSM_XXX flags */
  130. };
  /* Given a TreeKey pointer p, TKV_KEY(p) is the address of the first byte
  ** following the header and TKV_VAL(p) is the address p->nKey bytes
  ** beyond that. */
  #define TKV_KEY(p) ((void *)((p) + 1))
  #define TKV_VAL(p) ((void *)(((u8 *)((p) + 1)) + (p)->nKey))
  133. /*
  134. ** A single tree node. A node structure may contain up to 3 key/value
  135. ** pairs. Internal (non-leaf) nodes have up to 4 children.
  136. **
  137. ** TODO: Update the format of this to be more compact. Get it working
  138. ** first though...
  139. */
  140. struct TreeNode {
  141. u32 aiKeyPtr[3]; /* Array of pointers to TreeKey objects */
  142. /* The following fields are present for interior nodes only, not leaves. */
  143. u32 aiChildPtr[4]; /* Array of pointers to child nodes */
  144. /* The extra child pointer slot. */
  145. u32 iV2; /* Transaction number of v2 */
  146. u8 iV2Child; /* apChild[] entry replaced by pV2Ptr */
  147. u32 iV2Ptr; /* Substitute pointer */
  148. };
  149. struct TreeLeaf {
  150. u32 aiKeyPtr[3]; /* Array of pointers to TreeKey objects */
  151. };
  152. typedef struct TreeBlob TreeBlob;
  153. struct TreeBlob {
  154. int n;
  155. u8 *a;
  156. };
  157. /*
  158. ** Cursor for searching a tree structure.
  159. **
  160. ** If a cursor does not point to any element (a.k.a. EOF), then the
  161. ** TreeCursor.iNode variable is set to a negative value. Otherwise, the
  162. ** cursor currently points to key aiCell[iNode] on node apTreeNode[iNode].
  163. **
  164. ** Entries in the apTreeNode[] and aiCell[] arrays contain the node and
  165. ** index of the TreeNode.apChild[] pointer followed to descend to the
  166. ** current element. Hence apTreeNode[0] always contains the root node of
  167. ** the tree.
  168. */
  169. struct TreeCursor {
  170. lsm_db *pDb; /* Database handle for this cursor */
  171. TreeRoot *pRoot; /* Root node and height of tree to access */
  172. int iNode; /* Cursor points at apTreeNode[iNode] */
  173. TreeNode *apTreeNode[MAX_DEPTH];/* Current position in tree */
  174. u8 aiCell[MAX_DEPTH]; /* Current position in tree */
  175. TreeKey *pSave; /* Saved key */
  176. TreeBlob blob; /* Dynamic storage for a key */
  177. };
  /*
  ** A version number guaranteed to exceed any transaction id
  ** (TreeHeader.iTransId) that can occur.
  */
  #define WORKING_VERSION (1 << 30)
  183. static int tblobGrow(lsm_db *pDb, TreeBlob *p, int n, int *pRc){
  184. if( n>p->n ){
  185. lsmFree(pDb->pEnv, p->a);
  186. p->a = lsmMallocRc(pDb->pEnv, n, pRc);
  187. p->n = n;
  188. }
  189. return (p->a==0);
  190. }
  191. static void tblobFree(lsm_db *pDb, TreeBlob *p){
  192. lsmFree(pDb->pEnv, p->a);
  193. }
  194. /***********************************************************************
  195. ** Start of IntArray methods. */
  196. /*
  197. ** Append value iVal to the contents of IntArray *p. Return LSM_OK if
  198. ** successful, or LSM_NOMEM if an OOM condition is encountered.
  199. */
  200. static int intArrayAppend(lsm_env *pEnv, IntArray *p, u32 iVal){
  201. assert( p->nArray<=p->nAlloc );
  202. if( p->nArray>=p->nAlloc ){
  203. u32 *aNew;
  204. int nNew = p->nArray ? p->nArray*2 : 128;
  205. aNew = lsmRealloc(pEnv, p->aArray, nNew*sizeof(u32));
  206. if( !aNew ) return LSM_NOMEM_BKPT;
  207. p->aArray = aNew;
  208. p->nAlloc = nNew;
  209. }
  210. p->aArray[p->nArray++] = iVal;
  211. return LSM_OK;
  212. }
  213. /*
  214. ** Zero the IntArray object.
  215. */
  216. static void intArrayFree(lsm_env *pEnv, IntArray *p){
  217. p->nArray = 0;
  218. }
  219. /*
  220. ** Return the number of entries currently in the int-array object.
  221. */
  222. static int intArraySize(IntArray *p){
  223. return p->nArray;
  224. }
  225. /*
  226. ** Return a copy of the iIdx'th entry in the int-array.
  227. */
  228. static u32 intArrayEntry(IntArray *p, int iIdx){
  229. return p->aArray[iIdx];
  230. }
  231. /*
  232. ** Truncate the int-array so that all but the first nVal values are
  233. ** discarded.
  234. */
  235. static void intArrayTruncate(IntArray *p, int nVal){
  236. p->nArray = nVal;
  237. }
  238. /* End of IntArray methods.
  239. ***********************************************************************/
  /*
  ** Compare key (p1/n1) with key (p2/n2). The common prefix is compared
  ** with memcmp(); if it is identical, the result is the difference of the
  ** two lengths, so the shorter key sorts first. The return value is
  ** negative, zero or positive accordingly.
  */
  static int treeKeycmp(void *p1, int n1, void *p2, int n2){
    int nCommon = LSM_MIN(n1, n2);
    int res = memcmp(p1, p2, nCommon);
    if( res!=0 ){
      return res;
    }
    return (n1-n2);
  }
  246. /*
  247. ** The pointer passed as the first argument points to an interior node,
  248. ** not a leaf. This function returns the offset of the iCell'th child
  249. ** sub-tree of the node.
  250. */
  251. static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
  252. assert( iVersion>=0 );
  253. assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
  254. if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
  255. return p->aiChildPtr[iCell];
  256. }
  257. /*
  258. ** Given an offset within the *-shm file, return the associated chunk number.
  259. */
  260. static int treeOffsetToChunk(u32 iOff){
  261. assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
  262. return (int)(iOff>>15);
  263. }
  /* Translate *-shm offset iPtr into a (u8*) within the mapped chunk array
  ** pDb->apShm[]. Performs no checks of any kind on iPtr. */
  #define treeShmptrUnsafe(pDb, iPtr) \
    ( ((u8 *)((pDb)->apShm[(iPtr)>>15])) + ((iPtr) & (LSM_SHM_CHUNK_SIZE-1)) )
  266. /*
  267. ** Return a pointer to the mapped memory location associated with *-shm
  268. ** file offset iPtr.
  269. */
  270. static void *treeShmptr(lsm_db *pDb, u32 iPtr){
  271. assert( (iPtr>>15)<(u32)pDb->nShm );
  272. assert( pDb->apShm[iPtr>>15] );
  273. return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
  274. }
  275. static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  276. return (ShmChunk *)(pDb->apShm[iChunk]);
  277. }
  278. static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  279. assert( *pRc==LSM_OK );
  280. if( iChunk<pDb->nShm || LSM_OK==(*pRc = lsmShmCacheChunks(pDb, iChunk+1)) ){
  281. return (ShmChunk *)(pDb->apShm[iChunk]);
  282. }
  283. return 0;
  284. }
  285. #ifndef NDEBUG
  286. static void assertIsWorkingChild(
  287. lsm_db *db,
  288. TreeNode *pNode,
  289. TreeNode *pParent,
  290. int iCell
  291. ){
  292. TreeNode *p;
  293. u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
  294. p = treeShmptr(db, iPtr);
  295. assert( p==pNode );
  296. }
  297. #else
  298. # define assertIsWorkingChild(w,x,y,z)
  299. #endif
  /* Values for the third argument (eLoad) to treeShmkey(). With
  ** TKV_LOADVAL the value bytes are included in the space treeShmkey()
  ** makes available; with TKV_LOADKEY only the header and key are. */
  #define TKV_LOADKEY  1
  #define TKV_LOADVAL  2
  303. static TreeKey *treeShmkey(
  304. lsm_db *pDb, /* Database handle */
  305. u32 iPtr, /* Shmptr to TreeKey struct */
  306. int eLoad, /* Either zero or a TREEKEY_LOADXXX value */
  307. TreeBlob *pBlob, /* Used if dynamic memory is required */
  308. int *pRc /* IN/OUT: Error code */
  309. ){
  310. TreeKey *pRet;
  311. assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
  312. pRet = (TreeKey *)treeShmptr(pDb, iPtr);
  313. if( pRet ){
  314. int nReq; /* Bytes of space required at pRet */
  315. int nAvail; /* Bytes of space available at pRet */
  316. nReq = sizeof(TreeKey) + pRet->nKey;
  317. if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
  318. nReq += pRet->nValue;
  319. }
  320. assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
  321. nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));
  322. if( nAvail<nReq ){
  323. if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
  324. int nLoad = 0;
  325. while( *pRc==LSM_OK ){
  326. ShmChunk *pChunk;
  327. void *p = treeShmptr(pDb, iPtr);
  328. int n = LSM_MIN(nAvail, nReq-nLoad);
  329. memcpy(&pBlob->a[nLoad], p, n);
  330. nLoad += n;
  331. if( nLoad==nReq ) break;
  332. pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
  333. assert( pChunk );
  334. iPtr = (pChunk->iNext * LSM_SHM_CHUNK_SIZE) + LSM_SHM_CHUNK_HDR;
  335. nAvail = LSM_SHM_CHUNK_SIZE - LSM_SHM_CHUNK_HDR;
  336. }
  337. }
  338. pRet = (TreeKey *)(pBlob->a);
  339. }
  340. }
  341. return pRet;
  342. }
  #if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
  /*
  ** Expensive debugging checks. These are compiled only when both
  ** LSM_DEBUG and LSM_EXPENSIVE_ASSERT are defined. Otherwise
  ** assert_tree_looks_ok() is an empty macro.
  **
  ** NOTE(review): this section looks stale relative to the structures
  ** declared in this file. TreeNode has an aiKeyPtr[] member but no
  ** apKey[] member, and getChildPtr() returns a u32 offset whereas
  ** assert_node_looks_ok() takes a TreeNode pointer. Confirm whether this
  ** code still builds before enabling LSM_EXPENSIVE_ASSERT.
  */

  /* assert() that entry 1 of the key array of leaf pNode is set. */
  void assert_leaf_looks_ok(TreeNode *pNode){
    assert( pNode->apKey[1] );
  }

  /*
  ** If pNode is not NULL, assert() that entry 1 of its key array is set.
  ** If nHeight is greater than 1, also assert() that child slots 1 and 2
  ** are set in the working version, then recurse on each of the four
  ** child slots with nHeight reduced by one.
  */
  void assert_node_looks_ok(TreeNode *pNode, int nHeight){
    if( pNode ){
      assert( pNode->apKey[1] );
      if( nHeight>1 ){
        int i;
        assert( getChildPtr(pNode, WORKING_VERSION, 1) );
        assert( getChildPtr(pNode, WORKING_VERSION, 2) );
        for(i=0; i<4; i++){
          assert_node_looks_ok(getChildPtr(pNode, WORKING_VERSION, i), nHeight-1);
        }
      }
    }
  }
  /*
  ** Run various assert() statements to check that the working-version of the
  ** tree is correct in the following respects:
  **
  ** * todo...
  **
  ** The body is currently empty, so no checks are performed.
  */
  void assert_tree_looks_ok(int rc, Tree *pTree){
  }
  #else
  # define assert_tree_looks_ok(x,y)
  #endif
  371. void lsmFlagsToString(int flags, char *zFlags){
  372. zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.';
  373. /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
  374. ** be set. If this is not true, write a '?' to the output. */
  375. switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
  376. case 0: zFlags[1] = '.'; break;
  377. case LSM_POINT_DELETE: zFlags[1] = '-'; break;
  378. case LSM_INSERT: zFlags[1] = '+'; break;
  379. case LSM_SEPARATOR: zFlags[1] = '^'; break;
  380. default: zFlags[1] = '?'; break;
  381. }
  382. zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.';
  383. zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
  384. zFlags[4] = '\0';
  385. }
  386. #ifdef LSM_DEBUG
  387. /*
  388. ** Pointer pBlob points to a buffer containing a blob of binary data
  389. ** nBlob bytes long. Append the contents of this blob to *pStr, with
  390. ** each octet represented by a 2-digit hexadecimal number. For example,
  391. ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF},
  392. ** then "0144ff" is appended to *pStr.
  393. */
  394. static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){
  395. int i;
  396. lsmStringExtend(pStr, nBlob*2);
  397. if( pStr->nAlloc==0 ) return;
  398. for(i=0; i<nBlob; i++){
  399. u8 c = ((u8*)pBlob)[i];
  400. if( c>='a' && c<='z' ){
  401. pStr->z[pStr->n++] = c;
  402. }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
  403. pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
  404. pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
  405. }
  406. }
  407. pStr->z[pStr->n] = 0;
  408. }
  #if 0 /* NOT USED */
  /*
  ** Append nIndent space (0x20) characters to string *pStr, one at a time.
  */
  static void lsmAppendIndent(LsmString *pStr, int nIndent){
    int nRem;
    lsmStringExtend(pStr, nIndent);
    for(nRem=nIndent; nRem>0; nRem--){
      lsmStringAppend(pStr, " ", 1);
    }
  }
  #endif
  419. static void strAppendFlags(LsmString *pStr, u8 flags){
  420. char zFlags[8];
  421. lsmFlagsToString(flags, zFlags);
  422. zFlags[4] = ':';
  423. lsmStringAppend(pStr, zFlags, 5);
  424. }
  425. void dump_node_contents(
  426. lsm_db *pDb,
  427. u32 iNode, /* Print out the contents of this node */
  428. char *zPath, /* Path from root to this node */
  429. int nPath, /* Number of bytes in zPath */
  430. int nHeight /* Height: (0==leaf) (1==parent-of-leaf) */
  431. ){
  432. const char *zSpace = " ";
  433. int i;
  434. int rc = LSM_OK;
  435. LsmString s;
  436. TreeNode *pNode;
  437. TreeBlob b = {0, 0};
  438. pNode = (TreeNode *)treeShmptr(pDb, iNode);
  439. if( nHeight==0 ){
  440. /* Append the nIndent bytes of space to string s. */
  441. lsmStringInit(&s, pDb->pEnv);
  442. /* Append each key to string s. */
  443. for(i=0; i<3; i++){
  444. u32 iPtr = pNode->aiKeyPtr[i];
  445. if( iPtr ){
  446. TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i],TKV_LOADKEY, &b,&rc);
  447. strAppendFlags(&s, pKey->flags);
  448. lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
  449. lsmStringAppend(&s, " ", -1);
  450. }
  451. }
  452. printf("% 6d %.*sleaf%.*s: %s\n",
  453. iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
  454. );
  455. lsmStringClear(&s);
  456. }else{
  457. for(i=0; i<4 && nHeight>0; i++){
  458. u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
  459. zPath[nPath] = (char)(i+'0');
  460. zPath[nPath+1] = '/';
  461. if( iPtr ){
  462. dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
  463. }
  464. if( i!=3 && pNode->aiKeyPtr[i] ){
  465. TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
  466. lsmStringInit(&s, pDb->pEnv);
  467. strAppendFlags(&s, pKey->flags);
  468. lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
  469. printf("% 6d %.*s%.*s: %s\n",
  470. iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
  471. lsmStringClear(&s);
  472. }
  473. }
  474. }
  475. tblobFree(pDb, &b);
  476. }
  477. void dump_tree_contents(lsm_db *pDb, const char *zCaption){
  478. char zPath[64];
  479. TreeRoot *p = &pDb->treehdr.root;
  480. printf("\n%s\n", zCaption);
  481. zPath[0] = '/';
  482. if( p->iRoot ){
  483. dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
  484. }
  485. fflush(stdout);
  486. }
  487. #endif
  488. /*
  489. ** Initialize a cursor object, the space for which has already been
  490. ** allocated.
  491. */
  492. static void treeCursorInit(lsm_db *pDb, int bOld, TreeCursor *pCsr){
  493. memset(pCsr, 0, sizeof(TreeCursor));
  494. pCsr->pDb = pDb;
  495. if( bOld ){
  496. pCsr->pRoot = &pDb->treehdr.oldroot;
  497. }else{
  498. pCsr->pRoot = &pDb->treehdr.root;
  499. }
  500. pCsr->iNode = -1;
  501. }
  502. /*
  503. ** Return a pointer to the mapping of the TreeKey object that the cursor
  504. ** is pointing to.
  505. */
  506. static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  507. TreeKey *pRet;
  508. lsm_db *pDb = pCsr->pDb;
  509. u32 iPtr = pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]];
  510. assert( iPtr );
  511. pRet = (TreeKey*)treeShmptrUnsafe(pDb, iPtr);
  512. if( !(pRet->flags & LSM_CONTIGUOUS) ){
  513. pRet = treeShmkey(pDb, iPtr, TKV_LOADVAL, pBlob, pRc);
  514. }
  515. return pRet;
  516. }
  517. /*
  518. ** Save the current position of tree cursor pCsr.
  519. */
  520. int lsmTreeCursorSave(TreeCursor *pCsr){
  521. int rc = LSM_OK;
  522. if( pCsr && pCsr->pSave==0 ){
  523. int iNode = pCsr->iNode;
  524. if( iNode>=0 ){
  525. pCsr->pSave = csrGetKey(pCsr, &pCsr->blob, &rc);
  526. }
  527. pCsr->iNode = -1;
  528. }
  529. return rc;
  530. }
  531. /*
  532. ** Restore the position of a saved tree cursor.
  533. */
  534. static int treeCursorRestore(TreeCursor *pCsr, int *pRes){
  535. int rc = LSM_OK;
  536. if( pCsr->pSave ){
  537. TreeKey *pKey = pCsr->pSave;
  538. pCsr->pSave = 0;
  539. if( pRes ){
  540. rc = lsmTreeCursorSeek(pCsr, TKV_KEY(pKey), pKey->nKey, pRes);
  541. }
  542. }
  543. return rc;
  544. }
  545. /*
  546. ** Allocate nByte bytes of space within the *-shm file. If successful,
  547. ** return LSM_OK and set *piPtr to the offset within the file at which
  548. ** the allocated space is located.
  549. */
  550. static u32 treeShmalloc(lsm_db *pDb, int bAlign, int nByte, int *pRc){
  551. u32 iRet = 0;
  552. if( *pRc==LSM_OK ){
  553. const static int CHUNK_SIZE = LSM_SHM_CHUNK_SIZE;
  554. const static int CHUNK_HDR = LSM_SHM_CHUNK_HDR;
  555. u32 iWrite; /* Current write offset */
  556. u32 iEof; /* End of current chunk */
  557. int iChunk; /* Current chunk */
  558. assert( nByte <= (CHUNK_SIZE-CHUNK_HDR) );
  559. /* Check if there is enough space on the current chunk to fit the
  560. ** new allocation. If not, link in a new chunk and put the new
  561. ** allocation at the start of it. */
  562. iWrite = pDb->treehdr.iWrite;
  563. if( bAlign ){
  564. iWrite = (iWrite + 3) & ~0x0003;
  565. assert( (iWrite % 4)==0 );
  566. }
  567. assert( iWrite );
  568. iChunk = treeOffsetToChunk(iWrite-1);
  569. iEof = (iChunk+1) * CHUNK_SIZE;
  570. assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
  571. if( (iWrite+nByte)>iEof ){
  572. ShmChunk *pHdr; /* Header of chunk just finished (iChunk) */
  573. ShmChunk *pFirst; /* Header of chunk treehdr.iFirst */
  574. ShmChunk *pNext; /* Header of new chunk */
  575. int iNext = 0; /* Next chunk */
  576. int rc = LSM_OK;
  577. pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
  578. assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
  579. assert( (pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk)==pFirst->iShmid );
  580. /* Check if the chunk at the start of the linked list is still in
  581. ** use. If not, reuse it. If so, allocate a new chunk by appending
  582. ** to the *-shm file. */
  583. if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
  584. int bInUse;
  585. rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
  586. if( rc!=LSM_OK ){
  587. *pRc = rc;
  588. return 0;
  589. }
  590. if( bInUse==0 ){
  591. iNext = pDb->treehdr.iFirst;
  592. pDb->treehdr.iFirst = pFirst->iNext;
  593. assert( pDb->treehdr.iFirst );
  594. }
  595. }
  596. if( iNext==0 ) iNext = pDb->treehdr.nChunk++;
  597. /* Set the header values for the new chunk */
  598. pNext = treeShmChunkRc(pDb, iNext, &rc);
  599. if( pNext ){
  600. pNext->iNext = 0;
  601. pNext->iShmid = (pDb->treehdr.iNextShmid++);
  602. }else{
  603. *pRc = rc;
  604. return 0;
  605. }
  606. /* Set the header values for the chunk just finished */
  607. pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
  608. pHdr->iNext = iNext;
  609. /* Advance to the next chunk */
  610. iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
  611. }
  612. /* Allocate space at iWrite. */
  613. iRet = iWrite;
  614. pDb->treehdr.iWrite = iWrite + nByte;
  615. pDb->treehdr.root.nByte += nByte;
  616. }
  617. return iRet;
  618. }
  619. /*
  620. ** Allocate and zero nByte bytes of space within the *-shm file.
  621. */
  622. static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
  623. u32 iPtr;
  624. void *p;
  625. iPtr = treeShmalloc(pDb, 1, nByte, pRc);
  626. p = treeShmptr(pDb, iPtr);
  627. if( p ){
  628. assert( *pRc==LSM_OK );
  629. memset(p, 0, nByte);
  630. *piPtr = iPtr;
  631. }
  632. return p;
  633. }
  634. static TreeNode *newTreeNode(lsm_db *pDb, u32 *piPtr, int *pRc){
  635. return treeShmallocZero(pDb, sizeof(TreeNode), piPtr, pRc);
  636. }
  637. static TreeLeaf *newTreeLeaf(lsm_db *pDb, u32 *piPtr, int *pRc){
  638. return treeShmallocZero(pDb, sizeof(TreeLeaf), piPtr, pRc);
  639. }
  640. static TreeKey *newTreeKey(
  641. lsm_db *pDb,
  642. u32 *piPtr,
  643. void *pKey, int nKey, /* Key data */
  644. void *pVal, int nVal, /* Value data (or nVal<0 for delete) */
  645. int *pRc
  646. ){
  647. TreeKey *p;
  648. u32 iPtr;
  649. u32 iEnd;
  650. int nRem;
  651. u8 *a;
  652. int n;
  653. /* Allocate space for the TreeKey structure itself */
  654. *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
  655. p = treeShmptr(pDb, iPtr);
  656. if( *pRc ) return 0;
  657. p->nKey = nKey;
  658. p->nValue = nVal;
  659. /* Allocate and populate the space required for the key and value. */
  660. n = nRem = nKey;
  661. a = (u8 *)pKey;
  662. while( a ){
  663. while( nRem>0 ){
  664. u8 *aAlloc;
  665. int nAlloc;
  666. u32 iWrite;
  667. iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
  668. iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
  669. nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);
  670. aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
  671. if( aAlloc==0 ) break;
  672. memcpy(aAlloc, &a[n-nRem], nAlloc);
  673. nRem -= nAlloc;
  674. }
  675. a = pVal;
  676. n = nRem = nVal;
  677. pVal = 0;
  678. }
  679. iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal);
  680. if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){
  681. p->flags = 0;
  682. }else{
  683. p->flags = LSM_CONTIGUOUS;
  684. }
  685. if( *pRc ) return 0;
  686. #if 0
  687. printf("store: %d %s\n", (int)iPtr, (char *)pKey);
  688. #endif
  689. return p;
  690. }
  691. static TreeNode *copyTreeNode(
  692. lsm_db *pDb,
  693. TreeNode *pOld,
  694. u32 *piNew,
  695. int *pRc
  696. ){
  697. TreeNode *pNew;
  698. pNew = newTreeNode(pDb, piNew, pRc);
  699. if( pNew ){
  700. memcpy(pNew->aiKeyPtr, pOld->aiKeyPtr, sizeof(pNew->aiKeyPtr));
  701. memcpy(pNew->aiChildPtr, pOld->aiChildPtr, sizeof(pNew->aiChildPtr));
  702. if( pOld->iV2 ) pNew->aiChildPtr[pOld->iV2Child] = pOld->iV2Ptr;
  703. }
  704. return pNew;
  705. }
  706. static TreeNode *copyTreeLeaf(
  707. lsm_db *pDb,
  708. TreeLeaf *pOld,
  709. u32 *piNew,
  710. int *pRc
  711. ){
  712. TreeLeaf *pNew;
  713. pNew = newTreeLeaf(pDb, piNew, pRc);
  714. if( pNew ){
  715. memcpy(pNew, pOld, sizeof(TreeLeaf));
  716. }
  717. return (TreeNode *)pNew;
  718. }
  719. /*
  720. ** The tree cursor passed as the second argument currently points to an
  721. ** internal node (not a leaf). Specifically, to a sub-tree pointer. This
  722. ** function replaces the sub-tree that the cursor currently points to
  723. ** with sub-tree pNew.
  724. **
  725. ** The sub-tree may be replaced either by writing the "v2 data" on the
  726. ** internal node, or by allocating a new TreeNode structure and then
  727. ** calling this function on the parent of the internal node.
  728. */
  729. static int treeUpdatePtr(lsm_db *pDb, TreeCursor *pCsr, u32 iNew){
  730. int rc = LSM_OK;
  731. if( pCsr->iNode<0 ){
  732. /* iNew is the new root node */
  733. pDb->treehdr.root.iRoot = iNew;
  734. }else{
  735. /* If this node already has version 2 content, allocate a copy and
  736. ** update the copy with the new pointer value. Otherwise, store the
  737. ** new pointer as v2 data within the current node structure. */
  738. TreeNode *p; /* The node to be modified */
  739. int iChildPtr; /* apChild[] entry to modify */
  740. p = pCsr->apTreeNode[pCsr->iNode];
  741. iChildPtr = pCsr->aiCell[pCsr->iNode];
  742. if( p->iV2 ){
  743. /* The "allocate new TreeNode" option */
  744. u32 iCopy;
  745. TreeNode *pCopy;
  746. pCopy = copyTreeNode(pDb, p, &iCopy, &rc);
  747. if( pCopy ){
  748. assert( rc==LSM_OK );
  749. pCopy->aiChildPtr[iChildPtr] = iNew;
  750. pCsr->iNode--;
  751. rc = treeUpdatePtr(pDb, pCsr, iCopy);
  752. }
  753. }else{
  754. /* The "v2 data" option */
  755. u32 iPtr;
  756. assert( pDb->treehdr.root.iTransId>0 );
  757. if( pCsr->iNode ){
  758. iPtr = getChildPtr(
  759. pCsr->apTreeNode[pCsr->iNode-1],
  760. pDb->treehdr.root.iTransId, pCsr->aiCell[pCsr->iNode-1]
  761. );
  762. }else{
  763. iPtr = pDb->treehdr.root.iRoot;
  764. }
  765. rc = intArrayAppend(pDb->pEnv, &pDb->rollback, iPtr);
  766. if( rc==LSM_OK ){
  767. p->iV2 = pDb->treehdr.root.iTransId;
  768. p->iV2Child = (u8)iChildPtr;
  769. p->iV2Ptr = iNew;
  770. }
  771. }
  772. }
  773. return rc;
  774. }
  775. /*
  776. ** Cursor pCsr points at a node that is part of pTree. This function
  777. ** inserts a new key and optionally child node pointer into that node.
  778. **
  779. ** The position into which the new key and pointer are inserted is
  780. ** determined by the iSlot parameter. The new key will be inserted to
  781. ** the left of the key currently stored in apKey[iSlot]. Or, if iSlot is
  782. ** greater than the index of the rightmost key in the node.
  783. **
  784. ** Pointer pLeftPtr points to a child tree that contains keys that are
  785. ** smaller than pTreeKey.
  786. */
  787. static int treeInsert(
  788. lsm_db *pDb, /* Database handle */
  789. TreeCursor *pCsr, /* Cursor indicating path to insert at */
  790. u32 iLeftPtr, /* Left child pointer */
  791. u32 iTreeKey, /* Location of key to insert */
  792. u32 iRightPtr, /* Right child pointer */
  793. int iSlot /* Position to insert key into */
  794. ){
  795. int rc = LSM_OK;
  796. TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  797. /* Check if the node is currently full. If so, split pNode in two and
  798. ** call this function recursively to add a key to the parent. Otherwise,
  799. ** insert the new key directly into pNode. */
  800. assert( pNode->aiKeyPtr[1] );
  801. if( pNode->aiKeyPtr[0] && pNode->aiKeyPtr[2] ){
  802. u32 iLeft; TreeNode *pLeft; /* New left-hand sibling node */
  803. u32 iRight; TreeNode *pRight; /* New right-hand sibling node */
  804. pLeft = newTreeNode(pDb, &iLeft, &rc);
  805. pRight = newTreeNode(pDb, &iRight, &rc);
  806. if( rc ) return rc;
  807. pLeft->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 0);
  808. pLeft->aiKeyPtr[1] = pNode->aiKeyPtr[0];
  809. pLeft->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 1);
  810. pRight->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 2);
  811. pRight->aiKeyPtr[1] = pNode->aiKeyPtr[2];
  812. pRight->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 3);
  813. if( pCsr->iNode==0 ){
  814. /* pNode is the root of the tree. Grow the tree by one level. */
  815. u32 iRoot; TreeNode *pRoot; /* New root node */
  816. pRoot = newTreeNode(pDb, &iRoot, &rc);
  817. pRoot->aiKeyPtr[1] = pNode->aiKeyPtr[1];
  818. pRoot->aiChildPtr[1] = iLeft;
  819. pRoot->aiChildPtr[2] = iRight;
  820. pDb->treehdr.root.iRoot = iRoot;
  821. pDb->treehdr.root.nHeight++;
  822. }else{
  823. pCsr->iNode--;
  824. rc = treeInsert(pDb, pCsr,
  825. iLeft, pNode->aiKeyPtr[1], iRight, pCsr->aiCell[pCsr->iNode]
  826. );
  827. }
  828. assert( pLeft->iV2==0 );
  829. assert( pRight->iV2==0 );
  830. switch( iSlot ){
  831. case 0:
  832. pLeft->aiKeyPtr[0] = iTreeKey;
  833. pLeft->aiChildPtr[0] = iLeftPtr;
  834. if( iRightPtr ) pLeft->aiChildPtr[1] = iRightPtr;
  835. break;
  836. case 1:
  837. pLeft->aiChildPtr[3] = (iRightPtr ? iRightPtr : pLeft->aiChildPtr[2]);
  838. pLeft->aiKeyPtr[2] = iTreeKey;
  839. pLeft->aiChildPtr[2] = iLeftPtr;
  840. break;
  841. case 2:
  842. pRight->aiKeyPtr[0] = iTreeKey;
  843. pRight->aiChildPtr[0] = iLeftPtr;
  844. if( iRightPtr ) pRight->aiChildPtr[1] = iRightPtr;
  845. break;
  846. case 3:
  847. pRight->aiChildPtr[3] = (iRightPtr ? iRightPtr : pRight->aiChildPtr[2]);
  848. pRight->aiKeyPtr[2] = iTreeKey;
  849. pRight->aiChildPtr[2] = iLeftPtr;
  850. break;
  851. }
  852. }else{
  853. TreeNode *pNew;
  854. u32 *piKey;
  855. u32 *piChild;
  856. u32 iStore = 0;
  857. u32 iNew = 0;
  858. int i;
  859. /* Allocate a new version of node pNode. */
  860. pNew = newTreeNode(pDb, &iNew, &rc);
  861. if( rc ) return rc;
  862. piKey = pNew->aiKeyPtr;
  863. piChild = pNew->aiChildPtr;
  864. for(i=0; i<iSlot; i++){
  865. if( pNode->aiKeyPtr[i] ){
  866. *(piKey++) = pNode->aiKeyPtr[i];
  867. *(piChild++) = getChildPtr(pNode, WORKING_VERSION, i);
  868. }
  869. }
  870. *piKey++ = iTreeKey;
  871. *piChild++ = iLeftPtr;
  872. iStore = iRightPtr;
  873. for(i=iSlot; i<3; i++){
  874. if( pNode->aiKeyPtr[i] ){
  875. *(piKey++) = pNode->aiKeyPtr[i];
  876. *(piChild++) = iStore ? iStore : getChildPtr(pNode, WORKING_VERSION, i);
  877. iStore = 0;
  878. }
  879. }
  880. if( iStore ){
  881. *piChild = iStore;
  882. }else{
  883. *piChild = getChildPtr(pNode, WORKING_VERSION,
  884. (pNode->aiKeyPtr[2] ? 3 : 2)
  885. );
  886. }
  887. pCsr->iNode--;
  888. rc = treeUpdatePtr(pDb, pCsr, iNew);
  889. }
  890. return rc;
  891. }
  892. static int treeInsertLeaf(
  893. lsm_db *pDb, /* Database handle */
  894. TreeCursor *pCsr, /* Cursor structure */
  895. u32 iTreeKey, /* Key pointer to insert */
  896. int iSlot /* Insert key to the left of this */
  897. ){
  898. int rc = LSM_OK; /* Return code */
  899. TreeNode *pLeaf = pCsr->apTreeNode[pCsr->iNode];
  900. TreeLeaf *pNew;
  901. u32 iNew;
  902. assert( iSlot>=0 && iSlot<=4 );
  903. assert( pCsr->iNode>0 );
  904. assert( pLeaf->aiKeyPtr[1] );
  905. pCsr->iNode--;
  906. pNew = newTreeLeaf(pDb, &iNew, &rc);
  907. if( pNew ){
  908. if( pLeaf->aiKeyPtr[0] && pLeaf->aiKeyPtr[2] ){
  909. /* The leaf is full. Split it in two. */
  910. TreeLeaf *pRight;
  911. u32 iRight;
  912. pRight = newTreeLeaf(pDb, &iRight, &rc);
  913. if( pRight ){
  914. assert( rc==LSM_OK );
  915. pNew->aiKeyPtr[1] = pLeaf->aiKeyPtr[0];
  916. pRight->aiKeyPtr[1] = pLeaf->aiKeyPtr[2];
  917. switch( iSlot ){
  918. case 0: pNew->aiKeyPtr[0] = iTreeKey; break;
  919. case 1: pNew->aiKeyPtr[2] = iTreeKey; break;
  920. case 2: pRight->aiKeyPtr[0] = iTreeKey; break;
  921. case 3: pRight->aiKeyPtr[2] = iTreeKey; break;
  922. }
  923. rc = treeInsert(pDb, pCsr, iNew, pLeaf->aiKeyPtr[1], iRight,
  924. pCsr->aiCell[pCsr->iNode]
  925. );
  926. }
  927. }else{
  928. int iOut = 0;
  929. int i;
  930. for(i=0; i<4; i++){
  931. if( i==iSlot ) pNew->aiKeyPtr[iOut++] = iTreeKey;
  932. if( i<3 && pLeaf->aiKeyPtr[i] ){
  933. pNew->aiKeyPtr[iOut++] = pLeaf->aiKeyPtr[i];
  934. }
  935. }
  936. rc = treeUpdatePtr(pDb, pCsr, iNew);
  937. }
  938. }
  939. return rc;
  940. }
  941. void lsmTreeMakeOld(lsm_db *pDb){
  942. /* A write transaction must be open. Otherwise the code below that
  943. ** assumes (pDb->pClient->iLogOff) is current may malfunction.
  944. **
  945. ** Update: currently this assert fails due to lsm_flush(), which does
  946. ** not set nTransOpen.
  947. */
  948. assert( /* pDb->nTransOpen>0 && */ pDb->iReader>=0 );
  949. if( pDb->treehdr.iOldShmid==0 ){
  950. pDb->treehdr.iOldLog = (pDb->treehdr.log.aRegion[2].iEnd << 1);
  951. pDb->treehdr.iOldLog |= (~(pDb->pClient->iLogOff) & (i64)0x0001);
  952. pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
  953. pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
  954. pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
  955. memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));
  956. pDb->treehdr.root.iTransId = 1;
  957. pDb->treehdr.root.iRoot = 0;
  958. pDb->treehdr.root.nHeight = 0;
  959. pDb->treehdr.root.nByte = 0;
  960. }
  961. }
  962. void lsmTreeDiscardOld(lsm_db *pDb){
  963. assert( lsmShmAssertLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL)
  964. || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL)
  965. );
  966. pDb->treehdr.iUsedShmid = pDb->treehdr.iOldShmid;
  967. pDb->treehdr.iOldShmid = 0;
  968. }
  969. int lsmTreeHasOld(lsm_db *pDb){
  970. return pDb->treehdr.iOldShmid!=0;
  971. }
  972. /*
  973. ** This function is called during recovery to initialize the
  974. ** tree header. Only the database connections private copy of the tree-header
  975. ** is initialized here - it will be copied into shared memory if log file
  976. ** recovery is successful.
  977. */
  978. int lsmTreeInit(lsm_db *pDb){
  979. ShmChunk *pOne;
  980. int rc = LSM_OK;
  981. memset(&pDb->treehdr, 0, sizeof(TreeHeader));
  982. pDb->treehdr.root.iTransId = 1;
  983. pDb->treehdr.iFirst = 1;
  984. pDb->treehdr.nChunk = 2;
  985. pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
  986. pDb->treehdr.iNextShmid = 2;
  987. pDb->treehdr.iUsedShmid = 1;
  988. pOne = treeShmChunkRc(pDb, 1, &rc);
  989. if( pOne ){
  990. pOne->iNext = 0;
  991. pOne->iShmid = 1;
  992. }
  993. return rc;
  994. }
  995. static void treeHeaderChecksum(
  996. TreeHeader *pHdr,
  997. u32 *aCksum
  998. ){
  999. u32 cksum1 = 0x12345678;
  1000. u32 cksum2 = 0x9ABCDEF0;
  1001. u32 *a = (u32 *)pHdr;
  1002. int i;
  1003. assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
  1004. assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
  1005. for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){
  1006. cksum1 += a[i];
  1007. cksum2 += (cksum1 + a[i+1]);
  1008. }
  1009. aCksum[0] = cksum1;
  1010. aCksum[1] = cksum2;
  1011. }
  1012. /*
  1013. ** Return true if the checksum stored in TreeHeader object *pHdr is
  1014. ** consistent with the contents of its other fields.
  1015. */
  1016. static int treeHeaderChecksumOk(TreeHeader *pHdr){
  1017. u32 aCksum[2];
  1018. treeHeaderChecksum(pHdr, aCksum);
  1019. return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum)));
  1020. }
  1021. /*
  1022. ** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
  1023. ** make relinking the linked list of shared-memory chunks easier.
  1024. */
  1025. typedef struct ShmChunkLoc ShmChunkLoc;
  1026. struct ShmChunkLoc {
  1027. ShmChunk *pShm;
  1028. u32 iLoc;
  1029. };
  1030. /*
  1031. ** This function checks that the linked list of shared memory chunks
  1032. ** that starts at chunk db->treehdr.iFirst:
  1033. **
  1034. ** 1) Includes all chunks in the shared-memory region, and
  1035. ** 2) Links them together in order of ascending shm-id.
  1036. **
  1037. ** If no error occurs and the conditions above are met, LSM_OK is returned.
  1038. **
  1039. ** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if
  1040. ** an error is encountered before the checks are completed, another LSM error
  1041. ** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
  1042. */
  1043. static int treeCheckLinkedList(lsm_db *db){
  1044. int rc = LSM_OK;
  1045. int nVisit = 0;
  1046. ShmChunk *p;
  1047. p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  1048. while( rc==LSM_OK && p ){
  1049. if( p->iNext ){
  1050. if( p->iNext>=db->treehdr.nChunk ){
  1051. rc = LSM_CORRUPT_BKPT;
  1052. }else{
  1053. ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
  1054. if( rc==LSM_OK ){
  1055. if( pNext->iShmid!=p->iShmid+1 ){
  1056. rc = LSM_CORRUPT_BKPT;
  1057. }
  1058. p = pNext;
  1059. }
  1060. }
  1061. }else{
  1062. p = 0;
  1063. }
  1064. nVisit++;
  1065. }
  1066. if( rc==LSM_OK && (u32)nVisit!=db->treehdr.nChunk-1 ){
  1067. rc = LSM_CORRUPT_BKPT;
  1068. }
  1069. return rc;
  1070. }
  1071. /*
  1072. ** Iterate through the current in-memory tree. If there are any v2-pointers
  1073. ** with transaction ids larger than db->treehdr.iTransId, zero them.
  1074. */
  1075. static int treeRepairPtrs(lsm_db *db){
  1076. int rc = LSM_OK;
  1077. if( db->treehdr.root.nHeight>1 ){
  1078. TreeCursor csr; /* Cursor used to iterate through tree */
  1079. u32 iTransId = db->treehdr.root.iTransId;
  1080. /* Initialize the cursor structure. Also decrement the nHeight variable
  1081. ** in the tree-header. This will prevent the cursor from visiting any
  1082. ** leaf nodes. */
  1083. db->treehdr.root.nHeight--;
  1084. treeCursorInit(db, 0, &csr);
  1085. rc = lsmTreeCursorEnd(&csr, 0);
  1086. while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
  1087. TreeNode *pNode = csr.apTreeNode[csr.iNode];
  1088. if( pNode->iV2>iTransId ){
  1089. pNode->iV2Child = 0;
  1090. pNode->iV2Ptr = 0;
  1091. pNode->iV2 = 0;
  1092. }
  1093. rc = lsmTreeCursorNext(&csr);
  1094. }
  1095. tblobFree(csr.pDb, &csr.blob);
  1096. db->treehdr.root.nHeight++;
  1097. }
  1098. return rc;
  1099. }
  1100. static int treeRepairList(lsm_db *db){
  1101. int rc = LSM_OK;
  1102. int i;
  1103. ShmChunk *p;
  1104. ShmChunk *pMin = 0;
  1105. u32 iMin = 0;
  1106. /* Iterate through all shm chunks. Find the smallest shm-id present in
  1107. ** the shared-memory region. */
  1108. for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
  1109. p = treeShmChunkRc(db, i, &rc);
  1110. if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
  1111. pMin = p;
  1112. iMin = i;
  1113. }
  1114. }
  1115. /* Fix the shm-id values on any chunks with a shm-id greater than or
  1116. ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to
  1117. ** fix the ShmChunk.iNext pointers.
  1118. */
  1119. if( rc==LSM_OK ){
  1120. int nSort;
  1121. int nByte;
  1122. u32 iPrevShmid;
  1123. ShmChunkLoc *aSort;
  1124. /* Allocate space for a merge sort. */
  1125. nSort = 1;
  1126. while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
  1127. nByte = sizeof(ShmChunkLoc) * nSort * 2;
  1128. aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
  1129. iPrevShmid = pMin->iShmid;
  1130. /* Fix all shm-ids, if required. */
  1131. if( rc==LSM_OK ){
  1132. iPrevShmid = pMin->iShmid-1;
  1133. for(i=1; (u32)i<db->treehdr.nChunk; i++){
  1134. p = treeShmChunk(db, i);
  1135. aSort[i-1].pShm = p;
  1136. aSort[i-1].iLoc = i;
  1137. if( (u32)i!=db->treehdr.iFirst ){
  1138. if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
  1139. p->iShmid = iPrevShmid--;
  1140. }
  1141. }
  1142. }
  1143. if( iMin!=db->treehdr.iFirst ){
  1144. p = treeShmChunk(db, db->treehdr.iFirst);
  1145. p->iShmid = iPrevShmid;
  1146. }
  1147. }
  1148. if( rc==LSM_OK ){
  1149. ShmChunkLoc *aSpace = &aSort[nSort];
  1150. for(i=0; i<nSort; i++){
  1151. if( aSort[i].pShm ){
  1152. assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
  1153. assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
  1154. aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i];
  1155. }
  1156. }
  1157. if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
  1158. for(i=0; i<nSort-1; i++){
  1159. if( aSpace[i].pShm ){
  1160. aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
  1161. }
  1162. }
  1163. rc = treeCheckLinkedList(db);
  1164. lsmFree(db->pEnv, aSort);
  1165. }
  1166. }
  1167. return rc;
  1168. }
  1169. /*
  1170. ** This function is called as part of opening a write-transaction if the
  1171. ** writer-flag is already set - indicating that the previous writer
  1172. ** failed before ending its transaction.
  1173. */
  1174. int lsmTreeRepair(lsm_db *db){
  1175. int rc = LSM_OK;
  1176. TreeHeader hdr;
  1177. ShmHeader *pHdr = db->pShmhdr;
  1178. /* Ensure that the two tree-headers are consistent. Copy one over the other
  1179. ** if necessary. Prefer the data from a tree-header for which the checksum
  1180. ** computes. Or, if they both compute, prefer tree-header-1. */
  1181. if( memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)) ){
  1182. if( treeHeaderChecksumOk(&pHdr->hdr1) ){
  1183. memcpy(&pHdr->hdr2, &pHdr->hdr1, sizeof(TreeHeader));
  1184. }else{
  1185. memcpy(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader));
  1186. }
  1187. }
  1188. /* Save the connections current copy of the tree-header. It will be
  1189. ** restored before returning. */
  1190. memcpy(&hdr, &db->treehdr, sizeof(TreeHeader));
  1191. /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
  1192. ** the transaction-id currently in the tree-headers. */
  1193. rc = treeRepairPtrs(db);
  1194. /* Repair the linked list of shared-memory chunks. */
  1195. if( rc==LSM_OK ){
  1196. rc = treeRepairList(db);
  1197. }
  1198. memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
  1199. return rc;
  1200. }
  1201. static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
  1202. if( *pRc==LSM_OK ){
  1203. TreeRoot *p = &db->treehdr.root;
  1204. TreeNode *pNew;
  1205. u32 iNew;
  1206. TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  1207. int iCell = pCsr->aiCell[pCsr->iNode];
  1208. /* Create a copy of this node */
  1209. if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
  1210. pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
  1211. }else{
  1212. pNew = copyTreeNode(db, pNode, &iNew, pRc);
  1213. }
  1214. if( pNew ){
  1215. /* Modify the value in the new version */
  1216. pNew->aiKeyPtr[iCell] = iKey;
  1217. /* Change the pointer in the parent (if any) to point at the new
  1218. ** TreeNode */
  1219. pCsr->iNode--;
  1220. treeUpdatePtr(db, pCsr, iNew);
  1221. }
  1222. }
  1223. }
  1224. static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  1225. int iNode = pCsr->iNode;
  1226. int iCell = pCsr->aiCell[iNode]+1;
  1227. /* Cursor currently points to a leaf node. */
  1228. assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
  1229. while( iNode>=0 ){
  1230. TreeNode *pNode = pCsr->apTreeNode[iNode];
  1231. if( iCell<3 && pNode->aiKeyPtr[iCell] ){
  1232. int rc = LSM_OK;
  1233. TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
  1234. assert( rc==LSM_OK );
  1235. return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
  1236. }
  1237. iNode--;
  1238. iCell = pCsr->aiCell[iNode];
  1239. }
  1240. return 0;
  1241. }
  1242. static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  1243. int iNode = pCsr->iNode;
  1244. /* Cursor currently points to a leaf node. */
  1245. assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
  1246. while( iNode>=0 ){
  1247. TreeNode *pNode = pCsr->apTreeNode[iNode];
  1248. int iCell = pCsr->aiCell[iNode]-1;
  1249. if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
  1250. int rc = LSM_OK;
  1251. TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
  1252. assert( rc==LSM_OK );
  1253. return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
  1254. }
  1255. iNode--;
  1256. }
  1257. return 0;
  1258. }
  1259. static int treeInsertEntry(
  1260. lsm_db *pDb, /* Database handle */
  1261. int flags, /* Flags associated with entry */
  1262. void *pKey, /* Pointer to key data */
  1263. int nKey, /* Size of key data in bytes */
  1264. void *pVal, /* Pointer to value data (or NULL) */
  1265. int nVal /* Bytes in value data (or -ve for delete) */
  1266. ){
  1267. int rc = LSM_OK; /* Return Code */
  1268. TreeKey *pTreeKey; /* New key-value being inserted */
  1269. u32 iTreeKey;
  1270. TreeRoot *p = &pDb->treehdr.root;
  1271. TreeCursor csr; /* Cursor to seek to pKey/nKey */
  1272. int res = 0; /* Result of seek operation on csr */
  1273. assert( nVal>=0 || pVal==0 );
  1274. assert_tree_looks_ok(LSM_OK, pTree);
  1275. assert( flags==LSM_INSERT || flags==LSM_POINT_DELETE
  1276. || flags==LSM_START_DELETE || flags==LSM_END_DELETE
  1277. );
  1278. assert( (flags & LSM_CONTIGUOUS)==0 );
  1279. #if 0
  1280. dump_tree_contents(pDb, "before");
  1281. #endif
  1282. if( p->iRoot ){
  1283. TreeKey *pRes; /* Key at end of seek operation */
  1284. treeCursorInit(pDb, 0, &csr);
  1285. /* Seek to the leaf (or internal node) that the new key belongs on */
  1286. rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
  1287. pRes = csrGetKey(&csr, &csr.blob, &rc);
  1288. if( rc!=LSM_OK ) return rc;
  1289. assert( pRes );
  1290. if( flags==LSM_START_DELETE ){
  1291. /* When inserting a start-delete-range entry, if the key that
  1292. ** occurs immediately before the new entry is already a START_DELETE,
  1293. ** then the new entry is not required. */
  1294. if( (res<=0 && (pRes->flags & LSM_START_DELETE))
  1295. || (res>0 && treePrevIsStartDelete(pDb, &csr))
  1296. ){
  1297. goto insert_entry_out;
  1298. }
  1299. }else if( flags==LSM_END_DELETE ){
  1300. /* When inserting an start-delete-range entry, if the key that
  1301. ** occurs immediately after the new entry is already an END_DELETE,
  1302. ** then the new entry is not required. */
  1303. if( (res<0 && treeNextIsEndDelete(pDb, &csr))
  1304. || (res>=0 && (pRes->flags & LSM_END_DELETE))
  1305. ){
  1306. goto insert_entry_out;
  1307. }
  1308. }
  1309. if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
  1310. if( pRes->flags & LSM_INSERT ){
  1311. nVal = pRes->nValue;
  1312. pVal = TKV_VAL(pRes);
  1313. }
  1314. flags = flags | pRes->flags;
  1315. }
  1316. if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
  1317. if( (res<0 && (pRes->flags & LSM_START_DELETE))
  1318. || (res>0 && (pRes->flags & LSM_END_DELETE))
  1319. ){
  1320. flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
  1321. }else if( res==0 ){
  1322. flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
  1323. }
  1324. }
  1325. }else{
  1326. memset(&csr, 0, sizeof(TreeCursor));
  1327. }
  1328. /* Allocate and populate a new key-value pair structure */
  1329. pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  1330. if( rc!=LSM_OK ) return rc;
  1331. assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS );
  1332. pTreeKey->flags |= flags;
  1333. if( p->iRoot==0 ){
  1334. /* The tree is completely empty. Add a new root node and install
  1335. ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
  1336. ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
  1337. ** space for the fields used by interior nodes). This is because the
  1338. ** treeInsert() routine may convert this node to an interior node. */
  1339. TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
  1340. if( rc==LSM_OK ){
  1341. assert( p->nHeight==0 );
  1342. pRoot->aiKeyPtr[1] = iTreeKey;
  1343. p->nHeight = 1;
  1344. }
  1345. }else{
  1346. if( res==0 ){
  1347. /* The search found a match within the tree. */
  1348. treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
  1349. }else{
  1350. /* The cursor now points to the leaf node into which the new entry should
  1351. ** be inserted. There may or may not be a free slot within the leaf for
  1352. ** the new key-value pair.
  1353. **
  1354. ** iSlot is set to the index of the key within pLeaf that the new key
  1355. ** should be inserted to the left of (or to a value 1 greater than the
  1356. ** index of the rightmost key if the new key is larger than all keys
  1357. ** currently stored in the node).
  1358. */
  1359. int iSlot = csr.aiCell[csr.iNode] + (res<0);
  1360. if( csr.iNode==0 ){
  1361. rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
  1362. }else{
  1363. rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
  1364. }
  1365. }
  1366. }
  1367. #if 0
  1368. dump_tree_contents(pDb, "after");
  1369. #endif
  1370. insert_entry_out:
  1371. tblobFree(pDb, &csr.blob);
  1372. assert_tree_looks_ok(rc, pTree);
  1373. return rc;
  1374. }
  1375. /*
  1376. ** Insert a new entry into the in-memory tree.
  1377. **
  1378. ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
  1379. ** is inserted into the tree. In this case the value pointer, pVal, must be
  1380. ** NULL.
  1381. */
  1382. int lsmTreeInsert(
  1383. lsm_db *pDb, /* Database handle */
  1384. void *pKey, /* Pointer to key data */
  1385. int nKey, /* Size of key data in bytes */
  1386. void *pVal, /* Pointer to value data (or NULL) */
  1387. int nVal /* Bytes in value data (or -ve for delete) */
  1388. ){
  1389. int flags;
  1390. if( nVal<0 ){
  1391. flags = LSM_POINT_DELETE;
  1392. }else{
  1393. flags = LSM_INSERT;
  1394. }
  1395. return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
  1396. }
  1397. static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
  1398. TreeRoot *p = &db->treehdr.root;
  1399. TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  1400. int iSlot = pCsr->aiCell[pCsr->iNode];
  1401. int bLeaf;
  1402. int rc = LSM_OK;
  1403. assert( pNode->aiKeyPtr[1] );
  1404. assert( pNode->aiKeyPtr[iSlot] );
  1405. assert( iSlot==0 || iSlot==1 || iSlot==2 );
  1406. assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
  1407. bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
  1408. if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
  1409. /* There are currently at least 2 keys on this node. So just create
  1410. ** a new copy of the node with one of the keys removed. If the node
  1411. ** happens to be the root node of the tree, allocate an entire
  1412. ** TreeNode structure instead of just a TreeLeaf. */
  1413. TreeNode *pNew;
  1414. u32 iNew;
  1415. if( bLeaf ){
  1416. pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
  1417. }else{
  1418. pNew = newTreeNode(db, &iNew, &rc);
  1419. }
  1420. if( pNew ){
  1421. int i;
  1422. int iOut = 1;
  1423. for(i=0; i<4; i++){
  1424. if( i==iSlot ){
  1425. i++;
  1426. if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
  1427. if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
  1428. iOut++;
  1429. }else if( bLeaf || p->nHeight==1 ){
  1430. if( i<3 && pNode->aiKeyPtr[i] ){
  1431. pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
  1432. }
  1433. }else{
  1434. if( getChildPtr(pNode, WORKING_VERSION, i) ){
  1435. pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
  1436. if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
  1437. iOut++;
  1438. }
  1439. }
  1440. }
  1441. assert( iOut<=4 );
  1442. assert( bLeaf || pNew->aiChildPtr[0]==0 );
  1443. pCsr->iNode--;
  1444. rc = treeUpdatePtr(db, pCsr, iNew);
  1445. }
  1446. }else if( pCsr->iNode==0 ){
  1447. /* Removing the only key in the root node. iNewptr is the new root. */
  1448. assert( iSlot==1 );
  1449. db->treehdr.root.iRoot = iNewptr;
  1450. db->treehdr.root.nHeight--;
  1451. }else{
  1452. /* There is only one key on this node and the node is not the root
  1453. ** node. Find a peer for this node. Then redistribute the contents of
  1454. ** the peer and the parent cell between the parent and either one or
  1455. ** two new nodes. */
  1456. TreeNode *pParent; /* Parent tree node */
  1457. int iPSlot;
  1458. u32 iPeer; /* Pointer to peer leaf node */
  1459. int iDir;
  1460. TreeNode *pPeer; /* The peer leaf node */
  1461. TreeNode *pNew1; u32 iNew1; /* First new leaf node */
  1462. assert( iSlot==1 );
  1463. pParent = pCsr->apTreeNode[pCsr->iNode-1];
  1464. iPSlot = pCsr->aiCell[pCsr->iNode-1];
  1465. if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
  1466. iDir = -1;
  1467. }else{
  1468. iDir = +1;
  1469. }
  1470. iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
  1471. pPeer = (TreeNode *)treeShmptr(db, iPeer);
  1472. assertIsWorkingChild(db, pNode, pParent, iPSlot);
  1473. /* Allocate the first new leaf node. This is always required. */
  1474. if( bLeaf ){
  1475. pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
  1476. }else{
  1477. pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
  1478. }
  1479. if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
  1480. /* Peer node is completely full. This means that two new leaf nodes
  1481. ** and a new parent node are required. */
  1482. TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
  1483. TreeNode *pNewP; u32 iNewP; /* New parent node */
  1484. if( bLeaf ){
  1485. pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
  1486. }else{
  1487. pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
  1488. }
  1489. pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
  1490. if( iDir==-1 ){
  1491. pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
  1492. if( bLeaf==0 ){
  1493. pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
  1494. pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
  1495. }
  1496. pNewP->aiChildPtr[iPSlot-1] = iNew1;
  1497. pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
  1498. pNewP->aiChildPtr[iPSlot] = iNew2;
  1499. pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
  1500. pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
  1501. if( bLeaf==0 ){
  1502. pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
  1503. pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
  1504. pNew2->aiChildPtr[2] = iNewptr;
  1505. }
  1506. }else{
  1507. pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
  1508. if( bLeaf==0 ){
  1509. pNew1->aiChildPtr[1] = iNewptr;
  1510. pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
  1511. }
  1512. pNewP->aiChildPtr[iPSlot] = iNew1;
  1513. pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
  1514. pNewP->aiChildPtr[iPSlot+1] = iNew2;
  1515. pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
  1516. pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
  1517. if( bLeaf==0 ){
  1518. pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
  1519. pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
  1520. pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
  1521. }
  1522. }
  1523. assert( pCsr->iNode>=1 );
  1524. pCsr->iNode -= 2;
  1525. if( rc==LSM_OK ){
  1526. assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
  1527. rc = treeUpdatePtr(db, pCsr, iNewP);
  1528. }
  1529. }else{
  1530. int iKOut = 0;
  1531. int iPOut = 0;
  1532. int i;
  1533. pCsr->iNode--;
  1534. if( iDir==1 ){
  1535. pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
  1536. if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
  1537. }
  1538. for(i=0; i<3; i++){
  1539. if( pPeer->aiKeyPtr[i] ){
  1540. pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
  1541. }
  1542. }
  1543. if( bLeaf==0 ){
  1544. for(i=0; i<4; i++){
  1545. if( getChildPtr(pPeer, WORKING_VERSION, i) ){
  1546. pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
  1547. }
  1548. }
  1549. }
  1550. if( iDir==-1 ){
  1551. iPSlot--;
  1552. pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
  1553. if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
  1554. pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
  1555. }
  1556. rc = treeDeleteEntry(db, pCsr, iNew1);
  1557. }
  1558. }
  1559. return rc;
  1560. }
  1561. /*
  1562. ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
  1563. ** function, not lsm_delete()).
  1564. **
  1565. ** This is a two step process:
  1566. **
  1567. ** 1) Remove all entries currently stored in the tree that have keys
  1568. ** that fall into the deleted range.
  1569. **
  1570. ** TODO: There are surely good ways to optimize this step - removing
  1571. ** a range of keys from a b-tree. But for now, this function removes
  1572. ** them one at a time using the usual approach.
  1573. **
  1574. ** 2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
  1575. ** already marked as START_DELETE, insert a START_DELETE key.
  1576. ** Similarly, unless the smallest key greater than or equal to
  1577. ** (pKey2/nKey2) is already START_END, insert a START_END key.
  1578. */
  1579. int lsmTreeDelete(
  1580. lsm_db *db,
  1581. void *pKey1, int nKey1, /* Start of range */
  1582. void *pKey2, int nKey2 /* End of range */
  1583. ){
  1584. int rc = LSM_OK;
  1585. int bDone = 0;
  1586. TreeRoot *p = &db->treehdr.root;
  1587. TreeBlob blob = {0, 0};
  1588. /* The range must be sensible - that (key1 < key2). */
  1589. assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 );
  1590. assert( assert_delete_ranges_match(db) );
  1591. #if 0
  1592. static int nCall = 0;
  1593. printf("\n");
  1594. nCall++;
  1595. printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
  1596. dump_tree_contents(db, "before delete");
  1597. #endif
  1598. /* Step 1. This loop runs until the tree contains no keys within the
  1599. ** range being deleted. Or until an error occurs. */
  1600. while( bDone==0 && rc==LSM_OK ){
  1601. int res;
  1602. TreeCursor csr; /* Cursor to seek to first key in range */
  1603. void *pDel; int nDel; /* Key to (possibly) delete this iteration */
  1604. #ifndef NDEBUG
  1605. int nEntry = treeCountEntries(db);
  1606. #endif
  1607. /* Seek the cursor to the first entry in the tree greater than pKey1. */
  1608. treeCursorInit(db, 0, &csr);
  1609. lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
  1610. if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);
  1611. /* If there is no such entry, or if it is greater than pKey2, then the
  1612. ** tree now contains no keys in the range being deleted. In this case
  1613. ** break out of the loop. */
  1614. bDone = 1;
  1615. if( lsmTreeCursorValid(&csr) ){
  1616. lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
  1617. if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
  1618. }
  1619. if( bDone==0 ){
  1620. if( (u32)csr.iNode==(p->nHeight-1) ){
  1621. /* The element to delete already lies on a leaf node */
  1622. rc = treeDeleteEntry(db, &csr, 0);
  1623. }else{
  1624. /* 1. Overwrite the current key with a copy of the next key in the
  1625. ** tree (key N).
  1626. **
  1627. ** 2. Seek to key N (cursor will stop at the internal node copy of
  1628. ** N). Move to the next key (original copy of N). Delete
  1629. ** this entry.
  1630. */
  1631. u32 iKey;
  1632. TreeKey *pKey;
  1633. int iNode = csr.iNode;
  1634. lsmTreeCursorNext(&csr);
  1635. assert( (u32)csr.iNode==(p->nHeight-1) );
  1636. iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
  1637. lsmTreeCursorPrev(&csr);
  1638. treeOverwriteKey(db, &csr, iKey, &rc);
  1639. pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
  1640. if( pKey ){
  1641. rc = lsmTreeCursorSeek(&csr, TKV_KEY(pKey), pKey->nKey, &res);
  1642. }
  1643. if( rc==LSM_OK ){
  1644. assert( res==0 && csr.iNode==iNode );
  1645. rc = lsmTreeCursorNext(&csr);
  1646. if( rc==LSM_OK ){
  1647. rc = treeDeleteEntry(db, &csr, 0);
  1648. }
  1649. }
  1650. }
  1651. }
  1652. /* Clean up any memory allocated by the cursor. */
  1653. tblobFree(db, &csr.blob);
  1654. #if 0
  1655. dump_tree_contents(db, "ddd delete");
  1656. #endif
  1657. assert( bDone || treeCountEntries(db)==(nEntry-1) );
  1658. }
  1659. #if 0
  1660. dump_tree_contents(db, "during delete");
  1661. #endif
  1662. /* Now insert the START_DELETE and END_DELETE keys. */
  1663. if( rc==LSM_OK ){
  1664. rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
  1665. }
  1666. #if 0
  1667. dump_tree_contents(db, "during delete 2");
  1668. #endif
  1669. if( rc==LSM_OK ){
  1670. rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
  1671. }
  1672. #if 0
  1673. dump_tree_contents(db, "after delete");
  1674. #endif
  1675. tblobFree(db, &blob);
  1676. assert( assert_delete_ranges_match(db) );
  1677. return rc;
  1678. }
  1679. /*
  1680. ** Return, in bytes, the amount of memory currently used by the tree
  1681. ** structure.
  1682. */
  1683. int lsmTreeSize(lsm_db *pDb){
  1684. return pDb->treehdr.root.nByte;
  1685. }
  1686. /*
  1687. ** Open a cursor on the in-memory tree pTree.
  1688. */
  1689. int lsmTreeCursorNew(lsm_db *pDb, int bOld, TreeCursor **ppCsr){
  1690. TreeCursor *pCsr;
  1691. *ppCsr = pCsr = lsmMalloc(pDb->pEnv, sizeof(TreeCursor));
  1692. if( pCsr ){
  1693. treeCursorInit(pDb, bOld, pCsr);
  1694. return LSM_OK;
  1695. }
  1696. return LSM_NOMEM_BKPT;
  1697. }
  1698. /*
  1699. ** Close an in-memory tree cursor.
  1700. */
  1701. void lsmTreeCursorDestroy(TreeCursor *pCsr){
  1702. if( pCsr ){
  1703. tblobFree(pCsr->pDb, &pCsr->blob);
  1704. lsmFree(pCsr->pDb->pEnv, pCsr);
  1705. }
  1706. }
  1707. void lsmTreeCursorReset(TreeCursor *pCsr){
  1708. if( pCsr ){
  1709. pCsr->iNode = -1;
  1710. pCsr->pSave = 0;
  1711. }
  1712. }
  1713. #ifndef NDEBUG
  1714. static int treeCsrCompare(TreeCursor *pCsr, void *pKey, int nKey, int *pRc){
  1715. TreeKey *p;
  1716. int cmp = 0;
  1717. assert( pCsr->iNode>=0 );
  1718. p = csrGetKey(pCsr, &pCsr->blob, pRc);
  1719. if( p ){
  1720. cmp = treeKeycmp(TKV_KEY(p), p->nKey, pKey, nKey);
  1721. }
  1722. return cmp;
  1723. }
  1724. #endif
  1725. /*
  1726. ** Attempt to seek the cursor passed as the first argument to key (pKey/nKey)
  1727. ** in the tree structure. If an exact match for the key is found, leave the
  1728. ** cursor pointing to it and set *pRes to zero before returning. If an
  1729. ** exact match cannot be found, do one of the following:
  1730. **
  1731. ** * Leave the cursor pointing to the smallest element in the tree that
  1732. ** is larger than the key and set *pRes to +1, or
  1733. **
  1734. ** * Leave the cursor pointing to the largest element in the tree that
  1735. ** is smaller than the key and set *pRes to -1, or
  1736. **
  1737. ** * If the tree is empty, leave the cursor at EOF and set *pRes to -1.
  1738. */
  1739. int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){
  1740. int rc = LSM_OK; /* Return code */
  1741. lsm_db *pDb = pCsr->pDb;
  1742. TreeRoot *pRoot = pCsr->pRoot;
  1743. u32 iNodePtr; /* Location of current node in search */
  1744. /* Discard any saved position data */
  1745. treeCursorRestore(pCsr, 0);
  1746. iNodePtr = pRoot->iRoot;
  1747. if( iNodePtr==0 ){
  1748. /* Either an error occurred or the tree is completely empty. */
  1749. assert( rc!=LSM_OK || pRoot->iRoot==0 );
  1750. *pRes = -1;
  1751. pCsr->iNode = -1;
  1752. }else{
  1753. TreeBlob b = {0, 0};
  1754. int res = 0; /* Result of comparison function */
  1755. int iNode = -1;
  1756. while( iNodePtr ){
  1757. TreeNode *pNode; /* Node at location iNodePtr */
  1758. int iTest; /* Index of second key to test (0 or 2) */
  1759. u32 iTreeKey;
  1760. TreeKey *pTreeKey; /* Key to compare against */
  1761. pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr);
  1762. iNode++;
  1763. pCsr->apTreeNode[iNode] = pNode;
  1764. /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
  1765. ** pNode. The middle slot is never empty. If the comparison is a match,
  1766. ** then the search is finished. Break out of the loop. */
  1767. pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]);
  1768. if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
  1769. pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
  1770. if( rc!=LSM_OK ) break;
  1771. }
  1772. res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
  1773. if( res==0 ){
  1774. pCsr->aiCell[iNode] = 1;
  1775. break;
  1776. }
  1777. /* Based on the results of the previous comparison, compare (pKey/nKey)
  1778. ** to either the left or right key of the B-tree node, if such a key
  1779. ** exists. */
  1780. iTest = (res>0 ? 0 : 2);
  1781. iTreeKey = pNode->aiKeyPtr[iTest];
  1782. if( iTreeKey ){
  1783. pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
  1784. if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
  1785. pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
  1786. if( rc ) break;
  1787. }
  1788. res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
  1789. if( res==0 ){
  1790. pCsr->aiCell[iNode] = (u8)iTest;
  1791. break;
  1792. }
  1793. }else{
  1794. iTest = 1;
  1795. }
  1796. if( (u32)iNode<(pRoot->nHeight-1) ){
  1797. iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
  1798. }else{
  1799. iNodePtr = 0;
  1800. }
  1801. pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
  1802. }
  1803. *pRes = res;
  1804. pCsr->iNode = iNode;
  1805. tblobFree(pDb, &b);
  1806. }
  1807. /* assert() that *pRes has been set properly */
  1808. #ifndef NDEBUG
  1809. if( rc==LSM_OK && lsmTreeCursorValid(pCsr) ){
  1810. int cmp = treeCsrCompare(pCsr, pKey, nKey, &rc);
  1811. assert( rc!=LSM_OK || *pRes==cmp || (*pRes ^ cmp)>0 );
  1812. }
  1813. #endif
  1814. return rc;
  1815. }
  1816. int lsmTreeCursorNext(TreeCursor *pCsr){
  1817. #ifndef NDEBUG
  1818. TreeKey *pK1;
  1819. TreeBlob key1 = {0, 0};
  1820. #endif
  1821. lsm_db *pDb = pCsr->pDb;
  1822. TreeRoot *pRoot = pCsr->pRoot;
  1823. const int iLeaf = pRoot->nHeight-1;
  1824. int iCell;
  1825. int rc = LSM_OK;
  1826. TreeNode *pNode;
  1827. /* Restore the cursor position, if required */
  1828. int iRestore = 0;
  1829. treeCursorRestore(pCsr, &iRestore);
  1830. if( iRestore>0 ) return LSM_OK;
  1831. /* Save a pointer to the current key. This is used in an assert() at the
  1832. ** end of this function - to check that the 'next' key really is larger
  1833. ** than the current key. */
  1834. #ifndef NDEBUG
  1835. pK1 = csrGetKey(pCsr, &key1, &rc);
  1836. if( rc!=LSM_OK ) return rc;
  1837. #endif
  1838. assert( lsmTreeCursorValid(pCsr) );
  1839. assert( pCsr->aiCell[pCsr->iNode]<3 );
  1840. pNode = pCsr->apTreeNode[pCsr->iNode];
  1841. iCell = ++pCsr->aiCell[pCsr->iNode];
  1842. /* If the current node is not a leaf, and the current cell has sub-tree
  1843. ** associated with it, descend to the left-most key on the left-most
  1844. ** leaf of the sub-tree. */
  1845. if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
  1846. do {
  1847. u32 iNodePtr;
  1848. pCsr->iNode++;
  1849. iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
  1850. pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
  1851. pCsr->apTreeNode[pCsr->iNode] = pNode;
  1852. iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
  1853. }while( pCsr->iNode < iLeaf );
  1854. }
  1855. /* Otherwise, the next key is found by following pointer up the tree
  1856. ** until there is a key immediately to the right of the pointer followed
  1857. ** to reach the sub-tree containing the current key. */
  1858. else if( iCell>=3 || pNode->aiKeyPtr[iCell]==0 ){
  1859. while( (--pCsr->iNode)>=0 ){
  1860. iCell = pCsr->aiCell[pCsr->iNode];
  1861. if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
  1862. }
  1863. }
  1864. #ifndef NDEBUG
  1865. if( pCsr->iNode>=0 ){
  1866. TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
  1867. assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 );
  1868. }
  1869. tblobFree(pDb, &key1);
  1870. #endif
  1871. return rc;
  1872. }
  1873. int lsmTreeCursorPrev(TreeCursor *pCsr){
  1874. #ifndef NDEBUG
  1875. TreeKey *pK1;
  1876. TreeBlob key1 = {0, 0};
  1877. #endif
  1878. lsm_db *pDb = pCsr->pDb;
  1879. TreeRoot *pRoot = pCsr->pRoot;
  1880. const int iLeaf = pRoot->nHeight-1;
  1881. int iCell;
  1882. int rc = LSM_OK;
  1883. TreeNode *pNode;
  1884. /* Restore the cursor position, if required */
  1885. int iRestore = 0;
  1886. treeCursorRestore(pCsr, &iRestore);
  1887. if( iRestore<0 ) return LSM_OK;
  1888. /* Save a pointer to the current key. This is used in an assert() at the
  1889. ** end of this function - to check that the 'next' key really is smaller
  1890. ** than the current key. */
  1891. #ifndef NDEBUG
  1892. pK1 = csrGetKey(pCsr, &key1, &rc);
  1893. if( rc!=LSM_OK ) return rc;
  1894. #endif
  1895. assert( lsmTreeCursorValid(pCsr) );
  1896. pNode = pCsr->apTreeNode[pCsr->iNode];
  1897. iCell = pCsr->aiCell[pCsr->iNode];
  1898. assert( iCell>=0 && iCell<3 );
  1899. /* If the current node is not a leaf, and the current cell has sub-tree
  1900. ** associated with it, descend to the right-most key on the right-most
  1901. ** leaf of the sub-tree. */
  1902. if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
  1903. do {
  1904. u32 iNodePtr;
  1905. pCsr->iNode++;
  1906. iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
  1907. pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
  1908. if( rc!=LSM_OK ) break;
  1909. pCsr->apTreeNode[pCsr->iNode] = pNode;
  1910. iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
  1911. pCsr->aiCell[pCsr->iNode] = (u8)iCell;
  1912. }while( pCsr->iNode < iLeaf );
  1913. }
  1914. /* Otherwise, the next key is found by following pointer up the tree until
  1915. ** there is a key immediately to the left of the pointer followed to reach
  1916. ** the sub-tree containing the current key. */
  1917. else{
  1918. do {
  1919. iCell = pCsr->aiCell[pCsr->iNode]-1;
  1920. if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
  1921. }while( (--pCsr->iNode)>=0 );
  1922. pCsr->aiCell[pCsr->iNode] = (u8)iCell;
  1923. }
  1924. #ifndef NDEBUG
  1925. if( pCsr->iNode>=0 ){
  1926. TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
  1927. assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
  1928. }
  1929. tblobFree(pDb, &key1);
  1930. #endif
  1931. return rc;
  1932. }
  1933. /*
  1934. ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
  1935. ** in-memory tree.
  1936. */
  1937. int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  1938. lsm_db *pDb = pCsr->pDb;
  1939. TreeRoot *pRoot = pCsr->pRoot;
  1940. int rc = LSM_OK;
  1941. u32 iNodePtr;
  1942. pCsr->iNode = -1;
  1943. /* Discard any saved position data */
  1944. treeCursorRestore(pCsr, 0);
  1945. iNodePtr = pRoot->iRoot;
  1946. while( iNodePtr ){
  1947. int iCell;
  1948. TreeNode *pNode;
  1949. pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
  1950. if( rc ) break;
  1951. if( bLast ){
  1952. iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
  1953. }else{
  1954. iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
  1955. }
  1956. pCsr->iNode++;
  1957. pCsr->apTreeNode[pCsr->iNode] = pNode;
  1958. if( (u32)pCsr->iNode<pRoot->nHeight-1 ){
  1959. iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
  1960. }else{
  1961. iNodePtr = 0;
  1962. }
  1963. pCsr->aiCell[pCsr->iNode] = (u8)(iCell - (iNodePtr==0 && bLast));
  1964. }
  1965. return rc;
  1966. }
  1967. int lsmTreeCursorFlags(TreeCursor *pCsr){
  1968. int flags = 0;
  1969. if( pCsr && pCsr->iNode>=0 ){
  1970. int rc = LSM_OK;
  1971. TreeKey *pKey = (TreeKey *)treeShmptrUnsafe(pCsr->pDb,
  1972. pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
  1973. );
  1974. assert( rc==LSM_OK );
  1975. flags = (pKey->flags & ~LSM_CONTIGUOUS);
  1976. }
  1977. return flags;
  1978. }
  1979. int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
  1980. TreeKey *pTreeKey;
  1981. int rc = LSM_OK;
  1982. assert( lsmTreeCursorValid(pCsr) );
  1983. pTreeKey = pCsr->pSave;
  1984. if( !pTreeKey ){
  1985. pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  1986. }
  1987. if( rc==LSM_OK ){
  1988. *pnKey = pTreeKey->nKey;
  1989. if( pFlags ) *pFlags = pTreeKey->flags;
  1990. *ppKey = (void *)&pTreeKey[1];
  1991. }
  1992. return rc;
  1993. }
  1994. int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
  1995. int res = 0;
  1996. int rc;
  1997. rc = treeCursorRestore(pCsr, &res);
  1998. if( res==0 ){
  1999. TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  2000. if( rc==LSM_OK ){
  2001. if( pTreeKey->flags & LSM_INSERT ){
  2002. *pnVal = pTreeKey->nValue;
  2003. *ppVal = TKV_VAL(pTreeKey);
  2004. }else{
  2005. *ppVal = 0;
  2006. *pnVal = -1;
  2007. }
  2008. }
  2009. }else{
  2010. *ppVal = 0;
  2011. *pnVal = 0;
  2012. }
  2013. return rc;
  2014. }
  2015. /*
  2016. ** Return true if the cursor currently points to a valid entry.
  2017. */
  2018. int lsmTreeCursorValid(TreeCursor *pCsr){
  2019. return (pCsr && (pCsr->pSave || pCsr->iNode>=0));
  2020. }
  2021. /*
  2022. ** Store a mark in *pMark. Later on, a call to lsmTreeRollback() with a
  2023. ** pointer to the same TreeMark structure may be used to roll the tree
  2024. ** contents back to their current state.
  2025. */
  2026. void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
  2027. pMark->iRoot = pDb->treehdr.root.iRoot;
  2028. pMark->nHeight = pDb->treehdr.root.nHeight;
  2029. pMark->iWrite = pDb->treehdr.iWrite;
  2030. pMark->nChunk = pDb->treehdr.nChunk;
  2031. pMark->iNextShmid = pDb->treehdr.iNextShmid;
  2032. pMark->iRollback = intArraySize(&pDb->rollback);
  2033. }
  2034. /*
  2035. ** Roll back to mark pMark. Structure *pMark should have been previously
  2036. ** populated by a call to lsmTreeMark().
  2037. */
  2038. void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
  2039. int iIdx;
  2040. int nIdx;
  2041. u32 iNext;
  2042. ShmChunk *pChunk;
  2043. u32 iChunk;
  2044. u32 iShmid;
  2045. /* Revert all required v2 pointers. */
  2046. nIdx = intArraySize(&pDb->rollback);
  2047. for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
  2048. TreeNode *pNode;
  2049. pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
  2050. assert( pNode );
  2051. pNode->iV2 = 0;
  2052. pNode->iV2Child = 0;
  2053. pNode->iV2Ptr = 0;
  2054. }
  2055. intArrayTruncate(&pDb->rollback, pMark->iRollback);
  2056. /* Restore the free-chunk list. */
  2057. assert( pMark->iWrite!=0 );
  2058. iChunk = treeOffsetToChunk(pMark->iWrite-1);
  2059. pChunk = treeShmChunk(pDb, iChunk);
  2060. iNext = pChunk->iNext;
  2061. pChunk->iNext = 0;
  2062. pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
  2063. iShmid = pChunk->iShmid-1;
  2064. while( iNext ){
  2065. u32 iFree = iNext; /* Current chunk being rollback-freed */
  2066. ShmChunk *pFree; /* Pointer to chunk iFree */
  2067. pFree = treeShmChunk(pDb, iFree);
  2068. iNext = pFree->iNext;
  2069. if( iFree<pMark->nChunk ){
  2070. pFree->iNext = pDb->treehdr.iFirst;
  2071. pFree->iShmid = iShmid--;
  2072. pDb->treehdr.iFirst = iFree;
  2073. }
  2074. }
  2075. /* Restore the tree-header fields */
  2076. pDb->treehdr.root.iRoot = pMark->iRoot;
  2077. pDb->treehdr.root.nHeight = pMark->nHeight;
  2078. pDb->treehdr.iWrite = pMark->iWrite;
  2079. pDb->treehdr.nChunk = pMark->nChunk;
  2080. pDb->treehdr.iNextShmid = pMark->iNextShmid;
  2081. }
  2082. /*
  2083. ** Load the in-memory tree header from shared-memory into pDb->treehdr.
  2084. ** If the header cannot be loaded, return LSM_PROTOCOL.
  2085. **
  2086. ** If the header is successfully loaded and parameter piRead is not NULL,
  2087. ** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
  2088. ** the header was loaded from ShmHeader.hdr2.
  2089. */
  2090. int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
  2091. int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
  2092. while( (nRem--)>0 ){
  2093. ShmHeader *pShm = pDb->pShmhdr;
  2094. memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
  2095. if( treeHeaderChecksumOk(&pDb->treehdr) ){
  2096. if( piRead ) *piRead = 1;
  2097. return LSM_OK;
  2098. }
  2099. memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader));
  2100. if( treeHeaderChecksumOk(&pDb->treehdr) ){
  2101. if( piRead ) *piRead = 2;
  2102. return LSM_OK;
  2103. }
  2104. lsmShmBarrier(pDb);
  2105. }
  2106. return LSM_PROTOCOL_BKPT;
  2107. }
  2108. int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
  2109. TreeHeader *p = (iRead==1) ? &pDb->pShmhdr->hdr1 : &pDb->pShmhdr->hdr2;
  2110. assert( iRead==1 || iRead==2 );
  2111. return (0==memcmp(pDb->treehdr.aCksum, p->aCksum, sizeof(u32)*2));
  2112. }
  2113. /*
  2114. ** This function is called to conclude a transaction. If argument bCommit
  2115. ** is true, the transaction is committed. Otherwise it is rolled back.
  2116. */
  2117. int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){
  2118. ShmHeader *pShm = pDb->pShmhdr;
  2119. treeHeaderChecksum(&pDb->treehdr, pDb->treehdr.aCksum);
  2120. memcpy(&pShm->hdr2, &pDb->treehdr, sizeof(TreeHeader));
  2121. lsmShmBarrier(pDb);
  2122. memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  2123. pShm->bWriter = 0;
  2124. intArrayFree(pDb->pEnv, &pDb->rollback);
  2125. return LSM_OK;
  2126. }
  2127. #ifndef NDEBUG
  2128. static int assert_delete_ranges_match(lsm_db *db){
  2129. int prev = 0;
  2130. TreeBlob blob = {0, 0};
  2131. TreeCursor csr; /* Cursor used to iterate through tree */
  2132. int rc;
  2133. treeCursorInit(db, 0, &csr);
  2134. for( rc = lsmTreeCursorEnd(&csr, 0);
  2135. rc==LSM_OK && lsmTreeCursorValid(&csr);
  2136. rc = lsmTreeCursorNext(&csr)
  2137. ){
  2138. TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
  2139. if( rc!=LSM_OK ) break;
  2140. assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
  2141. prev = pKey->flags;
  2142. }
  2143. tblobFree(csr.pDb, &csr.blob);
  2144. tblobFree(csr.pDb, &blob);
  2145. return 1;
  2146. }
  2147. static int treeCountEntries(lsm_db *db){
  2148. TreeCursor csr; /* Cursor used to iterate through tree */
  2149. int rc;
  2150. int nEntry = 0;
  2151. treeCursorInit(db, 0, &csr);
  2152. for( rc = lsmTreeCursorEnd(&csr, 0);
  2153. rc==LSM_OK && lsmTreeCursorValid(&csr);
  2154. rc = lsmTreeCursorNext(&csr)
  2155. ){
  2156. nEntry++;
  2157. }
  2158. tblobFree(csr.pDb, &csr.blob);
  2159. return nEntry;
  2160. }
  2161. #endif