bumps our uptrace/bun dependencies to v1.2.10 (#3865)

This commit is contained in:
kim 2025-03-03 10:42:05 +00:00 committed by tobi
commit 12d4d36bc8
37 changed files with 518 additions and 225 deletions

126
vendor/github.com/uptrace/bun/db.go generated vendored
View file

@ -2,14 +2,13 @@ package bun
import (
"context"
"crypto/rand"
cryptorand "crypto/rand"
"database/sql"
"encoding/hex"
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"
"github.com/uptrace/bun/dialect/feature"
"github.com/uptrace/bun/internal"
@ -41,6 +40,12 @@ func WithDiscardUnknownColumns() DBOption {
}
}
// ConnResolver enables routing queries to multiple databases.
type ConnResolver interface {
ResolveConn(query Query) IConn
Close() error
}
func WithConnResolver(resolver ConnResolver) DBOption {
return func(db *DB) {
db.resolver = resolver
@ -633,7 +638,7 @@ func (tx Tx) Begin() (Tx, error) {
func (tx Tx) BeginTx(ctx context.Context, _ *sql.TxOptions) (Tx, error) {
// mssql savepoint names are limited to 32 characters
sp := make([]byte, 14)
_, err := rand.Read(sp)
_, err := cryptorand.Read(sp)
if err != nil {
return Tx{}, err
}
@ -739,121 +744,6 @@ func (tx Tx) NewDropColumn() *DropColumnQuery {
return NewDropColumnQuery(tx.db).Conn(tx)
}
//------------------------------------------------------------------------------
func (db *DB) makeQueryBytes() []byte {
return internal.MakeQueryBytes()
}
//------------------------------------------------------------------------------
// ConnResolver enables routing queries to multiple databases.
type ConnResolver interface {
ResolveConn(query Query) IConn
Close() error
}
// TODO:
// - make monitoring interval configurable
// - make ping timeout configutable
// - allow adding read/write replicas for multi-master replication
type ReadWriteConnResolver struct {
replicas []*sql.DB // read-only replicas
healthyReplicas atomic.Pointer[[]*sql.DB]
nextReplica atomic.Int64
closed atomic.Bool
}
func NewReadWriteConnResolver(opts ...ReadWriteConnResolverOption) *ReadWriteConnResolver {
r := new(ReadWriteConnResolver)
for _, opt := range opts {
opt(r)
}
if len(r.replicas) > 0 {
r.healthyReplicas.Store(&r.replicas)
go r.monitor()
}
return r
}
type ReadWriteConnResolverOption func(r *ReadWriteConnResolver)
func WithReadOnlyReplica(dbs ...*sql.DB) ReadWriteConnResolverOption {
return func(r *ReadWriteConnResolver) {
r.replicas = append(r.replicas, dbs...)
}
}
func (r *ReadWriteConnResolver) Close() error {
if r.closed.Swap(true) {
return nil
}
var firstErr error
for _, db := range r.replicas {
if err := db.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
// healthyReplica returns a random healthy replica.
func (r *ReadWriteConnResolver) ResolveConn(query Query) IConn {
if len(r.replicas) == 0 || !isReadOnlyQuery(query) {
return nil
}
replicas := r.loadHealthyReplicas()
if len(replicas) == 0 {
return nil
}
if len(replicas) == 1 {
return replicas[0]
}
i := r.nextReplica.Add(1)
return replicas[int(i)%len(replicas)]
}
func isReadOnlyQuery(query Query) bool {
sel, ok := query.(*SelectQuery)
if !ok {
return false
}
for _, el := range sel.with {
if !isReadOnlyQuery(el.query) {
return false
}
}
return true
}
func (r *ReadWriteConnResolver) loadHealthyReplicas() []*sql.DB {
if ptr := r.healthyReplicas.Load(); ptr != nil {
return *ptr
}
return nil
}
func (r *ReadWriteConnResolver) monitor() {
const interval = 5 * time.Second
for !r.closed.Load() {
healthy := make([]*sql.DB, 0, len(r.replicas))
for _, replica := range r.replicas {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := replica.PingContext(ctx)
cancel()
if err == nil {
healthy = append(healthy, replica)
}
}
r.healthyReplicas.Store(&healthy)
time.Sleep(interval)
}
}