Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Consider uninitialized case for UnaggregatedClusterNamespace with dynamic clusters #2957

Merged
merged 5 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/namespace/ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (t *testClusters) Close() error {
panic("implement me")
}

func (t *testClusters) UnaggregatedClusterNamespace() m3.ClusterNamespace {
func (t *testClusters) UnaggregatedClusterNamespace() (m3.ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
8 changes: 4 additions & 4 deletions src/query/storage/m3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Clusters interface {
NonReadyClusterNamespaces() ClusterNamespaces

// UnaggregatedClusterNamespace returns the valid unaggregated
// cluster namespace.
UnaggregatedClusterNamespace() ClusterNamespace
// cluster namespace. If the namespace is not yet initialized, returns false.
UnaggregatedClusterNamespace() (ClusterNamespace, bool)

// AggregatedClusterNamespace returns an aggregated cluster namespace
// at a specific retention and resolution.
Expand Down Expand Up @@ -258,8 +258,8 @@ func (c *clusters) NonReadyClusterNamespaces() ClusterNamespaces {
return nil
}

func (c *clusters) UnaggregatedClusterNamespace() ClusterNamespace {
return c.unaggregatedNamespace
func (c *clusters) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
return c.unaggregatedNamespace, true
}

func (c *clusters) AggregatedClusterNamespace(
Expand Down
16 changes: 13 additions & 3 deletions src/query/storage/m3/cluster_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ func resolveClusterNamespacesForQuery(
// First check if the unaggregated cluster can fully satisfy the query range.
// If so, return it and shortcircuit, as unaggregated will necessarily have
// every metric.
unaggregated := resolveUnaggregatedNamespaceForQuery(now, start,
clusters.UnaggregatedClusterNamespace(), opts)
ns, initialized := clusters.UnaggregatedClusterNamespace()
if !initialized {
return consolidators.NamespaceInvalid, nil, errUnaggregatedNamespaceUninitialized
}

unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, ns, opts)
if unaggregated.satisfies == fullySatisfiesRange {
return consolidators.NamespaceCoversAllQueryRange,
ClusterNamespaces{unaggregated.clusterNamespace},
Expand Down Expand Up @@ -329,7 +333,13 @@ func resolveClusterNamespacesForQueryWithRestrictQueryOptions(

switch restrict.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
return result(clusters.UnaggregatedClusterNamespace(), nil)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
return result(nil,
fmt.Errorf("could not find unaggregated namespace for storage policy: %v",
restrict.StoragePolicy.String()))
}
return result(ns, nil)
case storagemetadata.AggregatedMetricsType:
ns, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: restrict.StoragePolicy.Retention().Duration(),
Expand Down
5 changes: 3 additions & 2 deletions src/query/storage/m3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestNewClustersFromConfig(t *testing.T) {
require.NoError(t, err)

// Resolve expected clusters and check attributes
unaggregatedNs := clusters.UnaggregatedClusterNamespace()
unaggregatedNs, initialized := clusters.UnaggregatedClusterNamespace()
assert.True(t, initialized)
assert.Equal(t, "unaggregated", unaggregatedNs.NamespaceID().String())
assert.Equal(t, storagemetadata.Attributes{
MetricsType: storagemetadata.UnaggregatedMetricsType,
Expand Down Expand Up @@ -209,7 +210,7 @@ func (n *noopCluster) NonReadyClusterNamespaces() ClusterNamespaces {
panic("implement me")
}

func (n *noopCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (n *noopCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions src/query/storage/m3/dynamic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ func (d *dynamicCluster) NonReadyClusterNamespaces() ClusterNamespaces {
return nonReadyNamespaces
}

func (d *dynamicCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (d *dynamicCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
d.RLock()
unaggregatedNamespaces := d.unaggregatedNamespace
unaggregatedNamespace := d.unaggregatedNamespace
d.RUnlock()

return unaggregatedNamespaces
return unaggregatedNamespace, (unaggregatedNamespace != nil)
}

func (d *dynamicCluster) AggregatedClusterNamespace(attrs RetentionResolution) (ClusterNamespace, bool) {
Expand Down
45 changes: 44 additions & 1 deletion src/query/storage/m3/dynamic_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,46 @@ func newNamespaceOptions() namespace.Options {
return namespace.NewOptions().SetStagingState(state)
}

func TestDynamicClustersUninitialized(t *testing.T) {
t.Parallel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new linting rule, paralleltest, that was biting me. Not sure if others have a workaround?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a workaround here: d2a85bd 😜

Merging master should do it 👍


ctrl := xtest.NewController(t)
defer ctrl.Finish()

mockSession := client.NewMockSession(ctrl)

// setup dynamic cluster without any namespaces
mapCh := make(nsMapCh, 10)
nsInitializer := newFakeNsInitializer(t, ctrl, mapCh, false)

cfg := DynamicClusterNamespaceConfiguration{
session: mockSession,
nsInitializer: nsInitializer,
}

opts := newTestOptions(cfg)

clusters, err := NewDynamicClusters(opts)
require.NoError(t, err)

//nolint:errcheck
defer clusters.Close()

// Aggregated namespaces should not exist
_, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: 48 * time.Hour,
Resolution: 1 * time.Minute,
})
require.False(t, ok)

// Unaggregated namespaces should not be initialized
_, ok = clusters.UnaggregatedClusterNamespace()
require.False(t, ok)

// Cluster namespaces should be empty
require.Len(t, clusters.ClusterNamespaces(), 0)
}

func TestDynamicClustersInitialization(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -424,7 +464,10 @@ func assertClusterNamespace(clusters Clusters, expectedID ident.ID, expectedOpts
return false
}
} else {
ns = clusters.UnaggregatedClusterNamespace()
ns, ok = clusters.UnaggregatedClusterNamespace()
if !ok {
return false
}
}
return assert.ObjectsAreEqual(expectedID.String(), ns.NamespaceID().String()) &&
assert.ObjectsAreEqual(expectedOpts, ns.Options())
Expand Down
11 changes: 8 additions & 3 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ const (
var (
errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" +
" aggregated and unaggregated namespace lookup disabled")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errUnaggregatedNamespaceUninitialized = goerrors.New(
"unaggregated namespace is not yet initialized")
)

type m3storage struct {
Expand Down Expand Up @@ -691,18 +693,21 @@ func (s *m3storage) writeSingle(
var (
namespace ClusterNamespace
err error
exists bool
)

attributes := query.Attributes()
switch attributes.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
namespace = s.clusters.UnaggregatedClusterNamespace()
namespace, exists = s.clusters.UnaggregatedClusterNamespace()
if !exists {
err = errUnaggregatedNamespaceUninitialized
}
case storagemetadata.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s,"+
Expand Down
20 changes: 20 additions & 0 deletions src/query/storage/m3/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ func TestLocalWriteAggregatedNoClusterNamespaceError(t *testing.T) {
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteUnaggregatedNamespaceUninitializedError(t *testing.T) {
t.Parallel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for this usage


ctrl := xtest.NewController(t)
defer ctrl.Finish()
// We setup an empty dynamic cluster, which will by default
// have an uninitialized unaggregated namespace.
store := newTestStorage(t, &dynamicCluster{})

opts := newWriteQuery(t).Options()

writeQuery, err := storage.NewWriteQuery(opts)
require.NoError(t, err)

err = store.Write(context.TODO(), writeQuery)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "unaggregated namespace is not yet initialized"),
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteAggregatedInvalidMetricsTypeError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down