db.c 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. #include <stdio.h>
  2. #include <errno.h>
  3. #include <fcntl.h>
  4. #include <sys/mman.h>
  5. #include <unistd.h>
  6. #include <sys/stat.h>
  7. #include "db.h"
  8. void DB_InitEmpty(DB* db) {
  9. SVEC_init(&db->defs);
  10. HT_init(&db->defLookup, 128);
  11. HT_init(&db->storeLookup, 128);
  12. SVEC_init(&db->stores);
  13. }
  14. ObjDef* DB_AllocObjDef(DB* db, char* name) {
  15. ObjDef* o = SVEC_inc(&db->defs);
  16. o->id = SVEC_len(&db->defs);
  17. o->name = strint(name);
  18. HT_set(&db->defLookup, o->name, o);
  19. return o;
  20. }
  21. int DB_FinalizeObjDef(DB* db, ObjDef* o) {
  22. // TODO: validate ids, limits, field bounds and types
  23. return 0;
  24. }
  // Formats the printf-style arguments with sprintfdup(), stores the strint()
  // result of that text in x, and frees the temporary formatted buffer.
  //
  // The macro deliberately ends without a semicolon: the caller supplies it,
  // which makes the expansion a single statement that is safe inside
  // unbraced if/else bodies.
  #define sprintfint(x, ...) \
  do { \
      char* sprintfint_tmp = sprintfdup(__VA_ARGS__); \
      (x) = strint(sprintfint_tmp); \
      free(sprintfint_tmp); \
  } while(0)
  31. // creates a new store from scratch
  32. ObjStore* DB_CreateStore(DB* db, ObjDef* o, char* name) {
  33. ObjStore* st = calloc(1, sizeof(*st));
  34. st->name = strint(name);
  35. HT_init(&st->indices, 128);
  36. st->def = o;
  37. st->mapSize = PAGE_SIZE * 4; // pure voodoo
  38. st->mapCount = st->mapSize / o->size;
  39. // create backing files
  40. st->backingRelPath = strint(name);
  41. char* tmp = path_join(db->backingDir, st->backingRelPath);
  42. st->backingAbsPath = strint(tmp);
  43. free(tmp);
  44. sprintfint(st->metaRelPath, "%s.meta", st->backingRelPath);
  45. sprintfint(st->metaAbsPath, "%s.meta", st->backingAbsPath);
  46. sprintfint(st->statsRelPath, "%s.stats", st->backingRelPath);
  47. sprintfint(st->statsAbsPath, "%s.stats", st->backingAbsPath);
  48. // save metadata
  49. void* md;
  50. u64 mdlen;
  51. ObjDef_Serialize(o, &md, &mdlen);
  52. write_whole_file(st->metaAbsPath, md, mdlen);
  53. free(md);
  54. // set up the stats mapping
  55. st->statsSize = 512;
  56. st->statsFD = open(st->statsAbsPath, O_RDWR | O_CREAT, 0777);
  57. if(-1 == st->statsFD) {
  58. printf("could not open stats file '%s': %s\n", st->statsAbsPath, strerror(errno));
  59. exit(1);
  60. }
  61. ftruncate(st->statsFD, st->statsSize);
  62. st->stats = mmap(NULL, st->statsSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->statsFD, 0);
  63. // initialize first pages
  64. pthread_mutex_init(&st->writeLock, NULL);
  65. pthread_mutex_lock(&st->writeLock);
  66. st->backingFD = open(st->backingAbsPath, O_RDWR | O_CREAT, 0777);
  67. if(-1 == st->backingFD) {
  68. printf("could not open backing file '%s': %s\n", st->backingAbsPath, strerror(errno));
  69. exit(1);
  70. }
  71. // check the file
  72. struct stat fst;
  73. fstat(st->backingFD, &fst);
  74. st->fileSize = fst.st_size;
  75. if(st->fileSize < st->mapSize) {
  76. ftruncate(st->backingFD, st->mapSize); // grow the file by truncating it.
  77. st->fileSize = st->mapSize;
  78. }
  79. st->mapsAlloc = 64;
  80. st->maps = calloc(1, st->mapsAlloc * sizeof(*st->maps));
  81. st->mapsLen = 1;
  82. void* first = mmap(NULL, st->mapSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->backingFD, 0);
  83. if(first == MAP_FAILED) {
  84. printf("could not map memory: %s\n", strerror(errno));
  85. exit(1);
  86. }
  87. st->maps[0] = first;
  88. // create indices
  89. VEC_EACHP(&o->indices, i, idef) {
  90. Index* index = calloc(1, sizeof(*index));
  91. index->def = idef;
  92. HT_init(&index->hash64.ht, 128);
  93. HT_set(&st->indices, index->def->id, index);
  94. }
  95. pthread_mutex_unlock(&st->writeLock);
  96. return st;
  97. }
  98. ObjStore* DB_LoadStore(DB* db, char* name) {
  99. ObjStore* st = calloc(1, sizeof(*st));
  100. st->name = strint(name);
  101. HT_init(&st->indices, 128);
  102. st->mapSize = PAGE_SIZE * 4; // pure voodoo
  103. // backing file paths
  104. st->backingRelPath = strint(name);
  105. char* tmp = path_join(db->backingDir, st->backingRelPath);
  106. st->backingAbsPath = strint(tmp);
  107. free(tmp);
  108. sprintfint(st->metaRelPath, "%s.meta", st->backingRelPath);
  109. sprintfint(st->metaAbsPath, "%s.meta", st->backingAbsPath);
  110. sprintfint(st->statsRelPath, "%s.stats", st->backingRelPath);
  111. sprintfint(st->statsAbsPath, "%s.stats", st->backingAbsPath);
  112. // load the stats
  113. st->statsSize = 512;
  114. st->statsFD = open(st->statsAbsPath, O_RDWR | O_CREAT, 0777);
  115. if(-1 == st->statsFD) {
  116. printf("could not open stats file '%s': %s\n", st->statsAbsPath, strerror(errno));
  117. exit(1);
  118. }
  119. st->stats = mmap(NULL, st->statsSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->statsFD, 0);
  120. ObjStore_ReadStats(st);
  121. // load metadata
  122. u64 mdlen;
  123. void* md = read_whole_file(st->metaAbsPath, &mdlen);
  124. st->def = calloc(1, sizeof(*st->def));
  125. ObjDef_Deserialize(st->def, md, mdlen);
  126. free(md);
  127. st->mapCount = st->mapSize / st->def->size;
  128. // initialize file and locks
  129. pthread_mutex_init(&st->writeLock, NULL);
  130. pthread_mutex_lock(&st->writeLock);
  131. st->backingFD = open(st->backingAbsPath, O_RDWR);
  132. if(-1 == st->backingFD) {
  133. printf("could not open backing file '%s': %s\n", st->backingAbsPath, strerror(errno));
  134. exit(1);
  135. }
  136. // cache the file size
  137. struct stat fst;
  138. fstat(st->backingFD, &fst);
  139. st->fileSize = fst.st_size;
  140. // memory map pointers
  141. st->mapsAlloc = 64;
  142. st->maps = calloc(1, st->mapsAlloc * sizeof(*st->maps));
  143. st->mapsLen = 0;
  144. // rebuild the indices
  145. VEC_EACHP(&st->def->indices, i, idef) {
  146. Index* index = calloc(1, sizeof(*index));
  147. index->def = idef;
  148. HT_init(&index->hash64.ht, 128);
  149. HT_set(&st->indices, index->def->id, index);
  150. ObjStore_RebuildIndex(db, st, index->def->id); // BUG: slow
  151. }
  152. pthread_mutex_unlock(&st->writeLock);
  153. return st;
  154. }
  155. // not guarded because it's only used during init before the locks even exist
  156. void ObjStore_ReadStats(ObjStore* st) {
  157. st->count = *(u64*)st->stats;
  158. }
  159. // not guarded because it's only called from within an existing lock
  160. void ObjStore_WriteStats(ObjStore* st) {
  161. *(u64*)st->stats = st->count;
  162. msync(st->stats, st->statsSize, MS_SYNC);
  163. }
  164. int ObjStore_ActivateMapping(DB* db, ObjStore* st, u32 mapIndex) {
  165. // check the maps table size
  166. if(st->mapsAlloc < mapIndex) {
  167. u64 oldAlloc = st->mapsAlloc;
  168. st->mapsAlloc *= 2; // TODO: bug, nextPOT
  169. st->maps = realloc(st->maps, st->mapsAlloc * sizeof(*st->maps));
  170. memset(st->maps + oldAlloc, 0, (st->mapsAlloc - oldAlloc) * sizeof(*st->maps));
  171. }
  172. void* p = st->maps[mapIndex];
  173. if(p) return 0; // already mapped
  174. if(st->mapsLen < mapIndex) st->mapsLen = mapIndex;
  175. // TODO: check map limit and evict some memory
  176. // check if we need to expand the file
  177. u64 neededFileSize = (mapIndex + 1) * st->mapSize;
  178. if(st->fileSize < neededFileSize) {
  179. ftruncate(st->backingFD, neededFileSize); // grow the file by truncating it.
  180. st->fileSize = neededFileSize;
  181. }
  182. p = mmap(NULL, st->mapSize, PROT_READ | PROT_WRITE, MAP_SHARED, st->backingFD, mapIndex * st->mapSize);
  183. if(p == MAP_FAILED) {
  184. printf("could not map memory: %s\n", strerror(errno));
  185. exit(1);
  186. }
  187. st->maps[mapIndex] = p;
  188. return 0;
  189. }
  190. int ObjStore_RebuildIndex(DB* db, ObjStore* st, u32 indexIndex) {
  191. Index* index = NULL;
  192. HT_get(&st->indices, indexIndex, &index);
  193. if(!index) return 2;
  194. HT_destroy(&index->hash64.ht);
  195. HT_init(&index->hash64.ht, 128);
  196. u32 off = index->def->offset;
  197. u32 sz = st->def->size;
  198. u64 loc = 0;
  199. for(u32 m = 0; m < st->mapsLen; m++) {
  200. ObjStore_ActivateMapping(db, st, m);
  201. void* map = st->maps[m] + off;
  202. for(u32 l = 0; l < st->mapCount; l++) {
  203. u64 key = *(u64*)map;
  204. HT_set(&index->hash64.ht, key, index); // TODO: multihash
  205. map += sz;
  206. if(++loc > st->count) goto DONE;
  207. }
  208. }
  209. DONE:
  210. return 0;
  211. }
  212. int ObjStore_Insert(DB* db, ObjStore* st, void* data) {
  213. u64 index = st->count;
  214. u64 objSize = st->def->size;
  215. u64 mapIndex = index / st->mapCount;
  216. u64 localIndex = index % st->mapCount;
  217. ObjStore_ActivateMapping(db, st, mapIndex);
  218. void* mapping = st->maps[mapIndex];
  219. void* dst = mapping + (localIndex * objSize);
  220. pthread_mutex_lock(&st->writeLock);
  221. memcpy(dst, data, objSize);
  222. msync(mapping, st->mapSize, MS_SYNC); // TEMP: flush the whole mapping for now
  223. st->count++;
  224. ObjStore_WriteStats(st);
  225. // process indices
  226. HT_EACH(&st->indices, i, Index*, ind) {
  227. u64 key = *(u64*)(data + ind->def->offset);
  228. HT_set(&ind->hash64.ht, key, index); // TODO: multihash
  229. }
  230. pthread_mutex_unlock(&st->writeLock);
  231. return 0;
  232. }
  233. int ObjStore_IndexFetch(DB* db, ObjStore* st, u32 indexIndex, void* key, void* out) {
  234. Index* index = NULL;
  235. HT_get(&st->indices, indexIndex, &index);
  236. if(!index) return 2;
  237. u64 loc = 0;
  238. u64 ukey = *(u64*)key;
  239. if(HT_get(&index->hash64.ht, ukey, &loc)) {
  240. return 1;
  241. }
  242. u64 objSize = st->def->size;
  243. u64 mapIndex = loc / st->mapCount;
  244. u64 localIndex = loc % st->mapCount;
  245. void* mapping = st->maps[mapIndex];
  246. void* src = mapping + (localIndex * objSize);
  247. memcpy(out, src, objSize);
  248. return 0;
  249. }
  250. #define w32(x) *((u32*)(data + len)) = (x); len += 4;
  251. #define wstr(x) \
  252. do { \
  253. u32 l = strlen(x); \
  254. w32(l); \
  255. memcpy(data + len, x, l); \
  256. len += l; \
  257. } while(0);
  258. void ObjDef_Serialize(ObjDef* def, void** pdata, u64* plen) {
  259. u64 alloc = 4096; // TODO: bug
  260. void* data = malloc(alloc);
  261. u64 len = 0;
  262. // write version
  263. w32(1);
  264. // write scalar info
  265. w32(def->id);
  266. w32(def->size);
  267. wstr(def->name);
  268. // write fields
  269. w32(VEC_LEN(&def->fields));
  270. VEC_EACHP(&def->fields, i, f) {
  271. w32(f->id);
  272. w32(f->offset);
  273. w32(f->len);
  274. w32(f->type);
  275. wstr(f->name);
  276. }
  277. // write indices
  278. w32(VEC_LEN(&def->indices));
  279. VEC_EACHP(&def->indices, i, f) {
  280. w32(f->id);
  281. w32(f->fieldID);
  282. w32(f->offset);
  283. w32(f->len);
  284. w32(f->type);
  285. wstr(f->name);
  286. }
  287. *pdata = data;
  288. *plen = len;
  289. }
  290. #define r32(x) x = *((u32*)(data + len)); len += 4;
  291. #define rstr(x) \
  292. do { \
  293. u32 l; \
  294. r32(l); \
  295. x = strnint(data + len, l); \
  296. len += l; \
  297. } while(0);
  298. void ObjDef_Deserialize(ObjDef* def, void* data, u64 maxlen) {
  299. u64 len = 0;
  300. // check version
  301. u32 v;
  302. r32(v);
  303. if(v != 1) {
  304. printf("meta file version mismatch\n");
  305. exit(1);
  306. }
  307. // read scalars
  308. r32(def->id);
  309. r32(def->size);
  310. rstr(def->name);
  311. u32 n;
  312. r32(n);
  313. for(int i = 0; i < n; i++) {
  314. FieldDef* f = VEC_INC(&def->fields);
  315. r32(f->id);
  316. r32(f->offset);
  317. r32(f->len);
  318. r32(f->type);
  319. rstr(f->name);
  320. }
  321. r32(n);
  322. for(int i = 0; i < n; i++) {
  323. IndexDef* f = VEC_INC(&def->indices);
  324. r32(f->id);
  325. r32(f->fieldID);
  326. r32(f->offset);
  327. r32(f->len);
  328. r32(f->type);
  329. rstr(f->name);
  330. }
  331. }