mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-30 22:42:24 -05:00 
			
		
		
		
	[chore] ensure worker contexts have request ID (#2120)
This commit is contained in:
		
					parent
					
						
							
								815b5291e0
							
						
					
				
			
			
				commit
				
					
						e9c3663cce
					
				
			
		
					 4 changed files with 79 additions and 20 deletions
				
			
		
							
								
								
									
										55
									
								
								internal/gtscontext/wrap.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								internal/gtscontext/wrap.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,55 @@ | ||||||
|  | // GoToSocial | ||||||
|  | // Copyright (C) GoToSocial Authors admin@gotosocial.org | ||||||
|  | // SPDX-License-Identifier: AGPL-3.0-or-later | ||||||
|  | // | ||||||
|  | // This program is free software: you can redistribute it and/or modify | ||||||
|  | // it under the terms of the GNU Affero General Public License as published by | ||||||
|  | // the Free Software Foundation, either version 3 of the License, or | ||||||
|  | // (at your option) any later version. | ||||||
|  | // | ||||||
|  | // This program is distributed in the hope that it will be useful, | ||||||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | // GNU Affero General Public License for more details. | ||||||
|  | // | ||||||
|  | // You should have received a copy of the GNU Affero General Public License | ||||||
|  | // along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | package gtscontext | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // WithValues wraps 'ctx' to use its deadline, done channel and error, but use value store of 'values'. | ||||||
|  | func WithValues(ctx context.Context, values context.Context) context.Context { | ||||||
|  | 	if ctx == nil { | ||||||
|  | 		panic("nil base context") | ||||||
|  | 	} | ||||||
|  | 	return &wrapContext{ | ||||||
|  | 		base: ctx, | ||||||
|  | 		vals: values, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type wrapContext struct { | ||||||
|  | 	base context.Context | ||||||
|  | 	vals context.Context | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ctx *wrapContext) Deadline() (deadline time.Time, ok bool) { | ||||||
|  | 	return ctx.base.Deadline() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ctx *wrapContext) Done() <-chan struct{} { | ||||||
|  | 	return ctx.base.Done() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ctx *wrapContext) Err() error { | ||||||
|  | 	return ctx.base.Err() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ctx *wrapContext) Value(key any) any { | ||||||
|  | 	return ctx.vals.Value(key) | ||||||
|  | } | ||||||
|  | @ -39,8 +39,8 @@ var ( | ||||||
| 	base32enc = base32.NewEncoding("0123456789abcdefghjkmnpqrstvwxyz").WithPadding(-1) | 	base32enc = base32.NewEncoding("0123456789abcdefghjkmnpqrstvwxyz").WithPadding(-1) | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // generateID generates a new ID string. | // NewRequestID generates a new request ID string. | ||||||
| func generateID() string { | func NewRequestID() string { | ||||||
| 	// 0:8  = timestamp | 	// 0:8  = timestamp | ||||||
| 	// 8:12 = entropy | 	// 8:12 = entropy | ||||||
| 	// | 	// | ||||||
|  | @ -69,12 +69,10 @@ func AddRequestID(header string) gin.HandlerFunc { | ||||||
| 		// Have we found anything? | 		// Have we found anything? | ||||||
| 		if id == "" { | 		if id == "" { | ||||||
| 			// Generate new ID. | 			// Generate new ID. | ||||||
| 			// | 			id = NewRequestID() | ||||||
| 			// 0:8  = timestamp | 
 | ||||||
| 			// 8:12 = entropy | 			// Set the request ID in the req header in case | ||||||
| 			id = generateID() | 			// we pass the request along to another service. | ||||||
| 			// Set the request ID in the req header in case we pass the request along |  | ||||||
| 			// to another service |  | ||||||
| 			c.Request.Header.Set(header, id) | 			c.Request.Header.Set(header, id) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -25,6 +25,7 @@ import ( | ||||||
| 	"codeberg.org/gruf/go-logger/v2/level" | 	"codeberg.org/gruf/go-logger/v2/level" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/ap" | 	"github.com/superseriousbusiness/gotosocial/internal/ap" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/db" | 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/log" | 	"github.com/superseriousbusiness/gotosocial/internal/log" | ||||||
|  | @ -46,13 +47,15 @@ type clientAPI struct { | ||||||
| 	account    *account.Processor | 	account    *account.Processor | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { | func (p *Processor) EnqueueClientAPI(cctx context.Context, msgs ...messages.FromClientAPI) { | ||||||
| 	log.Trace(ctx, "enqueuing") | 	_ = p.workers.ClientAPI.MustEnqueueCtx(cctx, func(wctx context.Context) { | ||||||
| 	_ = p.workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { | 		// Copy caller ctx values to worker's. | ||||||
|  | 		wctx = gtscontext.WithValues(wctx, cctx) | ||||||
|  | 
 | ||||||
|  | 		// Process worker messages. | ||||||
| 		for _, msg := range msgs { | 		for _, msg := range msgs { | ||||||
| 			log.Trace(ctx, "processing: %+v", msg) | 			if err := p.ProcessFromClientAPI(wctx, msg); err != nil { | ||||||
| 			if err := p.ProcessFromClientAPI(ctx, msg); err != nil { | 				log.Errorf(wctx, "error processing client API message: %v", err) | ||||||
| 				log.Errorf(ctx, "error processing client API message: %v", err) |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | @ -24,6 +24,7 @@ import ( | ||||||
| 	"codeberg.org/gruf/go-kv" | 	"codeberg.org/gruf/go-kv" | ||||||
| 	"codeberg.org/gruf/go-logger/v2/level" | 	"codeberg.org/gruf/go-logger/v2/level" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/ap" | 	"github.com/superseriousbusiness/gotosocial/internal/ap" | ||||||
|  | 	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||||
| 	"github.com/superseriousbusiness/gotosocial/internal/id" | 	"github.com/superseriousbusiness/gotosocial/internal/id" | ||||||
|  | @ -44,13 +45,15 @@ type fediAPI struct { | ||||||
| 	account    *account.Processor | 	account    *account.Processor | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *Processor) EnqueueFediAPI(ctx context.Context, msgs ...messages.FromFediAPI) { | func (p *Processor) EnqueueFediAPI(cctx context.Context, msgs ...messages.FromFediAPI) { | ||||||
| 	log.Trace(ctx, "enqueuing") | 	_ = p.workers.Federator.MustEnqueueCtx(cctx, func(wctx context.Context) { | ||||||
| 	_ = p.workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { | 		// Copy caller ctx values to worker's. | ||||||
|  | 		wctx = gtscontext.WithValues(wctx, cctx) | ||||||
|  | 
 | ||||||
|  | 		// Process worker messages. | ||||||
| 		for _, msg := range msgs { | 		for _, msg := range msgs { | ||||||
| 			log.Trace(ctx, "processing: %+v", msg) | 			if err := p.ProcessFromFediAPI(wctx, msg); err != nil { | ||||||
| 			if err := p.ProcessFromFediAPI(ctx, msg); err != nil { | 				log.Errorf(wctx, "error processing fedi API message: %v", err) | ||||||
| 				log.Errorf(ctx, "error processing fedi API message: %v", err) |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue