spawn.nim 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. #
  2. #
  3. # The Nim Compiler
  4. # (c) Copyright 2015 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements threadpool's ``spawn``.
  10. import ast, types, idents, magicsys, msgs, options, modulegraphs,
  11. lowerings, liftdestructors, renderer
  12. from trees import getMagic, getRoot
  13. proc callProc(a: PNode): PNode =
  14. result = newNodeI(nkCall, a.info)
  15. result.add a
  16. result.typ() = a.typ.returnType
  17. # we have 4 cases to consider:
  18. # - a void proc --> nothing to do
  19. # - a proc returning GC'ed memory --> requires a flowVar
  20. # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter
  21. # - not in a parallel environment --> requires a flowVar for memory safety
  22. type
  23. TSpawnResult* = enum
  24. srVoid, srFlowVar, srByVar
  25. TFlowVarKind = enum
  26. fvInvalid # invalid type T for 'FlowVar[T]'
  27. fvGC # FlowVar of a GC'ed type
  28. fvBlob # FlowVar of a blob type
  29. proc spawnResult*(t: PType; inParallel: bool): TSpawnResult =
  30. if t.isEmptyType: srVoid
  31. elif inParallel and not containsGarbageCollectedRef(t): srByVar
  32. else: srFlowVar
  33. proc flowVarKind(c: ConfigRef, t: PType): TFlowVarKind =
  34. if c.selectedGC in {gcArc, gcOrc, gcAtomicArc}: fvBlob
  35. elif t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
  36. elif containsGarbageCollectedRef(t): fvInvalid
  37. else: fvBlob
  38. proc typeNeedsNoDeepCopy(t: PType): bool =
  39. var t = t.skipTypes(abstractInst)
  40. # for the tconvexhull example (and others) we're a bit lax here and pretend
  41. # seqs and strings are *by value* only and 'shallow' doesn't exist!
  42. if t.kind == tyString: return true
  43. # note that seq[T] is fine, but 'var seq[T]' is not, so we need to skip 'var'
  44. # for the stricter check and likewise we can skip 'seq' for a less
  45. # strict check:
  46. if t.kind in {tyVar, tyLent, tySequence}: t = t.elementType
  47. result = not containsGarbageCollectedRef(t)
  48. proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; idgen: IdGenerator; owner: PSym; typ: PType;
  49. v: PNode; useShallowCopy=false): PSym =
  50. result = newSym(skTemp, getIdent(g.cache, genPrefix), idgen, owner, varSection.info,
  51. owner.options)
  52. result.typ = typ
  53. incl(result.flags, sfFromGeneric)
  54. var vpart = newNodeI(nkIdentDefs, varSection.info, 3)
  55. vpart[0] = newSymNode(result)
  56. vpart[1] = newNodeI(nkEmpty, varSection.info)
  57. vpart[2] = if varInit.isNil: v else: vpart[1]
  58. varSection.add vpart
  59. if varInit != nil:
  60. if g.config.selectedGC in {gcArc, gcOrc, gcAtomicArc}:
  61. # inject destructors pass will do its own analysis
  62. varInit.add newFastMoveStmt(g, newSymNode(result), v)
  63. else:
  64. if useShallowCopy and typeNeedsNoDeepCopy(typ) or optTinyRtti in g.config.globalOptions:
  65. varInit.add newFastMoveStmt(g, newSymNode(result), v)
  66. else:
  67. let deepCopyCall = newNodeI(nkCall, varInit.info, 3)
  68. deepCopyCall[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy))
  69. deepCopyCall[1] = newSymNode(result)
  70. deepCopyCall[2] = v
  71. varInit.add deepCopyCall
  72. discard """
  73. We generate roughly this:
  74. proc f_wrapper(thread, args) =
  75. barrierEnter(args.barrier) # for parallel statement
  76. var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
  77. # depending on whether we're in a 'parallel' statement
  78. var b = args.b
  79. var fv = args.fv
  80. fv.owner = thread # optional
  81. nimArgsPassingDone() # signal parent that the work is done
  82. #
  83. args.fv.blob = f(a, b, ...)
  84. nimFlowVarSignal(args.fv)
  85. # - or -
  86. f(a, b, ...)
  87. barrierLeave(args.barrier) # for parallel statement
  88. stmtList:
  89. var scratchObj
  90. scratchObj.a = a
  91. scratchObj.b = b
  92. nimSpawn(f_wrapper, addr scratchObj)
  93. scratchObj.fv # optional
  94. """
  95. proc castToVoidPointer(g: ModuleGraph, n: PNode, fvField: PNode): PNode =
  96. if g.config.backend == backendCpp:
  97. result = fvField
  98. else:
  99. let ptrType = getSysType(g, n.info, tyPointer)
  100. result = newNodeI(nkCast, fvField.info)
  101. result.add newNodeI(nkEmpty, fvField.info)
  102. result.add fvField
  103. result.typ() = ptrType
  104. proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym;
  105. varSection, varInit, call, barrier, fv: PNode;
  106. idgen: IdGenerator;
  107. spawnKind: TSpawnResult, result: PSym) =
  108. var body = newNodeI(nkStmtList, f.info)
  109. var threadLocalBarrier: PSym = nil
  110. if barrier != nil:
  111. var varSection2 = newNodeI(nkVarSection, barrier.info)
  112. threadLocalBarrier = addLocalVar(g, varSection2, nil, idgen, result,
  113. barrier.typ, barrier)
  114. body.add varSection2
  115. body.add callCodegenProc(g, "barrierEnter", threadLocalBarrier.info,
  116. threadLocalBarrier.newSymNode)
  117. var threadLocalProm: PSym = nil
  118. if spawnKind == srByVar:
  119. threadLocalProm = addLocalVar(g, varSection, nil, idgen, result, fv.typ, fv)
  120. elif fv != nil:
  121. internalAssert g.config, fv.typ.kind == tyGenericInst
  122. threadLocalProm = addLocalVar(g, varSection, nil, idgen, result, fv.typ, fv)
  123. body.add varSection
  124. body.add varInit
  125. if fv != nil and spawnKind != srByVar:
  126. # generate:
  127. # fv.owner = threadParam
  128. body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
  129. "owner", fv.info, g.cache), threadParam.newSymNode)
  130. body.add callCodegenProc(g, "nimArgsPassingDone", threadParam.info,
  131. threadParam.newSymNode)
  132. if spawnKind == srByVar:
  133. body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
  134. elif fv != nil:
  135. let fk = flowVarKind(g.config, fv.typ.firstGenericParam)
  136. if fk == fvInvalid:
  137. localError(g.config, f.info, "cannot create a flowVar of type: " &
  138. typeToString(fv.typ.firstGenericParam))
  139. body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
  140. if fk == fvGC: "data" else: "blob", fv.info, g.cache), call)
  141. if fk == fvGC:
  142. let incRefCall = newNodeI(nkCall, fv.info, 2)
  143. incRefCall[0] = newSymNode(getSysMagic(g, fv.info, "GCref", mGCref))
  144. incRefCall[1] = indirectAccess(threadLocalProm.newSymNode,
  145. "data", fv.info, g.cache)
  146. body.add incRefCall
  147. if barrier == nil:
  148. # by now 'fv' is shared and thus might have beeen overwritten! we need
  149. # to use the thread-local view instead:
  150. let castExpr = castToVoidPointer(g, f, threadLocalProm.newSymNode)
  151. body.add callCodegenProc(g, "nimFlowVarSignal", threadLocalProm.info,
  152. castExpr)
  153. else:
  154. body.add call
  155. if barrier != nil:
  156. body.add callCodegenProc(g, "barrierLeave", threadLocalBarrier.info,
  157. threadLocalBarrier.newSymNode)
  158. var params = newNodeI(nkFormalParams, f.info)
  159. params.add newNodeI(nkEmpty, f.info)
  160. params.add threadParam.newSymNode
  161. params.add argsParam.newSymNode
  162. var t = newType(tyProc, idgen, threadParam.owner)
  163. t.rawAddSon nil
  164. t.rawAddSon threadParam.typ
  165. t.rawAddSon argsParam.typ
  166. t.n = newNodeI(nkFormalParams, f.info)
  167. t.n.add newNodeI(nkEffectList, f.info)
  168. t.n.add threadParam.newSymNode
  169. t.n.add argsParam.newSymNode
  170. let emptyNode = newNodeI(nkEmpty, f.info)
  171. result.ast = newProcNode(nkProcDef, f.info, body = body,
  172. params = params, name = newSymNode(result), pattern = emptyNode,
  173. genericParams = emptyNode, pragmas = emptyNode,
  174. exceptions = emptyNode)
  175. result.typ = t
  176. proc createCastExpr(argsParam: PSym; objType: PType; idgen: IdGenerator): PNode =
  177. result = newNodeI(nkCast, argsParam.info)
  178. result.add newNodeI(nkEmpty, argsParam.info)
  179. result.add newSymNode(argsParam)
  180. result.typ() = newType(tyPtr, idgen, objType.owner)
  181. result.typ.rawAddSon(objType)
  182. template checkMagicProcs(g: ModuleGraph, n: PNode, formal: PNode) =
  183. if (formal.typ.kind == tyVarargs and formal.typ.elementType.kind in {tyTyped, tyUntyped}) or
  184. formal.typ.kind in {tyTyped, tyUntyped}:
  185. localError(g.config, n.info, "'spawn'ed function cannot have a 'typed' or 'untyped' parameter")
  186. proc setupArgsForConcurrency(g: ModuleGraph; n: PNode; objType: PType;
  187. idgen: IdGenerator; owner: PSym; scratchObj: PSym,
  188. castExpr, call,
  189. varSection, varInit, result: PNode) =
  190. let formals = n[0].typ.n
  191. let tmpName = getIdent(g.cache, genPrefix)
  192. for i in 1..<n.len:
  193. # we pick n's type here, which hopefully is 'tyArray' and not
  194. # 'tyOpenArray':
  195. var argType = n[i].typ.skipTypes(abstractInst)
  196. if i < formals.len:
  197. if formals[i].typ.kind in {tyVar, tyLent}:
  198. localError(g.config, n[i].info, "'spawn'ed function cannot have a 'var' parameter")
  199. checkMagicProcs(g, n[i], formals[i])
  200. if formals[i].typ.kind in {tyTypeDesc, tyStatic}:
  201. continue
  202. #elif containsTyRef(argType):
  203. # localError(n[i].info, "'spawn'ed function cannot refer to 'ref'/closure")
  204. let fieldname = if i < formals.len: formals[i].sym.name else: tmpName
  205. var field = newSym(skField, fieldname, idgen, objType.owner, n.info, g.config.options)
  206. field.typ = argType
  207. discard objType.addField(field, g.cache, idgen)
  208. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n[i])
  209. let temp = addLocalVar(g, varSection, varInit, idgen, owner, argType,
  210. indirectAccess(castExpr, field, n.info))
  211. call.add(newSymNode(temp))
  212. proc setupArgsForParallelism(g: ModuleGraph; n: PNode; objType: PType;
  213. idgen: IdGenerator;
  214. owner: PSym; scratchObj: PSym;
  215. castExpr, call,
  216. varSection, varInit, result: PNode) =
  217. let formals = n[0].typ.n
  218. let tmpName = getIdent(g.cache, genPrefix)
  219. # we need to copy the foreign scratch object fields into local variables
  220. # for correctness: These are called 'threadLocal' here.
  221. for i in 1..<n.len:
  222. let n = n[i]
  223. if i < formals.len and formals[i].typ.kind in {tyStatic, tyTypeDesc}:
  224. continue
  225. checkMagicProcs(g, n, formals[i])
  226. let argType = skipTypes(if i < formals.len: formals[i].typ else: n.typ,
  227. abstractInst)
  228. #if containsTyRef(argType):
  229. # localError(n.info, "'spawn'ed function cannot refer to 'ref'/closure")
  230. let fieldname = if i < formals.len: formals[i].sym.name else: tmpName
  231. var field = newSym(skField, fieldname, idgen, objType.owner, n.info, g.config.options)
  232. if argType.kind in {tyVarargs, tyOpenArray}:
  233. # important special case: we always create a zero-copy slice:
  234. let slice = newNodeI(nkCall, n.info, 4)
  235. slice.typ() = n.typ
  236. slice[0] = newSymNode(createMagic(g, idgen, "slice", mSlice))
  237. slice[0].typ() = getSysType(g, n.info, tyInt) # fake type
  238. var fieldB = newSym(skField, tmpName, idgen, objType.owner, n.info, g.config.options)
  239. fieldB.typ = getSysType(g, n.info, tyInt)
  240. discard objType.addField(fieldB, g.cache, idgen)
  241. if getMagic(n) == mSlice:
  242. let a = genAddrOf(n[1], idgen)
  243. field.typ = a.typ
  244. discard objType.addField(field, g.cache, idgen)
  245. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  246. var fieldA = newSym(skField, tmpName, idgen, objType.owner, n.info, g.config.options)
  247. fieldA.typ = getSysType(g, n.info, tyInt)
  248. discard objType.addField(fieldA, g.cache, idgen)
  249. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldA), n[2])
  250. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldB), n[3])
  251. let threadLocal = addLocalVar(g, varSection, nil, idgen, owner, fieldA.typ,
  252. indirectAccess(castExpr, fieldA, n.info),
  253. useShallowCopy=true)
  254. slice[2] = threadLocal.newSymNode
  255. else:
  256. let a = genAddrOf(n, idgen)
  257. field.typ = a.typ
  258. discard objType.addField(field, g.cache, idgen)
  259. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  260. result.add newFastAsgnStmt(newDotExpr(scratchObj, fieldB), genHigh(g, n))
  261. slice[2] = newIntLit(g, n.info, 0)
  262. # the array itself does not need to go through a thread local variable:
  263. slice[1] = genDeref(indirectAccess(castExpr, field, n.info))
  264. let threadLocal = addLocalVar(g, varSection, nil, idgen, owner, fieldB.typ,
  265. indirectAccess(castExpr, fieldB, n.info),
  266. useShallowCopy=true)
  267. slice[3] = threadLocal.newSymNode
  268. call.add slice
  269. elif (let size = computeSize(g.config, argType); size < 0 or size > 16) and
  270. n.getRoot != nil:
  271. # it is more efficient to pass a pointer instead:
  272. let a = genAddrOf(n, idgen)
  273. field.typ = a.typ
  274. discard objType.addField(field, g.cache, idgen)
  275. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a)
  276. let threadLocal = addLocalVar(g, varSection, nil, idgen, owner, field.typ,
  277. indirectAccess(castExpr, field, n.info),
  278. useShallowCopy=true)
  279. call.add(genDeref(threadLocal.newSymNode))
  280. else:
  281. # boring case
  282. field.typ = argType
  283. discard objType.addField(field, g.cache, idgen)
  284. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n)
  285. let threadLocal = addLocalVar(g, varSection, varInit,
  286. idgen, owner, field.typ,
  287. indirectAccess(castExpr, field, n.info),
  288. useShallowCopy=true)
  289. call.add(threadLocal.newSymNode)
  290. proc wrapProcForSpawn*(g: ModuleGraph; idgen: IdGenerator; owner: PSym; spawnExpr: PNode; retType: PType;
  291. barrier: PNode = nil, dest: PNode = nil): PNode =
  292. # if 'barrier' != nil, then it is in a 'parallel' section and we
  293. # generate quite different code
  294. let n = spawnExpr[^2]
  295. let spawnKind = spawnResult(retType, barrier!=nil)
  296. case spawnKind
  297. of srVoid:
  298. internalAssert g.config, dest == nil
  299. result = newNodeI(nkStmtList, n.info)
  300. of srFlowVar:
  301. internalAssert g.config, dest == nil
  302. result = newNodeIT(nkStmtListExpr, n.info, retType)
  303. of srByVar:
  304. if dest == nil: localError(g.config, n.info, "'spawn' must not be discarded")
  305. result = newNodeI(nkStmtList, n.info)
  306. if n.kind notin nkCallKinds:
  307. localError(g.config, n.info, "'spawn' takes a call expression; got: " & $n)
  308. return
  309. if optThreadAnalysis in g.config.globalOptions:
  310. if {tfThread, tfNoSideEffect} * n[0].typ.flags == {}:
  311. localError(g.config, n.info, "'spawn' takes a GC safe call expression")
  312. var fn = n[0]
  313. let
  314. name = (if fn.kind == nkSym: fn.sym.name.s else: genPrefix) & "Wrapper"
  315. wrapperProc = newSym(skProc, getIdent(g.cache, name), idgen, owner, fn.info, g.config.options)
  316. threadParam = newSym(skParam, getIdent(g.cache, "thread"), idgen, wrapperProc, n.info, g.config.options)
  317. argsParam = newSym(skParam, getIdent(g.cache, "args"), idgen, wrapperProc, n.info, g.config.options)
  318. wrapperProc.flags.incl sfInjectDestructors
  319. block:
  320. let ptrType = getSysType(g, n.info, tyPointer)
  321. threadParam.typ = ptrType
  322. argsParam.typ = ptrType
  323. argsParam.position = 1
  324. var objType = createObj(g, idgen, owner, n.info)
  325. incl(objType.flags, tfFinal)
  326. let castExpr = createCastExpr(argsParam, objType, idgen)
  327. var scratchObj = newSym(skVar, getIdent(g.cache, "scratch"), idgen, owner, n.info, g.config.options)
  328. block:
  329. scratchObj.typ = objType
  330. incl(scratchObj.flags, sfFromGeneric)
  331. var varSectionB = newNodeI(nkVarSection, n.info)
  332. varSectionB.addVar(scratchObj.newSymNode)
  333. result.add varSectionB
  334. var call = newNodeIT(nkCall, n.info, n.typ)
  335. # templates and macros are in fact valid here due to the nature of
  336. # the transformation:
  337. if fn.kind == nkClosure or (fn.typ != nil and fn.typ.callConv == ccClosure):
  338. localError(g.config, n.info, "closure in spawn environment is not allowed")
  339. if not (fn.kind == nkSym and fn.sym.kind in {skProc, skTemplate, skMacro,
  340. skFunc, skMethod, skConverter}):
  341. # for indirect calls we pass the function pointer in the scratchObj
  342. var argType = n[0].typ.skipTypes(abstractInst)
  343. var field = newSym(skField, getIdent(g.cache, "fn"), idgen, owner, n.info, g.config.options)
  344. field.typ = argType
  345. discard objType.addField(field, g.cache, idgen)
  346. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n[0])
  347. fn = indirectAccess(castExpr, field, n.info)
  348. elif fn.kind == nkSym and fn.sym.kind == skIterator:
  349. localError(g.config, n.info, "iterator in spawn environment is not allowed")
  350. call.add(fn)
  351. var varSection = newNodeI(nkVarSection, n.info)
  352. var varInit = newNodeI(nkStmtList, n.info)
  353. if barrier.isNil:
  354. setupArgsForConcurrency(g, n, objType, idgen, wrapperProc, scratchObj, castExpr, call,
  355. varSection, varInit, result)
  356. else:
  357. setupArgsForParallelism(g, n, objType, idgen, wrapperProc, scratchObj, castExpr, call,
  358. varSection, varInit, result)
  359. var barrierAsExpr: PNode = nil
  360. if barrier != nil:
  361. let typ = newType(tyPtr, idgen, owner)
  362. typ.rawAddSon(magicsys.getCompilerProc(g, "Barrier").typ)
  363. var field = newSym(skField, getIdent(g.cache, "barrier"), idgen, owner, n.info, g.config.options)
  364. field.typ = typ
  365. discard objType.addField(field, g.cache, idgen)
  366. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier)
  367. barrierAsExpr = indirectAccess(castExpr, field, n.info)
  368. var fvField, fvAsExpr: PNode = nil
  369. if spawnKind == srFlowVar:
  370. var field = newSym(skField, getIdent(g.cache, "fv"), idgen, owner, n.info, g.config.options)
  371. field.typ = retType
  372. discard objType.addField(field, g.cache, idgen)
  373. fvField = newDotExpr(scratchObj, field)
  374. fvAsExpr = indirectAccess(castExpr, field, n.info)
  375. # create flowVar:
  376. result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1]))
  377. if barrier == nil:
  378. let castExpr = castToVoidPointer(g, n, fvField)
  379. result.add callCodegenProc(g, "nimFlowVarCreateSemaphore", fvField.info, castExpr)
  380. elif spawnKind == srByVar:
  381. var field = newSym(skField, getIdent(g.cache, "fv"), idgen, owner, n.info, g.config.options)
  382. field.typ = newType(tyPtr, idgen, objType.owner)
  383. field.typ.rawAddSon(retType)
  384. discard objType.addField(field, g.cache, idgen)
  385. fvAsExpr = indirectAccess(castExpr, field, n.info)
  386. result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest, idgen))
  387. createTypeBoundOps(g, nil, objType, n.info, idgen)
  388. createWrapperProc(g, fn, threadParam, argsParam,
  389. varSection, varInit, call,
  390. barrierAsExpr, fvAsExpr, idgen, spawnKind, wrapperProc)
  391. result.add callCodegenProc(g, "nimSpawn" & $spawnExpr.len, wrapperProc.info,
  392. wrapperProc.newSymNode, genAddrOf(scratchObj.newSymNode, idgen), nil, spawnExpr)
  393. if spawnKind == srFlowVar: result.add fvField