mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 19:22:24 -06:00 
			
		
		
		
	* chore: update otel dependencies * refactor: combine tracing & metrics in observability package * chore: update example tracing compose file
		
			
				
	
	
		
			395 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			395 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2018 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
package channelz
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sort"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// entry represents a node in the channelz database.
 | 
						|
type entry interface {
 | 
						|
	// addChild adds a child e, whose channelz id is id to child list
 | 
						|
	addChild(id int64, e entry)
 | 
						|
	// deleteChild deletes a child with channelz id to be id from child list
 | 
						|
	deleteChild(id int64)
 | 
						|
	// triggerDelete tries to delete self from channelz database. However, if
 | 
						|
	// child list is not empty, then deletion from the database is on hold until
 | 
						|
	// the last child is deleted from database.
 | 
						|
	triggerDelete()
 | 
						|
	// deleteSelfIfReady check whether triggerDelete() has been called before,
 | 
						|
	// and whether child list is now empty. If both conditions are met, then
 | 
						|
	// delete self from database.
 | 
						|
	deleteSelfIfReady()
 | 
						|
	// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
 | 
						|
	getParentID() int64
 | 
						|
	Entity
 | 
						|
}
 | 
						|
 | 
						|
// channelMap is the storage data structure for channelz.
 | 
						|
//
 | 
						|
// Methods of channelMap can be divided into two categories with respect to
 | 
						|
// locking.
 | 
						|
//
 | 
						|
// 1. Methods acquire the global lock.
 | 
						|
// 2. Methods that can only be called when global lock is held.
 | 
						|
//
 | 
						|
// A second type of method need always to be called inside a first type of method.
 | 
						|
type channelMap struct {
 | 
						|
	mu               sync.RWMutex
 | 
						|
	topLevelChannels map[int64]struct{}
 | 
						|
	channels         map[int64]*Channel
 | 
						|
	subChannels      map[int64]*SubChannel
 | 
						|
	sockets          map[int64]*Socket
 | 
						|
	servers          map[int64]*Server
 | 
						|
}
 | 
						|
 | 
						|
func newChannelMap() *channelMap {
 | 
						|
	return &channelMap{
 | 
						|
		topLevelChannels: make(map[int64]struct{}),
 | 
						|
		channels:         make(map[int64]*Channel),
 | 
						|
		subChannels:      make(map[int64]*SubChannel),
 | 
						|
		sockets:          make(map[int64]*Socket),
 | 
						|
		servers:          make(map[int64]*Server),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) addServer(id int64, s *Server) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	s.cm = c
 | 
						|
	c.servers[id] = s
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	cn.trace.cm = c
 | 
						|
	c.channels[id] = cn
 | 
						|
	if isTopChannel {
 | 
						|
		c.topLevelChannels[id] = struct{}{}
 | 
						|
	} else if p := c.channels[pid]; p != nil {
 | 
						|
		p.addChild(id, cn)
 | 
						|
	} else {
 | 
						|
		logger.Infof("channel %d references invalid parent ID %d", id, pid)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	sc.trace.cm = c
 | 
						|
	c.subChannels[id] = sc
 | 
						|
	if p := c.channels[pid]; p != nil {
 | 
						|
		p.addChild(id, sc)
 | 
						|
	} else {
 | 
						|
		logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) addSocket(s *Socket) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	s.cm = c
 | 
						|
	c.sockets[s.ID] = s
 | 
						|
	if s.Parent == nil {
 | 
						|
		logger.Infof("normal socket %d has no parent", s.ID)
 | 
						|
	}
 | 
						|
	s.Parent.(entry).addChild(s.ID, s)
 | 
						|
}
 | 
						|
 | 
						|
// removeEntry triggers the removal of an entry, which may not indeed delete the
 | 
						|
// entry, if it has to wait on the deletion of its children and until no other
 | 
						|
// entity's channel trace references it.  It may lead to a chain of entry
 | 
						|
// deletion. For example, deleting the last socket of a gracefully shutting down
 | 
						|
// server will lead to the server being also deleted.
 | 
						|
func (c *channelMap) removeEntry(id int64) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	c.findEntry(id).triggerDelete()
 | 
						|
}
 | 
						|
 | 
						|
// tracedChannel represents tracing operations which are present on both
 | 
						|
// channels and subChannels.
 | 
						|
