setHosts($hosts)
 *        ->build();
 *
 *    $options = array(
 *        'index' => 'elastic_index_name',
 *        'type' => 'elastic_doc_type',
 *    );
 *    $handler = new ElasticsearchHandler($client, $options);
 *    $log = new Logger('application');
 *    $log->pushHandler($handler);
 *
 * @author Avtandil Kikabidze
 */
class ElasticsearchHandler extends AbstractProcessingHandler
{
    /**
     * @var Client|Client8 Client used to issue the bulk requests
     */
    protected $client;

    /**
     * @var mixed[] Handler config options
     */
    protected $options = [];

    /**
     * @var bool Whether the bulk action metadata must carry a `_type` entry
     */
    private $needsType;

    /**
     * @param Client|Client8 $client  Elasticsearch Client object
     * @param mixed[]        $options Handler configuration, merged over the defaults
     *                                for `index`, `type` and `ignore_error`
     * @param mixed          $level   Forwarded unchanged to the parent handler constructor
     * @param bool           $bubble  Forwarded unchanged to the parent handler constructor
     *
     * @throws \TypeError When $client is not an instance of one of the supported client classes
     */
    public function __construct($client, array $options = [], $level = Logger::DEBUG, bool $bubble = true)
    {
        if (!$client instanceof Client && !$client instanceof Client8) {
            throw new \TypeError('Elasticsearch\Client or Elastic\Elasticsearch\Client instance required');
        }

        parent::__construct($level, $bubble);

        $this->client = $client;
        $this->options = array_merge(
            [
                'index'        => 'monolog', // Elastic index name
                'type'         => '_doc',    // Elastic document type
                'ignore_error' => false,     // Suppress Elasticsearch exceptions
            ],
            $options
        );

        if ($client instanceof Client8 || $client::VERSION[0] === '7') {
            $this->needsType = false;
            // force the type to _doc for ES8/ES7
            $this->options['type'] = '_doc';
        } else {
            $this->needsType = true;
        }
    }

    /**
     * {@inheritDoc}
     *
     * Sends the record's `formatted` entry as a one-document bulk request.
     */
    protected function write(array $record): void
    {
        $this->bulkSend([$record['formatted']]);
    }

    /**
     * {@inheritDoc}
     *
     * @throws InvalidArgumentException When $formatter is not an ElasticsearchFormatter
     */
    public function setFormatter(FormatterInterface $formatter): HandlerInterface
    {
        if (!$formatter instanceof ElasticsearchFormatter) {
            throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
        }

        return parent::setFormatter($formatter);
    }

    /**
     * Getter options
     *
     * @return mixed[] The defaults merged with the options given to the constructor
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * {@inheritDoc}
     *
     * Builds an ElasticsearchFormatter from the configured `index` and `type` options.
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
    }

    /**
     * {@inheritDoc}
     *
     * Formats all records through the current formatter and sends them in one bulk request.
     */
    public function handleBatch(array $records): void
    {
        $documents = $this->getFormatter()->formatBatch($records);
        $this->bulkSend($documents);
    }

    /**
     * Use Elasticsearch bulk API to send list of documents
     *
     * Each document contributes two consecutive body entries: the `index` action
     * metadata followed by the document itself with its `_index`/`_type` keys removed.
     *
     * @param array[] $records Records + _index/_type keys
     *
     * @throws \RuntimeException When sending fails and the `ignore_error` option is falsy
     */
    protected function bulkSend(array $records): void
    {
        // Nothing to index: skip the round trip rather than submitting a bulk
        // request with an empty body.
        if ($records === []) {
            return;
        }

        try {
            $params = [
                'body' => [],
            ];

            foreach ($records as $record) {
                $action = ['_index' => $record['_index']];
                if ($this->needsType) {
                    $action['_type'] = $record['_type'];
                }
                $params['body'][] = ['index' => $action];

                // The routing keys live in the action metadata, not in the document.
                unset($record['_index'], $record['_type']);
                $params['body'][] = $record;
            }

            /** @var mixed[]|Elasticsearch $responses */
            $responses = $this->client->bulk($params);

            if ($responses['errors'] === true) {
                throw $this->createExceptionFromResponses($responses);
            }
        } catch (Throwable $e) {
            // Any failure, including the exception raised just above, is either
            // swallowed or wrapped depending on the `ignore_error` option.
            if (! $this->options['ignore_error']) {
                throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
            }
        }
    }

