12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466 |
- /*
- ** 2011-08-18
- **
- ** The author disclaims copyright to this source code. In place of
- ** a legal notice, here is a blessing:
- **
- ** May you do good and not evil.
- ** May you find forgiveness for yourself and forgive others.
- ** May you share freely, never taking more than you give.
- **
- *************************************************************************
- **
- ** This file contains the implementation of an in-memory tree structure.
- **
- ** Technically the tree is a B-tree of order 4 (in the Knuth sense - each
- ** node may have up to 4 children). Keys are stored within B-tree nodes by
- ** reference. This may be slightly slower than a conventional red-black
- ** tree, but it is simpler. It is also an easier structure to modify to
- ** create a version that supports nested transaction rollback.
- **
- ** This tree does not currently support a delete operation. One is not
- ** required. When LSM deletes a key from a database, it inserts a DELETE
- ** marker into the data structure. As a result, although the value associated
- ** with a key stored in the in-memory tree structure may be modified, no
- ** keys are ever removed.
- */
- /*
- ** MVCC NOTES
- **
- ** The in-memory tree structure supports SQLite-style MVCC. This means
- ** that while one client is writing to the tree structure, other clients
- ** may still be querying an older snapshot of the tree.
- **
- ** One way to implement this is to use an append-only b-tree. In this
- ** case instead of modifying nodes in-place, a copy of the node is made
- ** and the required modifications made to the copy. The parent of the
- ** node is then modified (to update the pointer so that it points to
- ** the new copy), which causes a copy of the parent to be made, and so on.
- ** This means that each time the tree is written to a new root node is
- ** created. A snapshot is identified by the root node that it uses.
- **
- ** The problem with the above is that each time the tree is written to,
- ** a copy of the node structure modified and all of its ancestor nodes
- ** is made. This may prove excessive with large tree structures.
- **
- ** To reduce this overhead, the data structure used for a tree node is
- ** designed so that it may be edited in place exactly once without
- ** affecting existing users. In other words, the node structure is capable
- ** of storing two separate versions of the node at the same time.
- ** When a node is to be edited, if the node structure already contains
- ** two versions, a copy is made as in the append-only approach. Or, if
- ** it only contains a single version, it is edited in place.
- **
- ** This reduces the overhead so that, roughly, one new node structure
- ** must be allocated for each write (on top of those allocations that
- ** would have been required by a non-MVCC tree). Logic: Assume that at
- ** any time, 50% of nodes in the tree already contain 2 versions. When
- ** a new entry is written to a node, there is a 50% chance that a copy
- ** of the node will be required. And a 25% chance that a copy of its
- ** parent is required. And so on.
- **
- ** ROLLBACK
- **
- ** The in-memory tree also supports transaction and sub-transaction
- ** rollback. In order to rollback to point in time X, the following is
- ** necessary:
- **
- ** 1. All memory allocated since X must be freed, and
- ** 2. All "v2" data added to nodes that existed at X should be zeroed.
- ** 3. The root node must be restored to its X value.
- **
- ** The Mempool object used to allocate memory for the tree supports
- ** operation (1) - see the lsmPoolMark() and lsmPoolRevert() functions.
- **
- ** To support (2), all nodes that have v2 data are part of a singly linked
- ** list, sorted by the age of the v2 data (nodes that have had data added
- ** most recently are at the end of the list). So to zero all v2 data added
- ** since X, the linked list is traversed from the first node added following
- ** X onwards.
- **
- */
- #ifndef _LSM_INT_H
- # include "lsmInt.h"
- #endif
- #include <string.h>
- #define MAX_DEPTH 32
- typedef struct TreeKey TreeKey;
- typedef struct TreeNode TreeNode;
- typedef struct TreeLeaf TreeLeaf;
- typedef struct NodeVersion NodeVersion;
/*
** Three 32-bit values: a shared-memory chunk id, the offset of a root
** node and the height of a tree (see the per-field comments).
** NOTE(review): no user of this structure is visible in this part of the
** file - confirm how it relates to the "old" tree before relying on it.
*/
struct TreeOld {
  u32 iShmid;                     /* Last shared-memory chunk in use by old */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Height of tree structure */
};
- #if 0
- /*
- ** assert() that a TreeKey.flags value is sane. Usage:
- **
- ** assert( lsmAssertFlagsOk(pTreeKey->flags) );
- */
- static int lsmAssertFlagsOk(u8 keyflags){
- /* At least one flag must be set. Otherwise, what is this key doing? */
- assert( keyflags!=0 );
- /* The POINT_DELETE and INSERT flags cannot both be set. */
- assert( (keyflags & LSM_POINT_DELETE)==0 || (keyflags & LSM_INSERT)==0 );
- /* If both the START_DELETE and END_DELETE flags are set, then the INSERT
- ** flag must also be set. In other words - the three DELETE flags cannot
- ** all be set */
- assert( (keyflags & LSM_END_DELETE)==0
- || (keyflags & LSM_START_DELETE)==0
- || (keyflags & LSM_POINT_DELETE)==0
- );
- return 1;
- }
- #endif
- static int assert_delete_ranges_match(lsm_db *);
- static int treeCountEntries(lsm_db *db);
/*
** Container for a key-value pair. Within the *-shm file, each key/value
** pair is stored in a single allocation (which may not actually be
** contiguous in memory). Layout is the TreeKey structure, followed by
** the nKey bytes of key blob, followed by the nValue bytes of value blob
** (if nValue is non-negative).
**
** When the whole allocation lies within a single shared-memory chunk the
** LSM_CONTIGUOUS bit is set in the flags field (see newTreeKey()), in
** which case the key and value can be read in place via TKV_KEY() and
** TKV_VAL(). Otherwise treeShmkey() must be used to assemble a copy.
*/
struct TreeKey {
  int nKey;                       /* Size of pKey in bytes */
  int nValue;                     /* Size of pValue. Or negative. */
  u8 flags;                       /* Various LSM_XXX flags */
};
/*
** Accessors for the blobs that follow a TreeKey header in memory. Argument
** p is a (TreeKey*). TKV_KEY(p) is the address immediately after the
** header; TKV_VAL(p) is the address p->nKey bytes after that. Both are
** only meaningful when the header and blobs are contiguous in memory.
*/
#define TKV_KEY(p) ((void *)&(p)[1])
#define TKV_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
/*
** A single tree node. A node structure may contain up to 3 key/value
** pairs. Internal (non-leaf) nodes have up to 4 children.
**
** Keys and children are referred to by u32 offsets into the *-shm file
** rather than by C pointers. An offset of zero means "no entry".
**
** As well as the four aiChildPtr[] slots, a node can hold one "v2"
** override. When iV2 is non-zero, a reader whose version is greater than
** or equal to iV2 sees iV2Ptr in place of aiChildPtr[iV2Child] (this is
** implemented by getChildPtr()).
**
** TODO: Update the format of this to be more compact. Get it working
** first though...
*/
struct TreeNode {
  u32 aiKeyPtr[3];                /* Array of shm offsets of TreeKey objects */

  /* The following fields are present for interior nodes only, not leaves. */
  u32 aiChildPtr[4];              /* Array of shm offsets of child nodes */

