LockFreeHash.nim 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. #nim c -t:-march=i686 --cpu:amd64 --threads:on -d:release lockfreehash.nim
  2. import math, hashes
  3. #------------------------------------------------------------------------------
  4. ## Memory Utility Functions
  5. proc newHeap*[T](): ptr T =
  6. result = cast[ptr T](alloc0(sizeof(T)))
  7. proc copyNew*[T](x: var T): ptr T =
  8. var
  9. size = sizeof(T)
  10. mem = alloc(size)
  11. copyMem(mem, x.addr, size)
  12. return cast[ptr T](mem)
  13. proc copyTo*[T](val: var T, dest: int) =
  14. copyMem(pointer(dest), val.addr, sizeof(T))
  15. proc allocType*[T](): pointer = alloc(sizeof(T))
  16. proc newShared*[T](): ptr T =
  17. result = cast[ptr T](allocShared0(sizeof(T)))
  18. proc copyShared*[T](x: var T): ptr T =
  19. var
  20. size = sizeof(T)
  21. mem = allocShared(size)
  22. copyMem(mem, x.addr, size)
  23. return cast[ptr T](mem)
  24. #------------------------------------------------------------------------------
  25. ## Pointer arithmetic
  26. proc `+`*(p: pointer, i: int): pointer {.inline.} =
  27. cast[pointer](cast[int](p) + i)
  28. const
  29. minTableSize = 8
  30. reProbeLimit = 12
  31. minCopyWork = 4096
  32. intSize = sizeof(int)
  33. when sizeof(int) == 4: # 32bit
  34. type
  35. Raw = range[0..1073741823]
  36. ## The range of uint values that can be stored directly in a value slot
  37. ## when on a 32 bit platform
  38. elif sizeof(int) == 8: # 64bit
  39. type
  40. Raw = range[0'i64..4611686018427387903'i64]
  41. ## The range of uint values that can be stored directly in a value slot
  42. ## when on a 64 bit platform
  43. else:
  44. {.error: "unsupported platform".}
  45. type
  46. Entry = tuple
  47. key: int
  48. value: int
  49. EntryArr = ptr array[0..10_000_000, Entry]
  50. PConcTable[K,V] = ptr object {.pure.}
  51. len: int
  52. used: int
  53. active: int
  54. copyIdx: int
  55. copyDone: int
  56. next: PConcTable[K,V]
  57. data: EntryArr
  58. proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
  59. expVal: int, match: bool): int
  60. #------------------------------------------------------------------------------
  61. # Create a new table
  62. proc newLFTable*[K,V](size: int = minTableSize): PConcTable[K,V] =
  63. let
  64. dataLen = max(nextPowerOfTwo(size), minTableSize)
  65. dataSize = dataLen*sizeof(Entry)
  66. dataMem = allocShared0(dataSize)
  67. tableSize = 7 * intSize
  68. tableMem = allocShared0(tableSize)
  69. table = cast[PConcTable[K,V]](tableMem)
  70. table.len = dataLen
  71. table.used = 0
  72. table.active = 0
  73. table.copyIdx = 0
  74. table.copyDone = 0
  75. table.next = nil
  76. table.data = cast[EntryArr](dataMem)
  77. result = table
  78. #------------------------------------------------------------------------------
  79. # Delete a table
  80. proc deleteConcTable[K,V](tbl: PConcTable[K,V]) =
  81. deallocShared(tbl.data)
  82. deallocShared(tbl)
  83. #------------------------------------------------------------------------------
  84. proc `[]`[K,V](table: var PConcTable[K,V], i: int): var Entry {.inline.} =
  85. table.data[i]
  86. #------------------------------------------------------------------------------
  87. # State flags stored in ptr
  88. proc pack[T](x: T): int {.inline.} =
  89. result = (cast[int](x) shl 2)
  90. #echo("packKey ",cast[int](x) , " -> ", result)
  91. # Pop the flags off returning a 4 byte aligned ptr to our Key or Val
  92. proc pop(x: int): int {.inline.} =
  93. result = x and 0xFFFFFFFC'i32
  94. # Pop the raw value off of our Key or Val
  95. proc popRaw(x: int): int {.inline.} =
  96. result = x shr 2
  97. # Pop the flags off returning a 4 byte aligned ptr to our Key or Val
  98. proc popPtr[V](x: int): ptr V {.inline.} =
  99. result = cast[ptr V](pop(x))
  100. #echo("popPtr " & $x & " -> " & $cast[int](result))
  101. # Ghost (sentinel)
  102. # K or V is no longer valid use new table
  103. const Ghost = 0xFFFFFFFC
  104. proc isGhost(x: int): bool {.inline.} =
  105. result = x == 0xFFFFFFFC
  106. # Tombstone
  107. # applied to V = K is dead
  108. proc isTomb(x: int): bool {.inline.} =
  109. result = (x and 0x00000002) != 0
  110. proc setTomb(x: int): int {.inline.} =
  111. result = x or 0x00000002
  112. # Prime
  113. # K or V is in new table copied from old
  114. proc isPrime(x: int): bool {.inline.} =
  115. result = (x and 0x00000001) != 0
  116. proc setPrime(x: int): int {.inline.} =
  117. result = x or 0x00000001
  118. #------------------------------------------------------------------------------
  119. ##This is for i32 only need to override for i64
  120. proc hashInt(x: int):int {.inline.} =
  121. var h = uint32(x) #shr 2'u32
  122. h = h xor (h shr 16'u32)
  123. h *= 0x85ebca6b'u32
  124. h = h xor (h shr 13'u32)
  125. h *= 0xc2b2ae35'u32
  126. h = h xor (h shr 16'u32)
  127. result = int(h)
  128. #------------------------------------------------------------------------------
  129. proc resize[K,V](self: PConcTable[K,V]): PConcTable[K,V] =
  130. var next = atomic_load_n(self.next.addr, ATOMIC_RELAXED)
  131. #echo("next = " & $cast[int](next))
  132. if next != nil:
  133. #echo("A new table already exists, copy in progress")
  134. return next
  135. var
  136. oldLen = atomic_load_n(self.len.addr, ATOMIC_RELAXED)
  137. newTable = newLFTable[K,V](oldLen*2)
  138. success = atomic_compare_exchange_n(self.next.addr, next.addr, newTable,
  139. false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  140. if not success:
  141. echo("someone beat us to it! delete table we just created and return his " & $cast[int](next))
  142. deleteConcTable(newTable)
  143. return next
  144. else:
  145. echo("Created New Table! " & $cast[int](newTable) & " Size = " & $newTable.len)
  146. return newTable
  147. #------------------------------------------------------------------------------
  148. #proc keyEQ[K](key1: ptr K, key2: ptr K): bool {.inline.} =
  149. proc keyEQ[K](key1: int, key2: int): bool {.inline.} =
  150. result = false
  151. when K is Raw:
  152. if key1 == key2:
  153. result = true
  154. else:
  155. var
  156. p1 = popPtr[K](key1)
  157. p2 = popPtr[K](key2)
  158. if p1 != nil and p2 != nil:
  159. if cast[int](p1) == cast[int](p2):
  160. return true
  161. if p1[] == p2[]:
  162. return true
  163. #------------------------------------------------------------------------------
  164. #proc tableFull(self: var PConcTable[K,V]) : bool {.inline.} =
  165. #------------------------------------------------------------------------------
  166. proc copySlot[K,V](idx: int, oldTbl: var PConcTable[K,V], newTbl: var PConcTable[K,V]): bool =
  167. #echo("Copy idx " & $idx)
  168. var
  169. oldVal = 0
  170. oldkey = 0
  171. ok = false
  172. result = false
  173. #Block the key so no other threads waste time here
  174. while not ok:
  175. ok = atomic_compare_exchange_n(oldTbl[idx].key.addr, oldKey.addr,
  176. setTomb(oldKey), false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  177. #echo("oldKey was = " & $oldKey & " set it to tomb " & $setTomb(oldKey))
  178. #Prevent new values from appearing in the old table by priming
  179. oldVal = atomic_load_n(oldTbl[idx].value.addr, ATOMIC_RELAXED)
  180. while not isPrime(oldVal):
  181. var box = if oldVal == 0 or isTomb(oldVal) : oldVal.setTomb.setPrime
  182. else: oldVal.setPrime
  183. if atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
  184. box, false, ATOMIC_RELAXED, ATOMIC_RELAXED):
  185. if isPrime(box) and isTomb(box):
  186. return true
  187. oldVal = box
  188. break
  189. #echo("oldVal was = ", oldVal, " set it to prime ", box)
  190. if isPrime(oldVal) and isTomb(oldVal):
  191. #when not (K is Raw):
  192. # deallocShared(popPtr[K](oldKey))
  193. return false
  194. if isTomb(oldVal):
  195. echo("oldVal is Tomb!!!, should not happen")
  196. if pop(oldVal) != 0:
  197. result = setVal(newTbl, pop(oldKey), pop(oldVal), 0, true) == 0
  198. #if result:
  199. #echo("Copied a Slot! idx= " & $idx & " key= " & $oldKey & " val= " & $oldVal)
  200. #else:
  201. #echo("copy slot failed")
  202. # Our copy is done so we disable the old slot
  203. while not ok:
  204. ok = atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
  205. oldVal.setTomb.setPrime , false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  206. #echo("disabled old slot")
  207. #echo"---------------------"
  208. #------------------------------------------------------------------------------
proc promote[K,V](table: var PConcTable[K,V]) =
  ## Makes the finished successor table (`table.next`) current in place.
  ## Its slot array, length and used count are moved into `table`, the
  ## resize bookkeeping (copyIdx/copyDone) is reset, and the successor's
  ## header is freed.
  ##
  ## NOTE(review): the old slot array is freed before the new one is
  ## published and without waiting for other threads, which may still be
  ## reading it. This looks like a potential use-after-free; confirm.
  ## NOTE(review): `active` is not carried over from the successor.
  var
    newData = atomic_load_n(table.next.data.addr, ATOMIC_RELAXED)
    newLen = atomic_load_n(table.next.len.addr, ATOMIC_RELAXED)
    newUsed = atomic_load_n(table.next.used.addr, ATOMIC_RELAXED)
  deallocShared(table.data)
  atomic_store_n(table.data.addr, newData, ATOMIC_RELAXED)
  atomic_store_n(table.len.addr, newLen, ATOMIC_RELAXED)
  atomic_store_n(table.used.addr, newUsed, ATOMIC_RELAXED)
  atomic_store_n(table.copyIdx.addr, 0, ATOMIC_RELAXED)
  atomic_store_n(table.copyDone.addr, 0, ATOMIC_RELAXED)
  # Only the successor's header is freed; its slot array now belongs to `table`.
  deallocShared(table.next)
  atomic_store_n(table.next.addr, nil, ATOMIC_RELAXED)
  echo("new table swapped!")
  223. #------------------------------------------------------------------------------
proc checkAndPromote[K,V](table: var PConcTable[K,V], workDone: int): bool =
  ## Credits `workDone` copied slots to `table.copyDone` and, once the total
  ## reaches the old table length, promotes the successor into `table`.
  ## Returns true if this call performed the promotion.
  ##
  ## NOTE(review): with `workDone == 0` the completion test uses a plain load
  ## and claims nothing exclusively, so two callers could both see
  ## `copyDone >= oldLen` and promote twice. Confirm.
  var
    oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
    copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
    ok: bool
  result = false
  if workDone > 0:
    # A failed CAS refreshes `copyDone`, so the retry adds to the latest total.
    while not ok:
      ok = atomic_compare_exchange_n(table.copyDone.addr, copyDone.addr,
        copyDone + workDone, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  # If the copy is done we can promote this table
  if copyDone + workDone >= oldLen:
    # Swap new data
    table.promote
    result = true
  243. #------------------------------------------------------------------------------
  244. proc copySlotAndCheck[K,V](table: var PConcTable[K,V], idx: int):
  245. PConcTable[K,V] =
  246. var
  247. newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
  248. result = newTable
  249. if newTable != nil and copySlot(idx, table, newTable):
  250. #echo("copied a single slot, idx = " & $idx)
  251. if checkAndPromote(table, 1): return table
  252. #------------------------------------------------------------------------------
proc helpCopy[K,V](table: var PConcTable[K,V]): PConcTable[K,V] =
  ## Contributes to an in-progress resize of `table`.
  ##
  ## Claims a chunk of up to `minCopyWork` slot indices by CAS on
  ## `table.copyIdx`, copies each slot with `copySlot`, and credits the slots
  ## it accounted for. Returns `table` if this call promoted the successor.
  ## Otherwise it returns the successor loaded on entry (nil when no resize
  ## is running).
  var
    newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
  result = newTable
  if newTable != nil:
    var
      oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
      copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
      copyIdx = 0
      work = min(oldLen, minCopyWork)
      workDone = 0
    if copyDone < oldLen:
      var ok: bool
      # Claim [copyIdx, copyIdx + work); a failed CAS refreshes `copyIdx`.
      while not ok:
        ok = atomic_compare_exchange_n(table.copyIdx.addr, copyIdx.addr,
          copyIdx + work, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
      # Indices wrap with the length mask, so claims past the end revisit
      # slots from the start (copySlot then reports already-finished slots
      # as false).
      for i in 0..work-1:
        var idx = (copyIdx + i) and (oldLen - 1)
        if copySlot(idx, table, newTable):
          workDone += 1
      if workDone > 0:
        if checkAndPromote(table, workDone): return table
    # In case a thread finished all the work then got stalled before promotion
    if checkAndPromote(table, 0): return table
  280. #------------------------------------------------------------------------------
proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
  expVal: int, match: bool): int =
  ## Writes the tagged value word `val` for the tagged key word `key`,
  ## following the table into its successor whenever a resize is involved.
  ##
  ## When `match` is true the write only happens if the slot's current value
  ## equals `expVal` (0 meaning "empty"); on a mismatch the current value is
  ## returned unchanged. Otherwise it returns the previous value. Exceptions:
  ## `val` itself when `val` is a tombstone and the key slot was just claimed,
  ## and `setTomb(0)` when the previous value was 0 and `expVal != 0`.
  ## The `active` counter is only adjusted when `expVal != 0`.
  when K is Raw:
    var idx = hashInt(key)
  else:
    var idx = popPtr[K](key)[].hash
  var
    nextTable: PConcTable[K,V]
    probes = 1
  # spin until we find a key slot or build and jump to next table
  while true:
    idx = idx and (table.len - 1)
    var
      probedKey = 0
      # Claim the slot if its key word is 0 (empty); on failure `probedKey`
      # receives the key currently stored there.
      openKey = atomic_compare_exchange_n(table[idx].key.addr, probedKey.addr,
        key, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
    if openKey:
      if val.isTomb:
        # Deleting into a freshly claimed slot: nothing to store.
        return val
      # increment used slots
      discard atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED)
      break # We found an open slot
    if keyEQ[K](probedKey, key):
      break # We found a matching slot
    # Too many probes (or a tombstoned key): move the write to the successor.
    # This path is skipped for matched writes with a non-zero `expVal`.
    if (not(expVal != 0 and match)) and (probes >= reProbeLimit or key.isTomb):
      if key.isTomb: echo("Key is Tombstone")
      # create next bigger table
      nextTable = resize(table)
      # help do some copying
      nextTable = helpCopy(table)
      # now setVal in the new table instead
      return setVal(nextTable, key, val, expVal, match)
    else:
      idx += 1
      probes += 1
  # Done spinning for a new slot
  var oldVal = atomic_load_n(table[idx].value.addr, ATOMIC_RELAXED)
  if val == oldVal:
    # This value is already in the slot.
    return oldVal
  nextTable = atomic_load_n(table.next.addr, ATOMIC_SEQ_CST)
  # Start a resize when inserting into a crowded table (probe limit or >80%
  # of slots used), or when a primed value shows a copy we have not seen.
  if nextTable == nil and
      ((oldVal == 0 and
      (probes >= reProbeLimit or table.used / table.len > 0.8)) or
      (isPrime(oldVal))):
    if table.used / table.len > 0.8: echo("resize because usage ratio = " &
      $(table.used / table.len))
    if isPrime(oldVal): echo("old val isPrime, should be a rare mem ordering event")
    nextTable = resize(table)
  if nextTable != nil:
    # Tomb/prime the old slot, then retry the write in the new table.
    nextTable = copySlotAndCheck(table,idx)
    return setVal(nextTable, key, val, expVal, match)
  # Finally ready to add new val to table
  while true:
    if match and oldVal != expVal:
      # set failed, no match
      return oldVal
    # A failed CAS refreshes `oldVal` with the slot's current value.
    if atomic_compare_exchange_n(table[idx].value.addr, oldVal.addr,
        val, false, ATOMIC_RELEASE, ATOMIC_RELAXED):
      if expVal != 0:
        if (oldVal == 0 or isTomb(oldVal)) and not isTomb(val):
          discard atomic_add_fetch(table.active.addr, 1, ATOMIC_RELAXED)
        elif not (oldVal == 0 or isTomb(oldVal)) and isTomb(val):
          discard atomic_add_fetch(table.active.addr, -1, ATOMIC_RELAXED)
      if oldVal == 0 and expVal != 0:
        return setTomb(oldVal)
      else: return oldVal
    # The slot was primed by a concurrent copy: follow it to the new table.
    if isPrime(oldVal):
      nextTable = copySlotAndCheck(table, idx)
      return setVal(nextTable, key, val, expVal, match)
  364. #------------------------------------------------------------------------------
  365. proc getVal[K,V](table: var PConcTable[K,V], key: int): int =
  366. #echo("-try get- key = " & $key)
  367. when K is Raw:
  368. var idx = hashInt(key)
  369. else:
  370. var idx = popPtr[K](key)[].hash
  371. #echo("get idx ", idx)
  372. var
  373. probes = 0
  374. val: int
  375. while true:
  376. idx = idx and (table.len - 1)
  377. var
  378. newTable: PConcTable[K,V] # = atomic_load_n(table.next.addr, ATOMIC_ACQUIRE)
  379. probedKey = atomic_load_n(table[idx].key.addr, ATOMIC_SEQ_CST)
  380. if keyEQ[K](probedKey, key):
  381. #echo("found key after ", probes+1)
  382. val = atomic_load_n(table[idx].value.addr, ATOMIC_ACQUIRE)
  383. if not isPrime(val):
  384. if isTomb(val):
  385. #echo("val was tomb but not prime")
  386. return 0
  387. else:
  388. #echo("-GotIt- idx = ", idx, " key = ", key, " val ", val )
  389. return val
  390. else:
  391. newTable = copySlotAndCheck(table, idx)
  392. return getVal(newTable, key)
  393. else:
  394. #echo("probe ", probes, " idx = ", idx, " key = ", key, " found ", probedKey )
  395. if probes >= reProbeLimit*4 or key.isTomb:
  396. if newTable == nil:
  397. #echo("too many probes and no new table ", key, " ", idx )
  398. return 0
  399. else:
  400. newTable = helpCopy(table)
  401. return getVal(newTable, key)
  402. idx += 1
  403. probes += 1
  404. #------------------------------------------------------------------------------
  405. #proc set*(table: var PConcTable[Raw,Raw], key: Raw, val: Raw) =
  406. # discard setVal(table, pack(key), pack(key), 0, false)
  407. #proc set*[V](table: var PConcTable[Raw,V], key: Raw, val: ptr V) =
  408. # discard setVal(table, pack(key), cast[int](val), 0, false)
  409. proc set*[K,V](table: var PConcTable[K,V], key: var K, val: var V) =
  410. when not (K is Raw):
  411. var newKey = cast[int](copyShared(key))
  412. else:
  413. var newKey = pack(key)
  414. when not (V is Raw):
  415. var newVal = cast[int](copyShared(val))
  416. else:
  417. var newVal = pack(val)
  418. var oldPtr = pop(setVal(table, newKey, newVal, 0, false))
  419. #echo("oldPtr = ", cast[int](oldPtr), " newPtr = ", cast[int](newPtr))
  420. when not (V is Raw):
  421. if newVal != oldPtr and oldPtr != 0:
  422. deallocShared(cast[ptr V](oldPtr))
  423. proc get*[K,V](table: var PConcTable[K,V], key: var K): V =
  424. when not (V is Raw):
  425. when not (K is Raw):
  426. return popPtr[V](getVal(table, cast[int](key.addr)))[]
  427. else:
  428. return popPtr[V](getVal(table, pack(key)))[]
  429. else:
  430. when not (K is Raw):
  431. return popRaw(getVal(table, cast[int](key.addr)))
  432. else:
  433. return popRaw(getVal(table, pack(key)))
  434. #proc `[]`[K,V](table: var PConcTable[K,V], key: K): PEntry[K,V] {.inline.} =
  435. # getVal(table, key)
  436. #proc `[]=`[K,V](table: var PConcTable[K,V], key: K, val: V): PEntry[K,V] {.inline.} =
  437. # setVal(table, key, val)
  438. #Tests ----------------------------
# Multithreaded smoke test: fills the table with numTests entries, then has
# numThreads threads each read-modify-write every entry once, and finally
# counts entries whose f0/f1 differ from the expected values.
when not defined(testing) and isMainModule:
  # NOTE(review): `locks` and `rand` (mersenne) are not used in the visible code.
  import locks, times, mersenne
  const
    numTests = 100000
    numThreads = 10
  type
    TestObj = tuple
      thr: int
      f0: int
      f1: int
    Data = tuple[k: string,v: TestObj]
    PDataArr = array[0..numTests-1, Data]
    Dict = PConcTable[string,TestObj]
  var
    thr: array[0..numThreads-1, Thread[Dict]]
    table = newLFTable[string,TestObj](8)
    rand = newMersenneTwister(2525)
  proc createSampleData(len: int): PDataArr =
    # Keys "mark1".."markN"; f0 holds the 1-based index, f1 starts at 0.
    for i in 0..len-1:
      result[i].k = "mark" & $(i+1)
      result[i].v.thr = 0
      result[i].v.f0 = i+1
      result[i].v.f1 = 0
  proc threadProc(tp: Dict) {.thread.} =
    # NOTE(review): uses the global `table` rather than the `tp` argument
    # (both refer to the same table here).
    # NOTE(review): get followed by set is not atomic, so concurrent
    # increments of f1 can be lost. The final `f1 != numThreads` check may
    # then report failures even when the table itself behaves correctly.
    var t = cpuTime();
    for i in 1..numTests:
      var key = "mark" & $(i)
      var got = table.get(key)
      got.thr = cast[int](myThreadID[pointer]())
      got.f1 = got.f1 + 1
      table.set(key, got)
    t = cpuTime() - t
    echo t
  var testData = createSampleData(numTests)
  for i in 0..numTests-1:
    table.set(testData[i].k, testData[i].v)
  var i = 0
  while i < numThreads:
    createThread(thr[i], threadProc, table)
    i += 1
  joinThreads(thr)
  var fails = 0
  for i in 0..numTests-1:
    var got = table.get(testData[i].k)
    if got.f0 != i+1 or got.f1 != numThreads:
      fails += 1
      echo(got)
  echo("Failed read or write = ", fails)
  deleteConcTable(table)