123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481 |
- #include <stdio.h>
- #include <errno.h>
- #include <fcntl.h>
- #include <sys/mman.h>
- #include <unistd.h>
- #include <sys/stat.h>
- #include "db.h"
- void DB_InitEmpty(DB* db) {
- SVEC_init(&db->defs);
- HT_init(&db->defLookup, 128);
- HT_init(&db->storeLookup, 128);
- SVEC_init(&db->stores);
- }
- ObjDef* DB_AllocObjDef(DB* db, char* name) {
- ObjDef* o = SVEC_inc(&db->defs);
- o->id = SVEC_len(&db->defs);
- o->name = strint(name);
-
- HT_set(&db->defLookup, o->name, o);
-
- return o;
- }
- int DB_FinalizeObjDef(DB* db, ObjDef* o) {
-
- // TODO: validate ids, limits, field bounds and types
-
-
- return 0;
- }
// sprintfint(x, fmt, ...): formats with sprintfdup(), passes the result to
// strint(), and assigns strint's return value to the lvalue `x`.
// The temporary from sprintfdup() is released with free(), so it must be
// heap-allocated. strint presumably interns/copies the string; confirm.
// Expands to a single statement: there is intentionally no trailing ';' after
// while(0), so `if(c) sprintfint(a, ...); else ...` parses correctly.
#define sprintfint(x, ...) \
	do { \
		char* sprintfint_tmp_ = sprintfdup(__VA_ARGS__); \
		(x) = strint(sprintfint_tmp_); \
		free(sprintfint_tmp_); \
	} while(0)
