
Akka.Cluster.Sharding: duplicate shards / entities #6973

Closed
Aaronontheweb opened this issue Oct 25, 2023 · 26 comments · Fixed by #7230, #7290, #7297 or #7298

Comments

@Aaronontheweb
Member

Version Information
Version of Akka.NET? v1.5 - all versions including v1.5.13
Which Akka.NET Modules? Akka.Cluster.Sharding

Describe the bug

We have very rough and loose data on this right now, but it's been reported by multiple users including on petabridge/Akka.Persistence.Azure#350 - looks like there could be something wrong with Akka.Cluster.Sharding in v1.5 that allows a Shard to be allocated more than once. This is our thread to investigate.

If you have run into this issue, please provide a full dump of your config here.

@Aaronontheweb
Member Author

Possible causes / issues I want to investigate while we gather data from users:

  • ShardRegions are out of sync - different nodes temporarily have different ideas about where shards live; this eventually gets resolved by DData gossip.
  • The ShardCoordinator accidentally allocates a shard to multiple locations, causing the first problem on this list - except that the problem persists rather than converging.
  • Rebalancing causes ShardRegions to be out of sync, and the process responsible for syncing the reallocation of shards can produce inconsistencies in the event of network disruptions. This would only occur in "scale up" scenarios where the original shard home is still alive.

All of these scenarios are things that would be caused by DData's eventual consistency, in one form or another. Going to start poking around and see if I can spot anything that might lead to problems here.

However, with the one piece of customer data I have in front of me - it's possible that this could be caused when state-store-mode=persistence too. Going to look into it.
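
For context on the two modes mentioned above: the state-store mode is chosen per shard region via ShardOptions in Akka.Hosting. A minimal sketch, reusing the shape from the reporter's setup further down this thread (illustrative only):

    // Illustrative sketch: the two coordinator state-store modes being discussed.
    // DData keeps coordinator state in Akka.DistributedData (replicated, eventually consistent);
    // Persistence writes coordinator state through the configured Akka.Persistence journal.
    var ddataOptions = new ShardOptions
    {
        StateStoreMode = StateStoreMode.DData,
        Role = "subscriber"
    };

    var persistenceOptions = new ShardOptions
    {
        StateStoreMode = StateStoreMode.Persistence,
        Role = "subscriber"
    };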

@Aaronontheweb
Member Author

When using state-store-mode=ddata, I think our built-in replicator consistency settings are fine in v1.5:

if (settings.TuningParameters.CoordinatorStateReadMajorityPlus == int.MaxValue)
    _stateReadConsistency = new ReadAll(settings.TuningParameters.WaitingForStateTimeout);
else
    _stateReadConsistency = new ReadMajorityPlus(settings.TuningParameters.WaitingForStateTimeout, settings.TuningParameters.CoordinatorStateReadMajorityPlus, majorityMinCap);

if (settings.TuningParameters.CoordinatorStateWriteMajorityPlus == int.MaxValue)
    _stateWriteConsistency = new WriteAll(settings.TuningParameters.UpdatingStateTimeout);
else
    _stateWriteConsistency = new WriteMajorityPlus(settings.TuningParameters.UpdatingStateTimeout, settings.TuningParameters.CoordinatorStateWriteMajorityPlus, majorityMinCap);

Can rule that out as a source of problems.
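
For reference, those consistency levels come from the sharding tuning parameters, which map onto HOCON keys that also show up in the config dump further down this thread. A minimal sketch of overriding them via Akka.Hosting's AddHocon (builder being the AkkaConfigurationBuilder; the values shown are the ones visible in the dump below; illustrative only):

    // Sketch only: the HOCON keys behind the tuning parameters referenced above.
    // Setting either majority-plus value to int.MaxValue makes the coordinator fall back to ReadAll / WriteAll.
    builder.AddHocon(@"
        akka.cluster.sharding {
            waiting-for-state-timeout = 2s    # -> TuningParameters.WaitingForStateTimeout
            updating-state-timeout = 5s       # -> TuningParameters.UpdatingStateTimeout
            coordinator-state {
                read-majority-plus = 5        # -> TuningParameters.CoordinatorStateReadMajorityPlus
                write-majority-plus = 3       # -> TuningParameters.CoordinatorStateWriteMajorityPlus
            }
        }", HoconAddMode.Prepend);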

@Aaronontheweb
Member Author

Might be a false alarm - the first user had multiple clusters all writing to the same persistence store for their v1.5 shard coordinator data.

@JoeWorkyWork

Here's our Akka setup config, as requested on Discord.

services.AddAkka(AkkaSystemName, (akkaConfig, builder) =>
        {
            akkaConfig
                .WithCustomSerializer("messageOutcomeSerializer", new[] {typeof(MessageOutcome)}, system => new MessageOutcomeSerializer(system))
                .ConfigureLoggers(configBuilder => configBuilder.AddLogger<SerilogLogger>())
                .CreateActors(configuration, builder, tokenCredential);

            if (SystemEnvironment.IsDevelopment)
            {
                const int akkaPort = 4053;
                const string akkaHost = "localhost";

                akkaConfig
                    .WithInMemoryJournal()
                    .WithRemoting(akkaHost, akkaPort)
                    .WithClustering(new ClusterOptions
                    {
                        Roles = new[] { "subscriber" },
                        SeedNodes = new[] { $"akka.tcp://{AkkaSystemName}@{akkaHost}:{akkaPort}" }
                    });
            }
            else
            {
                var eventStorageOptions = configuration.GetRequiredSection("AkkaEventStorage").Get<AkkaEventStorageOptions>()!;

                akkaConfig
                    .WithAzureTableJournal(new Uri(eventStorageOptions.TableUri), tokenCredential)
                    .WithRemoting(configuration["CLUSTER_IP"], int.Parse(configuration["CLUSTER_PORT"]!)) // env variables "CLUSTER_IP" and "CLUSTER_PORT" set in kube-application-state in params.libsonnet under env+
                    .WithClustering(new ClusterOptions { Roles = new[] { "subscriber" } })
                    .WithClusterBootstrap(serviceName: "notifier", portName: "management", autoStart: true) // portName "management" set in kube-application-state in params.libsonnet under port
                    .WithKubernetesDiscovery(configuration["APP_LABEL"]) // env variable "APP_LABEL" set in kube-application-state in params.libsonnet under env+
                    .AddPetabridgeCmd(new PetabridgeCmdOptions { Host = "localhost", Port = 8222 }, cmd =>
                    {
                        cmd.RegisterCommandPalette(ClusterCommands.Instance);
                        cmd.RegisterCommandPalette(ClusterShardingCommands.Instance);
                    });

                var leaseBlobOptions = configuration.GetRequiredSection("AkkaLeaseBlob").Get<AkkaLeaseBlobOptions>()!;
                var container = new BlobContainerClient(leaseBlobOptions.BlobUri, tokenCredential);
                container.CreateIfNotExistsAsync().Wait();
                var blobLeaseClient = container.GetBlobLeaseClient();

                akkaConfig.WithSingleton<ClusterLeaseGuardActor>(
                    "listener-lease-guard",
                    Props.Create(() => new ClusterLeaseGuardActor(blobLeaseClient)));
                
            }
        });

        return services;
    }

    private static void CreateActors(this AkkaConfigurationBuilder akkaConfig,
        IConfiguration configuration,
        IServiceProvider builder,
        TokenCredential tokenCredential)
    {
        var notifierQueueOptions = configuration.GetRequiredSection("NotifierQueue").Get<NotifierQueueOptions>();
        var notifierServiceBusClient = new ServiceBusClient(notifierQueueOptions!.FullyQualifiedNamespace, tokenCredential);
        var workQueueServiceBusSender = notifierServiceBusClient.CreateSender(notifierQueueOptions.WorkQueueName);
        var projectionServiceBusSender = notifierServiceBusClient.CreateSender(notifierQueueOptions.ProjectionQueueName);

        IActorRef removeScheduleActor = null!;
        IActorRef scheduleMessageActor = null!;
        IActorRef projectionActor = null!;

        akkaConfig.WithActors((system, _) =>
        {
            removeScheduleActor = system.ActorOf(Props.Create(() => new RemoveScheduledMessageActor(workQueueServiceBusSender)));
            scheduleMessageActor = system.ActorOf(Props.Create(() => new ScheduleMessageActor(workQueueServiceBusSender)));
            projectionActor = system.ActorOf(Props.Create(() => new ProjectionWriterActor(projectionServiceBusSender)));
        });

        akkaConfig.WithShardRegion<SchedulingManagerActor>(
            nameof(SchedulingManagerActor),
            _ => Props.Create(() => new SchedulingManagerActor(removeScheduleActor, scheduleMessageActor, projectionActor, builder.GetRequiredService<IScheduleMessageCommandFactory>())
            ),
            new MessageExtractor(),
            new ShardOptions {StateStoreMode = StateStoreMode.DData, Role = "subscriber"});

        akkaConfig.WithActors((system, registry) =>
        {
            DeadLetterMonitorActor.Start(system);

            var edgeTopic = configuration.GetRequiredSection("EdgeTopic").Get<EdgeTopicOptions>();
            var edgeServiceBusClient = new ServiceBusClient(edgeTopic!.FullyQualifiedNamespace, tokenCredential);

             system.ActorOf(Props.Create(() => new PauseEdgeEventsServiceBusListenerActor(
                edgeServiceBusClient.CreateReceiver(edgeTopic.Name, edgeTopic.SubscriptionName, new ServiceBusReceiverOptions()),
                registry.Get<SchedulingManagerActor>(),
                SchedulingManagerActorBase.ConvertBusMessageToShardEnvelope)));

            var notifyEdgeActor = system.ActorOf(Props.Create(() =>
                new NotifyEdgeActor(
                    builder.GetRequiredService<IPublicWebApiService>(),
                    registry.Get<SchedulingManagerActor>(),
                    scheduleMessageActor,
                    builder.GetRequiredService<IDateTimeProvider>(),
                    builder.GetRequiredService<IDailyNotificationDateCalculator>())));

            system.ActorOf(Props.Create(() => new PauseNotificationServiceBusListenerActor(
                notifierServiceBusClient.CreateReceiver(notifierQueueOptions.WorkQueueName, new ServiceBusReceiverOptions()),
                notifyEdgeActor,
                NotifyEdgeActor.MapMessage)));
        });
    }

@JoeWorkyWork

This is the actor system's Settings.ToString() output:

    akka : {
        version : "0.0.1 Akka"
        home : 
        loggers : ["Akka.Event.DefaultLogger, Akka, Version=1.5.13.0, Culture=neutral, PublicKeyToken=null","Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog, Version=1.5.12.1, Culture=neutral, PublicKeyToken=null"]
        loggers-dispatcher : akka.actor.default-dispatcher
        logger-startup-timeout : 5s
        logger-async-start : false
        logger-formatter : "Akka.Event.DefaultLogMessageFormatter, Akka, Version=1.5.13.0, Culture=neutral, PublicKeyToken=null"
        loglevel : Info
        suppress-json-serializer-warning : on
        stdout-loglevel : WARNING
        stdout-logger-class : Akka.Event.StandardOutLogger
        log-config-on-start : false
        log-serializer-override-on-start : on
        log-dead-letters : 10
        log-dead-letters-during-shutdown : off
        log-dead-letters-suspend-duration : "5 minutes"
        extensions : ["Akka.Management.Cluster.Bootstrap.ClusterBootstrapProvider, Akka.Management, Version=1.5.7.0, Culture=neutral, PublicKeyToken=null"]
        daemonic : off
        actor : {
          provider : Akka.Actor.LocalActorRefProvider
          guardian-supervisor-strategy : Akka.Actor.DefaultSupervisorStrategy
          creation-timeout : 20s
          reaper-interval : 5
          serialize-messages : off
          serialize-creators : off
          unstarted-push-timeout : 10s
          ask-timeout : infinite
          telemetry : {
            enabled : false
          }
          typed : {
            timeout : 5
          }
          inbox : {
            inbox-size : 1000
            default-timeout : 5s
          }
          router : {
            type-mapping : {
              from-code : Akka.Routing.NoRouter
              round-robin-pool : Akka.Routing.RoundRobinPool
              round-robin-group : Akka.Routing.RoundRobinGroup
              random-pool : Akka.Routing.RandomPool
              random-group : Akka.Routing.RandomGroup
              smallest-mailbox-pool : Akka.Routing.SmallestMailboxPool
              broadcast-pool : Akka.Routing.BroadcastPool
              broadcast-group : Akka.Routing.BroadcastGroup
              scatter-gather-pool : Akka.Routing.ScatterGatherFirstCompletedPool
              scatter-gather-group : Akka.Routing.ScatterGatherFirstCompletedGroup
              consistent-hashing-pool : Akka.Routing.ConsistentHashingPool
              consistent-hashing-group : Akka.Routing.ConsistentHashingGroup
              tail-chopping-pool : Akka.Routing.TailChoppingPool
              tail-chopping-group : Akka.Routing.TailChoppingGroup
              cluster-metrics-adaptive-pool : "Akka.Cluster.Metrics.AdaptiveLoadBalancingPool, Akka.Cluster.Metrics"
              cluster-metrics-adaptive-group : "Akka.Cluster.Metrics.AdaptiveLoadBalancingGroup, Akka.Cluster.Metrics"
            }
          }
          deployment : {
            default : {
              dispatcher : 
              mailbox : 
              stash-capacity : -1
              router : from-code
              nr-of-instances : 1
              within : "5 s"
              virtual-nodes-factor : 10
              routees : {
                paths : <<unknown value>>
              }
              resizer : {
                enabled : off
                lower-bound : 1
                upper-bound : 10
                pressure-threshold : 1
                rampup-rate : 0.2
                backoff-threshold : 0.3
                backoff-rate : 0.1
                messages-per-resize : 10
              }
              remote : 
              target : {
                nodes : <<unknown value>>
              }
              metrics-selector : mix
              cluster : {
                enabled : off
                max-nr-of-instances-per-node : 1
                max-total-nr-of-instances : 10000
                allow-local-routees : on
                use-role : 
              }
            }
            /SD-DNS/async-dns : {
              mailbox : unbounded
              router : round-robin-pool
              nr-of-instances : 1
            }
          }
          synchronized-dispatcher : {
            type : SynchronizedDispatcher
            executor : current-context-executor
            throughput : 10
          }
          task-dispatcher : {
            type : TaskDispatcher
            executor : task-executor
            throughput : 30
          }
          default-fork-join-dispatcher : {
            type : ForkJoinDispatcher
            executor : fork-join-executor
            throughput : 30
            dedicated-thread-pool : {
              thread-count : 3
              threadtype : background
            }
          }
          default-dispatcher : {
            type : Dispatcher
            executor : default-executor
            default-executor : {
            }
            thread-pool-executor : {
            }
            fork-join-executor : {
              parallelism-min : 8
              parallelism-factor : 1.0
              parallelism-max : 64
              task-peeking-mode : FIFO
            }
            current-context-executor : {
            }
            shutdown-timeout : 1s
            throughput : 30
            throughput-deadline-time : 0ms
            attempt-teamwork : on
            mailbox-requirement : 
          }
          internal-dispatcher : {
            type : Dispatcher
            executor : fork-join-executor
            throughput : 5
            fork-join-executor : {
              parallelism-min : 4
              parallelism-factor : 1.0
              parallelism-max : 64
            }
            channel-executor : {
              priority : high
            }
          }
          default-blocking-io-dispatcher : {
            type : Dispatcher
            executor : thread-pool-executor
            throughput : 1
          }
          default-mailbox : {
            mailbox-type : Akka.Dispatch.UnboundedMailbox
            mailbox-capacity : 1000
            mailbox-push-timeout-time : 10s
            stash-capacity : -1
          }
          mailbox : {
            requirements : {
              Akka.Dispatch.IUnboundedMessageQueueSemantics : akka.actor.mailbox.unbounded-queue-based
              Akka.Dispatch.IBoundedMessageQueueSemantics : akka.actor.mailbox.bounded-queue-based
              Akka.Dispatch.IDequeBasedMessageQueueSemantics : akka.actor.mailbox.unbounded-deque-based
              Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics : akka.actor.mailbox.unbounded-deque-based
              Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics : akka.actor.mailbox.bounded-deque-based
              Akka.Dispatch.IMultipleConsumerSemantics : akka.actor.mailbox.unbounded-queue-based
              Akka.Event.ILoggerMessageQueueSemantics : akka.actor.mailbox.logger-queue
            }
            unbounded-queue-based : {
              mailbox-type : Akka.Dispatch.UnboundedMailbox
            }
            bounded-queue-based : {
              mailbox-type : Akka.Dispatch.BoundedMailbox
            }
            unbounded-deque-based : {
              mailbox-type : Akka.Dispatch.UnboundedDequeBasedMailbox
            }
            bounded-deque-based : {
              mailbox-type : Akka.Dispatch.BoundedDequeBasedMailbox
            }
            logger-queue : {
              mailbox-type : Akka.Event.LoggerMailboxType
            }
          }
          debug : {
            receive : off
            autoreceive : off
            lifecycle : off
            fsm : off
            event-stream : off
            unhandled : off
            router-misconfiguration : off
          }
          serializers : {
            json : "Akka.Serialization.NewtonSoftJsonSerializer, Akka"
            bytes : "Akka.Serialization.ByteArraySerializer, Akka"
            akka-containers : "Akka.Remote.Serialization.MessageContainerSerializer, Akka.Remote"
            akka-misc : "Akka.Remote.Serialization.MiscMessageSerializer, Akka.Remote"
            primitive : "Akka.Remote.Serialization.PrimitiveSerializers, Akka.Remote"
            proto : "Akka.Remote.Serialization.ProtobufSerializer, Akka.Remote"
            daemon-create : "Akka.Remote.Serialization.DaemonMsgCreateSerializer, Akka.Remote"
            akka-system-msg : "Akka.Remote.Serialization.SystemMessageSerializer, Akka.Remote"
            akka-cluster : "Akka.Cluster.Serialization.ClusterMessageSerializer, Akka.Cluster"
            reliable-delivery : "Akka.Cluster.Serialization.ReliableDeliverySerializer, Akka.Cluster"
            akka-data-replication : "Akka.DistributedData.Serialization.ReplicatorMessageSerializer, Akka.DistributedData"
            akka-replicated-data : "Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"
            akka-sharding : "Akka.Cluster.Sharding.Serialization.ClusterShardingMessageSerializer, Akka.Cluster.Sharding"
            akka-cluster-client : "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"
            akka-pubsub : "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools"
            akka-singleton : "Akka.Cluster.Tools.Singleton.Serialization.ClusterSingletonMessageSerializer, Akka.Cluster.Tools"
          }
          serialization-bindings : {
            System.Byte[] : bytes
            System.Object : json
            "Akka.Actor.ActorSelectionMessage, Akka" : akka-containers
            "Akka.Remote.DaemonMsgCreate, Akka.Remote" : daemon-create
            "Google.Protobuf.IMessage, Google.Protobuf" : proto
            "Akka.Actor.Identify, Akka" : akka-misc
            "Akka.Actor.ActorIdentity, Akka" : akka-misc
            "Akka.Actor.IActorRef, Akka" : akka-misc
            "Akka.Actor.PoisonPill, Akka" : akka-misc
            "Akka.Actor.Kill, Akka" : akka-misc
            "Akka.Actor.Status+Failure, Akka" : akka-misc
            "Akka.Actor.Status+Success, Akka" : akka-misc
            "Akka.Actor.RemoteScope, Akka" : akka-misc
            "Akka.Routing.FromConfig, Akka" : akka-misc
            "Akka.Routing.DefaultResizer, Akka" : akka-misc
            "Akka.Routing.RoundRobinPool, Akka" : akka-misc
            "Akka.Routing.BroadcastPool, Akka" : akka-misc
            "Akka.Routing.RandomPool, Akka" : akka-misc
            "Akka.Routing.ScatterGatherFirstCompletedPool, Akka" : akka-misc
            "Akka.Routing.TailChoppingPool, Akka" : akka-misc
            "Akka.Routing.ConsistentHashingPool, Akka" : akka-misc
            "Akka.Configuration.Config, Akka" : akka-misc
            "Akka.Remote.RemoteWatcher+Heartbeat, Akka.Remote" : akka-misc
            "Akka.Remote.RemoteWatcher+HeartbeatRsp, Akka.Remote" : akka-misc
            "Akka.Remote.Routing.RemoteRouterConfig, Akka.Remote" : akka-misc
            "Akka.Dispatch.SysMsg.SystemMessage, Akka" : akka-system-msg
            System.String : primitive
            System.Int32 : primitive
            System.Int64 : primitive
            "Akka.Cluster.IClusterMessage, Akka.Cluster" : akka-cluster
            "Akka.Cluster.Routing.ClusterRouterPool, Akka.Cluster" : akka-cluster
            "Akka.Delivery.Internal.IDeliverySerializable, Akka" : reliable-delivery
            "Akka.DistributedData.IReplicatorMessage, Akka.DistributedData" : akka-data-replication
            "Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData" : akka-replicated-data
            "Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding" : akka-sharding
            "Akka.Cluster.Tools.Client.IClusterClientMessage, Akka.Cluster.Tools" : akka-cluster-client
            "Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, Akka.Cluster.Tools" : akka-pubsub
            "Akka.Cluster.Tools.PublishSubscribe.Internal.SendToOneSubscriber, Akka.Cluster.Tools" : akka-pubsub
            "Akka.Cluster.Tools.Singleton.IClusterSingletonMessage, Akka.Cluster.Tools" : akka-singleton
          }
          serialization-identifiers : {
            "Akka.Serialization.ByteArraySerializer, Akka" : 4
            "Akka.Serialization.NewtonSoftJsonSerializer, Akka" : 1
            "Akka.Remote.Serialization.ProtobufSerializer, Akka.Remote" : 2
            "Akka.Remote.Serialization.DaemonMsgCreateSerializer, Akka.Remote" : 3
            "Akka.Remote.Serialization.MessageContainerSerializer, Akka.Remote" : 6
            "Akka.Remote.Serialization.MiscMessageSerializer, Akka.Remote" : 16
            "Akka.Remote.Serialization.PrimitiveSerializers, Akka.Remote" : 17
            "Akka.Remote.Serialization.SystemMessageSerializer, Akka.Remote" : 22
            "Akka.Cluster.Serialization.ClusterMessageSerializer, Akka.Cluster" : 5
            "Akka.Cluster.Serialization.ReliableDeliverySerializer, Akka.Cluster" : 36
            "Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData" : 11
            "Akka.DistributedData.Serialization.ReplicatorMessageSerializer, Akka.DistributedData" : 12
            "Akka.Cluster.Sharding.Serialization.ClusterShardingMessageSerializer, Akka.Cluster.Sharding" : 13
            "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools" : 15
            "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools" : 9
            "Akka.Cluster.Tools.Singleton.Serialization.ClusterSingletonMessageSerializer, Akka.Cluster.Tools" : 14
          }
          serialization-settings : {
            json : {
              use-pooled-string-builder : true
              pooled-string-builder-minsize : 2048
              pooled-string-builder-maxsize : 32768
            }
            primitive : {
              use-legacy-behavior : on
            }
          }
        }
        channel-scheduler : {
          parallelism-min : 4
          parallelism-factor : 1
          parallelism-max : 64
          work-max : 10
          work-interval : 500
          work-step : 2
        }
        scheduler : {
          tick-duration : 10ms
          ticks-per-wheel : 512
          implementation : Akka.Actor.HashedWheelTimerScheduler
          shutdown-timeout : 5s
        }
        reliable-delivery : {
          producer-controller : {
            chunk-large-messages : off
            durable-queue : {
              request-timeout : 3s
              retry-attempts : 10
              resend-first-interval : 1s
            }
          }
          consumer-controller : {
            flow-control-window : 50
            resend-interval-min : 2s
            resend-interval-max : 30s
            only-flow-control : false
          }
          work-pulling : {
            producer-controller : {
              buffer-size : 1000
              internal-ask-timeout : 60s
              chunk-large-messages : off
            }
          }
          sharding : {
            producer-controller : {
              buffer-size : 1000
              internal-ask-timeout : 60s
              cleanup-unused-after : 120s
              resend-first-unconfirmed-idle-timeout : 10s
              chunk-large-messages : off
            }
            consumer-controller : {
              buffer-size : 1000
            }
          }
        }
        io : {
          pinned-dispatcher : {
            type : PinnedDispatcher
            executor : fork-join-executor
          }
          tcp : {
            direct-buffer-pool : {
              class : "Akka.IO.Buffers.DirectBufferPool, Akka"
              buffer-size : 512
              buffers-per-segment : 500
              initial-segments : 1
              buffer-pool-limit : 1024
            }
            disabled-buffer-pool : {
              class : "Akka.IO.Buffers.DisabledBufferPool, Akka"
              buffer-size : 512
            }
            buffer-pool : akka.io.tcp.disabled-buffer-pool
            max-channels : 256000
            selector-association-retries : 10
            batch-accept-limit : 10
            register-timeout : 5s
            max-received-message-size : unlimited
            trace-logging : off
            selector-dispatcher : akka.io.pinned-dispatcher
            worker-dispatcher : akka.actor.internal-dispatcher
            management-dispatcher : akka.actor.internal-dispatcher
            file-io-dispatcher : akka.actor.default-blocking-io-dispatcher
            file-io-transferTo-limit : 524288
            finish-connect-retries : 5
            windows-connection-abort-workaround-enabled : off
            outgoing-socket-force-ipv4 : false
          }
          udp : {
            direct-buffer-pool : {
              class : "Akka.IO.Buffers.DirectBufferPool, Akka"
              buffer-size : 512
              buffers-per-segment : 500
              initial-segments : 1
              buffer-pool-limit : 1024
            }
            disabled-buffer-pool : {
              class : "Akka.IO.Buffers.DisabledBufferPool, Akka"
              buffer-size : 512
            }
            buffer-pool : akka.io.udp.disabled-buffer-pool
            nr-of-socket-async-event-args : 32
            max-channels : 4096
            select-timeout : infinite
            selector-association-retries : 10
            receive-throughput : 3
            received-message-size-limit : unlimited
            trace-logging : off
            selector-dispatcher : akka.io.pinned-dispatcher
            worker-dispatcher : akka.actor.internal-dispatcher
            management-dispatcher : akka.actor.internal-dispatcher
          }
          udp-connected : {
            direct-buffer-pool : {
              class : "Akka.IO.Buffers.DirectBufferPool, Akka"
              buffer-size : 512
              buffers-per-segment : 500
              initial-segments : 1
              buffer-pool-limit : 1024
            }
            disabled-buffer-pool : {
              class : "Akka.IO.Buffers.DisabledBufferPool, Akka"
              buffer-size : 512
            }
            buffer-pool : akka.io.udp-connected.disabled-buffer-pool
            nr-of-socket-async-event-args : 32
            max-channels : 4096
            select-timeout : infinite
            selector-association-retries : 10
            receive-throughput : 3
            received-message-size-limit : unlimited
            trace-logging : off
            selector-dispatcher : akka.io.pinned-dispatcher
            worker-dispatcher : akka.actor.internal-dispatcher
            management-dispatcher : akka.actor.internal-dispatcher
          }
          dns : {
            dispatcher : akka.actor.internal-dispatcher
            resolver : inet-address
            inet-address : {
              provider-object : Akka.IO.InetAddressDnsProvider
              positive-ttl : 30s
              negative-ttl : 10s
              cache-cleanup-interval : 120s
              use-ipv6 : true
            }
          }
        }
        coordinated-shutdown : {
          default-phase-timeout : "5 s"
          terminate-actor-system : on
          exit-clr : off
          run-by-clr-shutdown-hook : on
          run-by-actor-system-terminate : on
          phases : {
            before-service-unbind : {
            }
            service-unbind : {
              depends-on : [before-service-unbind]
            }
            service-requests-done : {
              depends-on : [service-unbind]
            }
            service-stop : {
              depends-on : [service-requests-done]
            }
            before-cluster-shutdown : {
              depends-on : [service-stop]
            }
            cluster-sharding-shutdown-region : {
              timeout : "10 s"
              depends-on : [before-cluster-shutdown]
            }
            cluster-leave : {
              depends-on : [cluster-sharding-shutdown-region]
            }
            cluster-exiting : {
              timeout : "10 s"
              depends-on : [cluster-leave]
            }
            cluster-exiting-done : {
              depends-on : [cluster-exiting]
            }
            cluster-shutdown : {
              depends-on : [cluster-exiting-done]
            }
            before-actor-system-terminate : {
              depends-on : [cluster-shutdown]
            }
            actor-system-terminate : {
              timeout : "10 s"
              depends-on : [before-actor-system-terminate]
            }
          }
        }
        remote : {
          startup-timeout : "10 s"
          shutdown-timeout : "10 s"
          flush-wait-on-shutdown : "2 s"
          use-passive-connections : on
          backoff-interval : "0.05 s"
          command-ack-timeout : "30 s"
          handshake-timeout : "15 s"
          use-dispatcher : akka.remote.default-remote-dispatcher
          untrusted-mode : off
          trusted-selection-paths : <<unknown value>>
          log-received-messages : off
          log-sent-messages : off
          log-remote-lifecycle-events : on
          log-frame-size-exceeding : off
          log-buffer-size-exceeding : 50000
          transport-failure-detector : {
            implementation-class : Akka.Remote.DeadlineFailureDetector,Akka.Remote
            heartbeat-interval : "4 s"
            acceptable-heartbeat-pause : "120 s"
          }
          watch-failure-detector : {
            implementation-class : Akka.Remote.PhiAccrualFailureDetector,Akka.Remote
            heartbeat-interval : "1 s"
            threshold : 10.0
            max-sample-size : 200
            min-std-deviation : "100 ms"
            acceptable-heartbeat-pause : "10 s"
            unreachable-nodes-reaper-interval : 1s
            expected-response-after : "1 s"
          }
          retry-gate-closed-for : "5 s"
          prune-quarantine-marker-after : "5 d"
          quarantine-after-silence : "2 d"
          system-message-buffer-size : 20000
          system-message-ack-piggyback-timeout : "0.3 s"
          resend-interval : "2 s"
          resend-limit : 200
          initial-system-message-delivery-timeout : "3 m"
          enabled-transports : [akka.remote.dot-netty.tcp]
          adapters : {
            gremlin : Akka.Remote.Transport.FailureInjectorProvider,Akka.Remote
            trttl : Akka.Remote.Transport.ThrottlerProvider,Akka.Remote
          }
          dot-netty : {
            tcp : {
              transport-class : Akka.Remote.Transport.DotNetty.TcpTransport,Akka.Remote
              applied-adapters : <<unknown value>>
              transport-protocol : tcp
              byte-order : little-endian
              port : 4053
              public-port : 0
              hostname : notifier-3.notifier.notify-notifier.svc.cluster.local
              public-hostname : 
              dns-use-ipv6 : false
              enforce-ip-family : false
              enable-ssl : false
              enable-backwards-compatibility : false
              connection-timeout : "15 s"
              batching : {
                enabled : true
                max-pending-writes : 30
              }
              use-dispatcher-for-io : 
              write-buffer-high-water-mark : 0b
              write-buffer-low-water-mark : 0b
              send-buffer-size : 256000b
              receive-buffer-size : 256000b
              maximum-frame-size : 128000b
              backlog : 4096
              tcp-nodelay : on
              tcp-keepalive : on
              tcp-reuse-addr : off-for-windows
              server-socket-worker-pool : {
                pool-size-min : 2
                pool-size-factor : 1.0
                pool-size-max : 2
              }
              client-socket-worker-pool : {
                pool-size-min : 2
                pool-size-factor : 1.0
                pool-size-max : 2
              }
              ssl : {
                certificate : {
                  path : 
                  password : 
                  use-thumbprint-over-file : false
                  thumbprint : 
                  store-name : 
                  store-location : current-user
                }
                suppress-validation : false
              }
            }
            udp : {
              transport-protocol : udp
            }
          }
          gremlin : {
            debug : off
          }
          default-remote-dispatcher : {
            executor : fork-join-executor
            fork-join-executor : {
              parallelism-min : 2
              parallelism-factor : 0.5
              parallelism-max : 16
            }
            channel-executor : {
              priority : high
            }
          }
          backoff-remote-dispatcher : {
            executor : fork-join-executor
            fork-join-executor : {
              parallelism-min : 2
              parallelism-max : 2
            }
            channel-executor : {
              priority : low
            }
          }
        }
        cluster : {
          seed-nodes : <<unknown value>>
          seed-node-timeout : 5s
          retry-unsuccessful-join-after : 10s
          auto-down-unreachable-after : off
          shutdown-after-unsuccessful-join-seed-nodes : off
          down-removal-margin : off
          downing-provider-class : "Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster"
          allow-weakly-up-members : 7s
          roles : [subscriber]
          app-version : assembly-version
          run-coordinated-shutdown-when-down : on
          role : {
          }
          min-nr-of-members : 1
          log-info : on
          log-info-verbose : off
          periodic-tasks-initial-delay : 1s
          gossip-interval : 1s
          gossip-time-to-live : 2s
          leader-actions-interval : 1s
          unreachable-nodes-reaper-interval : 1s
          publish-stats-interval : off
          use-dispatcher : 
          gossip-different-view-probability : 0.8
          reduce-gossip-different-view-probability : 400
          use-legacy-heartbeat-message : false
          failure-detector : {
            implementation-class : "Akka.Remote.PhiAccrualFailureDetector, Akka.Remote"
            heartbeat-interval : "1 s"
            threshold : 8.0
            max-sample-size : 1000
            min-std-deviation : "100 ms"
            acceptable-heartbeat-pause : "3 s"
            monitored-by-nr-of-members : 9
            expected-response-after : "1 s"
          }
          scheduler : {
            tick-duration : 33ms
            ticks-per-wheel : 512
          }
          debug : {
            verbose-heartbeat-logging : off
            verbose-receive-gossip-logging : off
          }
          split-brain-resolver : {
            active-strategy : keep-majority
            stable-after : 20s
            down-all-when-unstable : on
            static-quorum : {
              quorum-size : undefined
              role : 
            }
            keep-majority : {
              role : 
            }
            keep-oldest : {
              down-if-alone : on
              role : 
            }
            lease-majority : {
              lease-implementation : 
              lease-name : 
              acquire-lease-delay-for-minority : 2s
              release-after : 40s
              role : 
            }
            keep-referee : {
              address : 
              down-all-if-less-than-nodes : 1
            }
          }
          distributed-data : {
            name : ddataReplicator
            role : 
            gossip-interval : "2 s"
            notify-subscribers-interval : "500 ms"
            max-delta-elements : 500
            use-dispatcher : 
            pruning-interval : "120 s"
            max-pruning-dissemination : "300 s"
            pruning-marker-time-to-live : "6 h"
            serializer-cache-time-to-live : 10s
            recreate-on-failure : off
            prefer-oldest : off
            verbose-debug-logging : off
            delta-crdt : {
              enabled : on
              max-delta-size : 50
            }
            durable : {
              keys : <<unknown value>>
              pruning-marker-time-to-live : "10 d"
              store-actor-class : "Akka.DistributedData.LightningDB.LmdbDurableStore, Akka.DistributedData.LightningDB"
              use-dispatcher : akka.cluster.distributed-data.durable.pinned-store
              pinned-store : {
                executor : thread-pool-executor
                type : PinnedDispatcher
              }
              lmdb : {
                dir : ddata
                map-size : "100 MiB"
                write-behind-interval : off
              }
            }
          }
          sharding : {
            guardian-name : sharding
            role : subscriber
            remember-entities : off
            remember-entities-store : ddata
            passivate-idle-entity-after : 120s
            coordinator-failure-backoff : "5 s"
            retry-interval : 2s
            buffer-size : 100000
            handoff-timeout : 60s
            shard-start-timeout : 10s
            shard-failure-backoff : 10s
            entity-restart-backoff : 10s
            rebalance-interval : 10s
            journal-plugin-id : 
            snapshot-plugin-id : 
            state-store-mode : ddata
            snapshot-after : 1000
            keep-nr-of-batches : 2
            least-shard-allocation-strategy : {
              rebalance-absolute-limit : 0
              rebalance-relative-limit : 0.1
              rebalance-threshold : 1
              max-simultaneous-rebalance : 3
            }
            waiting-for-state-timeout : "2 s"
            updating-state-timeout : "5 s"
            shard-region-query-timeout : "3 s"
            entity-recovery-strategy : all
            entity-recovery-constant-rate-strategy : {
              frequency : "100 ms"
              number-of-entities : 5
            }
            event-sourced-remember-entities-store : {
              max-updates-per-write : 100
            }
            coordinator-singleton : akka.cluster.singleton
            coordinator-state : {
              write-majority-plus : 3
              read-majority-plus : 5
            }
            distributed-data : {
              majority-min-cap : 5
              durable : {
                keys : [shard-*]
              }
              max-delta-elements : 5
              backward-compatible-wire-format : false
            }
            use-dispatcher : 
            use-lease : 
            lease-retry-interval : 5s
            verbose-debug-logging : off
            fail-on-invalid-entity-state-transition : off
          }
          sharded-daemon-process : {
            sharding : {
              guardian-name : sharding
              role : 
              remember-entities : off
              remember-entities-store : ddata
              passivate-idle-entity-after : 120s
              coordinator-failure-backoff : "5 s"
              retry-interval : 2s
              buffer-size : 100000
              handoff-timeout : 60s
              shard-start-timeout : 10s
              shard-failure-backoff : 10s
              entity-restart-backoff : 10s
              rebalance-interval : 10s
              journal-plugin-id : 
              snapshot-plugin-id : 
              state-store-mode : persistence
              snapshot-after : 1000
              keep-nr-of-batches : 2
              least-shard-allocation-strategy : {
                rebalance-absolute-limit : 0
                rebalance-relative-limit : 0.1
                rebalance-threshold : 1
                max-simultaneous-rebalance : 3
              }
              waiting-for-state-timeout : "2 s"
              updating-state-timeout : "5 s"
              shard-region-query-timeout : "3 s"
              entity-recovery-strategy : all
              entity-recovery-constant-rate-strategy : {
                frequency : "100 ms"
                number-of-entities : 5
              }
              event-sourced-remember-entities-store : {
                max-updates-per-write : 100
              }
              coordinator-singleton : akka.cluster.singleton
              coordinator-state : {
                write-majority-plus : 3
                read-majority-plus : 5
              }
              distributed-data : {
                majority-min-cap : 5
                durable : {
                  keys : [shard-*]
                }
                max-delta-elements : 5
                backward-compatible-wire-format : false
              }
              use-dispatcher : 
              use-lease : 
              lease-retry-interval : 5s
              verbose-debug-logging : off
              fail-on-invalid-entity-state-transition : off
            }
            keep-alive-interval : 10s
          }
          client : {
            receptionist : {
              name : receptionist
              role : 
              number-of-contacts : 3
              response-tunnel-receive-timeout : 30s
              use-dispatcher : 
              heartbeat-interval : 2s
              acceptable-heartbeat-pause : 13s
              failure-detection-interval : 2s
            }
            initial-contacts : <<unknown value>>
            establishing-get-contacts-interval : 3s
            refresh-contacts-interval : 60s
            heartbeat-interval : 2s
            acceptable-heartbeat-pause : 13s
            buffer-size : 1000
            reconnect-timeout : off
          }
          pub-sub : {
            name : distributedPubSubMediator
            role : 
            routing-logic : random
            gossip-interval : 1s
            removed-time-to-live : 120s
            max-delta-elements : 3000
            send-to-dead-letters-when-no-subscribers : on
            use-dispatcher : 
          }
          singleton : {
            singleton-name : singleton
            role : 
            hand-over-retry-interval : 1s
            min-number-of-hand-over-retries : 15
            use-lease : 
            lease-retry-interval : 5s
            consider-app-version : false
          }
          singleton-proxy : {
            singleton-name : singleton
            role : 
            singleton-identification-interval : 1s
            buffer-size : 1000
          }
        }
        management : {
          http : {
            hostname : <hostname>
            port : 8558
            bind-hostname : 
            bind-port : 
            base-path : 
            routes : {
              cluster-bootstrap : "Akka.Management.Cluster.Bootstrap.ClusterBootstrapProvider, Akka.Management"
            }
            route-providers-read-only : true
          }
          cluster : {
            bootstrap : {
              new-cluster-enabled : on
              contact-point-discovery : {
                service-name : notifier
                port-name : management
                protocol : tcp
                service-namespace : <service-namespace>
                effective-name : <effective-name>
                discovery-method : akka.discovery
                stable-margin : 5s
                interval : 1s
                exponential-backoff-random-factor : 0.2
                exponential-backoff-max : 15s
                required-contact-point-nr : 2
                resolve-timeout : 3s
                contact-with-all-contact-points : true
              }
              contact-point : {
                fallback-port : <fallback-port>
                filter-on-fallback-port : true
                probing-failure-timeout : 3s
                probe-interval : 1s
                probe-interval-jitter : 0.2
              }
              join-decider : {
                class : "Akka.Management.Cluster.Bootstrap.LowestAddressJoinDecider, Akka.Management"
              }
            }
          }
        }
        http : {
          server : {
            server-header : akka-http/1.0
            default-http-port : 80
            default-https-port : 443
            remote-address-attribute : off
            termination-deadline-exceeded-response : {
              status : 503
            }
          }
        }
        discovery : {
          method : kubernetes-api
          config : {
            class : "Akka.Discovery.Config.ConfigServiceDiscovery, Akka.Discovery"
            services-path : akka.discovery.config.services
            services : {
            }
          }
          aggregate : {
            class : "Akka.Discovery.Aggregate.AggregateServiceDiscovery, Akka.Discovery"
            discovery-methods : <<unknown value>>
          }
          akka-dns : {
            class : "Akka.Discovery.Dns.DnsServiceDiscovery, Akka.Discovery"
          }
          kubernetes-api : {
            class : "Akka.Discovery.KubernetesApi.KubernetesApiServiceDiscovery, Akka.Discovery.KubernetesApi, Version=1.5.7.0, Culture=neutral, PublicKeyToken=null"
            api-ca-path : /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
            api-token-path : /var/run/secrets/kubernetes.io/serviceaccount/token
            api-service-host-env-name : KUBERNETES_SERVICE_HOST
            api-service-port-env-name : KUBERNETES_SERVICE_PORT
            pod-namespace-path : /var/run/secrets/kubernetes.io/serviceaccount/namespace
            pod-namespace : <pod-namespace>
            pod-domain : cluster.local
            pod-label-selector : akkaCluster=ci-notifier-akka
            use-raw-ip : true
            container-name : 
          }
        }
        persistence : {
          journal : {
            azure-table : {
              class : "Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure"
              connection-string : 
              table-name : AkkaPersistenceDefaultTable
              connect-timeout : 3s
              request-timeout : 3s
              verbose-logging : off
              plugin-dispatcher : akka.actor.default-dispatcher
              development : off
              auto-initialize : on
              serializer : 
            }
            plugin : akka.persistence.journal.azure-table
          }
        }
      }
      petabridge : {
        cmd : {
          port : 8222
          host : localhost
        }
      }

@Aaronontheweb
Member Author

        akkaConfig.WithShardRegion<SchedulingManagerActor>(
            nameof(SchedulingManagerActor),
            _ => Props.Create(() => new SchedulingManagerActor(removeScheduleActor, scheduleMessageActor, projectionActor, builder.GetRequiredService<IScheduleMessageCommandFactory>())
            ),
            new MessageExtractor(),
            new ShardOptions {StateStoreMode = StateStoreMode.DData, Role = "subscriber"});

Ok, so no remember-entities and using DData for state storage. That's helpful - do you have any logs from the "duplicate" entities we can look at, @JoeWorkyWork?

@Aaronontheweb
Member Author

So far, from the two users who have reported this issue to me (the third user had self-inflicted problems), it looks like this issue occurs when state-store-mode = ddata and remember-entities is off. Modeling the state machine now so I can get a better idea of where this can possibly occur.

@Aaronontheweb
Member Author


Spent a few hours going through this, looked at a few areas where a duplicate shard might be possible, but was able to rule them out. In order to solve this I think I'm going to need a dump with a large number of DEBUG logs from the sharding system, to see what the coordinator was doing around the time the issue occurred. In lieu of that, reproduction steps would help.
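
For anyone who can reproduce this, a minimal sketch of enabling that output via Akka.Hosting (builder being the AkkaConfigurationBuilder; verbose-debug-logging is the sharding switch visible in the config dump above; illustrative only):

    // Sketch only: capture DEBUG output from the sharding system before reproducing the issue.
    builder.AddHocon(@"
        akka.loglevel = DEBUG
        akka.cluster.sharding.verbose-debug-logging = on", HoconAddMode.Prepend);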

@alireza267

I am going to change the log level to DEBUG and check if it causes any performance issues. I hope to capture helpful logs.

@JoeWorkyWork

That's helpful - do you have any logs from the "duplicate" entities we can look at, @JoeWorkyWork?

We have added more logging but no errors yet.
The 7th of this month is a really busy day for our system, so I'll probably have some logs after that, @Aaronontheweb.

@JoeWorkyWork

I don't have any examples from our prod environment, but this is from our test environment (same setup).
Below, the issue seems to occur around Nov 7, 2023 @ 09:13:28.210:

Timestamp	                Properties.ActorPath_s	                                           Properties.MachineName_s	Properties.CorrelationId_s	RenderedMessage
Nov 7, 2023 @ 09:14:28.846	akka://Notifier/user/StreamSupervisor-0/Flow-0-0-unknown-operation	notifier-0	187aaaa4-cc12-47a7-bef8-027abbcca3c5	Successfully handled message of type "NextNotificationDateChangedEvent" in 148ms
Nov 7, 2023 @ 09:14:28.809	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "NotificationScheduledEvent" to event stream
Nov 7, 2023 @ 09:14:28.807	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "ScheduledNotificationRemovedEvent" to event stream
Nov 7, 2023 @ 09:14:28.779	akka://Notifier/user/ScheduleMessageActor	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Scheduled message with id: 558330. Scheduled to: 12/07/2023 09:00:00 +01:00."
Nov 7, 2023 @ 09:14:28.766	akka://Notifier/user/RemoveScheduledMessageActor	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Removed scheduled message with message id 557776 that was scheduled to 11/08/2023 09:00:00 +01:00
Nov 7, 2023 @ 09:14:28.731	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "NextNotificationDateReceivedEvent" to event stream
Nov 7, 2023 @ 09:14:28.698	akka://Notifier/user/StreamSupervisor-0/Flow-0-0-unknown-operation	notifier-0	187aaaa4-cc12-47a7-bef8-027abbcca3c5	Received message of type "NextNotificationDateChangedEvent"
Nov 7, 2023 @ 09:13:28.350	akka://Notifier/user/StreamSupervisor-0/Flow-0-0-unknown-operation	notifier-0	3c92c3dc-66f4-4ce3-bfb8-82dab411fc82	Successfully handled message of type "AccountStateChangedEvent" in 109ms
Nov 7, 2023 @ 09:13:28.311	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "AccountStateInformationReceivedEvent" to event stream
Nov 7, 2023 @ 09:13:28.241	akka://Notifier/user/StreamSupervisor-0/Flow-0-0-unknown-operation	notifier-0	3c92c3dc-66f4-4ce3-bfb8-82dab411fc82	Received message of type "AccountStateChangedEvent"
Nov 7, 2023 @ 09:13:28.219	akka://Notifier/user/DeadLetterMonitorActor	notifier-0	-	DeadLetter captured, sender: "[akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421#1116756156]", recipient: "[akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421#1116756156]"
Nov 7, 2023 @ 09:13:28.219	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	-	"Rejected to persist event type [""Notifier.Domain.Events.NextNotificationDateReceivedEvent""] with sequence number [13] for persistenceId [""customer-2980421""] due to [""0:The specified entity already exists.
RequestId:71cc6a9c-0002-00c7-1952-118c6e000000
Time:2023-11-07T08:13:28.2094231Z
 The index of the entity that caused the error can be found in FailedTransactionActionIndex.
Status: 409 (Conflict)
ErrorCode: EntityAlreadyExists
"
Nov 7, 2023 @ 09:13:28.219	akka://Notifier/system/deadLetterListener	notifier-0	-	Message [SchedulingManagerMessage`1] from [akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421#1116756156] to [akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421#1116756156] was not delivered. [3] dead letters encountered. If this is not an expected behavior then [akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421#1116756156] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: Notifier.Infrastructure.Akka.Models.SchedulingManagerMessage`1[Notifier.Worker.Akka.Models.ActorCommunication.ManageSchedulingForAccountCommand]
Nov 7, 2023 @ 09:13:28.210	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Persistence of "NextNotificationDateReceivedEvent" rejected
Nov 7, 2023 @ 09:13:28.210	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Persistence rejected indicates a duplicate writer scenario. Will stop actor immediately. No ack to service bus will be made, the lock will eventually expire and be retried
Nov 7, 2023 @ 09:13:28.197	akka://Notifier/user/StreamSupervisor-0/Flow-0-0-unknown-operation	notifier-0	ff715abd-1a31-4c27-a0de-9a3e605b5a03	Received message of type "NextNotificationDateChangedEvent"
Nov 7, 2023 @ 09:13:21.842	akka://Notifier/user/StreamSupervisor-0/Flow-3-0-unknown-operation	notifier-3	f2ca3a0e-2f73-43b6-a940-ba9edee16367	Successfully handled message of type "NotificationEvent" in 185ms
Nov 7, 2023 @ 09:13:21.780	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-3	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "NotificationScheduledEvent" to event stream
Nov 7, 2023 @ 09:13:21.751	akka://Notifier/user/ScheduleMessageActor	notifier-3	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Scheduled message with id: 557776. Scheduled to: 11/08/2023 09:00:00 +01:00.
Nov 7, 2023 @ 09:13:21.686	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-3	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "DueDateReceivedEvent" to event stream
Nov 7, 2023 @ 09:13:21.657	akka://Notifier/user/StreamSupervisor-0/Flow-3-0-unknown-operation	notifier-3	f2ca3a0e-2f73-43b6-a940-ba9edee16367	Received message of type "NotificationEvent"
Nov 7, 2023 @ 09:13:21.530	akka://Notifier/user/StreamSupervisor-0/Flow-3-0-unknown-operation	notifier-3	4b6f16f6-7f27-4be1-a353-fee7a8c9d73c	Successfully handled message of type "AccountStateChangedEvent" in 151ms
Nov 7, 2023 @ 09:13:21.494	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-3	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "AccountInformationReceivedEvent" to event stream
Nov 7, 2023 @ 09:13:21.494	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-3	22d856b4-8b9c-44a9-8053-47c9c5c83df7	Applied event "AccountStateInformationReceivedEvent" to event stream
Nov 7, 2023 @ 09:13:21.379	akka://Notifier/user/StreamSupervisor-0/Flow-3-0-unknown-operation	notifier-3	4b6f16f6-7f27-4be1-a353-fee7a8c9d73c	Received message of type "AccountStateChangedEvent"
Nov 7, 2023 @ 09:11:00.481	akka://Notifier/user/StreamSupervisor-0/Flow-1-0-unknown-operation	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Successfully handled message of type "NotifyAccountsMessage" in 4260ms
Nov 7, 2023 @ 09:11:00.318	akka://Notifier/user/NotifyEdgeActor	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Rest response received from "EdgePublicWebApi"
Nov 7, 2023 @ 09:11:00.318	akka://Notifier/user/NotifyEdgeActor	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Notified customer with id 2980421
Nov 7, 2023 @ 09:10:59.541	akka://Notifier/system/sharding/SchedulingManagerActor/98/2980421	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Applied event "NotifyAccountsMessageReceivedEvent" to event stream
Nov 7, 2023 @ 09:10:59.490	akka://Notifier/user/NotifyEdgeActor	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Rest request sent to "EdgePublicWebApi"
Nov 7, 2023 @ 09:10:56.220	akka://Notifier/user/StreamSupervisor-0/Flow-1-0-unknown-operation	notifier-0	f3434cb4-b184-4dc9-bb15-411c9d0937e0	Received message of type "NotifyAccountsMessage"
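
A side note on the "Persistence rejected" lines above: Akka.Persistence invokes a rejection callback on the persistent actor when the journal refuses a write, and in a sharded system a rejection usually means a second copy of the same entity is writing to the same persistence id. A minimal sketch of the stop-on-rejection pattern those log messages describe (the actor and message names here are hypothetical, not taken from the actual Notifier code):

    using System;
    using Akka.Persistence;

    // Hypothetical sharded entity: stop immediately when the journal rejects a write,
    // since a rejection typically signals a duplicate writer on the same persistence id.
    // The upstream message source (e.g. a service-bus lock) is left un-acked so it retries.
    public sealed class SchedulingEntity : ReceivePersistentActor
    {
        public override string PersistenceId => "scheduling-" + Self.Path.Name;

        public SchedulingEntity()
        {
            Command<string>(cmd => Persist(cmd, evt => { /* apply event to state */ }));
            Recover<string>(evt => { /* rebuild state on recovery */ });
        }

        protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
        {
            // the default implementation only logs; stop the actor instead
            Context.Stop(Self);
        }
    }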

@Aaronontheweb
Member Author

I'm back from vacation and I'll be picking up work on this again

@Aaronontheweb Aaronontheweb modified the milestones: 1.5.14, 1.5.15 Nov 29, 2023
@JoeWorkyWork

Welcome back :)
Any progress on this issue?

@Aaronontheweb
Member Author

Yes! I have some data that indicates that this is a problem caused by a shard rebalancing / handoff timing out. Going to write a reproduction for that as soon as I can. Doing some onsite work with a customer this week but this is high on my to-do list.
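
For reference while this is being investigated, these are the stock HOCON settings that govern the hand-off / rebalance window (shown with what I believe are the default values - just to point at the relevant knobs, not a recommended change):

    using Akka.Configuration;

    // Default hand-off / rebalance timing settings for Akka.Cluster.Sharding,
    // shown here only to illustrate which timeouts are in play.
    var shardingTiming = ConfigurationFactory.ParseString(@"
        akka.cluster.sharding {
            # how long the coordinator waits for a region to finish handing off its shards
            handoff-timeout = 60s
            # how often the coordinator considers rebalancing shards
            rebalance-interval = 10s
            # retry interval for the coordinator <-> region protocol
            retry-interval = 2s
        }");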

@Arkatufus
Contributor

@JoeWorkyWork Can you tell us what condition the cluster was in when these problems occurred?

Were you updating the cluster, were there different versions of Akka running in the cluster at the time, did any of the cluster nodes leave/rejoin at the time, what version(s) of Akka were running in the cluster, etc.?

@Aaronontheweb Aaronontheweb modified the milestones: 1.5.15, 1.5.16 Jan 10, 2024
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.16, 1.5.17 Jan 31, 2024
@JoeWorkyWork

Sorry, I missed your reply @Arkatufus. I'll hopefully get back with some more info next week.

@JoeWorkyWork

@Arkatufus Sadly we didn't save any raw logs from that incident, so we can't give you much on the cluster behavior.
But from my memory and the git logs, we deployed the "faulty" version 1 or 2 days before the incident (12th November).
In that deploy we upgraded Akka.Persistence.Azure.Hosting from 1.5.1 to 1.5.13.

Complete list of akka versions at the incident:

    <PackageVersion Include="Akka" Version="1.5.13" />
    <PackageVersion Include="Akka.Hosting" Version="1.5.13" />
    <PackageVersion Include="Akka.Management" Version="1.5.7" />
    <PackageVersion Include="Akka.Logger.Serilog" Version="1.5.12.1" />
    <PackageVersion Include="Akka.Cluster" Version="1.5.13" />
    <PackageVersion Include="Akka.Cluster.Hosting" Version="1.5.13" />
    <PackageVersion Include="Akka.Cluster.Sharding" Version="1.5.13" />
    <PackageVersion Include="Akka.Persistence.Azure.Hosting" Version="1.5.13" />
    <PackageVersion Include="Akka.Persistence.Hosting" Version="1.5.13" />
    <PackageVersion Include="Akka.Persistence.TestKit.Xunit2" Version="1.5.13" />
    <PackageVersion Include="Akka.TestKit" Version="1.5.13" />
    <PackageVersion Include="Akka.TestKit.Xunit2" Version="1.5.13" />
    <PackageVersion Include="Akka.DependencyInjection" Version="1.5.13" />
    <PackageVersion Include="Akka.Serialization.Hyperion" Version="1.5.13" />
    <PackageVersion Include="Akka.Streams" Version="1.5.13" />
    <PackageVersion Include="Akka.Streams.Azure.ServiceBus" Version="1.5.8" />
    <PackageVersion Include="Akka.Discovery.KubernetesApi" Version="1.5.7" />

Rough timeline:

  • Incident occurs under load, with tons of conflict errors while persisting data
  • We restart the k8s pods; the errors continue
  • Decide to roll back the Akka package that was upgraded
  • Rollback successful and no further errors.

Later we upgraded back to 1.5.13 and saw no errors in production

@Aaronontheweb Aaronontheweb removed this from the 1.5.17 milestone Mar 5, 2024
@Aaronontheweb Aaronontheweb reopened this Jun 24, 2024
@Aaronontheweb
Member Author

Did not mean to close this issue - still under investigation.

@Aaronontheweb
Member Author

I think I've found the smoking gun here, from some of the logs provided on #7285

Host 1

host1       | [INFO][07/17/2024 11:22:45.287Z][Thread 0020][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:8110] - is the new leader among reachable nodes (more leaders may exist)
host1       | [INFO][07/17/2024 11:22:46.625Z][Thread 0012][remoting (akka://CustomApplication)] Starting remoting
host1       | [INFO][07/17/2024 11:22:46.630Z][Thread 0012][remoting (akka://CustomApplication)] Remoting started; listening on addresses : [akka.tcp://CustomApplication@host1:9998]
host1       | [INFO][07/17/2024 11:22:46.630Z][Thread 0012][remoting (akka://CustomApplication)] Remoting now listens on addresses: [akka.tcp://CustomApplication@host1:9998]
host1       | [INFO][07/17/2024 11:22:46.631Z][Thread 0012][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:9998] - Starting up...
host1       | [INFO][07/17/2024 11:22:46.634Z][Thread 0012][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:9998] - Started up successfully
host1       | [INFO][07/17/2024 11:22:46.644Z][Thread 0011][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:9998] - Welcome from [akka.tcp://CustomApplication@host3:8110]
host1       | [INFO][07/17/2024 11:22:46.665Z][Thread 0021][akka.tcp://CustomApplication@host1:9998/system/sharding/EdgeActorRegion] EdgeActorRegion: Idle entities will be passivated after [00:02:00]
host1       | [INFO][07/17/2024 11:22:46.667Z][Thread 0014][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:8110] - Received InitJoin message from [[akka.tcp://CustomApplication@host1:9998/system/cluster/core/daemon/joinSeedNodeProcess-1#688040923]] to [akka.tcp://CustomApplication@host1:8110]
host1       | [INFO][07/17/2024 11:22:46.668Z][Thread 0014][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:8110] - Sending InitJoinAck message from node [akka.tcp://CustomApplication@host1:8110] to [[akka.tcp://CustomApplication@host1:9998/system/cluster/core/daemon/joinSeedNodeProcess-1#688040923]]
host1       | [INFO][07/17/2024 11:22:48.291Z][Thread 0014][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host1:8110] - Leader is moving node [akka.tcp://CustomApplication@host1:9998] to [Up]
host1       | [INFO][07/17/2024 11:22:48.656Z][Thread 0011][akka.tcp://CustomApplication@host1:9998/system/sharding/EdgeActorRegionCoordinator] ClusterSingletonManager state change [Start -> Younger] Akka.Cluster.Tools.Singleton.Uninitialized
host1       | [INFO][07/17/2024 11:23:22.706Z][Thread 0014][akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883] Message [ConnectionConfiguration] from [akka.tcp://CustomApplication@host3:9998/user/EdgeManagerActor#1502967810] to [akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883#633271427] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: test.CustomApplication.Configuration.ConnectionConfiguration
host1       | 2024-07-17 11:23:22.741 +00:00 <11> [INF] mosquitto:1883 | Application connected mosquitto:1883
host1       | 2024-07-17 11:23:22.757 +00:00 <13> [INF] akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883 | Running

Host 2

Host 2, which starts up and joins the cluster later, immediately allocates a duplicate shard:

host2       | [INFO][07/17/2024 11:23:57.566Z][Thread 0005][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host2:9998] - Started up successfully
host2       | [INFO][07/17/2024 11:23:57.578Z][Thread 0007][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host2:9998] - Welcome from [akka.tcp://CustomApplication@host3:8110]
host2       | [INFO][07/17/2024 11:23:57.577Z][Thread 0011][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host2:8110] - Received InitJoin message from [[akka.tcp://CustomApplication@host2:9998/system/cluster/core/daemon/joinSeedNodeProcess-1#1394894317]] to [akka.tcp://CustomApplication@host2:8110]
host2       | [INFO][07/17/2024 11:23:57.577Z][Thread 0011][Cluster (akka://CustomApplication)] Cluster Node [akka.tcp://CustomApplication@host2:8110] - Sending InitJoinAck message from node [akka.tcp://CustomApplication@host2:8110] to [[akka.tcp://CustomApplication@host2:9998/system/cluster/core/daemon/joinSeedNodeProcess-1#1394894317]]
host2       | [INFO][07/17/2024 11:23:57.600Z][Thread 0021][akka.tcp://CustomApplication@host2:9998/system/sharding/EdgeActorRegion] EdgeActorRegion: Idle entities will be passivated after [00:02:00]
host2       | [INFO][07/17/2024 11:23:59.663Z][Thread 0013][akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883] Message [ConnectionConfiguration] from [akka://CustomApplication/user/EdgeManagerActor#1233956126] to [akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883#2022509903] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: test.CustomApplication.Configuration.ConnectionConfiguration
host2       | 2024-07-17 11:23:59.705 +00:00 <21> [INF] mosquitto:1883 | Application connected mosquitto:1883
host2       | 2024-07-17 11:23:59.722 +00:00 <13> [INF] akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883 | Running
host2       | [INFO][07/17/2024 11:24:00.584Z][Thread 0021][akka.tcp://CustomApplication@host2:9998/system/sharding/EdgeActorRegionCoordinator] ClusterSingletonManager state change [Start -> Younger] Akka.Cluster.Tools.Singleton.Uninitialized
host2       | 2024-07-17 11:24:07.708 +00:00 <14> [DBG] akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883 | Received connection configuration message 
host2       | 2024-07-17 11:24:09.708 +00:00 <21> [WRN] mosquitto:1883 | Application disconnected mosquitto:1883
host2       | 2024-07-17 11:24:17.707 +00:00 <13> [DBG] akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883 | Received connection configuration message 
host2       | 2024-07-17 11:24:19.709 +00:00 <13> [INF] mosquitto:1883 | Application connected mosquitto:1883
host2       | 2024-07-17 11:24:27.706 +00:00 <14> [DBG] akka://CustomApplication/system/sharding/EdgeActorRegion/0/mosquitto%3A1883 | Received connection configuration message 

The logs are incomplete and don't show a hand-off, but they make me think there's a problem with how the ShardRegion handles shard allocations that happen immediately at node startup - there's no indication that the coordinator attempted to hand anything off. I'm going to reconfigure my test lab sample and see how it goes.
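
If anyone else wants to check for the same symptom in their cluster, one low-effort way is to ask the shard region for cluster-wide stats and flag any shard id that appears under more than one region address. A rough diagnostic sketch (the region name is an assumption; this is not part of any fix):

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using Akka.Actor;
    using Akka.Cluster.Sharding;

    public static class DuplicateShardCheck
    {
        // Queries cluster-wide sharding stats and warns about any shard id
        // that is reported by more than one region address (a duplicate shard).
        public static async Task ReportDuplicatesAsync(ActorSystem system, string regionName = "EdgeActorRegion")
        {
            var region = ClusterSharding.Get(system).ShardRegion(regionName);
            var stats = await region.Ask<ClusterShardingStats>(
                new GetClusterShardingStats(TimeSpan.FromSeconds(10)), TimeSpan.FromSeconds(15));

            var duplicated = stats.Regions
                .SelectMany(r => r.Value.Stats.Keys.Select(shardId => (shardId, address: r.Key)))
                .GroupBy(x => x.shardId)
                .Where(g => g.Select(x => x.address).Distinct().Count() > 1);

            foreach (var group in duplicated)
                system.Log.Warning("Shard [{0}] is hosted by multiple regions: [{1}]",
                    group.Key, string.Join(", ", group.Select(x => x.address)));
        }
    }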

@Aaronontheweb
Member Author

Aaronontheweb commented Jul 22, 2024

Still not closed yet - GitHub pulled the trigger a tad early.

Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this issue Jul 22, 2024
we would have seen A LOT of smoke and fire if this didn't work correctly, but since we're in the midst of testing for all sorts of member transition-related issues for akkadotnet#6973 we thought it would be best to add a sanity check.
Aaronontheweb added a commit that referenced this issue Jul 22, 2024
we would have seen A LOT of smoke and fire if this didn't work correctly, but since we're in the midst of testing for all sorts of member transition-related issues for #6973 we thought it would be best to add a sanity check.
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this issue Jul 23, 2024
Eliminates the source of akkadotnet#6793, which was caused by using the wrong ordering when determining which `ClusterSingletonManager` to hand over to during member state transitions.

close akkadotnet#6973
close akkadotnet#7196
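
To make the ordering point above concrete: singleton hand-over has to target the oldest member by age (up-number), which is not the same thing as the address-based member ordering used elsewhere. A minimal illustration of "pick the oldest" using the public Member.IsOlderThan API - this is only an illustration of the concept, not the actual patch:

    using System.Collections.Generic;
    using Akka.Cluster;

    public static class OldestMemberPicker
    {
        // Picks the oldest member by age (up-number) rather than by address ordering.
        // Sorting by address can select a different node than the true oldest member,
        // which is exactly the kind of mismatch that produces a bad hand-over target.
        public static Member PickOldest(IEnumerable<Member> members)
        {
            Member oldest = null;
            foreach (var m in members)
            {
                if (oldest == null || m.IsOlderThan(oldest))
                    oldest = m;
            }
            return oldest;
        }
    }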
@Aaronontheweb
Member Author

Gonna give my test lab reproductions an entire night to try to reproduce this, but the long and the short of it is:

  1. We've been able to reproduce the problem reliably
  2. We identified the problem primarily using traces from Phobos plus some custom telemetry I built (a duplicate detection system - a rough sketch of the idea follows after this list)
  3. Issue appears to be fixed by the combination of Akka.Cluster.Tools.Singleton / Akka.Cluster.Sharding: fix duplicate shards caused by incorrect ClusterSingletonManager HandOver #7297 and Akka.Cluster.Tools: fix mutability and oldest state bugs with ClusterSingletonManager #7298 - both PRs are necessary.
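
For the curious, the duplicate detection idea mentioned in point 2 can be approximated with plain actors: each entity reports its entity id to a well-known detector actor on startup, and the detector warns whenever the same id is reported by two different live actors. A rough, hypothetical sketch (actor names and wiring are invented for illustration; in a real cluster the detector would need to be a single instance, e.g. a cluster singleton):

    using System.Collections.Generic;
    using Akka.Actor;
    using Akka.Event;

    // Hypothetical duplicate-entity detector: warns when the same entity id is
    // registered by two different live actors - a strong hint of a duplicate shard.
    public sealed class DuplicateEntityDetector : ReceiveActor
    {
        public sealed record Register(string EntityId);

        private readonly Dictionary<string, IActorRef> _known = new();
        private readonly ILoggingAdapter _log = Context.GetLogger();

        public DuplicateEntityDetector()
        {
            Receive<Register>(msg =>
            {
                if (_known.TryGetValue(msg.EntityId, out var existing) && !existing.Equals(Sender))
                {
                    _log.Warning("Entity [{0}] registered from [{1}] but already registered at [{2}] - possible duplicate shard",
                        msg.EntityId, Sender.Path, existing.Path);
                }
                else
                {
                    _known[msg.EntityId] = Sender;
                    Context.Watch(Sender); // forget the entity when it terminates
                }
            });

            Receive<Terminated>(t =>
            {
                string dead = null;
                foreach (var kvp in _known)
                {
                    if (kvp.Value.Equals(t.ActorRef)) { dead = kvp.Key; break; }
                }
                if (dead != null) _known.Remove(dead);
            });
        }
    }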

@Aaronontheweb
Member Author

Will share a post-mortem on here or perhaps on the YouTube channel, but this fix is going into v1.5.27.

@Aaronontheweb Aaronontheweb changed the title Akka.Cluster.Sharding: duplicate shards / entities in v1.5 Akka.Cluster.Sharding: duplicate shards / entities Jul 25, 2024