Skip to content

Commit

Permalink
wadb: add wrappers for backfill and poll option tables
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Sep 17, 2024
1 parent 7264c7f commit edf6b17
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 49 deletions.
3 changes: 2 additions & 1 deletion cmd/mautrix-whatsapp/legacymigrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,14 @@ LEFT JOIN user_login ON user_login.user_mxid = history_sync_conversation_old.use
WHERE user_login.id IS NOT NULL;

INSERT INTO whatsapp_history_sync_message (
bridge_id, user_login_id, chat_jid, message_id, timestamp, data, inserted_time
bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time
)
SELECT
'',
user_login.id,
conversation_id,
message_id,
'',
-- only: postgres
CAST(EXTRACT(EPOCH FROM timestamp) AS BIGINT),
-- only: sqlite (line commented)
Expand Down
2 changes: 1 addition & 1 deletion cmd/mautrix-whatsapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
"v0.11.0",
m.LegacyMigrateWithAnotherUpgrader(
legacyMigrateRenameTables, legacyMigrateCopyData, 17,
upgrades.Table, "whatsapp_version", 1,
upgrades.Table, "whatsapp_version", 2,
),
true,
)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/rs/zerolog v1.33.0
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/tidwall/gjson v1.17.3
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5
go.mau.fi/webp v0.1.0
go.mau.fi/whatsmeow v0.0.0-20240916205343-ea8c175b2e2c
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
Expand All @@ -22,7 +22,7 @@ require (
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0
maunium.net/go/mautrix v0.21.0
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ github.com/yuin/goldmark v1.7.4 h1:BDXOHExt+A7gwPCJgPIIq7ENvceR7we7rOS9TNoLZeg=
github.com/yuin/goldmark v1.7.4/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E=
go.mau.fi/libsignal v0.1.1 h1:m/0PGBh4QKP/I1MQ44ti4C0fMbLMuHb95cmDw01FIpI=
go.mau.fi/libsignal v0.1.1/go.mod h1:QLs89F/OA3ThdSL2Wz2p+o+fi8uuQUz0e1BRa6ExdBw=
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719 h1:sg1P/f4RHY1JuAwsPOjTCsZr8ROzR9bRTtnvvBu42d4=
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719/go.mod h1:1Ixb8HWoVbl3rT6nAX6nV4iMkzn7KU/KXwE0Rn5RmsQ=
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5 h1:UBob83/x5OS6nLUAGaZepQtaTcnHdRKpJswvoS3MQUs=
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5/go.mod h1:1Ixb8HWoVbl3rT6nAX6nV4iMkzn7KU/KXwE0Rn5RmsQ=
go.mau.fi/webp v0.1.0 h1:BHObH/DcFntT9KYun5pDr0Ot4eUZO8k2C7eP7vF4ueA=
go.mau.fi/webp v0.1.0/go.mod h1:e42Z+VMFrUMS9cpEwGRIor+lQWO8oUAyPyMtcL+NMt8=
go.mau.fi/whatsmeow v0.0.0-20240916205343-ea8c175b2e2c h1:0pNAbeBNHdDmLbHG2bj+tnQnwE5YZVE83/QAfMlaYs4=
Expand Down Expand Up @@ -114,5 +114,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0 h1:fTX1P8TPv+oUqHGu08jj6FYH+Q/fC9jtmvkXcAw+KTo=
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0/go.mod h1:amzKPIZVO7v1piD2JhKG1RvGZoV+5wEZfoHaEXOjjqA=
maunium.net/go/mautrix v0.21.0 h1:Z6nVu+clkJgj6ANwFYQQ1BtYeVXZPZ9lRgwuFN57gOY=
maunium.net/go/mautrix v0.21.0/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0=
2 changes: 1 addition & 1 deletion pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (wa *WhatsAppConnector) GetName() bridgev2.BridgeName {
func (wa *WhatsAppConnector) Init(bridge *bridgev2.Bridge) {
wa.Bridge = bridge
wa.MsgConv = msgconv.New(bridge)
wa.DB = wadb.New(bridge.DB.Database, bridge.Log.With().Str("db_section", "whatsapp").Logger())
wa.DB = wadb.New(bridge.ID, bridge.DB.Database, bridge.Log.With().Str("db_section", "whatsapp").Logger())

wa.DeviceStore = sqlstore.NewWithDB(
bridge.DB.RawDB,
Expand Down
143 changes: 143 additions & 0 deletions pkg/connector/wadb/conversation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package wadb

import (
"context"
"time"

"go.mau.fi/util/dbutil"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/types"
"maunium.net/go/mautrix/bridgev2/networkid"
)

type ConversationQuery struct {
BridgeID networkid.BridgeID
*dbutil.QueryHelper[*Conversation]
}

type Conversation struct {
BridgeID networkid.BridgeID
UserLoginID networkid.UserLoginID
ChatJID types.JID
LastMessageTimestamp time.Time
Archived bool
Pinned bool
MuteEndTime time.Time
EndOfHistoryTransferType waHistorySync.Conversation_EndOfHistoryTransferType
EphemeralExpiration time.Duration
EphemeralSettingTimestamp int64
MarkedAsUnread bool
UnreadCount uint32
}

const (
upsertHistorySyncConversationQuery = `
INSERT INTO whatsapp_history_sync_conversation (
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (bridge_id, user_login_id, chat_jid)
DO UPDATE SET
last_message_timestamp=CASE
WHEN excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp THEN excluded.last_message_timestamp
ELSE whatsapp_history_sync_conversation.last_message_timestamp
END,
ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration),
ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp),
end_of_history_transfer_type=excluded.end_of_history_transfer_type
`
getRecentConversations = `
SELECT
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2
ORDER BY last_message_timestamp DESC
LIMIT $3
`
getConversationByJID = `
SELECT
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
`
deleteAllConversationsQuery = "DELETE FROM whatsapp_history_sync_conversation WHERE bridge_id=$1 AND user_login_id=$2"
deleteConversationQuery = `
DELETE FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
`
)

func (cq *ConversationQuery) Put(ctx context.Context, conv *Conversation) error {
conv.BridgeID = cq.BridgeID
return cq.Exec(ctx, upsertHistorySyncConversationQuery, conv.sqlVariables()...)
}

func (cq *ConversationQuery) GetRecent(ctx context.Context, loginID networkid.UserLoginID, limit int) ([]*Conversation, error) {
limitPtr := &limit
// Negative limit on SQLite means unlimited, but Postgres prefers a NULL limit.
if limit < 0 && cq.GetDB().Dialect == dbutil.Postgres {
limitPtr = nil
}
return cq.QueryMany(ctx, getRecentConversations, cq.BridgeID, loginID, limitPtr)
}

func (cq *ConversationQuery) Get(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) (*Conversation, error) {
return cq.QueryOne(ctx, getConversationByJID, cq.BridgeID, loginID, chatJID)
}

func (cq *ConversationQuery) DeleteAll(ctx context.Context, loginID networkid.UserLoginID) error {
return cq.Exec(ctx, deleteAllConversationsQuery, cq.BridgeID, loginID)
}

func (cq *ConversationQuery) Delete(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) error {
return cq.Exec(ctx, deleteConversationQuery, cq.BridgeID, loginID, chatJID)
}

func (c *Conversation) sqlVariables() []any {
return []any{
c.BridgeID,
c.UserLoginID,
c.ChatJID,
c.LastMessageTimestamp.Unix(),
c.Archived,
c.Pinned,
c.MuteEndTime.Unix(),
c.EndOfHistoryTransferType,
int64(c.EphemeralExpiration.Seconds()),
c.EphemeralSettingTimestamp,
c.MarkedAsUnread,
c.UnreadCount,
}
}

func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
var lastMessageTS, muteEndTime, ephemeralExpiration int64
err := row.Scan(
&c.BridgeID,
&c.UserLoginID,
&c.ChatJID,
&lastMessageTS,
&c.Archived,
&c.Pinned,
&muteEndTime,
&c.EndOfHistoryTransferType,
&ephemeralExpiration,
&c.EphemeralSettingTimestamp,
&c.MarkedAsUnread,
&c.UnreadCount,
)
if err != nil {
return nil, err
}
if lastMessageTS != 0 {
c.LastMessageTimestamp = time.Unix(lastMessageTS, 0)
}
if muteEndTime != 0 {
c.MuteEndTime = time.Unix(muteEndTime, 0)
}
c.EphemeralExpiration = time.Duration(ephemeralExpiration) * time.Second
return c, nil
}
27 changes: 26 additions & 1 deletion pkg/connector/wadb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,42 @@ package wadb
import (
"github.com/rs/zerolog"
"go.mau.fi/util/dbutil"
"maunium.net/go/mautrix/bridgev2/networkid"

"maunium.net/go/mautrix-whatsapp/pkg/connector/wadb/upgrades"
)

type Database struct {
*dbutil.Database
Conversation *ConversationQuery
Message *MessageQuery
PollOption *PollOptionQuery
MediaRequest *MediaRequestQuery
}

func New(db *dbutil.Database, log zerolog.Logger) *Database {
func New(bridgeID networkid.BridgeID, db *dbutil.Database, log zerolog.Logger) *Database {
db = db.Child("whatsapp_version", upgrades.Table, dbutil.ZeroLogger(log))
return &Database{
Database: db,
Conversation: &ConversationQuery{
BridgeID: bridgeID,
QueryHelper: dbutil.MakeQueryHelper(db, func(_ *dbutil.QueryHelper[*Conversation]) *Conversation {
return &Conversation{}
}),
},
Message: &MessageQuery{
BridgeID: bridgeID,
Database: db,
},
PollOption: &PollOptionQuery{
BridgeID: bridgeID,
Database: db,
},
MediaRequest: &MediaRequestQuery{
BridgeID: bridgeID,
QueryHelper: dbutil.MakeQueryHelper(db, func(_ *dbutil.QueryHelper[*MediaRequest]) *MediaRequest {
return &MediaRequest{}
}),
},
}
}
69 changes: 69 additions & 0 deletions pkg/connector/wadb/mediarequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package wadb

import (
"context"

"go.mau.fi/util/dbutil"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/id"
)

type MediaBackfillRequestStatus int

const (
MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = 0
MediaBackfillRequestStatusRequested MediaBackfillRequestStatus = 1
MediaBackfillRequestStatusRequestFailed MediaBackfillRequestStatus = 2
)

type MediaRequestQuery struct {
BridgeID networkid.BridgeID
*dbutil.QueryHelper[*MediaRequest]
}

type MediaRequest struct {
BridgeID networkid.BridgeID
UserLoginID networkid.UserLoginID
PortalKey networkid.PortalKey
EventID id.EventID
MediaKey []byte
Status MediaBackfillRequestStatus
Error string
}

const (
upsertMediaRequestQuery = `
INSERT INTO whatsapp_media_backfill_request (
bridge_id, user_login_id, portal_id, portal_receiver, event_id, media_key, status, error
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (bridge_id, user_login_id, portal_id, portal_receiver, event_id) DO UPDATE SET
media_key=excluded.media_key, status=excluded.status, error=excluded.error
`
getAllUnrequestedMediaRequestsForUserLoginQuery = `
SELECT bridge_id, user_login_id, portal_id, portal_receiver, event_id, media_key, status, error
FROM whatsapp_media_backfill_request
WHERE bridge_id=$1 AND user_login_id=$2 AND status=0
`
)

func (mrq *MediaRequestQuery) Put(ctx context.Context, mr *MediaRequest) error {
mr.BridgeID = mrq.BridgeID
return mrq.Exec(ctx, upsertMediaRequestQuery, mr.sqlVariables()...)
}

func (mrq *MediaRequestQuery) GetUnrequestedForUserLogin(ctx context.Context, loginID networkid.UserLoginID) ([]*MediaRequest, error) {
return mrq.QueryMany(ctx, getAllUnrequestedMediaRequestsForUserLoginQuery, mrq.BridgeID, loginID)
}

func (mr *MediaRequest) Scan(row dbutil.Scannable) (*MediaRequest, error) {
err := row.Scan(&mr.BridgeID, &mr.UserLoginID, &mr.PortalKey.ID, &mr.PortalKey.Receiver, &mr.EventID, &mr.MediaKey, &mr.Status, &mr.Error)
if err != nil {
return nil, err
}
return mr, nil
}

func (mr *MediaRequest) sqlVariables() []any {
return []any{mr.BridgeID, mr.UserLoginID, mr.PortalKey.ID, mr.PortalKey.Receiver, mr.EventID, mr.MediaKey, mr.Status, mr.Error}
}
Loading

0 comments on commit edf6b17

Please sign in to comment.