Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 :
6 : import 'package:grpc/grpc.dart';
7 :
8 : import '../../../../dartastic_opentelemetry.dart';
9 : import '../../../../proto/collector/metrics/v1/metrics_service.pbgrpc.dart';
10 : import '../../../../proto/common/v1/common.pb.dart' as common_proto;
11 : import '../../../../proto/metrics/v1/metrics.pb.dart' as proto;
12 : import '../../../trace/export/otlp/certificate_utils.dart';
13 : import 'metric_transformer.dart';
14 :
15 : /// OtlpGrpcMetricExporter exports metrics to the OpenTelemetry collector via gRPC.
16 : class OtlpGrpcMetricExporter implements MetricExporter {
17 : final MetricsServiceClient _client;
18 : bool _shutdown = false;
19 :
20 : // Static channel reference to allow shutdown
21 69 : static late ClientChannel _channel;
22 :
23 : /// Creates a new OtlpGrpcMetricExporter with the given configuration.
24 69 : OtlpGrpcMetricExporter(OtlpGrpcMetricExporterConfig config)
25 69 : : _client = _createClient(config);
26 :
27 : /// Creates channel credentials based on configuration.
28 : ///
29 : /// If insecure is true, returns insecure credentials.
30 : /// Otherwise, creates secure credentials with optional custom certificates for mTLS.
31 69 : static ChannelCredentials _createChannelCredentials(
32 : OtlpGrpcMetricExporterConfig config) {
33 69 : if (config.insecure) {
34 : return const ChannelCredentials.insecure();
35 : }
36 :
37 : // If no custom certificates are provided, use default secure credentials
38 69 : if (config.certificate == null &&
39 69 : config.clientKey == null &&
40 69 : config.clientCertificate == null) {
41 : return const ChannelCredentials.secure();
42 : }
43 :
44 : try {
45 0 : final context = CertificateUtils.createSecurityContext(
46 0 : certificate: config.certificate,
47 0 : clientKey: config.clientKey,
48 0 : clientCertificate: config.clientCertificate,
49 : );
50 :
51 : if (context == null) {
52 : return const ChannelCredentials.secure();
53 : }
54 :
55 : return const ChannelCredentials.secure(
56 : certificates: null, // We're using SecurityContext instead
57 : authority: null,
58 : onBadCertificate: null,
59 : );
60 : } catch (e) {
61 0 : if (OTelLog.isError()) {
62 0 : OTelLog.error(
63 0 : 'OtlpGrpcMetricExporter: Failed to load certificates: $e');
64 : }
65 : // Fall back to default secure credentials on error
66 : return const ChannelCredentials.secure();
67 : }
68 : }
69 :
70 69 : static MetricsServiceClient _createClient(
71 : OtlpGrpcMetricExporterConfig config) {
72 69 : final channelOptions = ChannelOptions(
73 69 : credentials: _createChannelCredentials(config),
74 69 : codecRegistry: CodecRegistry(codecs: const [GzipCodec()]),
75 : );
76 :
77 : // Parse host and port from endpoint
78 138 : final Uri uri = Uri.parse(config.endpoint);
79 69 : final String host = uri.host;
80 : final int port =
81 207 : uri.port > 0 ? uri.port : (uri.scheme == 'https' ? 443 : 80);
82 :
83 69 : if (OTelLog.isLogExport()) {
84 69 : OTelLog.logExport(
85 69 : 'OtlpGrpcMetricExporter: Creating client for $host:$port');
86 : }
87 :
88 : // We store the channel separately to be able to shut it down later
89 69 : _channel = ClientChannel(
90 : host,
91 : port: port,
92 : options: channelOptions,
93 : );
94 :
95 : // Build call options with headers and compression
96 69 : final callOptionsBuilder = CallOptions(
97 138 : timeout: Duration(milliseconds: config.timeoutMillis),
98 : );
99 :
100 : // Add custom headers if provided
101 69 : final Map<String, String> metadata = {};
102 69 : if (config.headers != null) {
103 0 : metadata.addAll(config.headers!);
104 : }
105 :
106 : // Add compression header if enabled
107 69 : if (config.compression) {
108 0 : metadata['grpc-encoding'] = 'gzip';
109 : }
110 :
111 69 : return MetricsServiceClient(
112 69 : _channel,
113 69 : options: metadata.isNotEmpty
114 0 : ? callOptionsBuilder.mergedWith(CallOptions(metadata: metadata))
115 : : callOptionsBuilder,
116 : );
117 : }
118 :
119 1 : @override
120 : Future<bool> export(MetricData data) async {
121 1 : if (_shutdown) {
122 0 : if (OTelLog.isLogExport()) {
123 0 : OTelLog.logExport(
124 : 'OtlpGrpcMetricExporter: Cannot export after shutdown');
125 : }
126 : return false;
127 : }
128 :
129 2 : if (data.metrics.isEmpty) {
130 0 : if (OTelLog.isLogExport()) {
131 0 : OTelLog.logExport('OtlpGrpcMetricExporter: No metrics to export');
132 : }
133 : return true;
134 : }
135 :
136 : try {
137 1 : if (OTelLog.isLogExport()) {
138 1 : OTelLog.logExport(
139 3 : 'OtlpGrpcMetricExporter: Exporting ${data.metrics.length} metrics');
140 2 : for (final metric in data.metrics) {
141 1 : OTelLog.logExport(
142 5 : ' - ${metric.name} (${metric.type}): ${metric.points.length} data points');
143 : }
144 : }
145 :
146 : // Transform metrics data to protocol buffers
147 1 : final request = _buildExportRequest(data);
148 :
149 : // Export to the collector
150 2 : await _client.export(request);
151 :
152 0 : if (OTelLog.isLogExport()) {
153 0 : OTelLog.logExport('OtlpGrpcMetricExporter: Export successful');
154 : }
155 : return true;
156 : } catch (e, stackTrace) {
157 1 : if (OTelLog.isLogExport()) {
158 2 : OTelLog.logExport('OtlpGrpcMetricExporter: Export failed: $e');
159 2 : OTelLog.logExport('Stack trace: $stackTrace');
160 : }
161 : return false;
162 : }
163 : }
164 :
165 : /// Builds the export request from the given metrics data.
166 1 : ExportMetricsServiceRequest _buildExportRequest(MetricData data) {
167 1 : final request = ExportMetricsServiceRequest();
168 1 : final resourceMetrics = proto.ResourceMetrics();
169 :
170 : // Add resource
171 1 : if (data.resource != null) {
172 1 : resourceMetrics.resource =
173 2 : MetricTransformer.transformResource(data.resource!);
174 : } else {
175 : // Create empty resource if none provided
176 0 : resourceMetrics.resource =
177 0 : MetricTransformer.transformResource(OTel.resource(null));
178 : }
179 :
180 : // Add scope metrics
181 1 : final scopeMetrics = proto.ScopeMetrics();
182 1 : scopeMetrics.metrics
183 3 : .addAll(data.metrics.map(MetricTransformer.transformMetric));
184 :
185 : // Add instrumentation scope (hardcoded for now)
186 1 : final scope = common_proto.InstrumentationScope();
187 1 : scope.name = '@dart/dartastic_opentelemetry';
188 1 : scope.version = '1.0.0';
189 1 : scopeMetrics.scope = scope;
190 :
191 2 : resourceMetrics.scopeMetrics.add(scopeMetrics);
192 2 : request.resourceMetrics.add(resourceMetrics);
193 :
194 : return request;
195 : }
196 :
197 1 : @override
198 : Future<bool> forceFlush() async {
199 : // No-op for this exporter
200 : return true;
201 : }
202 :
203 57 : @override
204 : Future<bool> shutdown() async {
205 57 : if (_shutdown) {
206 : return true;
207 : }
208 :
209 57 : _shutdown = true;
210 : try {
211 : // Close the gRPC channel
212 : // Shutdown the stored channel
213 114 : await _channel.shutdown();
214 :
215 57 : if (OTelLog.isLogExport()) {
216 57 : OTelLog.logExport('OtlpGrpcMetricExporter: Channel shutdown completed');
217 : }
218 : return true;
219 : } catch (e) {
220 0 : if (OTelLog.isLogExport()) {
221 0 : OTelLog.logExport('OtlpGrpcMetricExporter: Shutdown failed: $e');
222 : }
223 : return false;
224 : }
225 : }
226 : }
|