EventRelayerKafka.php 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. <?php
  2. use Kafka\Produce;
  3. /**
  4. * Event relayer for Apache Kafka.
  5. * Configuring for WANCache:
  6. * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
  7. */
  8. class EventRelayerKafka extends EventRelayer {
  9. /**
  10. * Configuration.
  11. *
  12. * @var Config
  13. */
  14. protected $config;
  15. /**
  16. * Kafka producer.
  17. *
  18. * @var Produce
  19. */
  20. protected $producer;
  21. /**
  22. * Create Kafka producer.
  23. *
  24. * @param array $params
  25. */
  26. public function __construct( array $params ) {
  27. parent::__construct( $params );
  28. $this->config = new HashConfig( $params );
  29. if ( !$this->config->has( 'KafkaEventHost' ) ) {
  30. throw new InvalidArgumentException( "KafkaEventHost must be configured" );
  31. }
  32. }
  33. /**
  34. * Get the producer object from kafka-php.
  35. * @return Produce
  36. */
  37. protected function getKafkaProducer() {
  38. if ( !$this->producer ) {
  39. $this->producer = Produce::getInstance(
  40. null, null, $this->config->get( 'KafkaEventHost' ) );
  41. }
  42. return $this->producer;
  43. }
  44. protected function doNotify( $channel, array $events ) {
  45. $jsonEvents = array_map( 'json_encode', $events );
  46. try {
  47. $producer = $this->getKafkaProducer();
  48. $producer->setMessages( $channel, 0, $jsonEvents );
  49. $producer->send();
  50. } catch ( \Kafka\Exception $e ) {
  51. $this->logger->warning( "Sending events failed: $e" );
  52. return false;
  53. }
  54. return true;
  55. }
  56. }