diff --git a/cmd/mautrix-whatsapp/legacymigrate.go b/cmd/mautrix-whatsapp/legacymigrate.go index 16e478b0..d40bd5d3 100644 --- a/cmd/mautrix-whatsapp/legacymigrate.go +++ b/cmd/mautrix-whatsapp/legacymigrate.go @@ -57,6 +57,7 @@ func migrateLegacyConfig(helper up.Helper) { bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "whatsapp_thumbnail"}, []string{"network", "whatsapp_thumbnail"}) bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "url_previews"}, []string{"network", "url_previews"}) bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "force_active_delivery_receipts"}, []string{"network", "force_active_delivery_receipts"}) + bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "max_initial_conversations"}, []string{"network", "history_sync", "max_initial_conversations"}) bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "history_sync", "request_full_sync"}, []string{"network", "history_sync", "request_full_sync"}) bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "full_sync_config", "days_limit"}, []string{"network", "history_sync", "full_sync_config", "days_limit"}) bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "full_sync_config", "size_limit_mb"}, []string{"network", "history_sync", "full_sync_config", "size_limit_mb"}) diff --git a/go.mod b/go.mod index 72a6ce6a..e77a3199 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( golang.org/x/sync v0.8.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 - maunium.net/go/mautrix v0.21.0 + maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01 ) require ( diff --git a/go.sum b/go.sum index c75ad838..b3a2722f 100644 --- a/go.sum +++ b/go.sum @@ -114,5 +114,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.21.0 h1:Z6nVu+clkJgj6ANwFYQQ1BtYeVXZPZ9lRgwuFN57gOY= -maunium.net/go/mautrix v0.21.0/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0= +maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01 h1:KQE071yde/kCQi2cffa6MjiNtrNWSTW2YzWQmeRjx6A= +maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0= diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go new file mode 100644 index 00000000..4da8358f --- /dev/null +++ b/pkg/connector/backfill.go @@ -0,0 +1,344 @@ +package connector + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/rs/zerolog" + "go.mau.fi/util/ptr" + "go.mau.fi/whatsmeow" + "go.mau.fi/whatsmeow/proto/waHistorySync" + "go.mau.fi/whatsmeow/proto/waWeb" + "go.mau.fi/whatsmeow/types" + "google.golang.org/protobuf/proto" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/bridgev2/simplevent" + + "maunium.net/go/mautrix-whatsapp/pkg/connector/wadb" + "maunium.net/go/mautrix-whatsapp/pkg/waid" +) + +var _ bridgev2.BackfillingNetworkAPI = (*WhatsAppClient)(nil) + +const historySyncDispatchWait = 30 * time.Second + +func (wa *WhatsAppClient) historySyncLoop() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + oldStop := wa.stopHistorySyncLoop.Swap(&cancel) + if oldStop != nil { + (*oldStop)() + } + dispatchTimer := time.NewTimer(historySyncDispatchWait) + dispatchTimer.Stop() + wa.UserLogin.Log.Debug().Msg("Starting history sync loop") + for { + select { + case evt := <-wa.historySyncs: + dispatchTimer.Stop() + wa.handleWAHistorySync(ctx, evt) + dispatchTimer.Reset(historySyncDispatchWait) + case <-dispatchTimer.C: + wa.createPortalsFromHistorySync(ctx) + case <-ctx.Done(): + wa.UserLogin.Log.Debug().Msg("Stopping history sync loop") + return + } + } +} + +func (wa *WhatsAppClient) handleWAHistorySync(ctx context.Context, evt *waHistorySync.HistorySync) { + if evt == nil || evt.SyncType == nil { + return + } + log := wa.UserLogin.Log.With(). + Str("action", "store history sync"). + Stringer("sync_type", evt.GetSyncType()). + Uint32("chunk_order", evt.GetChunkOrder()). + Uint32("progress", evt.GetProgress()). + Logger() + ctx = log.WithContext(ctx) + if evt.GetGlobalSettings() != nil { + log.Debug().Interface("global_settings", evt.GetGlobalSettings()).Msg("Got global settings in history sync") + } + if evt.GetSyncType() == waHistorySync.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waHistorySync.HistorySync_PUSH_NAME || evt.GetSyncType() == waHistorySync.HistorySync_NON_BLOCKING_DATA { + log.Debug(). + Int("conversation_count", len(evt.GetConversations())). + Int("pushname_count", len(evt.GetPushnames())). + Int("status_count", len(evt.GetStatusV3Messages())). + Int("recent_sticker_count", len(evt.GetRecentStickers())). + Int("past_participant_count", len(evt.GetPastParticipants())). + Msg("Ignoring history sync") + return + } + log.Info(). + Int("conversation_count", len(evt.GetConversations())). + Int("past_participant_count", len(evt.GetPastParticipants())). + Msg("Storing history sync") + successfullySavedTotal := 0 + failedToSaveTotal := 0 + totalMessageCount := 0 + for _, conv := range evt.GetConversations() { + jid, err := types.ParseJID(conv.GetID()) + if err != nil { + totalMessageCount += len(conv.GetMessages()) + log.Warn().Err(err). + Str("chat_jid", conv.GetID()). + Int("msg_count", len(conv.GetMessages())). + Msg("Failed to parse chat JID in history sync") + continue + } else if jid.Server == types.BroadcastServer { + log.Debug().Str("chat_jid", jid.String()).Msg("Skipping broadcast list in history sync") + continue + } else if jid.Server == types.HiddenUserServer { + log.Debug().Str("chat_jid", jid.String()).Msg("Skipping hidden user JID chat in history sync") + continue + } + totalMessageCount += len(conv.GetMessages()) + log := log.With(). + Stringer("chat_jid", jid). + Int("msg_count", len(conv.GetMessages())). + Logger() + + var minTime, maxTime time.Time + var minTimeIndex, maxTimeIndex int + + ignoredTypes := 0 + messages := make([]*wadb.HistorySyncMessageTuple, 0, len(conv.GetMessages())) + for i, rawMsg := range conv.GetMessages() { + // Don't store messages that will just be skipped. + msgEvt, err := wa.Client.ParseWebMessage(jid, rawMsg.GetMessage()) + if err != nil { + log.Warn().Err(err). + Int("msg_index", i). + Str("msg_id", rawMsg.GetMessage().GetKey().GetID()). + Uint64("msg_time_seconds", rawMsg.GetMessage().GetMessageTimestamp()). + Msg("Dropping historical message due to parse error") + continue + } + if minTime.IsZero() || msgEvt.Info.Timestamp.Before(minTime) { + minTime = msgEvt.Info.Timestamp + minTimeIndex = i + } + if maxTime.IsZero() || msgEvt.Info.Timestamp.After(maxTime) { + maxTime = msgEvt.Info.Timestamp + maxTimeIndex = i + } + + msgType := getMessageType(msgEvt.Message) + if msgType == "ignore" || strings.HasPrefix(msgType, "unknown_protocol_") { + ignoredTypes++ + continue + } + marshaled, err := proto.Marshal(rawMsg) + if err != nil { + log.Warn().Err(err). + Int("msg_index", i). + Str("msg_id", msgEvt.Info.ID). + Msg("Failed to marshal message") + continue + } + + messages = append(messages, &wadb.HistorySyncMessageTuple{Info: &msgEvt.Info, Message: marshaled}) + } + log.Debug(). + Int("wrapped_count", len(messages)). + Int("ignored_msg_type_count", ignoredTypes). + Time("lowest_time", minTime). + Int("lowest_time_index", minTimeIndex). + Time("highest_time", maxTime). + Int("highest_time_index", maxTimeIndex). + Dict("metadata", zerolog.Dict(). + Uint32("ephemeral_expiration", conv.GetEphemeralExpiration()). + Int64("ephemeral_setting_timestamp", conv.GetEphemeralSettingTimestamp()). + Bool("marked_unread", conv.GetMarkedAsUnread()). + Bool("archived", conv.GetArchived()). + Uint32("pinned", conv.GetPinned()). + Uint64("mute_end", conv.GetMuteEndTime()). + Uint32("unread_count", conv.GetUnreadCount()), + ). + Msg("Collected messages to save from history sync conversation") + + if len(messages) > 0 { + err = wa.Main.DB.Conversation.Put(ctx, wadb.NewConversation(wa.UserLogin.ID, jid, conv)) + if err != nil { + log.Err(err).Msg("Failed to save conversation metadata") + continue + } + err = wa.Main.DB.Message.Put(ctx, wa.UserLogin.ID, jid, messages) + if err != nil { + log.Err(err).Msg("Failed to save messages") + failedToSaveTotal += len(messages) + } else { + successfullySavedTotal += len(messages) + } + } + } + log.Info(). + Int("total_saved_count", successfullySavedTotal). + Int("total_failed_count", failedToSaveTotal). + Int("total_message_count", totalMessageCount). + Msg("Finished storing history sync") +} + +func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) { + log := wa.UserLogin.Log.With(). + Str("action", "create portals from history sync"). + Logger() + ctx = log.WithContext(ctx) + limit := wa.Main.Config.HistorySync.MaxInitialConversations + log.Info().Int("limit", limit).Msg("Creating portals from history sync") + conversations, err := wa.Main.DB.Conversation.GetRecent(ctx, wa.UserLogin.ID, limit) + if err != nil { + log.Err(err).Msg("Failed to get recent conversations from database") + return + } + for _, conv := range conversations { + wrappedInfo, err := wa.getChatInfo(ctx, conv.ChatJID, conv) + if errors.Is(err, whatsmeow.ErrNotInGroup) { + log.Debug().Stringer("chat_jid", conv.ChatJID). + Msg("Skipping creating room because the user is not a participant") + err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID) + if err != nil { + log.Err(err).Msg("Failed to delete historical messages for portal") + } + continue + } else if err != nil { + log.Err(err).Stringer("chat_jid", conv.ChatJID).Msg("Failed to get chat info") + continue + } + wa.Main.Bridge.QueueRemoteEvent(wa.UserLogin, &simplevent.ChatResync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, + LogContext: nil, + PortalKey: wa.makeWAPortalKey(conv.ChatJID), + CreatePortal: true, + }, + ChatInfo: wrappedInfo, + LatestMessageTS: conv.LastMessageTimestamp, + }) + } +} + +func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { + portalJID, err := waid.ParsePortalID(params.Portal.ID) + if err != nil { + return nil, err + } + var markRead bool + var startTime, endTime *time.Time + if params.Forward { + if params.AnchorMessage != nil { + startTime = ptr.Ptr(params.AnchorMessage.Timestamp) + } + conv, err := wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID) + if err != nil { + return nil, fmt.Errorf("failed to get conversation from database: %w", err) + } else if conv != nil { + markRead = !ptr.Val(conv.MarkedAsUnread) && ptr.Val(conv.UnreadCount) == 0 + } + } else if params.Cursor != "" { + endTimeUnix, err := strconv.ParseInt(string(params.Cursor), 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse cursor: %w", err) + } + endTime = ptr.Ptr(time.Unix(endTimeUnix, 0)) + } else if params.AnchorMessage != nil { + endTime = ptr.Ptr(params.AnchorMessage.Timestamp) + } + messages, err := wa.Main.DB.Message.GetBetween(ctx, wa.UserLogin.ID, portalJID, startTime, endTime, params.Count+1) + if err != nil { + return nil, fmt.Errorf("failed to load messages from database: %w", err) + } else if len(messages) == 0 { + return &bridgev2.FetchMessagesResponse{ + HasMore: false, + Forward: params.Forward, + }, nil + } + hasMore := false + oldestTS := messages[len(messages)-1].GetMessageTimestamp() + newestTS := messages[0].GetMessageTimestamp() + if len(messages) > params.Count { + hasMore = true + // For safety, cut off messages with the oldest timestamp in the response. + // Otherwise, if there are multiple messages with the same timestamp, the next fetch may miss some. + for i := len(messages) - 2; i >= 0; i-- { + if messages[i].GetMessageTimestamp() > oldestTS { + messages = messages[:i+1] + break + } + } + } + convertedMessages := make([]*bridgev2.BackfillMessage, len(messages)) + for i, msg := range messages { + evt, err := wa.Client.ParseWebMessage(portalJID, msg) + if err != nil { + // This should never happen because the info is already parsed once before being stored in the database + return nil, fmt.Errorf("failed to parse info of message %s: %w", msg.GetKey().GetID(), err) + } + convertedMessages[i] = wa.convertHistorySyncMessage(ctx, params.Portal, &evt.Info, msg) + } + return &bridgev2.FetchMessagesResponse{ + Messages: convertedMessages, + Cursor: networkid.PaginationCursor(strconv.FormatUint(messages[0].GetMessageTimestamp(), 10)), + HasMore: hasMore, + Forward: endTime == nil, + MarkRead: markRead, + // TODO set remaining or total count + CompleteCallback: func() { + // TODO this only deletes after backfilling. If there's no need for backfill after a relogin, + // the messages will be stuck in the database + var err error + if !wa.Main.Bridge.Config.Backfill.Queue.Enabled { + // If the backfill queue isn't enabled, delete all messages after backfilling a batch. + err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, portalJID) + } else { + // Otherwise just delete the messages that got backfilled + err = wa.Main.DB.Message.DeleteBetween(ctx, wa.UserLogin.ID, portalJID, newestTS, oldestTS) + } + if err != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to delete messages from database after backfill") + } + }, + }, nil +} + +func (wa *WhatsAppClient) convertHistorySyncMessage( + ctx context.Context, portal *bridgev2.Portal, info *types.MessageInfo, msg *waWeb.WebMessageInfo, +) *bridgev2.BackfillMessage { + // TODO use proper intent + intent := wa.Main.Bridge.Bot + wrapped := &bridgev2.BackfillMessage{ + ConvertedMessage: wa.Main.MsgConv.ToMatrix(ctx, portal, wa.Client, intent, msg.Message, info), + Sender: wa.makeEventSender(info.Sender), + ID: waid.MakeMessageID(info.Chat, info.Sender, info.ID), + TxnID: networkid.TransactionID(waid.MakeMessageID(info.Chat, info.Sender, info.ID)), + Timestamp: info.Timestamp, + Reactions: make([]*bridgev2.BackfillReaction, len(msg.Reactions)), + } + for i, reaction := range msg.Reactions { + var sender types.JID + if reaction.GetKey().GetFromMe() { + sender = wa.JID + } else if reaction.GetKey().GetParticipant() != "" { + sender, _ = types.ParseJID(*reaction.Key.Participant) + } else if info.Chat.Server == types.DefaultUserServer { + sender = info.Chat + } + if sender.IsEmpty() { + continue + } + wrapped.Reactions[i] = &bridgev2.BackfillReaction{ + TargetPart: ptr.Ptr(networkid.PartID("")), + Timestamp: time.UnixMilli(reaction.GetSenderTimestampMS()), + Sender: wa.makeEventSender(sender), + Emoji: reaction.GetText(), + } + } + return wrapped +} diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index b0414eff..8192a951 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -7,21 +7,25 @@ import ( "github.com/rs/zerolog" "go.mau.fi/util/ptr" - "go.mau.fi/whatsmeow/proto/waHistorySync" "go.mau.fi/whatsmeow/types" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix-whatsapp/pkg/connector/wadb" "maunium.net/go/mautrix-whatsapp/pkg/waid" ) -func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (wrapped *bridgev2.ChatInfo, err error) { +func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) { portalJID, err := waid.ParsePortalID(portal.ID) if err != nil { return nil, err } + return wa.getChatInfo(ctx, portalJID, nil) +} + +func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID, conv *wadb.Conversation) (wrapped *bridgev2.ChatInfo, err error) { switch portalJID.Server { case types.DefaultUserServer: wrapped = wa.wrapDMInfo(portalJID) @@ -46,8 +50,15 @@ func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Port default: return nil, fmt.Errorf("unsupported server %s", portalJID.Server) } - var conv *waHistorySync.Conversation - applyHistoryInfo(wrapped, conv) + if conv == nil { + conv, err = wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID) + if err != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get history sync conversation info") + } + } + if conv != nil { + applyHistoryInfo(wrapped, conv) + } wa.applyChatSettings(ctx, portalJID, wrapped) return wrapped, nil } @@ -79,25 +90,27 @@ func (wa *WhatsAppClient) applyChatSettings(ctx context.Context, chatID types.JI } } -func applyHistoryInfo(info *bridgev2.ChatInfo, conv *waHistorySync.Conversation) { +func applyHistoryInfo(info *bridgev2.ChatInfo, conv *wadb.Conversation) { if conv == nil { return } info.CanBackfill = true info.UserLocal = &bridgev2.UserLocalPortalInfo{ - MutedUntil: ptr.Ptr(time.Unix(int64(conv.GetMuteEndTime()), 0)), + MutedUntil: ptr.Ptr(conv.MuteEndTime), } - if conv.GetPinned() > 0 { + if ptr.Val(conv.Pinned) { info.UserLocal.Tag = ptr.Ptr(event.RoomTagFavourite) - } else if conv.GetArchived() { + } else if ptr.Val(conv.Archived) { info.UserLocal.Tag = ptr.Ptr(event.RoomTagLowPriority) } - if conv.GetEphemeralExpiration() > 0 { + if ptr.Val(conv.EphemeralExpiration) > 0 { info.Disappear = &database.DisappearingSetting{ Type: database.DisappearingTypeAfterRead, - Timer: time.Duration(conv.GetEphemeralExpiration()) * time.Second, + Timer: time.Duration(*conv.EphemeralExpiration) * time.Second, + } + if conv.EphemeralSettingTimestamp != nil { + info.ExtraUpdates = bridgev2.MergeExtraUpdaters(info.ExtraUpdates, updateDisappearingTimerSetAt(*conv.EphemeralSettingTimestamp)) } - info.ExtraUpdates = bridgev2.MergeExtraUpdaters(info.ExtraUpdates, updateDisappearingTimerSetAt(conv.GetEphemeralSettingTimestamp())) } } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index d5bc3f3d..92588f4c 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -3,9 +3,11 @@ package connector import ( "context" "fmt" + "sync/atomic" "github.com/rs/zerolog" "go.mau.fi/whatsmeow" + "go.mau.fi/whatsmeow/proto/waHistorySync" "go.mau.fi/whatsmeow/store" "go.mau.fi/whatsmeow/types" waLog "go.mau.fi/whatsmeow/util/log" @@ -20,6 +22,8 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us w := &WhatsAppClient{ Main: wa, UserLogin: login, + + historySyncs: make(chan *waHistorySync.HistorySync, 64), } login.Client = w @@ -54,6 +58,9 @@ type WhatsAppClient struct { Client *whatsmeow.Client Device *store.Device JID types.JID + + historySyncs chan *waHistorySync.HistorySync + stopHistorySyncLoop atomic.Pointer[context.CancelFunc] } var _ bridgev2.NetworkAPI = (*WhatsAppClient)(nil) @@ -96,10 +103,14 @@ func (wa *WhatsAppClient) Connect(ctx context.Context) error { if err := wa.Main.updateProxy(ctx, wa.Client, false); err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to update proxy") } + go wa.historySyncLoop() return wa.Client.Connect() } func (wa *WhatsAppClient) Disconnect() { + if stopHistorySyncLoop := wa.stopHistorySyncLoop.Swap(nil); stopHistorySyncLoop != nil { + (*stopHistorySyncLoop)() + } if cli := wa.Client; cli != nil { cli.Disconnect() wa.Client = nil diff --git a/pkg/connector/config.go b/pkg/connector/config.go index 0f5aff9c..5d4bfa68 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -42,8 +42,9 @@ type Config struct { ForceActiveDeliveryReceipts bool `yaml:"force_active_delivery_receipts"` HistorySync struct { - RequestFullSync bool `yaml:"request_full_sync"` - FullSyncConfig struct { + MaxInitialConversations int `yaml:"max_initial_conversations"` + RequestFullSync bool `yaml:"request_full_sync"` + FullSyncConfig struct { DaysLimit uint32 `yaml:"days_limit"` SizeLimit uint32 `yaml:"size_mb_limit"` StorageQuota uint32 `yaml:"storage_quota_mb"` @@ -94,6 +95,7 @@ func upgradeConfig(helper up.Helper) { helper.Copy(up.Bool, "whatsapp_thumbnail") helper.Copy(up.Bool, "url_previews") + helper.Copy(up.Int, "history_sync", "max_initial_conversations") helper.Copy(up.Bool, "history_sync", "request_full_sync") helper.Copy(up.Int|up.Null, "history_sync", "full_sync_config", "days_limit") helper.Copy(up.Int|up.Null, "history_sync", "full_sync_config", "size_mb_limit") diff --git a/pkg/connector/example-config.yaml b/pkg/connector/example-config.yaml index 1ff6c890..7bfe55b2 100644 --- a/pkg/connector/example-config.yaml +++ b/pkg/connector/example-config.yaml @@ -52,6 +52,10 @@ force_active_delivery_receipts: false # Settings for handling history sync payloads. history_sync: + # How many conversations should the bridge create after login? + # If -1, all conversations received from history sync will be bridged. + # Other conversations will be backfilled on demand when receiving a message. + max_initial_conversations: -1 # Should the bridge request a full sync from the phone when logging in? # This bumps the size of history syncs from 3 months to 1 year. request_full_sync: false diff --git a/pkg/connector/handlewhatsapp.go b/pkg/connector/handlewhatsapp.go index b6b54691..0e6e81f0 100644 --- a/pkg/connector/handlewhatsapp.go +++ b/pkg/connector/handlewhatsapp.go @@ -87,7 +87,9 @@ func (wa *WhatsAppClient) handleWAEvent(rawEvt any) { // TODO case *events.HistorySync: - // TODO + if wa.Main.Bridge.Config.Backfill.Enabled { + wa.historySyncs <- evt.Data + } case *events.MediaRetry: // TODO diff --git a/pkg/connector/wadb/conversation.go b/pkg/connector/wadb/conversation.go index 53bb05b9..f7e47b1e 100644 --- a/pkg/connector/wadb/conversation.go +++ b/pkg/connector/wadb/conversation.go @@ -5,6 +5,7 @@ import ( "time" "go.mau.fi/util/dbutil" + "go.mau.fi/util/ptr" "go.mau.fi/whatsmeow/proto/waHistorySync" "go.mau.fi/whatsmeow/types" "maunium.net/go/mautrix/bridgev2/networkid" @@ -20,14 +21,41 @@ type Conversation struct { UserLoginID networkid.UserLoginID ChatJID types.JID LastMessageTimestamp time.Time - Archived bool - Pinned bool + Archived *bool + Pinned *bool MuteEndTime time.Time - EndOfHistoryTransferType waHistorySync.Conversation_EndOfHistoryTransferType - EphemeralExpiration time.Duration - EphemeralSettingTimestamp int64 - MarkedAsUnread bool - UnreadCount uint32 + EndOfHistoryTransferType *waHistorySync.Conversation_EndOfHistoryTransferType + EphemeralExpiration *uint32 + EphemeralSettingTimestamp *int64 + MarkedAsUnread *bool + UnreadCount *uint32 +} + +func parseHistoryTime(ts *uint64) time.Time { + if ts == nil || *ts == 0 { + return time.Time{} + } + return time.Unix(int64(*ts), 0) +} + +func NewConversation(loginID networkid.UserLoginID, chatJID types.JID, conv *waHistorySync.Conversation) *Conversation { + var pinned *bool + if conv.Pinned != nil { + pinned = ptr.Ptr(*conv.Pinned > 0) + } + return &Conversation{ + UserLoginID: loginID, + ChatJID: chatJID, + LastMessageTimestamp: parseHistoryTime(conv.LastMsgTimestamp), + Archived: conv.Archived, + Pinned: pinned, + MuteEndTime: parseHistoryTime(conv.MuteEndTime), + EndOfHistoryTransferType: conv.EndOfHistoryTransferType, + EphemeralExpiration: conv.EphemeralExpiration, + EphemeralSettingTimestamp: conv.EphemeralSettingTimestamp, + MarkedAsUnread: conv.MarkedAsUnread, + UnreadCount: conv.UnreadCount, + } } const ( @@ -40,12 +68,19 @@ const ( ON CONFLICT (bridge_id, user_login_id, chat_jid) DO UPDATE SET last_message_timestamp=CASE - WHEN excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp THEN excluded.last_message_timestamp + WHEN whatsapp_history_sync_conversation.last_message_timestamp IS NULL + OR excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp + THEN excluded.last_message_timestamp ELSE whatsapp_history_sync_conversation.last_message_timestamp END, + archived=COALESCE(excluded.archived, whatsapp_history_sync_conversation.archived), + pinned=COALESCE(excluded.pinned, whatsapp_history_sync_conversation.pinned), + mute_end_time=COALESCE(excluded.mute_end_time, whatsapp_history_sync_conversation.mute_end_time), + end_of_history_transfer_type=COALESCE(excluded.end_of_history_transfer_type, whatsapp_history_sync_conversation.end_of_history_transfer_type), ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration), ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp), - end_of_history_transfer_type=excluded.end_of_history_transfer_type + marked_as_unread=COALESCE(excluded.marked_as_unread, whatsapp_history_sync_conversation.marked_as_unread), + unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count) ` getRecentConversations = ` SELECT @@ -97,16 +132,23 @@ func (cq *ConversationQuery) Delete(ctx context.Context, loginID networkid.UserL } func (c *Conversation) sqlVariables() []any { + var lastMessageTS, muteEndTime int64 + if !c.LastMessageTimestamp.IsZero() { + lastMessageTS = c.LastMessageTimestamp.Unix() + } + if !c.MuteEndTime.IsZero() { + muteEndTime = c.MuteEndTime.Unix() + } return []any{ c.BridgeID, c.UserLoginID, c.ChatJID, - c.LastMessageTimestamp.Unix(), + lastMessageTS, c.Archived, c.Pinned, - c.MuteEndTime.Unix(), + muteEndTime, c.EndOfHistoryTransferType, - int64(c.EphemeralExpiration.Seconds()), + c.EphemeralExpiration, c.EphemeralSettingTimestamp, c.MarkedAsUnread, c.UnreadCount, @@ -114,7 +156,7 @@ func (c *Conversation) sqlVariables() []any { } func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) { - var lastMessageTS, muteEndTime, ephemeralExpiration int64 + var lastMessageTS, muteEndTime int64 err := row.Scan( &c.BridgeID, &c.UserLoginID, @@ -124,7 +166,7 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) { &c.Pinned, &muteEndTime, &c.EndOfHistoryTransferType, - &ephemeralExpiration, + &c.EphemeralExpiration, &c.EphemeralSettingTimestamp, &c.MarkedAsUnread, &c.UnreadCount, @@ -138,6 +180,5 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) { if muteEndTime != 0 { c.MuteEndTime = time.Unix(muteEndTime, 0) } - c.EphemeralExpiration = time.Duration(ephemeralExpiration) * time.Second return c, nil } diff --git a/pkg/connector/wadb/message.go b/pkg/connector/wadb/message.go index 6c7aa5e9..f2450c18 100644 --- a/pkg/connector/wadb/message.go +++ b/pkg/connector/wadb/message.go @@ -6,6 +6,7 @@ import ( "time" "go.mau.fi/util/dbutil" + "go.mau.fi/util/exslices" "go.mau.fi/whatsmeow/proto/waHistorySync" "go.mau.fi/whatsmeow/proto/waWeb" "go.mau.fi/whatsmeow/types" @@ -21,7 +22,7 @@ type MessageQuery struct { const ( insertHistorySyncMessageQuery = ` INSERT INTO whatsapp_history_sync_message (bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time) - VALUES ($1, $2, $3, $4, $5, $6, $7) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (bridge_id, user_login_id, chat_jid, sender_jid, message_id) DO NOTHING ` getHistorySyncMessagesBetweenQueryTemplate = ` @@ -31,9 +32,9 @@ const ( ORDER BY timestamp DESC %s ` - deleteHistorySyncMessagesBetweenExclusiveQuery = ` + deleteHistorySyncMessagesBetweenQuery = ` DELETE FROM whatsapp_history_sync_message - WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 AND timestamp<$4 AND timestamp>$5 + WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 AND timestamp<=$4 AND timestamp>=$5 ` deleteAllHistorySyncMessagesQuery = "DELETE FROM whatsapp_history_sync_message WHERE bridge_id=$1 AND user_login_id=$2" deleteHistorySyncMessagesForPortalQuery = ` @@ -48,15 +49,30 @@ const ( ` ) -func (mq *MessageQuery) Put(ctx context.Context, loginID networkid.UserLoginID, parsedInfo *types.MessageInfo, message *waHistorySync.HistorySyncMsg) error { - msgData, err := proto.Marshal(message) - if err != nil { - return err - } - _, err = mq.Exec(ctx, insertHistorySyncMessageQuery, - mq.BridgeID, loginID, parsedInfo.Chat, parsedInfo.Sender.ToNonAD(), parsedInfo.ID, - parsedInfo.Timestamp, msgData, time.Now()) - return err +type HistorySyncMessageTuple struct { + Info *types.MessageInfo + Message []byte +} + +func (t *HistorySyncMessageTuple) GetMassInsertValues() [4]any { + return [4]any{t.Info.Sender.ToNonAD(), t.Info.ID, t.Info.Timestamp.Unix(), t.Message} +} + +var batchInsertHistorySyncMessage = dbutil.NewMassInsertBuilder[*HistorySyncMessageTuple, [4]any]( + insertHistorySyncMessageQuery, "($1, $2, $3, $%d, $%d, $%d, $%d, $4)", +) + +func (mq *MessageQuery) Put(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, messages []*HistorySyncMessageTuple) error { + return mq.DoTxn(ctx, nil, func(ctx context.Context) error { + for _, chunk := range exslices.Chunk(messages, 50) { + query, params := batchInsertHistorySyncMessage.Build([4]any{mq.BridgeID, loginID, chatJID, time.Now().Unix()}, chunk) + _, err := mq.Exec(ctx, query, params...) + if err != nil { + return err + } + } + return nil + }) } func scanWebMessageInfo(rows dbutil.Scannable) (*waWeb.WebMessageInfo, error) { @@ -100,8 +116,8 @@ func (mq *MessageQuery) GetBetween(ctx context.Context, loginID networkid.UserLo AsList() } -func (mq *MessageQuery) DeleteBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, before, after time.Time) error { - _, err := mq.Exec(ctx, deleteHistorySyncMessagesBetweenExclusiveQuery, mq.BridgeID, loginID, chatJID, before.Unix(), after.Unix()) +func (mq *MessageQuery) DeleteBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, before, after uint64) error { + _, err := mq.Exec(ctx, deleteHistorySyncMessagesBetweenQuery, mq.BridgeID, loginID, chatJID, before, after) return err } diff --git a/pkg/connector/wadb/upgrades/00-latest-schema.sql b/pkg/connector/wadb/upgrades/00-latest-schema.sql index 9eed6304..e2ab2028 100644 --- a/pkg/connector/wadb/upgrades/00-latest-schema.sql +++ b/pkg/connector/wadb/upgrades/00-latest-schema.sql @@ -1,9 +1,9 @@ -- v0 -> v2 (compatible with v2+): Latest revision CREATE TABLE whatsapp_poll_option_id ( - bridge_id TEXT NOT NULL, - msg_mxid TEXT NOT NULL, - opt_id TEXT NOT NULL, + bridge_id TEXT NOT NULL, + msg_mxid TEXT NOT NULL, + opt_id TEXT NOT NULL, opt_hash bytea NOT NULL CHECK ( length(opt_hash) = 32 ), PRIMARY KEY (bridge_id, msg_mxid, opt_id), @@ -13,19 +13,19 @@ CREATE TABLE whatsapp_poll_option_id ( ); CREATE TABLE whatsapp_history_sync_conversation ( - bridge_id TEXT NOT NULL, - user_login_id TEXT NOT NULL, - chat_jid TEXT NOT NULL, + bridge_id TEXT NOT NULL, + user_login_id TEXT NOT NULL, + chat_jid TEXT NOT NULL, - last_message_timestamp BIGINT NOT NULL, - archived BOOLEAN NOT NULL, - pinned BOOLEAN NOT NULL, - mute_end_time BIGINT NOT NULL, - end_of_history_transfer_type INTEGER NOT NULL, - ephemeral_expiration INTEGER NOT NULL, - ephemeral_setting_timestamp BIGINT NOT NULL, - marked_as_unread BOOLEAN NOT NULL, - unread_count INTEGER NOT NULL, + last_message_timestamp BIGINT, + archived BOOLEAN, + pinned BOOLEAN, + mute_end_time BIGINT, + end_of_history_transfer_type INTEGER, + ephemeral_expiration INTEGER, + ephemeral_setting_timestamp BIGINT, + marked_as_unread BOOLEAN, + unread_count INTEGER, PRIMARY KEY (bridge_id, user_login_id, chat_jid), CONSTRAINT whatsapp_history_sync_conversation_user_login_fkey FOREIGN KEY (bridge_id, user_login_id) diff --git a/pkg/connector/wadb/upgrades/02-history-sync-message-sender.postgres.sql b/pkg/connector/wadb/upgrades/02-history-sync-message-sender.postgres.sql index f012af0a..0a7175d1 100644 --- a/pkg/connector/wadb/upgrades/02-history-sync-message-sender.postgres.sql +++ b/pkg/connector/wadb/upgrades/02-history-sync-message-sender.postgres.sql @@ -2,3 +2,5 @@ -- transaction: sqlite-fkey-off ALTER TABLE whatsapp_history_sync_message ADD COLUMN sender_jid TEXT NOT NULL DEFAULT ''; ALTER TABLE whatsapp_history_sync_message ALTER COLUMN sender_jid DROP DEFAULT; +ALTER TABLE whatsapp_history_sync_message DROP CONSTRAINT whatsapp_history_sync_message_pkey; +ALTER TABLE whatsapp_history_sync_message ADD PRIMARY KEY (bridge_id, user_login_id, chat_jid, sender_jid, message_id);