123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610 |
- #nim c -t:-march=i686 --cpu:amd64 --threads:on -d:release lockfreehash.nim
- import math, hashes
- #------------------------------------------------------------------------------
- ## Memory Utility Functions
- proc newHeap*[T](): ptr T =
- result = cast[ptr T](alloc0(sizeof(T)))
- proc copyNew*[T](x: var T): ptr T =
- var
- size = sizeof(T)
- mem = alloc(size)
- copyMem(mem, x.addr, size)
- return cast[ptr T](mem)
- proc copyTo*[T](val: var T, dest: int) =
- copyMem(pointer(dest), val.addr, sizeof(T))
- proc allocType*[T](): pointer = alloc(sizeof(T))
- proc newShared*[T](): ptr T =
- result = cast[ptr T](allocShared0(sizeof(T)))
- proc copyShared*[T](x: var T): ptr T =
- var
- size = sizeof(T)
- mem = allocShared(size)
- copyMem(mem, x.addr, size)
- return cast[ptr T](mem)
- #------------------------------------------------------------------------------
- ## Pointer arithmetic
- proc `+`*(p: pointer, i: int): pointer {.inline.} =
- cast[pointer](cast[int](p) + i)
- const
- minTableSize = 8
- reProbeLimit = 12
- minCopyWork = 4096
- intSize = sizeof(int)
- when sizeof(int) == 4: # 32bit
- type
- Raw = range[0..1073741823]
- ## The range of uint values that can be stored directly in a value slot
- ## when on a 32 bit platform
- {.deprecated: [TRaw: Raw].}
- elif sizeof(int) == 8: # 64bit
- type
- Raw = range[0'i64..4611686018427387903'i64]
- ## The range of uint values that can be stored directly in a value slot
- ## when on a 64 bit platform
- {.deprecated: [TRaw: Raw].}
- else:
- {.error: "unsupported platform".}
- type
- Entry = tuple
- key: int
- value: int
- EntryArr = ptr array[0..10_000_000, Entry]
- PConcTable[K,V] = ptr object {.pure.}
- len: int
- used: int
- active: int
- copyIdx: int
- copyDone: int
- next: PConcTable[K,V]
- data: EntryArr
- {.deprecated: [TEntry: Entry, TEntryArr: EntryArr].}
- proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
- expVal: int, match: bool): int
- #------------------------------------------------------------------------------
- # Create a new table
- proc newLFTable*[K,V](size: int = minTableSize): PConcTable[K,V] =
- let
- dataLen = max(nextPowerOfTwo(size), minTableSize)
- dataSize = dataLen*sizeof(Entry)
- dataMem = allocShared0(dataSize)
- tableSize = 7 * intSize
- tableMem = allocShared0(tableSize)
- table = cast[PConcTable[K,V]](tableMem)
- table.len = dataLen
- table.used = 0
- table.active = 0
- table.copyIdx = 0
- table.copyDone = 0
- table.next = nil
- table.data = cast[EntryArr](dataMem)
- result = table
- #------------------------------------------------------------------------------
- # Delete a table
- proc deleteConcTable[K,V](tbl: PConcTable[K,V]) =
- deallocShared(tbl.data)
- deallocShared(tbl)
- #------------------------------------------------------------------------------
- proc `[]`[K,V](table: var PConcTable[K,V], i: int): var Entry {.inline.} =
- table.data[i]
- #------------------------------------------------------------------------------
- # State flags stored in ptr
- proc pack[T](x: T): int {.inline.} =
- result = (cast[int](x) shl 2)
- #echo("packKey ",cast[int](x) , " -> ", result)
- # Pop the flags off returning a 4 byte aligned ptr to our Key or Val
- proc pop(x: int): int {.inline.} =
- result = x and 0xFFFFFFFC'i32
- # Pop the raw value off of our Key or Val
- proc popRaw(x: int): int {.inline.} =
- result = x shr 2
- # Pop the flags off returning a 4 byte aligned ptr to our Key or Val
- proc popPtr[V](x: int): ptr V {.inline.} =
- result = cast[ptr V](pop(x))
- #echo("popPtr " & $x & " -> " & $cast[int](result))
- # Ghost (sentinel)
- # K or V is no longer valid use new table
- const Ghost = 0xFFFFFFFC
- proc isGhost(x: int): bool {.inline.} =
- result = x == 0xFFFFFFFC
- # Tombstone
- # applied to V = K is dead
- proc isTomb(x: int): bool {.inline.} =
- result = (x and 0x00000002) != 0
- proc setTomb(x: int): int {.inline.} =
- result = x or 0x00000002
- # Prime
- # K or V is in new table copied from old
- proc isPrime(x: int): bool {.inline.} =
- result = (x and 0x00000001) != 0
- proc setPrime(x: int): int {.inline.} =
- result = x or 0x00000001
- #------------------------------------------------------------------------------
- ##This is for i32 only need to override for i64
- proc hashInt(x: int):int {.inline.} =
- var h = uint32(x) #shr 2'u32
- h = h xor (h shr 16'u32)
- h *= 0x85ebca6b'u32
- h = h xor (h shr 13'u32)
- h *= 0xc2b2ae35'u32
- h = h xor (h shr 16'u32)
- result = int(h)
- #------------------------------------------------------------------------------
- proc resize[K,V](self: PConcTable[K,V]): PConcTable[K,V] =
- var next = atomic_load_n(self.next.addr, ATOMIC_RELAXED)
- #echo("next = " & $cast[int](next))
- if next != nil:
- #echo("A new table already exists, copy in progress")
- return next
- var
- oldLen = atomic_load_n(self.len.addr, ATOMIC_RELAXED)
- newTable = newLFTable[K,V](oldLen*2)
- success = atomic_compare_exchange_n(self.next.addr, next.addr, newTable,
- false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- if not success:
- echo("someone beat us to it! delete table we just created and return his " & $cast[int](next))
- deleteConcTable(newTable)
- return next
- else:
- echo("Created New Table! " & $cast[int](newTable) & " Size = " & $newTable.len)
- return newTable
- #------------------------------------------------------------------------------
- #proc keyEQ[K](key1: ptr K, key2: ptr K): bool {.inline.} =
- proc keyEQ[K](key1: int, key2: int): bool {.inline.} =
- result = false
- when K is Raw:
- if key1 == key2:
- result = true
- else:
- var
- p1 = popPtr[K](key1)
- p2 = popPtr[K](key2)
- if p1 != nil and p2 != nil:
- if cast[int](p1) == cast[int](p2):
- return true
- if p1[] == p2[]:
- return true
- #------------------------------------------------------------------------------
- #proc tableFull(self: var PConcTable[K,V]) : bool {.inline.} =
- #------------------------------------------------------------------------------
- proc copySlot[K,V](idx: int, oldTbl: var PConcTable[K,V], newTbl: var PConcTable[K,V]): bool =
- #echo("Copy idx " & $idx)
- var
- oldVal = 0
- oldkey = 0
- ok = false
- result = false
- #Block the key so no other threads waste time here
- while not ok:
- ok = atomic_compare_exchange_n(oldTbl[idx].key.addr, oldKey.addr,
- setTomb(oldKey), false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- #echo("oldKey was = " & $oldKey & " set it to tomb " & $setTomb(oldKey))
- #Prevent new values from appearing in the old table by priming
- oldVal = atomic_load_n(oldTbl[idx].value.addr, ATOMIC_RELAXED)
- while not isPrime(oldVal):
- var box = if oldVal == 0 or isTomb(oldVal) : oldVal.setTomb.setPrime
- else: oldVal.setPrime
- if atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
- box, false, ATOMIC_RELAXED, ATOMIC_RELAXED):
- if isPrime(box) and isTomb(box):
- return true
- oldVal = box
- break
- #echo("oldVal was = ", oldVal, " set it to prime ", box)
- if isPrime(oldVal) and isTomb(oldVal):
- #when not (K is Raw):
- # deallocShared(popPtr[K](oldKey))
- return false
- if isTomb(oldVal):
- echo("oldVal is Tomb!!!, should not happen")
- if pop(oldVal) != 0:
- result = setVal(newTbl, pop(oldKey), pop(oldVal), 0, true) == 0
- #if result:
- #echo("Copied a Slot! idx= " & $idx & " key= " & $oldKey & " val= " & $oldVal)
- #else:
- #echo("copy slot failed")
- # Our copy is done so we disable the old slot
- while not ok:
- ok = atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
- oldVal.setTomb.setPrime , false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- #echo("disabled old slot")
- #echo"---------------------"
- #------------------------------------------------------------------------------
- proc promote[K,V](table: var PConcTable[K,V]) =
- var
- newData = atomic_load_n(table.next.data.addr, ATOMIC_RELAXED)
- newLen = atomic_load_n(table.next.len.addr, ATOMIC_RELAXED)
- newUsed = atomic_load_n(table.next.used.addr, ATOMIC_RELAXED)
- deallocShared(table.data)
- atomic_store_n(table.data.addr, newData, ATOMIC_RELAXED)
- atomic_store_n(table.len.addr, newLen, ATOMIC_RELAXED)
- atomic_store_n(table.used.addr, newUsed, ATOMIC_RELAXED)
- atomic_store_n(table.copyIdx.addr, 0, ATOMIC_RELAXED)
- atomic_store_n(table.copyDone.addr, 0, ATOMIC_RELAXED)
- deallocShared(table.next)
- atomic_store_n(table.next.addr, nil, ATOMIC_RELAXED)
- echo("new table swapped!")
- #------------------------------------------------------------------------------
- proc checkAndPromote[K,V](table: var PConcTable[K,V], workDone: int): bool =
- var
- oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
- copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
- ok: bool
- result = false
- if workDone > 0:
- #echo("len to copy =" & $oldLen)
- #echo("copyDone + workDone = " & $copyDone & " + " & $workDone)
- while not ok:
- ok = atomic_compare_exchange_n(table.copyDone.addr, copyDone.addr,
- copyDone + workDone, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- #if ok: echo("set copyDone")
- # If the copy is done we can promote this table
- if copyDone + workDone >= oldLen:
- # Swap new data
- #echo("work is done!")
- table.promote
- result = true
- #------------------------------------------------------------------------------
- proc copySlotAndCheck[K,V](table: var PConcTable[K,V], idx: int):
- PConcTable[K,V] =
- var
- newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
- result = newTable
- if newTable != nil and copySlot(idx, table, newTable):
- #echo("copied a single slot, idx = " & $idx)
- if checkAndPromote(table, 1): return table
- #------------------------------------------------------------------------------
- proc helpCopy[K,V](table: var PConcTable[K,V]): PConcTable[K,V] =
- var
- newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
- result = newTable
- if newTable != nil:
- var
- oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
- copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
- copyIdx = 0
- work = min(oldLen, minCopyWork)
- #panicStart = -1
- workDone = 0
- if copyDone < oldLen:
- var ok: bool
- while not ok:
- ok = atomic_compare_exchange_n(table.copyIdx.addr, copyIdx.addr,
- copyIdx + work, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- #echo("copy idx = ", copyIdx)
- for i in 0..work-1:
- var idx = (copyIdx + i) and (oldLen - 1)
- if copySlot(idx, table, newTable):
- workDone += 1
- if workDone > 0:
- #echo("did work ", workDone, " on thread ", cast[int](myThreadID[pointer]()))
- if checkAndPromote(table, workDone): return table
- # In case a thread finished all the work then got stalled before promotion
- if checkAndPromote(table, 0): return table
- #------------------------------------------------------------------------------
- proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
- expVal: int, match: bool): int =
- #echo("-try set- in table ", " key = ", (popPtr[K](key)[]), " val = ", val)
- when K is Raw:
- var idx = hashInt(key)
- else:
- var idx = popPtr[K](key)[].hash
- var
- nextTable: PConcTable[K,V]
- probes = 1
- # spin until we find a key slot or build and jump to next table
- while true:
- idx = idx and (table.len - 1)
- #echo("try set idx = " & $idx & "for" & $key)
- var
- probedKey = 0
- openKey = atomic_compare_exchange_n(table[idx].key.addr, probedKey.addr,
- key, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
- if openKey:
- if val.isTomb:
- #echo("val was tomb, bail, no reason to set an open slot to tomb")
- return val
- #increment used slots
- #echo("found an open slot, total used = " &
- #$atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED))
- discard atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED)
- break # We found an open slot
- #echo("set idx ", idx, " key = ", key, " probed = ", probedKey)
- if keyEQ[K](probedKey, key):
- #echo("we found the matching slot")
- break # We found a matching slot
- if (not(expVal != 0 and match)) and (probes >= reProbeLimit or key.isTomb):
- if key.isTomb: echo("Key is Tombstone")
- #if probes >= reProbeLimit: echo("Too much probing " & $probes)
- #echo("try to resize")
- #create next bigger table
- nextTable = resize(table)
- #help do some copying
- #echo("help copy old table to new")
- nextTable = helpCopy(table)
- #now setVal in the new table instead
- #echo("jumping to next table to set val")
- return setVal(nextTable, key, val, expVal, match)
- else:
- idx += 1
- probes += 1
- # Done spinning for a new slot
- var oldVal = atomic_load_n(table[idx].value.addr, ATOMIC_RELAXED)
- if val == oldVal:
- #echo("this val is alredy in the slot")
- return oldVal
- nextTable = atomic_load_n(table.next.addr, ATOMIC_SEQ_CST)
- if nextTable == nil and
- ((oldVal == 0 and
- (probes >= reProbeLimit or table.used / table.len > 0.8)) or
- (isPrime(oldVal))):
- if table.used / table.len > 0.8: echo("resize because usage ratio = " &
- $(table.used / table.len))
- if isPrime(oldVal): echo("old val isPrime, should be a rare mem ordering event")
- nextTable = resize(table)
- if nextTable != nil:
- #echo("tomb old slot then set in new table")
- nextTable = copySlotAndCheck(table,idx)
- return setVal(nextTable, key, val, expVal, match)
- # Finally ready to add new val to table
- while true:
- if match and oldVal != expVal:
- #echo("set failed, no match oldVal= " & $oldVal & " expVal= " & $expVal)
- return oldVal
- if atomic_compare_exchange_n(table[idx].value.addr, oldVal.addr,
- val, false, ATOMIC_RELEASE, ATOMIC_RELAXED):
- #echo("val set at table " & $cast[int](table))
- if expVal != 0:
- if (oldVal == 0 or isTomb(oldVal)) and not isTomb(val):
- discard atomic_add_fetch(table.active.addr, 1, ATOMIC_RELAXED)
- elif not (oldVal == 0 or isTomb(oldVal)) and isTomb(val):
- discard atomic_add_fetch(table.active.addr, -1, ATOMIC_RELAXED)
- if oldVal == 0 and expVal != 0:
- return setTomb(oldVal)
- else: return oldVal
- if isPrime(oldVal):
- nextTable = copySlotAndCheck(table, idx)
- return setVal(nextTable, key, val, expVal, match)
- #------------------------------------------------------------------------------
- proc getVal[K,V](table: var PConcTable[K,V], key: int): int =
- #echo("-try get- key = " & $key)
- when K is Raw:
- var idx = hashInt(key)
- else:
- var idx = popPtr[K](key)[].hash
- #echo("get idx ", idx)
- var
- probes = 0
- val: int
- while true:
- idx = idx and (table.len - 1)
- var
- newTable: PConcTable[K,V] # = atomic_load_n(table.next.addr, ATOMIC_ACQUIRE)
- probedKey = atomic_load_n(table[idx].key.addr, ATOMIC_SEQ_CST)
- if keyEQ[K](probedKey, key):
- #echo("found key after ", probes+1)
- val = atomic_load_n(table[idx].value.addr, ATOMIC_ACQUIRE)
- if not isPrime(val):
- if isTomb(val):
- #echo("val was tomb but not prime")
- return 0
- else:
- #echo("-GotIt- idx = ", idx, " key = ", key, " val ", val )
- return val
- else:
- newTable = copySlotAndCheck(table, idx)
- return getVal(newTable, key)
- else:
- #echo("probe ", probes, " idx = ", idx, " key = ", key, " found ", probedKey )
- if probes >= reProbeLimit*4 or key.isTomb:
- if newTable == nil:
- #echo("too many probes and no new table ", key, " ", idx )
- return 0
- else:
- newTable = helpCopy(table)
- return getVal(newTable, key)
- idx += 1
- probes += 1
- #------------------------------------------------------------------------------
- #proc set*(table: var PConcTable[Raw,Raw], key: Raw, val: Raw) =
- # discard setVal(table, pack(key), pack(key), 0, false)
- #proc set*[V](table: var PConcTable[Raw,V], key: Raw, val: ptr V) =
- # discard setVal(table, pack(key), cast[int](val), 0, false)
- proc set*[K,V](table: var PConcTable[K,V], key: var K, val: var V) =
- when not (K is Raw):
- var newKey = cast[int](copyShared(key))
- else:
- var newKey = pack(key)
- when not (V is Raw):
- var newVal = cast[int](copyShared(val))
- else:
- var newVal = pack(val)
- var oldPtr = pop(setVal(table, newKey, newVal, 0, false))
- #echo("oldPtr = ", cast[int](oldPtr), " newPtr = ", cast[int](newPtr))
- when not (V is Raw):
- if newVal != oldPtr and oldPtr != 0:
- deallocShared(cast[ptr V](oldPtr))
- proc get*[K,V](table: var PConcTable[K,V], key: var K): V =
- when not (V is Raw):
- when not (K is Raw):
- return popPtr[V](getVal(table, cast[int](key.addr)))[]
- else:
- return popPtr[V](getVal(table, pack(key)))[]
- else:
- when not (K is Raw):
- return popRaw(getVal(table, cast[int](key.addr)))
- else:
- return popRaw(getVal(table, pack(key)))
- #proc `[]`[K,V](table: var PConcTable[K,V], key: K): PEntry[K,V] {.inline.} =
- # getVal(table, key)
- #proc `[]=`[K,V](table: var PConcTable[K,V], key: K, val: V): PEntry[K,V] {.inline.} =
- # setVal(table, key, val)
- #Tests ----------------------------
- when not defined(testing) and isMainModule:
- import locks, times, mersenne
- const
- numTests = 100000
- numThreads = 10
- type
- TestObj = tuple
- thr: int
- f0: int
- f1: int
- Data = tuple[k: string,v: TestObj]
- PDataArr = array[0..numTests-1, Data]
- Dict = PConcTable[string,TestObj]
- {.deprecated: [TTestObj: TestObj, TData: Data].}
- var
- thr: array[0..numThreads-1, Thread[Dict]]
- table = newLFTable[string,TestObj](8)
- rand = newMersenneTwister(2525)
- proc createSampleData(len: int): PDataArr =
- #result = cast[PDataArr](allocShared0(sizeof(Data)*numTests))
- for i in 0..len-1:
- result[i].k = "mark" & $(i+1)
- #echo("mark" & $(i+1), " ", hash("mark" & $(i+1)))
- result[i].v.thr = 0
- result[i].v.f0 = i+1
- result[i].v.f1 = 0
- #echo("key = " & $(i+1) & " Val ptr = " & $cast[int](result[i].v.addr))
- proc threadProc(tp: Dict) {.thread.} =
- var t = cpuTime();
- for i in 1..numTests:
- var key = "mark" & $(i)
- var got = table.get(key)
- got.thr = cast[int](myThreadID[pointer]())
- got.f1 = got.f1 + 1
- table.set(key, got)
- t = cpuTime() - t
- echo t
- var testData = createSampleData(numTests)
- for i in 0..numTests-1:
- table.set(testData[i].k, testData[i].v)
- var i = 0
- while i < numThreads:
- createThread(thr[i], threadProc, table)
- i += 1
- joinThreads(thr)
- var fails = 0
- for i in 0..numTests-1:
- var got = table.get(testData[i].k)
- if got.f0 != i+1 or got.f1 != numThreads:
- fails += 1
- echo(got)
- echo("Failed read or write = ", fails)
- #for i in 1..numTests:
- # echo(i, " = ", hashInt(i) and 8191)
- deleteConcTable(table)
|