  /* The extra (v2) child pointer slot. */
  u32 iV2;                        /* Transaction number of v2 (0 if none) */
  u8 iV2Child;                    /* aiChildPtr[] entry replaced by iV2Ptr */
  u32 iV2Ptr;                     /* Substitute child offset */
};
/*
** A leaf node. This has the same layout as the first member of TreeNode
** (the key offsets) and none of the child or v2 fields.
*/
struct TreeLeaf {
  u32 aiKeyPtr[3];                /* Array of shm offsets of TreeKey objects */
};
/*
** A growable heap buffer. It is managed by tblobGrow() and tblobFree().
*/
typedef struct TreeBlob TreeBlob;
struct TreeBlob {
  int n;                          /* Size of the allocation at a, in bytes */
  u8 *a;                          /* Buffer obtained from lsmMallocRc() */
};
/*
** Cursor for searching a tree structure.
**
** If a cursor does not point to any element (a.k.a. EOF), then the
** TreeCursor.iNode variable is set to a negative value. Otherwise, the
** cursor currently points to key aiCell[iNode] on node apTreeNode[iNode].
**
** Entries in the apTreeNode[] and aiCell[] arrays contain the node and
** index of the TreeNode.aiChildPtr[] pointer followed to descend to the
** current element. Hence apTreeNode[0] always contains the root node of
** the tree.
**
** pSave is set by lsmTreeCursorSave() and consumed by treeCursorRestore().
** It may point into the blob member of this same structure.
*/
struct TreeCursor {
  lsm_db *pDb;                    /* Database handle for this cursor */
  TreeRoot *pRoot;                /* Root node and height of tree to access */
  int iNode;                      /* Cursor points at apTreeNode[iNode] */
  TreeNode *apTreeNode[MAX_DEPTH];/* Current position in tree */
  u8 aiCell[MAX_DEPTH];           /* Current position in tree */
  TreeKey *pSave;                 /* Saved key */
  TreeBlob blob;                  /* Dynamic storage for a key */
};
- /*
- ** A value guaranteed to be larger than the largest possible transaction
- ** id (TreeHeader.iTransId).
- */
- #define WORKING_VERSION (1<<30)
- static int tblobGrow(lsm_db *pDb, TreeBlob *p, int n, int *pRc){
- if( n>p->n ){
- lsmFree(pDb->pEnv, p->a);
- p->a = lsmMallocRc(pDb->pEnv, n, pRc);
- p->n = n;
- }
- return (p->a==0);
- }
- static void tblobFree(lsm_db *pDb, TreeBlob *p){
- lsmFree(pDb->pEnv, p->a);
- }
- /***********************************************************************
- ** Start of IntArray methods. */
- /*
- ** Append value iVal to the contents of IntArray *p. Return LSM_OK if
- ** successful, or LSM_NOMEM if an OOM condition is encountered.
- */
- static int intArrayAppend(lsm_env *pEnv, IntArray *p, u32 iVal){
- assert( p->nArray<=p->nAlloc );
- if( p->nArray>=p->nAlloc ){
- u32 *aNew;
- int nNew = p->nArray ? p->nArray*2 : 128;
- aNew = lsmRealloc(pEnv, p->aArray, nNew*sizeof(u32));
- if( !aNew ) return LSM_NOMEM_BKPT;
- p->aArray = aNew;
- p->nAlloc = nNew;
- }
- p->aArray[p->nArray++] = iVal;
- return LSM_OK;
- }
- /*
- ** Zero the IntArray object.
- */
- static void intArrayFree(lsm_env *pEnv, IntArray *p){
- p->nArray = 0;
- }
- /*
- ** Return the number of entries currently in the int-array object.
- */
- static int intArraySize(IntArray *p){
- return p->nArray;
- }
- /*
- ** Return a copy of the iIdx'th entry in the int-array.
- */
- static u32 intArrayEntry(IntArray *p, int iIdx){
- return p->aArray[iIdx];
- }
- /*
- ** Truncate the int-array so that all but the first nVal values are
- ** discarded.
- */
- static void intArrayTruncate(IntArray *p, int nVal){
- p->nArray = nVal;
- }
- /* End of IntArray methods.
- ***********************************************************************/
/*
** Compare key (p1/n1) with key (p2/n2). The keys are compared bytewise
** with memcmp() over their common prefix; if the prefixes are identical
** the shorter key is considered smaller.
**
** Returns a negative value, zero or a positive value if the first key is
** respectively less than, equal to or greater than the second.
*/
static int treeKeycmp(void *p1, int n1, void *p2, int n2){
  const int nCommon = LSM_MIN(n1, n2);
  const int cmp = memcmp(p1, p2, nCommon);
  return cmp ? cmp : (n1-n2);
}
- /*
- ** The pointer passed as the first argument points to an interior node,
- ** not a leaf. This function returns the offset of the iCell'th child
- ** sub-tree of the node.
- */
- static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
- assert( iVersion>=0 );
- assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
- if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
- return p->aiChildPtr[iCell];
- }
- /*
- ** Given an offset within the *-shm file, return the associated chunk number.
- */
- static int treeOffsetToChunk(u32 iOff){
- assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
- return (int)(iOff>>15);
- }
- #define treeShmptrUnsafe(pDb, iPtr) \
- (&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)])
- /*
- ** Return a pointer to the mapped memory location associated with *-shm
- ** file offset iPtr.
- */
- static void *treeShmptr(lsm_db *pDb, u32 iPtr){
- assert( (iPtr>>15)<(u32)pDb->nShm );
- assert( pDb->apShm[iPtr>>15] );
- return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
- }
- static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
- return (ShmChunk *)(pDb->apShm[iChunk]);
- }
- static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
- assert( *pRc==LSM_OK );
- if( iChunk<pDb->nShm || LSM_OK==(*pRc = lsmShmCacheChunks(pDb, iChunk+1)) ){
- return (ShmChunk *)(pDb->apShm[iChunk]);
- }
- return 0;
- }
- #ifndef NDEBUG
- static void assertIsWorkingChild(
- lsm_db *db,
- TreeNode *pNode,
- TreeNode *pParent,
- int iCell
- ){
- TreeNode *p;
- u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
- p = treeShmptr(db, iPtr);
- assert( p==pNode );
- }
- #else
- # define assertIsWorkingChild(w,x,y,z)
- #endif
- /* Values for the third argument to treeShmkey(). */
- #define TKV_LOADKEY 1
- #define TKV_LOADVAL 2
- static TreeKey *treeShmkey(
- lsm_db *pDb, /* Database handle */
- u32 iPtr, /* Shmptr to TreeKey struct */
- int eLoad, /* Either zero or a TREEKEY_LOADXXX value */
- TreeBlob *pBlob, /* Used if dynamic memory is required */
- int *pRc /* IN/OUT: Error code */
- ){
- TreeKey *pRet;
- assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
- pRet = (TreeKey *)treeShmptr(pDb, iPtr);
- if( pRet ){
- int nReq; /* Bytes of space required at pRet */
- int nAvail; /* Bytes of space available at pRet */
- nReq = sizeof(TreeKey) + pRet->nKey;
- if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
- nReq += pRet->nValue;
- }
- assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
- nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));
- if( nAvail<nReq ){
- if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
- int nLoad = 0;
- while( *pRc==LSM_OK ){
- ShmChunk *pChunk;
- void *p = treeShmptr(pDb, iPtr);
- int n = LSM_MIN(nAvail, nReq-nLoad);
- memcpy(&pBlob->a[nLoad], p, n);
- nLoad += n;
- if( nLoad==nReq ) break;
- pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
- assert( pChunk );
- iPtr = (pChunk->iNext * LSM_SHM_CHUNK_SIZE) + LSM_SHM_CHUNK_HDR;
- nAvail = LSM_SHM_CHUNK_SIZE - LSM_SHM_CHUNK_HDR;
- }
- }
- pRet = (TreeKey *)(pBlob->a);
- }
- }
- return pRet;
- }
- #if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
- void assert_leaf_looks_ok(TreeNode *pNode){
- assert( pNode->apKey[1] );
- }
- void assert_node_looks_ok(TreeNode *pNode, int nHeight){
- if( pNode ){
- assert( pNode->apKey[1] );
- if( nHeight>1 ){
- int i;
- assert( getChildPtr(pNode, WORKING_VERSION, 1) );
- assert( getChildPtr(pNode, WORKING_VERSION, 2) );
- for(i=0; i<4; i++){
- assert_node_looks_ok(getChildPtr(pNode, WORKING_VERSION, i), nHeight-1);
- }
- }
- }
- }
/*
** Run various assert() statements to check that the working-version of the
** tree is correct in the following respects:
**
**   * todo...
**
** This is currently a placeholder: the body is empty and both arguments
** are ignored.
*/
void assert_tree_looks_ok(int rc, Tree *pTree){
}
- #else
- # define assert_tree_looks_ok(x,y)
- #endif
- void lsmFlagsToString(int flags, char *zFlags){
- zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.';
- /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
- ** be set. If this is not true, write a '?' to the output. */
- switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
- case 0: zFlags[1] = '.'; break;
- case LSM_POINT_DELETE: zFlags[1] = '-'; break;
- case LSM_INSERT: zFlags[1] = '+'; break;
- case LSM_SEPARATOR: zFlags[1] = '^'; break;
- default: zFlags[1] = '?'; break;
- }
- zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.';
- zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
- zFlags[4] = '\0';
- }
- #ifdef LSM_DEBUG
- /*
- ** Pointer pBlob points to a buffer containing a blob of binary data
- ** nBlob bytes long. Append the contents of this blob to *pStr, with
- ** each octet represented by a 2-digit hexadecimal number. For example,
- ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF},
- ** then "0144ff" is appended to *pStr.
- */
- static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){
- int i;
- lsmStringExtend(pStr, nBlob*2);
- if( pStr->nAlloc==0 ) return;
- for(i=0; i<nBlob; i++){
- u8 c = ((u8*)pBlob)[i];
- if( c>='a' && c<='z' ){
- pStr->z[pStr->n++] = c;
- }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
- pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
- pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
- }
- }
- pStr->z[pStr->n] = 0;
- }
- #if 0 /* NOT USED */
- /*
- ** Append nIndent space (0x20) characters to string *pStr.
- */
- static void lsmAppendIndent(LsmString *pStr, int nIndent){
- int i;
- lsmStringExtend(pStr, nIndent);
- for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
- }
- #endif
- static void strAppendFlags(LsmString *pStr, u8 flags){
- char zFlags[8];
- lsmFlagsToString(flags, zFlags);
- zFlags[4] = ':';
- lsmStringAppend(pStr, zFlags, 5);
- }
- void dump_node_contents(
- lsm_db *pDb,
- u32 iNode, /* Print out the contents of this node */
- char *zPath, /* Path from root to this node */
- int nPath, /* Number of bytes in zPath */
- int nHeight /* Height: (0==leaf) (1==parent-of-leaf) */
- ){
- const char *zSpace = " ";
- int i;
- int rc = LSM_OK;
- LsmString s;
- TreeNode *pNode;
- TreeBlob b = {0, 0};
- pNode = (TreeNode *)treeShmptr(pDb, iNode);
- if( nHeight==0 ){
- /* Append the nIndent bytes of space to string s. */
- lsmStringInit(&s, pDb->pEnv);
- /* Append each key to string s. */
- for(i=0; i<3; i++){
- u32 iPtr = pNode->aiKeyPtr[i];
- if( iPtr ){
- TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i],TKV_LOADKEY, &b,&rc);
- strAppendFlags(&s, pKey->flags);
- lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
- lsmStringAppend(&s, " ", -1);
- }
- }
- printf("% 6d %.*sleaf%.*s: %s\n",
- iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
- );
- lsmStringClear(&s);
- }else{
- for(i=0; i<4 && nHeight>0; i++){
- u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
- zPath[nPath] = (char)(i+'0');
- zPath[nPath+1] = '/';
- if( iPtr ){
- dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
- }
- if( i!=3 && pNode->aiKeyPtr[i] ){
- TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
- lsmStringInit(&s, pDb->pEnv);
- strAppendFlags(&s, pKey->flags);
- lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
- printf("% 6d %.*s%.*s: %s\n",
- iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
- lsmStringClear(&s);
- }
- }
- }
- tblobFree(pDb, &b);
- }
- void dump_tree_contents(lsm_db *pDb, const char *zCaption){
- char zPath[64];
- TreeRoot *p = &pDb->treehdr.root;
- printf("\n%s\n", zCaption);
- zPath[0] = '/';
- if( p->iRoot ){
- dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
- }
- fflush(stdout);
- }
- #endif
- /*
- ** Initialize a cursor object, the space for which has already been
- ** allocated.
- */
- static void treeCursorInit(lsm_db *pDb, int bOld, TreeCursor *pCsr){
- memset(pCsr, 0, sizeof(TreeCursor));
- pCsr->pDb = pDb;
- if( bOld ){
- pCsr->pRoot = &pDb->treehdr.oldroot;
- }else{
- pCsr->pRoot = &pDb->treehdr.root;
- }
- pCsr->iNode = -1;
- }
- /*
- ** Return a pointer to the mapping of the TreeKey object that the cursor
- ** is pointing to.
- */
- static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
- TreeKey *pRet;
- lsm_db *pDb = pCsr->pDb;
- u32 iPtr = pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]];
- assert( iPtr );
- pRet = (TreeKey*)treeShmptrUnsafe(pDb, iPtr);
- if( !(pRet->flags & LSM_CONTIGUOUS) ){
- pRet = treeShmkey(pDb, iPtr, TKV_LOADVAL, pBlob, pRc);
- }
- return pRet;
- }
- /*
- ** Save the current position of tree cursor pCsr.
- */
- int lsmTreeCursorSave(TreeCursor *pCsr){
- int rc = LSM_OK;
- if( pCsr && pCsr->pSave==0 ){
- int iNode = pCsr->iNode;
- if( iNode>=0 ){
- pCsr->pSave = csrGetKey(pCsr, &pCsr->blob, &rc);
- }
- pCsr->iNode = -1;
- }
- return rc;
- }
- /*
- ** Restore the position of a saved tree cursor.
- */
- static int treeCursorRestore(TreeCursor *pCsr, int *pRes){
- int rc = LSM_OK;
- if( pCsr->pSave ){
- TreeKey *pKey = pCsr->pSave;
- pCsr->pSave = 0;
- if( pRes ){
- rc = lsmTreeCursorSeek(pCsr, TKV_KEY(pKey), pKey->nKey, pRes);
- }
- }
- return rc;
- }
- /*
- ** Allocate nByte bytes of space within the *-shm file. If successful,
- ** return LSM_OK and set *piPtr to the offset within the file at which
- ** the allocated space is located.
- */
- static u32 treeShmalloc(lsm_db *pDb, int bAlign, int nByte, int *pRc){
- u32 iRet = 0;
- if( *pRc==LSM_OK ){
- const static int CHUNK_SIZE = LSM_SHM_CHUNK_SIZE;
- const static int CHUNK_HDR = LSM_SHM_CHUNK_HDR;
- u32 iWrite; /* Current write offset */
- u32 iEof; /* End of current chunk */
- int iChunk; /* Current chunk */
- assert( nByte <= (CHUNK_SIZE-CHUNK_HDR) );
- /* Check if there is enough space on the current chunk to fit the
- ** new allocation. If not, link in a new chunk and put the new
- ** allocation at the start of it. */
- iWrite = pDb->treehdr.iWrite;
- if( bAlign ){
- iWrite = (iWrite + 3) & ~0x0003;
- assert( (iWrite % 4)==0 );
- }
- assert( iWrite );
- iChunk = treeOffsetToChunk(iWrite-1);
- iEof = (iChunk+1) * CHUNK_SIZE;
- assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
- if( (iWrite+nByte)>iEof ){
- ShmChunk *pHdr; /* Header of chunk just finished (iChunk) */
- ShmChunk *pFirst; /* Header of chunk treehdr.iFirst */
- ShmChunk *pNext; /* Header of new chunk */
- int iNext = 0; /* Next chunk */
- int rc = LSM_OK;
- pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
- assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
- assert( (pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk)==pFirst->iShmid );
- /* Check if the chunk at the start of the linked list is still in
- ** use. If not, reuse it. If so, allocate a new chunk by appending
- ** to the *-shm file. */
- if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
- int bInUse;
- rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
- if( rc!=LSM_OK ){
- *pRc = rc;
- return 0;
- }
- if( bInUse==0 ){
- iNext = pDb->treehdr.iFirst;
- pDb->treehdr.iFirst = pFirst->iNext;
- assert( pDb->treehdr.iFirst );
- }
- }
- if( iNext==0 ) iNext = pDb->treehdr.nChunk++;
- /* Set the header values for the new chunk */
- pNext = treeShmChunkRc(pDb, iNext, &rc);
- if( pNext ){
- pNext->iNext = 0;
- pNext->iShmid = (pDb->treehdr.iNextShmid++);
- }else{
- *pRc = rc;
- return 0;
- }
- /* Set the header values for the chunk just finished */
- pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
- pHdr->iNext = iNext;
- /* Advance to the next chunk */
- iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
- }
- /* Allocate space at iWrite. */
- iRet = iWrite;
- pDb->treehdr.iWrite = iWrite + nByte;
- pDb->treehdr.root.nByte += nByte;
- }
- return iRet;
- }
- /*
- ** Allocate and zero nByte bytes of space within the *-shm file.
- */
- static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
- u32 iPtr;
- void *p;
- iPtr = treeShmalloc(pDb, 1, nByte, pRc);
- p = treeShmptr(pDb, iPtr);
- if( p ){
- assert( *pRc==LSM_OK );
- memset(p, 0, nByte);
- *piPtr = iPtr;
- }
- return p;
- }
- static TreeNode *newTreeNode(lsm_db *pDb, u32 *piPtr, int *pRc){
- return treeShmallocZero(pDb, sizeof(TreeNode), piPtr, pRc);
- }
- static TreeLeaf *newTreeLeaf(lsm_db *pDb, u32 *piPtr, int *pRc){
- return treeShmallocZero(pDb, sizeof(TreeLeaf), piPtr, pRc);
- }
- static TreeKey *newTreeKey(
- lsm_db *pDb,
- u32 *piPtr,
- void *pKey, int nKey, /* Key data */
- void *pVal, int nVal, /* Value data (or nVal<0 for delete) */
- int *pRc
- ){
- TreeKey *p;
- u32 iPtr;
- u32 iEnd;
- int nRem;
- u8 *a;
- int n;
- /* Allocate space for the TreeKey structure itself */
- *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
- p = treeShmptr(pDb, iPtr);
- if( *pRc ) return 0;
- p->nKey = nKey;
- p->nValue = nVal;
- /* Allocate and populate the space required for the key and value. */
- n = nRem = nKey;
- a = (u8 *)pKey;
- while( a ){
- while( nRem>0 ){
- u8 *aAlloc;
- int nAlloc;
- u32 iWrite;
- iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
- iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
- nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);
- aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
- if( aAlloc==0 ) break;
- memcpy(aAlloc, &a[n-nRem], nAlloc);
- nRem -= nAlloc;
- }
- a = pVal;
- n = nRem = nVal;
- pVal = 0;
- }
- iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal);
- if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){
- p->flags = 0;
- }else{
- p->flags = LSM_CONTIGUOUS;
- }
- if( *pRc ) return 0;
- #if 0
- printf("store: %d %s\n", (int)iPtr, (char *)pKey);
- #endif
- return p;
- }
- /*
- ** Allocate a new TreeNode and copy the aiKeyPtr[] and aiChildPtr[] arrays
- ** of pOld into it. If pOld->iV2 is non-zero, pOld->iV2Ptr is then stored
- ** in the copy's aiChildPtr[] slot pOld->iV2Child. The location of the copy
- ** is written to *piNew. Returns NULL if the allocation fails.
- */
- static TreeNode *copyTreeNode(
-   lsm_db *pDb,
-   TreeNode *pOld,
-   u32 *piNew,
-   int *pRc
- ){
-   TreeNode *pCopy = newTreeNode(pDb, piNew, pRc);
-   if( pCopy==0 ) return 0;
-   memcpy(pCopy->aiKeyPtr, pOld->aiKeyPtr, sizeof(pCopy->aiKeyPtr));
-   memcpy(pCopy->aiChildPtr, pOld->aiChildPtr, sizeof(pCopy->aiChildPtr));
-   if( pOld->iV2 ){
-     pCopy->aiChildPtr[pOld->iV2Child] = pOld->iV2Ptr;
-   }
-   return pCopy;
- }
- /*
- ** Allocate a new TreeLeaf and copy sizeof(TreeLeaf) bytes from pOld into
- ** it. The location of the copy is written to *piNew. The result is returned
- ** cast to (TreeNode*), or NULL if the allocation fails.
- */
- static TreeNode *copyTreeLeaf(
-   lsm_db *pDb,
-   TreeLeaf *pOld,
-   u32 *piNew,
-   int *pRc
- ){
-   TreeLeaf *pCopy = newTreeLeaf(pDb, piNew, pRc);
-   if( pCopy==0 ) return (TreeNode *)0;
-   memcpy(pCopy, pOld, sizeof(TreeLeaf));
-   return (TreeNode *)pCopy;
- }
- /*
- ** The tree cursor passed as the second argument currently points to an
- ** internal node (not a leaf). Specifically, to a sub-tree pointer. This
- ** function replaces the sub-tree that the cursor currently points to
- ** with the sub-tree at location iNew.
- **
- ** If pCsr->iNode is negative, iNew simply becomes the new root of the tree.
- **
- ** Otherwise the sub-tree is replaced either by writing the "v2 data" on
- ** the internal node, or - if the node already has v2 data - by allocating
- ** a copy of the node and calling this function on the parent of the
- ** internal node.
- */
- static int treeUpdatePtr(lsm_db *pDb, TreeCursor *pCsr, u32 iNew){
-   int rc = LSM_OK;
-   TreeNode *pNode;                /* The node to be modified */
-   int iCell;                      /* aiChildPtr[] entry to modify */
-   if( pCsr->iNode<0 ){
-     /* iNew is the new root node */
-     pDb->treehdr.root.iRoot = iNew;
-     return rc;
-   }
-   pNode = pCsr->apTreeNode[pCsr->iNode];
-   iCell = pCsr->aiCell[pCsr->iNode];
-   if( pNode->iV2==0 ){
-     /* The "v2 data" option. Before the node is modified, the pointer by
-     ** which this node is currently reached (from its parent, or from the
-     ** tree root) is appended to the pDb->rollback array. */
-     u32 iPrev;
-     assert( pDb->treehdr.root.iTransId>0 );
-     if( pCsr->iNode==0 ){
-       iPrev = pDb->treehdr.root.iRoot;
-     }else{
-       iPrev = getChildPtr(
-           pCsr->apTreeNode[pCsr->iNode-1],
-           pDb->treehdr.root.iTransId, pCsr->aiCell[pCsr->iNode-1]
-       );
-     }
-     rc = intArrayAppend(pDb->pEnv, &pDb->rollback, iPrev);
-     if( rc==LSM_OK ){
-       pNode->iV2 = pDb->treehdr.root.iTransId;
-       pNode->iV2Child = (u8)iCell;
-       pNode->iV2Ptr = iNew;
-     }
-   }else{
-     /* The "allocate new TreeNode" option. The node already has version 2
-     ** content, so update a copy and link the copy into the parent. */
-     u32 iCopy;
-     TreeNode *pCopy = copyTreeNode(pDb, pNode, &iCopy, &rc);
-     if( pCopy ){
-       assert( rc==LSM_OK );
-       pCopy->aiChildPtr[iCell] = iNew;
-       pCsr->iNode--;
-       rc = treeUpdatePtr(pDb, pCsr, iCopy);
-     }
-   }
-   return rc;
- }
- /*
- ** Cursor pCsr points at a node that is part of the tree. This function
- ** inserts a new key and optionally child node pointers into that node.
- **
- ** The position into which the new key and pointer are inserted is
- ** determined by the iSlot parameter. The new key is inserted to the left
- ** of the key currently stored in aiKeyPtr[iSlot], or after the rightmost
- ** key if iSlot is greater than the index of the rightmost key in the node.
- **
- ** iLeftPtr identifies the child tree that contains keys smaller than the
- ** new key. iRightPtr, if non-zero, replaces the child pointer immediately
- ** to the right of the new key.
- **
- ** If the node is full (both aiKeyPtr[0] and aiKeyPtr[2] are set) it is
- ** split into two new nodes and its middle key is pushed up, either into a
- ** newly allocated root or into the parent via a recursive call. Otherwise
- ** a new version of the node is allocated and linked in by treeUpdatePtr().
- **
- ** Returns LSM_OK on success. If any node allocation fails the error code
- ** is returned before the tree header is modified.
- */
- static int treeInsert(
-   lsm_db *pDb,                    /* Database handle */
-   TreeCursor *pCsr,               /* Cursor indicating path to insert at */
-   u32 iLeftPtr,                   /* Left child pointer */
-   u32 iTreeKey,                   /* Location of key to insert */
-   u32 iRightPtr,                  /* Right child pointer */
-   int iSlot                       /* Position to insert key into */
- ){
-   int rc = LSM_OK;
-   TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
-   /* Check if the node is currently full. If so, split pNode in two and
-   ** call this function recursively to add a key to the parent. Otherwise,
-   ** insert the new key directly into pNode. */
-   assert( pNode->aiKeyPtr[1] );
-   if( pNode->aiKeyPtr[0] && pNode->aiKeyPtr[2] ){
-     u32 iLeft; TreeNode *pLeft;   /* New left-hand sibling node */
-     u32 iRight; TreeNode *pRight; /* New right-hand sibling node */
-     pLeft = newTreeNode(pDb, &iLeft, &rc);
-     pRight = newTreeNode(pDb, &iRight, &rc);
-     if( rc ) return rc;
-     pLeft->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 0);
-     pLeft->aiKeyPtr[1] = pNode->aiKeyPtr[0];
-     pLeft->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 1);
-     pRight->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 2);
-     pRight->aiKeyPtr[1] = pNode->aiKeyPtr[2];
-     pRight->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 3);
-     if( pCsr->iNode==0 ){
-       /* pNode is the root of the tree. Grow the tree by one level. */
-       u32 iRoot; TreeNode *pRoot; /* New root node */
-       pRoot = newTreeNode(pDb, &iRoot, &rc);
-       /* newTreeNode() returns NULL on failure. Bail out before touching
-       ** pRoot or the tree header in that case. */
-       if( rc ) return rc;
-       pRoot->aiKeyPtr[1] = pNode->aiKeyPtr[1];
-       pRoot->aiChildPtr[1] = iLeft;
-       pRoot->aiChildPtr[2] = iRight;
-       pDb->treehdr.root.iRoot = iRoot;
-       pDb->treehdr.root.nHeight++;
-     }else{
-       pCsr->iNode--;
-       rc = treeInsert(pDb, pCsr,
-           iLeft, pNode->aiKeyPtr[1], iRight, pCsr->aiCell[pCsr->iNode]
-       );
-     }
-     assert( pLeft->iV2==0 );
-     assert( pRight->iV2==0 );
-     /* Slots 0 and 1 belong to the new left-hand node, slots 2 and 3 to
-     ** the new right-hand node. */
-     switch( iSlot ){
-       case 0:
-         pLeft->aiKeyPtr[0] = iTreeKey;
-         pLeft->aiChildPtr[0] = iLeftPtr;
-         if( iRightPtr ) pLeft->aiChildPtr[1] = iRightPtr;
-         break;
-       case 1:
-         pLeft->aiChildPtr[3] = (iRightPtr ? iRightPtr : pLeft->aiChildPtr[2]);
-         pLeft->aiKeyPtr[2] = iTreeKey;
-         pLeft->aiChildPtr[2] = iLeftPtr;
-         break;
-       case 2:
-         pRight->aiKeyPtr[0] = iTreeKey;
-         pRight->aiChildPtr[0] = iLeftPtr;
-         if( iRightPtr ) pRight->aiChildPtr[1] = iRightPtr;
-         break;
-       case 3:
-         pRight->aiChildPtr[3] = (iRightPtr ? iRightPtr : pRight->aiChildPtr[2]);
-         pRight->aiKeyPtr[2] = iTreeKey;
-         pRight->aiChildPtr[2] = iLeftPtr;
-         break;
-     }
-   }else{
-     TreeNode *pNew;
-     u32 *piKey;
-     u32 *piChild;
-     u32 iStore = 0;
-     u32 iNew = 0;
-     int i;
-     /* Allocate a new version of node pNode. */
-     pNew = newTreeNode(pDb, &iNew, &rc);
-     if( rc ) return rc;
-     piKey = pNew->aiKeyPtr;
-     piChild = pNew->aiChildPtr;
-     /* Copy the keys and child pointers that precede the new key. */
-     for(i=0; i<iSlot; i++){
-       if( pNode->aiKeyPtr[i] ){
-         *(piKey++) = pNode->aiKeyPtr[i];
-         *(piChild++) = getChildPtr(pNode, WORKING_VERSION, i);
-       }
-     }
-     *piKey++ = iTreeKey;
-     *piChild++ = iLeftPtr;
-     /* iStore holds iRightPtr until it has been written out as the child
-     ** pointer that follows the new key. */
-     iStore = iRightPtr;
-     for(i=iSlot; i<3; i++){
-       if( pNode->aiKeyPtr[i] ){
-         *(piKey++) = pNode->aiKeyPtr[i];
-         *(piChild++) = iStore ? iStore : getChildPtr(pNode, WORKING_VERSION, i);
-         iStore = 0;
-       }
-     }
-     if( iStore ){
-       *piChild = iStore;
-     }else{
-       *piChild = getChildPtr(pNode, WORKING_VERSION,
-           (pNode->aiKeyPtr[2] ? 3 : 2)
-       );
-     }
-     pCsr->iNode--;
-     rc = treeUpdatePtr(pDb, pCsr, iNew);
-   }
-   return rc;
- }
- /*
- ** Insert key iTreeKey into the leaf that pCsr points at, to the left of
- ** the key currently in slot iSlot. A new TreeLeaf is always allocated. If
- ** the existing leaf is full it is split in two and its middle key is
- ** passed to treeInsert() on the parent node. Otherwise the new leaf
- ** receives the old keys plus the new one and is linked in with
- ** treeUpdatePtr(). pCsr->iNode is decremented before anything else is done.
- */
- static int treeInsertLeaf(
-   lsm_db *pDb,                    /* Database handle */
-   TreeCursor *pCsr,               /* Cursor structure */
-   u32 iTreeKey,                   /* Key pointer to insert */
-   int iSlot                       /* Insert key to the left of this */
- ){
-   int rc = LSM_OK;                /* Return code */
-   TreeNode *pOld = pCsr->apTreeNode[pCsr->iNode];
-   TreeLeaf *pFirst;               /* New leaf (left-hand leaf if splitting) */
-   u32 iFirst;                     /* Location of pFirst */
-   assert( iSlot>=0 && iSlot<=4 );
-   assert( pCsr->iNode>0 );
-   assert( pOld->aiKeyPtr[1] );
-   pCsr->iNode--;
-   pFirst = newTreeLeaf(pDb, &iFirst, &rc);
-   if( pFirst==0 ) return rc;
-   if( pOld->aiKeyPtr[0]==0 || pOld->aiKeyPtr[2]==0 ){
-     /* There is room on the leaf. Merge the new key into a fresh copy. */
-     int nOut = 0;
-     int j;
-     for(j=0; j<4; j++){
-       if( j==iSlot ) pFirst->aiKeyPtr[nOut++] = iTreeKey;
-       if( j<3 && pOld->aiKeyPtr[j] ){
-         pFirst->aiKeyPtr[nOut++] = pOld->aiKeyPtr[j];
-       }
-     }
-     rc = treeUpdatePtr(pDb, pCsr, iFirst);
-   }else{
-     /* The leaf is full. Split it in two. */
-     u32 iSecond;
-     TreeLeaf *pSecond = newTreeLeaf(pDb, &iSecond, &rc);
-     if( pSecond ){
-       assert( rc==LSM_OK );
-       pFirst->aiKeyPtr[1] = pOld->aiKeyPtr[0];
-       pSecond->aiKeyPtr[1] = pOld->aiKeyPtr[2];
-       if( iSlot==0 ){
-         pFirst->aiKeyPtr[0] = iTreeKey;
-       }else if( iSlot==1 ){
-         pFirst->aiKeyPtr[2] = iTreeKey;
-       }else if( iSlot==2 ){
-         pSecond->aiKeyPtr[0] = iTreeKey;
-       }else if( iSlot==3 ){
-         pSecond->aiKeyPtr[2] = iTreeKey;
-       }
-       rc = treeInsert(pDb, pCsr, iFirst, pOld->aiKeyPtr[1], iSecond,
-           pCsr->aiCell[pCsr->iNode]
-       );
-     }
-   }
-   return rc;
- }
- /*
- ** If the tree header does not already record an "old" tree (iOldShmid is
- ** zero), save the current root, log offset and log checksums in the
- ** header's old* fields and reset the live root to an empty tree with
- ** transaction id 1. If iOldShmid is already non-zero this is a no-op.
- */
- void lsmTreeMakeOld(lsm_db *pDb){
-   TreeHeader *pHdr = &pDb->treehdr;
-   /* A write transaction must be open. Otherwise the code below that
-   ** assumes (pDb->pClient->iLogOff) is current may malfunction.
-   **
-   ** Update: currently this assert fails due to lsm_flush(), which does
-   ** not set nTransOpen.
-   */
-   assert( /* pDb->nTransOpen>0 && */ pDb->iReader>=0 );
-   if( pHdr->iOldShmid!=0 ) return;
-   /* iOldLog is the end of log region 2 shifted left by one bit, with the
-   ** inverse of the low bit of pClient->iLogOff stored in bit 0. */
-   pHdr->iOldLog = (pHdr->log.aRegion[2].iEnd << 1);
-   pHdr->iOldLog |= (~(pDb->pClient->iLogOff) & (i64)0x0001);
-   pHdr->oldcksum0 = pHdr->log.cksum0;
-   pHdr->oldcksum1 = pHdr->log.cksum1;
-   pHdr->iOldShmid = pHdr->iNextShmid-1;
-   memcpy(&pHdr->oldroot, &pHdr->root, sizeof(TreeRoot));
-   pHdr->root.iTransId = 1;
-   pHdr->root.iRoot = 0;
-   pHdr->root.nHeight = 0;
-   pHdr->root.nByte = 0;
- }
- /*
- ** Discard the "old" tree: copy iOldShmid to iUsedShmid and then clear
- ** iOldShmid. The caller is expected to hold either the WRITER or the DMS2
- ** lock exclusively (checked by assert() only).
- */
- void lsmTreeDiscardOld(lsm_db *pDb){
-   TreeHeader *pHdr = &pDb->treehdr;
-   assert( lsmShmAssertLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL)
-        || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL)
-   );
-   pHdr->iUsedShmid = pHdr->iOldShmid;
-   pHdr->iOldShmid = 0;
- }
- /*
- ** Return 1 if the tree header records an "old" tree (iOldShmid is
- ** non-zero), or 0 otherwise.
- */
- int lsmTreeHasOld(lsm_db *pDb){
-   if( pDb->treehdr.iOldShmid ) return 1;
-   return 0;
- }
- /*
- ** This function is called during recovery to initialize the
- ** tree header. Only the database connection's private copy of the
- ** tree-header is initialized here - it will be copied into shared memory
- ** if log file recovery is successful.
- **
- ** The header is zeroed and then set up to describe two chunks, with chunk 1
- ** as the first chunk and the write offset positioned just past the header
- ** of the second chunk. Chunk 1 itself is then given shm-id 1 and no
- ** successor. Returns the result of fetching chunk 1.
- */
- int lsmTreeInit(lsm_db *pDb){
-   int rc = LSM_OK;
-   TreeHeader *pHdr = &pDb->treehdr;
-   ShmChunk *pFirst;
-   memset(pHdr, 0, sizeof(TreeHeader));
-   pHdr->root.iTransId = 1;
-   pHdr->iFirst = 1;
-   pHdr->nChunk = 2;
-   pHdr->iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
-   pHdr->iNextShmid = 2;
-   pHdr->iUsedShmid = 1;
-   pFirst = treeShmChunkRc(pDb, 1, &rc);
-   if( pFirst!=0 ){
-     pFirst->iNext = 0;
-     pFirst->iShmid = 1;
-   }
-   return rc;
- }
- /*
- ** Compute a two-word checksum over the 32-bit words of *pHdr that precede
- ** the aCksum field, and write it to aCksum[0] and aCksum[1]. The words are
- ** consumed in pairs.
- */
- static void treeHeaderChecksum(
-   TreeHeader *pHdr,
-   u32 *aCksum
- ){
-   u32 *aWord = (u32 *)pHdr;       /* Header viewed as an array of u32 */
-   u32 sum1 = 0x12345678;          /* Running value for aCksum[0] */
-   u32 sum2 = 0x9ABCDEF0;          /* Running value for aCksum[1] */
-   int iWord = 0;
-   /* The checksum field must be the last two words of the structure, and
-   ** the structure must consist of a whole number of word pairs. */
-   assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
-   assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
-   while( iWord<(offsetof(TreeHeader, aCksum) / sizeof(u32)) ){
-     sum1 += aWord[iWord];
-     sum2 += (sum1 + aWord[iWord+1]);
-     iWord += 2;
-   }
-   aCksum[0] = sum1;
-   aCksum[1] = sum2;
- }
- /*
- ** Return true if the checksum stored in TreeHeader object *pHdr is
- ** consistent with the contents of its other fields, or false otherwise.
- */
- static int treeHeaderChecksumOk(TreeHeader *pHdr){
-   u32 aExpect[2];                 /* Checksum recomputed from *pHdr */
-   treeHeaderChecksum(pHdr, aExpect);
-   if( memcmp(aExpect, pHdr->aCksum, sizeof(aExpect))!=0 ) return 0;
-   return 1;
- }
- /*
- ** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
- ** make relinking the linked list of shared-memory chunks easier.
- **
- ** Each entry pairs a pointer to a chunk with the chunk number that the
- ** pointer was obtained for (see treeRepairList()).
- */
- typedef struct ShmChunkLoc ShmChunkLoc;
- struct ShmChunkLoc {
- ShmChunk *pShm; /* Pointer to the shared-memory chunk */
- u32 iLoc; /* Chunk number of pShm, as passed to treeShmChunk() */
- };
- /*
- ** This function checks that the linked list of shared memory chunks
- ** that starts at chunk db->treehdr.iFirst:
- **
- **   1) Includes all chunks in the shared-memory region, and
- **   2) Links them together in order of ascending shm-id.
- **
- ** If no error occurs and the conditions above are met, LSM_OK is returned.
- **
- ** If either of the conditions is untrue, LSM_CORRUPT is returned. Or, if
- ** an error is encountered before the checks are completed, another LSM
- ** error code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
- */
- static int treeCheckLinkedList(lsm_db *db){
-   int rc = LSM_OK;
-   int nSeen = 0;                  /* Number of chunks visited so far */
-   ShmChunk *pChunk = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
-   while( rc==LSM_OK && pChunk ){
-     if( pChunk->iNext==0 ){
-       /* End of the list */
-       pChunk = 0;
-     }else if( pChunk->iNext>=db->treehdr.nChunk ){
-       /* Link points outside the set of known chunks */
-       rc = LSM_CORRUPT_BKPT;
-     }else{
-       ShmChunk *pNext = treeShmChunkRc(db, pChunk->iNext, &rc);
-       if( rc==LSM_OK ){
-         /* Each chunk must have a shm-id one greater than its predecessor */
-         if( pNext->iShmid!=pChunk->iShmid+1 ) rc = LSM_CORRUPT_BKPT;
-         pChunk = pNext;
-       }
-     }
-     nSeen++;
-   }
-   if( rc==LSM_OK && (u32)nSeen!=db->treehdr.nChunk-1 ){
-     rc = LSM_CORRUPT_BKPT;
-   }
-   return rc;
- }
- /*
- ** Iterate through the internal nodes of the current in-memory tree. If
- ** there are any v2-pointers with transaction ids larger than
- ** db->treehdr.root.iTransId, zero them. Nothing is done if the tree height
- ** is 1 or less.
- */
- static int treeRepairPtrs(lsm_db *db){
-   int rc;
-   TreeCursor csr;                 /* Cursor used to iterate through tree */
-   u32 iMaxTrans;                  /* Largest transaction id to retain */
-   if( db->treehdr.root.nHeight<=1 ) return LSM_OK;
-   iMaxTrans = db->treehdr.root.iTransId;
-   /* Temporarily decrement the nHeight variable in the tree-header before
-   ** initializing the cursor. This will prevent the cursor from visiting
-   ** any leaf nodes. The original value is restored below. */
-   db->treehdr.root.nHeight--;
-   treeCursorInit(db, 0, &csr);
-   for(rc=lsmTreeCursorEnd(&csr, 0);
-       rc==LSM_OK && lsmTreeCursorValid(&csr);
-       rc=lsmTreeCursorNext(&csr)
-   ){
-     TreeNode *pCur = csr.apTreeNode[csr.iNode];
-     if( pCur->iV2>iMaxTrans ){
-       pCur->iV2Child = 0;
-       pCur->iV2Ptr = 0;
-       pCur->iV2 = 0;
-     }
-   }
-   tblobFree(csr.pDb, &csr.blob);
-   db->treehdr.root.nHeight++;
-   return rc;
- }
- static int treeRepairList(lsm_db *db){ /* Renumber shm chunks where needed and rebuild their iNext links; returns an LSM error code */
- int rc = LSM_OK;
- int i;
- ShmChunk *p;
- ShmChunk *pMin = 0; /* Chunk with the smallest shm-id seen so far */
- u32 iMin = 0; /* Chunk number of pMin */
- /* Iterate through all shm chunks. Find the smallest shm-id present in
- ** the shared-memory region. Chunk numbers run from 1 to nChunk-1. */
- for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
- p = treeShmChunkRc(db, i, &rc);
- if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
- pMin = p;
- iMin = i;
- }
- }
- /* Fix the shm-id values on any chunks with a shm-id greater than or
- ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to
- ** fix the ShmChunk.iNext pointers.
- */
- if( rc==LSM_OK ){
- int nSort; /* Smallest power of two >= (nChunk-1) */
- int nByte; /* Size of the aSort[] allocation in bytes */
- u32 iPrevShmid; /* Next (descending) shm-id to hand out */
- ShmChunkLoc *aSort; /* 2*nSort entries; second half is used as aSpace[] */
- /* Allocate space for a merge sort. */
- nSort = 1;
- while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
- nByte = sizeof(ShmChunkLoc) * nSort * 2;
- aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
- iPrevShmid = pMin->iShmid; /* NOTE(review): pMin is not NULL-checked; assumes nChunk>=2 so the loop above ran - confirm */
- /* Fix all shm-ids, if required. Chunks other than iFirst whose shm-id is
- ** at or beyond iNextShmid are renumbered downwards, starting one below
- ** the smallest shm-id found above. */
- if( rc==LSM_OK ){
- iPrevShmid = pMin->iShmid-1;
- for(i=1; (u32)i<db->treehdr.nChunk; i++){
- p = treeShmChunk(db, i);
- aSort[i-1].pShm = p;
- aSort[i-1].iLoc = i;
- if( (u32)i!=db->treehdr.iFirst ){
- if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
- p->iShmid = iPrevShmid--;
- }
- }
- }
- if( iMin!=db->treehdr.iFirst ){ /* iFirst is given the lowest shm-id of all */
- p = treeShmChunk(db, db->treehdr.iFirst);
- p->iShmid = iPrevShmid;
- }
- }
- if( rc==LSM_OK ){
- ShmChunkLoc *aSpace = &aSort[nSort]; /* Second half of the allocation */
- for(i=0; i<nSort; i++){
- if( aSort[i].pShm ){
- assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
- assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
- aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i]; /* NOTE(review): index is not range-checked against nSort - confirm it is always in bounds */
- }
- }
- if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
- for(i=0; i<nSort-1; i++){ /* Link each populated slot to the chunk number in the following slot */
- if( aSpace[i].pShm ){
- aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
- }
- }
- rc = treeCheckLinkedList(db);
- lsmFree(db->pEnv, aSort);
- }
- }
- return rc;
- }
- /*
- ** This function is called as part of opening a write-transaction if the
- ** writer-flag is already set - indicating that the previous writer
- ** failed before ending its transaction.
- **
- ** The two tree-headers in shared memory are made identical, then the
- ** v2 pointers and the chunk list are repaired. The connection's private
- ** tree-header is restored to its entry value before returning.
- */
- int lsmTreeRepair(lsm_db *db){
-   int rc;
-   TreeHeader saved;               /* Copy of db->treehdr on entry */
-   ShmHeader *pShm = db->pShmhdr;
-   /* Ensure that the two tree-headers are consistent. Copy one over the
-   ** other if necessary. Prefer the data from a tree-header for which the
-   ** checksum computes. Or, if they both compute, prefer tree-header-1. */
-   if( memcmp(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader))!=0 ){
-     TreeHeader *pFrom = &pShm->hdr2;
-     TreeHeader *pTo = &pShm->hdr1;
-     if( treeHeaderChecksumOk(&pShm->hdr1) ){
-       pFrom = &pShm->hdr1;
-       pTo = &pShm->hdr2;
-     }
-     memcpy(pTo, pFrom, sizeof(TreeHeader));
-   }
-   /* Save the connection's current copy of the tree-header. It will be
-   ** restored before returning. */
-   memcpy(&saved, &db->treehdr, sizeof(TreeHeader));
-   /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
-   ** the transaction-id currently in the tree-headers. Then, if that
-   ** succeeded, repair the linked list of shared-memory chunks. */
-   rc = treeRepairPtrs(db);
-   if( rc==LSM_OK ) rc = treeRepairList(db);
-   memcpy(&db->treehdr, &saved, sizeof(TreeHeader));
-   return rc;
- }
- /*
- ** Replace the key that pCsr currently points at with the key stored at
- ** location iKey. A copy of the node is made (as a leaf if the cursor is
- ** on the bottom level of a tree with more than one level, otherwise as a
- ** full TreeNode), the copy's key slot is overwritten, and the copy is
- ** linked into the tree with treeUpdatePtr().
- **
- ** This is a no-op if *pRc is not LSM_OK on entry. Any error from the
- ** allocation or from treeUpdatePtr() is left in *pRc.
- */
- static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
-   if( *pRc==LSM_OK ){
-     TreeRoot *p = &db->treehdr.root;
-     TreeNode *pNew;
-     u32 iNew;
-     TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
-     int iCell = pCsr->aiCell[pCsr->iNode];
-     /* Create a copy of this node */
-     if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
-       pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
-     }else{
-       pNew = copyTreeNode(db, pNode, &iNew, pRc);
-     }
-     if( pNew ){
-       /* Modify the value in the new version */
-       pNew->aiKeyPtr[iCell] = iKey;
-       /* Change the pointer in the parent (if any) to point at the new
-       ** TreeNode. treeUpdatePtr() can fail, so its result must be
-       ** reported to the caller rather than dropped. */
-       pCsr->iNode--;
-       *pRc = treeUpdatePtr(db, pCsr, iNew);
-     }
-   }
- }
- /*
- ** The cursor must point at an entry on a leaf node. Return 1 if the key
- ** that follows the cursor's current position along the cursor's path has
- ** the LSM_END_DELETE flag set, or 0 if it does not or if there is no such
- ** key. The search starts in the cell after the current one on the leaf and
- ** moves up to the parent nodes if the leaf has no further keys.
- */
- static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
-   int iNode = pCsr->iNode;
-   int iCell = pCsr->aiCell[iNode]+1;
-   /* Cursor currently points to a leaf node. */
-   assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
-   while( iNode>=0 ){
-     TreeNode *pNode = pCsr->apTreeNode[iNode];
-     if( iCell<3 && pNode->aiKeyPtr[iCell] ){
-       TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
-       return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
-     }
-     iNode--;
-     /* Only load the parent's cell index while a parent exists. Reading
-     ** aiCell[-1] after stepping above the root would be out of bounds. */
-     if( iNode>=0 ) iCell = pCsr->aiCell[iNode];
-   }
-   return 0;
- }
- /*
- ** The cursor must point at an entry on a leaf node. Return 1 if the key
- ** that precedes the cursor's current position along the cursor's path has
- ** the LSM_START_DELETE flag set, or 0 if it does not or if there is no
- ** such key. Each level of the path is checked in turn, leaf first.
- */
- static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
-   int iLevel;
-   /* Cursor currently points to a leaf node. */
-   assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
-   for(iLevel=pCsr->iNode; iLevel>=0; iLevel--){
-     TreeNode *pCur = pCsr->apTreeNode[iLevel];
-     int iPrev = pCsr->aiCell[iLevel]-1;
-     if( iPrev>=0 && pCur->aiKeyPtr[iPrev] ){
-       TreeKey *pPrevKey = treeShmptr(db, pCur->aiKeyPtr[iPrev]);
-       if( pPrevKey->flags & LSM_START_DELETE ) return 1;
-       return 0;
-     }
-   }
-   return 0;
- }
- /*
- ** Insert an entry with the given flags, key and value into the in-memory
- ** tree. flags must be exactly one of LSM_INSERT, LSM_POINT_DELETE,
- ** LSM_START_DELETE or LSM_END_DELETE.
- **
- ** If the tree is not empty, the position for the key is located first. A
- ** START_DELETE or END_DELETE entry that is made redundant by a neighbouring
- ** entry is not inserted at all. If the key already exists, the flags (and
- ** for range-delete markers, the value) of the existing entry are merged
- ** into the new one, which then replaces it.
- **
- ** Returns LSM_OK or an error code. The cursor's blob buffer is released on
- ** every exit path.
- */
- static int treeInsertEntry(
-   lsm_db *pDb,                    /* Database handle */
-   int flags,                      /* Flags associated with entry */
-   void *pKey,                     /* Pointer to key data */
-   int nKey,                       /* Size of key data in bytes */
-   void *pVal,                     /* Pointer to value data (or NULL) */
-   int nVal                        /* Bytes in value data (or -ve for delete) */
- ){
-   int rc = LSM_OK;                /* Return Code */
-   TreeKey *pTreeKey;              /* New key-value being inserted */
-   u32 iTreeKey;
-   TreeRoot *p = &pDb->treehdr.root;
-   TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
-   int res = 0;                    /* Result of seek operation on csr */
-   assert( nVal>=0 || pVal==0 );
-   assert_tree_looks_ok(LSM_OK, pTree);
-   assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE
-        || flags==LSM_START_DELETE || flags==LSM_END_DELETE
-   );
-   assert( (flags & LSM_CONTIGUOUS)==0 );
- #if 0
-   dump_tree_contents(pDb, "before");
- #endif
-   if( p->iRoot ){
-     TreeKey *pRes;                /* Key at end of seek operation */
-     treeCursorInit(pDb, 0, &csr);
-     /* Seek to the leaf (or internal node) that the new key belongs on */
-     rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
-     pRes = csrGetKey(&csr, &csr.blob, &rc);
-     /* Leave through the common exit so that csr.blob is released. */
-     if( rc!=LSM_OK ) goto insert_entry_out;
-     assert( pRes );
-     if( flags==LSM_START_DELETE ){
-       /* When inserting a start-delete-range entry, if the key that
-       ** occurs immediately before the new entry is already a START_DELETE,
-       ** then the new entry is not required. */
-       if( (res<=0 && (pRes->flags & LSM_START_DELETE))
-        || (res>0 && treePrevIsStartDelete(pDb, &csr))
-       ){
-         goto insert_entry_out;
-       }
-     }else if( flags==LSM_END_DELETE ){
-       /* When inserting an end-delete-range entry, if the key that
-       ** occurs immediately after the new entry is already an END_DELETE,
-       ** then the new entry is not required. */
-       if( (res<0 && treeNextIsEndDelete(pDb, &csr))
-        || (res>=0 && (pRes->flags & LSM_END_DELETE))
-       ){
-         goto insert_entry_out;
-       }
-     }
-     if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
-       /* A range-delete marker lands on an existing key: carry over the
-       ** existing value (if it is an insert) and the existing flags. Note
-       ** that pVal may now point into memory owned by the cursor, so the
-       ** cursor blob must stay allocated until the new key is built. */
-       if( pRes->flags & LSM_INSERT ){
-         nVal = pRes->nValue;
-         pVal = TKV_VAL(pRes);
-       }
-       flags = flags | pRes->flags;
-     }
-     if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
-       if( (res<0 && (pRes->flags & LSM_START_DELETE))
-        || (res>0 && (pRes->flags & LSM_END_DELETE))
-       ){
-         flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
-       }else if( res==0 ){
-         flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
-       }
-     }
-   }else{
-     memset(&csr, 0, sizeof(TreeCursor));
-   }
-   /* Allocate and populate a new key-value pair structure */
-   pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
-   /* Leave through the common exit so that csr.blob is released. */
-   if( rc!=LSM_OK ) goto insert_entry_out;
-   assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS );
-   pTreeKey->flags |= flags;
-   if( p->iRoot==0 ){
-     /* The tree is completely empty. Add a new root node and install
-     ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
-     ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
-     ** space for the fields used by interior nodes). This is because the
-     ** treeInsert() routine may convert this node to an interior node. */
-     TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
-     if( rc==LSM_OK ){
-       assert( p->nHeight==0 );
-       pRoot->aiKeyPtr[1] = iTreeKey;
-       p->nHeight = 1;
-     }
-   }else{
-     if( res==0 ){
-       /* The search found a match within the tree. */
-       treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
-     }else{
-       /* The cursor now points to the leaf node into which the new entry
-       ** should be inserted. There may or may not be a free slot within the
-       ** leaf for the new key-value pair.
-       **
-       ** iSlot is set to the index of the key within pLeaf that the new key
-       ** should be inserted to the left of (or to a value 1 greater than the
-       ** index of the rightmost key if the new key is larger than all keys
-       ** currently stored in the node).
-       */
-       int iSlot = csr.aiCell[csr.iNode] + (res<0);
-       if( csr.iNode==0 ){
-         rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
-       }else{
-         rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
-       }
-     }
-   }
- #if 0
-   dump_tree_contents(pDb, "after");
- #endif
-  insert_entry_out:
-   tblobFree(pDb, &csr.blob);
-   assert_tree_looks_ok(rc, pTree);
-   return rc;
- }
- /*
- ** Insert a new entry into the in-memory tree.
- **
- ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
- ** (LSM_POINT_DELETE) is inserted into the tree. In this case the value
- ** pointer, pVal, must be NULL. Otherwise an LSM_INSERT entry is inserted.
- */
- int lsmTreeInsert(
-   lsm_db *pDb,                    /* Database handle */
-   void *pKey,                     /* Pointer to key data */
-   int nKey,                       /* Size of key data in bytes */
-   void *pVal,                     /* Pointer to value data (or NULL) */
-   int nVal                        /* Bytes in value data (or -ve for delete) */
- ){
-   int eType = (nVal<0) ? LSM_POINT_DELETE : LSM_INSERT;
-   return treeInsertEntry(pDb, eType, pKey, nKey, pVal, nVal);
- }
- /*
- ** Remove the key that pCsr points at from its node. iNewptr must be zero
- ** if and only if the cursor is on the bottom level of the tree; otherwise
- ** it is the location of the sub-tree that replaces the two sub-trees on
- ** either side of the removed key.
- **
- ** Three cases are handled:
- **
- **   * The node has at least two keys: a new copy of the node without the
- **     key is created and linked in with treeUpdatePtr().
- **
- **   * The node is the root and holds a single key: iNewptr becomes the
- **     new root and the tree height is reduced by one.
- **
- **   * The node holds a single key and is not the root: the contents of a
- **     peer node and the separating parent key are redistributed, either
- **     into two new nodes plus a new parent (peer is full) or into a single
- **     new node followed by a recursive delete on the parent.
- **
- ** Returns LSM_OK or an error code. If a node allocation fails the error is
- ** returned before any of the new nodes is written to.
- */
- static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
-   TreeRoot *p = &db->treehdr.root;
-   TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
-   int iSlot = pCsr->aiCell[pCsr->iNode];
-   int bLeaf;
-   int rc = LSM_OK;
-   assert( pNode->aiKeyPtr[1] );
-   assert( pNode->aiKeyPtr[iSlot] );
-   assert( iSlot==0 || iSlot==1 || iSlot==2 );
-   assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
-   bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
-   if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
-     /* There are currently at least 2 keys on this node. So just create
-     ** a new copy of the node with one of the keys removed. If the node
-     ** happens to be the root node of the tree, allocate an entire
-     ** TreeNode structure instead of just a TreeLeaf. */
-     TreeNode *pNew;
-     u32 iNew;
-     if( bLeaf ){
-       pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
-     }else{
-       pNew = newTreeNode(db, &iNew, &rc);
-     }
-     if( pNew ){
-       int i;
-       int iOut = 1;
-       for(i=0; i<4; i++){
-         if( i==iSlot ){
-           i++;
-           if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
-           if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
-           iOut++;
-         }else if( bLeaf || p->nHeight==1 ){
-           if( i<3 && pNode->aiKeyPtr[i] ){
-             pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
-           }
-         }else{
-           if( getChildPtr(pNode, WORKING_VERSION, i) ){
-             pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
-             if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
-             iOut++;
-           }
-         }
-       }
-       assert( iOut<=4 );
-       assert( bLeaf || pNew->aiChildPtr[0]==0 );
-       pCsr->iNode--;
-       rc = treeUpdatePtr(db, pCsr, iNew);
-     }
-   }else if( pCsr->iNode==0 ){
-     /* Removing the only key in the root node. iNewptr is the new root. */
-     assert( iSlot==1 );
-     db->treehdr.root.iRoot = iNewptr;
-     db->treehdr.root.nHeight--;
-   }else{
-     /* There is only one key on this node and the node is not the root
-     ** node. Find a peer for this node. Then redistribute the contents of
-     ** the peer and the parent cell between the parent and either one or
-     ** two new nodes. */
-     TreeNode *pParent;            /* Parent tree node */
-     int iPSlot;
-     u32 iPeer;                    /* Pointer to peer leaf node */
-     int iDir;
-     TreeNode *pPeer;              /* The peer leaf node */
-     TreeNode *pNew1; u32 iNew1;   /* First new leaf node */
-     assert( iSlot==1 );
-     pParent = pCsr->apTreeNode[pCsr->iNode-1];
-     iPSlot = pCsr->aiCell[pCsr->iNode-1];
-     if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
-       iDir = -1;
-     }else{
-       iDir = +1;
-     }
-     iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
-     pPeer = (TreeNode *)treeShmptr(db, iPeer);
-     assertIsWorkingChild(db, pNode, pParent, iPSlot);
-     /* Allocate the first new leaf node. This is always required. */
-     if( bLeaf ){
-       pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
-     }else{
-       pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
-     }
-     /* pNew1 is NULL if the allocation failed. Do not write through it. */
-     if( rc!=LSM_OK ) return rc;
-     if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
-       /* Peer node is completely full. This means that two new leaf nodes
-       ** and a new parent node are required. */
-       TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
-       TreeNode *pNewP; u32 iNewP; /* New parent node */
-       if( bLeaf ){
-         pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
-       }else{
-         pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
-       }
-       pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
-       /* Either pointer may be NULL after a failed allocation. */
-       if( rc!=LSM_OK ) return rc;
-       if( iDir==-1 ){
-         pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
-         if( bLeaf==0 ){
-           pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
-           pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
-         }
-         pNewP->aiChildPtr[iPSlot-1] = iNew1;
-         pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
-         pNewP->aiChildPtr[iPSlot] = iNew2;
-         pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
-         pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
-         if( bLeaf==0 ){
-           pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
-           pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
-           pNew2->aiChildPtr[2] = iNewptr;
-         }
-       }else{
-         pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
-         if( bLeaf==0 ){
-           pNew1->aiChildPtr[1] = iNewptr;
-           pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
-         }
-         pNewP->aiChildPtr[iPSlot] = iNew1;
-         pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
-         pNewP->aiChildPtr[iPSlot+1] = iNew2;
-         pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
-         pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
-         if( bLeaf==0 ){
-           pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
-           pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
-           pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
-         }
-       }
-       assert( pCsr->iNode>=1 );
-       pCsr->iNode -= 2;
-       assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
-       rc = treeUpdatePtr(db, pCsr, iNewP);
-     }else{
-       /* The peer has room. Merge the parent key, the peer's keys and (for
-       ** internal nodes) the child pointers into pNew1, then delete the
-       ** separating key from the parent. */
-       int iKOut = 0;
-       int iPOut = 0;
-       int i;
-       pCsr->iNode--;
-       if( iDir==1 ){
-         pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
-         if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
-       }
-       for(i=0; i<3; i++){
-         if( pPeer->aiKeyPtr[i] ){
-           pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
-         }
-       }
-       if( bLeaf==0 ){
-         for(i=0; i<4; i++){
-           if( getChildPtr(pPeer, WORKING_VERSION, i) ){
-             pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
-           }
-         }
-       }
-       if( iDir==-1 ){
-         iPSlot--;
-         pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
-         if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
-         pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
-       }
-       rc = treeDeleteEntry(db, pCsr, iNew1);
-     }
-   }
-   return rc;
- }
- /*
- ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
- ** function, not lsm_delete()).
- **
- ** This is a two step process:
- **
- ** 1) Remove all entries currently stored in the tree that have keys
- ** that fall into the deleted range.
- **
- ** TODO: There are surely good ways to optimize this step - removing
- ** a range of keys from a b-tree. But for now, this function removes
- ** them one at a time using the usual approach.
- **
- ** 2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
- ** already marked as START_DELETE, insert a START_DELETE key.
- ** Similarly, unless the smallest key greater than or equal to
- ** (pKey2/nKey2) is already END_DELETE, insert an END_DELETE key.
- **
- ** Returns LSM_OK or an error code from the delete or insert steps.
- */
- int lsmTreeDelete(
- lsm_db *db,
- void *pKey1, int nKey1, /* Start of range */
- void *pKey2, int nKey2 /* End of range */
- ){
- int rc = LSM_OK;
- int bDone = 0; /* Set once no keys remain inside the range */
- TreeRoot *p = &db->treehdr.root;
- TreeBlob blob = {0, 0}; /* Buffer for treeShmkey(); freed before returning */
- /* The range must be sensible - that (key1 < key2). */
- assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 );
- assert( assert_delete_ranges_match(db) );
- #if 0
- static int nCall = 0;
- printf("\n");
- nCall++;
- printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
- dump_tree_contents(db, "before delete");
- #endif
- /* Step 1. This loop runs until the tree contains no keys within the
- ** range being deleted. Or until an error occurs. Each iteration removes
- ** at most one entry. */
- while( bDone==0 && rc==LSM_OK ){
- int res;
- TreeCursor csr; /* Cursor to seek to first key in range */
- void *pDel; int nDel; /* Key to (possibly) delete this iteration */
- #ifndef NDEBUG
- int nEntry = treeCountEntries(db);
- #endif
- /* Seek the cursor to the first entry in the tree greater than pKey1.
- ** NOTE(review): the return codes of lsmTreeCursorSeek() and
- ** lsmTreeCursorNext() are not checked here - confirm this is intended. */
- treeCursorInit(db, 0, &csr);
- lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
- if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);
- /* If there is no such entry, or if it is greater than pKey2, then the
- ** tree now contains no keys in the range being deleted. In this case
- ** break out of the loop. */
- bDone = 1;
- if( lsmTreeCursorValid(&csr) ){
- lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
- if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
- }
- if( bDone==0 ){
- if( (u32)csr.iNode==(p->nHeight-1) ){
- /* The element to delete already lies on a leaf node */
- rc = treeDeleteEntry(db, &csr, 0);
- }else{
- /* 1. Overwrite the current key with a copy of the next key in the
- ** tree (key N).
- **
- ** 2. Seek to key N (cursor will stop at the internal node copy of
- ** N). Move to the next key (original copy of N). Delete
- ** this entry.
- */
- u32 iKey; /* Location of key N */
- TreeKey *pKey;
- int iNode = csr.iNode; /* Level of the internal node holding the key to delete */
- lsmTreeCursorNext(&csr);
- assert( (u32)csr.iNode==(p->nHeight-1) );
- iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
- lsmTreeCursorPrev(&csr);
- treeOverwriteKey(db, &csr, iKey, &rc);
- pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
- if( pKey ){
- rc = lsmTreeCursorSeek(&csr, TKV_KEY(pKey), pKey->nKey, &res);
- }
- if( rc==LSM_OK ){
- assert( res==0 && csr.iNode==iNode );
- rc = lsmTreeCursorNext(&csr);
- if( rc==LSM_OK ){
- rc = treeDeleteEntry(db, &csr, 0);
- }
- }
- }
- }
- /* Clean up any memory allocated by the cursor. */
- tblobFree(db, &csr.blob);
- #if 0
- dump_tree_contents(db, "ddd delete");
- #endif
- assert( bDone || treeCountEntries(db)==(nEntry-1) );
- }
- #if 0
- dump_tree_contents(db, "during delete");
- #endif
- /* Now insert the START_DELETE and END_DELETE keys. */
- if( rc==LSM_OK ){
- rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
- }
- #if 0
- dump_tree_contents(db, "during delete 2");
- #endif
- if( rc==LSM_OK ){
- rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
- }
- #if 0
- dump_tree_contents(db, "after delete");
- #endif
- tblobFree(db, &blob);
- assert( assert_delete_ranges_match(db) );
- return rc;
- }
- /*
- ** Return the value of the nByte field of the current tree root: the
- ** amount of memory, in bytes, currently used by the tree structure.
- */
- int lsmTreeSize(lsm_db *pDb){
-   TreeRoot *pRoot = &pDb->treehdr.root;
-   return pRoot->nByte;
- }
- /*
- ** Allocate and initialize a new cursor on the in-memory tree. bOld is
- ** passed through to treeCursorInit(). On success *ppCsr is set to the new
- ** cursor and LSM_OK is returned. If the allocation fails, *ppCsr is set to
- ** NULL and LSM_NOMEM_BKPT is returned.
- */
- int lsmTreeCursorNew(lsm_db *pDb, int bOld, TreeCursor **ppCsr){
-   TreeCursor *pNew = lsmMalloc(pDb->pEnv, sizeof(TreeCursor));
-   *ppCsr = pNew;
-   if( pNew==0 ) return LSM_NOMEM_BKPT;
-   treeCursorInit(pDb, bOld, pNew);
-   return LSM_OK;
- }
- /*
- ** Close an in-memory tree cursor, releasing its blob buffer and then the
- ** cursor structure itself. Passing NULL is a harmless no-op.
- */
- void lsmTreeCursorDestroy(TreeCursor *pCsr){
-   if( pCsr==0 ) return;
-   tblobFree(pCsr->pDb, &pCsr->blob);
-   lsmFree(pCsr->pDb->pEnv, pCsr);
- }
- /*
- ** Reset the cursor: set iNode to -1 and clear the pSave pointer. Passing
- ** NULL is a harmless no-op.
- */
- void lsmTreeCursorReset(TreeCursor *pCsr){
-   if( pCsr==0 ) return;
-   pCsr->iNode = -1;
-   pCsr->pSave = 0;
- }
- #ifndef NDEBUG
- static int treeCsrCompare(TreeCursor *pCsr, void *pKey, int nKey, int *pRc){
- TreeKey *p;
- int cmp = 0;
- assert( pCsr->iNode>=0 );
- p = csrGetKey(pCsr, &pCsr->blob, pRc);
- if( p ){
- cmp = treeKeycmp(TKV_KEY(p), p->nKey, pKey, nKey);
- }
- return cmp;
- }
- #endif
- /*
- ** Attempt to seek the cursor passed as the first argument to key (pKey/nKey)
- ** in the tree structure. If an exact match for the key is found, leave the
- ** cursor pointing to it and set *pRes to zero before returning. If an
- ** exact match cannot be found, do one of the following:
- **
- ** * Leave the cursor pointing to the smallest element in the tree that
- ** is larger than the key and set *pRes to +1, or
- **
- ** * Leave the cursor pointing to the largest element in the tree that
- ** is smaller than the key and set *pRes to -1, or
- **
- ** * If the tree is empty, leave the cursor at EOF and set *pRes to -1.
- */
- int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){
- int rc = LSM_OK; /* Return code */
- lsm_db *pDb = pCsr->pDb;
- TreeRoot *pRoot = pCsr->pRoot;
- u32 iNodePtr; /* Location of current node in search */
- /* Discard any saved position data */
- treeCursorRestore(pCsr, 0);
- iNodePtr = pRoot->iRoot;
- if( iNodePtr==0 ){
- /* Either an error occurred or the tree is completely empty. */
- assert( rc!=LSM_OK || pRoot->iRoot==0 );
- *pRes = -1;
- pCsr->iNode = -1;
- }else{
- TreeBlob b = {0, 0};
- int res = 0; /* Result of comparison function */
- int iNode = -1;
- while( iNodePtr ){
- TreeNode *pNode; /* Node at location iNodePtr */
- int iTest; /* Index of second key to test (0 or 2) */
- u32 iTreeKey;
- TreeKey *pTreeKey; /* Key to compare against */
- pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr);
- iNode++;
- pCsr->apTreeNode[iNode] = pNode;
- /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
- ** pNode. The middle slot is never empty. If the comparison is a match,
- ** then the search is finished. Break out of the loop. */
- pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]);
- if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
- pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
- if( rc!=LSM_OK ) break;
- }
- res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
- if( res==0 ){
- pCsr->aiCell[iNode] = 1;
- break;
- }
- /* Based on the results of the previous comparison, compare (pKey/nKey)
- ** to either the left or right key of the B-tree node, if such a key
- ** exists. */
- iTest = (res>0 ? 0 : 2);
- iTreeKey = pNode->aiKeyPtr[iTest];
- if( iTreeKey ){
- pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
- if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
- pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
- if( rc ) break;
- }
- res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
- if( res==0 ){
- pCsr->aiCell[iNode] = (u8)iTest;
- break;
- }
- }else{
- iTest = 1;
- }
- if( (u32)iNode<(pRoot->nHeight-1) ){
- iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
- }else{
- iNodePtr = 0;
- }
- pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
- }
- *pRes = res;
- pCsr->iNode = iNode;
- tblobFree(pDb, &b);
- }
- /* assert() that *pRes has been set properly */
- #ifndef NDEBUG
- if( rc==LSM_OK && lsmTreeCursorValid(pCsr) ){
- int cmp = treeCsrCompare(pCsr, pKey, nKey, &rc);
- assert( rc!=LSM_OK || *pRes==cmp || (*pRes ^ cmp)>0 );
- }
- #endif
- return rc;
- }
- int lsmTreeCursorNext(TreeCursor *pCsr){
- #ifndef NDEBUG
- TreeKey *pK1;
- TreeBlob key1 = {0, 0};
- #endif
- lsm_db *pDb = pCsr->pDb;
- TreeRoot *pRoot = pCsr->pRoot;
- const int iLeaf = pRoot->nHeight-1;
- int iCell;
- int rc = LSM_OK;
- TreeNode *pNode;
- /* Restore the cursor position, if required */
- int iRestore = 0;
- treeCursorRestore(pCsr, &iRestore);
- if( iRestore>0 ) return LSM_OK;
- /* Save a pointer to the current key. This is used in an assert() at the
- ** end of this function - to check that the 'next' key really is larger
- ** than the current key. */
- #ifndef NDEBUG
- pK1 = csrGetKey(pCsr, &key1, &rc);
- if( rc!=LSM_OK ) return rc;
- #endif
- assert( lsmTreeCursorValid(pCsr) );
- assert( pCsr->aiCell[pCsr->iNode]<3 );
- pNode = pCsr->apTreeNode[pCsr->iNode];
- iCell = ++pCsr->aiCell[pCsr->iNode];
- /* If the current node is not a leaf, and the current cell has sub-tree
- ** associated with it, descend to the left-most key on the left-most
- ** leaf of the sub-tree. */
- if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
- do {
- u32 iNodePtr;
- pCsr->iNode++;
- iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
- pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
- pCsr->apTreeNode[pCsr->iNode] = pNode;
- iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
- }while( pCsr->iNode < iLeaf );
- }
- /* Otherwise, the next key is found by following pointer up the tree
- ** until there is a key immediately to the right of the pointer followed
- ** to reach the sub-tree containing the current key. */
- else if( iCell>=3 || pNode->aiKeyPtr[iCell]==0 ){
- while( (--pCsr->iNode)>=0 ){
- iCell = pCsr->aiCell[pCsr->iNode];
- if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
- }
- }
- #ifndef NDEBUG
- if( pCsr->iNode>=0 ){
- TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
- assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 );
- }
- tblobFree(pDb, &key1);
- #endif
- return rc;
- }
- int lsmTreeCursorPrev(TreeCursor *pCsr){
- #ifndef NDEBUG
- TreeKey *pK1;
- TreeBlob key1 = {0, 0};
- #endif
- lsm_db *pDb = pCsr->pDb;
- TreeRoot *pRoot = pCsr->pRoot;
- const int iLeaf = pRoot->nHeight-1;
- int iCell;
- int rc = LSM_OK;
- TreeNode *pNode;
- /* Restore the cursor position, if required */
- int iRestore = 0;
- treeCursorRestore(pCsr, &iRestore);
- if( iRestore<0 ) return LSM_OK;
- /* Save a pointer to the current key. This is used in an assert() at the
- ** end of this function - to check that the 'next' key really is smaller
- ** than the current key. */
- #ifndef NDEBUG
- pK1 = csrGetKey(pCsr, &key1, &rc);
- if( rc!=LSM_OK ) return rc;
- #endif
- assert( lsmTreeCursorValid(pCsr) );
- pNode = pCsr->apTreeNode[pCsr->iNode];
- iCell = pCsr->aiCell[pCsr->iNode];
- assert( iCell>=0 && iCell<3 );
- /* If the current node is not a leaf, and the current cell has sub-tree
- ** associated with it, descend to the right-most key on the right-most
- ** leaf of the sub-tree. */
- if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
- do {
- u32 iNodePtr;
- pCsr->iNode++;
- iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
- pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
- if( rc!=LSM_OK ) break;
- pCsr->apTreeNode[pCsr->iNode] = pNode;
- iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
- pCsr->aiCell[pCsr->iNode] = (u8)iCell;
- }while( pCsr->iNode < iLeaf );
- }
- /* Otherwise, the next key is found by following pointer up the tree until
- ** there is a key immediately to the left of the pointer followed to reach
- ** the sub-tree containing the current key. */
- else{
- do {
- iCell = pCsr->aiCell[pCsr->iNode]-1;
- if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
- }while( (--pCsr->iNode)>=0 );
- pCsr->aiCell[pCsr->iNode] = (u8)iCell;
- }
- #ifndef NDEBUG
- if( pCsr->iNode>=0 ){
- TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
- assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
- }
- tblobFree(pDb, &key1);
- #endif
- return rc;
- }
- /*
- ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
- ** in-memory tree.
- */
- int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
- lsm_db *pDb = pCsr->pDb;
- TreeRoot *pRoot = pCsr->pRoot;
- int rc = LSM_OK;
- u32 iNodePtr;
- pCsr->iNode = -1;
- /* Discard any saved position data */
- treeCursorRestore(pCsr, 0);
- iNodePtr = pRoot->iRoot;
- while( iNodePtr ){
- int iCell;
- TreeNode *pNode;
- pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
- if( rc ) break;
- if( bLast ){
- iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
- }else{
- iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
- }
- pCsr->iNode++;
- pCsr->apTreeNode[pCsr->iNode] = pNode;
- if( (u32)pCsr->iNode<pRoot->nHeight-1 ){
- iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
- }else{
- iNodePtr = 0;
- }
- pCsr->aiCell[pCsr->iNode] = (u8)(iCell - (iNodePtr==0 && bLast));
- }
- return rc;
- }
- int lsmTreeCursorFlags(TreeCursor *pCsr){
- int flags = 0;
- if( pCsr && pCsr->iNode>=0 ){
- int rc = LSM_OK;
- TreeKey *pKey = (TreeKey *)treeShmptrUnsafe(pCsr->pDb,
- pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
- );
- assert( rc==LSM_OK );
- flags = (pKey->flags & ~LSM_CONTIGUOUS);
- }
- return flags;
- }
- int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
- TreeKey *pTreeKey;
- int rc = LSM_OK;
- assert( lsmTreeCursorValid(pCsr) );
- pTreeKey = pCsr->pSave;
- if( !pTreeKey ){
- pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
- }
- if( rc==LSM_OK ){
- *pnKey = pTreeKey->nKey;
- if( pFlags ) *pFlags = pTreeKey->flags;
- *ppKey = (void *)&pTreeKey[1];
- }
- return rc;
- }
- int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
- int res = 0;
- int rc;
- rc = treeCursorRestore(pCsr, &res);
- if( res==0 ){
- TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
- if( rc==LSM_OK ){
- if( pTreeKey->flags & LSM_INSERT ){
- *pnVal = pTreeKey->nValue;
- *ppVal = TKV_VAL(pTreeKey);
- }else{
- *ppVal = 0;
- *pnVal = -1;
- }
- }
- }else{
- *ppVal = 0;
- *pnVal = 0;
- }
- return rc;
- }
- /*
- ** Return true if the cursor currently points to a valid entry.
- */
- int lsmTreeCursorValid(TreeCursor *pCsr){
- return (pCsr && (pCsr->pSave || pCsr->iNode>=0));
- }
- /*
- ** Store a mark in *pMark. Later on, a call to lsmTreeRollback() with a
- ** pointer to the same TreeMark structure may be used to roll the tree
- ** contents back to their current state.
- */
- void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
- pMark->iRoot = pDb->treehdr.root.iRoot;
- pMark->nHeight = pDb->treehdr.root.nHeight;
- pMark->iWrite = pDb->treehdr.iWrite;
- pMark->nChunk = pDb->treehdr.nChunk;
- pMark->iNextShmid = pDb->treehdr.iNextShmid;
- pMark->iRollback = intArraySize(&pDb->rollback);
- }
- /*
- ** Roll back to mark pMark. Structure *pMark should have been previously
- ** populated by a call to lsmTreeMark().
- */
- void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
- int iIdx;
- int nIdx;
- u32 iNext;
- ShmChunk *pChunk;
- u32 iChunk;
- u32 iShmid;
- /* Revert all required v2 pointers. */
- nIdx = intArraySize(&pDb->rollback);
- for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
- TreeNode *pNode;
- pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
- assert( pNode );
- pNode->iV2 = 0;
- pNode->iV2Child = 0;
- pNode->iV2Ptr = 0;
- }
- intArrayTruncate(&pDb->rollback, pMark->iRollback);
- /* Restore the free-chunk list. */
- assert( pMark->iWrite!=0 );
- iChunk = treeOffsetToChunk(pMark->iWrite-1);
- pChunk = treeShmChunk(pDb, iChunk);
- iNext = pChunk->iNext;
- pChunk->iNext = 0;
- pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
- iShmid = pChunk->iShmid-1;
- while( iNext ){
- u32 iFree = iNext; /* Current chunk being rollback-freed */
- ShmChunk *pFree; /* Pointer to chunk iFree */
- pFree = treeShmChunk(pDb, iFree);
- iNext = pFree->iNext;
- if( iFree<pMark->nChunk ){
- pFree->iNext = pDb->treehdr.iFirst;
- pFree->iShmid = iShmid--;
- pDb->treehdr.iFirst = iFree;
- }
- }
- /* Restore the tree-header fields */
- pDb->treehdr.root.iRoot = pMark->iRoot;
- pDb->treehdr.root.nHeight = pMark->nHeight;
- pDb->treehdr.iWrite = pMark->iWrite;
- pDb->treehdr.nChunk = pMark->nChunk;
- pDb->treehdr.iNextShmid = pMark->iNextShmid;
- }
- /*
- ** Load the in-memory tree header from shared-memory into pDb->treehdr.
- ** If the header cannot be loaded, return LSM_PROTOCOL.
- **
- ** If the header is successfully loaded and parameter piRead is not NULL,
- ** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
- ** the header was loaded from ShmHeader.hdr2.
- */
- int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
- int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
- while( (nRem--)>0 ){
- ShmHeader *pShm = pDb->pShmhdr;
- memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
- if( treeHeaderChecksumOk(&pDb->treehdr) ){
- if( piRead ) *piRead = 1;
- return LSM_OK;
- }
- memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader));
- if( treeHeaderChecksumOk(&pDb->treehdr) ){
- if( piRead ) *piRead = 2;
- return LSM_OK;
- }
- lsmShmBarrier(pDb);
- }
- return LSM_PROTOCOL_BKPT;
- }
- int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
- TreeHeader *p = (iRead==1) ? &pDb->pShmhdr->hdr1 : &pDb->pShmhdr->hdr2;
- assert( iRead==1 || iRead==2 );
- return (0==memcmp(pDb->treehdr.aCksum, p->aCksum, sizeof(u32)*2));
- }
- /*
- ** This function is called to conclude a transaction. If argument bCommit
- ** is true, the transaction is committed. Otherwise it is rolled back.
- */
- int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){
- ShmHeader *pShm = pDb->pShmhdr;
- treeHeaderChecksum(&pDb->treehdr, pDb->treehdr.aCksum);
- memcpy(&pShm->hdr2, &pDb->treehdr, sizeof(TreeHeader));
- lsmShmBarrier(pDb);
- memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
- pShm->bWriter = 0;
- intArrayFree(pDb->pEnv, &pDb->rollback);
- return LSM_OK;
- }
- #ifndef NDEBUG
- static int assert_delete_ranges_match(lsm_db *db){
- int prev = 0;
- TreeBlob blob = {0, 0};
- TreeCursor csr; /* Cursor used to iterate through tree */
- int rc;
- treeCursorInit(db, 0, &csr);
- for( rc = lsmTreeCursorEnd(&csr, 0);
- rc==LSM_OK && lsmTreeCursorValid(&csr);
- rc = lsmTreeCursorNext(&csr)
- ){
- TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
- if( rc!=LSM_OK ) break;
- assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
- prev = pKey->flags;
- }
- tblobFree(csr.pDb, &csr.blob);
- tblobFree(csr.pDb, &blob);
- return 1;
- }
- static int treeCountEntries(lsm_db *db){
- TreeCursor csr; /* Cursor used to iterate through tree */
- int rc;
- int nEntry = 0;
- treeCursorInit(db, 0, &csr);
- for( rc = lsmTreeCursorEnd(&csr, 0);
- rc==LSM_OK && lsmTreeCursorValid(&csr);
- rc = lsmTreeCursorNext(&csr)
- ){
- nEntry++;
- }
- tblobFree(csr.pDb, &csr.blob);
- return nEntry;
- }
- #endif
|