KafkaHandler.php 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. <?php
  2. /**
  3. * This program is free software; you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation; either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License along
  14. * with this program; if not, write to the Free Software Foundation, Inc.,
  15. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  16. * http://www.gnu.org/copyleft/gpl.html
  17. *
  18. * @file
  19. */
  20. namespace MediaWiki\Logger\Monolog;
  21. use Kafka\MetaDataFromKafka;
  22. use Kafka\Produce;
  23. use Kafka\Protocol\Decoder;
  24. use MediaWiki\Logger\LoggerFactory;
  25. use Monolog\Handler\AbstractProcessingHandler;
  26. use Monolog\Logger;
  27. use Psr\Log\LoggerInterface;
  28. /**
  29. * Log handler sends log events to a kafka server.
  30. *
  31. * Constructor options array arguments:
  32. * * alias: map from monolog channel to kafka topic name. When no
  33. * alias exists the topic "monolog_$channel" will be used.
  34. * * swallowExceptions: Swallow exceptions that occur while talking to
  35. * kafka. Defaults to false.
  36. * * logExceptions: Log exceptions talking to kafka here. Either null,
  37. * the name of a channel to log to, or an object implementing
  38. * Psr\Log\LoggerInterface. Defaults to null.
  39. *
  40. * Requires the nmred/kafka-php library, version >= 1.3.0
  41. *
  42. * @since 1.26
  43. * @author Erik Bernhardson <ebernhardson@wikimedia.org>
  44. * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
  45. */
  46. class KafkaHandler extends AbstractProcessingHandler {
  47. /**
  48. * @var Produce Sends requests to kafka
  49. */
  50. protected $produce;
  51. /**
  52. * @var array Optional handler configuration
  53. */
  54. protected $options;
  55. /**
  56. * @var array Map from topic name to partition this request produces to
  57. */
  58. protected $partitions = [];
  59. /**
  60. * @var array defaults for constructor options
  61. */
  62. private static $defaultOptions = [
  63. 'alias' => [], // map from monolog channel to kafka topic
  64. 'swallowExceptions' => false, // swallow exceptions sending records
  65. 'logExceptions' => null, // A PSR3 logger to inform about errors
  66. 'requireAck' => 0,
  67. ];
  68. /**
  69. * @param Produce $produce Kafka instance to produce through
  70. * @param array $options optional handler configuration
  71. * @param int $level The minimum logging level at which this handler will be triggered
  72. * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
  73. */
  74. public function __construct(
  75. Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
  76. ) {
  77. parent::__construct( $level, $bubble );
  78. $this->produce = $produce;
  79. $this->options = array_merge( self::$defaultOptions, $options );
  80. }
  81. /**
  82. * Constructs the necessary support objects and returns a KafkaHandler
  83. * instance.
  84. *
  85. * @param string[] $kafkaServers
  86. * @param array $options
  87. * @param int $level The minimum logging level at which this handle will be triggered
  88. * @param bool $bubble Whether the messages that are handled can bubble the stack or not
  89. * @return KafkaHandler
  90. */
  91. public static function factory(
  92. $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
  93. ) {
  94. $metadata = new MetaDataFromKafka( $kafkaServers );
  95. $produce = new Produce( $metadata );
  96. if ( isset( $options['sendTimeout'] ) ) {
  97. $timeOut = $options['sendTimeout'];
  98. $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
  99. $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
  100. intval( $timeOut * 1000000 )
  101. );
  102. }
  103. if ( isset( $options['recvTimeout'] ) ) {
  104. $timeOut = $options['recvTimeout'];
  105. $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
  106. $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
  107. intval( $timeOut * 1000000 )
  108. );
  109. }
  110. if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
  111. $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
  112. }
  113. if ( isset( $options['requireAck'] ) ) {
  114. $produce->setRequireAck( $options['requireAck'] );
  115. }
  116. return new self( $produce, $options, $level, $bubble );
  117. }
  118. /**
  119. * @inheritDoc
  120. */
  121. protected function write( array $record ) {
  122. if ( $record['formatted'] !== null ) {
  123. $this->addMessages( $record['channel'], [ $record['formatted'] ] );
  124. $this->send();
  125. }
  126. }
  127. /**
  128. * @inheritDoc
  129. */
  130. public function handleBatch( array $batch ) {
  131. $channels = [];
  132. foreach ( $batch as $record ) {
  133. if ( $record['level'] < $this->level ) {
  134. continue;
  135. }
  136. $channels[$record['channel']][] = $this->processRecord( $record );
  137. }
  138. $formatter = $this->getFormatter();
  139. foreach ( $channels as $channel => $records ) {
  140. $messages = [];
  141. foreach ( $records as $idx => $record ) {
  142. $message = $formatter->format( $record );
  143. if ( $message !== null ) {
  144. $messages[] = $message;
  145. }
  146. }
  147. if ( $messages ) {
  148. $this->addMessages( $channel, $messages );
  149. }
  150. }
  151. $this->send();
  152. }
  153. /**
  154. * Send any records in the kafka client internal queue.
  155. */
  156. protected function send() {
  157. try {
  158. $response = $this->produce->send();
  159. } catch ( \Kafka\Exception $e ) {
  160. $ignore = $this->warning(
  161. 'Error sending records to kafka: {exception}',
  162. [ 'exception' => $e ] );
  163. if ( !$ignore ) {
  164. throw $e;
  165. } else {
  166. return;
  167. }
  168. }
  169. if ( is_bool( $response ) ) {
  170. return;
  171. }
  172. $errors = [];
  173. foreach ( $response as $topicName => $partitionResponse ) {
  174. foreach ( $partitionResponse as $partition => $info ) {
  175. if ( $info['errCode'] === 0 ) {
  176. // no error
  177. continue;
  178. }
  179. $errors[] = sprintf(
  180. 'Error producing to %s (errno %d): %s',
  181. $topicName,
  182. $info['errCode'],
  183. Decoder::getError( $info['errCode'] )
  184. );
  185. }
  186. }
  187. if ( $errors ) {
  188. $error = implode( "\n", $errors );
  189. if ( !$this->warning( $error ) ) {
  190. throw new \RuntimeException( $error );
  191. }
  192. }
  193. }
  194. /**
  195. * @param string $topic Name of topic to get partition for
  196. * @return int|null The random partition to produce to for this request,
  197. * or null if a partition could not be determined.
  198. */
  199. protected function getRandomPartition( $topic ) {
  200. if ( !array_key_exists( $topic, $this->partitions ) ) {
  201. try {
  202. $partitions = $this->produce->getAvailablePartitions( $topic );
  203. } catch ( \Kafka\Exception $e ) {
  204. $ignore = $this->warning(
  205. 'Error getting metadata for kafka topic {topic}: {exception}',
  206. [ 'topic' => $topic, 'exception' => $e ] );
  207. if ( $ignore ) {
  208. return null;
  209. }
  210. throw $e;
  211. }
  212. if ( $partitions ) {
  213. $key = array_rand( $partitions );
  214. $this->partitions[$topic] = $partitions[$key];
  215. } else {
  216. $details = $this->produce->getClient()->getTopicDetail( $topic );
  217. $ignore = $this->warning(
  218. 'No partitions available for kafka topic {topic}',
  219. [ 'topic' => $topic, 'kafka' => $details ]
  220. );
  221. if ( !$ignore ) {
  222. throw new \RuntimeException( "No partitions available for kafka topic $topic" );
  223. }
  224. $this->partitions[$topic] = null;
  225. }
  226. }
  227. return $this->partitions[$topic];
  228. }
  229. /**
  230. * Adds records for a channel to the Kafka client internal queue.
  231. *
  232. * @param string $channel Name of Monolog channel records belong to
  233. * @param array $records List of records to append
  234. */
  235. protected function addMessages( $channel, array $records ) {
  236. $topic = $this->options['alias'][$channel] ?? "monolog_$channel";
  237. $partition = $this->getRandomPartition( $topic );
  238. if ( $partition !== null ) {
  239. $this->produce->setMessages( $topic, $partition, $records );
  240. }
  241. }
  242. /**
  243. * @param string $message PSR3 compatible message string
  244. * @param array $context PSR3 compatible log context
  245. * @return bool true if caller should ignore warning
  246. */
  247. protected function warning( $message, array $context = [] ) {
  248. if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
  249. $this->options['logExceptions']->warning( $message, $context );
  250. }
  251. return $this->options['swallowExceptions'];
  252. }
  253. }