actors.nim 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## `Actor`:idx: support for Nim. An actor is implemented as a thread with
  10. ## a channel as its inbox. This module requires the ``--threads:on``
  11. ## command line switch.
  12. ##
  13. ## Example:
  14. ##
  15. ## .. code-block:: nim
  16. ##
  17. ##   var
  18. ##     a: ActorPool[int, void]
  19. ##   createActorPool(a)
  20. ##   for i in 0 ..< 300:
  21. ##     a.spawn(i, proc (x: int) {.thread.} = echo x)
  22. ##   a.join()
  23. ##
  24. ## **Note**: This whole module is deprecated. Use `threadpool` and ``spawn``
  25. ## instead.
  26. {.deprecated.}
  27. from os import sleep
  type
    Task*[In, Out] = object{.pure, final.}
      ## A task: one message as it travels through an actor's inbox.
      when Out isnot void:
        receiver*: ptr Channel[Out]
          ## the receiver channel of the response; this field only exists
          ## when `Out` is not `void`
      action*: proc (x: In): Out {.thread.}
        ## action to execute; sometimes useful
      shutDown*: bool
        ## set to tell an actor to shut down
      data*: In
        ## the data to process

    Actor[In, Out] = object{.pure, final.}
      ## Internal representation of an actor.
      i: Channel[Task[In, Out]]     # the inbox the actor receives tasks on
      t: Thread[ptr Actor[In, Out]] # the thread started for the actor

    PActor*[In, Out] = ptr Actor[In, Out]
      ## an actor
  proc spawn*[In, Out](action: proc(
      self: PActor[In, Out]){.thread.}): PActor[In, Out] =
    ## Creates an actor, that is a thread with an inbox, and returns it.
    ##
    ## The actor is allocated zero-initialised with `allocShared0`, its inbox
    ## is opened and a thread running `action` is created with the actor
    ## itself as its argument.
    ##
    ## The caller MUST call ``join`` because that also frees the actor's
    ## associated resources.
    let actor = cast[PActor[In, Out]](allocShared0(sizeof(Actor[In, Out])))
    open(actor.i)
    createThread(actor.t, action, actor)
    result = actor
  proc inbox*[In, Out](
      self: PActor[In, Out]): ptr Channel[Task[In, Out]] =
    ## Gets a pointer to the associated inbox of the actor `self`.
    ##
    ## The inbox transports `Task[In, Out]` values rather than bare `In`
    ## values, so the returned pointer has the element type
    ## `Task[In, Out]`. (The previously declared `ptr Channel[In]` did not
    ## match the type of the inbox field and could not be instantiated.)
    result = addr(self.i)
  proc running*[In, Out](a: PActor[In, Out]): bool =
    ## Returns true if the thread of the actor `a` is running.
    running(a.t)
  proc ready*[In, Out](a: PActor[In, Out]): bool =
    ## Returns true if the actor `a` is ready to process new messages, as
    ## reported by `ready` on its inbox channel.
    ready(a.i)
  proc join*[In, Out](a: PActor[In, Out]) =
    ## Joins the actor `a` and releases its resources.
    ##
    ## Waits for the actor's thread, then closes its inbox and finally
    ## deallocates the actor itself; `a` must not be used afterwards.
    a.t.joinThread()
    a.i.close()
    deallocShared(a)
  proc recv*[In, Out](a: PActor[In, Out]): Task[In, Out] =
    ## Receives the next task from `a`'s inbox.
    recv(a.i)
  proc send*[In, Out, X, Y](receiver: PActor[In, Out], msg: In,
                            sender: PActor[X, Y]) =
    ## Sends the message `msg` to `receiver`'s inbox.
    ##
    ## The task's response channel is set to the inbox of `sender`, and
    ## `msg` is shallow-copied into the task before it is sent.
    var task: Task[In, Out]
    shallowCopy(task.data, msg)
    task.receiver = addr(sender.i)
    send(receiver.i, task)
  proc send*[In, Out](receiver: PActor[In, Out], msg: In,
                      sender: ptr Channel[Out] = nil) =
    ## Sends the message `msg` to `receiver`'s inbox.
    ##
    ## `sender` is stored as the task's response channel; it defaults to
    ## `nil`. `msg` is shallow-copied into the task before it is sent.
    var task: Task[In, Out]
    shallowCopy(task.data, msg)
    task.receiver = sender
    send(receiver.i, task)
  proc sendShutdown*[In, Out](receiver: PActor[In, Out]) =
    ## Sends a shutdown message to `receiver`: a task whose `shutDown` flag
    ## is set and whose other fields keep their default values.
    var stop: Task[In, Out]
    stop.shutDown = true
    receiver.i.send(stop)
  proc reply*[In, Out](t: Task[In, Out], m: Out) =
    ## Sends the message `m` to the response channel stored in the task `t`.
    ##
    ## Instantiating this proc with `Out` being `void` is a compile-time
    ## error. The task's `receiver` is asserted to be non-nil.
    when Out is void:
      {.error: "you cannot reply to a void outbox".}
    assert t.receiver != nil
    t.receiver[].send(m)
  89. # ----------------- actor pools ----------------------------------------------
  type
    ActorPool*[In, Out] = object{.pure, final.}
      ## An actor pool.
      actors: seq[PActor[In, Out]] # the actors that belong to the pool
      when Out isnot void:
        outputs: Channel[Out]      # channel for results; absent for `void`
  proc `^`*[T](f: ptr Channel[T]): T =
    ## Alias for 'recv': receives a value from the channel `f` points to.
    recv(f[])
  proc poolWorker[In, Out](self: PActor[In, Out]) {.thread.} =
    ## Thread body of a pool actor.
    ##
    ## Receives tasks from the actor's own inbox until one arrives with
    ## `shutDown` set. For every other task the action is run on the task's
    ## data; unless `Out` is `void`, the result is sent to the task's
    ## `receiver` channel.
    var task = self.recv
    while not task.shutDown:
      when Out is void:
        task.action(task.data)
      else:
        send(task.receiver[], task.action(task.data))
      task = self.recv
  proc createActorPool*[In, Out](a: var ActorPool[In, Out], poolSize = 4) =
    ## Creates an actor pool in `a` consisting of `poolSize` actors.
    ##
    ## Unless `Out` is `void`, the pool's output channel is opened before
    ## the actors are spawned. Every actor runs `poolWorker`.
    newSeq(a.actors, poolSize)
    when Out isnot void:
      open(a.outputs)
    for slot in mitems(a.actors):
      slot = spawn(poolWorker[In, Out])
  proc sync*[In, Out](a: var ActorPool[In, Out], polling=50) =
    ## Waits for every actor of `a` to finish with its work.
    ##
    ## Currently this is implemented by polling every `polling` ms. It has a
    ## slight chance of failing, because it only checks that every actor is
    ## in the `ready` state and does not account for messages that are still
    ## in flight. This will change in a later version, however.
    var quietPasses = 0
    while true:
      var anyBusy = false
      for worker in a.actors:
        if not worker.i.ready:
          # One busy actor is enough: start counting from scratch.
          anyBusy = true
          quietPasses = 0
          break
      if not anyBusy:
        # Some actor may have sent a message to another actor while both
        # looked idle, because a message takes some time to arrive. We
        # assume this takes no longer than `polling`, so a second quiet
        # pass in a row is treated as "all work is done".
        inc quietPasses
        if quietPasses > 1: break
      sleep(polling)
  proc terminate*[In, Out](a: var ActorPool[In, Out]) =
    ## Terminates each actor in the actor pool `a` and frees the resources
    ## attached to `a`.
    ##
    ## A shutdown task is sent to every actor first; only then are the
    ## actors joined one after another. Unless `Out` is `void`, the output
    ## channel is closed afterwards. The pool ends up with no actors.
    var stop: Task[In, Out]
    stop.shutDown = true
    for worker in a.actors: send(worker.i, stop)
    for worker in a.actors: join(worker)
    when Out isnot void:
      close(a.outputs)
    a.actors = @[]
  proc join*[In, Out](a: var ActorPool[In, Out]) =
    ## Short-cut for `sync` (with its default polling interval) followed by
    ## `terminate`.
    a.sync()
    a.terminate()
  template setupTask =
    # Fills in the task `t` of the instantiating proc: `input` is
    # shallow-copied into its data and `action` becomes its action. The
    # names `t`, `input` and `action` are taken from the call site.
    shallowCopy(t.data, input)
    t.action = action
  template schedule =
    # Hands the task `t` to one actor of the pool `p` (both names are taken
    # from the call site) and returns from the instantiating proc.
    #
    # Extremely simple scheduler: the first thread is always tried first, so
    # that it remains 'hot' ;-). Round-robin hurts for keeping threads hot.
    for k in 0 ..< p.actors.len:
      if p.actors[k].i.ready:
        p.actors[k].i.send(t)
        return
    # No thread is ready :-( --> send the message to the thread which has
    # the fewest messages pending.
    var fewest = high(int)
    var fewestIdx = -1
    for k in 0 ..< p.actors.len:
      let pending = p.actors[k].i.peek
      if pending == 0:
        # ok, this one is ready now:
        p.actors[k].i.send(t)
        return
      # Negative counts are never selected.
      if pending >= 0 and pending < fewest:
        fewest = pending
        fewestIdx = k
    if fewestIdx < 0:
      raise newException(DeadThreadError, "cannot send message; thread died")
    p.actors[fewestIdx].i.send(t)
  proc spawn*[In, Out](p: var ActorPool[In, Out], input: In,
                       action: proc (input: In): Out {.thread.}
                       ): ptr Channel[Out] =
    ## Uses the actor pool `p` to run ``action(input)`` concurrently.
    ## `spawn` is guaranteed to not block.
    ##
    ## Returns a pointer to the pool's output channel, which is also stored
    ## as the task's response channel.
    var t: Task[In, Out]  # `t` is the name the helper templates expect
    setupTask()
    t.receiver = addr(p.outputs)
    result = t.receiver
    schedule()
  proc spawn*[In](
      p: var ActorPool[In, void],
      input: In,
      action: proc (input: In) {.thread.}) =
    ## Uses the actor pool `p` to run ``action(input)`` concurrently.
    ## `spawn` is guaranteed to not block.
    ##
    ## This overload is for actions without a result, so nothing is
    ## returned.
    var t: Task[In, void]  # `t` is the name the helper templates expect
    setupTask()
    schedule()
  when not defined(testing) and isMainModule:
    # Small demo, only compiled when this file is the main module and
    # `testing` is not defined: 300 integers are echoed through a pool.
    var a: ActorPool[int, void]
    a.createActorPool()
    for i in 0 .. 299:
      a.spawn(i, proc (x: int) {.thread.} = echo x)

    when false:
      # Disabled sketch of a recursive use of the pool; never compiled.
      proc treeDepth(n: PNode): int {.thread.} =
        var x = a.spawn(treeDepth, n.le)
        var y = a.spawn(treeDepth, n.ri)
        result = max(^x, ^y) + 1

    a.join()