exec-stream-helpers.cpp 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728
  1. /*
  2. Copyright (C) 2004 Artem Khodush
  3. Redistribution and use in source and binary forms, with or without modification,
  4. are permitted provided that the following conditions are met:
  5. 1. Redistributions of source code must retain the above copyright notice,
  6. this list of conditions and the following disclaimer.
  7. 2. Redistributions in binary form must reproduce the above copyright notice,
  8. this list of conditions and the following disclaimer in the documentation
  9. and/or other materials provided with the distribution.
  10. 3. The name of the author may not be used to endorse or promote products
  11. derived from this software without specific prior written permission.
  12. THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
  13. WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  14. OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  15. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  16. SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  17. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
  18. OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  19. WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
  20. OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  21. EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22. */
  23. // os_error_t
  24. os_error_t::os_error_t( std::string const & msg )
  25. {
  26. compose( msg, GetLastError() );
  27. }
  28. os_error_t::os_error_t( std::string const & msg, exec_stream_t::error_code_t code )
  29. {
  30. compose( msg, code );
  31. }
  32. void os_error_t::compose( std::string const & msg, exec_stream_t::error_code_t code )
  33. {
  34. std::string s( msg );
  35. s+='\n';
  36. LPVOID buf;
  37. if( FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM,
  38. 0,
  39. code,
  40. MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),
  41. (LPTSTR) &buf,
  42. 0,
  43. 0
  44. )==0 ) {
  45. s+="[unable to retrieve error description]";
  46. }else {
  47. // FormatMessage may return \n-terminated string
  48. LPTSTR str_buf=(LPTSTR)buf;
  49. std::size_t buf_len=strlen( str_buf );
  50. while( buf_len>0 && str_buf[buf_len-1]=='\n' ) {
  51. --buf_len;
  52. str_buf[buf_len]=0;
  53. }
  54. s+=(LPTSTR)buf;
  55. LocalFree( buf );
  56. }
  57. exec_stream_t::error_t::compose( s, code );
  58. }
  59. // pipe_t
  60. pipe_t::pipe_t()
  61. : m_direction( closed ), m_r( INVALID_HANDLE_VALUE ), m_w( INVALID_HANDLE_VALUE )
  62. {
  63. open();
  64. }
  65. pipe_t::~pipe_t()
  66. {
  67. close();
  68. }
  69. void pipe_t::close_r()
  70. {
  71. if( m_direction==both || m_direction==read ) {
  72. if( !CloseHandle( m_r ) ) {
  73. throw os_error_t( "pipe_t::close_r: CloseHandle failed" );
  74. }
  75. m_direction= m_direction==both ? write : closed;
  76. }
  77. }
  78. void pipe_t::close_w()
  79. {
  80. if( m_direction==both || m_direction==write ) {
  81. if( !CloseHandle( m_w ) ) {
  82. throw os_error_t( "pipe_t::close_w: CloseHandle failed" );
  83. }
  84. m_direction= m_direction==both ? read : closed;
  85. }
  86. }
  87. void pipe_t::close()
  88. {
  89. close_r();
  90. close_w();
  91. }
  92. void pipe_t::open()
  93. {
  94. close();
  95. SECURITY_ATTRIBUTES sa;
  96. sa.nLength=sizeof( sa );
  97. sa.bInheritHandle=true;
  98. sa.lpSecurityDescriptor=0;
  99. if( !CreatePipe( &m_r, &m_w, &sa, 0 ) )
  100. throw os_error_t( "pipe_t::pipe_t: CreatePipe failed" );
  101. m_direction=both;
  102. }
  103. HANDLE pipe_t::r() const
  104. {
  105. return m_r;
  106. }
  107. HANDLE pipe_t::w() const
  108. {
  109. return m_w;
  110. }
  111. // set_stdhandle_t
  112. set_stdhandle_t::set_stdhandle_t( DWORD kind, HANDLE handle )
  113. : m_kind( kind ), m_save_handle( GetStdHandle( kind ) )
  114. {
  115. if( m_save_handle==INVALID_HANDLE_VALUE )
  116. throw os_error_t( "set_stdhandle_t::set_stdhandle_t: GetStdHandle() failed" );
  117. if( !SetStdHandle( kind, handle ) )
  118. throw os_error_t( "set_stdhandle_t::set_stdhandle_t: SetStdHandle() failed" );
  119. }
  120. set_stdhandle_t::~set_stdhandle_t()
  121. {
  122. SetStdHandle( m_kind, m_save_handle );
  123. }
  124. //wait_result_t
  125. wait_result_t::wait_result_t()
  126. {
  127. m_signaled_object=INVALID_HANDLE_VALUE;
  128. m_timed_out=false;
  129. m_error_code=ERROR_SUCCESS;
  130. m_error_message="";
  131. }
  132. wait_result_t::wait_result_t( DWORD wait_result, int objects_count, HANDLE const * objects )
  133. {
  134. m_signaled_object=INVALID_HANDLE_VALUE;
  135. m_timed_out=false;
  136. m_error_code=ERROR_SUCCESS;
  137. m_error_message="";
  138. if( wait_result>=WAIT_OBJECT_0 && wait_result<WAIT_OBJECT_0+objects_count ) {
  139. m_signaled_object=objects[wait_result-WAIT_OBJECT_0];
  140. }else if( wait_result>=WAIT_ABANDONED_0 && wait_result<WAIT_ABANDONED_0+objects_count ) {
  141. m_error_message="wait_result_t: one of the wait objects was abandoned";
  142. }else if( wait_result==WAIT_TIMEOUT ) {
  143. m_timed_out=true;
  144. m_error_message="wait_result_t: timeout elapsed";
  145. }else if( wait_result==WAIT_FAILED ) {
  146. m_error_code=GetLastError();
  147. }else {
  148. m_error_message="wait_result_t: weird error: unrecognised WaitForMultipleObjects return value";
  149. m_error_code=wait_result;
  150. }
  151. }
  152. bool wait_result_t::ok()
  153. {
  154. return m_error_code==ERROR_SUCCESS && m_error_message[0]==0;
  155. }
  156. bool wait_result_t::is_signaled( event_t & event )
  157. {
  158. return m_signaled_object==event.m_handle;
  159. }
  160. bool wait_result_t::timed_out()
  161. {
  162. return m_timed_out;
  163. }
  164. DWORD wait_result_t::error_code()
  165. {
  166. return m_error_code;
  167. }
  168. char const * wait_result_t::error_message()
  169. {
  170. return m_error_message;
  171. }
  172. // event_t
  173. event_t::event_t()
  174. {
  175. m_handle=CreateEvent( 0, TRUE, FALSE, 0 );
  176. if( m_handle==0 ) {
  177. throw os_error_t( "event_t::event_t: create event failed" );
  178. }
  179. }
  180. event_t::~event_t()
  181. {
  182. CloseHandle( m_handle );
  183. }
  184. bool event_t::set()
  185. {
  186. return SetEvent( m_handle )!=0;
  187. }
  188. bool event_t::reset()
  189. {
  190. return ResetEvent( m_handle )!=0;
  191. }
  192. // wait functions
  193. wait_result_t wait( HANDLE h, DWORD timeout )
  194. {
  195. return wait_result_t( WaitForSingleObject( h, timeout ), 1, &h );
  196. }
  197. wait_result_t wait( event_t & e, DWORD timeout )
  198. {
  199. return wait_result_t( WaitForSingleObject( e.m_handle, timeout ), 1, &e.m_handle );
  200. }
  201. wait_result_t wait( event_t & e1, event_t & e2, DWORD timeout )
  202. {
  203. HANDLE h[2];
  204. h[0]=e1.m_handle;
  205. h[1]=e2.m_handle;
  206. return wait_result_t( WaitForMultipleObjects( 2, h, FALSE, timeout ), 2, h );
  207. }
  208. // mutex_t
  209. mutex_t::mutex_t()
  210. {
  211. m_handle=CreateMutex( 0, FALSE, 0 );
  212. if( m_handle==0 ) {
  213. throw os_error_t( "mutex_t::mutex_t: CreateMutex failed" );
  214. }
  215. }
  216. mutex_t::~mutex_t()
  217. {
  218. CloseHandle( m_handle );
  219. }
  220. // grab_mutex_t
  221. grab_mutex_t::grab_mutex_t( mutex_t & mutex, DWORD timeout )
  222. {
  223. m_mutex=mutex.m_handle;
  224. m_wait_result=wait( m_mutex, timeout );
  225. }
  226. grab_mutex_t::~grab_mutex_t()
  227. {
  228. if( m_wait_result.ok() ) {
  229. ReleaseMutex( m_mutex );
  230. }
  231. }
  232. bool grab_mutex_t::ok()
  233. {
  234. return m_wait_result.ok();
  235. }
  236. DWORD grab_mutex_t::error_code()
  237. {
  238. return m_wait_result.error_code();
  239. }
  240. char const * grab_mutex_t::error_message()
  241. {
  242. return m_wait_result.error_message();
  243. }
  244. // thread_buffer_t
  245. thread_buffer_t::thread_buffer_t()
  246. {
  247. m_direction=dir_none;
  248. m_message_prefix="";
  249. m_error_code=ERROR_SUCCESS;
  250. m_error_message="";
  251. m_wait_timeout=2000;
  252. m_buffer_limit=0;
  253. m_read_buffer_size=4096;
  254. m_thread=0;
  255. m_thread_termination_timeout=500;
  256. m_translate_crlf=true;
  257. }
  258. thread_buffer_t::~thread_buffer_t()
  259. {
  260. bool stopped=false;
  261. try {
  262. stopped=stop_thread();
  263. }catch(...){
  264. }
  265. if( !stopped ) {
  266. // one more time, with attitude
  267. try {
  268. stopped=abort_thread();
  269. }catch(...){
  270. }
  271. if( !stopped ) {
  272. DWORD code=GetLastError();
  273. // otherwize, the thread will be left running loose stomping on freed memory.
  274. std::terminate();
  275. }
  276. }
  277. }
  278. void thread_buffer_t::set_wait_timeout( DWORD milliseconds )
  279. {
  280. if( m_direction!=dir_none ) {
  281. throw exec_stream_t::error_t( "thread_buffer_t::set_wait_timeout: thread already started" );
  282. }
  283. m_wait_timeout=milliseconds;
  284. }
  // The next three setters change values that are accessed only from the calling thread, so they may be called at any time.
  286. void thread_buffer_t::set_thread_termination_timeout( DWORD milliseconds ) {
  287. m_thread_termination_timeout=milliseconds;
  288. }
  289. void thread_buffer_t::set_binary_mode()
  290. {
  291. m_translate_crlf=false;
  292. }
  293. void thread_buffer_t::set_text_mode()
  294. {
  295. m_translate_crlf=true;
  296. }
  297. void thread_buffer_t::set_buffer_limit( std::size_t limit )
  298. {
  299. if( m_direction!=dir_none ) {
  300. throw exec_stream_t::error_t( "thread_buffer_t::set_buffer_limit: thread already started" );
  301. }
  302. m_buffer_limit=limit;
  303. }
  304. void thread_buffer_t::set_read_buffer_size( std::size_t size )
  305. {
  306. if( m_direction!=dir_none ) {
  307. throw exec_stream_t::error_t( "thread_buffer_t::set_read_buffer_size: thread already started" );
  308. }
  309. m_read_buffer_size=size;
  310. }
  311. void thread_buffer_t::start_reader_thread( HANDLE pipe )
  312. {
  313. start_thread( pipe, dir_read );
  314. }
  315. void thread_buffer_t::start_writer_thread( HANDLE pipe )
  316. {
  317. start_thread( pipe, dir_write );
  318. }
  319. void thread_buffer_t::start_thread( HANDLE pipe, direction_t direction )
  320. {
  321. if( m_direction!=dir_none ) {
  322. throw exec_stream_t::error_t( "thread_buffer_t::start_thread: thread already started" );
  323. }
  324. m_buffer_list.clear();
  325. m_pipe=pipe;
  326. if( !m_stop_thread.reset() ) {
  327. throw os_error_t( "thread_buffer_t::start_thread: unable to initialize m_stop_thread event" );
  328. }
  329. if( !m_got_data.reset() ) {
  330. throw os_error_t( "thread_buffer_t::start_thread: unable to initialize m_got_data event" );
  331. }
  332. if( !m_want_data.set() ) {
  333. throw os_error_t( "thread_buffer_t::start_thread: unable to initialize m_want_data event" );
  334. }
  335. DWORD thread_id;
  336. m_thread=CreateThread( 0, 0, direction==dir_read ? reader_thread : writer_thread, this, 0, &thread_id );
  337. if( m_thread==0 ) {
  338. throw os_error_t( "thread_buffer_t::start_thread: unable to start thread" );
  339. }
  340. m_direction= direction==dir_read ? dir_read : dir_write;
  341. }
  342. bool thread_buffer_t::check_thread_stopped()
  343. {
  344. wait_result_t wait_result=wait( m_thread, m_thread_termination_timeout );
  345. if( !wait_result.ok() && !wait_result.timed_out() ) {
  346. check_error( "thread_buffer_t::check_thread_stopped: wait for thread to finish failed", wait_result.error_code(), wait_result.error_message() );
  347. }
  348. if( wait_result.ok() ) {
  349. CloseHandle( m_thread );
  350. m_direction=dir_none;
  351. return true;
  352. }else {
  353. return false;
  354. }
  355. }
  356. bool thread_buffer_t::stop_thread()
  357. {
  358. if( m_direction!=dir_none ) {
  359. if( !m_stop_thread.set() ) {
  360. throw os_error_t( "thread_buffer_t::stop_thread: m_stop_thread.set() failed" );
  361. }
  362. bool res=check_thread_stopped();
  363. if( res ) {
  364. check_error( m_message_prefix, m_error_code, m_error_message );
  365. }
  366. return res;
  367. }
  368. return true;
  369. }
  370. bool thread_buffer_t::abort_thread()
  371. {
  372. if( m_direction!=dir_none ) {
  373. if( !TerminateThread( m_thread, 0 ) ) {
  374. throw os_error_t( "exec_steam_t::abort_thread: TerminateThread failed" );
  375. }
  376. return check_thread_stopped();
  377. }
  378. return true;
  379. }
  380. void thread_buffer_t::get( exec_stream_t::stream_kind_t, char * dst, std::size_t & size, bool & no_more )
  381. {
  382. if( m_direction!=dir_read ) {
  383. throw exec_stream_t::error_t( "thread_buffer_t::get: thread was not started or was started for writing" );
  384. }
  385. // check thread status
  386. DWORD thread_exit_code;
  387. if( !GetExitCodeThread( m_thread, &thread_exit_code ) ) {
  388. throw os_error_t( "thread_buffer_t::get: GetExitCodeThread failed" );
  389. }
  390. if( thread_exit_code!=STILL_ACTIVE ) {
  391. if( !m_buffer_list.empty() ) {
  392. // we have data - deliver it first
  393. // when thread terminated, there is no need to synchronize
  394. if( m_translate_crlf ) {
  395. m_buffer_list.get_translate_crlf( dst, size );
  396. }else {
  397. m_buffer_list.get( dst, size );
  398. }
  399. no_more=false;
  400. }else {
  401. // thread terminated and we have no more data to return - report errors, if any
  402. check_error( m_message_prefix, m_error_code, m_error_message );
  403. // if terminated without error - signal eof
  404. no_more=true;
  405. size=0;
  406. }
  407. }else {
  408. no_more=false;
  409. // thread still running - synchronize
  410. // wait for both m_got_data, m_mutex
  411. wait_result_t wait_result=wait( m_got_data, m_wait_timeout );
  412. if( !wait_result.ok() ) {
  413. check_error( "thread_buffer_t::get: wait for got_data failed", wait_result.error_code(), wait_result.error_message() );
  414. }
  415. grab_mutex_t grab_mutex( m_mutex, m_wait_timeout );
  416. if( !grab_mutex.ok() ) {
  417. check_error( "thread_buffer_t::get: wait for mutex failed", grab_mutex.error_code(), grab_mutex.error_message() );
  418. }
  419. if( m_translate_crlf ) {
  420. m_buffer_list.get_translate_crlf( dst, size );
  421. }else {
  422. m_buffer_list.get( dst, size );
  423. }
  424. // if buffer is not too long tell the thread we want more data
  425. if( !m_buffer_list.full( m_buffer_limit ) ) {
  426. if( !m_want_data.set() ) {
  427. throw os_error_t( "thread_buffer_t::get: unable to set m_want_data event" );
  428. }
  429. }
  430. // if no data left - make the next get() wait until it arrives
  431. if( m_buffer_list.empty() ) {
  432. if( !m_got_data.reset() ) {
  433. throw os_error_t( "thread_buffer_t::get: unable to reset m_got_data event" );
  434. }
  435. }
  436. }
  437. }
  438. DWORD WINAPI thread_buffer_t::reader_thread( LPVOID param )
  439. {
  440. thread_buffer_t * p=static_cast< thread_buffer_t * >( param );
  441. // accessing p anywhere here is safe because thread_buffer_t destructor
  442. // ensures the thread is terminated before p get destroyed
  443. char * read_buffer=0;
  444. try {
  445. read_buffer=new char[p->m_read_buffer_size];
  446. while( true ) {
  447. // see if get() wants more data, or if someone wants to stop the thread
  448. wait_result_t wait_result=wait( p->m_stop_thread, p->m_want_data, p->m_wait_timeout );
  449. if( !wait_result.ok() && !wait_result.timed_out() ) {
  450. p->note_thread_error( "thread_buffer_t::reader_thread: wait for want_data, destruction failed", wait_result.error_code(), wait_result.error_message() );
  451. break;
  452. }
  453. if( wait_result.is_signaled( p->m_stop_thread ) ) {
  454. // they want us to stop
  455. break;
  456. }
  457. if( wait_result.is_signaled( p->m_want_data ) ) {
  458. // they want more data - read the file
  459. DWORD read_size=0;
  460. DWORD read_status=ERROR_SUCCESS;
  461. if( !ReadFile( p->m_pipe, read_buffer, p->m_read_buffer_size, &read_size, 0 ) ) {
  462. read_status=GetLastError();
  463. if( read_status!=ERROR_BROKEN_PIPE ) {
  464. p->note_thread_error( "thread_buffer_t::reader_thread: ReadFile failed", read_status, "" );
  465. break;
  466. }
  467. }
  468. // read something - append to p->m_buffers
  469. if( read_size!=0 ) {
  470. grab_mutex_t grab_mutex( p->m_mutex, p->m_wait_timeout );
  471. if( !grab_mutex.ok() ) {
  472. p->note_thread_error( "thread_buffer_t::reader_thread: wait for mutex failed", grab_mutex.error_code(), grab_mutex.error_message() );
  473. break;
  474. }
  475. p->m_buffer_list.put( read_buffer, read_size );
  476. // if buffer is too long - do not read any more until it shrinks
  477. if( p->m_buffer_list.full( p->m_buffer_limit ) ) {
  478. if( !p->m_want_data.reset() ) {
  479. p->note_thread_error( "thread_buffer_t::reader_thread: unable to reset m_want_data event", GetLastError(), "" );
  480. break;
  481. }
  482. }
  483. // tell get() we got some data
  484. if( !p->m_got_data.set() ) {
  485. p->note_thread_error( "thread_buffer_t::reader_thread: unable to set m_got_data event", GetLastError(), "" );
  486. break;
  487. }
  488. }
  489. // pipe broken - quit thread, which will be seen by get() as eof.
  490. if( read_status==ERROR_BROKEN_PIPE ) {
  491. break;
  492. }
  493. }
  494. }
  495. }catch( ... ) {
  496. // might only be std::bad_alloc
  497. p->note_thread_error( "", ERROR_SUCCESS, "thread_buffer_t::reader_thread: unknown exception caught" );
  498. }
  499. delete[] read_buffer;
  500. // ensure that get() is not left waiting on got_data
  501. p->m_got_data.set();
  502. return 0;
  503. }
  504. void thread_buffer_t::put( char * const src, std::size_t & size, bool & no_more )
  505. {
  506. if( m_direction!=dir_write ) {
  507. throw exec_stream_t::error_t( "thread_buffer_t::put: thread not started or started for reading" );
  508. }
  509. // check thread status
  510. DWORD thread_exit_code;
  511. if( !GetExitCodeThread( m_thread, &thread_exit_code ) ) {
  512. throw os_error_t( "thread_buffer_t::get: GetExitCodeThread failed" );
  513. }
  514. if( thread_exit_code!=STILL_ACTIVE ) {
  515. // thread terminated - check for errors
  516. check_error( m_message_prefix, m_error_code, m_error_message );
  517. // if terminated without error - signal eof, since no one will ever write our data
  518. size=0;
  519. no_more=true;
  520. }else {
  521. // wait for both m_want_data and m_mutex
  522. wait_result_t wait_result=wait( m_want_data, m_wait_timeout );
  523. if( !wait_result.ok() ) {
  524. check_error( "thread_buffer_t::put: wait for want_data failed", wait_result.error_code(), wait_result.error_message() );
  525. }
  526. grab_mutex_t grab_mutex( m_mutex, m_wait_timeout );
  527. if( !grab_mutex.ok() ) {
  528. check_error( "thread_buffer_t::put: wait for mutex failed", grab_mutex.error_code(), grab_mutex.error_message() );
  529. }
  530. // got them - put data
  531. no_more=false;
  532. if( m_translate_crlf ) {
  533. m_buffer_list.put_translate_crlf( src, size );
  534. }else {
  535. m_buffer_list.put( src, size );
  536. }
  537. // if the buffer is too long - make the next put() wait until it shrinks
  538. if( m_buffer_list.full( m_buffer_limit ) ) {
  539. if( !m_want_data.reset() ) {
  540. throw os_error_t( "thread_buffer_t::put: unable to reset m_want_data event" );
  541. }
  542. }
  543. // tell the thread we got data
  544. if( !m_buffer_list.empty() ) {
  545. if( !m_got_data.set() ) {
  546. throw os_error_t( "thread_buffer_t::put: unable to set m_got_data event" );
  547. }
  548. }
  549. }
  550. }
  551. DWORD WINAPI thread_buffer_t::writer_thread( LPVOID param )
  552. {
  553. thread_buffer_t * p=static_cast< thread_buffer_t * >( param );
  554. // accessing p anywhere here is safe because thread_buffer_t destructor
  555. // ensures the thread is terminated before p get destroyed
  556. try {
  557. buffer_list_t::buffer_t buffer;
  558. buffer.data=0;
  559. buffer.size=0;
  560. std::size_t buffer_offset=0;
  561. while( true ) {
  562. // wait for got_data or destruction, ignore timeout errors
  563. // for destruction the timeout is normally expected,
  564. // for got data the timeout is not normally expected but tolerable (no one wants to write)
  565. wait_result_t wait_result=wait( p->m_got_data, p->m_stop_thread, p->m_wait_timeout );
  566. if( !wait_result.ok() && !wait_result.timed_out() ) {
  567. p->note_thread_error( "thread_buffer_t::writer_thread: wait for got_data, destruction failed", wait_result.error_code(), wait_result.error_message() );
  568. break;
  569. }
  570. // if no data in local buffer to write - get from p->m_buffers
  571. if( buffer.data==0 && wait_result.is_signaled( p->m_got_data ) ) {
  572. grab_mutex_t grab_mutex( p->m_mutex, p->m_wait_timeout );
  573. if( !grab_mutex.ok() ) {
  574. p->note_thread_error( "thread_buffer_t::writer_thread: wait for mutex failed", grab_mutex.error_code(), grab_mutex.error_message() );
  575. break;
  576. }
  577. if( !p->m_buffer_list.empty() ) {
  578. // we've got buffer - detach it
  579. buffer=p->m_buffer_list.detach();
  580. buffer_offset=0;
  581. }
  582. // if no data left in p->m_buffers - wait until it arrives
  583. if( p->m_buffer_list.empty() ) {
  584. if( !p->m_got_data.reset() ) {
  585. p->note_thread_error( "thread_buffer_t::writer_thread: unable to reset m_got_data event", GetLastError(), "" );
  586. break;
  587. }
  588. }
  589. // if buffer is not too long - tell put() it can proceed
  590. if( !p->m_buffer_list.full( p->m_buffer_limit ) ) {
  591. if( !p->m_want_data.set() ) {
  592. p->note_thread_error( "thread_buffer_t::writer_thread: unable to set m_want_data event", GetLastError(), "" );
  593. break;
  594. }
  595. }
  596. }
  597. // see if they want us to stop, but only when all is written
  598. if( buffer.data==0 && wait_result.is_signaled( p->m_stop_thread ) ) {
  599. break;
  600. }
  601. if( buffer.data!=0 ) {
  602. // we have buffer - write it
  603. DWORD written_size;
  604. if( !WriteFile( p->m_pipe, buffer.data+buffer_offset, buffer.size-buffer_offset, &written_size, 0 ) ) {
  605. p->note_thread_error( "thread_buffer_t::writer_thread: WriteFile failed", GetLastError(), "" );
  606. break;
  607. }
  608. buffer_offset+=written_size;
  609. if( buffer_offset==buffer.size ) {
  610. delete[] buffer.data;
  611. buffer.data=0;
  612. }
  613. }
  614. }
  615. // we won't be writing any more - close child's stdin
  616. CloseHandle( p->m_pipe );
  617. // buffer may be left astray - clean up
  618. delete[] buffer.data;
  619. }catch( ... ) {
  620. // unreachable code. really.
  621. p->note_thread_error( "", ERROR_SUCCESS, "thread_buffer_t::writer_thread: unknown exception caught" );
  622. }
  623. // ensure that put() is not left waiting on m_want_data
  624. p->m_want_data.set();
  625. return 0;
  626. }
  627. void thread_buffer_t::check_error( std::string const & message_prefix, DWORD error_code, std::string const & error_message )
  628. {
  629. if( !error_message.empty() ) {
  630. throw exec_stream_t::error_t( message_prefix+"\n"+error_message, error_code );
  631. }else if( error_code!=ERROR_SUCCESS ) {
  632. throw os_error_t( message_prefix, error_code );
  633. }
  634. }
  635. void thread_buffer_t::note_thread_error( char const * message_prefix, DWORD error_code, char const * error_message )
  636. {
  637. m_message_prefix=message_prefix;
  638. m_error_code=error_code;
  639. m_error_message=error_message;
  640. }