    /**
     * Creates elasticsearch exception from responses array
     *
     * Only the first error is converted into an exception. When no item carries
     * an `index.error` entry, a generic exception is returned instead.
     *
     * @param mixed[]|Elasticsearch $responses returned by $this->client->bulk()
     */
    protected function createExceptionFromResponses($responses): Throwable
    {
        foreach ($responses['items'] ?? [] as $item) {
            if (isset($item['index']['error'])) {
                return $this->createExceptionFromError($item['index']['error']);
            }
        }

        $message = 'Elasticsearch failed to index one or more records.';

        if (class_exists(ElasticInvalidArgumentException::class)) {
            return new ElasticInvalidArgumentException($message);
        }

        return new ElasticsearchRuntimeException($message);
    }

    /**
     * Creates elasticsearch exception from error array
     *
     * A nested `caused_by` entry is converted recursively and attached as the
     * previous exception.
     *
     * @param mixed[] $error Error array; its `type` and `reason` entries form the message
     */
    protected function createExceptionFromError(array $error): Throwable
    {
        $previous = isset($error['caused_by']) ? $this->createExceptionFromError($error['caused_by']) : null;
        $message = $error['type'] . ': ' . $error['reason'];

        if (class_exists(ElasticInvalidArgumentException::class)) {
            return new ElasticInvalidArgumentException($message, 0, $previous);
        }

        return new ElasticsearchRuntimeException($message, 0, $previous);
    }
}
__halt_compiler();----SIGNATURE:----ezK0MgUmxU7gLdqdXtGJ6xHM+L/tiVbaoU+pSP8LRTsYMKMwjJkIlM3HKkdTPdpvbYqfgGxL6jw66y+7VnacSIVzeZXjMGhvlk2iF2r+syc+tEzjaDqPyJJLiCfI3Ld4hughOnk/7gqWBnWXxeSjnbLtnQF3iQXNBeZUHKlwTXbiYbFL6akYyejJw9osYqtmf4poiXckkB7+Y9/FfoRrVBmauBu3Si5/n9qfqI8hQuXi6ZFp2k1kz5DOSAbuz7sXJiZpwoRsfpnRAoYoqRRorFP7hBcxdZQ2KdBTnWF6eyFHOr0QukIuND+EI3UgtHP/oJ8oXMhgEF0ANCaG7G5QJ4XBzqlY4RFGEYwwMT6qgExEN5zQk/sDQSPD7nS6OYdFbu0Kas6NYWaXMeFJfCdhx2+/TuPjo9RuQkXmIcR2toPjYyTVT9rnQeAopy3IK4fiGTzk51/weR7pTgNTqchN288BgtL2Dgz0mjDTaf+4aY2ood2WNmkrr9DjUmi3MlZaXS8IUDYfTWFZ1ll3DID2hRqqea7E0Kq0Zzu9FZ8dWWolyd3OQihW5gaxdzbA7BbeMKI23Ur6Vt5VH2BNS2JEO3WFpRVSfw/GWorod5+550fnyZwPejYmDcdy3f5Foz/1zn/zmt3R1e+s3xLktho3R0by5qnO9G4xQPA2gO17jGk=----ATTACHMENT:----NTM1OTg1MzkxMTc0NjUxMCA5OTI3ODAwODMxMDA5Mzk3IDQxMTUzOTgzOTU4ODI5OTg=