# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Iterable, Sequence
from collections.abc import Sequence as TypingSequence
from os import environ
from typing import Literal
from grpc import ChannelCredentials, Compression, StatusCode
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
LogsServiceStub,
)
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs.export import (
LogRecordExporter,
LogRecordExportResult,
)
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_LOGS_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY,
OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
OTEL_EXPORTER_OTLP_LOGS_INSECURE,
OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
[docs]
class OTLPLogExporter(
LogRecordExporter,
OTLPExporterMixin[
Sequence[ReadableLogRecord],
ExportLogsServiceRequest,
LogRecordExportResult,
LogsServiceStub,
],
):
def __init__(
self,
endpoint: str | None = None,
insecure: bool | None = None,
credentials: ChannelCredentials | None = None,
headers: TypingSequence[tuple[str, str]]
| dict[str, str]
| str
| None = None,
timeout: float | None = None,
compression: Compression | None = None,
channel_options: tuple[tuple[str, str]] | None = None,
retryable_error_codes: Iterable[StatusCode] | None = None,
*,
meter_provider: MeterProvider | None = None,
):
insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
if insecure is None and insecure_logs is not None:
insecure = insecure_logs.lower() == "true"
if (
not insecure
and environ.get(OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE) is not None
):
credentials = _get_credentials(
credentials,
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_LOGS_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY,
OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
)
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
environ_timeout = (
float(environ_timeout) if environ_timeout is not None else None
)
compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)
if compression is None
else compression
)
OTLPExporterMixin.__init__(
self,
endpoint=endpoint or environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT),
insecure=insecure,
credentials=credentials,
headers=headers or environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS),
timeout=timeout or environ_timeout,
compression=compression,
stub=LogsServiceStub,
result=LogRecordExportResult,
channel_options=channel_options,
retryable_error_codes=retryable_error_codes,
component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER,
signal="logs",
meter_provider=meter_provider,
)
def _translate_data(
self, data: Sequence[ReadableLogRecord]
) -> ExportLogsServiceRequest:
return encode_logs(data)
def _count_data(self, data: Sequence[ReadableLogRecord]):
return len(data)
[docs]
def export( # type: ignore [reportIncompatibleMethodOverride]
self,
batch: Sequence[ReadableLogRecord],
) -> Literal[LogRecordExportResult.SUCCESS, LogRecordExportResult.FAILURE]:
return OTLPExporterMixin._export(self, batch)
[docs]
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
[docs]
def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True
@property
def _exporting(self) -> str:
return "logs"