# tioselectors.nim (20 KB)


  1. discard """
  2. output: "All tests passed!"
  3. """
  4. import selectors
  5. const hasThreadSupport = compileOption("threads")
  6. template processTest(t, x: untyped) =
  7. #stdout.write(t)
  8. #stdout.flushFile()
  9. if not x: echo(t & " FAILED\r\n")
  10. when defined(macosx):
  11. echo "All tests passed!"
  12. elif not defined(windows):
  13. import os, posix, nativesockets, times
  14. when ioselSupportedPlatform:
  15. import osproc
  proc socket_notification_test(): bool =
    ## Sends one message in each direction over a non-blocking TCP connection
    ## on port 13337, using the selector to wait for readiness between steps.
    ## Returns `true` when every check passed. Raises `OSError` when a socket
    ## call fails; a violated expectation fails a `doAssert`.
    proc create_test_socket(): SocketHandle =
      ## Returns a new TCP/IPv4 socket switched to non-blocking mode.
      result = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
                            posix.IPPROTO_TCP)
      let flags: int = fcntl(result, F_GETFL, 0)
      if flags == -1:
        raiseOSError(osLastError())
      if fcntl(result, F_SETFL, flags or O_NONBLOCK) == -1:
        raiseOSError(osLastError())

    var client_message = "SERVER HELLO =>"
    var server_message = "CLIENT HELLO"
    var buffer: array[128, char]

    var selector = newSelector[int]()
    var client_socket = create_test_socket()
    var server_socket = create_test_socket()

    var option: int32 = 1
    if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
                  addr(option), sizeof(option).SockLen) < 0:
      raiseOSError(osLastError())

    var aiList = getAddrInfo("0.0.0.0", Port(13337))
    if bindAddr(server_socket, aiList.ai_addr,
                aiList.ai_addrlen.Socklen) < 0'i32:
      # Capture the error code first so the cleanup call cannot disturb it.
      let bindError = osLastError()
      freeAddrInfo(aiList)
      raiseOSError(bindError)
    # The address list is no longer needed once bind succeeded; releasing it
    # here keeps it from leaking when listen() fails.
    freeAddrInfo(aiList)
    if server_socket.listen() == -1:
      raiseOSError(osLastError())

    aiList = getAddrInfo("127.0.0.1", Port(13337))
    # The socket is non-blocking, so the immediate result of connect() is
    # deliberately ignored.
    discard posix.connect(client_socket, aiList.ai_addr,
                          aiList.ai_addrlen.Socklen)

    registerHandle(selector, server_socket, {Event.Read}, 0)
    registerHandle(selector, client_socket, {Event.Write}, 0)

    freeAddrInfo(aiList)
    discard selector.select(100)

    var sockAddress: SockAddr
    var addrLen = sizeof(sockAddress).Socklen
    var server2_socket = accept(server_socket,
                                cast[ptr SockAddr](addr(sockAddress)),
                                addr(addrLen))
    doAssert(server2_socket != osInvalidSocket)
    selector.registerHandle(server2_socket, {Event.Read}, 0)

    # client -> server
    if posix.send(client_socket, addr(client_message[0]),
                  len(client_message), 0) == -1:
      raiseOSError(osLastError())

    selector.updateHandle(client_socket, {Event.Read})

    var rc2 = selector.select(100)
    doAssert(len(rc2) == 1)

    var read_count = posix.recv(server2_socket, addr buffer[0], 128, 0)
    if read_count == -1:
      raiseOSError(osLastError())

    doAssert(read_count == len(client_message))
    for i in 0..<read_count:
      doAssert(client_message[i] == buffer[i])

    # server -> client
    selector.updateHandle(server2_socket, {Event.Write})
    var rc3 = selector.select(0)
    doAssert(len(rc3) == 1)
    if posix.send(server2_socket, addr(server_message[0]),
                  len(server_message), 0) == -1:
      raiseOSError(osLastError())
    selector.updateHandle(server2_socket, {Event.Read})

    var rc4 = selector.select(100)
    doAssert(len(rc4) == 1)
    read_count = posix.recv(client_socket, addr(buffer[0]), 128, 0)
    if read_count == -1:
      raiseOSError(osLastError())

    doAssert(read_count == len(server_message))
    for i in 0..<read_count:
      doAssert(server_message[i] == buffer[i])

    selector.unregister(server_socket)
    selector.unregister(server2_socket)
    selector.unregister(client_socket)
    discard posix.close(server_socket)
    discard posix.close(server2_socket)
    discard posix.close(client_socket)
    doAssert(selector.isEmpty())
    close(selector)
    result = true
  proc event_notification_test(): bool =
    ## Checks that a registered user event is reported once after each
    ## `trigger` call and not at all otherwise, and that the reported key
    ## carries the data value given at registration. Returns `true` on
    ## success; a violated expectation fails a `doAssert`.
    var selector = newSelector[int]()
    var event = newSelectEvent()
    selector.registerEvent(event, 1)
    var rc0 = selector.select(0)
    # `trigger` is the call the Windows branch of this file uses on the same
    # SelectEvent type.
    event.trigger()
    var rc1 = selector.select(0)
    event.trigger()
    var rc2 = selector.select(0)
    var rc3 = selector.select(0)
    doAssert(len(rc0) == 0 and len(rc1) == 1 and
             len(rc2) == 1 and len(rc3) == 0)
    var ev1 = selector.getData(rc1[0].fd)
    var ev2 = selector.getData(rc2[0].fd)
    doAssert(ev1 == 1 and ev2 == 1)
    selector.unregister(event)
    event.close()
    doAssert(selector.isEmpty())
    selector.close()
    result = true
  when ioselSupportedPlatform:
    proc timer_notification_test(): bool =
      ## Checks that a repeating 100 ms timer is reported by two consecutive
      ## waits, and that a one-shot 100 ms timer is reported once and then
      ## leaves the selector empty. Returns `true` on success; a violated
      ## expectation fails a `doAssert`.
      var selector = newSelector[int]()
      var timer = selector.registerTimer(100, false, 0)
      var rc1 = selector.select(140)
      var rc2 = selector.select(140)
      doAssert(len(rc1) == 1 and len(rc2) == 1)
      selector.unregister(timer)
      discard selector.select(0)
      # The one-shot timer's handle is not needed afterwards.
      discard selector.registerTimer(100, true, 0)
      var rc4 = selector.select(120)
      var rc5 = selector.select(120)
      doAssert(len(rc4) == 1 and len(rc5) == 0)
      doAssert(selector.isEmpty())
      selector.close()
      result = true
  138. proc process_notification_test(): bool =
  139. var selector = newSelector[int]()
  140. var process2 = startProcess("sleep", "", ["2"], nil,
  141. {poStdErrToStdOut, poUsePath})
  142. discard startProcess("sleep", "", ["1"], nil,
  143. {poStdErrToStdOut, poUsePath})
  144. selector.registerProcess(process2.processID, 0)
  145. var rc1 = selector.select(1500)
  146. var rc2 = selector.select(1500)
  147. var r = len(rc1) + len(rc2)
  148. assert(r == 1)
  149. result = true
  150. proc signal_notification_test(): bool =
  151. var sigset1n, sigset1o, sigset2n, sigset2o: Sigset
  152. var pid = posix.getpid()
  153. discard sigemptyset(sigset1n)
  154. discard sigemptyset(sigset1o)
  155. discard sigemptyset(sigset2n)
  156. discard sigemptyset(sigset2o)
  157. when hasThreadSupport:
  158. if pthread_sigmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
  159. raiseOSError(osLastError())
  160. else:
  161. if sigprocmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
  162. raiseOSError(osLastError())
  163. var selector = newSelector[int]()
  164. var s1 = selector.registerSignal(SIGUSR1, 1)
  165. var s2 = selector.registerSignal(SIGUSR2, 2)
  166. var s3 = selector.registerSignal(SIGTERM, 3)
  167. discard selector.select(0)
  168. discard posix.kill(pid, SIGUSR1)
  169. discard posix.kill(pid, SIGUSR2)
  170. discard posix.kill(pid, SIGTERM)
  171. var rc = selector.select(0)
  172. var cd0 = selector.getData(rc[0].fd)
  173. var cd1 = selector.getData(rc[1].fd)
  174. var cd2 = selector.getData(rc[2].fd)
  175. selector.unregister(s1)
  176. selector.unregister(s2)
  177. selector.unregister(s3)
  178. when hasThreadSupport:
  179. if pthread_sigmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
  180. raiseOSError(osLastError())
  181. else:
  182. if sigprocmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
  183. raiseOSError(osLastError())
  184. assert(len(rc) == 3)
  185. assert(cd0 + cd1 + cd2 == 6, $(cd0 + cd1 + cd2)) # 1 + 2 + 3
  186. assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset)))
  187. assert(selector.isEmpty())
  188. result = true
  when defined(macosx) or defined(freebsd) or defined(openbsd) or
       defined(netbsd):

    # C `rename`, imported from <stdio.h>.
    proc rename(frompath, topath: cstring): cint
         {.importc: "rename", header: "<stdio.h>".}
  193. proc createFile(name: string): cint =
  194. result = posix.open(cstring(name), posix.O_CREAT or posix.O_RDWR)
  195. if result == -1:
  196. raiseOsError(osLastError())
  197. proc writeFile(name: string, data: string) =
  198. let fd = posix.open(cstring(name), posix.O_APPEND or posix.O_RDWR)
  199. if fd == -1:
  200. raiseOsError(osLastError())
  201. let length = len(data).cint
  202. if posix.write(fd, cast[pointer](unsafeAddr data[0]),
  203. len(data).cint) != length:
  204. raiseOsError(osLastError())
  205. if posix.close(fd) == -1:
  206. raiseOsError(osLastError())
  207. proc closeFile(fd: cint) =
  208. if posix.close(fd) == -1:
  209. raiseOsError(osLastError())
  210. proc removeFile(name: string) =
  211. let err = posix.unlink(cstring(name))
  212. if err == -1:
  213. raiseOsError(osLastError())
  214. proc createDir(name: string) =
  215. let err = posix.mkdir(cstring(name), 0x1FF)
  216. if err == -1:
  217. raiseOsError(osLastError())
  218. proc removeDir(name: string) =
  219. let err = posix.rmdir(cstring(name))
  220. if err == -1:
  221. raiseOsError(osLastError())
  222. proc chmodPath(name: string, mode: cint) =
  223. let err = posix.chmod(cstring(name), Mode(mode))
  224. if err == -1:
  225. raiseOsError(osLastError())
  226. proc renameFile(names: string, named: string) =
  227. let err = rename(cstring(names), cstring(named))
  228. if err == -1:
  229. raiseOsError(osLastError())
  230. proc symlink(names: string, named: string) =
  231. let err = posix.symlink(cstring(names), cstring(named))
  232. if err == -1:
  233. raiseOsError(osLastError())
  234. proc openWatch(name: string): cint =
  235. result = posix.open(cstring(name), posix.O_RDONLY)
  236. if result == -1:
  237. raiseOsError(osLastError())
  238. const
  239. testDirectory = "/tmp/kqtest"
  240. type
  241. valType = object
  242. fd: cint
  243. events: set[Event]
  244. proc vnode_test(): bool =
  245. proc validate(test: openarray[ReadyKey],
  246. check: openarray[valType]): bool =
  247. result = false
  248. if len(test) == len(check):
  249. for checkItem in check:
  250. result = false
  251. for testItem in test:
  252. if testItem.fd == checkItem.fd and
  253. checkItem.events <= testItem.events:
  254. result = true
  255. break
  256. if not result:
  257. break
  258. var res: seq[ReadyKey]
  259. var selector = newSelector[int]()
  260. var events = {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
  261. Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
  262. Event.VnodeRevoke}
  263. result = true
  264. discard posix.unlink(testDirectory)
  265. createDir(testDirectory)
  266. var dirfd = posix.open(cstring(testDirectory), posix.O_RDONLY)
  267. if dirfd == -1:
  268. raiseOsError(osLastError())
  269. selector.registerVnode(dirfd, events, 1)
  270. discard selector.select(0)
  271. # chmod testDirectory to 0777
  272. chmodPath(testDirectory, 0x1FF)
  273. res = selector.select(0)
  274. doAssert(len(res) == 1)
  275. doAssert(len(selector.select(0)) == 0)
  276. doAssert(res[0].fd == dirfd and
  277. {Event.Vnode, Event.VnodeAttrib} <= res[0].events)
  278. # create subdirectory
  279. createDir(testDirectory & "/test")
  280. res = selector.select(0)
  281. doAssert(len(res) == 1)
  282. doAssert(len(selector.select(0)) == 0)
  283. doAssert(res[0].fd == dirfd and
  284. {Event.Vnode, Event.VnodeWrite,
  285. Event.VnodeLink} <= res[0].events)
  286. # open test directory for watching
  287. var testfd = openWatch(testDirectory & "/test")
  288. selector.registerVnode(testfd, events, 2)
  289. doAssert(len(selector.select(0)) == 0)
  290. # rename test directory
  291. renameFile(testDirectory & "/test", testDirectory & "/renamed")
  292. res = selector.select(0)
  293. doAssert(len(res) == 2)
  294. doAssert(len(selector.select(0)) == 0)
  295. doAssert(validate(res,
  296. [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite}),
  297. valType(fd: testfd,
  298. events: {Event.Vnode, Event.VnodeRename})])
  299. )
  300. # remove test directory
  301. removeDir(testDirectory & "/renamed")
  302. res = selector.select(0)
  303. doAssert(len(res) == 2)
  304. doAssert(len(selector.select(0)) == 0)
  305. doAssert(validate(res,
  306. [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite,
  307. Event.VnodeLink}),
  308. valType(fd: testfd,
  309. events: {Event.Vnode, Event.VnodeDelete})])
  310. )
  311. # create file new test file
  312. testfd = createFile(testDirectory & "/testfile")
  313. res = selector.select(0)
  314. doAssert(len(res) == 1)
  315. doAssert(len(selector.select(0)) == 0)
  316. doAssert(res[0].fd == dirfd and
  317. {Event.Vnode, Event.VnodeWrite} <= res[0].events)
  318. # close new test file
  319. closeFile(testfd)
  320. doAssert(len(selector.select(0)) == 0)
  321. doAssert(len(selector.select(0)) == 0)
  322. # chmod test file with 0666
  323. chmodPath(testDirectory & "/testfile", 0x1B6)
  324. doAssert(len(selector.select(0)) == 0)
  325. testfd = openWatch(testDirectory & "/testfile")
  326. selector.registerVnode(testfd, events, 1)
  327. discard selector.select(0)
  328. # write data to test file
  329. writeFile(testDirectory & "/testfile", "TESTDATA")
  330. res = selector.select(0)
  331. doAssert(len(res) == 1)
  332. doAssert(len(selector.select(0)) == 0)
  333. doAssert(res[0].fd == testfd and
  334. {Event.Vnode, Event.VnodeWrite,
  335. Event.VnodeExtend} <= res[0].events)
  336. # symlink test file
  337. symlink(testDirectory & "/testfile", testDirectory & "/testlink")
  338. res = selector.select(0)
  339. doAssert(len(res) == 1)
  340. doAssert(len(selector.select(0)) == 0)
  341. doAssert(res[0].fd == dirfd and
  342. {Event.Vnode, Event.VnodeWrite} <= res[0].events)
  343. # remove test file
  344. removeFile(testDirectory & "/testfile")
  345. res = selector.select(0)
  346. doAssert(len(res) == 2)
  347. doAssert(len(selector.select(0)) == 0)
  348. doAssert(validate(res,
  349. [valType(fd: testfd, events: {Event.Vnode, Event.VnodeDelete}),
  350. valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite})])
  351. )
  352. # remove symlink
  353. removeFile(testDirectory & "/testlink")
  354. res = selector.select(0)
  355. doAssert(len(res) == 1)
  356. doAssert(len(selector.select(0)) == 0)
  357. doAssert(res[0].fd == dirfd and
  358. {Event.Vnode, Event.VnodeWrite} <= res[0].events)
  359. # remove testDirectory
  360. removeDir(testDirectory)
  361. res = selector.select(0)
  362. doAssert(len(res) == 1)
  363. doAssert(len(selector.select(0)) == 0)
  364. doAssert(res[0].fd == dirfd and
  365. {Event.Vnode, Event.VnodeDelete} <= res[0].events)
  when hasThreadSupport:
    # Number of worker threads whose wait reported the shared event.
    var counter = 0

    proc event_wait_thread(event: SelectEvent) {.thread.} =
      ## Thread body: registers `event` with a private selector, waits up to
      ## 1000 ms and increments `counter` when exactly one key was reported.
      var selector = newSelector[int]()
      selector.registerEvent(event, 1)
      var rc = selector.select(1000)
      if len(rc) == 1:
        # `counter` is shared by all worker threads, so update it atomically.
        atomicInc(counter)
      selector.unregister(event)
      doAssert(selector.isEmpty())
      # Release the per-thread selector instead of leaking it.
      selector.close()
  376. proc mt_event_test(): bool =
  377. var
  378. thr: array[0..7, Thread[SelectEvent]]
  379. var selector = newSelector[int]()
  380. var sock = newNativeSocket()
  381. var event = newSelectEvent()
  382. for i in 0..high(thr):
  383. createThread(thr[i], event_wait_thread, event)
  384. selector.registerHandle(sock, {Event.Read}, 1)
  385. discard selector.select(500)
  386. selector.unregister(sock)
  387. event.setEvent()
  388. joinThreads(thr)
  389. assert(counter == 1)
  390. result = true
  # Run every test available on this platform; `processTest` reports each
  # failing one.
  processTest("Socket notification test...", socket_notification_test())
  processTest("User event notification test...", event_notification_test())

  when hasThreadSupport:
    processTest("Multithreaded user event notification test...",
                mt_event_test())

  when ioselSupportedPlatform:
    processTest("Timer notification test...", timer_notification_test())
    processTest("Process notification test...", process_notification_test())
    processTest("Signal notification test...", signal_notification_test())

  when defined(macosx) or defined(freebsd) or defined(openbsd) or
       defined(netbsd):
    processTest("File notification test...", vnode_test())

  echo("All tests passed!")
  404. else:
  405. import nativesockets, winlean, os, osproc
  proc socket_notification_test(): bool =
    ## Sends one message in each direction over a non-blocking TCP connection
    ## on port 13337, using the selector to wait for readiness between steps.
    ## Returns `true` when every check passed. Raises `OSError` when a socket
    ## call fails; a violated expectation fails a `doAssert`.
    proc create_test_socket(): SocketHandle =
      ## Returns a new native socket switched to non-blocking mode.
      result = newNativeSocket()
      setBlocking(result, false)

    var client_message = "SERVER HELLO =>"
    var server_message = "CLIENT HELLO"
    var buffer: array[128, char]

    var selector = newSelector[int]()
    var client_socket = create_test_socket()
    var server_socket = create_test_socket()

    selector.registerHandle(server_socket, {Event.Read}, 0)
    selector.registerHandle(client_socket, {Event.Write}, 0)

    var option: int32 = 1
    if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
                  addr(option), sizeof(option).SockLen) < 0:
      raiseOSError(osLastError())

    var aiList = getAddrInfo("0.0.0.0", Port(13337))
    if bindAddr(server_socket, aiList.ai_addr,
                aiList.ai_addrlen.Socklen) < 0'i32:
      # Capture the error code first so the cleanup call cannot disturb it.
      let bindError = osLastError()
      freeAddrInfo(aiList)
      raiseOSError(bindError)
    freeAddrInfo(aiList)
    # Checked the same way as in the POSIX variant of this test.
    if server_socket.listen() == -1:
      raiseOSError(osLastError())

    aiList = getAddrInfo("127.0.0.1", Port(13337))
    # The socket is non-blocking, so the immediate result of connect() is
    # deliberately ignored.
    discard connect(client_socket, aiList.ai_addr,
                    aiList.ai_addrlen.Socklen)
    freeAddrInfo(aiList)

    # for some reason Windows select doesn't return both
    # descriptors from first call, so the wait is retried (up to 10 times)
    # until both are reported
    var n = 0
    var rcm = selector.select(1000)
    while n < 10 and len(rcm) < 2:
      sleep(1000)
      rcm = selector.select(1000)
      inc(n)

    doAssert(len(rcm) == 2)

    var sockAddress = SockAddr()
    var addrLen = sizeof(sockAddress).Socklen
    var server2_socket = accept(server_socket,
                                cast[ptr SockAddr](addr(sockAddress)),
                                addr(addrLen))
    doAssert(server2_socket != osInvalidSocket)
    selector.registerHandle(server2_socket, {Event.Read}, 0)

    # client -> server
    if send(client_socket, cast[pointer](addr(client_message[0])),
            cint(len(client_message)), 0) == -1:
      raiseOSError(osLastError())

    selector.updateHandle(client_socket, {Event.Read})

    var rc2 = selector.select(1000)
    doAssert(len(rc2) == 1)

    var read_count = recv(server2_socket, addr buffer[0], 128, 0)
    if read_count == -1:
      raiseOSError(osLastError())

    doAssert(read_count == len(client_message))
    for i in 0..<read_count:
      doAssert(client_message[i] == buffer[i])

    # server -> client
    if send(server2_socket, cast[pointer](addr(server_message[0])),
            cint(len(server_message)), 0) == -1:
      raiseOSError(osLastError())

    var rc3 = selector.select(0)
    doAssert(len(rc3) == 1)
    read_count = recv(client_socket, addr(buffer[0]), 128, 0)
    if read_count == -1:
      raiseOSError(osLastError())

    doAssert(read_count == len(server_message))
    for i in 0..<read_count:
      doAssert(server_message[i] == buffer[i])

    selector.unregister(server_socket)
    selector.unregister(server2_socket)
    selector.unregister(client_socket)
    close(server_socket)
    close(server2_socket)
    close(client_socket)
    doAssert(selector.isEmpty())
    close(selector)
    result = true
  proc event_notification_test(): bool =
    ## Checks that a registered user event is reported once after each
    ## `trigger` call and not on a further wait, and that the reported key
    ## carries the data value given at registration. Returns `true` on
    ## success; a violated expectation fails a `doAssert`.
    var selector = newSelector[int]()
    var event = newSelectEvent()
    selector.registerEvent(event, 1)
    discard selector.select(0)
    event.trigger()
    var rc1 = selector.select(0)
    event.trigger()
    var rc2 = selector.select(0)
    var rc3 = selector.select(0)
    doAssert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
    var ev1 = selector.getData(rc1[0].fd)
    var ev2 = selector.getData(rc2[0].fd)
    doAssert(ev1 == 1 and ev2 == 1)
    selector.unregister(event)
    event.close()
    doAssert(selector.isEmpty())
    selector.close()
    result = true
  when hasThreadSupport:
    # Number of worker threads whose wait reported the shared event.
    var counter = 0

    proc event_wait_thread(event: SelectEvent) {.thread.} =
      ## Thread body: registers `event` with a private selector, waits up to
      ## 1500 ms and increments `counter` when exactly one key was reported.
      var selector = newSelector[int]()
      selector.registerEvent(event, 1)
      var rc = selector.select(1500)
      if len(rc) == 1:
        # `counter` is shared by all worker threads, so update it atomically.
        atomicInc(counter)
      selector.unregister(event)
      doAssert(selector.isEmpty())
      # Release the per-thread selector instead of leaking it.
      selector.close()
  519. proc mt_event_test(): bool =
  520. var thr: array[0..7, Thread[SelectEvent]]
  521. var event = newSelectEvent()
  522. for i in 0..high(thr):
  523. createThread(thr[i], event_wait_thread, event)
  524. event.trigger()
  525. joinThreads(thr)
  526. assert(counter == 1)
  527. result = true
  # Run every test available on Windows; `processTest` reports each failing
  # one.
  processTest("Socket notification test...", socket_notification_test())
  processTest("User event notification test...", event_notification_test())

  when hasThreadSupport:
    processTest("Multithreaded user event notification test...",
                mt_event_test())

  echo("All tests passed!")