-
Notifications
You must be signed in to change notification settings - Fork 23
/
FlagdProvider.cs
595 lines (530 loc) · 23.6 KB
/
FlagdProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
using System;
using System.IO;
using System.Text;
using System.Linq;
using System.Threading.Tasks;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using OpenFeature.Model;
using OpenFeature.Error;
using OpenFeature.Flagd.Grpc;
using Metadata = OpenFeature.Model.Metadata;
using Value = OpenFeature.Model.Value;
using ProtoValue = Google.Protobuf.WellKnownTypes.Value;
using System.Net.Sockets;
using System.Net.Http;
using System.Collections.Generic;
namespace OpenFeature.Contrib.Providers.Flagd
{
/// <summary>
/// FlagdProvider is the OpenFeature provider for flagD.
/// </summary>
public sealed class FlagdProvider : FeatureProvider
{
// Starting value of the event stream reconnect backoff. HandleErrorEvent doubles the
// current backoff and waits (backoff * 1000) milliseconds, so the unit is seconds.
static int EventStreamRetryBaseBackoff = 1;
// Configuration this provider was constructed with.
private readonly FlagdConfig _config;
// gRPC client used for flag resolution calls and for the event stream.
private readonly Service.ServiceClient _client;
// Metadata returned by GetMetadata().
private readonly Metadata _providerMetadata = new Metadata("flagd Provider");
// Cache of resolution results keyed by flag key. The public constructor only creates it
// when _config.CacheEnabled is set, so it can be null.
private readonly ICache<string, object> _cache;
// Number of consecutive failed event stream attempts; reset by HandleProviderReadyEvent.
private int _eventStreamRetries;
// Current reconnect backoff; doubled by HandleErrorEvent, reset by HandleProviderReadyEvent.
private int _eventStreamRetryBackoff = EventStreamRetryBaseBackoff;
// Taken around updates of _eventStreamRetries and _eventStreamRetryBackoff in the event handlers.
private System.Threading.Mutex _mtx;
/// <summary>
/// Creates a provider from a default-constructed <see cref="FlagdConfig"/>.
/// The following environment variables are used to initialise the client:
/// FLAGD_HOST - The host name of the flagd server (default="localhost")
/// FLAGD_PORT - The port of the flagd server (default="8013")
/// FLAGD_TLS - Determines whether to use https or not (default="false")
/// FLAGD_FLAGD_SERVER_CERT_PATH - The path to the client certificate (default="")
/// FLAGD_SOCKET_PATH - Path to the unix socket (default="")
/// FLAGD_CACHE - Enable or disable the cache (default="false")
/// FLAGD_MAX_CACHE_SIZE - The maximum size of the cache (default="10")
/// FLAGD_MAX_EVENT_STREAM_RETRIES - The maximum amount of retries for establishing the EventStream
/// </summary>
public FlagdProvider()
    : this(new FlagdConfig())
{
}
/// <summary>
/// Creates a provider that talks to the flagd server at the given URL.
/// The following environment variables are still used to initialise the client:
/// FLAGD_FLAGD_SERVER_CERT_PATH - The path to the client certificate (default="")
/// FLAGD_CACHE - Enable or disable the cache (default="false")
/// FLAGD_MAX_CACHE_SIZE - The maximum size of the cache (default="10")
/// FLAGD_MAX_EVENT_STREAM_RETRIES - The maximum amount of retries for establishing the EventStream
/// </summary>
/// <param name="url">The URL of the flagD server</param>
/// <exception cref="ArgumentNullException">if no url is provided.</exception>
public FlagdProvider(Uri url)
    : this(new FlagdConfig(url))
{
}
/// <summary>
/// Creates a provider from an explicit configuration object. Builds the gRPC client for the
/// configured URI and, when caching is enabled, creates the LRU cache and starts the
/// background event stream listener.
/// </summary>
/// <param name="config">The FlagdConfig object</param>
/// <exception cref="ArgumentNullException">if no config object is provided.</exception>
public FlagdProvider(FlagdConfig config)
{
    _config = config ?? throw new ArgumentNullException(nameof(config));
    _client = BuildClientForPlatform(_config.GetUri());
    _mtx = new System.Threading.Mutex();

    if (!_config.CacheEnabled)
    {
        return;
    }

    _cache = new LRUCache<string, object>(_config.MaxCacheSize);
    // Fire-and-forget: the listener runs for the lifetime of the event stream.
    Task.Run(() => HandleEvents());
}
// Test-only constructor (internal, made visible to the test assembly): lets tests inject
// the gRPC client, the configuration and optionally the cache.
internal FlagdProvider(Service.ServiceClient client, FlagdConfig config, ICache<string, object> cache = null)
{
    _config = config;
    _client = client;
    _cache = cache;
    _mtx = new System.Threading.Mutex();

    if (!_config.CacheEnabled)
    {
        return;
    }

    // Fire-and-forget: the listener runs for the lifetime of the event stream.
    Task.Run(() => HandleEvents());
}
// Test-only accessor (internal, made visible to the test assembly) for the configuration in use.
internal FlagdConfig GetConfig()
{
    return _config;
}
/// <summary>
/// Get the provider name, as reported by the provider metadata of <c>Api.Instance</c>.
/// </summary>
/// <returns>The <c>Name</c> of the metadata returned by <c>Api.Instance.GetProviderMetadata()</c></returns>
public static string GetProviderName() => Api.Instance.GetProviderMetadata().Name;
/// <summary>
/// Return the metadata associated to this provider.
/// </summary>
/// <returns>The metadata object created together with this provider instance</returns>
public override Metadata GetMetadata()
{
    return _providerMetadata;
}
/// <summary>
/// Return the Grpc client of the provider
/// </summary>
/// <returns>The gRPC service client this provider sends its requests through</returns>
public Service.ServiceClient GetClient()
{
    return _client;
}
/// <summary>
/// ResolveBooleanValue resolves the value of a Boolean flag through the gRPC client.
/// </summary>
/// <param name="flagKey">Name of the flag</param>
/// <param name="defaultValue">Default value of the flag (not read by this method).</param>
/// <param name="context">Context about the user</param>
/// <returns>A ResolutionDetails object containing the value, reason and variant of your flag</returns>
public override async Task<ResolutionDetails<bool>> ResolveBooleanValue(string flagKey, bool defaultValue, EvaluationContext context = null)
{
    // Performs the actual remote call; ResolveValue wraps it with caching and error mapping.
    async Task<ResolutionDetails<bool>> CallFlagd(Struct evaluationContext)
    {
        var request = new ResolveBooleanRequest
        {
            Context = evaluationContext,
            FlagKey = flagKey
        };
        var reply = await _client.ResolveBooleanAsync(request);

        return new ResolutionDetails<bool>(
            flagKey: flagKey,
            value: (bool)reply.Value,
            reason: reply.Reason,
            variant: reply.Variant
        );
    }

    return await ResolveValue<bool>(flagKey, CallFlagd, context);
}
/// <summary>
/// ResolveStringValue resolves the value of a string flag through the gRPC client.
/// </summary>
/// <param name="flagKey">Name of the flag</param>
/// <param name="defaultValue">Default value of the flag (not read by this method).</param>
/// <param name="context">Context about the user</param>
/// <returns>A ResolutionDetails object containing the value, reason and variant of your flag</returns>
public override async Task<ResolutionDetails<string>> ResolveStringValue(string flagKey, string defaultValue, EvaluationContext context = null)
{
    // Performs the actual remote call; ResolveValue wraps it with caching and error mapping.
    async Task<ResolutionDetails<string>> CallFlagd(Struct evaluationContext)
    {
        var request = new ResolveStringRequest
        {
            Context = evaluationContext,
            FlagKey = flagKey
        };
        var reply = await _client.ResolveStringAsync(request);

        return new ResolutionDetails<string>(
            flagKey: flagKey,
            value: reply.Value,
            reason: reply.Reason,
            variant: reply.Variant
        );
    }

    return await ResolveValue<string>(flagKey, CallFlagd, context);
}
/// <summary>
/// ResolveIntegerValue resolves the value of an int flag through the gRPC client.
/// </summary>
/// <param name="flagKey">Name of the flag</param>
/// <param name="defaultValue">Default value of the flag (not read by this method).</param>
/// <param name="context">Context about the user</param>
/// <returns>A ResolutionDetails object containing the value, reason and variant of your flag</returns>
public override async Task<ResolutionDetails<int>> ResolveIntegerValue(string flagKey, int defaultValue, EvaluationContext context = null)
{
    // Performs the actual remote call; ResolveValue wraps it with caching and error mapping.
    async Task<ResolutionDetails<int>> CallFlagd(Struct evaluationContext)
    {
        var request = new ResolveIntRequest
        {
            Context = evaluationContext,
            FlagKey = flagKey
        };
        var reply = await _client.ResolveIntAsync(request);

        return new ResolutionDetails<int>(
            flagKey: flagKey,
            // the response value is explicitly narrowed to int
            value: (int)reply.Value,
            reason: reply.Reason,
            variant: reply.Variant
        );
    }

    return await ResolveValue<int>(flagKey, CallFlagd, context);
}
/// <summary>
/// ResolveDoubleValue resolves the value of a double flag through the gRPC client
/// (using the ResolveFloat call).
/// </summary>
/// <param name="flagKey">Name of the flag</param>
/// <param name="defaultValue">Default value of the flag (not read by this method).</param>
/// <param name="context">Context about the user</param>
/// <returns>A ResolutionDetails object containing the value, reason and variant of your flag</returns>
public override async Task<ResolutionDetails<double>> ResolveDoubleValue(string flagKey, double defaultValue, EvaluationContext context = null)
{
    // Performs the actual remote call; ResolveValue wraps it with caching and error mapping.
    async Task<ResolutionDetails<double>> CallFlagd(Struct evaluationContext)
    {
        var request = new ResolveFloatRequest
        {
            Context = evaluationContext,
            FlagKey = flagKey
        };
        var reply = await _client.ResolveFloatAsync(request);

        return new ResolutionDetails<double>(
            flagKey: flagKey,
            value: reply.Value,
            reason: reply.Reason,
            variant: reply.Variant
        );
    }

    return await ResolveValue<double>(flagKey, CallFlagd, context);
}
/// <summary>
/// ResolveStructureValue resolves the value of a structured (object) flag through the gRPC client.
/// </summary>
/// <param name="flagKey">Name of the flag</param>
/// <param name="defaultValue">Default value of the flag (not read by this method).</param>
/// <param name="context">Context about the user</param>
/// <returns>A ResolutionDetails object containing the value, reason and variant of your flag</returns>
public override async Task<ResolutionDetails<Value>> ResolveStructureValue(string flagKey, Value defaultValue, EvaluationContext context = null)
{
    // Performs the actual remote call; ResolveValue wraps it with caching and error mapping.
    async Task<ResolutionDetails<Value>> CallFlagd(Struct evaluationContext)
    {
        var request = new ResolveObjectRequest
        {
            Context = evaluationContext,
            FlagKey = flagKey
        };
        var reply = await _client.ResolveObjectAsync(request);

        return new ResolutionDetails<Value>(
            flagKey: flagKey,
            // the protobuf struct of the response is converted to an OpenFeature Value
            value: ConvertObjectToValue(reply.Value),
            reason: reply.Reason,
            variant: reply.Variant
        );
    }

    return await ResolveValue<Value>(flagKey, CallFlagd, context);
}
/// <summary>
/// Shared resolution path of all Resolve*Value methods: serves the flag from the cache when
/// possible, otherwise invokes the given delegate and caches results whose reason is "STATIC".
/// </summary>
/// <typeparam name="T">Type of the flag value</typeparam>
/// <param name="flagKey">Name of the flag; also used as the cache key</param>
/// <param name="resolveDelegate">Performs the remote resolution for the converted evaluation context</param>
/// <param name="context">Context about the user</param>
/// <returns>The cached or freshly resolved ResolutionDetails</returns>
/// <exception cref="FeatureProviderException">if the gRPC call fails with an RpcException.</exception>
private async Task<ResolutionDetails<T>> ResolveValue<T>(string flagKey, Func<Struct, Task<ResolutionDetails<T>>> resolveDelegate, EvaluationContext context = null)
{
    // The test constructor allows CacheEnabled without a cache instance, so check both.
    var useCache = _config.CacheEnabled && _cache != null;
    try
    {
        // Only a cached entry of the requested type is a hit. An entry stored for the same key
        // under another flag type is ignored instead of failing with an InvalidCastException,
        // so the request goes to flagd, which reports the mismatch.
        if (useCache && _cache.TryGet(flagKey) is ResolutionDetails<T> cached)
        {
            return cached;
        }

        var result = await resolveDelegate.Invoke(ConvertToContext(context));

        // string.Equals tolerates a null reason and compares ordinally.
        if (useCache && string.Equals(result.Reason, "STATIC", StringComparison.Ordinal))
        {
            _cache.Add(flagKey, result);
        }
        return result;
    }
    catch (RpcException e)
    {
        throw GetOFException(e);
    }
}
/// <summary>
/// GetOFException maps a gRPC failure to an OpenFeature exception carrying the matching error type.
/// </summary>
/// <param name="e">The exception thrown by the Grpc client</param>
/// <returns>A FeatureProviderException with the status detail as message and <paramref name="e"/> as inner exception</returns>
private FeatureProviderException GetOFException(Grpc.Core.RpcException e)
{
    Constant.ErrorType errorType;
    switch (e.Status.StatusCode)
    {
        case Grpc.Core.StatusCode.NotFound:
            errorType = Constant.ErrorType.FlagNotFound;
            break;
        case Grpc.Core.StatusCode.Unavailable:
            errorType = Constant.ErrorType.ProviderNotReady;
            break;
        case Grpc.Core.StatusCode.InvalidArgument:
            errorType = Constant.ErrorType.TypeMismatch;
            break;
        default:
            // every other status code is reported as a general error
            errorType = Constant.ErrorType.General;
            break;
    }

    return new FeatureProviderException(errorType, e.Status.Detail, e);
}
/// <summary>
/// Listens on the flagd event stream and dispatches configuration change and provider ready
/// events. When the stream fails with status Unavailable, it backs off via HandleErrorEvent
/// and reconnects until the configured maximum number of retries is reached.
/// </summary>
private async Task HandleEvents()
{
    while (_eventStreamRetries < _config.MaxEventStreamRetries)
    {
        // Dispose the streaming call on every exit path so that each reconnect attempt
        // releases the resources of the previous call.
        using (var call = _client.EventStream(new Empty()))
        {
            try
            {
                // Read the response stream asynchronously
                while (await call.ResponseStream.MoveNext())
                {
                    var response = call.ResponseStream.Current;
                    // Event names are protocol identifiers: lower-case them culture-invariantly
                    // so matching does not depend on the current culture.
                    switch (response.Type.ToLowerInvariant())
                    {
                        case "configuration_change":
                            HandleConfigurationChangeEvent(response.Data);
                            break;
                        case "provider_ready":
                            HandleProviderReadyEvent();
                            break;
                        default:
                            // other event types are ignored
                            break;
                    }
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
            {
                // Handle the dropped connection by reconnecting and retrying the stream
                await HandleErrorEvent();
            }
        }
    }
}
/// <summary>
/// Evicts every flag named under the "flags" field of a configuration change event from the
/// cache. If the event cannot be processed, the whole cache is purged instead.
/// </summary>
/// <param name="data">Payload of the configuration change event</param>
private void HandleConfigurationChangeEvent(Struct data)
{
    // if we don't have a cache (or a payload), we don't need to remove anything
    if (!_config.CacheEnabled || _cache == null || data == null)
    {
        return;
    }
    try
    {
        // single lookup; only a struct-valued "flags" field carries flag keys
        if (!data.Fields.TryGetValue("flags", out ProtoValue flags)
            || flags.KindCase != ProtoValue.KindOneofCase.StructValue)
        {
            return;
        }

        foreach (var changedFlagKey in flags.StructValue.Fields.Keys)
        {
            _cache.Delete(changedFlagKey);
        }
    }
    catch (Exception)
    {
        // purge the cache if we could not handle the configuration change event
        _cache.Purge();
    }
}
/// <summary>
/// Handles a provider ready event: resets the retry counter and the backoff to their
/// initial values and then purges the cache.
/// </summary>
private void HandleProviderReadyEvent()
{
    _mtx.WaitOne();
    try
    {
        _eventStreamRetryBackoff = EventStreamRetryBaseBackoff;
        _eventStreamRetries = 0;
    }
    finally
    {
        _mtx.ReleaseMutex();
    }

    _cache.Purge();
}
/// <summary>
/// Handles a failed event stream attempt: counts the retry and, unless the configured maximum
/// has been exceeded, doubles the backoff and waits that many seconds before returning.
/// </summary>
private async Task HandleErrorEvent()
{
    int backoffSeconds;

    _mtx.WaitOne();
    try
    {
        _eventStreamRetries++;
        if (_eventStreamRetries > _config.MaxEventStreamRetries)
        {
            // The finally block releases the mutex on this early exit too; without it the
            // mutex would stay locked forever.
            return;
        }
        _eventStreamRetryBackoff = _eventStreamRetryBackoff * 2;
        // Copy the value while holding the lock so the delay below uses a consistent backoff.
        backoffSeconds = _eventStreamRetryBackoff;
    }
    finally
    {
        _mtx.ReleaseMutex();
    }

    // Await only after releasing: a Mutex must be released by the thread that acquired it,
    // and the continuation of an await may run on another thread.
    await Task.Delay(backoffSeconds * 1000);
}
/// <summary>
/// ConvertToContext turns the given EvaluationContext into a protobuf Struct.
/// </summary>
/// <param name="ctx">The evaluation context; may be null</param>
/// <returns>A Struct holding one field per context entry, or an empty Struct when <paramref name="ctx"/> is null</returns>
private static Struct ConvertToContext(EvaluationContext ctx)
{
    var contextStruct = new Struct();
    if (ctx != null)
    {
        foreach (var attribute in ctx)
        {
            contextStruct.Fields.Add(attribute.Key, ConvertToProtoValue(attribute.Value));
        }
    }

    return contextStruct;
}
/// <summary>
/// ConvertToProtoValue recursively converts the given Value to a ProtoValue.
/// </summary>
/// <param name="value">The value</param>
/// <returns>
/// A ProtoValue representing the given value; lists and structures are converted element by
/// element, and a value that is none of list, structure, boolean, string or number becomes null.
/// </returns>
private static ProtoValue ConvertToProtoValue(Value value)
{
    if (value.IsList)
    {
        var items = value.AsList.Select(item => ConvertToProtoValue(item)).ToArray();
        return ProtoValue.ForList(items);
    }

    if (value.IsStructure)
    {
        var nested = new Struct();
        foreach (var member in value.AsStructure)
        {
            nested.Fields.Add(member.Key, ConvertToProtoValue(member.Value));
        }
        return ProtoValue.ForStruct(nested);
    }

    if (value.IsBoolean)
    {
        var flag = value.AsBoolean ?? false;
        return ProtoValue.ForBool(flag);
    }

    if (value.IsString)
    {
        return ProtoValue.ForString(value.AsString);
    }

    if (value.IsNumber)
    {
        var number = value.AsDouble ?? 0.0;
        return ProtoValue.ForNumber(number);
    }

    return ProtoValue.ForNull();
}
/// <summary>
/// ConvertObjectToValue converts the given protobuf Struct to a Value wrapping a Structure.
/// </summary>
/// <param name="src">The struct</param>
/// <returns>A Value whose structure has one converted entry per field of the struct</returns>
private static Value ConvertObjectToValue(Struct src)
{
    var members = new Dictionary<string, Value>();
    foreach (var field in src.Fields)
    {
        members.Add(field.Key, ConvertToValue(field.Value));
    }

    return new Value(new Structure(members));
}
/// <summary>
/// ConvertToValue converts the given ProtoValue to a Value, recursing into lists and structs.
/// </summary>
/// <param name="src">The value, represented as ProtoValue</param>
/// <returns>A Value object representing the given value</returns>
private static Value ConvertToValue(ProtoValue src)
{
    var kind = src.KindCase;

    if (kind == ProtoValue.KindOneofCase.ListValue)
    {
        List<Value> items = src.ListValue.Values.Select(ConvertToValue).ToList();
        return new Value(items);
    }

    if (kind == ProtoValue.KindOneofCase.StructValue)
    {
        return new Value(ConvertObjectToValue(src.StructValue));
    }

    // every remaining kind (none, null, number, string, bool) is handled as a primitive
    return ConvertToPrimitiveValue(src);
}
/// <summary>
/// ConvertToPrimitiveValue converts a bool, string or number ProtoValue to the matching Value.
/// </summary>
/// <param name="value">The value, represented as ProtoValue</param>
/// <returns>
/// A Value holding the primitive, or an empty Value for every other kind
/// (null, struct, list, none).
/// </returns>
private static Value ConvertToPrimitiveValue(ProtoValue value)
{
    var kind = value.KindCase;

    if (kind == ProtoValue.KindOneofCase.BoolValue)
    {
        return new Value(value.BoolValue);
    }

    if (kind == ProtoValue.KindOneofCase.StringValue)
    {
        return new Value(value.StringValue);
    }

    if (kind == ProtoValue.KindOneofCase.NumberValue)
    {
        return new Value(value.NumberValue);
    }

    return new Value();
}
/// <summary>
/// Builds the gRPC service client for the given address. Addresses starting with "unix://"
/// are served over a unix domain socket (only when compiled for NET5_0_OR_GREATER); all other
/// addresses use an HTTP handler, optionally validating the server against the configured certificate.
/// </summary>
/// <param name="url">Address of the flagd server</param>
/// <returns>A service client bound to a channel for the given address</returns>
/// <exception cref="ArgumentException">
/// if a certificate is configured but the file does not exist, or custom certificates are
/// not supported by the target framework.
/// </exception>
/// <exception cref="NotSupportedException">if a unix socket address is used on a target framework without unix socket support.</exception>
private Service.ServiceClient BuildClientForPlatform(Uri url)
{
    const string unixSocketPrefix = "unix://";
    var address = url.ToString();
    // The scheme prefix is a fixed identifier, so compare ordinally rather than by culture.
    var useUnixSocket = address.StartsWith(unixSocketPrefix, StringComparison.Ordinal);
    if (!useUnixSocket)
    {
#if NET462_OR_GREATER
        var handler = new WinHttpHandler();
#else
        var handler = new HttpClientHandler();
#endif
        if (_config.UseCertificate)
        {
            if (!File.Exists(_config.CertificatePath))
            {
                throw new ArgumentException("Specified certificate cannot be found.");
            }

            X509Certificate2 certificate = new X509Certificate2(_config.CertificatePath);
#if NET5_0_OR_GREATER
            handler.ServerCertificateCustomValidationCallback = (message, cert, chain, _) => {
                // add the custom cert to the chain's trust store; Build returns whether the chain is valid.
                chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
                chain.ChainPolicy.CustomTrustStore.Add(certificate);
                return chain.Build(cert);
            };
#elif NET462_OR_GREATER
            handler.ServerCertificateValidationCallback = (message, cert, chain, errors) => {
                if (errors == SslPolicyErrors.None) { return true; }
                // Rebuild the chain with the configured certificate as an extra candidate ...
                chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
                chain.ChainPolicy.ExtraStore.Add(certificate);
                var isChainValid = chain.Build(cert);
                if (!isChainValid) { return false; }
                // ... and accept only when that certificate is actually part of the built chain.
                var isValid = chain.ChainElements
                    .Cast<X509ChainElement>()
                    .Any(x => x.Certificate.RawData.SequenceEqual(certificate.GetRawCertData()));
                return isValid;
            };
#else
            throw new ArgumentException("Custom Certificates are not supported on your platform");
#endif
        }
        return new Service.ServiceClient(GrpcChannel.ForAddress(url, new GrpcChannelOptions
        {
            HttpHandler = handler
        }));
    }
#if NET5_0_OR_GREATER
    var udsEndPoint = new UnixDomainSocketEndPoint(address.Substring(unixSocketPrefix.Length));
    var connectionFactory = new UnixDomainSocketConnectionFactory(udsEndPoint);
    var socketsHttpHandler = new SocketsHttpHandler
    {
        ConnectCallback = connectionFactory.ConnectAsync
    };
    // point to localhost and let the custom ConnectCallback handle the communication over the unix socket
    // see https://learn.microsoft.com/en-us/aspnet/core/grpc/interprocess-uds?view=aspnetcore-7.0 for more details
    return new Service.ServiceClient(GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions
    {
        HttpHandler = socketsHttpHandler,
    }));
#else
    // unix socket support is not available in this dotnet version
    throw new NotSupportedException("unix sockets are not supported in this version.");
#endif
}
}
}