/* multiwait.c (23 KB) */
  1. /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. #include "prio.h"
  6. #include "prprf.h"
  7. #include "prlog.h"
  8. #include "prmem.h"
  9. #include "pratom.h"
  10. #include "prlock.h"
  11. #include "prmwait.h"
  12. #include "prclist.h"
  13. #include "prerror.h"
  14. #include "prinrval.h"
  15. #include "prnetdb.h"
  16. #include "prthread.h"
  17. #include "plstr.h"
  18. #include "plerror.h"
  19. #include "plgetopt.h"
  20. #include <string.h>
  21. typedef struct Shared
  22. {
  23. const char *title;
  24. PRLock *list_lock;
  25. PRWaitGroup *group;
  26. PRIntervalTime timeout;
  27. } Shared;
  /*
  ** Verbosity
  **   Logging levels in increasing order of output. The rest of the file
  **   compares these with < and >, so the declaration order is significant.
  */
  typedef enum Verbosity
  {
    silent,
    quiet,
    chatty,
    noisy
  } Verbosity;
  /*
  ** Port offsets. Each macro expands either to nothing or to a "+N" term
  ** and is pasted after the base port number in default_port's
  ** initializer, so the default port differs between DEBUG / non-DEBUG
  ** and IS_64 / non-IS_64 builds.
  */
  #if defined(DEBUG)
  #define PORT_INC_DO +100
  #else
  #define PORT_INC_DO
  #endif

  #if defined(IS_64)
  #define PORT_INC_3264 +200
  #else
  #define PORT_INC_3264
  #endif
  39. static PRFileDesc *debug = NULL;
  40. static PRInt32 desc_allocated = 0;
  41. static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264;
  42. static enum Verbosity verbosity = quiet;
  43. static PRInt32 ops_required = 1000, ops_done = 0;
  44. static PRThreadScope thread_scope = PR_LOCAL_THREAD;
  45. static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
  /*
  ** MW_ASSERT
  **   In DEBUG builds a failed expression is routed through _MW_Assert(),
  **   which prints the current error to the log (when one is open) and then
  **   calls PR_Assert() with the stringified expression, file and line.
  **   In all other builds the macro expands to nothing, so its argument is
  **   never evaluated.
  */
  #ifdef DEBUG
  static void _MW_Assert(const char *s, const char *file, PRIntn ln)
  {
    if (debug != NULL) {
      PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
  } /* _MW_Assert */

  #define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__))
  #else
  #define MW_ASSERT(_expr)
  #endif
  59. static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
  60. {
  61. const char *tag[] = {
  62. "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
  63. "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"
  64. };
  65. PR_fprintf(
  66. debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
  67. msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
  68. } /* PrintRecvDesc */
  69. static Shared *MakeShared(const char *title)
  70. {
  71. Shared *shared = PR_NEWZAP(Shared);
  72. shared->group = PR_CreateWaitGroup(1);
  73. shared->timeout = PR_SecondsToInterval(1);
  74. shared->list_lock = PR_NewLock();
  75. shared->title = title;
  76. return shared;
  77. } /* MakeShared */
  78. static void DestroyShared(Shared *shared)
  79. {
  80. PRStatus rv;
  81. if (verbosity > quiet) {
  82. PR_fprintf(debug, "%s: destroying group\n", shared->title);
  83. }
  84. rv = PR_DestroyWaitGroup(shared->group);
  85. MW_ASSERT(PR_SUCCESS == rv);
  86. PR_DestroyLock(shared->list_lock);
  87. PR_DELETE(shared);
  88. } /* DestroyShared */
  89. static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
  90. {
  91. PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
  92. MW_ASSERT(NULL != desc_out);
  93. MW_ASSERT(NULL != fd);
  94. desc_out->fd = fd;
  95. desc_out->timeout = timeout;
  96. desc_out->buffer.length = 120;
  97. desc_out->buffer.start = PR_CALLOC(120);
  98. PR_AtomicIncrement(&desc_allocated);
  99. if (verbosity > chatty) {
  100. PrintRecvDesc(desc_out, "Allocated");
  101. }
  102. return desc_out;
  103. } /* CreateRecvWait */
  104. static void DestroyRecvWait(PRRecvWait *desc_out)
  105. {
  106. if (verbosity > chatty) {
  107. PrintRecvDesc(desc_out, "Destroying");
  108. }
  109. PR_Close(desc_out->fd);
  110. if (NULL != desc_out->buffer.start) {
  111. PR_DELETE(desc_out->buffer.start);
  112. }
  113. PR_Free(desc_out);
  114. (void)PR_AtomicDecrement(&desc_allocated);
  115. } /* DestroyRecvWait */
  116. static void CancelGroup(Shared *shared)
  117. {
  118. PRRecvWait *desc_out;
  119. if (verbosity > quiet) {
  120. PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
  121. }
  122. do
  123. {
  124. desc_out = PR_CancelWaitGroup(shared->group);
  125. if (NULL != desc_out) {
  126. DestroyRecvWait(desc_out);
  127. }
  128. } while (NULL != desc_out);
  129. MW_ASSERT(0 == desc_allocated);
  130. MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
  131. } /* CancelGroup */
  132. static void PR_CALLBACK ClientThread(void* arg)
  133. {
  134. PRStatus rv;
  135. PRInt32 bytes;
  136. PRIntn empty_flags = 0;
  137. PRNetAddr server_address;
  138. unsigned char buffer[100];
  139. Shared *shared = (Shared*)arg;
  140. PRFileDesc *server = PR_NewTCPSocket();
  141. if ((NULL == server)
  142. && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
  143. return;
  144. }
  145. MW_ASSERT(NULL != server);
  146. if (verbosity > chatty) {
  147. PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
  148. }
  149. /* Initialize the buffer so that Purify won't complain */
  150. memset(buffer, 0, sizeof(buffer));
  151. rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
  152. MW_ASSERT(PR_SUCCESS == rv);
  153. if (verbosity > quiet) {
  154. PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
  155. }
  156. rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
  157. if (PR_FAILURE == rv)
  158. {
  159. if (verbosity > silent) {
  160. PL_FPrintError(debug, "Client connect failed");
  161. }
  162. return;
  163. }
  164. while (ops_done < ops_required)
  165. {
  166. bytes = PR_Send(
  167. server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
  168. if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
  169. break;
  170. }
  171. MW_ASSERT(sizeof(buffer) == bytes);
  172. if (verbosity > chatty)
  173. PR_fprintf(
  174. debug, "%s: Client sent %d bytes\n",
  175. shared->title, sizeof(buffer));
  176. bytes = PR_Recv(
  177. server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
  178. if (verbosity > chatty)
  179. PR_fprintf(
  180. debug, "%s: Client received %d bytes\n",
  181. shared->title, sizeof(buffer));
  182. if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
  183. break;
  184. }
  185. MW_ASSERT(sizeof(buffer) == bytes);
  186. PR_Sleep(shared->timeout);
  187. }
  188. rv = PR_Close(server);
  189. MW_ASSERT(PR_SUCCESS == rv);
  190. } /* ClientThread */
  191. static void OneInThenCancelled(Shared *shared)
  192. {
  193. PRStatus rv;
  194. PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
  195. shared->timeout = PR_INTERVAL_NO_TIMEOUT;
  196. desc_in->fd = PR_NewTCPSocket();
  197. desc_in->timeout = shared->timeout;
  198. if (verbosity > chatty) {
  199. PrintRecvDesc(desc_in, "Adding desc");
  200. }
  201. rv = PR_AddWaitFileDesc(shared->group, desc_in);
  202. MW_ASSERT(PR_SUCCESS == rv);
  203. if (verbosity > chatty) {
  204. PrintRecvDesc(desc_in, "Cancelling");
  205. }
  206. rv = PR_CancelWaitFileDesc(shared->group, desc_in);
  207. MW_ASSERT(PR_SUCCESS == rv);
  208. desc_out = PR_WaitRecvReady(shared->group);
  209. MW_ASSERT(desc_out == desc_in);
  210. MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
  211. MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  212. if (verbosity > chatty) {
  213. PrintRecvDesc(desc_out, "Ready");
  214. }
  215. rv = PR_Close(desc_in->fd);
  216. MW_ASSERT(PR_SUCCESS == rv);
  217. if (verbosity > quiet) {
  218. PR_fprintf(debug, "%s: destroying group\n", shared->title);
  219. }
  220. PR_DELETE(desc_in);
  221. } /* OneInThenCancelled */
  222. static void OneOpOneThread(Shared *shared)
  223. {
  224. PRStatus rv;
  225. PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
  226. desc_in->fd = PR_NewTCPSocket();
  227. desc_in->timeout = shared->timeout;
  228. if (verbosity > chatty) {
  229. PrintRecvDesc(desc_in, "Adding desc");
  230. }
  231. rv = PR_AddWaitFileDesc(shared->group, desc_in);
  232. MW_ASSERT(PR_SUCCESS == rv);
  233. desc_out = PR_WaitRecvReady(shared->group);
  234. MW_ASSERT(desc_out == desc_in);
  235. MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  236. MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  237. if (verbosity > chatty) {
  238. PrintRecvDesc(desc_out, "Ready");
  239. }
  240. rv = PR_Close(desc_in->fd);
  241. MW_ASSERT(PR_SUCCESS == rv);
  242. PR_DELETE(desc_in);
  243. } /* OneOpOneThread */
  244. static void ManyOpOneThread(Shared *shared)
  245. {
  246. PRStatus rv;
  247. PRIntn index;
  248. PRRecvWait *desc_in;
  249. PRRecvWait *desc_out;
  250. if (verbosity > quiet) {
  251. PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
  252. }
  253. for (index = 0; index < wait_objects; ++index)
  254. {
  255. desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
  256. rv = PR_AddWaitFileDesc(shared->group, desc_in);
  257. MW_ASSERT(PR_SUCCESS == rv);
  258. }
  259. while (ops_done < ops_required)
  260. {
  261. desc_out = PR_WaitRecvReady(shared->group);
  262. MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  263. MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  264. if (verbosity > chatty) {
  265. PrintRecvDesc(desc_out, "Ready/readding");
  266. }
  267. rv = PR_AddWaitFileDesc(shared->group, desc_out);
  268. MW_ASSERT(PR_SUCCESS == rv);
  269. (void)PR_AtomicIncrement(&ops_done);
  270. }
  271. CancelGroup(shared);
  272. } /* ManyOpOneThread */
  273. static void PR_CALLBACK SomeOpsThread(void *arg)
  274. {
  275. PRRecvWait *desc_out;
  276. PRStatus rv = PR_SUCCESS;
  277. Shared *shared = (Shared*)arg;
  278. do /* until interrupted */
  279. {
  280. desc_out = PR_WaitRecvReady(shared->group);
  281. if (NULL == desc_out)
  282. {
  283. MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  284. if (verbosity > quiet) {
  285. PR_fprintf(debug, "Aborted\n");
  286. }
  287. break;
  288. }
  289. MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  290. MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  291. if (verbosity > chatty) {
  292. PrintRecvDesc(desc_out, "Ready");
  293. }
  294. if (verbosity > chatty) {
  295. PrintRecvDesc(desc_out, "Re-Adding");
  296. }
  297. desc_out->timeout = shared->timeout;
  298. rv = PR_AddWaitFileDesc(shared->group, desc_out);
  299. PR_AtomicIncrement(&ops_done);
  300. if (ops_done > ops_required) {
  301. break;
  302. }
  303. } while (PR_SUCCESS == rv);
  304. MW_ASSERT(PR_SUCCESS == rv);
  305. } /* SomeOpsThread */
  306. static void SomeOpsSomeThreads(Shared *shared)
  307. {
  308. PRStatus rv;
  309. PRThread **thread;
  310. PRIntn index;
  311. PRRecvWait *desc_in;
  312. thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
  313. /* Create some threads */
  314. if (verbosity > quiet) {
  315. PR_fprintf(debug, "%s: creating threads\n", shared->title);
  316. }
  317. for (index = 0; index < worker_threads; ++index)
  318. {
  319. thread[index] = PR_CreateThread(
  320. PR_USER_THREAD, SomeOpsThread, shared,
  321. PR_PRIORITY_HIGH, thread_scope,
  322. PR_JOINABLE_THREAD, 16 * 1024);
  323. }
  324. /* then create some operations */
  325. if (verbosity > quiet) {
  326. PR_fprintf(debug, "%s: creating desc\n", shared->title);
  327. }
  328. for (index = 0; index < wait_objects; ++index)
  329. {
  330. desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
  331. rv = PR_AddWaitFileDesc(shared->group, desc_in);
  332. MW_ASSERT(PR_SUCCESS == rv);
  333. }
  334. if (verbosity > quiet) {
  335. PR_fprintf(debug, "%s: sleeping\n", shared->title);
  336. }
  337. while (ops_done < ops_required) {
  338. PR_Sleep(shared->timeout);
  339. }
  340. if (verbosity > quiet) {
  341. PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
  342. }
  343. for (index = 0; index < worker_threads; ++index)
  344. {
  345. rv = PR_Interrupt(thread[index]);
  346. MW_ASSERT(PR_SUCCESS == rv);
  347. rv = PR_JoinThread(thread[index]);
  348. MW_ASSERT(PR_SUCCESS == rv);
  349. }
  350. PR_DELETE(thread);
  351. CancelGroup(shared);
  352. } /* SomeOpsSomeThreads */
  353. static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
  354. {
  355. PRInt32 bytes_out;
  356. if (verbosity > chatty)
  357. PR_fprintf(
  358. debug, "%s: Service received %d bytes\n",
  359. shared->title, desc->bytesRecv);
  360. if (0 == desc->bytesRecv) {
  361. goto quitting;
  362. }
  363. if ((-1 == desc->bytesRecv)
  364. && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
  365. goto aborted;
  366. }
  367. bytes_out = PR_Send(
  368. desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
  369. if (verbosity > chatty)
  370. PR_fprintf(
  371. debug, "%s: Service sent %d bytes\n",
  372. shared->title, bytes_out);
  373. if ((-1 == bytes_out)
  374. && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
  375. goto aborted;
  376. }
  377. MW_ASSERT(bytes_out == desc->bytesRecv);
  378. return PR_SUCCESS;
  379. aborted:
  380. quitting:
  381. return PR_FAILURE;
  382. } /* ServiceRequest */
  383. static void PR_CALLBACK ServiceThread(void *arg)
  384. {
  385. PRStatus rv = PR_SUCCESS;
  386. PRRecvWait *desc_out = NULL;
  387. Shared *shared = (Shared*)arg;
  388. do /* until interrupted */
  389. {
  390. if (NULL != desc_out)
  391. {
  392. desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
  393. if (verbosity > chatty) {
  394. PrintRecvDesc(desc_out, "Service re-adding");
  395. }
  396. rv = PR_AddWaitFileDesc(shared->group, desc_out);
  397. MW_ASSERT(PR_SUCCESS == rv);
  398. }
  399. desc_out = PR_WaitRecvReady(shared->group);
  400. if (NULL == desc_out)
  401. {
  402. MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  403. break;
  404. }
  405. switch (desc_out->outcome)
  406. {
  407. case PR_MW_SUCCESS:
  408. {
  409. PR_AtomicIncrement(&ops_done);
  410. if (verbosity > chatty) {
  411. PrintRecvDesc(desc_out, "Service ready");
  412. }
  413. rv = ServiceRequest(shared, desc_out);
  414. break;
  415. }
  416. case PR_MW_INTERRUPT:
  417. MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  418. rv = PR_FAILURE; /* if interrupted, then exit */
  419. break;
  420. case PR_MW_TIMEOUT:
  421. MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  422. case PR_MW_FAILURE:
  423. if (verbosity > silent) {
  424. PL_FPrintError(debug, "RecvReady failure");
  425. }
  426. break;
  427. default:
  428. break;
  429. }
  430. } while (PR_SUCCESS == rv);
  431. if (NULL != desc_out) {
  432. DestroyRecvWait(desc_out);
  433. }
  434. } /* ServiceThread */
  435. static void PR_CALLBACK EnumerationThread(void *arg)
  436. {
  437. PRStatus rv;
  438. PRIntn count;
  439. PRRecvWait *desc;
  440. Shared *shared = (Shared*)arg;
  441. PRIntervalTime five_seconds = PR_SecondsToInterval(5);
  442. PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
  443. MW_ASSERT(NULL != enumerator);
  444. while (PR_SUCCESS == PR_Sleep(five_seconds))
  445. {
  446. count = 0;
  447. desc = NULL;
  448. while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
  449. {
  450. if (verbosity > chatty) {
  451. PrintRecvDesc(desc, shared->title);
  452. }
  453. count += 1;
  454. }
  455. if (verbosity > silent)
  456. PR_fprintf(debug,
  457. "%s Enumerated %d objects\n", shared->title, count);
  458. }
  459. MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  460. rv = PR_DestroyMWaitEnumerator(enumerator);
  461. MW_ASSERT(PR_SUCCESS == rv);
  462. } /* EnumerationThread */
  463. static void PR_CALLBACK ServerThread(void *arg)
  464. {
  465. PRStatus rv;
  466. PRIntn index;
  467. PRRecvWait *desc_in;
  468. PRThread **worker_thread;
  469. Shared *shared = (Shared*)arg;
  470. PRFileDesc *listener, *service;
  471. PRNetAddr server_address, client_address;
  472. worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
  473. if (verbosity > quiet) {
  474. PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
  475. }
  476. for (index = 0; index < worker_threads; ++index)
  477. {
  478. worker_thread[index] = PR_CreateThread(
  479. PR_USER_THREAD, ServiceThread, shared,
  480. PR_PRIORITY_HIGH, thread_scope,
  481. PR_JOINABLE_THREAD, 16 * 1024);
  482. }
  483. rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
  484. MW_ASSERT(PR_SUCCESS == rv);
  485. listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
  486. if (verbosity > chatty)
  487. PR_fprintf(
  488. debug, "%s: Server listener socket @0x%x\n",
  489. shared->title, listener);
  490. rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
  491. rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
  492. while (ops_done < ops_required)
  493. {
  494. if (verbosity > quiet) {
  495. PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
  496. }
  497. service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
  498. if (NULL == service)
  499. {
  500. if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) {
  501. break;
  502. }
  503. PL_PrintError("Accept failed");
  504. MW_ASSERT(PR_FALSE && "Accept failed");
  505. }
  506. else
  507. {
  508. desc_in = CreateRecvWait(service, shared->timeout);
  509. desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
  510. if (verbosity > chatty) {
  511. PrintRecvDesc(desc_in, "Service adding");
  512. }
  513. rv = PR_AddWaitFileDesc(shared->group, desc_in);
  514. MW_ASSERT(PR_SUCCESS == rv);
  515. }
  516. }
  517. if (verbosity > quiet) {
  518. PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
  519. }
  520. for (index = 0; index < worker_threads; ++index)
  521. {
  522. rv = PR_Interrupt(worker_thread[index]);
  523. MW_ASSERT(PR_SUCCESS == rv);
  524. rv = PR_JoinThread(worker_thread[index]);
  525. MW_ASSERT(PR_SUCCESS == rv);
  526. }
  527. PR_DELETE(worker_thread);
  528. PR_Close(listener);
  529. CancelGroup(shared);
  530. } /* ServerThread */
  531. static void RealOneGroupIO(Shared *shared)
  532. {
  533. /*
  534. ** Create a server that listens for connections and then services
  535. ** requests that come in over those connections. The server never
  536. ** deletes a connection and assumes a basic RPC model of operation.
  537. **
  538. ** Use worker_threads threads to service how every many open ports
  539. ** there might be.
  540. **
  541. ** Oh, ya. Almost forget. Create (some) clients as well.
  542. */
  543. PRStatus rv;
  544. PRIntn index;
  545. PRThread *server_thread, *enumeration_thread, **client_thread;
  546. if (verbosity > quiet) {
  547. PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
  548. }
  549. server_thread = PR_CreateThread(
  550. PR_USER_THREAD, ServerThread, shared,
  551. PR_PRIORITY_HIGH, thread_scope,
  552. PR_JOINABLE_THREAD, 16 * 1024);
  553. if (verbosity > quiet) {
  554. PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
  555. }
  556. enumeration_thread = PR_CreateThread(
  557. PR_USER_THREAD, EnumerationThread, shared,
  558. PR_PRIORITY_HIGH, thread_scope,
  559. PR_JOINABLE_THREAD, 16 * 1024);
  560. if (verbosity > quiet) {
  561. PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
  562. }
  563. PR_Sleep(5 * shared->timeout);
  564. if (verbosity > quiet) {
  565. PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
  566. }
  567. client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
  568. for (index = 0; index < client_threads; ++index)
  569. {
  570. client_thread[index] = PR_CreateThread(
  571. PR_USER_THREAD, ClientThread, shared,
  572. PR_PRIORITY_NORMAL, thread_scope,
  573. PR_JOINABLE_THREAD, 16 * 1024);
  574. }
  575. while (ops_done < ops_required) {
  576. PR_Sleep(shared->timeout);
  577. }
  578. if (verbosity > quiet) {
  579. PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
  580. }
  581. for (index = 0; index < client_threads; ++index)
  582. {
  583. rv = PR_Interrupt(client_thread[index]);
  584. MW_ASSERT(PR_SUCCESS == rv);
  585. rv = PR_JoinThread(client_thread[index]);
  586. MW_ASSERT(PR_SUCCESS == rv);
  587. }
  588. PR_DELETE(client_thread);
  589. if (verbosity > quiet) {
  590. PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
  591. }
  592. rv = PR_Interrupt(enumeration_thread);
  593. MW_ASSERT(PR_SUCCESS == rv);
  594. rv = PR_JoinThread(enumeration_thread);
  595. MW_ASSERT(PR_SUCCESS == rv);
  596. if (verbosity > quiet) {
  597. PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
  598. }
  599. rv = PR_Interrupt(server_thread);
  600. MW_ASSERT(PR_SUCCESS == rv);
  601. rv = PR_JoinThread(server_thread);
  602. MW_ASSERT(PR_SUCCESS == rv);
  603. } /* RealOneGroupIO */
  604. static void RunThisOne(
  605. void (*func)(Shared*), const char *name, const char *test_name)
  606. {
  607. Shared *shared;
  608. if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
  609. {
  610. if (verbosity > silent) {
  611. PR_fprintf(debug, "%s()\n", name);
  612. }
  613. shared = MakeShared(name);
  614. ops_done = 0;
  615. func(shared); /* run the test */
  616. MW_ASSERT(0 == desc_allocated);
  617. DestroyShared(shared);
  618. }
  619. } /* RunThisOne */
  620. static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
  621. {
  622. return (Verbosity)(((PRIntn)verbosity) + delta);
  623. } /* ChangeVerbosity */
  624. int main(int argc, char **argv)
  625. {
  626. PLOptStatus os;
  627. const char *test_name = NULL;
  628. PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
  629. while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
  630. {
  631. if (PL_OPT_BAD == os) {
  632. continue;
  633. }
  634. switch (opt->option)
  635. {
  636. case 0:
  637. test_name = opt->value;
  638. break;
  639. case 'd': /* debug mode */
  640. if (verbosity < noisy) {
  641. verbosity = ChangeVerbosity(verbosity, 1);
  642. }
  643. break;
  644. case 'q': /* debug mode */
  645. if (verbosity > silent) {
  646. verbosity = ChangeVerbosity(verbosity, -1);
  647. }
  648. break;
  649. case 'G': /* use global threads */
  650. thread_scope = PR_GLOBAL_THREAD;
  651. break;
  652. case 'c': /* number of client threads */
  653. client_threads = atoi(opt->value);
  654. break;
  655. case 'o': /* operations to compelete */
  656. ops_required = atoi(opt->value);
  657. break;
  658. case 'p': /* default port */
  659. default_port = atoi(opt->value);
  660. break;
  661. case 't': /* number of threads waiting */
  662. worker_threads = atoi(opt->value);
  663. break;
  664. case 'w': /* number of wait objects */
  665. wait_objects = atoi(opt->value);
  666. break;
  667. default:
  668. break;
  669. }
  670. }
  671. PL_DestroyOptState(opt);
  672. if (verbosity > 0) {
  673. debug = PR_GetSpecialFD(PR_StandardError);
  674. }
  675. RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
  676. RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
  677. RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
  678. RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
  679. RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
  680. return 0;
  681. } /* main */
  /* multiwait.c */