mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 22:12:26 -06:00 
			
		
		
		
	[chore] ensure worker contexts have request ID (#2120)
This commit is contained in:
		
					parent
					
						
							
								815b5291e0
							
						
					
				
			
			
				commit
				
					
						71ed3616fd
					
				
			
		
					 4 changed files with 79 additions and 20 deletions
				
			
		
							
								
								
									
										55
									
								
								internal/gtscontext/wrap.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								internal/gtscontext/wrap.go
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,55 @@
 | 
				
			||||||
 | 
					// GoToSocial
 | 
				
			||||||
 | 
					// Copyright (C) GoToSocial Authors admin@gotosocial.org
 | 
				
			||||||
 | 
					// SPDX-License-Identifier: AGPL-3.0-or-later
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// This program is free software: you can redistribute it and/or modify
 | 
				
			||||||
 | 
					// it under the terms of the GNU Affero General Public License as published by
 | 
				
			||||||
 | 
					// the Free Software Foundation, either version 3 of the License, or
 | 
				
			||||||
 | 
					// (at your option) any later version.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// This program is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
				
			||||||
 | 
					// GNU Affero General Public License for more details.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// You should have received a copy of the GNU Affero General Public License
 | 
				
			||||||
 | 
					// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package gtscontext
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WithValues wraps 'ctx' to use its deadline, done channel and error, but use value store of 'values'.
 | 
				
			||||||
 | 
					func WithValues(ctx context.Context, values context.Context) context.Context {
 | 
				
			||||||
 | 
						if ctx == nil {
 | 
				
			||||||
 | 
							panic("nil base context")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &wrapContext{
 | 
				
			||||||
 | 
							base: ctx,
 | 
				
			||||||
 | 
							vals: values,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type wrapContext struct {
 | 
				
			||||||
 | 
						base context.Context
 | 
				
			||||||
 | 
						vals context.Context
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ctx *wrapContext) Deadline() (deadline time.Time, ok bool) {
 | 
				
			||||||
 | 
						return ctx.base.Deadline()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ctx *wrapContext) Done() <-chan struct{} {
 | 
				
			||||||
 | 
						return ctx.base.Done()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ctx *wrapContext) Err() error {
 | 
				
			||||||
 | 
						return ctx.base.Err()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ctx *wrapContext) Value(key any) any {
 | 
				
			||||||
 | 
						return ctx.vals.Value(key)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -39,8 +39,8 @@ var (
 | 
				
			||||||
	base32enc = base32.NewEncoding("0123456789abcdefghjkmnpqrstvwxyz").WithPadding(-1)
 | 
						base32enc = base32.NewEncoding("0123456789abcdefghjkmnpqrstvwxyz").WithPadding(-1)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// generateID generates a new ID string.
 | 
					// NewRequestID generates a new request ID string.
 | 
				
			||||||
func generateID() string {
 | 
					func NewRequestID() string {
 | 
				
			||||||
	// 0:8  = timestamp
 | 
						// 0:8  = timestamp
 | 
				
			||||||
	// 8:12 = entropy
 | 
						// 8:12 = entropy
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
| 
						 | 
					@ -69,12 +69,10 @@ func AddRequestID(header string) gin.HandlerFunc {
 | 
				
			||||||
		// Have we found anything?
 | 
							// Have we found anything?
 | 
				
			||||||
		if id == "" {
 | 
							if id == "" {
 | 
				
			||||||
			// Generate new ID.
 | 
								// Generate new ID.
 | 
				
			||||||
			//
 | 
								id = NewRequestID()
 | 
				
			||||||
			// 0:8  = timestamp
 | 
					
 | 
				
			||||||
			// 8:12 = entropy
 | 
								// Set the request ID in the req header in case
 | 
				
			||||||
			id = generateID()
 | 
								// we pass the request along to another service.
 | 
				
			||||||
			// Set the request ID in the req header in case we pass the request along
 | 
					 | 
				
			||||||
			// to another service
 | 
					 | 
				
			||||||
			c.Request.Header.Set(header, id)
 | 
								c.Request.Header.Set(header, id)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -25,6 +25,7 @@ import (
 | 
				
			||||||
	"codeberg.org/gruf/go-logger/v2/level"
 | 
						"codeberg.org/gruf/go-logger/v2/level"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/ap"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/ap"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/db"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/db"
 | 
				
			||||||
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/log"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/log"
 | 
				
			||||||
| 
						 | 
					@ -46,13 +47,15 @@ type clientAPI struct {
 | 
				
			||||||
	account    *account.Processor
 | 
						account    *account.Processor
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) {
 | 
					func (p *Processor) EnqueueClientAPI(cctx context.Context, msgs ...messages.FromClientAPI) {
 | 
				
			||||||
	log.Trace(ctx, "enqueuing")
 | 
						_ = p.workers.ClientAPI.MustEnqueueCtx(cctx, func(wctx context.Context) {
 | 
				
			||||||
	_ = p.workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) {
 | 
							// Copy caller ctx values to worker's.
 | 
				
			||||||
 | 
							wctx = gtscontext.WithValues(wctx, cctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Process worker messages.
 | 
				
			||||||
		for _, msg := range msgs {
 | 
							for _, msg := range msgs {
 | 
				
			||||||
			log.Trace(ctx, "processing: %+v", msg)
 | 
								if err := p.ProcessFromClientAPI(wctx, msg); err != nil {
 | 
				
			||||||
			if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
 | 
									log.Errorf(wctx, "error processing client API message: %v", err)
 | 
				
			||||||
				log.Errorf(ctx, "error processing client API message: %v", err)
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -24,6 +24,7 @@ import (
 | 
				
			||||||
	"codeberg.org/gruf/go-kv"
 | 
						"codeberg.org/gruf/go-kv"
 | 
				
			||||||
	"codeberg.org/gruf/go-logger/v2/level"
 | 
						"codeberg.org/gruf/go-logger/v2/level"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/ap"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/ap"
 | 
				
			||||||
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
 | 
				
			||||||
	"github.com/superseriousbusiness/gotosocial/internal/id"
 | 
						"github.com/superseriousbusiness/gotosocial/internal/id"
 | 
				
			||||||
| 
						 | 
					@ -44,13 +45,15 @@ type fediAPI struct {
 | 
				
			||||||
	account    *account.Processor
 | 
						account    *account.Processor
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *Processor) EnqueueFediAPI(ctx context.Context, msgs ...messages.FromFediAPI) {
 | 
					func (p *Processor) EnqueueFediAPI(cctx context.Context, msgs ...messages.FromFediAPI) {
 | 
				
			||||||
	log.Trace(ctx, "enqueuing")
 | 
						_ = p.workers.Federator.MustEnqueueCtx(cctx, func(wctx context.Context) {
 | 
				
			||||||
	_ = p.workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
 | 
							// Copy caller ctx values to worker's.
 | 
				
			||||||
 | 
							wctx = gtscontext.WithValues(wctx, cctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Process worker messages.
 | 
				
			||||||
		for _, msg := range msgs {
 | 
							for _, msg := range msgs {
 | 
				
			||||||
			log.Trace(ctx, "processing: %+v", msg)
 | 
								if err := p.ProcessFromFediAPI(wctx, msg); err != nil {
 | 
				
			||||||
			if err := p.ProcessFromFediAPI(ctx, msg); err != nil {
 | 
									log.Errorf(wctx, "error processing fedi API message: %v", err)
 | 
				
			||||||
				log.Errorf(ctx, "error processing fedi API message: %v", err)
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue