spawn.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. #
  2. #
  3. # The Nim Compiler
  4. # (c) Copyright 2015 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements threadpool's ``spawn``.
  10. import ast, types, idents, magicsys, msgs, options, modulegraphs,
  11. lowerings
  12. from trees import getMagic
  13. proc callProc(a: PNode): PNode =
  14. result = newNodeI(nkCall, a.info)
  15. result.add a
  16. result.typ = a.typ.sons[0]
  17. # we have 4 cases to consider:
  18. # - a void proc --> nothing to do
  19. # - a proc returning GC'ed memory --> requires a flowVar
  20. # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter
  21. # - not in a parallel environment --> requires a flowVar for memory safety
  22. type
  23. TSpawnResult* = enum
  24. srVoid, srFlowVar, srByVar
  25. TFlowVarKind = enum
  26. fvInvalid # invalid type T for 'FlowVar[T]'
  27. fvGC # FlowVar of a GC'ed type
  28. fvBlob # FlowVar of a blob type
  29. proc spawnResult*(t: PType; inParallel: bool): TSpawnResult =
  30. if t.isEmptyType: srVoid
  31. elif inParallel and not containsGarbageCollectedRef(t): srByVar
  32. else: srFlowVar
  33. proc flowVarKind(t: PType): TFlowVarKind =
  34. if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
  35. elif containsGarbageCollectedRef(t): fvInvalid
  36. else: fvBlob
  37. proc typeNeedsNoDeepCopy(t: PType): bool =
  38. var t = t.skipTypes(abstractInst)
  39. # for the tconvexhull example (and others) we're a bit lax here and pretend
  40. # seqs and strings are *by value* only and 'shallow' doesn't exist!
  41. if t.kind == tyString: return true
  42. # note that seq[T] is fine, but 'var seq[T]' is not, so we need to skip 'var'
  43. # for the stricter check and likewise we can skip 'seq' for a less
  44. # strict check:
  45. if t.kind in {tyVar, tyLent, tySequence}: t = t.lastSon
  46. result = not containsGarbageCollectedRef(t)
  47. proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; owner: PSym; typ: PType;
  48. v: PNode; useShallowCopy=false): PSym =
  49. result = newSym(skTemp, getIdent(g.cache, genPrefix), owner, varSection.info,
  50. owner.options)
  51. result.typ = typ
  52. incl(result.flags, sfFromGeneric)
  53. var vpart = newNodeI(nkIdentDefs, varSection.info, 3)
  54. vpart.sons[0] = newSymNode(result)
  55. vpart.sons[1] = newNodeI(nkEmpty, varSection.info)
  56. vpart.sons[2] = if varInit.isNil: v else: vpart[1]
  57. varSection.add vpart
  58. if varInit != nil:
  59. if useShallowCopy and typeNeedsNoDeepCopy(typ):
  60. varInit.add newFastAsgnStmt(newSymNode(result), v)
  61. else:
  62. let deepCopyCall = newNodeI(nkCall, varInit.info, 3)
  63. deepCopyCall.sons[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy))
  64. deepCopyCall.sons[1] = newSymNode(result)
  65. deepCopyCall.sons[2] = v
  66. varInit.add deepCopyCall
  67. discard """
  68. We generate roughly this:
  69. proc f_wrapper(thread, args) =
  70. barrierEnter(args.barrier) # for parallel statement
  71. var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
  72. # depending on whether we're in a 'parallel' statement
  73. var b = args.b
  74. var fv = args.fv
  75. fv.owner = thread # optional
  76. nimArgsPassingDone() # signal parent that the work is done
  77. #
  78. args.fv.blob = f(a, b, ...)
  79. nimFlowVarSignal(args.fv)
  80. # - or -
  81. f(a, b, ...)
  82. barrierLeave(args.barrier) # for parallel statement
  83. stmtList:
  84. var scratchObj
  85. scratchObj.a = a
  86. scratchObj.b = b
  87. nimSpawn(f_wrapper, addr scratchObj)
  88. scratchObj.fv # optional
  89. """
  90. proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym;
  91. varSection, varInit, call, barrier, fv: PNode;
  92. spawnKind: TSpawnResult): PSym =
  93. var body = newNodeI(nkStmtList, f.info)
  94. body.flags.incl nfTransf # do not transform further
  95. var threadLocalBarrier: PSym
  96. if barrier != nil:
  97. var varSection2 = newNodeI(nkVarSection, barrier.info)
  98. threadLocalBarrier = addLocalVar(g, varSection2, nil, argsParam.owner,
  99. barrier.typ, barrier)
  100. body.add varSection2
  101. body.add callCodegenProc(g, "barrierEnter", threadLocalBarrier.info,
  102. threadLocalBarrier.newSymNode)
  103. var threadLocalProm: PSym
  104. if spawnKind == srByVar:
  105. threadLocalProm = addLocalVar(g, varSection, nil, argsParam.owner, fv.typ, fv)
  106. elif fv != nil:
  107. internalAssert g.config, fv.typ.kind == tyGenericInst
  108. threadLocalProm = addLocalVar(g, varSection, nil, argsParam.owner, fv.typ, fv)
  109. body.add varSection
  110. body.add varInit
  111. if fv != nil and spawnKind != srByVar:
  112. # generate:
  113. # fv.owner = threadParam
  114. body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
  115. "owner", fv.info, g.cache), threadParam.newSymNode)
  116. body.add callCodegenProc(g, "nimArgsPassingDone", threadParam.info,
  117. threadParam.newSymNode)
  118. if spawnKind == srByVar:
  119. body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
  120. elif fv != nil:
  121. let fk = fv.typ.sons[1].flowVarKind
  122. if fk == fvInvalid:
  123. localError(g.config, f.info, "cannot create a flowVar of type: " &
  124. typeToString(fv.typ.sons[1]))
  125. body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
  126. if fk == fvGC: "data" else: "blob", fv.info, g.cache), call)
  127. if fk == fvGC:
  128. let incRefCall = newNodeI(nkCall, fv.info, 2)
  129. incRefCall.sons[0] = newSymNode(getSysMagic(g, fv.info, "GCref", mGCref))
  130. incRefCall.sons[1] = indirectAccess(threadLocalProm.newSymNode,
  131. "data", fv.info, g.cache)
  132. body.add incRefCall
  133. if barrier == nil:
  134. # by now 'fv' is shared and thus might have beeen overwritten! we need
  135. # to use the thread-local view instead:
  136. body.add callCodegenProc(g, "nimFlowVarSignal", threadLocalProm.info,
  137. threadLocalProm.newSymNode)
  138. else:
  139. body.add call
  140. if barrier != nil:
  141. body.add callCodegenProc(g, "barrierLeave", threadLocalBarrier.info,
  142. threadLocalBarrier.newSymNode)
  143. var params = newNodeI(nkFormalParams, f.info)
  144. params.add newNodeI(nkEmpty, f.info)
  145. params.add threadParam.newSymNode
  146. params.add argsParam.newSymNode
  147. var t = newType(tyProc, threadParam.owner)
  148. t.rawAddSon nil
  149. t.rawAddSon threadParam.typ
  150. t.rawAddSon argsParam.typ
  151. t.n = newNodeI(nkFormalParams, f.info)
  152. t.n.add newNodeI(nkEffectList, f.info)
  153. t.n.add threadParam.newSymNode
  154. t.n.add argsParam.newSymNode
  155. let name = (if f.kind == nkSym: f.sym.name.s else: genPrefix) & "Wrapper"
  156. result = newSym(skProc, getIdent(g.cache, name), argsParam.owner, f.info,
  157. argsParam.options)
  158. let emptyNode = newNodeI(nkEmpty, f.info)
  159. result.ast = newProcNode(nkProcDef, f.info, body = body,
  160. params = params, name = newSymNode(result), pattern = emptyNode,
  161. genericParams = emptyNode, pragmas = emptyNode,
  162. exceptions = emptyNode)
  163. result.typ = t
  164. proc createCastExpr(argsParam: PSym; objType: PType): PNode =
  165. result = newNodeI(nkCast, argsParam.info)
  166. result.add newNodeI(nkEmpty, argsParam.info)
  167. result.add newSymNode(argsParam)
  168. result.typ = newType(tyPtr, objType.owner)
  169. result.typ.rawAddSon(objType)
  170. proc setupArgsForConcurrency(g: ModuleGraph; n: PNode; objType: PType; scratchObj: PSym,
  171. castExpr, call,
  172. varSection, varInit, result: PNode) =
  173. let formals = n[0].typ.n
  174. let tmpName = getIdent(g.cache, genPrefix)
  175. for i in 1 ..< n.len:
  176. # we pick n's type here, which hopefully is 'tyArray' and not
  177. # 'tyOpenArray':
  178. var argType = n[i].typ.skipTypes(abstractInst)
  179. if i < formals.len and formals[i].typ.kind in {tyVar, tyLent}:
  180. localError(g.config, n[i].info, "'spawn'ed function cannot have a 'var' parameter")
  181. #elif containsTyRef(argType):
  182. # localError(n[i].info, "'spawn'ed function cannot refer to 'ref'/closure")
  183. let fieldname = if i < formals.len: formals[i].sym.name else: tmpName
  184. var field = newSym(skField, fieldname, objType.owner, n.info, g.config.options)
  185. field.typ = argType
  186. objType.addField(field, g.cache)
  187. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n[i])
  188. let temp = addLocalVar(g, varSection, varInit, objType.owner, argType,
  189. indirectAccess(castExpr, field, n.info))
  190. call.add(newSymNode(temp))
  191. proc getRoot*(n: PNode): PSym =
  192. ## ``getRoot`` takes a *path* ``n``. A path is an lvalue expression
  193. ## like ``obj.x[i].y``. The *root* of a path is the symbol that can be
  194. ## determined as the owner; ``obj`` in the example.
  195. case n.kind
  196. of nkSym:
  197. if n.sym.kind in {skVar, skResult, skTemp, skLet, skForVar}:
  198. result = n.sym
  199. of nkDotExpr, nkBracketExpr, nkHiddenDeref, nkDerefExpr,
  200. nkObjUpConv, nkObjDownConv, nkCheckedFieldExpr:
  201. result = getRoot(n.sons[0])
  202. of nkHiddenStdConv, nkHiddenSubConv, nkConv:
  203. result = getRoot(n.sons[1])
  204. of nkCallKinds:
  205. if getMagic(n) == mSlice: result = getRoot(n.sons[1])
  206. else: discard
  207. proc setupArgsForParallelism(g: ModuleGraph; n: PNode; objType: PType; scratchObj: PSym;
  208. castExpr, call,
  209. varSection, varInit, result: PNode) =
  210. let formals = n[0].typ.n
  211. let tmpName = getIdent(g.cache, genPrefix)
  212. # we need to copy the foreign scratch object fields into local variables
  213. # for correctness: These are called 'threadLocal' here.
  214. for i in 1 ..< n.len:
  215. let n = n[i]
  216. let argType = skipTypes(if i < formals.len: formals[i].typ else: n.typ,
  217. abstractInst)
  218. #if containsTyRef(argType):
  219. # localError(n.info, "'spawn'ed function cannot refer to 'ref'/closure")
  220. let fieldname = if i < formals.len: formals[i].sym.name else: tmpName
  221. var field = newSym(skField, fieldname, objType.owner, n.info, g.config.options)
  222. if argType.kind in {tyVarargs, tyOpenArray}:
  223. # important special case: we always create a zero-copy slice:
  224. let slice = newNodeI(nkCall, n.info, 4)
  225. slice.typ = n.typ
  226. slice.sons[0] = newSymNode(createMagic(g, "slice", mSlice))
  227. slice.sons[0].typ = getSysType(g, n.info, tyInt) # fake type
  228. var fieldB = newSym(skField, tmpName, objType.owner, n.info, g.config.options)
  229. fieldB.typ = getSysType(g, n.info, tyInt)
  230. objType.addField(fieldB, g.cache)
  231. if getMagic(n) == mSlice:
  232. let a = genAddrOf(n[1])
  233. field.typ = a.typ
  234. objType.addField(field, g.cache)
  235. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  236. var fieldA = newSym(skField, tmpName, objType.owner, n.info, g.config.options)
  237. fieldA.typ = getSysType(g, n.info, tyInt)
  238. objType.addField(fieldA, g.cache)
  239. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldA), n[2])
  240. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldB), n[3])
  241. let threadLocal = addLocalVar(g, varSection,nil, objType.owner, fieldA.typ,
  242. indirectAccess(castExpr, fieldA, n.info),
  243. useShallowCopy=true)
  244. slice.sons[2] = threadLocal.newSymNode
  245. else:
  246. let a = genAddrOf(n)
  247. field.typ = a.typ
  248. objType.addField(field, g.cache)
  249. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  250. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldB), genHigh(g, n))
  251. slice.sons[2] = newIntLit(g, n.info, 0)
  252. # the array itself does not need to go through a thread local variable:
  253. slice.sons[1] = genDeref(indirectAccess(castExpr, field, n.info))
  254. let threadLocal = addLocalVar(g, varSection,nil, objType.owner, fieldB.typ,
  255. indirectAccess(castExpr, fieldB, n.info),
  256. useShallowCopy=true)
  257. slice.sons[3] = threadLocal.newSymNode
  258. call.add slice
  259. elif (let size = computeSize(g.config, argType); size < 0 or size > 16) and
  260. n.getRoot != nil:
  261. # it is more efficient to pass a pointer instead:
  262. let a = genAddrOf(n)
  263. field.typ = a.typ
  264. objType.addField(field, g.cache)
  265. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  266. let threadLocal = addLocalVar(g, varSection,nil, objType.owner, field.typ,
  267. indirectAccess(castExpr, field, n.info),
  268. useShallowCopy=true)
  269. call.add(genDeref(threadLocal.newSymNode))
  270. else:
  271. # boring case
  272. field.typ = argType
  273. objType.addField(field, g.cache)
  274. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n)
  275. let threadLocal = addLocalVar(g, varSection, varInit,
  276. objType.owner, field.typ,
  277. indirectAccess(castExpr, field, n.info),
  278. useShallowCopy=true)
  279. call.add(threadLocal.newSymNode)
  280. proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: PType;
  281. barrier, dest: PNode = nil): PNode =
  282. # if 'barrier' != nil, then it is in a 'parallel' section and we
  283. # generate quite different code
  284. let n = spawnExpr[^2]
  285. let spawnKind = spawnResult(retType, barrier!=nil)
  286. case spawnKind
  287. of srVoid:
  288. internalAssert g.config, dest == nil
  289. result = newNodeI(nkStmtList, n.info)
  290. of srFlowVar:
  291. internalAssert g.config, dest == nil
  292. result = newNodeIT(nkStmtListExpr, n.info, retType)
  293. of srByVar:
  294. if dest == nil: localError(g.config, n.info, "'spawn' must not be discarded")
  295. result = newNodeI(nkStmtList, n.info)
  296. if n.kind notin nkCallKinds:
  297. localError(g.config, n.info, "'spawn' takes a call expression")
  298. return
  299. if optThreadAnalysis in g.config.globalOptions:
  300. if {tfThread, tfNoSideEffect} * n[0].typ.flags == {}:
  301. localError(g.config, n.info, "'spawn' takes a GC safe call expression")
  302. var
  303. threadParam = newSym(skParam, getIdent(g.cache, "thread"), owner, n.info, g.config.options)
  304. argsParam = newSym(skParam, getIdent(g.cache, "args"), owner, n.info, g.config.options)
  305. block:
  306. let ptrType = getSysType(g, n.info, tyPointer)
  307. threadParam.typ = ptrType
  308. argsParam.typ = ptrType
  309. argsParam.position = 1
  310. var objType = createObj(g, owner, n.info)
  311. incl(objType.flags, tfFinal)
  312. let castExpr = createCastExpr(argsParam, objType)
  313. var scratchObj = newSym(skVar, getIdent(g.cache, "scratch"), owner, n.info, g.config.options)
  314. block:
  315. scratchObj.typ = objType
  316. incl(scratchObj.flags, sfFromGeneric)
  317. var varSectionB = newNodeI(nkVarSection, n.info)
  318. varSectionB.addVar(scratchObj.newSymNode)
  319. result.add varSectionB
  320. var call = newNodeIT(nkCall, n.info, n.typ)
  321. var fn = n.sons[0]
  322. # templates and macros are in fact valid here due to the nature of
  323. # the transformation:
  324. if fn.kind == nkClosure or (fn.typ != nil and fn.typ.callConv == ccClosure):
  325. localError(g.config, n.info, "closure in spawn environment is not allowed")
  326. if not (fn.kind == nkSym and fn.sym.kind in {skProc, skTemplate, skMacro,
  327. skFunc, skMethod, skConverter}):
  328. # for indirect calls we pass the function pointer in the scratchObj
  329. var argType = n[0].typ.skipTypes(abstractInst)
  330. var field = newSym(skField, getIdent(g.cache, "fn"), owner, n.info, g.config.options)
  331. field.typ = argType
  332. objType.addField(field, g.cache)
  333. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n[0])
  334. fn = indirectAccess(castExpr, field, n.info)
  335. elif fn.kind == nkSym and fn.sym.kind == skIterator:
  336. localError(g.config, n.info, "iterator in spawn environment is not allowed")
  337. elif fn.typ.callConv == ccClosure:
  338. localError(g.config, n.info, "closure in spawn environment is not allowed")
  339. call.add(fn)
  340. var varSection = newNodeI(nkVarSection, n.info)
  341. var varInit = newNodeI(nkStmtList, n.info)
  342. if barrier.isNil:
  343. setupArgsForConcurrency(g, n, objType, scratchObj, castExpr, call,
  344. varSection, varInit, result)
  345. else:
  346. setupArgsForParallelism(g, n, objType, scratchObj, castExpr, call,
  347. varSection, varInit, result)
  348. var barrierAsExpr: PNode = nil
  349. if barrier != nil:
  350. let typ = newType(tyPtr, owner)
  351. typ.rawAddSon(magicsys.getCompilerProc(g, "Barrier").typ)
  352. var field = newSym(skField, getIdent(g.cache, "barrier"), owner, n.info, g.config.options)
  353. field.typ = typ
  354. objType.addField(field, g.cache)
  355. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier)
  356. barrierAsExpr = indirectAccess(castExpr, field, n.info)
  357. var fvField, fvAsExpr: PNode = nil
  358. if spawnKind == srFlowVar:
  359. var field = newSym(skField, getIdent(g.cache, "fv"), owner, n.info, g.config.options)
  360. field.typ = retType
  361. objType.addField(field, g.cache)
  362. fvField = newDotExpr(scratchObj, field)
  363. fvAsExpr = indirectAccess(castExpr, field, n.info)
  364. # create flowVar:
  365. result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1]))
  366. if barrier == nil:
  367. result.add callCodegenProc(g, "nimFlowVarCreateSemaphore", fvField.info,
  368. fvField)
  369. elif spawnKind == srByVar:
  370. var field = newSym(skField, getIdent(g.cache, "fv"), owner, n.info, g.config.options)
  371. field.typ = newType(tyPtr, objType.owner)
  372. field.typ.rawAddSon(retType)
  373. objType.addField(field, g.cache)
  374. fvAsExpr = indirectAccess(castExpr, field, n.info)
  375. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))
  376. let wrapper = createWrapperProc(g, fn, threadParam, argsParam,
  377. varSection, varInit, call,
  378. barrierAsExpr, fvAsExpr, spawnKind)
  379. result.add callCodegenProc(g, "nimSpawn" & $spawnExpr.len, wrapper.info,
  380. wrapper.newSymNode, genAddrOf(scratchObj.newSymNode), nil, spawnExpr)
  381. if spawnKind == srFlowVar: result.add fvField