- // creates a new store from scratch
- ObjStore* DB_CreateStore(DB* db, ObjDef* o, char* name) {
-
- ObjStore* st = calloc(1, sizeof(*st));
- st->name = strint(name);
- HT_init(&st->indices, 128);
- st->def = o;
-
- st->mapSize = PAGE_SIZE * 4; // pure voodoo
- st->mapCount = st->mapSize / o->size;
-
- // create backing files
- st->backingRelPath = strint(name);
- char* tmp = path_join(db->backingDir, st->backingRelPath);
- st->backingAbsPath = strint(tmp);
- free(tmp);
-
- sprintfint(st->metaRelPath, "%s.meta", st->backingRelPath);
- sprintfint(st->metaAbsPath, "%s.meta", st->backingAbsPath);
- sprintfint(st->statsRelPath, "%s.stats", st->backingRelPath);
- sprintfint(st->statsAbsPath, "%s.stats", st->backingAbsPath);
-
-
- // save metadata
- void* md;
- u64 mdlen;
- ObjDef_Serialize(o, &md, &mdlen);
- write_whole_file(st->metaAbsPath, md, mdlen);
- free(md);
-
- // set up the stats mapping
- st->statsSize = 512;
- st->statsFD = open(st->statsAbsPath, O_RDWR | O_CREAT, 0777);
- if(-1 == st->statsFD) {
- printf("could not open stats file '%s': %s\n", st->statsAbsPath, strerror(errno));
- exit(1);
- }
- ftruncate(st->statsFD, st->statsSize);
- st->stats = mmap(NULL, st->statsSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->statsFD, 0);
-
-
- // initialize first pages
- pthread_mutex_init(&st->writeLock, NULL);
- pthread_mutex_lock(&st->writeLock);
-
- st->backingFD = open(st->backingAbsPath, O_RDWR | O_CREAT, 0777);
- if(-1 == st->backingFD) {
- printf("could not open backing file '%s': %s\n", st->backingAbsPath, strerror(errno));
- exit(1);
- }
-
- // check the file
- struct stat fst;
- fstat(st->backingFD, &fst);
- st->fileSize = fst.st_size;
- if(st->fileSize < st->mapSize) {
- ftruncate(st->backingFD, st->mapSize); // grow the file by truncating it.
- st->fileSize = st->mapSize;
- }
-
- st->mapsAlloc = 64;
- st->maps = calloc(1, st->mapsAlloc * sizeof(*st->maps));
- st->mapsLen = 1;
-
- void* first = mmap(NULL, st->mapSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->backingFD, 0);
- if(first == MAP_FAILED) {
- printf("could not map memory: %s\n", strerror(errno));
- exit(1);
- }
-
- st->maps[0] = first;
-
- // create indices
- VEC_EACHP(&o->indices, i, idef) {
- Index* index = calloc(1, sizeof(*index));
- index->def = idef;
- HT_init(&index->hash64.ht, 128);
-
- HT_set(&st->indices, index->def->id, index);
- }
-
- pthread_mutex_unlock(&st->writeLock);
-
- return st;
- }
- ObjStore* DB_LoadStore(DB* db, char* name) {
-
- ObjStore* st = calloc(1, sizeof(*st));
- st->name = strint(name);
- HT_init(&st->indices, 128);
- st->mapSize = PAGE_SIZE * 4; // pure voodoo
-
- // backing file paths
- st->backingRelPath = strint(name);
- char* tmp = path_join(db->backingDir, st->backingRelPath);
- st->backingAbsPath = strint(tmp);
- free(tmp);
-
- sprintfint(st->metaRelPath, "%s.meta", st->backingRelPath);
- sprintfint(st->metaAbsPath, "%s.meta", st->backingAbsPath);
- sprintfint(st->statsRelPath, "%s.stats", st->backingRelPath);
- sprintfint(st->statsAbsPath, "%s.stats", st->backingAbsPath);
-
- // load the stats
- st->statsSize = 512;
- st->statsFD = open(st->statsAbsPath, O_RDWR | O_CREAT, 0777);
- if(-1 == st->statsFD) {
- printf("could not open stats file '%s': %s\n", st->statsAbsPath, strerror(errno));
- exit(1);
- }
- st->stats = mmap(NULL, st->statsSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->statsFD, 0);
-
- ObjStore_ReadStats(st);
-
- // load metadata
- u64 mdlen;
- void* md = read_whole_file(st->metaAbsPath, &mdlen);
- st->def = calloc(1, sizeof(*st->def));
- ObjDef_Deserialize(st->def, md, mdlen);
- free(md);
-
- st->mapCount = st->mapSize / st->def->size;
-
- // initialize file and locks
- pthread_mutex_init(&st->writeLock, NULL);
- pthread_mutex_lock(&st->writeLock);
-
- st->backingFD = open(st->backingAbsPath, O_RDWR);
- if(-1 == st->backingFD) {
- printf("could not open backing file '%s': %s\n", st->backingAbsPath, strerror(errno));
- exit(1);
- }
-
- // cache the file size
- struct stat fst;
- fstat(st->backingFD, &fst);
- st->fileSize = fst.st_size;
-
- // memory map pointers
- st->mapsAlloc = 64;
- st->maps = calloc(1, st->mapsAlloc * sizeof(*st->maps));
- st->mapsLen = 0;
-
-
- // rebuild the indices
- VEC_EACHP(&st->def->indices, i, idef) {
- Index* index = calloc(1, sizeof(*index));
- index->def = idef;
- HT_init(&index->hash64.ht, 128);
-
- HT_set(&st->indices, index->def->id, index);
-
- ObjStore_RebuildIndex(db, st, index->def->id); // BUG: slow
- }
-
- pthread_mutex_unlock(&st->writeLock);
-
- return st;
- }
- // not guarded because it's only used during init before the locks even exist
- void ObjStore_ReadStats(ObjStore* st) {
- st->count = *(u64*)st->stats;
- }
- // not guarded because it's only called from within an existing lock
- void ObjStore_WriteStats(ObjStore* st) {
- *(u64*)st->stats = st->count;
- msync(st->stats, st->statsSize, MS_SYNC);
- }
- int ObjStore_ActivateMapping(DB* db, ObjStore* st, u32 mapIndex) {
-
- // check the maps table size
- if(st->mapsAlloc < mapIndex) {
- u64 oldAlloc = st->mapsAlloc;
-
- st->mapsAlloc *= 2; // TODO: bug, nextPOT
- st->maps = realloc(st->maps, st->mapsAlloc * sizeof(*st->maps));
-
- memset(st->maps + oldAlloc, 0, (st->mapsAlloc - oldAlloc) * sizeof(*st->maps));
- }
-
- void* p = st->maps[mapIndex];
- if(p) return 0; // already mapped
-
- if(st->mapsLen < mapIndex) st->mapsLen = mapIndex;
-
- // TODO: check map limit and evict some memory
-
-
- // check if we need to expand the file
- u64 neededFileSize = (mapIndex + 1) * st->mapSize;
-
- if(st->fileSize < neededFileSize) {
- ftruncate(st->backingFD, neededFileSize); // grow the file by truncating it.
- st->fileSize = neededFileSize;
- }
-
-
- p = mmap(NULL, st->mapSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->backingFD, mapIndex * st->mapSize);
- if(p == MAP_FAILED) {
- printf("could not map memory: %s\n", strerror(errno));
- exit(1);
- }
-
- st->maps[mapIndex] = p;
-
- return 0;
- }
- int ObjStore_RebuildIndex(DB* db, ObjStore* st, u32 indexIndex) {
- Index* index = NULL;
- HT_get(&st->indices, indexIndex, &index);
- if(!index) return 2;
-
- HT_destroy(&index->hash64.ht);
- HT_init(&index->hash64.ht, 128);
-
- u32 off = index->def->offset;
- u32 sz = st->def->size;
-
- u64 loc = 0;
- for(u32 m = 0; m < st->mapsLen; m++) {
- ObjStore_ActivateMapping(db, st, m);
- void* map = st->maps[m] + off;
-
- for(u32 l = 0; l < st->mapCount; l++) {
-
- u64 key = *(u64*)map;
- HT_set(&index->hash64.ht, key, index); // TODO: multihash
-
- map += sz;
-
- if(++loc > st->count) goto DONE;
- }
- }
- DONE:
-
- return 0;
- }
- int ObjStore_Insert(DB* db, ObjStore* st, void* data) {
-
- u64 index = st->count;
-
- u64 objSize = st->def->size;
- u64 mapIndex = index / st->mapCount;
- u64 localIndex = index % st->mapCount;
-
- ObjStore_ActivateMapping(db, st, mapIndex);
-
- void* mapping = st->maps[mapIndex];
- void* dst = mapping + (localIndex * objSize);
-
- pthread_mutex_lock(&st->writeLock);
-
- memcpy(dst, data, objSize);
- msync(mapping, st->mapSize, MS_SYNC); // TEMP: flush the whole mapping for now
-
- st->count++;
- ObjStore_WriteStats(st);
-
- // process indices
- HT_EACH(&st->indices, i, Index*, ind) {
-
-
- u64 key = *(u64*)(data + ind->def->offset);
- HT_set(&ind->hash64.ht, key, index); // TODO: multihash
- }
-
-
- pthread_mutex_unlock(&st->writeLock);
-
- return 0;
- }
- int ObjStore_IndexFetch(DB* db, ObjStore* st, u32 indexIndex, void* key, void* out) {
- Index* index = NULL;
- HT_get(&st->indices, indexIndex, &index);
- if(!index) return 2;
-
- u64 loc = 0;
- u64 ukey = *(u64*)key;
- if(HT_get(&index->hash64.ht, ukey, &loc)) {
- return 1;
- }
-
- u64 objSize = st->def->size;
- u64 mapIndex = loc / st->mapCount;
- u64 localIndex = loc % st->mapCount;
-
- void* mapping = st->maps[mapIndex];
- void* src = mapping + (localIndex * objSize);
-
- memcpy(out, src, objSize);
-
- return 0;
- }
- #define w32(x) *((u32*)(data + len)) = (x); len += 4;
- #define wstr(x) \
- do { \
- u32 l = strlen(x); \
- w32(l); \
- memcpy(data + len, x, l); \
- len += l; \
- } while(0);
- void ObjDef_Serialize(ObjDef* def, void** pdata, u64* plen) {
-
- u64 alloc = 4096; // TODO: bug
- void* data = malloc(alloc);
- u64 len = 0;
-
- // write version
- w32(1);
-
- // write scalar info
- w32(def->id);
- w32(def->size);
-
- wstr(def->name);
-
- // write fields
- w32(VEC_LEN(&def->fields));
- VEC_EACHP(&def->fields, i, f) {
- w32(f->id);
- w32(f->offset);
- w32(f->len);
- w32(f->type);
- wstr(f->name);
- }
-
- // write indices
- w32(VEC_LEN(&def->indices));
- VEC_EACHP(&def->indices, i, f) {
- w32(f->id);
- w32(f->fieldID);
- w32(f->offset);
- w32(f->len);
- w32(f->type);
- wstr(f->name);
- }
-
- *pdata = data;
- *plen = len;
- }
- #define r32(x) x = *((u32*)(data + len)); len += 4;
- #define rstr(x) \
- do { \
- u32 l; \
- r32(l); \
- x = strnint(data + len, l); \
- len += l; \
- } while(0);
- void ObjDef_Deserialize(ObjDef* def, void* data, u64 maxlen) {
-
- u64 len = 0;
-
- // check version
- u32 v;
- r32(v);
-
- if(v != 1) {
- printf("meta file version mismatch\n");
- exit(1);
- }
-
- // read scalars
- r32(def->id);
- r32(def->size);
- rstr(def->name);
- u32 n;
- r32(n);
- for(int i = 0; i < n; i++) {
- FieldDef* f = VEC_INC(&def->fields);
- r32(f->id);
- r32(f->offset);
- r32(f->len);
- r32(f->type);
- rstr(f->name);
- }
-
- r32(n);
- for(int i = 0; i < n; i++) {
- IndexDef* f = VEC_INC(&def->indices);
- r32(f->id);
- r32(f->fieldID);
- r32(f->offset);
- r32(f->len);
- r32(f->type);
- rstr(f->name);
- }
- }
|