mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-30 17:22:26 -05:00 
			
		
		
		
	
		
			
	
	
		
			379 lines
		
	
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			379 lines
		
	
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|  | package syslog | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"bufio" | ||
|  | 	"crypto/tls" | ||
|  | 	"errors" | ||
|  | 	"net" | ||
|  | 	"strings" | ||
|  | 	"sync" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"gopkg.in/mcuadros/go-syslog.v2/format" | ||
|  | ) | ||
|  | 
 | ||
// Ready-made format values. The server accepts any of them through
// SetFormat; parser additionally compares the configured format against
// RFC3164 and Automatic to decide whether to fill in a missing hostname.
var (
	RFC3164   = &format.RFC3164{}   // RFC3164: http://www.ietf.org/rfc/rfc3164.txt
	RFC5424   = &format.RFC5424{}   // RFC5424: http://www.ietf.org/rfc/rfc5424.txt
	RFC6587   = &format.RFC6587{}   // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant
	Automatic = &format.Automatic{} // Automatically identify the format
)
|  | 
 | ||
const (
	// datagramChannelBufferSize is the capacity of the channel that carries
	// received datagrams to the parsing goroutine.
	datagramChannelBufferSize = 10
	// datagramReadBufferSize is the size, in bytes, requested through
	// SetReadBuffer on UDP and unixgram sockets.
	datagramReadBufferSize    = 64 * 1024
)
|  | 
 | ||
// TlsPeerNameFunc extracts the TLS peer name from a connection whose
// handshake has completed. Returning ok=false makes the server close the
// connection without reading any message from it.
type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool)
|  | 
 | ||
// Server receives syslog messages over TCP, TLS, UDP and unixgram sockets,
// parses them with the configured format and passes the result to the
// configured Handler. Create one with NewServer, configure it with the Set*
// and Listen* methods, then call Boot.
type Server struct {
	listeners               []net.Listener       // stream listeners registered by ListenTCP and ListenTCPTLS
	connections             []net.PacketConn     // datagram sockets registered by ListenUDP and ListenUnixgram
	wait                    sync.WaitGroup       // tracks the goroutines started by the server
	doneTcp                 chan bool            // closed by Kill to stop the accept and scan loops
	datagramChannel         chan DatagramMessage // carries received datagrams to the parsing goroutine
	format                  format.Format        // format used to split and parse messages
	handler                 Handler              // receives every parsed message
	lastError               error                // most recent parse error, exposed by GetLastError
	readTimeoutMilliseconds int64                // read deadline for TCP connections; values <= 0 disable it
	tlsPeerNameFunc         TlsPeerNameFunc      // extracts the peer name from TLS connections
	datagramPool            sync.Pool            // reusable buffers for incoming datagrams
}
|  | 
 | ||
|  | //NewServer returns a new Server | ||
|  | func NewServer() *Server { | ||
|  | 	return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{ | ||
|  | 		New: func() interface{} { | ||
|  | 			return make([]byte, 65536) | ||
|  | 		}, | ||
|  | 	}} | ||
|  | } | ||
|  | 
 | ||
// SetFormat sets the syslog format (RFC3164, RFC5424, RFC6587 or Automatic)
// used to split and parse incoming messages. Boot fails while no format is
// set.
func (s *Server) SetFormat(f format.Format) {
	s.format = f
}
|  | 
 | ||
// SetHandler sets the handler that will receive every parsed syslog entry.
// Boot fails while no handler is set.
func (s *Server) SetHandler(handler Handler) {
	s.handler = handler
}
|  | 
 | ||
// SetTimeout sets the read timeout for TCP connections, in milliseconds.
// The scan loop renews the read deadline before reading each message;
// values <= 0 leave connections without a deadline.
func (s *Server) SetTimeout(millseconds int64) {
	s.readTimeoutMilliseconds = millseconds
}
|  | 
 | ||
// SetTlsPeerNameFunc sets the function that extracts a peer name from a TLS
// connection. Passing nil disables the extraction: TLS connections are then
// accepted with an empty peer name.
func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) {
	s.tlsPeerNameFunc = tlsPeerNameFunc
}
|  | 
 | ||
// defaultTlsPeerName is the default TlsPeerNameFunc. It reports the subject
// common name of the first certificate presented by the peer, or ok=false
// when the peer presented no certificate at all.
func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) {
	certs := tlsConn.ConnectionState().PeerCertificates
	if len(certs) == 0 {
		return "", false
	}
	return certs[0].Subject.CommonName, true
}
|  | 
 | ||
