mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 11:02:25 -05:00 
			
		
		
		
	it's coming along
This commit is contained in:
		
					parent
					
						
							
								8232400ff0
							
						
					
				
			
			
				commit
				
					
						c23075cac2
					
				
			
		
					 14 changed files with 725 additions and 198 deletions
				
			
		|  | @ -25,6 +25,7 @@ import ( | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/client/search" | 	"github.com/superseriousbusiness/gotosocial/internal/api/client/search" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/client/status" | 	"github.com/superseriousbusiness/gotosocial/internal/api/client/status" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/client/timeline" | 	"github.com/superseriousbusiness/gotosocial/internal/api/client/timeline" | ||||||
|  | 	timelineprocessing "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/s2s/user" | 	"github.com/superseriousbusiness/gotosocial/internal/api/s2s/user" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/s2s/webfinger" | 	"github.com/superseriousbusiness/gotosocial/internal/api/s2s/webfinger" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/api/security" | 	"github.com/superseriousbusiness/gotosocial/internal/api/security" | ||||||
|  | @ -88,13 +89,14 @@ var Start cliactions.GTSAction = func(ctx context.Context, c *config.Config, log | ||||||
| 
 | 
 | ||||||
| 	// build converters and util | 	// build converters and util | ||||||
| 	typeConverter := typeutils.NewConverter(c, dbService) | 	typeConverter := typeutils.NewConverter(c, dbService) | ||||||
|  | 	timelineManager := timelineprocessing.NewManager(dbService, typeConverter, c, log) | ||||||
| 
 | 
 | ||||||
| 	// build backend handlers | 	// build backend handlers | ||||||
| 	mediaHandler := media.New(c, dbService, storageBackend, log) | 	mediaHandler := media.New(c, dbService, storageBackend, log) | ||||||
| 	oauthServer := oauth.New(dbService, log) | 	oauthServer := oauth.New(dbService, log) | ||||||
| 	transportController := transport.NewController(c, &federation.Clock{}, http.DefaultClient, log) | 	transportController := transport.NewController(c, &federation.Clock{}, http.DefaultClient, log) | ||||||
| 	federator := federation.NewFederator(dbService, federatingDB, transportController, c, log, typeConverter) | 	federator := federation.NewFederator(dbService, federatingDB, transportController, c, log, typeConverter) | ||||||
| 	processor := processing.NewProcessor(c, typeConverter, federator, oauthServer, mediaHandler, storageBackend, dbService, log) | 	processor := processing.NewProcessor(c, typeConverter, federator, oauthServer, mediaHandler, storageBackend, timelineManager, dbService, log) | ||||||
| 	if err := processor.Start(); err != nil { | 	if err := processor.Start(); err != nil { | ||||||
| 		return fmt.Errorf("error starting processor: %s", err) | 		return fmt.Errorf("error starting processor: %s", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -257,6 +257,8 @@ type DB interface { | ||||||
| 	// This slice will be unfiltered, not taking account of blocks and whatnot, so filter it before serving it back to a user. | 	// This slice will be unfiltered, not taking account of blocks and whatnot, so filter it before serving it back to a user. | ||||||
| 	WhoBoostedStatus(status *gtsmodel.Status) ([]*gtsmodel.Account, error) | 	WhoBoostedStatus(status *gtsmodel.Status) ([]*gtsmodel.Account, error) | ||||||
| 
 | 
 | ||||||
|  | 	GetStatusesWhereFollowing(accountID string, limit int, offsetStatusID string) ([]*gtsmodel.Status, error) | ||||||
|  | 
 | ||||||
| 	// GetHomeTimelineForAccount fetches the account's HOME timeline -- ie., posts and replies from people they *follow*. | 	// GetHomeTimelineForAccount fetches the account's HOME timeline -- ie., posts and replies from people they *follow*. | ||||||
| 	// It will use the given filters and try to return as many statuses up to the limit as possible. | 	// It will use the given filters and try to return as many statuses up to the limit as possible. | ||||||
| 	GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) | 	GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) | ||||||
|  |  | ||||||
|  | @ -223,12 +223,16 @@ func (ps *postgresService) GetWhere(where []db.Where, i interface{}) error { | ||||||
| 
 | 
 | ||||||
| 	q := ps.conn.Model(i) | 	q := ps.conn.Model(i) | ||||||
| 	for _, w := range where { | 	for _, w := range where { | ||||||
|  | 
 | ||||||
|  | 		if w.Value == nil { | ||||||
|  | 			q = q.Where("? IS NULL", pg.Ident(w.Key)) | ||||||
|  | 		} else { | ||||||
| 			if w.CaseInsensitive { | 			if w.CaseInsensitive { | ||||||
| 				q = q.Where("LOWER(?) = LOWER(?)", pg.Safe(w.Key), w.Value) | 				q = q.Where("LOWER(?) = LOWER(?)", pg.Safe(w.Key), w.Value) | ||||||
| 			} else { | 			} else { | ||||||
| 				q = q.Where("? = ?", pg.Safe(w.Key), w.Value) | 				q = q.Where("? = ?", pg.Safe(w.Key), w.Value) | ||||||
| 			} | 			} | ||||||
| 
 | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err := q.Select(); err != nil { | 	if err := q.Select(); err != nil { | ||||||
|  | @ -964,6 +968,16 @@ func (ps *postgresService) PullRelevantAccountsFromStatus(targetStatus *gtsmodel | ||||||
| 		MentionedAccounts: []*gtsmodel.Account{}, | 		MentionedAccounts: []*gtsmodel.Account{}, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// get the author account | ||||||
|  | 	if targetStatus.GTSAuthorAccount == nil { | ||||||
|  | 		statusAuthor := >smodel.Account{} | ||||||
|  | 		if err := ps.conn.Model(statusAuthor).Where("id = ?", targetStatus.AccountID).Select(); err != nil { | ||||||
|  | 			return accounts, fmt.Errorf("PullRelevantAccountsFromStatus: error getting statusAuthor with id %s: %s", targetStatus.AccountID, err) | ||||||
|  | 		} | ||||||
|  | 		targetStatus.GTSAuthorAccount = statusAuthor | ||||||
|  | 	} | ||||||
|  | 	accounts.StatusAuthor = targetStatus.GTSAuthorAccount | ||||||
|  | 
 | ||||||
| 	// get the replied to account from the status and add it to the pile | 	// get the replied to account from the status and add it to the pile | ||||||
| 	if targetStatus.InReplyToAccountID != "" { | 	if targetStatus.InReplyToAccountID != "" { | ||||||
| 		repliedToAccount := >smodel.Account{} | 		repliedToAccount := >smodel.Account{} | ||||||
|  | @ -1139,6 +1153,38 @@ func (ps *postgresService) WhoBoostedStatus(status *gtsmodel.Status) ([]*gtsmode | ||||||
| 	return accounts, nil | 	return accounts, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (ps *postgresService) GetStatusesWhereFollowing(accountID string, limit int, offsetStatusID string) ([]*gtsmodel.Status, error) { | ||||||
|  | 	statuses := []*gtsmodel.Status{} | ||||||
|  | 
 | ||||||
|  | 	q := ps.conn.Model(&statuses) | ||||||
|  | 
 | ||||||
|  | 	q = q.ColumnExpr("status.*"). | ||||||
|  | 		Join("JOIN follows AS f ON f.target_account_id = status.account_id"). | ||||||
|  | 		Where("f.account_id = ?", accountID). | ||||||
|  | 		Order("status.created_at DESC") | ||||||
|  | 
 | ||||||
|  | 	if offsetStatusID != "" { | ||||||
|  | 		s := >smodel.Status{} | ||||||
|  | 		if err := ps.conn.Model(s).Where("id = ?", offsetStatusID).Select(); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		q = q.Where("status.created_at < ?", s.CreatedAt) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if limit > 0 { | ||||||
|  | 		q = q.Limit(limit) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err := q.Select() | ||||||
|  | 	if err != nil { | ||||||
|  | 		if err != pg.ErrNoRows { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return statuses, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (ps *postgresService) GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) { | func (ps *postgresService) GetHomeTimelineForAccount(accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*gtsmodel.Status, error) { | ||||||
| 	statuses := []*gtsmodel.Status{} | 	statuses := []*gtsmodel.Status{} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -153,6 +153,7 @@ type VisibilityAdvanced struct { | ||||||
| 
 | 
 | ||||||
| // RelevantAccounts denotes accounts that are replied to, boosted by, or mentioned in a status. | // RelevantAccounts denotes accounts that are replied to, boosted by, or mentioned in a status. | ||||||
| type RelevantAccounts struct { | type RelevantAccounts struct { | ||||||
|  | 	StatusAuthor          *Account | ||||||
| 	ReplyToAccount        *Account | 	ReplyToAccount        *Account | ||||||
| 	BoostedAccount        *Account | 	BoostedAccount        *Account | ||||||
| 	BoostedReplyToAccount *Account | 	BoostedReplyToAccount *Account | ||||||
|  |  | ||||||
|  | @ -31,6 +31,7 @@ import ( | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/media" | 	"github.com/superseriousbusiness/gotosocial/internal/media" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/processing/timeline" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -132,9 +133,9 @@ type Processor interface { | ||||||
| 	StatusGetContext(authed *oauth.Auth, targetStatusID string) (*apimodel.Context, ErrorWithCode) | 	StatusGetContext(authed *oauth.Auth, targetStatusID string) (*apimodel.Context, ErrorWithCode) | ||||||
| 
 | 
 | ||||||
| 	// HomeTimelineGet returns statuses from the home timeline, with the given filters/parameters. | 	// HomeTimelineGet returns statuses from the home timeline, with the given filters/parameters. | ||||||
| 	HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) | 	HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) | ||||||
| 	// PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. | 	// PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. | ||||||
| 	PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) | 	PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) | ||||||
| 
 | 
 | ||||||
| 	/* | 	/* | ||||||
| 		FEDERATION API-FACING PROCESSING FUNCTIONS | 		FEDERATION API-FACING PROCESSING FUNCTIONS | ||||||
|  | @ -191,11 +192,12 @@ type processor struct { | ||||||
| 	oauthServer     oauth.Server | 	oauthServer     oauth.Server | ||||||
| 	mediaHandler    media.Handler | 	mediaHandler    media.Handler | ||||||
| 	storage         blob.Storage | 	storage         blob.Storage | ||||||
|  | 	timelineManager timeline.Manager | ||||||
| 	db              db.DB | 	db              db.DB | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewProcessor returns a new Processor that uses the given federator and logger | // NewProcessor returns a new Processor that uses the given federator and logger | ||||||
| func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage blob.Storage, db db.DB, log *logrus.Logger) Processor { | func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage blob.Storage, timelineManager timeline.Manager, db db.DB, log *logrus.Logger) Processor { | ||||||
| 	return &processor{ | 	return &processor{ | ||||||
| 		// toClientAPI:   make(chan gtsmodel.ToClientAPI, 100), | 		// toClientAPI:   make(chan gtsmodel.ToClientAPI, 100), | ||||||
| 		fromClientAPI: make(chan gtsmodel.FromClientAPI, 100), | 		fromClientAPI: make(chan gtsmodel.FromClientAPI, 100), | ||||||
|  | @ -209,6 +211,7 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f | ||||||
| 		oauthServer:     oauthServer, | 		oauthServer:     oauthServer, | ||||||
| 		mediaHandler:    mediaHandler, | 		mediaHandler:    mediaHandler, | ||||||
| 		storage:         storage, | 		storage:         storage, | ||||||
|  | 		timelineManager: timelineManager, | ||||||
| 		db:              db, | 		db:              db, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -250,7 +253,7 @@ func (p *processor) Start() error { | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| 	return nil | 	return p.initTimelines() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. | // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. | ||||||
|  |  | ||||||
|  | @ -20,28 +20,26 @@ package processing | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"sync" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
| 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/db" | 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | 	"github.com/superseriousbusiness/gotosocial/internal/oauth" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (p *processor) HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) { | func (p *processor) HomeTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) { | ||||||
| 	statuses, err := p.db.GetHomeTimelineForAccount(authed.Account.ID, maxID, sinceID, minID, limit, local) | 
 | ||||||
|  | 	statuses, err := p.timelineManager.HomeTimeline(authed.Account.ID, maxID, sinceID, minID, limit, local) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, NewErrorInternalError(err) | 		return nil, NewErrorInternalError(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	s, err := p.filterStatuses(authed, statuses) | 	return statuses, nil | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, NewErrorInternalError(err) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 	return s, nil | func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, ErrorWithCode) { | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, ErrorWithCode) { |  | ||||||
| 	statuses, err := p.db.GetPublicTimelineForAccount(authed.Account.ID, maxID, sinceID, minID, limit, local) | 	statuses, err := p.db.GetPublicTimelineForAccount(authed.Account.ID, maxID, sinceID, minID, limit, local) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, NewErrorInternalError(err) | 		return nil, NewErrorInternalError(err) | ||||||
|  | @ -55,10 +53,10 @@ func (p *processor) PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Status) ([]apimodel.Status, error) { | func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Status) ([]*apimodel.Status, error) { | ||||||
| 	l := p.log.WithField("func", "filterStatuses") | 	l := p.log.WithField("func", "filterStatuses") | ||||||
| 
 | 
 | ||||||
| 	apiStatuses := []apimodel.Status{} | 	apiStatuses := []*apimodel.Status{} | ||||||
| 	for _, s := range statuses { | 	for _, s := range statuses { | ||||||
| 		targetAccount := >smodel.Account{} | 		targetAccount := >smodel.Account{} | ||||||
| 		if err := p.db.GetByID(s.AccountID, targetAccount); err != nil { | 		if err := p.db.GetByID(s.AccountID, targetAccount); err != nil { | ||||||
|  | @ -115,8 +113,111 @@ func (p *processor) filterStatuses(authed *oauth.Auth, statuses []*gtsmodel.Stat | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		apiStatuses = append(apiStatuses, *apiStatus) | 		apiStatuses = append(apiStatuses, apiStatus) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return apiStatuses, nil | 	return apiStatuses, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (p *processor) initTimelines() error { | ||||||
|  | 	// get all local accounts (ie., domain = nil) that aren't suspended (suspended_at = nil) | ||||||
|  | 	localAccounts := []*gtsmodel.Account{} | ||||||
|  | 	where := []db.Where{ | ||||||
|  | 		{ | ||||||
|  | 			Key: "domain", Value: nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			Key: "suspended_at", Value: nil, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	if err := p.db.GetWhere(where, &localAccounts); err != nil { | ||||||
|  | 		if _, ok := err.(db.ErrNoEntries); ok { | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		return fmt.Errorf("initTimelines: db error initializing timelines: %s", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// we want to wait until all timelines are populated so created a waitgroup here | ||||||
|  | 	wg := &sync.WaitGroup{} | ||||||
|  | 	wg.Add(len(localAccounts)) | ||||||
|  | 
 | ||||||
|  | 	for _, localAccount := range localAccounts { | ||||||
|  | 		// to save time we can populate the timelines asynchronously | ||||||
|  | 		// this will go heavy on the database, but since we're not actually serving yet it doesn't really matter | ||||||
|  | 		go p.initTimelineFor(localAccount, wg) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// wait for all timelines to be populated before we exit | ||||||
|  | 	wg.Wait() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (p *processor) initTimelineFor(account *gtsmodel.Account, wg *sync.WaitGroup) { | ||||||
|  | 	defer wg.Done() | ||||||
|  | 
 | ||||||
|  | 	l := p.log.WithFields(logrus.Fields{ | ||||||
|  | 		"func":      "initTimelineFor", | ||||||
|  | 		"accountID": account.ID, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	desiredIndexLength := p.timelineManager.GetDesiredIndexLength() | ||||||
|  | 
 | ||||||
|  | 	statuses, err := p.db.GetStatusesWhereFollowing(account.ID, desiredIndexLength, "") | ||||||
|  | 	if err != nil { | ||||||
|  | 		l.Error(fmt.Errorf("initTimelineFor: error getting statuses: %s", err)) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	p.indexAndIngest(statuses, account, desiredIndexLength) | ||||||
|  | 
 | ||||||
|  | 	lengthNow := p.timelineManager.GetIndexedLength(account.ID) | ||||||
|  | 	if lengthNow < desiredIndexLength { | ||||||
|  | 		// try and get more posts from the last ID onwards | ||||||
|  | 		rearmostStatusID, err := p.timelineManager.GetOldestIndexedID(account.ID) | ||||||
|  | 		if err != nil { | ||||||
|  | 			l.Error(fmt.Errorf("initTimelineFor: error getting id of rearmost status: %s", err)) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if rearmostStatusID != "" { | ||||||
|  | 			moreStatuses, err := p.db.GetStatusesWhereFollowing(account.ID, desiredIndexLength / 2, rearmostStatusID) | ||||||
|  | 			if err != nil { | ||||||
|  | 				l.Error(fmt.Errorf("initTimelineFor: error getting more statuses: %s", err)) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			p.indexAndIngest(moreStatuses, account, desiredIndexLength) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	l.Debugf("prepared timeline of length %d for account %s", lengthNow, account.ID) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (p *processor) indexAndIngest(statuses []*gtsmodel.Status, timelineAccount *gtsmodel.Account, desiredIndexLength int) { | ||||||
|  | 	l := p.log.WithFields(logrus.Fields{ | ||||||
|  | 		"func":      "indexAndIngest", | ||||||
|  | 		"accountID": timelineAccount.ID, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	for _, s := range statuses { | ||||||
|  | 		relevantAccounts, err := p.db.PullRelevantAccountsFromStatus(s) | ||||||
|  | 		if err != nil { | ||||||
|  | 			l.Error(fmt.Errorf("initTimelineFor: error getting relevant accounts from status %s: %s", s.ID, err)) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		visible, err := p.db.StatusVisible(s, relevantAccounts.StatusAuthor, timelineAccount, relevantAccounts) | ||||||
|  | 		if err != nil { | ||||||
|  | 			l.Error(fmt.Errorf("initTimelineFor: error checking visibility of status %s: %s", s.ID, err)) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		if visible { | ||||||
|  | 			if err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { | ||||||
|  | 				l.Error(fmt.Errorf("initTimelineFor: error ingesting status %s: %s", s.ID, err)) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// check if we have enough posts now and return if we do | ||||||
|  | 			if p.timelineManager.GetIndexedLength(timelineAccount.ID) >= desiredIndexLength { | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -19,35 +19,158 @@ | ||||||
| package timeline | package timeline | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"sync" | ||||||
|  | 
 | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
| 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/config" | 	"github.com/superseriousbusiness/gotosocial/internal/config" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/db" | 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	preparedPostsMinLength = 80 | ||||||
|  | 	desiredPostIndexLength = 400 | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type Manager interface { | type Manager interface { | ||||||
| 	Ingest(status *gtsmodel.Status) error | 	Ingest(status *gtsmodel.Status, timelineAccountID string) error | ||||||
| 	HomeTimelineGet(account *gtsmodel.Account, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, error) | 	HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) | ||||||
|  | 	GetIndexedLength(timelineAccountID string) int | ||||||
|  | 	GetDesiredIndexLength() int | ||||||
|  | 	GetOldestIndexedID(timelineAccountID string) (string, error) | ||||||
|  | 	PrepareXFromTop(timelineAccountID string, limit int) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewManager(db db.DB, config *config.Config) Manager { | func NewManager(db db.DB, tc typeutils.TypeConverter, config *config.Config, log *logrus.Logger) Manager { | ||||||
| 	return &manager{ | 	return &manager{ | ||||||
| 		accountTimelines: make(map[string]*timeline), | 		accountTimelines: sync.Map{}, | ||||||
| 		db:               db, | 		db:               db, | ||||||
|  | 		tc:               tc, | ||||||
| 		config:           config, | 		config:           config, | ||||||
|  | 		log:              log, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type manager struct { | type manager struct { | ||||||
| 	accountTimelines map[string]*timeline | 	accountTimelines sync.Map | ||||||
| 	db               db.DB | 	db               db.DB | ||||||
|  | 	tc               typeutils.TypeConverter | ||||||
| 	config           *config.Config | 	config           *config.Config | ||||||
|  | 	log              *logrus.Logger | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *manager) Ingest(status *gtsmodel.Status) error { | func (m *manager) Ingest(status *gtsmodel.Status, timelineAccountID string) error { | ||||||
| 	return nil | 	l := m.log.WithFields(logrus.Fields{ | ||||||
|  | 		"func":              "Ingest", | ||||||
|  | 		"timelineAccountID": timelineAccountID, | ||||||
|  | 		"statusID":          status.ID, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| func (m *manager) HomeTimelineGet(account *gtsmodel.Account, maxID string, sinceID string, minID string, limit int, local bool) ([]apimodel.Status, error) { | 	l.Trace("ingesting status") | ||||||
| 	return nil, nil | 
 | ||||||
|  | 	return t.IndexOne(status.CreatedAt, status.ID) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) Remove(statusID string, timelineAccountID string) error { | ||||||
|  | 	l := m.log.WithFields(logrus.Fields{ | ||||||
|  | 		"func":              "Remove", | ||||||
|  | 		"timelineAccountID": timelineAccountID, | ||||||
|  | 		"statusID":          statusID, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	l.Trace("removing status") | ||||||
|  | 
 | ||||||
|  | 	return t.Remove(statusID) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) HomeTimeline(timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, error) { | ||||||
|  | 	l := m.log.WithFields(logrus.Fields{ | ||||||
|  | 		"func":              "HomeTimelineGet", | ||||||
|  | 		"timelineAccountID": timelineAccountID, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var err error | ||||||
|  | 	var statuses []*apimodel.Status | ||||||
|  | 	if maxID != "" { | ||||||
|  | 		statuses, err = t.GetXBehindID(limit, maxID) | ||||||
|  | 	} else { | ||||||
|  | 		statuses, err = t.GetXFromTop(limit) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err != nil { | ||||||
|  | 		l.Errorf("error getting statuses: %s", err) | ||||||
|  | 	} | ||||||
|  | 	return statuses, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) GetIndexedLength(timelineAccountID string) int { | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.PostIndexLength() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) GetDesiredIndexLength() int { | ||||||
|  | 	return desiredPostIndexLength | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) GetOldestIndexedID(timelineAccountID string) (string, error) { | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.OldestIndexedPostID() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *manager) PrepareXFromTop(timelineAccountID string, limit int) error { | ||||||
|  | 	var t Timeline | ||||||
|  | 	i, ok := m.accountTimelines.Load(timelineAccountID) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = NewTimeline(timelineAccountID, m.db, m.tc) | ||||||
|  | 		m.accountTimelines.Store(timelineAccountID, t) | ||||||
|  | 	} else { | ||||||
|  | 		t = i.(Timeline) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.PrepareXFromTop(limit) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1 +0,0 @@ | ||||||
| package timeline |  | ||||||
							
								
								
									
										47
									
								
								internal/processing/timeline/postindex.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								internal/processing/timeline/postindex.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,47 @@ | ||||||
|  | package timeline | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"container/list" | ||||||
|  | 	"errors" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type postIndex struct { | ||||||
|  | 	data *list.List | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type postIndexEntry struct { | ||||||
|  | 	createdAt time.Time | ||||||
|  | 	statusID  string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (p *postIndex) index(i *postIndexEntry) error { | ||||||
|  | 
 | ||||||
|  | 	if p.data == nil { | ||||||
|  | 		p.data = &list.List{} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// if we have no entries yet, this is both the newest and oldest entry, so just put it in the front | ||||||
|  | 	if p.data.Len() == 0 { | ||||||
|  | 		p.data.PushFront(i) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// we need to iterate through the index to make sure we put this post in the appropriate place according to when it was created | ||||||
|  | 	for e := p.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*postIndexEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return errors.New("Remove: could not parse e as a postIndexEntry") | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// if the post to index is newer than e, insert it before e in the list | ||||||
|  | 		if i.createdAt.After(entry.createdAt) { | ||||||
|  | 			p.data.InsertBefore(i, e) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// if we reach this point it's the oldest post we've seen so put it at the back | ||||||
|  | 	p.data.PushBack(i) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
							
								
								
									
										49
									
								
								internal/processing/timeline/preparedposts.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								internal/processing/timeline/preparedposts.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,49 @@ | ||||||
|  | package timeline | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"container/list" | ||||||
|  | 	"errors" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type preparedPosts struct { | ||||||
|  | 	data *list.List | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type preparedPostsEntry struct { | ||||||
|  | 	createdAt time.Time | ||||||
|  | 	statusID  string | ||||||
|  | 	prepared  *apimodel.Status | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (p *preparedPosts) insertPrepared(i *preparedPostsEntry) error { | ||||||
|  | 	if p.data == nil { | ||||||
|  | 		p.data = &list.List{} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// if we have no entries yet, this is both the newest and oldest entry, so just put it in the front | ||||||
|  | 	if p.data.Len() == 0 { | ||||||
|  | 		p.data.PushFront(i) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// we need to iterate through the index to make sure we put this post in the appropriate place according to when it was created | ||||||
|  | 	for e := p.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*preparedPostsEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return errors.New("index: could not parse e as a preparedPostsEntry") | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// if the post to index is newer than e, insert it before e in the list | ||||||
|  | 		if i.createdAt.After(entry.createdAt) { | ||||||
|  | 			p.data.InsertBefore(i, e) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// if we reach this point it's the oldest post we've seen so put it at the back | ||||||
|  | 	p.data.PushBack(i) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | @ -1,56 +0,0 @@ | ||||||
| package timeline |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"sort" |  | ||||||
| 	"sync" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type sharedCache struct { |  | ||||||
| 	data      map[string]*post |  | ||||||
| 	maxLength int |  | ||||||
| 	*sync.Mutex |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func newSharedCache(maxLength int) *sharedCache { |  | ||||||
| 	return &sharedCache{ |  | ||||||
| 		data:      make(map[string]*post), |  | ||||||
| 		maxLength: maxLength, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (s *sharedCache) shrink() { |  | ||||||
| 	// check if the length is longer than max size |  | ||||||
| 	toRemove := len(s.data) - s.maxLength |  | ||||||
| 
 |  | ||||||
| 	if toRemove > 0 { |  | ||||||
| 		// we have stuff to remove so lock the map while we work |  | ||||||
| 		s.Lock() |  | ||||||
| 		defer s.Unlock() |  | ||||||
| 
 |  | ||||||
| 		// we need to time-sort the map to remove the oldest entries |  | ||||||
| 		// the below code gives us a slice of keys, arranged from newest to oldest |  | ||||||
| 		postSlice := make([]*post, 0, len(s.data)) |  | ||||||
| 		for _, v := range s.data { |  | ||||||
| 			postSlice = append(postSlice, v) |  | ||||||
| 		} |  | ||||||
| 		sort.Slice(postSlice, func(i int, j int) bool { |  | ||||||
| 			return postSlice[i].createdAt.After(postSlice[j].createdAt) |  | ||||||
| 		}) |  | ||||||
| 
 |  | ||||||
| 		// now for each entry we have to remove, delete the entry from the map by its status ID |  | ||||||
| 		for i := 0; i < toRemove; i = i + 1 { |  | ||||||
| 			statusID := postSlice[i].statusID |  | ||||||
| 			delete(s.data, statusID) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (s *sharedCache) put(post *post) { |  | ||||||
| 	s.Lock() |  | ||||||
| 	defer s.Unlock() |  | ||||||
| 	s.data[post.statusID] = post |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (s *sharedCache) get(statusID string) *post { |  | ||||||
| 	return s.data[statusID] |  | ||||||
| } |  | ||||||
|  | @ -27,151 +27,351 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/db" | 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	fromLatest             = "FROM_LATEST" | 	fromLatest             = "FROM_LATEST" | ||||||
| 	postIndexMinLength     = 200 | 	preparedPostsMaxLength = desiredPostIndexLength | ||||||
| 	postIndexMaxLength     = 400 |  | ||||||
| 	preparedPostsMaxLength = 400 |  | ||||||
| 	preparedPostsMinLength = 80 |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type timeline struct { | type Timeline interface { | ||||||
| 	//  | 	// GetXFromTop returns x amount of posts from the top of the timeline, from newest to oldest. | ||||||
| 	postIndex     *list.List | 	GetXFromTop(amount int) ([]*apimodel.Status, error) | ||||||
| 	preparedPosts *list.List | 	// GetXFromTop returns x amount of posts from the given id onwards, from newest to oldest. | ||||||
| 	accountID     string | 	GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) | ||||||
| 	db            db.DB | 
 | ||||||
| 	*sync.Mutex | 	// IndexOne puts a status into the timeline at the appropriate place according to its 'createdAt' property. | ||||||
|  | 	IndexOne(statusCreatedAt time.Time, statusID string) error | ||||||
|  | 	// IndexMany instructs the timeline to index all the given posts. | ||||||
|  | 	IndexMany([]*apimodel.Status) error | ||||||
|  | 	// Remove removes a status from the timeline. | ||||||
|  | 	Remove(statusID string) error | ||||||
|  | 	// OldestIndexedPostID returns the id of the rearmost (ie., the oldest) indexed post, or an error if something goes wrong. | ||||||
|  | 	// If nothing goes wrong but there's no oldest post, an empty string will be returned so make sure to check for this. | ||||||
|  | 	OldestIndexedPostID() (string, error) | ||||||
|  | 
 | ||||||
|  | 	// PrepareXFromTop instructs the timeline to prepare x amount of posts from the top of the timeline. | ||||||
|  | 	PrepareXFromTop(amount int) error | ||||||
|  | 	// PrepareXFromIndex instrucst the timeline to prepare the next amount of entries for serialization, from index onwards. | ||||||
|  | 	PrepareXFromIndex(amount int, index int) error | ||||||
|  | 
 | ||||||
|  | 	// ActualPostIndexLength returns the actual length of the post index at this point in time. | ||||||
|  | 	PostIndexLength() int | ||||||
|  | 
 | ||||||
|  | 	// Reset instructs the timeline to reset to its base state -- cache only the minimum amount of posts. | ||||||
|  | 	Reset() error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func newTimeline(accountID string, db db.DB, sharedCache *list.List) *timeline { | type timeline struct { | ||||||
|  | 	postIndex     *postIndex | ||||||
|  | 	preparedPosts *preparedPosts | ||||||
|  | 	accountID     string | ||||||
|  | 	account       *gtsmodel.Account | ||||||
|  | 	db            db.DB | ||||||
|  | 	tc            typeutils.TypeConverter | ||||||
|  | 	sync.Mutex | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewTimeline(accountID string, db db.DB, typeConverter typeutils.TypeConverter) Timeline { | ||||||
| 	return &timeline{ | 	return &timeline{ | ||||||
| 		postIndex:     list.New(), | 		postIndex:     &postIndex{}, | ||||||
| 		preparedPosts: list.New(), | 		preparedPosts: &preparedPosts{}, | ||||||
| 		accountID:     accountID, | 		accountID:     accountID, | ||||||
| 		db:            db, | 		db:            db, | ||||||
|  | 		tc:            typeConverter, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (t *timeline) prepareNextXFromID(amount int, fromID string) error { | func (t *timeline) PrepareXFromIndex(amount int, index int) error { | ||||||
| 	t.Lock() | 	t.Lock() | ||||||
| 	defer t.Unlock() | 	defer t.Unlock() | ||||||
| 
 | 
 | ||||||
| 	prepared := make([]*post, 0, amount) | 	var indexed int | ||||||
| 
 | 	var prepared int | ||||||
| 	// find the mark in the index -- we want x statuses after this | 	var preparing bool | ||||||
| 	var fromMark *list.Element | 	for e := t.postIndex.data.Front(); e != nil; e = e.Next() { | ||||||
| 	for e := t.postIndex.Front(); e != nil; e = e.Next() { | 		entry, ok := e.Value.(*postIndexEntry) | ||||||
| 		p, ok := e.Value.(*post) |  | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			return errors.New("could not convert interface to post") | 			return errors.New("PrepareXFromTop: could not parse e as a postIndexEntry") | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if p.statusID == fromID { | 		if !preparing { | ||||||
| 			fromMark = e | 			// we haven't hit the index we need to prepare from yet | ||||||
|  | 			if indexed == index { | ||||||
|  | 				preparing = true | ||||||
|  | 			} | ||||||
|  | 			indexed = indexed + 1 | ||||||
|  | 			continue | ||||||
|  | 		} else { | ||||||
|  | 			if err := t.prepare(entry.statusID); err != nil { | ||||||
|  | 				return fmt.Errorf("PrepareXFromTop: error preparing status with id %s: %s", entry.statusID, err) | ||||||
|  | 			} | ||||||
|  | 			prepared = prepared + 1 | ||||||
|  | 			if prepared >= amount { | ||||||
|  | 				// we're done | ||||||
|  | 				fmt.Printf("\n\n\nprepared %d entries\n\n\n", prepared) | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *timeline) PrepareXFromTop(amount int) error { | ||||||
|  | 	t.Lock() | ||||||
|  | 	defer t.Unlock() | ||||||
|  | 
 | ||||||
|  | 	t.preparedPosts.data.Init() | ||||||
|  | 
 | ||||||
|  | 	var prepared int | ||||||
|  | 	for e := t.postIndex.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*postIndexEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return errors.New("PrepareXFromTop: could not parse e as a postIndexEntry") | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if err := t.prepare(entry.statusID); err != nil { | ||||||
|  | 			return fmt.Errorf("PrepareXFromTop: error preparing status with id %s: %s", entry.statusID, err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		prepared = prepared + 1 | ||||||
|  | 		if prepared >= amount { | ||||||
|  | 			// we're done | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if fromMark == nil { |  | ||||||
| 		// we can't find the given id in the index -_- |  | ||||||
| 		return fmt.Errorf("prepareNextXFromID: fromID %s not found in index", fromID) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for e := fromMark.Next(); e != nil; e = e.Next() { |  | ||||||
| 
 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (t *timeline) getXFromTop(amount int) ([]*apimodel.Status, error) { | func (t *timeline) GetXFromTop(amount int) ([]*apimodel.Status, error) { | ||||||
| 	statuses := []*apimodel.Status{} | 	// make a slice of statuses with the length we need to return | ||||||
| 	if amount == 0 { | 	statuses := make([]*apimodel.Status, 0, amount) | ||||||
| 		return statuses, nil | 
 | ||||||
|  | 	// if there are no prepared posts, just return the empty slice | ||||||
|  | 	if t.preparedPosts.data == nil { | ||||||
|  | 		t.preparedPosts.data = &list.List{} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(t.readyToGo) < amount { | 	// make sure we have enough posts prepared to return | ||||||
| 		if err := t.prepareNextXFromID(amount, fromLatest); err != nil { | 	if t.preparedPosts.data.Len() < amount { | ||||||
|  | 		if err := t.PrepareXFromTop(amount); err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return t.readyToGo[:amount], nil | 	// work through the prepared posts from the top and return | ||||||
|  | 	var served int | ||||||
|  | 	for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*preparedPostsEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return nil, errors.New("GetXFromTop: could not parse e as a preparedPostsEntry") | ||||||
|  | 		} | ||||||
|  | 		statuses = append(statuses, entry.prepared) | ||||||
|  | 		served = served + 1 | ||||||
|  | 		if served >= amount { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| // getXFromID gets x amount of posts in chronological order from the given ID onwards, NOT including the given id. |  | ||||||
| // The posts will be taken from the preparedPosts pile, unless nothing is ready to go. |  | ||||||
| func (t *timeline) getXFromID(amount int, fromID string) ([]*apimodel.Status, error) { |  | ||||||
| 	statuses := []*apimodel.Status{} |  | ||||||
| 	if amount == 0 || fromID == "" { |  | ||||||
| 	return statuses, nil | 	return statuses, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 	// get the position of the given id in the ready to go pile | func (t *timeline) GetXBehindID(amount int, fromID string) ([]*apimodel.Status, error) { | ||||||
| 	var indexOfID *int | 	// make a slice of statuses with the length we need to return | ||||||
| 	for i, s := range t.readyToGo { | 	statuses := make([]*apimodel.Status, 0, amount) | ||||||
| 		if s.ID == fromID { | 
 | ||||||
| 			indexOfID = &i | 	// if there are no prepared posts, just return the empty slice | ||||||
| 		} | 	if t.preparedPosts.data == nil { | ||||||
|  | 		t.preparedPosts.data = &list.List{} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// the status isn't in the ready to go pile so prepare it | 	// find the position of id | ||||||
| 	if indexOfID == nil { | 	var position int | ||||||
| 		if err := t.prepareNextXFromID(amount, fromID); err != nil { | 	for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*preparedPostsEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") | ||||||
|  | 		} | ||||||
|  | 		if entry.statusID == fromID { | ||||||
|  | 			fmt.Printf("\n\n\nfromid %s is at position %d\n\n\n", fromID, position) | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 		position = position + 1 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// make sure we have enough posts prepared to return | ||||||
|  | 	if t.preparedPosts.data.Len() < amount+position { | ||||||
|  | 
 | ||||||
|  | 		if err := t.PrepareXFromIndex(amount, position); err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil, nil | 	// iterate through the modified list until we hit the fromID again | ||||||
|  | 	var serving bool | ||||||
|  | 	var served int | ||||||
|  | 	for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*preparedPostsEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| func (t *timeline) insert(status *apimodel.Status) error { | 		if !serving { | ||||||
|  | 			// we're not serving yet but we might on the next time round if we hit our from id | ||||||
|  | 			if entry.statusID == fromID { | ||||||
|  | 				fmt.Printf("\n\n\nwe've hit fromid %s at position %d, will now serve\n\n\n", fromID, position) | ||||||
|  | 				serving = true | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} else { | ||||||
|  | 			// we're serving now! | ||||||
|  | 			statuses = append(statuses, entry.prepared) | ||||||
|  | 			served = served + 1 | ||||||
|  | 			if served >= amount { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return statuses, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *timeline) IndexOne(statusCreatedAt time.Time, statusID string) error { | ||||||
| 	t.Lock() | 	t.Lock() | ||||||
| 	defer t.Unlock() | 	defer t.Unlock() | ||||||
| 
 | 
 | ||||||
| 	createdAt, err := time.Parse(time.RFC3339, status.CreatedAt) | 	postIndexEntry := &postIndexEntry{ | ||||||
| 	if err != nil { | 		createdAt: statusCreatedAt, | ||||||
| 		return fmt.Errorf("insert: could not parse time %s: %s", status.CreatedAt, err) | 		statusID:  statusID, | ||||||
|  | 	} | ||||||
|  | 	return t.postIndex.index(postIndexEntry) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 	newPost := &post{ | func (t *timeline) Remove(statusID string) error { | ||||||
| 		createdAt:  createdAt, | 	t.Lock() | ||||||
| 		statusID:   status.ID, | 	defer t.Unlock() | ||||||
| 		serialized: status, |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	if t.index == nil { | 	// remove the entry from the post index | ||||||
| 		t.index.PushFront(newPost) | 	for e := t.postIndex.data.Front(); e != nil; e = e.Next() { | ||||||
| 	} | 		entry, ok := e.Value.(*postIndexEntry) | ||||||
| 
 |  | ||||||
| 	for e := t.index.Front(); e != nil; e = e.Next() { |  | ||||||
| 		p, ok := e.Value.(*post) |  | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			return errors.New("could not convert interface to post") | 			return errors.New("Remove: could not parse e as a postIndexEntry") | ||||||
| 		} | 		} | ||||||
| 
 | 		if entry.statusID == statusID { | ||||||
| 		if newPost.createdAt.After(p.createdAt) { | 			t.postIndex.data.Remove(e) | ||||||
| 			// this is a newer post so insert it just before the post it's newer than | 			break // bail once we found it | ||||||
| 			t.index.InsertBefore(newPost, e) | 		} | ||||||
| 			return nil | 	} | ||||||
|  | 
 | ||||||
|  | 	// remove the entry from prepared posts | ||||||
|  | 	for e := t.preparedPosts.data.Front(); e != nil; e = e.Next() { | ||||||
|  | 		entry, ok := e.Value.(*preparedPostsEntry) | ||||||
|  | 		if !ok { | ||||||
|  | 			return errors.New("Remove: could not parse e as a preparedPostsEntry") | ||||||
|  | 		} | ||||||
|  | 		if entry.statusID == statusID { | ||||||
|  | 			t.preparedPosts.data.Remove(e) | ||||||
|  | 			break // bail once we found it | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// if we haven't returned yet it's the oldest post we've seen so shove it at the back |  | ||||||
| 	t.index.PushBack(newPost) |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type preparedPostsEntry struct { | func (t *timeline) IndexMany(statuses []*apimodel.Status) error { | ||||||
| 	createdAt  time.Time | 	t.Lock() | ||||||
| 	statusID   string | 	defer t.Unlock() | ||||||
| 	serialized *apimodel.Status | 
 | ||||||
|  | 	// add statuses to the index | ||||||
|  | 	for _, s := range statuses { | ||||||
|  | 		createdAt, err := time.Parse(s.CreatedAt, time.RFC3339) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("IndexMany: could not parse time %s on status id %s: %s", s.CreatedAt, s.ID, err) | ||||||
|  | 		} | ||||||
|  | 		postIndexEntry := &postIndexEntry{ | ||||||
|  | 			createdAt: createdAt, | ||||||
|  | 			statusID:  s.ID, | ||||||
|  | 		} | ||||||
|  | 		if err := t.postIndex.index(postIndexEntry); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type postIndexEntry struct { | func (t *timeline) Reset() error { | ||||||
| 	createdAt  time.Time | 	return nil | ||||||
| 	statusID   string | } | ||||||
|  | 
 | ||||||
|  | func (t *timeline) PostIndexLength() int { | ||||||
|  | 	if t.postIndex == nil || t.postIndex.data == nil { | ||||||
|  | 		return 0 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.postIndex.data.Len() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *timeline) OldestIndexedPostID() (string, error) { | ||||||
|  | 	var id string | ||||||
|  | 	if t.postIndex == nil || t.postIndex.data == nil { | ||||||
|  | 		return id, nil | ||||||
|  | 	} | ||||||
|  | 	e := t.postIndex.data.Back() | ||||||
|  | 
 | ||||||
|  | 	if e == nil { | ||||||
|  | 		return id, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	entry, ok := e.Value.(*postIndexEntry) | ||||||
|  | 	if !ok { | ||||||
|  | 		return id, errors.New("OldestIndexedPostID: could not parse e as a postIndexEntry") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return entry.statusID, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *timeline) prepare(statusID string) error { | ||||||
|  | 	gtsStatus := >smodel.Status{} | ||||||
|  | 	if err := t.db.GetByID(statusID, gtsStatus); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if t.account == nil { | ||||||
|  | 		timelineOwnerAccount := >smodel.Account{} | ||||||
|  | 		if err := t.db.GetByID(t.accountID, timelineOwnerAccount); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		t.account = timelineOwnerAccount | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	relevantAccounts, err := t.db.PullRelevantAccountsFromStatus(gtsStatus) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var reblogOfStatus *gtsmodel.Status | ||||||
|  | 	if gtsStatus.BoostOfID != "" { | ||||||
|  | 		s := >smodel.Status{} | ||||||
|  | 		if err := t.db.GetByID(gtsStatus.BoostOfID, s); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		reblogOfStatus = s | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	apiModelStatus, err := t.tc.StatusToMasto(gtsStatus, relevantAccounts.StatusAuthor, t.account, relevantAccounts.BoostedAccount, relevantAccounts.ReplyToAccount, reblogOfStatus) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	preparedPostsEntry := &preparedPostsEntry{ | ||||||
|  | 		createdAt: gtsStatus.CreatedAt, | ||||||
|  | 		statusID:  statusID, | ||||||
|  | 		prepared:  apiModelStatus, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.preparedPosts.insertPrepared(preparedPostsEntry) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -27,5 +27,5 @@ import ( | ||||||
| 
 | 
 | ||||||
| // NewTestProcessor returns a Processor suitable for testing purposes | // NewTestProcessor returns a Processor suitable for testing purposes | ||||||
| func NewTestProcessor(db db.DB, storage blob.Storage, federator federation.Federator) processing.Processor { | func NewTestProcessor(db db.DB, storage blob.Storage, federator federation.Federator) processing.Processor { | ||||||
| 	return processing.NewProcessor(NewTestConfig(), NewTestTypeConverter(db), federator, NewTestOauthServer(db), NewTestMediaHandler(db, storage), storage, db, NewTestLog()) | 	return processing.NewProcessor(NewTestConfig(), NewTestTypeConverter(db), federator, NewTestOauthServer(db), NewTestMediaHandler(db, storage), storage, NewTestTimelineManager(db), db, NewTestLog()) | ||||||
| } | } | ||||||
|  |  | ||||||
							
								
								
									
										10
									
								
								testrig/timelinemanager.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								testrig/timelinemanager.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,10 @@ | ||||||
|  | package testrig | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/processing/timeline" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func NewTestTimelineManager(db db.DB) timeline.Manager { | ||||||
|  | 	return timeline.NewManager(db, NewTestTypeConverter(db), NewTestConfig(), NewTestLog()) | ||||||
|  | } | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue