stream_encoder_mt.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144
  1. ///////////////////////////////////////////////////////////////////////////////
  2. //
  3. /// \file stream_encoder_mt.c
  4. /// \brief Multithreaded .xz Stream encoder
  5. //
  6. // Author: Lasse Collin
  7. //
  8. // This file has been put into the public domain.
  9. // You can do whatever you want with this file.
  10. //
  11. ///////////////////////////////////////////////////////////////////////////////
  12. #include "filter_encoder.h"
  13. #include "easy_preset.h"
  14. #include "block_encoder.h"
  15. #include "block_buffer_encoder.h"
  16. #include "index_encoder.h"
  17. #include "outqueue.h"
  /// Largest Block size accepted from the application.
  ///
  /// Bounding the Block size by UINT64_MAX / LZMA_THREADS_MAX keeps later
  /// size arithmetic simple to protect against integer overflow when an
  /// unusually large Block size is requested.
  #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
  /// State of a worker thread, as requested by the main thread or as
  /// reported back by the worker.
  ///
  /// The numeric order matters: code in this file compares states with
  /// relational operators (state >= THR_STOP, state <= THR_FINISH), so
  /// the enumerators must stay in this order.
  typedef enum {
      /// The worker has nothing to do and is waiting for work.
      THR_IDLE,

      /// A Block is being encoded and more input may still arrive.
      THR_RUN,

      /// A Block is being encoded and no more input will be added
      /// to it.
      THR_FINISH,

      /// The main thread wants the worker to abandon its current work
      /// but to stay alive.
      THR_STOP,

      /// The main thread wants the worker to exit. Thread cancellation
      /// could be used instead, but since the worker already has to
      /// react to stop requests, handling exit the same way is simpler.
      THR_EXIT,
  } worker_state;
  36. typedef struct lzma_stream_coder_s lzma_stream_coder;
  37. typedef struct worker_thread_s worker_thread;
  38. struct worker_thread_s {
  39. worker_state state;
  40. /// Input buffer of coder->block_size bytes. The main thread will
  41. /// put new input into this and update in_size accordingly. Once
  42. /// no more input is coming, state will be set to THR_FINISH.
  43. uint8_t *in;
  44. /// Amount of data available in the input buffer. This is modified
  45. /// only by the main thread.
  46. size_t in_size;
  47. /// Output buffer for this thread. This is set by the main
  48. /// thread every time a new Block is started with this thread
  49. /// structure.
  50. lzma_outbuf *outbuf;
  51. /// Pointer to the main structure is needed when putting this
  52. /// thread back to the stack of free threads.
  53. lzma_stream_coder *coder;
  54. /// The allocator is set by the main thread. Since a copy of the
  55. /// pointer is kept here, the application must not change the
  56. /// allocator before calling lzma_end().
  57. const lzma_allocator *allocator;
  58. /// Amount of uncompressed data that has already been compressed.
  59. uint64_t progress_in;
  60. /// Amount of compressed data that is ready.
  61. uint64_t progress_out;
  62. /// Block encoder
  63. lzma_next_coder block_encoder;
  64. /// Compression options for this Block
  65. lzma_block block_options;
  66. /// Next structure in the stack of free worker threads.
  67. worker_thread *next;
  68. mythread_mutex mutex;
  69. mythread_cond cond;
  70. /// The ID of this thread is used to join the thread
  71. /// when it's not needed anymore.
  72. mythread thread_id;
  73. };
  74. struct lzma_stream_coder_s {
  75. enum {
  76. SEQ_STREAM_HEADER,
  77. SEQ_BLOCK,
  78. SEQ_INDEX,
  79. SEQ_STREAM_FOOTER,
  80. } sequence;
  81. /// Start a new Block every block_size bytes of input unless
  82. /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
  83. size_t block_size;
  84. /// The filter chain currently in use
  85. lzma_filter filters[LZMA_FILTERS_MAX + 1];
  86. /// Index to hold sizes of the Blocks
  87. lzma_index *index;
  88. /// Index encoder
  89. lzma_next_coder index_encoder;
  90. /// Stream Flags for encoding the Stream Header and Stream Footer.
  91. lzma_stream_flags stream_flags;
  92. /// Buffer to hold Stream Header and Stream Footer.
  93. uint8_t header[LZMA_STREAM_HEADER_SIZE];
  94. /// Read position in header[]
  95. size_t header_pos;
  96. /// Output buffer queue for compressed data
  97. lzma_outq outq;
  98. /// Maximum wait time if cannot use all the input and cannot
  99. /// fill the output buffer. This is in milliseconds.
  100. uint32_t timeout;
  101. /// Error code from a worker thread
  102. lzma_ret thread_error;
  103. /// Array of allocated thread-specific structures
  104. worker_thread *threads;
  105. /// Number of structures in "threads" above. This is also the
  106. /// number of threads that will be created at maximum.
  107. uint32_t threads_max;
  108. /// Number of thread structures that have been initialized, and
  109. /// thus the number of worker threads actually created so far.
  110. uint32_t threads_initialized;
  111. /// Stack of free threads. When a thread finishes, it puts itself
  112. /// back into this stack. This starts as empty because threads
  113. /// are created only when actually needed.
  114. worker_thread *threads_free;
  115. /// The most recent worker thread to which the main thread writes
  116. /// the new input from the application.
  117. worker_thread *thr;
  118. /// Amount of uncompressed data in Blocks that have already
  119. /// been finished.
  120. uint64_t progress_in;
  121. /// Amount of compressed data in Stream Header + Blocks that
  122. /// have already been finished.
  123. uint64_t progress_out;
  124. mythread_mutex mutex;
  125. mythread_cond cond;
  126. };
  127. /// Tell the main thread that something has gone wrong.
  128. static void
  129. worker_error(worker_thread *thr, lzma_ret ret)
  130. {
  131. assert(ret != LZMA_OK);
  132. assert(ret != LZMA_STREAM_END);
  133. mythread_sync(thr->coder->mutex) {
  134. if (thr->coder->thread_error == LZMA_OK)
  135. thr->coder->thread_error = ret;
  136. mythread_cond_signal(&thr->coder->cond);
  137. }
  138. return;
  139. }
  140. static worker_state
  141. worker_encode(worker_thread *thr, worker_state state)
  142. {
  143. assert(thr->progress_in == 0);
  144. assert(thr->progress_out == 0);
  145. // Set the Block options.
  146. thr->block_options = (lzma_block){
  147. .version = 0,
  148. .check = thr->coder->stream_flags.check,
  149. .compressed_size = thr->coder->outq.buf_size_max,
  150. .uncompressed_size = thr->coder->block_size,
  151. // TODO: To allow changing the filter chain, the filters
  152. // array must be copied to each worker_thread.
  153. .filters = thr->coder->filters,
  154. };
  155. // Calculate maximum size of the Block Header. This amount is
  156. // reserved in the beginning of the buffer so that Block Header
  157. // along with Compressed Size and Uncompressed Size can be
  158. // written there.
  159. lzma_ret ret = lzma_block_header_size(&thr->block_options);
  160. if (ret != LZMA_OK) {
  161. worker_error(thr, ret);
  162. return THR_STOP;
  163. }
  164. // Initialize the Block encoder.
  165. ret = lzma_block_encoder_init(&thr->block_encoder,
  166. thr->allocator, &thr->block_options);
  167. if (ret != LZMA_OK) {
  168. worker_error(thr, ret);
  169. return THR_STOP;
  170. }
  171. size_t in_pos = 0;
  172. size_t in_size = 0;
  173. thr->outbuf->size = thr->block_options.header_size;
  174. const size_t out_size = thr->coder->outq.buf_size_max;
  175. do {
  176. mythread_sync(thr->mutex) {
  177. // Store in_pos and out_pos into *thr so that
  178. // an application may read them via
  179. // lzma_get_progress() to get progress information.
  180. //
  181. // NOTE: These aren't updated when the encoding
  182. // finishes. Instead, the final values are taken
  183. // later from thr->outbuf.
  184. thr->progress_in = in_pos;
  185. thr->progress_out = thr->outbuf->size;
  186. while (in_size == thr->in_size
  187. && thr->state == THR_RUN)
  188. mythread_cond_wait(&thr->cond, &thr->mutex);
  189. state = thr->state;
  190. in_size = thr->in_size;
  191. }
  192. // Return if we were asked to stop or exit.
  193. if (state >= THR_STOP)
  194. return state;
  195. lzma_action action = state == THR_FINISH
  196. ? LZMA_FINISH : LZMA_RUN;
  197. // Limit the amount of input given to the Block encoder
  198. // at once. This way this thread can react fairly quickly
  199. // if the main thread wants us to stop or exit.
  200. static const size_t in_chunk_max = 16384;
  201. size_t in_limit = in_size;
  202. if (in_size - in_pos > in_chunk_max) {
  203. in_limit = in_pos + in_chunk_max;
  204. action = LZMA_RUN;
  205. }
  206. ret = thr->block_encoder.code(
  207. thr->block_encoder.coder, thr->allocator,
  208. thr->in, &in_pos, in_limit, thr->outbuf->buf,
  209. &thr->outbuf->size, out_size, action);
  210. } while (ret == LZMA_OK && thr->outbuf->size < out_size);
  211. switch (ret) {
  212. case LZMA_STREAM_END:
  213. assert(state == THR_FINISH);
  214. // Encode the Block Header. By doing it after
  215. // the compression, we can store the Compressed Size
  216. // and Uncompressed Size fields.
  217. ret = lzma_block_header_encode(&thr->block_options,
  218. thr->outbuf->buf);
  219. if (ret != LZMA_OK) {
  220. worker_error(thr, ret);
  221. return THR_STOP;
  222. }
  223. break;
  224. case LZMA_OK:
  225. // The data was incompressible. Encode it using uncompressed
  226. // LZMA2 chunks.
  227. //
  228. // First wait that we have gotten all the input.
  229. mythread_sync(thr->mutex) {
  230. while (thr->state == THR_RUN)
  231. mythread_cond_wait(&thr->cond, &thr->mutex);
  232. state = thr->state;
  233. in_size = thr->in_size;
  234. }
  235. if (state >= THR_STOP)
  236. return state;
  237. // Do the encoding. This takes care of the Block Header too.
  238. thr->outbuf->size = 0;
  239. ret = lzma_block_uncomp_encode(&thr->block_options,
  240. thr->in, in_size, thr->outbuf->buf,
  241. &thr->outbuf->size, out_size);
  242. // It shouldn't fail.
  243. if (ret != LZMA_OK) {
  244. worker_error(thr, LZMA_PROG_ERROR);
  245. return THR_STOP;
  246. }
  247. break;
  248. default:
  249. worker_error(thr, ret);
  250. return THR_STOP;
  251. }
  252. // Set the size information that will be read by the main thread
  253. // to write the Index field.
  254. thr->outbuf->unpadded_size
  255. = lzma_block_unpadded_size(&thr->block_options);
  256. assert(thr->outbuf->unpadded_size != 0);
  257. thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
  258. return THR_FINISH;
  259. }
  260. static MYTHREAD_RET_TYPE
  261. worker_start(void *thr_ptr)
  262. {
  263. worker_thread *thr = thr_ptr;
  264. worker_state state = THR_IDLE; // Init to silence a warning
  265. while (true) {
  266. // Wait for work.
  267. mythread_sync(thr->mutex) {
  268. while (true) {
  269. // The thread is already idle so if we are
  270. // requested to stop, just set the state.
  271. if (thr->state == THR_STOP) {
  272. thr->state = THR_IDLE;
  273. mythread_cond_signal(&thr->cond);
  274. }
  275. state = thr->state;
  276. if (state != THR_IDLE)
  277. break;
  278. mythread_cond_wait(&thr->cond, &thr->mutex);
  279. }
  280. }
  281. assert(state != THR_IDLE);
  282. assert(state != THR_STOP);
  283. if (state <= THR_FINISH)
  284. state = worker_encode(thr, state);
  285. if (state == THR_EXIT)
  286. break;
  287. // Mark the thread as idle unless the main thread has
  288. // told us to exit. Signal is needed for the case
  289. // where the main thread is waiting for the threads to stop.
  290. mythread_sync(thr->mutex) {
  291. if (thr->state != THR_EXIT) {
  292. thr->state = THR_IDLE;
  293. mythread_cond_signal(&thr->cond);
  294. }
  295. }
  296. mythread_sync(thr->coder->mutex) {
  297. // Mark the output buffer as finished if
  298. // no errors occurred.
  299. thr->outbuf->finished = state == THR_FINISH;
  300. // Update the main progress info.
  301. thr->coder->progress_in
  302. += thr->outbuf->uncompressed_size;
  303. thr->coder->progress_out += thr->outbuf->size;
  304. thr->progress_in = 0;
  305. thr->progress_out = 0;
  306. // Return this thread to the stack of free threads.
  307. thr->next = thr->coder->threads_free;
  308. thr->coder->threads_free = thr;
  309. mythread_cond_signal(&thr->coder->cond);
  310. }
  311. }
  312. // Exiting, free the resources.
  313. mythread_mutex_destroy(&thr->mutex);
  314. mythread_cond_destroy(&thr->cond);
  315. lzma_next_end(&thr->block_encoder, thr->allocator);
  316. lzma_free(thr->in, thr->allocator);
  317. return MYTHREAD_RET_VALUE;
  318. }
  319. /// Make the threads stop but not exit. Optionally wait for them to stop.
  320. static void
  321. threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
  322. {
  323. // Tell the threads to stop.
  324. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  325. mythread_sync(coder->threads[i].mutex) {
  326. coder->threads[i].state = THR_STOP;
  327. mythread_cond_signal(&coder->threads[i].cond);
  328. }
  329. }
  330. if (!wait_for_threads)
  331. return;
  332. // Wait for the threads to settle in the idle state.
  333. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  334. mythread_sync(coder->threads[i].mutex) {
  335. while (coder->threads[i].state != THR_IDLE)
  336. mythread_cond_wait(&coder->threads[i].cond,
  337. &coder->threads[i].mutex);
  338. }
  339. }
  340. return;
  341. }
  342. /// Stop the threads and free the resources associated with them.
  343. /// Wait until the threads have exited.
  344. static void
  345. threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
  346. {
  347. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  348. mythread_sync(coder->threads[i].mutex) {
  349. coder->threads[i].state = THR_EXIT;
  350. mythread_cond_signal(&coder->threads[i].cond);
  351. }
  352. }
  353. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  354. int ret = mythread_join(coder->threads[i].thread_id);
  355. assert(ret == 0);
  356. (void)ret;
  357. }
  358. lzma_free(coder->threads, allocator);
  359. return;
  360. }
  361. /// Initialize a new worker_thread structure and create a new thread.
  362. static lzma_ret
  363. initialize_new_thread(lzma_stream_coder *coder,
  364. const lzma_allocator *allocator)
  365. {
  366. worker_thread *thr = &coder->threads[coder->threads_initialized];
  367. thr->in = lzma_alloc(coder->block_size, allocator);
  368. if (thr->in == NULL)
  369. return LZMA_MEM_ERROR;
  370. if (mythread_mutex_init(&thr->mutex))
  371. goto error_mutex;
  372. if (mythread_cond_init(&thr->cond))
  373. goto error_cond;
  374. thr->state = THR_IDLE;
  375. thr->allocator = allocator;
  376. thr->coder = coder;
  377. thr->progress_in = 0;
  378. thr->progress_out = 0;
  379. thr->block_encoder = LZMA_NEXT_CODER_INIT;
  380. if (mythread_create(&thr->thread_id, &worker_start, thr))
  381. goto error_thread;
  382. ++coder->threads_initialized;
  383. coder->thr = thr;
  384. return LZMA_OK;
  385. error_thread:
  386. mythread_cond_destroy(&thr->cond);
  387. error_cond:
  388. mythread_mutex_destroy(&thr->mutex);
  389. error_mutex:
  390. lzma_free(thr->in, allocator);
  391. return LZMA_MEM_ERROR;
  392. }
  393. static lzma_ret
  394. get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
  395. {
  396. // If there are no free output subqueues, there is no
  397. // point to try getting a thread.
  398. if (!lzma_outq_has_buf(&coder->outq))
  399. return LZMA_OK;
  400. // If there is a free structure on the stack, use it.
  401. mythread_sync(coder->mutex) {
  402. if (coder->threads_free != NULL) {
  403. coder->thr = coder->threads_free;
  404. coder->threads_free = coder->threads_free->next;
  405. }
  406. }
  407. if (coder->thr == NULL) {
  408. // If there are no uninitialized structures left, return.
  409. if (coder->threads_initialized == coder->threads_max)
  410. return LZMA_OK;
  411. // Initialize a new thread.
  412. return_if_error(initialize_new_thread(coder, allocator));
  413. }
  414. // Reset the parts of the thread state that have to be done
  415. // in the main thread.
  416. mythread_sync(coder->thr->mutex) {
  417. coder->thr->state = THR_RUN;
  418. coder->thr->in_size = 0;
  419. coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
  420. mythread_cond_signal(&coder->thr->cond);
  421. }
  422. return LZMA_OK;
  423. }
  424. static lzma_ret
  425. stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
  426. const uint8_t *restrict in, size_t *restrict in_pos,
  427. size_t in_size, lzma_action action)
  428. {
  429. while (*in_pos < in_size
  430. || (coder->thr != NULL && action != LZMA_RUN)) {
  431. if (coder->thr == NULL) {
  432. // Get a new thread.
  433. const lzma_ret ret = get_thread(coder, allocator);
  434. if (coder->thr == NULL)
  435. return ret;
  436. }
  437. // Copy the input data to thread's buffer.
  438. size_t thr_in_size = coder->thr->in_size;
  439. lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
  440. &thr_in_size, coder->block_size);
  441. // Tell the Block encoder to finish if
  442. // - it has got block_size bytes of input; or
  443. // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
  444. // or LZMA_FULL_BARRIER was used.
  445. //
  446. // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
  447. const bool finish = thr_in_size == coder->block_size
  448. || (*in_pos == in_size && action != LZMA_RUN);
  449. bool block_error = false;
  450. mythread_sync(coder->thr->mutex) {
  451. if (coder->thr->state == THR_IDLE) {
  452. // Something has gone wrong with the Block
  453. // encoder. It has set coder->thread_error
  454. // which we will read a few lines later.
  455. block_error = true;
  456. } else {
  457. // Tell the Block encoder its new amount
  458. // of input and update the state if needed.
  459. coder->thr->in_size = thr_in_size;
  460. if (finish)
  461. coder->thr->state = THR_FINISH;
  462. mythread_cond_signal(&coder->thr->cond);
  463. }
  464. }
  465. if (block_error) {
  466. lzma_ret ret;
  467. mythread_sync(coder->mutex) {
  468. ret = coder->thread_error;
  469. }
  470. return ret;
  471. }
  472. if (finish)
  473. coder->thr = NULL;
  474. }
  475. return LZMA_OK;
  476. }
  477. /// Wait until more input can be consumed, more output can be read, or
  478. /// an optional timeout is reached.
  479. static bool
  480. wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
  481. bool *has_blocked, bool has_input)
  482. {
  483. if (coder->timeout != 0 && !*has_blocked) {
  484. // Every time when stream_encode_mt() is called via
  485. // lzma_code(), *has_blocked starts as false. We set it
  486. // to true here and calculate the absolute time when
  487. // we must return if there's nothing to do.
  488. //
  489. // The idea of *has_blocked is to avoid unneeded calls
  490. // to mythread_condtime_set(), which may do a syscall
  491. // depending on the operating system.
  492. *has_blocked = true;
  493. mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
  494. }
  495. bool timed_out = false;
  496. mythread_sync(coder->mutex) {
  497. // There are four things that we wait. If one of them
  498. // becomes possible, we return.
  499. // - If there is input left, we need to get a free
  500. // worker thread and an output buffer for it.
  501. // - Data ready to be read from the output queue.
  502. // - A worker thread indicates an error.
  503. // - Time out occurs.
  504. while ((!has_input || coder->threads_free == NULL
  505. || !lzma_outq_has_buf(&coder->outq))
  506. && !lzma_outq_is_readable(&coder->outq)
  507. && coder->thread_error == LZMA_OK
  508. && !timed_out) {
  509. if (coder->timeout != 0)
  510. timed_out = mythread_cond_timedwait(
  511. &coder->cond, &coder->mutex,
  512. wait_abs) != 0;
  513. else
  514. mythread_cond_wait(&coder->cond,
  515. &coder->mutex);
  516. }
  517. }
  518. return timed_out;
  519. }
  520. static lzma_ret
  521. stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
  522. const uint8_t *restrict in, size_t *restrict in_pos,
  523. size_t in_size, uint8_t *restrict out,
  524. size_t *restrict out_pos, size_t out_size, lzma_action action)
  525. {
  526. lzma_stream_coder *coder = coder_ptr;
  527. switch (coder->sequence) {
  528. case SEQ_STREAM_HEADER:
  529. lzma_bufcpy(coder->header, &coder->header_pos,
  530. sizeof(coder->header),
  531. out, out_pos, out_size);
  532. if (coder->header_pos < sizeof(coder->header))
  533. return LZMA_OK;
  534. coder->header_pos = 0;
  535. coder->sequence = SEQ_BLOCK;
  536. // Fall through
  537. case SEQ_BLOCK: {
  538. // Initialized to silence warnings.
  539. lzma_vli unpadded_size = 0;
  540. lzma_vli uncompressed_size = 0;
  541. lzma_ret ret = LZMA_OK;
  542. // These are for wait_for_work().
  543. bool has_blocked = false;
  544. mythread_condtime wait_abs;
  545. while (true) {
  546. mythread_sync(coder->mutex) {
  547. // Check for Block encoder errors.
  548. ret = coder->thread_error;
  549. if (ret != LZMA_OK) {
  550. assert(ret != LZMA_STREAM_END);
  551. break;
  552. }
  553. // Try to read compressed data to out[].
  554. ret = lzma_outq_read(&coder->outq,
  555. out, out_pos, out_size,
  556. &unpadded_size,
  557. &uncompressed_size);
  558. }
  559. if (ret == LZMA_STREAM_END) {
  560. // End of Block. Add it to the Index.
  561. ret = lzma_index_append(coder->index,
  562. allocator, unpadded_size,
  563. uncompressed_size);
  564. // If we didn't fill the output buffer yet,
  565. // try to read more data. Maybe the next
  566. // outbuf has been finished already too.
  567. if (*out_pos < out_size)
  568. continue;
  569. }
  570. if (ret != LZMA_OK) {
  571. // coder->thread_error was set or
  572. // lzma_index_append() failed.
  573. threads_stop(coder, false);
  574. return ret;
  575. }
  576. // Try to give uncompressed data to a worker thread.
  577. ret = stream_encode_in(coder, allocator,
  578. in, in_pos, in_size, action);
  579. if (ret != LZMA_OK) {
  580. threads_stop(coder, false);
  581. return ret;
  582. }
  583. // See if we should wait or return.
  584. //
  585. // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
  586. if (*in_pos == in_size) {
  587. // LZMA_RUN: More data is probably coming
  588. // so return to let the caller fill the
  589. // input buffer.
  590. if (action == LZMA_RUN)
  591. return LZMA_OK;
  592. // LZMA_FULL_BARRIER: The same as with
  593. // LZMA_RUN but tell the caller that the
  594. // barrier was completed.
  595. if (action == LZMA_FULL_BARRIER)
  596. return LZMA_STREAM_END;
  597. // Finishing or flushing isn't completed until
  598. // all input data has been encoded and copied
  599. // to the output buffer.
  600. if (lzma_outq_is_empty(&coder->outq)) {
  601. // LZMA_FINISH: Continue to encode
  602. // the Index field.
  603. if (action == LZMA_FINISH)
  604. break;
  605. // LZMA_FULL_FLUSH: Return to tell
  606. // the caller that flushing was
  607. // completed.
  608. if (action == LZMA_FULL_FLUSH)
  609. return LZMA_STREAM_END;
  610. }
  611. }
  612. // Return if there is no output space left.
  613. // This check must be done after testing the input
  614. // buffer, because we might want to use a different
  615. // return code.
  616. if (*out_pos == out_size)
  617. return LZMA_OK;
  618. // Neither in nor out has been used completely.
  619. // Wait until there's something we can do.
  620. if (wait_for_work(coder, &wait_abs, &has_blocked,
  621. *in_pos < in_size))
  622. return LZMA_TIMED_OUT;
  623. }
  624. // All Blocks have been encoded and the threads have stopped.
  625. // Prepare to encode the Index field.
  626. return_if_error(lzma_index_encoder_init(
  627. &coder->index_encoder, allocator,
  628. coder->index));
  629. coder->sequence = SEQ_INDEX;
  630. // Update the progress info to take the Index and
  631. // Stream Footer into account. Those are very fast to encode
  632. // so in terms of progress information they can be thought
  633. // to be ready to be copied out.
  634. coder->progress_out += lzma_index_size(coder->index)
  635. + LZMA_STREAM_HEADER_SIZE;
  636. }
  637. // Fall through
  638. case SEQ_INDEX: {
  639. // Call the Index encoder. It doesn't take any input, so
  640. // those pointers can be NULL.
  641. const lzma_ret ret = coder->index_encoder.code(
  642. coder->index_encoder.coder, allocator,
  643. NULL, NULL, 0,
  644. out, out_pos, out_size, LZMA_RUN);
  645. if (ret != LZMA_STREAM_END)
  646. return ret;
  647. // Encode the Stream Footer into coder->buffer.
  648. coder->stream_flags.backward_size
  649. = lzma_index_size(coder->index);
  650. if (lzma_stream_footer_encode(&coder->stream_flags,
  651. coder->header) != LZMA_OK)
  652. return LZMA_PROG_ERROR;
  653. coder->sequence = SEQ_STREAM_FOOTER;
  654. }
  655. // Fall through
  656. case SEQ_STREAM_FOOTER:
  657. lzma_bufcpy(coder->header, &coder->header_pos,
  658. sizeof(coder->header),
  659. out, out_pos, out_size);
  660. return coder->header_pos < sizeof(coder->header)
  661. ? LZMA_OK : LZMA_STREAM_END;
  662. }
  663. assert(0);
  664. return LZMA_PROG_ERROR;
  665. }
  666. static void
  667. stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
  668. {
  669. lzma_stream_coder *coder = coder_ptr;
  670. // Threads must be killed before the output queue can be freed.
  671. threads_end(coder, allocator);
  672. lzma_outq_end(&coder->outq, allocator);
  673. for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
  674. lzma_free(coder->filters[i].options, allocator);
  675. lzma_next_end(&coder->index_encoder, allocator);
  676. lzma_index_end(coder->index, allocator);
  677. mythread_cond_destroy(&coder->cond);
  678. mythread_mutex_destroy(&coder->mutex);
  679. lzma_free(coder, allocator);
  680. return;
  681. }
  682. /// Options handling for lzma_stream_encoder_mt_init() and
  683. /// lzma_stream_encoder_mt_memusage()
  684. static lzma_ret
  685. get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
  686. const lzma_filter **filters, uint64_t *block_size,
  687. uint64_t *outbuf_size_max)
  688. {
  689. // Validate some of the options.
  690. if (options == NULL)
  691. return LZMA_PROG_ERROR;
  692. if (options->flags != 0 || options->threads == 0
  693. || options->threads > LZMA_THREADS_MAX)
  694. return LZMA_OPTIONS_ERROR;
  695. if (options->filters != NULL) {
  696. // Filter chain was given, use it as is.
  697. *filters = options->filters;
  698. } else {
  699. // Use a preset.
  700. if (lzma_easy_preset(opt_easy, options->preset))
  701. return LZMA_OPTIONS_ERROR;
  702. *filters = opt_easy->filters;
  703. }
  704. // Block size
  705. if (options->block_size > 0) {
  706. if (options->block_size > BLOCK_SIZE_MAX)
  707. return LZMA_OPTIONS_ERROR;
  708. *block_size = options->block_size;
  709. } else {
  710. // Determine the Block size from the filter chain.
  711. *block_size = lzma_mt_block_size(*filters);
  712. if (*block_size == 0)
  713. return LZMA_OPTIONS_ERROR;
  714. assert(*block_size <= BLOCK_SIZE_MAX);
  715. }
  716. // Calculate the maximum amount output that a single output buffer
  717. // may need to hold. This is the same as the maximum total size of
  718. // a Block.
  719. *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
  720. if (*outbuf_size_max == 0)
  721. return LZMA_MEM_ERROR;
  722. return LZMA_OK;
  723. }
  724. static void
  725. get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
  726. {
  727. lzma_stream_coder *coder = coder_ptr;
  728. // Lock coder->mutex to prevent finishing threads from moving their
  729. // progress info from the worker_thread structure to lzma_stream_coder.
  730. mythread_sync(coder->mutex) {
  731. *progress_in = coder->progress_in;
  732. *progress_out = coder->progress_out;
  733. for (size_t i = 0; i < coder->threads_initialized; ++i) {
  734. mythread_sync(coder->threads[i].mutex) {
  735. *progress_in += coder->threads[i].progress_in;
  736. *progress_out += coder->threads[i]
  737. .progress_out;
  738. }
  739. }
  740. }
  741. return;
  742. }
  743. static lzma_ret
  744. stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
  745. const lzma_mt *options)
  746. {
  747. lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
  748. // Get the filter chain.
  749. lzma_options_easy easy;
  750. const lzma_filter *filters;
  751. uint64_t block_size;
  752. uint64_t outbuf_size_max;
  753. return_if_error(get_options(options, &easy, &filters,
  754. &block_size, &outbuf_size_max));
  755. #if SIZE_MAX < UINT64_MAX
  756. if (block_size > SIZE_MAX)
  757. return LZMA_MEM_ERROR;
  758. #endif
  759. // Validate the filter chain so that we can give an error in this
  760. // function instead of delaying it to the first call to lzma_code().
  761. // The memory usage calculation verifies the filter chain as
  762. // a side effect so we take advatange of that.
  763. if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
  764. return LZMA_OPTIONS_ERROR;
  765. // Validate the Check ID.
  766. if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
  767. return LZMA_PROG_ERROR;
  768. if (!lzma_check_is_supported(options->check))
  769. return LZMA_UNSUPPORTED_CHECK;
  770. // Allocate and initialize the base structure if needed.
  771. lzma_stream_coder *coder = next->coder;
  772. if (coder == NULL) {
  773. coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
  774. if (coder == NULL)
  775. return LZMA_MEM_ERROR;
  776. next->coder = coder;
  777. // For the mutex and condition variable initializations
  778. // the error handling has to be done here because
  779. // stream_encoder_mt_end() doesn't know if they have
  780. // already been initialized or not.
  781. if (mythread_mutex_init(&coder->mutex)) {
  782. lzma_free(coder, allocator);
  783. next->coder = NULL;
  784. return LZMA_MEM_ERROR;
  785. }
  786. if (mythread_cond_init(&coder->cond)) {
  787. mythread_mutex_destroy(&coder->mutex);
  788. lzma_free(coder, allocator);
  789. next->coder = NULL;
  790. return LZMA_MEM_ERROR;
  791. }
  792. next->code = &stream_encode_mt;
  793. next->end = &stream_encoder_mt_end;
  794. next->get_progress = &get_progress;
  795. // next->update = &stream_encoder_mt_update;
  796. coder->filters[0].id = LZMA_VLI_UNKNOWN;
  797. coder->index_encoder = LZMA_NEXT_CODER_INIT;
  798. coder->index = NULL;
  799. memzero(&coder->outq, sizeof(coder->outq));
  800. coder->threads = NULL;
  801. coder->threads_max = 0;
  802. coder->threads_initialized = 0;
  803. }
  804. // Basic initializations
  805. coder->sequence = SEQ_STREAM_HEADER;
  806. coder->block_size = (size_t)(block_size);
  807. coder->thread_error = LZMA_OK;
  808. coder->thr = NULL;
  809. // Allocate the thread-specific base structures.
  810. assert(options->threads > 0);
  811. if (coder->threads_max != options->threads) {
  812. threads_end(coder, allocator);
  813. coder->threads = NULL;
  814. coder->threads_max = 0;
  815. coder->threads_initialized = 0;
  816. coder->threads_free = NULL;
  817. coder->threads = lzma_alloc(
  818. options->threads * sizeof(worker_thread),
  819. allocator);
  820. if (coder->threads == NULL)
  821. return LZMA_MEM_ERROR;
  822. coder->threads_max = options->threads;
  823. } else {
  824. // Reuse the old structures and threads. Tell the running
  825. // threads to stop and wait until they have stopped.
  826. threads_stop(coder, true);
  827. }
  828. // Output queue
  829. return_if_error(lzma_outq_init(&coder->outq, allocator,
  830. outbuf_size_max, options->threads));
  831. // Timeout
  832. coder->timeout = options->timeout;
  833. // Free the old filter chain and copy the new one.
  834. for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
  835. lzma_free(coder->filters[i].options, allocator);
  836. return_if_error(lzma_filters_copy(
  837. filters, coder->filters, allocator));
  838. // Index
  839. lzma_index_end(coder->index, allocator);
  840. coder->index = lzma_index_init(allocator);
  841. if (coder->index == NULL)
  842. return LZMA_MEM_ERROR;
  843. // Stream Header
  844. coder->stream_flags.version = 0;
  845. coder->stream_flags.check = options->check;
  846. return_if_error(lzma_stream_header_encode(
  847. &coder->stream_flags, coder->header));
  848. coder->header_pos = 0;
  849. // Progress info
  850. coder->progress_in = 0;
  851. coder->progress_out = LZMA_STREAM_HEADER_SIZE;
  852. return LZMA_OK;
  853. }
  854. extern LZMA_API(lzma_ret)
  855. lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
  856. {
  857. lzma_next_strm_init(stream_encoder_mt_init, strm, options);
  858. strm->internal->supported_actions[LZMA_RUN] = true;
  859. // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
  860. strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
  861. strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
  862. strm->internal->supported_actions[LZMA_FINISH] = true;
  863. return LZMA_OK;
  864. }
  865. // This function name is a monster but it's consistent with the older
  866. // monster names. :-( 31 chars is the max that C99 requires so in that
  867. // sense it's not too long. ;-)
  868. extern LZMA_API(uint64_t)
  869. lzma_stream_encoder_mt_memusage(const lzma_mt *options)
  870. {
  871. lzma_options_easy easy;
  872. const lzma_filter *filters;
  873. uint64_t block_size;
  874. uint64_t outbuf_size_max;
  875. if (get_options(options, &easy, &filters, &block_size,
  876. &outbuf_size_max) != LZMA_OK)
  877. return UINT64_MAX;
  878. // Memory usage of the input buffers
  879. const uint64_t inbuf_memusage = options->threads * block_size;
  880. // Memory usage of the filter encoders
  881. uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
  882. if (filters_memusage == UINT64_MAX)
  883. return UINT64_MAX;
  884. filters_memusage *= options->threads;
  885. // Memory usage of the output queue
  886. const uint64_t outq_memusage = lzma_outq_memusage(
  887. outbuf_size_max, options->threads);
  888. if (outq_memusage == UINT64_MAX)
  889. return UINT64_MAX;
  890. // Sum them with overflow checking.
  891. uint64_t total_memusage = LZMA_MEMUSAGE_BASE
  892. + sizeof(lzma_stream_coder)
  893. + options->threads * sizeof(worker_thread);
  894. if (UINT64_MAX - total_memusage < inbuf_memusage)
  895. return UINT64_MAX;
  896. total_memusage += inbuf_memusage;
  897. if (UINT64_MAX - total_memusage < filters_memusage)
  898. return UINT64_MAX;
  899. total_memusage += filters_memusage;
  900. if (UINT64_MAX - total_memusage < outq_memusage)
  901. return UINT64_MAX;
  902. return total_memusage + outq_memusage;
  903. }