|  | //Configure the server for listen on an UDP addr | ||
|  | func (s *Server) ListenUDP(addr string) error { | ||
|  | 	udpAddr, err := net.ResolveUDPAddr("udp", addr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	connection, err := net.ListenUDP("udp", udpAddr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	connection.SetReadBuffer(datagramReadBufferSize) | ||
|  | 
 | ||
|  | 	s.connections = append(s.connections, connection) | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | //Configure the server for listen on an unix socket | ||
|  | func (s *Server) ListenUnixgram(addr string) error { | ||
|  | 	unixAddr, err := net.ResolveUnixAddr("unixgram", addr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	connection, err := net.ListenUnixgram("unixgram", unixAddr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	connection.SetReadBuffer(datagramReadBufferSize) | ||
|  | 
 | ||
|  | 	s.connections = append(s.connections, connection) | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | //Configure the server for listen on a TCP addr | ||
|  | func (s *Server) ListenTCP(addr string) error { | ||
|  | 	tcpAddr, err := net.ResolveTCPAddr("tcp", addr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	listener, err := net.ListenTCP("tcp", tcpAddr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	s.doneTcp = make(chan bool) | ||
|  | 	s.listeners = append(s.listeners, listener) | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | //Configure the server for listen on a TCP addr for TLS | ||
|  | func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error { | ||
|  | 	listener, err := tls.Listen("tcp", addr, config) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	s.doneTcp = make(chan bool) | ||
|  | 	s.listeners = append(s.listeners, listener) | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | //Starts the server, all the go routines goes to live | ||
|  | func (s *Server) Boot() error { | ||
|  | 	if s.format == nil { | ||
|  | 		return errors.New("please set a valid format") | ||
|  | 	} | ||
|  | 
 | ||
|  | 	if s.handler == nil { | ||
|  | 		return errors.New("please set a valid handler") | ||
|  | 	} | ||
|  | 
 | ||
|  | 	for _, listener := range s.listeners { | ||
|  | 		s.goAcceptConnection(listener) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	if len(s.connections) > 0 { | ||
|  | 		s.goParseDatagrams() | ||
|  | 	} | ||
|  | 
 | ||
|  | 	for _, connection := range s.connections { | ||
|  | 		s.goReceiveDatagrams(connection) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | func (s *Server) goAcceptConnection(listener net.Listener) { | ||
|  | 	s.wait.Add(1) | ||
|  | 	go func(listener net.Listener) { | ||
|  | 	loop: | ||
|  | 		for { | ||
|  | 			select { | ||
|  | 			case <-s.doneTcp: | ||
|  | 				break loop | ||
|  | 			default: | ||
|  | 			} | ||
|  | 			connection, err := listener.Accept() | ||
|  | 			if err != nil { | ||
|  | 				continue | ||
|  | 			} | ||
|  | 
 | ||
|  | 			s.goScanConnection(connection) | ||
|  | 		} | ||
|  | 
 | ||
|  | 		s.wait.Done() | ||
|  | 	}(listener) | ||
|  | } | ||
|  | 
 | ||
|  | func (s *Server) goScanConnection(connection net.Conn) { | ||
|  | 	scanner := bufio.NewScanner(connection) | ||
|  | 	if sf := s.format.GetSplitFunc(); sf != nil { | ||
|  | 		scanner.Split(sf) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	remoteAddr := connection.RemoteAddr() | ||
|  | 	var client string | ||
|  | 	if remoteAddr != nil { | ||
|  | 		client = remoteAddr.String() | ||
|  | 	} | ||
|  | 
 | ||
|  | 	tlsPeer := "" | ||
|  | 	if tlsConn, ok := connection.(*tls.Conn); ok { | ||
|  | 		// Handshake now so we get the TLS peer information | ||
|  | 		if err := tlsConn.Handshake(); err != nil { | ||
|  | 			connection.Close() | ||
|  | 			return | ||
|  | 		} | ||
|  | 		if s.tlsPeerNameFunc != nil { | ||
|  | 			var ok bool | ||
|  | 			tlsPeer, ok = s.tlsPeerNameFunc(tlsConn) | ||
|  | 			if !ok { | ||
|  | 				connection.Close() | ||
|  | 				return | ||
|  | 			} | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	var scanCloser *ScanCloser | ||
|  | 	scanCloser = &ScanCloser{scanner, connection} | ||
|  | 
 | ||
|  | 	s.wait.Add(1) | ||
|  | 	go s.scan(scanCloser, client, tlsPeer) | ||
|  | } | ||
|  | 
 | ||
|  | func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) { | ||
|  | loop: | ||
|  | 	for { | ||
|  | 		select { | ||
|  | 		case <-s.doneTcp: | ||
|  | 			break loop | ||
|  | 		default: | ||
|  | 		} | ||
|  | 		if s.readTimeoutMilliseconds > 0 { | ||
|  | 			scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond)) | ||
|  | 		} | ||
|  | 		if scanCloser.Scan() { | ||
|  | 			s.parser([]byte(scanCloser.Text()), client, tlsPeer) | ||
|  | 		} else { | ||
|  | 			break loop | ||
|  | 		} | ||
|  | 	} | ||
|  | 	scanCloser.closer.Close() | ||
|  | 
 | ||
|  | 	s.wait.Done() | ||
|  | } | ||
|  | 
 | ||
|  | func (s *Server) parser(line []byte, client string, tlsPeer string) { | ||
|  | 	parser := s.format.GetParser(line) | ||
|  | 	err := parser.Parse() | ||
|  | 	if err != nil { | ||
|  | 		s.lastError = err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	logParts := parser.Dump() | ||
|  | 	logParts["client"] = client | ||
|  | 	if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) { | ||
|  | 		if i := strings.Index(client, ":"); i > 1 { | ||
|  | 			logParts["hostname"] = client[:i] | ||
|  | 		} else { | ||
|  | 			logParts["hostname"] = client | ||
|  | 		} | ||
|  | 	} | ||
|  | 	logParts["tls_peer"] = tlsPeer | ||
|  | 
 | ||
|  | 	s.handler.Handle(logParts, int64(len(line)), err) | ||
|  | } | ||
|  | 
 | ||
// GetLastError returns the most recent error recorded while parsing a
// message, or nil when no parse has failed so far.
func (s *Server) GetLastError() error {
	return s.lastError
}
|  | 
 | ||
|  | //Kill the server | ||
|  | func (s *Server) Kill() error { | ||
|  | 	for _, connection := range s.connections { | ||
|  | 		err := connection.Close() | ||
|  | 		if err != nil { | ||
|  | 			return err | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	for _, listener := range s.listeners { | ||
|  | 		err := listener.Close() | ||
|  | 		if err != nil { | ||
|  | 			return err | ||
|  | 		} | ||
|  | 	} | ||
|  | 	// Only need to close channel once to broadcast to all waiting | ||
|  | 	if s.doneTcp != nil { | ||
|  | 		close(s.doneTcp) | ||
|  | 	} | ||
|  | 	if s.datagramChannel != nil { | ||
|  | 		close(s.datagramChannel) | ||
|  | 	} | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
// Wait blocks until every goroutine tracked by the server's wait group has
// finished, which normally happens after Kill.
func (s *Server) Wait() {
	s.wait.Wait()
}
|  | 
 | ||
// TimeoutCloser is the part of a connection the scan loop needs besides
// reading: setting a read deadline and closing the connection.
type TimeoutCloser interface {
	Close() error
	SetReadDeadline(t time.Time) error
}
|  | 
 | ||
// ScanCloser pairs a bufio.Scanner with the connection it reads from, so the
// scan loop can renew read deadlines on that connection and close it when
// scanning ends.
type ScanCloser struct {
	*bufio.Scanner
	closer TimeoutCloser
}
|  | 
 | ||
// DatagramMessage is one received datagram queued for parsing.
type DatagramMessage struct {
	message []byte // payload with trailing control characters and NULs trimmed
	client  string // sender address; empty when the socket reported none
}
|  | 
 | ||
|  | func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) { | ||
|  | 	s.wait.Add(1) | ||
|  | 	go func() { | ||
|  | 		defer s.wait.Done() | ||
|  | 		for { | ||
|  | 			buf := s.datagramPool.Get().([]byte) | ||
|  | 			n, addr, err := packetconn.ReadFrom(buf) | ||
|  | 			if err == nil { | ||
|  | 				// Ignore trailing control characters and NULs | ||
|  | 				for ; (n > 0) && (buf[n-1] < 32); n-- { | ||
|  | 				} | ||
|  | 				if n > 0 { | ||
|  | 					var address string | ||
|  | 					if addr != nil { | ||
|  | 						address = addr.String() | ||
|  | 					} | ||
|  | 					s.datagramChannel <- DatagramMessage{buf[:n], address} | ||
|  | 				} | ||
|  | 			} else { | ||
|  | 				// there has been an error. Either the server has been killed | ||
|  | 				// or may be getting a transitory error due to (e.g.) the | ||
|  | 				// interface being shutdown in which case sleep() to avoid busy wait. | ||
|  | 				opError, ok := err.(*net.OpError) | ||
|  | 				if (ok) && !opError.Temporary() && !opError.Timeout() { | ||
|  | 					return | ||
|  | 				} | ||
|  | 				time.Sleep(10 * time.Millisecond) | ||
|  | 			} | ||
|  | 		} | ||
|  | 	}() | ||
|  | } | ||
|  | 
 | ||
|  | func (s *Server) goParseDatagrams() { | ||
|  | 	s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize) | ||
|  | 
 | ||
|  | 	s.wait.Add(1) | ||
|  | 	go func() { | ||
|  | 		defer s.wait.Done() | ||
|  | 		for { | ||
|  | 			select { | ||
|  | 			case msg, ok := (<-s.datagramChannel): | ||
|  | 				if !ok { | ||
|  | 					return | ||
|  | 				} | ||
|  | 				if sf := s.format.GetSplitFunc(); sf != nil { | ||
|  | 					if _, token, err := sf(msg.message, true); err == nil { | ||
|  | 						s.parser(token, msg.client, "") | ||
|  | 					} | ||
|  | 				} else { | ||
|  | 					s.parser(msg.message, msg.client, "") | ||
|  | 				} | ||
|  | 				s.datagramPool.Put(msg.message[:cap(msg.message)]) | ||
|  | 			} | ||
|  | 		} | ||
|  | 	}() | ||
|  | } |