type tracedChannel interface {
 | 
						|
	getChannelTrace() *ChannelTrace
 | 
						|
	incrTraceRefCount()
 | 
						|
	decrTraceRefCount()
 | 
						|
	getRefName() string
 | 
						|
}
 | 
						|
 | 
						|
// c.mu must be held by the caller
 | 
						|
func (c *channelMap) decrTraceRefCount(id int64) {
 | 
						|
	e := c.findEntry(id)
 | 
						|
	if v, ok := e.(tracedChannel); ok {
 | 
						|
		v.decrTraceRefCount()
 | 
						|
		e.deleteSelfIfReady()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// c.mu must be held by the caller.
 | 
						|
func (c *channelMap) findEntry(id int64) entry {
 | 
						|
	if v, ok := c.channels[id]; ok {
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.subChannels[id]; ok {
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.servers[id]; ok {
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.sockets[id]; ok {
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	return &dummyEntry{idNotFound: id}
 | 
						|
}
 | 
						|
 | 
						|
// c.mu must be held by the caller
 | 
						|
//
 | 
						|
// deleteEntry deletes an entry from the channelMap. Before calling this method,
 | 
						|
// caller must check this entry is ready to be deleted, i.e removeEntry() has
 | 
						|
// been called on it, and no children still exist.
 | 
						|
func (c *channelMap) deleteEntry(id int64) entry {
 | 
						|
	if v, ok := c.sockets[id]; ok {
 | 
						|
		delete(c.sockets, id)
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.subChannels[id]; ok {
 | 
						|
		delete(c.subChannels, id)
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.channels[id]; ok {
 | 
						|
		delete(c.channels, id)
 | 
						|
		delete(c.topLevelChannels, id)
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	if v, ok := c.servers[id]; ok {
 | 
						|
		delete(c.servers, id)
 | 
						|
		return v
 | 
						|
	}
 | 
						|
	return &dummyEntry{idNotFound: id}
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
	child := c.findEntry(id)
 | 
						|
	childTC, ok := child.(tracedChannel)
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
 | 
						|
	if desc.Parent != nil {
 | 
						|
		parent := c.findEntry(child.getParentID())
 | 
						|
		var chanType RefChannelType
 | 
						|
		switch child.(type) {
 | 
						|
		case *Channel:
 | 
						|
			chanType = RefChannel
 | 
						|
		case *SubChannel:
 | 
						|
			chanType = RefSubChannel
 | 
						|
		}
 | 
						|
		if parentTC, ok := parent.(tracedChannel); ok {
 | 
						|
			parentTC.getChannelTrace().append(&traceEvent{
 | 
						|
				Desc:      desc.Parent.Desc,
 | 
						|
				Severity:  desc.Parent.Severity,
 | 
						|
				Timestamp: time.Now(),
 | 
						|
				RefID:     id,
 | 
						|
				RefName:   childTC.getRefName(),
 | 
						|
				RefType:   chanType,
 | 
						|
			})
 | 
						|
			childTC.incrTraceRefCount()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// int64Slice implements sort.Interface for a []int64, ordering the values in
// increasing order.
type int64Slice []int64

// Len reports the number of elements.
func (ids int64Slice) Len() int {
	return len(ids)
}

// Swap exchanges the elements at positions i and j.
func (ids int64Slice) Swap(i, j int) {
	ids[j], ids[i] = ids[i], ids[j]
}

// Less reports whether the element at i is strictly smaller than the one at j.
func (ids int64Slice) Less(i, j int) bool {
	return ids[j] > ids[i]
}
 | 
						|
 | 
						|
// copyMap returns a new map holding the same key/value pairs as m. The result
// is independent of m, and is non-nil and empty when m is nil or empty.
func copyMap(m map[int64]string) map[int64]string {
	// The final size is known up front, so allocate it once instead of
	// letting the map grow while it is filled.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
 | 
						|
 | 
						|
func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
 | 
						|
	if maxResults <= 0 {
 | 
						|
		maxResults = EntriesPerPage
 | 
						|
	}
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	l := int64(len(c.topLevelChannels))
 | 
						|
	ids := make([]int64, 0, l)
 | 
						|
 | 
						|
	for k := range c.topLevelChannels {
 | 
						|
		ids = append(ids, k)
 | 
						|
	}
 | 
						|
	sort.Sort(int64Slice(ids))
 | 
						|
	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
 | 
						|
	end := true
 | 
						|
	var t []*Channel
 | 
						|
	for _, v := range ids[idx:] {
 | 
						|
		if len(t) == maxResults {
 | 
						|
			end = false
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if cn, ok := c.channels[v]; ok {
 | 
						|
			t = append(t, cn)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return t, end
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
 | 
						|
	if maxResults <= 0 {
 | 
						|
		maxResults = EntriesPerPage
 | 
						|
	}
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	ids := make([]int64, 0, len(c.servers))
 | 
						|
	for k := range c.servers {
 | 
						|
		ids = append(ids, k)
 | 
						|
	}
 | 
						|
	sort.Sort(int64Slice(ids))
 | 
						|
	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
 | 
						|
	end := true
 | 
						|
	var s []*Server
 | 
						|
	for _, v := range ids[idx:] {
 | 
						|
		if len(s) == maxResults {
 | 
						|
			end = false
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if svr, ok := c.servers[v]; ok {
 | 
						|
			s = append(s, svr)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return s, end
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
 | 
						|
	if maxResults <= 0 {
 | 
						|
		maxResults = EntriesPerPage
 | 
						|
	}
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	svr, ok := c.servers[id]
 | 
						|
	if !ok {
 | 
						|
		// server with id doesn't exist.
 | 
						|
		return nil, true
 | 
						|
	}
 | 
						|
	svrskts := svr.sockets
 | 
						|
	ids := make([]int64, 0, len(svrskts))
 | 
						|
	sks := make([]*Socket, 0, min(len(svrskts), maxResults))
 | 
						|
	for k := range svrskts {
 | 
						|
		ids = append(ids, k)
 | 
						|
	}
 | 
						|
	sort.Sort(int64Slice(ids))
 | 
						|
	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
 | 
						|
	end := true
 | 
						|
	for _, v := range ids[idx:] {
 | 
						|
		if len(sks) == maxResults {
 | 
						|
			end = false
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if ns, ok := c.sockets[v]; ok {
 | 
						|
			sks = append(sks, ns)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return sks, end
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getChannel(id int64) *Channel {
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	return c.channels[id]
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getSubChannel(id int64) *SubChannel {
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	return c.subChannels[id]
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getSocket(id int64) *Socket {
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	return c.sockets[id]
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelMap) getServer(id int64) *Server {
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
	return c.servers[id]
 | 
						|
}
 | 
						|
 | 
						|
type dummyEntry struct {
 | 
						|
	// dummyEntry is a fake entry to handle entry not found case.
 | 
						|
	idNotFound int64
 | 
						|
	Entity
 | 
						|
}
 | 
						|
 | 
						|
func (d *dummyEntry) String() string {
 | 
						|
	return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
 | 
						|
}
 | 
						|
 | 
						|
// ID returns the channelz ID that was looked up without success.
func (d *dummyEntry) ID() int64 {
	return d.idNotFound
}
 | 
						|
 | 
						|
func (d *dummyEntry) addChild(id int64, e entry) {
 | 
						|
	// Note: It is possible for a normal program to reach here under race
 | 
						|
	// condition.  For example, there could be a race between ClientConn.Close()
 | 
						|
	// info being propagated to addrConn and http2Client. ClientConn.Close()
 | 
						|
	// cancel the context and result in http2Client to error. The error info is
 | 
						|
	// then caught by transport monitor and before addrConn.tearDown() is called
 | 
						|
	// in side ClientConn.Close(). Therefore, the addrConn will create a new
 | 
						|
	// transport. And when registering the new transport in channelz, its parent
 | 
						|
	// addrConn could have already been torn down and deleted from channelz
 | 
						|
	// tracking, and thus reach the code here.
 | 
						|
	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
 | 
						|
}
 | 
						|
 | 
						|
func (d *dummyEntry) deleteChild(id int64) {
 | 
						|
	// It is possible for a normal program to reach here under race condition.
 | 
						|
	// Refer to the example described in addChild().
 | 
						|
	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
 | 
						|
}
 | 
						|
 | 
						|
func (d *dummyEntry) triggerDelete() {
 | 
						|
	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
 | 
						|
}
 | 
						|
 | 
						|
func (*dummyEntry) deleteSelfIfReady() {
 | 
						|
	// code should not reach here. deleteSelfIfReady is always called on an existing entry.
 | 
						|
}
 | 
						|
 | 
						|
func (*dummyEntry) getParentID() int64 {
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// Entity is the interface implemented by every channelz type.
type Entity interface {
	// Stringer gives each entity a printable description.
	fmt.Stringer

	// isEntity and id are unexported methods of the interface.
	isEntity()
	id() int64
}
 |