exec-stream-helpers.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. /*
  2. Copyright (C) 2004 Artem Khodush
  3. Redistribution and use in source and binary forms, with or without modification,
  4. are permitted provided that the following conditions are met:
  5. 1. Redistributions of source code must retain the above copyright notice,
  6. this list of conditions and the following disclaimer.
  7. 2. Redistributions in binary form must reproduce the above copyright notice,
  8. this list of conditions and the following disclaimer in the documentation
  9. and/or other materials provided with the distribution.
  10. 3. The name of the author may not be used to endorse or promote products
  11. derived from this software without specific prior written permission.
  12. THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
  13. WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  14. OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  15. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  16. SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  17. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
  18. OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  19. WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
  20. OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  21. EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22. */
  23. // os_error_t
  24. os_error_t::os_error_t( std::string const & msg )
  25. {
  26. compose( msg, errno );
  27. }
  28. os_error_t::os_error_t( std::string const & msg, exec_stream_t::error_code_t code )
  29. {
  30. compose( msg, code );
  31. }
  32. void os_error_t::compose( std::string const & msg, exec_stream_t::error_code_t code )
  33. {
  34. std::string s( msg );
  35. s+='\n';
  36. errno=0;
  37. char const * x=strerror( code );
  38. if( errno!=0 ) {
  39. s+="[unable to retrieve error description]";
  40. }else {
  41. s+=x;
  42. }
  43. exec_stream_t::error_t::compose( s, code );
  44. }
  45. // pipe_t
  46. pipe_t::pipe_t()
  47. : m_direction( closed )
  48. {
  49. m_fds[0]=-1;
  50. m_fds[1]=-1;
  51. }
  52. pipe_t::~pipe_t()
  53. {
  54. try {
  55. close();
  56. }catch(...) {
  57. }
  58. }
  59. int pipe_t::r() const
  60. {
  61. return m_fds[0];
  62. }
  63. int pipe_t::w() const
  64. {
  65. return m_fds[1];
  66. }
  67. void pipe_t::close_r()
  68. {
  69. if( m_direction==both || m_direction==read ) {
  70. if( ::close( m_fds[0] )==-1 ) {
  71. throw os_error_t( "pipe_t::close_r: close failed" );
  72. }
  73. m_direction= m_direction==both ? write : closed;
  74. }
  75. }
  76. void pipe_t::close_w()
  77. {
  78. if( m_direction==both || m_direction==write ) {
  79. if( ::close( m_fds[1] )==-1 ) {
  80. throw os_error_t( "pipe_t::close_w: close failed" );
  81. }
  82. m_direction= m_direction==both ? read : closed;
  83. }
  84. }
  85. void pipe_t::close()
  86. {
  87. close_r();
  88. close_w();
  89. }
  90. void pipe_t::open()
  91. {
  92. close();
  93. if( pipe( m_fds )==-1 ) {
  94. throw os_error_t( "pipe_t::open(): pipe() failed" );
  95. }
  96. m_direction=both;
  97. }
  98. // mutex_t
  99. mutex_t::mutex_t()
  100. {
  101. if( int code=pthread_mutex_init( &m_mutex, 0 ) ) {
  102. throw os_error_t( "mutex_t::mutex_t: pthread_mutex_init failed", code );
  103. }
  104. }
  105. mutex_t::~mutex_t()
  106. {
  107. pthread_mutex_destroy( &m_mutex );
  108. }
  109. // grab_mutex_t
  110. grab_mutex_t::grab_mutex_t( mutex_t & mutex, mutex_registrator_t * mutex_registrator )
  111. {
  112. m_mutex=&mutex.m_mutex;
  113. m_error_code=pthread_mutex_lock( m_mutex );
  114. m_grabbed=ok();
  115. m_mutex_registrator=mutex_registrator;
  116. if( m_mutex_registrator ) {
  117. m_mutex_registrator->add( this );
  118. }
  119. }
  120. grab_mutex_t::~grab_mutex_t()
  121. {
  122. release();
  123. if( m_mutex_registrator ) {
  124. m_mutex_registrator->remove( this );
  125. }
  126. }
  127. int grab_mutex_t::release()
  128. {
  129. int code=0;
  130. if( m_grabbed ) {
  131. code=pthread_mutex_unlock( m_mutex );
  132. m_grabbed=false;
  133. }
  134. return code;
  135. }
  136. bool grab_mutex_t::ok()
  137. {
  138. return m_error_code==0;
  139. }
  140. int grab_mutex_t::error_code()
  141. {
  142. return m_error_code;
  143. }
  144. // mutex_registrator_t
  145. mutex_registrator_t::~mutex_registrator_t()
  146. {
  147. for( mutexes_t::iterator i=m_mutexes.begin(); i!=m_mutexes.end(); ++i ) {
  148. (*i)->m_mutex_registrator=0;
  149. }
  150. }
  151. void mutex_registrator_t::add( grab_mutex_t * g )
  152. {
  153. m_mutexes.insert( m_mutexes.end(), g );
  154. }
  155. void mutex_registrator_t::remove( grab_mutex_t * g )
  156. {
  157. m_mutexes.erase( std::find( m_mutexes.begin(), m_mutexes.end(), g ) );
  158. }
  159. void mutex_registrator_t::release_all()
  160. {
  161. for( mutexes_t::iterator i=m_mutexes.begin(); i!=m_mutexes.end(); ++i ) {
  162. (*i)->release();
  163. }
  164. }
  165. // wait_result_t
  166. wait_result_t::wait_result_t( unsigned signaled_state, int error_code, bool timed_out )
  167. {
  168. m_timed_out=timed_out;
  169. m_error_code=error_code;
  170. m_signaled_state= error_code==0 ? signaled_state : 0;
  171. }
  172. bool wait_result_t::ok()
  173. {
  174. return m_error_code==0;
  175. }
  176. bool wait_result_t::is_signaled( int state )
  177. {
  178. return m_signaled_state&state;
  179. }
  180. int wait_result_t::error_code()
  181. {
  182. return m_error_code;
  183. }
  184. bool wait_result_t::timed_out()
  185. {
  186. return m_timed_out;
  187. }
  188. // event_t
  189. event_t::event_t()
  190. {
  191. if( int code=pthread_cond_init( &m_cond, 0 ) ) {
  192. throw os_error_t( "event_t::event_t: pthread_cond_init failed", code );
  193. }
  194. m_state=0;
  195. }
  196. event_t::~event_t()
  197. {
  198. pthread_cond_destroy( &m_cond );
  199. }
  200. int event_t::set( unsigned bits, mutex_registrator_t * mutex_registrator )
  201. {
  202. grab_mutex_t grab_mutex( m_mutex, mutex_registrator );
  203. if( !grab_mutex.ok() ) {
  204. return grab_mutex.error_code();
  205. }
  206. int code=0;
  207. if( bits&~m_state ) {
  208. m_state|=bits;
  209. code=pthread_cond_broadcast( &m_cond );
  210. }
  211. int release_code=grab_mutex.release();
  212. if( code==0 ) {
  213. code=release_code;
  214. }
  215. return code;
  216. }
  217. int event_t::reset( unsigned bits, mutex_registrator_t * mutex_registrator )
  218. {
  219. grab_mutex_t grab_mutex( m_mutex, mutex_registrator );
  220. if( !grab_mutex.ok() ) {
  221. return grab_mutex.error_code();
  222. }
  223. m_state&=~bits;
  224. return grab_mutex.release();
  225. }
  226. wait_result_t event_t::wait( unsigned any_bits, unsigned long timeout, mutex_registrator_t * mutex_registrator )
  227. {
  228. if( any_bits==0 ) {
  229. // we ain't waiting for anything
  230. return wait_result_t( 0, 0, false );
  231. }
  232. grab_mutex_t grab_mutex( m_mutex, mutex_registrator );
  233. if( !grab_mutex.ok() ) {
  234. return wait_result_t( 0, grab_mutex.error_code(), false );
  235. }
  236. struct timeval time_val_limit;
  237. gettimeofday( &time_val_limit, 0 );
  238. struct timespec time_limit;
  239. time_limit.tv_sec=time_val_limit.tv_sec+timeout/1000;
  240. time_limit.tv_nsec=1000*(time_val_limit.tv_usec+1000*(timeout%1000));
  241. int code=0;
  242. while( code==0 && (m_state&any_bits)==0 ) {
  243. code=pthread_cond_timedwait( &m_cond, &m_mutex.m_mutex, &time_limit );
  244. }
  245. unsigned state=m_state;
  246. int release_code=grab_mutex.release();
  247. if( code==0 ) {
  248. code=release_code;
  249. }
  250. return wait_result_t( state, code, code==ETIMEDOUT );
  251. }
  252. // thread_buffer_t
  253. thread_buffer_t::thread_buffer_t( pipe_t & in_pipe, pipe_t & out_pipe, pipe_t & err_pipe, std::ostream & in )
  254. : m_in_pipe( in_pipe ), m_out_pipe( out_pipe ), m_err_pipe( err_pipe ), m_in( in )
  255. {
  256. m_in_bad=false;
  257. m_error_prefix="";
  258. m_error_code=0;
  259. m_in_wait_timeout=2000;
  260. m_out_wait_timeout=2000;
  261. m_err_wait_timeout=2000;
  262. m_thread_termination_timeout=1000;
  263. m_in_buffer_limit=0;
  264. m_out_buffer_limit=0;
  265. m_err_buffer_limit=0;
  266. m_out_read_buffer_size=4096;
  267. m_err_read_buffer_size=4096;
  268. m_thread_started=false;
  269. m_in_closed=false;
  270. }
  271. thread_buffer_t::~thread_buffer_t()
  272. {
  273. bool stopped=false;
  274. try {
  275. stopped=stop_thread();
  276. }catch( ... ) {
  277. }
  278. if( !stopped ) {
  279. try {
  280. stopped=abort_thread();
  281. }catch( ... ) {
  282. }
  283. }
  284. if( !stopped ) {
  285. std::terminate();
  286. }
  287. }
  288. void thread_buffer_t::set_wait_timeout( int stream_kind, unsigned long milliseconds )
  289. {
  290. if( m_thread_started ) {
  291. throw exec_stream_t::error_t( "thread_buffer_t::set_wait_timeout: thread already started" );
  292. }
  293. if( stream_kind&exec_stream_t::s_in ) {
  294. m_in_wait_timeout=milliseconds;
  295. }
  296. if( stream_kind&exec_stream_t::s_out ) {
  297. m_out_wait_timeout=milliseconds;
  298. }
  299. if( stream_kind&exec_stream_t::s_err ) {
  300. m_err_wait_timeout=milliseconds;
  301. }
  302. if( stream_kind&exec_stream_t::s_child ) {
  303. m_thread_termination_timeout=milliseconds;
  304. }
  305. }
  306. void thread_buffer_t::set_buffer_limit( int stream_kind, std::size_t limit )
  307. {
  308. if( m_thread_started ) {
  309. throw exec_stream_t::error_t( "thread_buffer_t::set_buffer_limit: thread already started" );
  310. }
  311. if( stream_kind&exec_stream_t::s_in ) {
  312. m_in_buffer_limit=limit;
  313. }
  314. if( stream_kind&exec_stream_t::s_out ) {
  315. m_out_buffer_limit=limit;
  316. }
  317. if( stream_kind&exec_stream_t::s_err ) {
  318. m_err_buffer_limit=limit;
  319. }
  320. }
  321. void thread_buffer_t::set_read_buffer_size( int stream_kind, std::size_t size )
  322. {
  323. if( m_thread_started ) {
  324. throw exec_stream_t::error_t( "thread_buffer_t::set_read_buffer_size: thread already started" );
  325. }
  326. if( stream_kind&exec_stream_t::s_out ) {
  327. m_out_read_buffer_size=size;
  328. }
  329. if( stream_kind&exec_stream_t::s_err ) {
  330. m_err_read_buffer_size=size;
  331. }
  332. }
  333. void thread_buffer_t::start()
  334. {
  335. if( m_thread_started ) {
  336. throw exec_stream_t::error_t( "thread_buffer_t::start: thread already started" );
  337. }
  338. m_in_buffer.clear();
  339. m_out_buffer.clear();
  340. m_err_buffer.clear();
  341. int code;
  342. if( (code=m_thread_control.reset( ~0u, 0 )) || (code=m_thread_control.set( exec_stream_t::s_out|exec_stream_t::s_err, 0 ) ) ) {
  343. throw os_error_t( "thread_buffer_t::start: unable to initialize m_thread_control event", code );
  344. }
  345. if( (code=m_thread_responce.reset( ~0u, 0 )) || (code=m_thread_responce.set( exec_stream_t::s_in, 0 )) ) {
  346. throw os_error_t( "thread_buffer_t::start: unable to initialize m_thread_responce event", code );
  347. }
  348. m_error_prefix="";
  349. m_error_code=0;
  350. if( int code=pthread_create( &m_thread, 0, &thread_func, this ) ) {
  351. throw os_error_t( "exec_stream_therad_t::start: pthread_create failed", code );
  352. }
  353. m_thread_started=true;
  354. m_in_closed=false;
  355. m_in_bad=false;
  356. }
  357. bool thread_buffer_t::stop_thread()
  358. {
  359. if( m_thread_started ) {
  360. if( int code=m_thread_control.set( exec_stream_t::s_child, 0 ) ) {
  361. throw os_error_t( "thread_buffer_t::stop_thread: unable to set thread termination event", code );
  362. }
  363. wait_result_t wait_result=m_thread_responce.wait( exec_stream_t::s_child, m_thread_termination_timeout, 0 );
  364. if( !wait_result.ok() && !wait_result.timed_out() ) {
  365. throw os_error_t( "thread_buffer_t::stop_thread: wait for m_thread_stopped failed", wait_result.error_code() );
  366. }
  367. if( wait_result.ok() ) {
  368. void * thread_result;
  369. if( int code=pthread_join( m_thread, &thread_result ) ) {
  370. throw os_error_t( "thread_buffer_t::stop_thread: pthread_join failed", code );
  371. }
  372. m_thread_started=false;
  373. // check for any errors encountered in the thread
  374. if( m_error_code!=0 ) {
  375. throw os_error_t( m_error_prefix, m_error_code );
  376. }
  377. return true;
  378. }else {
  379. return false;
  380. }
  381. }
  382. return true;
  383. }
  384. bool thread_buffer_t::abort_thread()
  385. {
  386. if( m_thread_started ) {
  387. if( int code=pthread_cancel( m_thread ) ) {
  388. throw os_error_t( "thread_buffer_t::abort_thread: pthread_cancel failed", code );
  389. }
  390. void * thread_result;
  391. if( int code=pthread_join( m_thread, &thread_result ) ) {
  392. throw os_error_t( "thread_buffer_t::stop_thread: pthread_join failed", code );
  393. }
  394. m_thread_started=false;
  395. }
  396. return true;
  397. }
  // Extra event bits used next to the exec_stream_t::s_* stream kinds in the
  // event masks: "no more input will come" / "stdout reached eof" /
  // "stderr reached eof".
  // NOTE(review): presumably chosen not to overlap the exec_stream_t::s_*
  // values - confirm against the exec_stream_t declaration.
  const int s_in_eof=1<<4;
  const int s_out_eof=1<<5;
  const int s_err_eof=1<<6;
  401. void thread_buffer_t::get( exec_stream_t::stream_kind_t kind, char * dst, std::size_t & size, bool & no_more )
  402. {
  403. if( !m_thread_started ) {
  404. throw exec_stream_t::error_t( "thread_buffer_t::get: thread was not started" );
  405. }
  406. unsigned long timeout= kind==exec_stream_t::s_out ? m_out_wait_timeout : m_err_wait_timeout;
  407. int eof_kind= kind==exec_stream_t::s_out ? s_out_eof : s_err_eof;
  408. buffer_list_t & buffer= kind==exec_stream_t::s_out ? m_out_buffer : m_err_buffer;
  409. wait_result_t wait_result=m_thread_responce.wait( kind|exec_stream_t::s_child|eof_kind, timeout, 0 );
  410. if( !wait_result.ok() ) {
  411. throw os_error_t( "thread_buffer_t::get: wait for got_data failed", wait_result.error_code() );
  412. }
  413. if( wait_result.is_signaled( exec_stream_t::s_child ) ) {
  414. // thread stopped - no need to synchronize
  415. if( !buffer.empty() ) {
  416. // we have data - deliver it first
  417. // when thread terminated, there is no need to synchronize
  418. buffer.get( dst, size );
  419. no_more=false;
  420. }else {
  421. // thread terminated and we have no more data to return - report errors, if any
  422. if( m_error_code!=0 ) {
  423. throw os_error_t( m_error_prefix, m_error_code );
  424. }
  425. // if terminated without error - signal eof
  426. size=0;
  427. no_more=true;
  428. }
  429. }else if( wait_result.is_signaled( kind|eof_kind ) ) {
  430. // thread got some data for us - grab them
  431. grab_mutex_t grab_mutex( m_mutex, 0 );
  432. if( !grab_mutex.ok() ) {
  433. throw os_error_t( "thread_buffer_t::get: wait for mutex failed", grab_mutex.error_code() );
  434. }
  435. if( !buffer.empty() ) {
  436. buffer.get( dst, size );
  437. no_more=false;
  438. }else {
  439. size=0;
  440. no_more=wait_result.is_signaled( eof_kind );
  441. }
  442. // if no data left - make the next get() wait until it arrives
  443. if( buffer.empty() ) {
  444. if( int code=m_thread_responce.reset( kind, 0 ) ) {
  445. throw os_error_t( "thread_buffer_t::get: unable to reset got_data event", code );
  446. }
  447. }
  448. // if buffer is not too long tell the thread we want more data
  449. std::size_t buffer_limit= kind==exec_stream_t::s_out ? m_out_buffer_limit : m_err_buffer_limit;
  450. if( !buffer.full( buffer_limit ) ) {
  451. if( int code=m_thread_control.set( kind, 0 ) ) {
  452. throw os_error_t( "thread_buffer_t::get: unable to set want_data event", code );
  453. }
  454. }
  455. }
  456. no_more=false;
  457. }
  458. void thread_buffer_t::put( char * src, std::size_t & size, bool & no_more )
  459. {
  460. if( !m_thread_started ) {
  461. throw exec_stream_t::error_t( "thread_buffer_t::put: thread was not started" );
  462. }
  463. if( m_in_closed || m_in_bad ) {
  464. size=0;
  465. no_more=true;
  466. return;
  467. }
  468. // wait for both m_want_data and m_mutex
  469. wait_result_t wait_result=m_thread_responce.wait( exec_stream_t::s_in|exec_stream_t::s_child, m_in_wait_timeout, 0 );
  470. if( !wait_result.ok() ) {
  471. // workaround for versions of libstdc++ (at least in gcc 3.1 pre) that do not intercept exceptions in operator<<( std::ostream, std::string )
  472. m_in_bad=true;
  473. if( m_in.exceptions()&std::ios_base::badbit ) {
  474. throw os_error_t( "thread_buffer_t::put: wait for want_data failed", wait_result.error_code() );
  475. }else {
  476. m_in.setstate( std::ios_base::badbit );
  477. size=0;
  478. no_more=true;
  479. return;
  480. }
  481. }
  482. if( wait_result.is_signaled( exec_stream_t::s_child ) ) {
  483. // thread stopped - check for errors
  484. if( m_error_code!=0 ) {
  485. throw os_error_t( m_error_prefix, m_error_code );
  486. }
  487. // if terminated without error - signal eof, since no one will ever write our data
  488. size=0;
  489. no_more=true;
  490. }else if( wait_result.is_signaled( exec_stream_t::s_in ) ) {
  491. // thread wants some data from us - stuff them
  492. grab_mutex_t grab_mutex( m_mutex, 0 );
  493. if( !grab_mutex.ok() ) {
  494. throw os_error_t( "thread_buffer_t::put: wait for mutex failed", grab_mutex.error_code() );
  495. }
  496. no_more=false;
  497. m_in_buffer.put( src, size );
  498. // if the buffer is too long - make the next put() wait until it shrinks
  499. if( m_in_buffer.full( m_in_buffer_limit ) ) {
  500. if( int code=m_thread_responce.reset( exec_stream_t::s_in, 0 ) ) {
  501. throw os_error_t( "thread_buffer_t::put: unable to reset want_data event", code );
  502. }
  503. }
  504. // tell the thread we got data
  505. if( !m_in_buffer.empty() ) {
  506. if( int code=m_thread_control.set( exec_stream_t::s_in, 0 ) ) {
  507. throw os_error_t( "thread_buffer_t::put: unable to set got_data event", code );
  508. }
  509. }
  510. }
  511. no_more=false;
  512. }
  513. void thread_buffer_t::close_in()
  514. {
  515. if( !m_in_bad ) {
  516. m_in.flush();
  517. }
  518. if( m_thread_started ) {
  519. if( int code=m_thread_control.set( s_in_eof, 0 ) ) {
  520. throw os_error_t( "thread_buffer_t::close_in: unable to set in_got_data event", code );
  521. }
  522. m_in_closed=true;
  523. }
  524. }
  525. void mutex_cleanup( void * p )
  526. {
  527. static_cast< mutex_registrator_t * >( p )->release_all();
  528. }
  529. void * thread_buffer_t::thread_func( void * param )
  530. {
  531. thread_buffer_t * p=static_cast< thread_buffer_t * >( param );
  532. // accessing p anywhere here is safe because thread_buffer_t destructor
  533. // ensures the thread is terminated before p get destroyed
  534. char * out_read_buffer=0;
  535. char * err_read_buffer=0;
  536. bool in_eof=false;
  537. bool in_closed=false;
  538. bool out_eof=false;
  539. bool err_eof=false;
  540. mutex_registrator_t mutex_registrator;
  541. pthread_cleanup_push( mutex_cleanup, &mutex_registrator );
  542. try {
  543. out_read_buffer=new char[p->m_out_read_buffer_size];
  544. err_read_buffer=new char[p->m_err_read_buffer_size];
  545. buffer_list_t::buffer_t write_buffer;
  546. write_buffer.data=0;
  547. write_buffer.size=0;
  548. std::size_t write_buffer_offset=0;
  549. unsigned long timeout=std::max( p->m_in_wait_timeout, std::max( p->m_out_wait_timeout, p->m_err_wait_timeout ) );
  550. fd_set read_fds;
  551. FD_ZERO( &read_fds );
  552. fd_set write_fds;
  553. FD_ZERO( &write_fds );
  554. while( true ) {
  555. unsigned wait_for=exec_stream_t::s_child;
  556. if( !in_eof && write_buffer.data==0 ) {
  557. wait_for|=exec_stream_t::s_in|s_in_eof;
  558. }
  559. if( !out_eof ) {
  560. wait_for|=exec_stream_t::s_out;
  561. }
  562. if( !err_eof ) {
  563. wait_for|=exec_stream_t::s_err;
  564. }
  565. wait_result_t wait_result=p->m_thread_control.wait( wait_for, timeout, &mutex_registrator );
  566. if( !wait_result.ok() && !wait_result.timed_out() ) {
  567. p->m_error_code=wait_result.error_code();
  568. p->m_error_prefix="thread_buffer_t::thread_func: wait for thread_event failed";
  569. break;
  570. }
  571. // we need more data - get from p->m_buffers
  572. if( write_buffer.data==0 && wait_result.is_signaled( exec_stream_t::s_in|s_in_eof ) ) {
  573. grab_mutex_t grab_mutex( p->m_mutex, &mutex_registrator );
  574. if( !grab_mutex.ok() ) {
  575. p->m_error_code=grab_mutex.error_code();
  576. p->m_error_prefix="thread_buffer_t::thread_func: wait for mutex failed";
  577. break;
  578. }
  579. if( p->m_in_buffer.empty() ) {
  580. // we have empty write_buffer, empty p->m_in_buffer and we are told it will stay so - time to close child's stdin
  581. if( wait_result.is_signaled( s_in_eof ) ) {
  582. in_eof=true;
  583. }
  584. }
  585. if( !p->m_in_buffer.empty() ) {
  586. // we've got buffer - detach it
  587. write_buffer=p->m_in_buffer.detach();
  588. write_buffer_offset=0;
  589. }
  590. // if no data left in p->m_in_buffer - wait until it arrives
  591. if( p->m_in_buffer.empty() ) {
  592. // if no data for us - stop trying to get it until we are told it arrived
  593. if( int code=p->m_thread_control.reset( exec_stream_t::s_in, &mutex_registrator ) ) {
  594. p->m_error_code=code;
  595. p->m_error_prefix="thread_buffer_t::thread_func: unable to reset thread_event (s_in)";
  596. break;
  597. }
  598. }
  599. // if buffer is not too long - tell put() it can proceed
  600. if( !p->m_in_buffer.full( p->m_in_buffer_limit ) ) {
  601. if( int code=p->m_thread_responce.set( exec_stream_t::s_in, &mutex_registrator ) ) {
  602. p->m_error_code=code;
  603. p->m_error_prefix="thread_buffer_t::thread_func: unable to set in_want_data event";
  604. break;
  605. }
  606. }
  607. }
  608. if( in_eof && write_buffer.data==0 ) {
  609. p->m_in_pipe.close();
  610. in_closed=true;
  611. }
  612. // see if they want us to stop, but only when there is nothing more to write
  613. if( write_buffer.data==0 && wait_result.is_signaled( exec_stream_t::s_child ) ) {
  614. break;
  615. }
  616. // determine whether we want something
  617. if( write_buffer.data!=0 ) {
  618. FD_SET( p->m_in_pipe.w(), &write_fds );
  619. }else {
  620. FD_CLR( p->m_in_pipe.w(), &write_fds );
  621. }
  622. if( !out_eof && wait_result.is_signaled( exec_stream_t::s_out ) ) {
  623. FD_SET( p->m_out_pipe.r(), &read_fds );
  624. }else {
  625. FD_CLR( p->m_out_pipe.r(), &read_fds );
  626. }
  627. if( !err_eof && wait_result.is_signaled( exec_stream_t::s_err ) ) {
  628. FD_SET( p->m_err_pipe.r(), &read_fds );
  629. }else {
  630. FD_CLR( p->m_err_pipe.r(), &read_fds );
  631. }
  632. if( FD_ISSET( p->m_in_pipe.w(), &write_fds ) || FD_ISSET( p->m_out_pipe.r(), &read_fds ) || FD_ISSET( p->m_err_pipe.r(), &read_fds ) ) {
  633. // we want something - get it
  634. struct timeval select_timeout;
  635. select_timeout.tv_sec=0;
  636. select_timeout.tv_usec=100000;
  637. int nfds=std::max( p->m_in_pipe.w(), std::max( p->m_out_pipe.r(), p->m_err_pipe.r() ) )+1;
  638. if( select( nfds, &read_fds, &write_fds, 0, &select_timeout )==-1 ) {
  639. p->m_error_code=errno;
  640. p->m_error_prefix="thread_buffer_t::thread_func: select failed";
  641. break;
  642. }
  643. }
  644. // determine what we got
  645. if( FD_ISSET( p->m_in_pipe.w(), &write_fds ) ) {
  646. // it seems we may write to child's stdin
  647. int n_written=write( p->m_in_pipe.w(), write_buffer.data+write_buffer_offset, write_buffer.size-write_buffer_offset );
  648. if( n_written==-1 ) {
  649. if( errno!=EAGAIN ) {
  650. p->m_error_code=errno;
  651. p->m_error_prefix="thread_buffer_t::thread_func: write to child stdin failed";
  652. break;
  653. }
  654. }else {
  655. write_buffer_offset+=n_written;
  656. if( write_buffer_offset==write_buffer.size ) {
  657. delete[] write_buffer.data;
  658. write_buffer.data=0;
  659. write_buffer.size=0;
  660. }
  661. }
  662. }
  663. if( FD_ISSET( p->m_out_pipe.r(), &read_fds ) ) {
  664. // it seems we may read child's stdout
  665. int n_out_read=read( p->m_out_pipe.r(), out_read_buffer, p->m_out_read_buffer_size );
  666. if( n_out_read==-1 ) {
  667. if( errno!=EAGAIN ) {
  668. p->m_error_code=errno;
  669. p->m_error_prefix="exec_stream_t::thread_func: read from child stdout failed";
  670. break;
  671. }
  672. }else {
  673. grab_mutex_t grab_mutex( p->m_mutex, &mutex_registrator );
  674. if( n_out_read!=0 ) {
  675. p->m_out_buffer.put( out_read_buffer, n_out_read );
  676. // if buffer is full - stop reading
  677. if( p->m_out_buffer.full( p->m_out_buffer_limit ) ) {
  678. if( int code=p->m_thread_control.reset( exec_stream_t::s_out, &mutex_registrator ) ) {
  679. p->m_error_code=code;
  680. p->m_error_prefix="exec_stream_t::thread_func: unable to reset m_out_want_data event";
  681. break;
  682. }
  683. }
  684. }
  685. unsigned responce=exec_stream_t::s_out;
  686. if( n_out_read==0 ) { // EOF when read 0 bytes while select told that it's ready
  687. out_eof=true;
  688. responce|=s_out_eof;
  689. }
  690. // we got either data or eof - tell always
  691. if( int code=p->m_thread_responce.set( responce, &mutex_registrator ) ) {
  692. p->m_error_code=code;
  693. p->m_error_prefix="exec_stream_t::thread_func: unable to set out_got_data event";
  694. break;
  695. }
  696. }
  697. }
  698. if( FD_ISSET( p->m_err_pipe.r(), &read_fds ) ) {
  699. // it seemds we may read child's stderr
  700. int n_err_read=read( p->m_err_pipe.r(), err_read_buffer, p->m_err_read_buffer_size );
  701. if( n_err_read==-1 ) {
  702. if( errno!=EAGAIN ) {
  703. p->m_error_code=errno;
  704. p->m_error_prefix="exec_stream_t::thread_func: read from child stderr failed";
  705. break;
  706. }
  707. }else {
  708. grab_mutex_t grab_mutex( p->m_mutex, &mutex_registrator );
  709. if( n_err_read!=0 ) {
  710. p->m_err_buffer.put( err_read_buffer, n_err_read );
  711. // if buffer is full - stop reading
  712. if( p->m_err_buffer.full( p->m_err_buffer_limit ) ) {
  713. if( int code=p->m_thread_control.reset( exec_stream_t::s_err, &mutex_registrator ) ) {
  714. p->m_error_code=code;
  715. p->m_error_prefix="exec_stream_t::thread_func: unable to reset m_err_want_data event";
  716. break;
  717. }
  718. }
  719. }
  720. unsigned responce=exec_stream_t::s_err;
  721. if( n_err_read==0 ) {
  722. err_eof=true;
  723. responce|=s_err_eof;
  724. }
  725. // we got either data or eof - tell always
  726. if( int code=p->m_thread_responce.set( responce, &mutex_registrator ) ) {
  727. p->m_error_code=code;
  728. p->m_error_prefix="exec_stream_t::thread_func: unable to set err_got_data event";
  729. break;
  730. }
  731. }
  732. }
  733. if( in_closed && out_eof && err_eof ) {
  734. // have nothing more to do
  735. break;
  736. }
  737. }
  738. delete[] write_buffer.data;
  739. }catch( ... ) {
  740. // might only be std::bad_alloc
  741. p->m_error_code=0;
  742. p->m_error_prefix="thread_buffer_t::writer_thread: exception caught";
  743. }
  744. delete[] out_read_buffer;
  745. delete[] err_read_buffer;
  746. // tell everyone that we've stopped, so that get() and put() will be unblocked
  747. if( int code=p->m_thread_responce.set( exec_stream_t::s_child, &mutex_registrator ) ) {
  748. p->m_error_code=code;
  749. p->m_error_prefix="exec_stream_t::thread_func: unable to set thread_stopped event";
  750. }
  751. pthread_cleanup_pop( 0 );
  752. return 0;
  753. }