mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 18:52:24 -06:00 
			
		
		
		
	* [bugfix] Fix temp table deletion causing runaway allocations * move some vars around * small fixes * rely on conn max age to recycle temp tables * fackin' ell m8
		
			
				
	
	
		
			572 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			572 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// GoToSocial
 | 
						|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
 | 
						|
// SPDX-License-Identifier: AGPL-3.0-or-later
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package bundb
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"slices"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/db"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/id"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/log"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/paging"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/state"
 | 
						|
	"github.com/superseriousbusiness/gotosocial/internal/util"
 | 
						|
	"github.com/uptrace/bun"
 | 
						|
	"github.com/uptrace/bun/dialect"
 | 
						|
)
 | 
						|
 | 
						|
type conversationDB struct {
 | 
						|
	db    *bun.DB
 | 
						|
	state *state.State
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error) {
 | 
						|
	return c.getConversation(
 | 
						|
		ctx,
 | 
						|
		"ID",
 | 
						|
		func(conversation *gtsmodel.Conversation) error {
 | 
						|
			return c.db.
 | 
						|
				NewSelect().
 | 
						|
				Model(conversation).
 | 
						|
				Where("? = ?", bun.Ident("id"), id).
 | 
						|
				Scan(ctx)
 | 
						|
		},
 | 
						|
		id,
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error) {
 | 
						|
	otherAccountsKey := gtsmodel.ConversationOtherAccountsKey(otherAccountIDs)
 | 
						|
	return c.getConversation(
 | 
						|
		ctx,
 | 
						|
		"ThreadID,AccountID,OtherAccountsKey",
 | 
						|
		func(conversation *gtsmodel.Conversation) error {
 | 
						|
			return c.db.
 | 
						|
				NewSelect().
 | 
						|
				Model(conversation).
 | 
						|
				Where("? = ?", bun.Ident("thread_id"), threadID).
 | 
						|
				Where("? = ?", bun.Ident("account_id"), accountID).
 | 
						|
				Where("? = ?", bun.Ident("other_accounts_key"), otherAccountsKey).
 | 
						|
				Scan(ctx)
 | 
						|
		},
 | 
						|
		threadID,
 | 
						|
		accountID,
 | 
						|
		otherAccountsKey,
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) getConversation(
 | 
						|
	ctx context.Context,
 | 
						|
	lookup string,
 | 
						|
	dbQuery func(conversation *gtsmodel.Conversation) error,
 | 
						|
	keyParts ...any,
 | 
						|
) (*gtsmodel.Conversation, error) {
 | 
						|
	// Fetch conversation from cache with loader callback
 | 
						|
	conversation, err := c.state.Caches.DB.Conversation.LoadOne(lookup, func() (*gtsmodel.Conversation, error) {
 | 
						|
		var conversation gtsmodel.Conversation
 | 
						|
 | 
						|
		// Not cached! Perform database query
 | 
						|
		if err := dbQuery(&conversation); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		return &conversation, nil
 | 
						|
	}, keyParts...)
 | 
						|
	if err != nil {
 | 
						|
		// already processe
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if gtscontext.Barebones(ctx) {
 | 
						|
		// Only a barebones model was requested.
 | 
						|
		return conversation, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if err := c.populateConversation(ctx, conversation); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return conversation, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) populateConversation(ctx context.Context, conversation *gtsmodel.Conversation) error {
 | 
						|
	var (
 | 
						|
		errs gtserror.MultiError
 | 
						|
		err  error
 | 
						|
	)
 | 
						|
 | 
						|
	if conversation.Account == nil {
 | 
						|
		conversation.Account, err = c.state.DB.GetAccountByID(
 | 
						|
			gtscontext.SetBarebones(ctx),
 | 
						|
			conversation.AccountID,
 | 
						|
		)
 | 
						|
		if err != nil {
 | 
						|
			errs.Appendf("error populating conversation owner account: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if conversation.OtherAccounts == nil {
 | 
						|
		conversation.OtherAccounts, err = c.state.DB.GetAccountsByIDs(
 | 
						|
			gtscontext.SetBarebones(ctx),
 | 
						|
			conversation.OtherAccountIDs,
 | 
						|
		)
 | 
						|
		if err != nil {
 | 
						|
			errs.Appendf("error populating other conversation accounts: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if conversation.LastStatus == nil && conversation.LastStatusID != "" {
 | 
						|
		conversation.LastStatus, err = c.state.DB.GetStatusByID(
 | 
						|
			gtscontext.SetBarebones(ctx),
 | 
						|
			conversation.LastStatusID,
 | 
						|
		)
 | 
						|
		if err != nil {
 | 
						|
			errs.Appendf("error populating conversation last status: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return errs.Combine()
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error) {
 | 
						|
	conversationLastStatusIDs, err := c.getAccountConversationLastStatusIDs(ctx, accountID, page)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return c.getConversationsByLastStatusIDs(ctx, accountID, conversationLastStatusIDs)
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) getAccountConversationLastStatusIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
 | 
						|
	return loadPagedIDs(&c.state.Caches.DB.ConversationLastStatusIDs, accountID, page, func() ([]string, error) {
 | 
						|
		var conversationLastStatusIDs []string
 | 
						|
 | 
						|
		// Conversation last status IDs not in cache. Perform DB query.
 | 
						|
		if _, err := c.db.
 | 
						|
			NewSelect().
 | 
						|
			Model((*gtsmodel.Conversation)(nil)).
 | 
						|
			Column("last_status_id").
 | 
						|
			Where("? = ?", bun.Ident("account_id"), accountID).
 | 
						|
			OrderExpr("? DESC", bun.Ident("last_status_id")).
 | 
						|
			Exec(ctx, &conversationLastStatusIDs); // nocollapse
 | 
						|
		err != nil && !errors.Is(err, db.ErrNoEntries) {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		return conversationLastStatusIDs, nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) getConversationsByLastStatusIDs(
 | 
						|
	ctx context.Context,
 | 
						|
	accountID string,
 | 
						|
	conversationLastStatusIDs []string,
 | 
						|
) ([]*gtsmodel.Conversation, error) {
 | 
						|
	// Load all conversation IDs via cache loader callbacks.
 | 
						|
	conversations, err := c.state.Caches.DB.Conversation.LoadIDs2Part(
 | 
						|
		"AccountID,LastStatusID",
 | 
						|
		accountID,
 | 
						|
		conversationLastStatusIDs,
 | 
						|
		func(accountID string, uncached []string) ([]*gtsmodel.Conversation, error) {
 | 
						|
			// Avoid querying
 | 
						|
			// if none uncached.
 | 
						|
			count := len(uncached)
 | 
						|
			if count == 0 {
 | 
						|
				return nil, nil
 | 
						|
			}
 | 
						|
 | 
						|
			// Preallocate expected length of uncached conversations.
 | 
						|
			conversations := make([]*gtsmodel.Conversation, 0, count)
 | 
						|
 | 
						|
			// Perform database query scanning the remaining (uncached) IDs.
 | 
						|
			if err := c.db.NewSelect().
 | 
						|
				Model(&conversations).
 | 
						|
				Where("? = ?", bun.Ident("account_id"), accountID).
 | 
						|
				Where("? IN (?)", bun.Ident("last_status_id"), bun.In(uncached)).
 | 
						|
				Scan(ctx); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			return conversations, nil
 | 
						|
		},
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Reorder the conversations by their last status IDs to ensure correct order.
 | 
						|
	getID := func(b *gtsmodel.Conversation) string { return b.ID }
 | 
						|
	util.OrderBy(conversations, conversationLastStatusIDs, getID)
 | 
						|
 | 
						|
	if gtscontext.Barebones(ctx) {
 | 
						|
		// no need to fully populate.
 | 
						|
		return conversations, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// Populate all loaded conversations, removing those we fail to populate.
 | 
						|
	conversations = slices.DeleteFunc(conversations, func(conversation *gtsmodel.Conversation) bool {
 | 
						|
		if err := c.populateConversation(ctx, conversation); err != nil {
 | 
						|
			log.Errorf(ctx, "error populating conversation %s: %v", conversation.ID, err)
 | 
						|
			return true
 | 
						|
		}
 | 
						|
		return false
 | 
						|
	})
 | 
						|
 | 
						|
	return conversations, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error {
 | 
						|
	// If we're updating by column, ensure "updated_at" is included.
 | 
						|
	if len(columns) > 0 {
 | 
						|
		columns = append(columns, "updated_at")
 | 
						|
	}
 | 
						|
 | 
						|
	return c.state.Caches.DB.Conversation.Store(conversation, func() error {
 | 
						|
		_, err := NewUpsert(c.db).
 | 
						|
			Model(conversation).
 | 
						|
			Constraint("id").
 | 
						|
			Column(columns...).
 | 
						|
			Exec(ctx)
 | 
						|
		return err
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) LinkConversationToStatus(ctx context.Context, conversationID string, statusID string) error {
 | 
						|
	conversationToStatus := >smodel.ConversationToStatus{
 | 
						|
		ConversationID: conversationID,
 | 
						|
		StatusID:       statusID,
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err := c.db.NewInsert().
 | 
						|
		Model(conversationToStatus).
 | 
						|
		Exec(ctx); // nocollapse
 | 
						|
	err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) DeleteConversationByID(ctx context.Context, id string) error {
 | 
						|
	// Load conversation into cache before attempting a delete,
 | 
						|
	// as we need it cached in order to trigger the invalidate
 | 
						|
	// callback. This in turn invalidates others.
 | 
						|
	_, err := c.GetConversationByID(gtscontext.SetBarebones(ctx), id)
 | 
						|
	if err != nil {
 | 
						|
		if errors.Is(err, db.ErrNoEntries) {
 | 
						|
			// not an issue.
 | 
						|
			err = nil
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// Drop this now-cached conversation on return after delete.
 | 
						|
	defer c.state.Caches.DB.Conversation.Invalidate("ID", id)
 | 
						|
 | 
						|
	// Finally delete conversation from DB.
 | 
						|
	_, err = c.db.NewDelete().
 | 
						|
		Model((*gtsmodel.Conversation)(nil)).
 | 
						|
		Where("? = ?", bun.Ident("id"), id).
 | 
						|
		Exec(ctx)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error {
 | 
						|
	defer func() {
 | 
						|
		// Invalidate any cached conversations and conversation IDs owned by this account on return.
 | 
						|
		// Conversation invalidate hooks only invalidate the conversation ID cache,
 | 
						|
		// so we don't need to load all conversations into the cache to run invalidation hooks,
 | 
						|
		// as with some other object types (blocks, for example).
 | 
						|
		c.state.Caches.DB.Conversation.Invalidate("AccountID", accountID)
 | 
						|
		// In case there were no cached conversations,
 | 
						|
		// explicitly invalidate the user's conversation last status ID cache.
 | 
						|
		c.state.Caches.DB.ConversationLastStatusIDs.Invalidate(accountID)
 | 
						|
	}()
 | 
						|
 | 
						|
	return c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
 | 
						|
		// Delete conversations matching the account ID.
 | 
						|
		deletedConversationIDs := []string{}
 | 
						|
		if err := tx.NewDelete().
 | 
						|
			Model((*gtsmodel.Conversation)(nil)).
 | 
						|
			Where("? = ?", bun.Ident("account_id"), accountID).
 | 
						|
			Returning("?", bun.Ident("id")).
 | 
						|
			Scan(ctx, &deletedConversationIDs); // nocollapse
 | 
						|
		err != nil {
 | 
						|
			return gtserror.Newf("error deleting conversations for account %s: %w", accountID, err)
 | 
						|
		}
 | 
						|
 | 
						|
		if len(deletedConversationIDs) == 0 {
 | 
						|
			// Nothing
 | 
						|
			// to delete.
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		// Delete any conversation-to-status links
 | 
						|
		// matching the deleted conversation IDs.
 | 
						|
		if _, err := tx.NewDelete().
 | 
						|
			Model((*gtsmodel.ConversationToStatus)(nil)).
 | 
						|
			Where("? IN (?)", bun.Ident("conversation_id"), bun.In(deletedConversationIDs)).
 | 
						|
			Exec(ctx); // nocollapse
 | 
						|
		err != nil {
 | 
						|
			return gtserror.Newf("error deleting conversation-to-status links for account %s: %w", accountID, err)
 | 
						|
		}
 | 
						|
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error {
 | 
						|
	var (
 | 
						|
		updatedConversationIDs = []string{}
 | 
						|
		deletedConversationIDs = []string{}
 | 
						|
 | 
						|
		// Method of creating + dropping temp
 | 
						|
		// tables differs depending on driver.
 | 
						|
		tmpQ string
 | 
						|
	)
 | 
						|
 | 
						|
	if c.db.Dialect().Name() == dialect.PG {
 | 
						|
		// On Postgres, we can instruct PG to clean
 | 
						|
		// up temp tables on commit, so we can just
 | 
						|
		// use any connection from the pool without
 | 
						|
		// caring what happens to it when we're done.
 | 
						|
		tmpQ = "CREATE TEMPORARY TABLE ? ON COMMIT DROP AS (?)"
 | 
						|
	} else {
 | 
						|
		// On SQLite, we can't instruct SQLite to drop
 | 
						|
		// temp tables on commit, and we can't manually
 | 
						|
		// drop temp tables without triggering a bug.
 | 
						|
		// So we leave the temp tables alone, in the
 | 
						|
		// knowledge they'll be cleaned up when this
 | 
						|
		// connection gets recycled (in max 5min).
 | 
						|
		tmpQ = "CREATE TEMPORARY TABLE ? AS ?"
 | 
						|
	}
 | 
						|
 | 
						|
	if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
 | 
						|
		// First delete this status from
 | 
						|
		// conversation-to-status links.
 | 
						|
		_, err := tx.
 | 
						|
			NewDelete().
 | 
						|
			Table("conversation_to_statuses").
 | 
						|
			Where("? = ?", bun.Ident("status_id"), statusID).
 | 
						|
			Exec(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error deleting conversation-to-status links while deleting status %s: %w",
 | 
						|
				statusID, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		// Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …`
 | 
						|
		// so we need to use raw queries to create temporary tables.
 | 
						|
 | 
						|
		// Create a temporary table containing all statuses other than
 | 
						|
		// the deleted status, in each conversation for which the deleted
 | 
						|
		// status is the last status, if there are such statuses.
 | 
						|
		//
 | 
						|
		// This will produce a query like:
 | 
						|
		//
 | 
						|
		//	CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S"
 | 
						|
		//	  AS (
 | 
						|
		//	    SELECT
 | 
						|
		//	      "conversations"."id" AS "conversation_id",
 | 
						|
		//	      "conversation_to_statuses"."status_id" AS "id",
 | 
						|
		//	      "statuses"."created_at"
 | 
						|
		//	    FROM
 | 
						|
		//	      "conversations"
 | 
						|
		//	      LEFT JOIN "conversation_to_statuses" ON (
 | 
						|
		//	        "conversations"."id" = "conversation_to_statuses"."conversation_id"
 | 
						|
		//	      )
 | 
						|
		//	      AND (
 | 
						|
		//	        "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS'
 | 
						|
		//	      )
 | 
						|
		//	      LEFT JOIN "statuses" ON (
 | 
						|
		//	        "conversation_to_statuses"."status_id" = "statuses"."id"
 | 
						|
		//	      )
 | 
						|
		//	    WHERE
 | 
						|
		//	      (
 | 
						|
		//	        "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS'
 | 
						|
		//	      )
 | 
						|
		//	  )
 | 
						|
		conversationStatusesTmp := "conversation_statuses_" + id.NewULID()
 | 
						|
		conversationStatusesTmpQ := tx.NewRaw(
 | 
						|
			tmpQ,
 | 
						|
			bun.Ident(conversationStatusesTmp),
 | 
						|
			tx.NewSelect().
 | 
						|
				ColumnExpr(
 | 
						|
					"? AS ?",
 | 
						|
					bun.Ident("conversations.id"),
 | 
						|
					bun.Ident("conversation_id"),
 | 
						|
				).
 | 
						|
				ColumnExpr(
 | 
						|
					"? AS ?",
 | 
						|
					bun.Ident("conversation_to_statuses.status_id"),
 | 
						|
					bun.Ident("id"),
 | 
						|
				).
 | 
						|
				Column("statuses.created_at").
 | 
						|
				Table("conversations").
 | 
						|
				Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")).
 | 
						|
				JoinOn(
 | 
						|
					"? = ?",
 | 
						|
					bun.Ident("conversations.id"),
 | 
						|
					bun.Ident("conversation_to_statuses.conversation_id"),
 | 
						|
				).
 | 
						|
				JoinOn(
 | 
						|
					"? != ?",
 | 
						|
					bun.Ident("conversation_to_statuses.status_id"),
 | 
						|
					statusID,
 | 
						|
				).
 | 
						|
				Join("LEFT JOIN ?", bun.Ident("statuses")).
 | 
						|
				JoinOn(
 | 
						|
					"? = ?",
 | 
						|
					bun.Ident("conversation_to_statuses.status_id"),
 | 
						|
					bun.Ident("statuses.id"),
 | 
						|
				).
 | 
						|
				Where(
 | 
						|
					"? = ?",
 | 
						|
					bun.Ident("conversations.last_status_id"),
 | 
						|
					statusID,
 | 
						|
				),
 | 
						|
		)
 | 
						|
		_, err = conversationStatusesTmpQ.Exec(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error creating temp table %s while deleting status %s: %w",
 | 
						|
				conversationStatusesTmp, statusID, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		// Create a temporary table with the most recently created
 | 
						|
		// status in each conversation for which the deleted status
 | 
						|
		// is the last status, if there is such a status.
 | 
						|
		//
 | 
						|
		// This will produce a query like:
 | 
						|
		//
 | 
						|
		//	CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR"
 | 
						|
		//	  AS (
 | 
						|
		//	    SELECT
 | 
						|
		//	      "conversation_statuses"."conversation_id",
 | 
						|
		//	      "conversation_statuses"."id"
 | 
						|
		//	    FROM
 | 
						|
		//	      "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses"
 | 
						|
		//	      LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON (
 | 
						|
		//	        "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id"
 | 
						|
		//	      )
 | 
						|
		//	      AND (
 | 
						|
		//	        "later_statuses"."created_at" > "conversation_statuses"."created_at"
 | 
						|
		//	      )
 | 
						|
		//	    WHERE
 | 
						|
		//	      ("later_statuses"."id" IS NULL)
 | 
						|
		//	  )
 | 
						|
		latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID()
 | 
						|
		latestConversationStatusesTmpQ := tx.NewRaw(
 | 
						|
			tmpQ,
 | 
						|
			bun.Ident(latestConversationStatusesTmp),
 | 
						|
			tx.NewSelect().
 | 
						|
				Column(
 | 
						|
					"conversation_statuses.conversation_id",
 | 
						|
					"conversation_statuses.id",
 | 
						|
				).
 | 
						|
				TableExpr(
 | 
						|
					"? AS ?",
 | 
						|
					bun.Ident(conversationStatusesTmp),
 | 
						|
					bun.Ident("conversation_statuses"),
 | 
						|
				).
 | 
						|
				Join(
 | 
						|
					"LEFT JOIN ? AS ?",
 | 
						|
					bun.Ident(conversationStatusesTmp),
 | 
						|
					bun.Ident("later_statuses"),
 | 
						|
				).
 | 
						|
				JoinOn(
 | 
						|
					"? = ?",
 | 
						|
					bun.Ident("conversation_statuses.conversation_id"),
 | 
						|
					bun.Ident("later_statuses.conversation_id"),
 | 
						|
				).
 | 
						|
				JoinOn(
 | 
						|
					"? > ?",
 | 
						|
					bun.Ident("later_statuses.created_at"),
 | 
						|
					bun.Ident("conversation_statuses.created_at"),
 | 
						|
				).
 | 
						|
				Where("? IS NULL", bun.Ident("later_statuses.id")),
 | 
						|
		)
 | 
						|
		_, err = latestConversationStatusesTmpQ.Exec(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error creating temp table %s while deleting status %s: %w",
 | 
						|
				conversationStatusesTmp, statusID, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		// For every conversation where the given status was the last one,
 | 
						|
		// reset its last status to the most recently created in the
 | 
						|
		// conversation other than that one, if there is such a status.
 | 
						|
		// Return conversation IDs for invalidation.
 | 
						|
		updateQ := tx.NewUpdate().
 | 
						|
			Table("conversations").
 | 
						|
			TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")).
 | 
						|
			Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")).
 | 
						|
			Set("? = ?", bun.Ident("updated_at"), time.Now()).
 | 
						|
			Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")).
 | 
						|
			Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")).
 | 
						|
			Returning("?", bun.Ident("conversations.id"))
 | 
						|
		_, err = updateQ.Exec(ctx, &updatedConversationIDs)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error rolling back last status for conversation while deleting status %s: %w",
 | 
						|
				statusID, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		// If there is no such status,
 | 
						|
		// just delete the conversation.
 | 
						|
		// Return IDs for invalidation.
 | 
						|
		_, err = tx.
 | 
						|
			NewDelete().
 | 
						|
			Table("conversations").
 | 
						|
			Where(
 | 
						|
				"? IN (?)",
 | 
						|
				bun.Ident("id"),
 | 
						|
				tx.NewSelect().
 | 
						|
					Table(latestConversationStatusesTmp).
 | 
						|
					Column("conversation_id").
 | 
						|
					Where("? IS NULL", bun.Ident("id")),
 | 
						|
			).
 | 
						|
			Returning("?", bun.Ident("id")).
 | 
						|
			Exec(ctx, &deletedConversationIDs)
 | 
						|
		if err != nil {
 | 
						|
			return gtserror.Newf(
 | 
						|
				"error deleting conversation while deleting status %s: %w",
 | 
						|
				statusID, err,
 | 
						|
			)
 | 
						|
		}
 | 
						|
 | 
						|
		return nil
 | 
						|
	}); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// Invalidate cache entries.
 | 
						|
	updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)
 | 
						|
	updatedConversationIDs = util.Deduplicate(updatedConversationIDs)
 | 
						|
	c.state.Caches.DB.Conversation.InvalidateIDs("ID", updatedConversationIDs)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |