thrpool_server.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. /***********************************************************************
  6. **
  7. ** Name: thrpool_server.c
  8. **
  9. ** Description: Test threadpool functionality.
  10. **
  11. ** Modification History:
  12. */
  13. #include "primpl.h"
  14. #include "plgetopt.h"
  15. #include <stdio.h>
  16. #include <string.h>
  17. #include <errno.h>
  18. #ifdef XP_UNIX
  19. #include <sys/mman.h>
  20. #endif
  21. #if defined(_PR_PTHREADS)
  22. #include <pthread.h>
  23. #endif
  24. /* for getcwd */
  25. #if defined(XP_UNIX) || defined (XP_OS2)
  26. #include <unistd.h>
  27. #elif defined(XP_PC)
  28. #include <direct.h>
  29. #endif
  30. #ifdef WIN32
  31. #include <process.h>
  32. #endif
  33. static int _debug_on = 0;
  34. static char *program_name = NULL;
  35. static void serve_client_write(void *arg);
  36. #include "obsolete/prsem.h"
  37. #ifdef XP_PC
  38. #define mode_t int
  39. #endif
  40. #define DPRINTF(arg) if (_debug_on) printf arg
  41. #define BUF_DATA_SIZE (2 * 1024)
  42. #define TCP_MESG_SIZE 1024
  43. #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */
  44. #define NUM_TCP_CONNECTIONS_PER_CLIENT 10
  45. #define NUM_TCP_MESGS_PER_CONNECTION 10
  46. #define TCP_SERVER_PORT 10000
  47. #define SERVER_MAX_BIND_COUNT 100
  48. #ifdef WINCE
  49. char *getcwd(char *buf, size_t size)
  50. {
  51. wchar_t wpath[MAX_PATH];
  52. _wgetcwd(wpath, MAX_PATH);
  53. WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
  54. }
  55. #define perror(s)
  56. #endif
  57. static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
  58. static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
  59. static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
  60. static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
  61. static void TCP_Server_Accept(void *arg);
  62. int failed_already=0;
  63. typedef struct buffer {
  64. char data[BUF_DATA_SIZE];
  65. } buffer;
  66. typedef struct Server_Param {
  67. PRJobIoDesc iod; /* socket to read from/write to */
  68. PRInt32 datalen; /* bytes of data transfered in each read/write */
  69. PRNetAddr netaddr;
  70. PRMonitor *exit_mon; /* monitor to signal on exit */
  71. PRInt32 *job_counterp; /* counter to decrement, before exit */
  72. PRInt32 conn_counter; /* counter to decrement, before exit */
  73. PRThreadPool *tp;
  74. } Server_Param;
  75. typedef struct Serve_Client_Param {
  76. PRJobIoDesc iod; /* socket to read from/write to */
  77. PRInt32 datalen; /* bytes of data transfered in each read/write */
  78. PRMonitor *exit_mon; /* monitor to signal on exit */
  79. PRInt32 *job_counterp; /* counter to decrement, before exit */
  80. PRThreadPool *tp;
  81. } Serve_Client_Param;
  82. typedef struct Session {
  83. PRJobIoDesc iod; /* socket to read from/write to */
  84. buffer *in_buf;
  85. PRInt32 bytes;
  86. PRInt32 msg_num;
  87. PRInt32 bytes_read;
  88. PRMonitor *exit_mon; /* monitor to signal on exit */
  89. PRInt32 *job_counterp; /* counter to decrement, before exit */
  90. PRThreadPool *tp;
  91. } Session;
  92. static void
  93. serve_client_read(void *arg)
  94. {
  95. Session *sp = (Session *) arg;
  96. int rem;
  97. int bytes;
  98. int offset;
  99. PRFileDesc *sockfd;
  100. char *buf;
  101. PRJob *jobp;
  102. PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
  103. sockfd = sp->iod.socket;
  104. buf = sp->in_buf->data;
  105. PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
  106. PR_ASSERT(sp->bytes_read < sp->bytes);
  107. offset = sp->bytes_read;
  108. rem = sp->bytes - offset;
  109. bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
  110. if (bytes < 0) {
  111. return;
  112. }
  113. sp->bytes_read += bytes;
  114. sp->iod.timeout = PR_SecondsToInterval(60);
  115. if (sp->bytes_read < sp->bytes) {
  116. jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
  117. PR_FALSE);
  118. PR_ASSERT(NULL != jobp);
  119. return;
  120. }
  121. PR_ASSERT(sp->bytes_read == sp->bytes);
  122. DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
  123. sp->iod.timeout = PR_SecondsToInterval(60);
  124. jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
  125. PR_FALSE);
  126. PR_ASSERT(NULL != jobp);
  127. return;
  128. }
  129. static void
  130. serve_client_write(void *arg)
  131. {
  132. Session *sp = (Session *) arg;
  133. int bytes;
  134. PRFileDesc *sockfd;
  135. char *buf;
  136. PRJob *jobp;
  137. sockfd = sp->iod.socket;
  138. buf = sp->in_buf->data;
  139. PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
  140. bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
  141. PR_ASSERT(bytes == sp->bytes);
  142. if (bytes < 0) {
  143. return;
  144. }
  145. DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
  146. sp->msg_num++;
  147. if (sp->msg_num < num_tcp_mesgs_per_connection) {
  148. sp->bytes_read = 0;
  149. sp->iod.timeout = PR_SecondsToInterval(60);
  150. jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
  151. PR_FALSE);
  152. PR_ASSERT(NULL != jobp);
  153. return;
  154. }
  155. DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
  156. if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
  157. fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
  158. }
  159. PR_Close(sockfd);
  160. PR_EnterMonitor(sp->exit_mon);
  161. --(*sp->job_counterp);
  162. PR_Notify(sp->exit_mon);
  163. PR_ExitMonitor(sp->exit_mon);
  164. PR_DELETE(sp->in_buf);
  165. PR_DELETE(sp);
  166. return;
  167. }
  168. /*
  169. * Serve_Client
  170. * Thread, started by the server, for serving a client connection.
  171. * Reads data from socket and writes it back, unmodified, and
  172. * closes the socket
  173. */
  174. static void PR_CALLBACK
  175. Serve_Client(void *arg)
  176. {
  177. Serve_Client_Param *scp = (Serve_Client_Param *) arg;
  178. buffer *in_buf;
  179. Session *sp;
  180. PRJob *jobp;
  181. sp = PR_NEW(Session);
  182. sp->iod = scp->iod;
  183. in_buf = PR_NEW(buffer);
  184. if (in_buf == NULL) {
  185. fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
  186. failed_already=1;
  187. return;
  188. }
  189. sp->in_buf = in_buf;
  190. sp->bytes = scp->datalen;
  191. sp->msg_num = 0;
  192. sp->bytes_read = 0;
  193. sp->tp = scp->tp;
  194. sp->exit_mon = scp->exit_mon;
  195. sp->job_counterp = scp->job_counterp;
  196. sp->iod.timeout = PR_SecondsToInterval(60);
  197. jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
  198. PR_FALSE);
  199. PR_ASSERT(NULL != jobp);
  200. PR_DELETE(scp);
  201. }
  202. static void
  203. print_stats(void *arg)
  204. {
  205. Server_Param *sp = (Server_Param *) arg;
  206. PRThreadPool *tp = sp->tp;
  207. PRInt32 counter;
  208. PRJob *jobp;
  209. PR_EnterMonitor(sp->exit_mon);
  210. counter = (*sp->job_counterp);
  211. PR_ExitMonitor(sp->exit_mon);
  212. printf("PRINT_STATS: #client connections = %d\n",counter);
  213. jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
  214. print_stats, sp, PR_FALSE);
  215. PR_ASSERT(NULL != jobp);
  216. }
  217. static int job_counter = 0;
  218. /*
  219. * TCP Server
  220. * Server binds an address to a socket, starts a client process and
  221. * listens for incoming connections.
  222. * Each client connects to the server and sends a chunk of data
  223. * Starts a Serve_Client job for each incoming connection, to read
  224. * the data from the client and send it back to the client, unmodified.
  225. * Each client checks that data received from server is same as the
  226. * data it sent to the server.
  227. * Finally, the threadpool is shutdown
  228. */
  229. static void PR_CALLBACK
  230. TCP_Server(void *arg)
  231. {
  232. PRThreadPool *tp = (PRThreadPool *) arg;
  233. Server_Param *sp;
  234. PRFileDesc *sockfd;
  235. PRNetAddr netaddr;
  236. PRMonitor *sc_mon;
  237. PRJob *jobp;
  238. int i;
  239. PRStatus rval;
  240. /*
  241. * Create a tcp socket
  242. */
  243. if ((sockfd = PR_NewTCPSocket()) == NULL) {
  244. fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
  245. return;
  246. }
  247. memset(&netaddr, 0, sizeof(netaddr));
  248. netaddr.inet.family = PR_AF_INET;
  249. netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
  250. netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
  251. /*
  252. * try a few times to bind server's address, if addresses are in
  253. * use
  254. */
  255. i = 0;
  256. while (PR_Bind(sockfd, &netaddr) < 0) {
  257. if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
  258. netaddr.inet.port += 2;
  259. if (i++ < SERVER_MAX_BIND_COUNT) {
  260. continue;
  261. }
  262. }
  263. fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
  264. perror("PR_Bind");
  265. failed_already=1;
  266. return;
  267. }
  268. if (PR_Listen(sockfd, 32) < 0) {
  269. fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
  270. failed_already=1;
  271. return;
  272. }
  273. if (PR_GetSockName(sockfd, &netaddr) < 0) {
  274. fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
  275. failed_already=1;
  276. return;
  277. }
  278. DPRINTF((
  279. "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
  280. netaddr.inet.ip, netaddr.inet.port));
  281. sp = PR_NEW(Server_Param);
  282. if (sp == NULL) {
  283. fprintf(stderr,"%s: PR_NEW failed\n", program_name);
  284. failed_already=1;
  285. return;
  286. }
  287. sp->iod.socket = sockfd;
  288. sp->iod.timeout = PR_SecondsToInterval(60);
  289. sp->datalen = tcp_mesg_size;
  290. sp->exit_mon = sc_mon;
  291. sp->job_counterp = &job_counter;
  292. sp->conn_counter = 0;
  293. sp->tp = tp;
  294. sp->netaddr = netaddr;
  295. /* create and cancel an io job */
  296. jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
  297. PR_FALSE);
  298. PR_ASSERT(NULL != jobp);
  299. rval = PR_CancelJob(jobp);
  300. PR_ASSERT(PR_SUCCESS == rval);
  301. /*
  302. * create the client process
  303. */
  304. {
  305. #define MAX_ARGS 4
  306. char *argv[MAX_ARGS + 1];
  307. int index = 0;
  308. char port[32];
  309. char path[1024 + sizeof("/thrpool_client")];
  310. getcwd(path, sizeof(path));
  311. (void)strcat(path, "/thrpool_client");
  312. #ifdef XP_PC
  313. (void)strcat(path, ".exe");
  314. #endif
  315. argv[index++] = path;
  316. sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
  317. if (_debug_on)
  318. {
  319. argv[index++] = "-d";
  320. argv[index++] = "-p";
  321. argv[index++] = port;
  322. argv[index++] = NULL;
  323. } else {
  324. argv[index++] = "-p";
  325. argv[index++] = port;
  326. argv[index++] = NULL;
  327. }
  328. PR_ASSERT(MAX_ARGS >= (index - 1));
  329. DPRINTF(("creating client process %s ...\n", path));
  330. if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
  331. fprintf(stderr,
  332. "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
  333. failed_already=1;
  334. return;
  335. }
  336. }
  337. sc_mon = PR_NewMonitor();
  338. if (sc_mon == NULL) {
  339. fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
  340. failed_already=1;
  341. return;
  342. }
  343. sp->iod.socket = sockfd;
  344. sp->iod.timeout = PR_SecondsToInterval(60);
  345. sp->datalen = tcp_mesg_size;
  346. sp->exit_mon = sc_mon;
  347. sp->job_counterp = &job_counter;
  348. sp->conn_counter = 0;
  349. sp->tp = tp;
  350. sp->netaddr = netaddr;
  351. /* create and cancel a timer job */
  352. jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
  353. print_stats, sp, PR_FALSE);
  354. PR_ASSERT(NULL != jobp);
  355. rval = PR_CancelJob(jobp);
  356. PR_ASSERT(PR_SUCCESS == rval);
  357. DPRINTF(("TCP_Server: Accepting connections \n"));
  358. jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
  359. PR_FALSE);
  360. PR_ASSERT(NULL != jobp);
  361. return;
  362. }
  363. static void
  364. TCP_Server_Accept(void *arg)
  365. {
  366. Server_Param *sp = (Server_Param *) arg;
  367. PRThreadPool *tp = sp->tp;
  368. Serve_Client_Param *scp;
  369. PRFileDesc *newsockfd;
  370. PRJob *jobp;
  371. if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
  372. PR_INTERVAL_NO_TIMEOUT)) == NULL) {
  373. fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
  374. failed_already=1;
  375. goto exit;
  376. }
  377. scp = PR_NEW(Serve_Client_Param);
  378. if (scp == NULL) {
  379. fprintf(stderr,"%s: PR_NEW failed\n", program_name);
  380. failed_already=1;
  381. goto exit;
  382. }
  383. /*
  384. * Start a Serve_Client job for each incoming connection
  385. */
  386. scp->iod.socket = newsockfd;
  387. scp->iod.timeout = PR_SecondsToInterval(60);
  388. scp->datalen = tcp_mesg_size;
  389. scp->exit_mon = sp->exit_mon;
  390. scp->job_counterp = sp->job_counterp;
  391. scp->tp = sp->tp;
  392. PR_EnterMonitor(sp->exit_mon);
  393. (*sp->job_counterp)++;
  394. PR_ExitMonitor(sp->exit_mon);
  395. jobp = PR_QueueJob(tp, Serve_Client, scp,
  396. PR_FALSE);
  397. PR_ASSERT(NULL != jobp);
  398. DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
  399. /*
  400. * single-threaded update; no lock needed
  401. */
  402. sp->conn_counter++;
  403. if (sp->conn_counter <
  404. (num_tcp_clients * num_tcp_connections_per_client)) {
  405. jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
  406. PR_FALSE);
  407. PR_ASSERT(NULL != jobp);
  408. return;
  409. }
  410. jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
  411. print_stats, sp, PR_FALSE);
  412. PR_ASSERT(NULL != jobp);
  413. DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
  414. exit:
  415. PR_EnterMonitor(sp->exit_mon);
  416. /* Wait for server jobs to finish */
  417. while (0 != *sp->job_counterp) {
  418. PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
  419. DPRINTF(("TCP_Server: conn_counter = %d\n",
  420. *sp->job_counterp));
  421. }
  422. PR_ExitMonitor(sp->exit_mon);
  423. if (sp->iod.socket) {
  424. PR_Close(sp->iod.socket);
  425. }
  426. PR_DestroyMonitor(sp->exit_mon);
  427. printf("%30s","TCP_Socket_Client_Server_Test:");
  428. printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
  429. num_tcp_clients, num_tcp_connections_per_client);
  430. printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
  431. num_tcp_mesgs_per_connection, tcp_mesg_size);
  432. DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
  433. PR_ShutdownThreadPool(sp->tp);
  434. PR_DELETE(sp);
  435. }
  436. /************************************************************************/
  437. #define DEFAULT_INITIAL_THREADS 4
  438. #define DEFAULT_MAX_THREADS 100
  439. #define DEFAULT_STACKSIZE (512 * 1024)
  440. int main(int argc, char **argv)
  441. {
  442. PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
  443. PRInt32 max_threads = DEFAULT_MAX_THREADS;
  444. PRInt32 stacksize = DEFAULT_STACKSIZE;
  445. PRThreadPool *tp = NULL;
  446. PRStatus rv;
  447. PRJob *jobp;
  448. /*
  449. * -d debug mode
  450. */
  451. PLOptStatus os;
  452. PLOptState *opt;
  453. program_name = argv[0];
  454. opt = PL_CreateOptState(argc, argv, "d");
  455. while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
  456. {
  457. if (PL_OPT_BAD == os) {
  458. continue;
  459. }
  460. switch (opt->option)
  461. {
  462. case 'd': /* debug mode */
  463. _debug_on = 1;
  464. break;
  465. default:
  466. break;
  467. }
  468. }
  469. PL_DestroyOptState(opt);
  470. PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
  471. PR_STDIO_INIT();
  472. PR_SetConcurrency(4);
  473. tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
  474. if (NULL == tp) {
  475. printf("PR_CreateThreadPool failed\n");
  476. failed_already=1;
  477. goto done;
  478. }
  479. jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
  480. rv = PR_JoinJob(jobp);
  481. PR_ASSERT(PR_SUCCESS == rv);
  482. DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
  483. rv = PR_JoinThreadPool(tp);
  484. PR_ASSERT(PR_SUCCESS == rv);
  485. DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
  486. done:
  487. PR_Cleanup();
  488. if (failed_already) {
  489. return 1;
  490. }
  491. else {
  492. return 0;
  493. }
  494. }