mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-11-02 01:12:27 -06:00
Add optional syslog logrus hook (#343)
* add optional syslog logrus hook * document syslog
This commit is contained in:
parent
909f801742
commit
c111b239f7
38 changed files with 2242 additions and 37 deletions
378
vendor/gopkg.in/mcuadros/go-syslog.v2/server.go
generated
vendored
Normal file
378
vendor/gopkg.in/mcuadros/go-syslog.v2/server.go
generated
vendored
Normal file
|
|
@ -0,0 +1,378 @@
|
|||
package syslog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||
)
|
||||
|
||||
var (
|
||||
RFC3164 = &format.RFC3164{} // RFC3164: http://www.ietf.org/rfc/rfc3164.txt
|
||||
RFC5424 = &format.RFC5424{} // RFC5424: http://www.ietf.org/rfc/rfc5424.txt
|
||||
RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant
|
||||
Automatic = &format.Automatic{} // Automatically identify the format
|
||||
)
|
||||
|
||||
const (
|
||||
datagramChannelBufferSize = 10
|
||||
datagramReadBufferSize = 64 * 1024
|
||||
)
|
||||
|
||||
// A function type which gets the TLS peer name from the connection. Can return
|
||||
// ok=false to terminate the connection
|
||||
type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool)
|
||||
|
||||
type Server struct {
|
||||
listeners []net.Listener
|
||||
connections []net.PacketConn
|
||||
wait sync.WaitGroup
|
||||
doneTcp chan bool
|
||||
datagramChannel chan DatagramMessage
|
||||
format format.Format
|
||||
handler Handler
|
||||
lastError error
|
||||
readTimeoutMilliseconds int64
|
||||
tlsPeerNameFunc TlsPeerNameFunc
|
||||
datagramPool sync.Pool
|
||||
}
|
||||
|
||||
//NewServer returns a new Server
|
||||
func NewServer() *Server {
|
||||
return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 65536)
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
//Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
|
||||
func (s *Server) SetFormat(f format.Format) {
|
||||
s.format = f
|
||||
}
|
||||
|
||||
//Sets the handler, this handler with receive every syslog entry
|
||||
func (s *Server) SetHandler(handler Handler) {
|
||||
s.handler = handler
|
||||
}
|
||||
|
||||
//Sets the connection timeout for TCP connections, in milliseconds
|
||||
func (s *Server) SetTimeout(millseconds int64) {
|
||||
s.readTimeoutMilliseconds = millseconds
|
||||
}
|
||||
|
||||
// Set the function that extracts a TLS peer name from the TLS connection
|
||||
func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) {
|
||||
s.tlsPeerNameFunc = tlsPeerNameFunc
|
||||
}
|
||||
|
||||
// Default TLS peer name function - returns the CN of the certificate
|
||||
func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) {
|
||||
state := tlsConn.ConnectionState()
|
||||
if len(state.PeerCertificates) <= 0 {
|
||||
return "", false
|
||||
}
|
||||
cn := state.PeerCertificates[0].Subject.CommonName
|
||||
return cn, true
|
||||
}
|
||||
|
||||
//Configure the server for listen on an UDP addr
|
||||
func (s *Server) ListenUDP(addr string) error {
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connection, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
connection.SetReadBuffer(datagramReadBufferSize)
|
||||
|
||||
s.connections = append(s.connections, connection)
|
||||
return nil
|
||||
}
|
||||
|
||||
//Configure the server for listen on an unix socket
|
||||
func (s *Server) ListenUnixgram(addr string) error {
|
||||
unixAddr, err := net.ResolveUnixAddr("unixgram", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connection, err := net.ListenUnixgram("unixgram", unixAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
connection.SetReadBuffer(datagramReadBufferSize)
|
||||
|
||||
s.connections = append(s.connections, connection)
|
||||
return nil
|
||||
}
|
||||
|
||||
//Configure the server for listen on a TCP addr
|
||||
func (s *Server) ListenTCP(addr string) error {
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
listener, err := net.ListenTCP("tcp", tcpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.doneTcp = make(chan bool)
|
||||
s.listeners = append(s.listeners, listener)
|
||||
return nil
|
||||
}
|
||||
|
||||
//Configure the server for listen on a TCP addr for TLS
|
||||
func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error {
|
||||
listener, err := tls.Listen("tcp", addr, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.doneTcp = make(chan bool)
|
||||
s.listeners = append(s.listeners, listener)
|
||||
return nil
|
||||
}
|
||||
|
||||
//Starts the server, all the go routines goes to live
|
||||
func (s *Server) Boot() error {
|
||||
if s.format == nil {
|
||||
return errors.New("please set a valid format")
|
||||
}
|
||||
|
||||
if s.handler == nil {
|
||||
return errors.New("please set a valid handler")
|
||||
}
|
||||
|
||||
for _, listener := range s.listeners {
|
||||
s.goAcceptConnection(listener)
|
||||
}
|
||||
|
||||
if len(s.connections) > 0 {
|
||||
s.goParseDatagrams()
|
||||
}
|
||||
|
||||
for _, connection := range s.connections {
|
||||
s.goReceiveDatagrams(connection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) goAcceptConnection(listener net.Listener) {
|
||||
s.wait.Add(1)
|
||||
go func(listener net.Listener) {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-s.doneTcp:
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
connection, err := listener.Accept()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
s.goScanConnection(connection)
|
||||
}
|
||||
|
||||
s.wait.Done()
|
||||
}(listener)
|
||||
}
|
||||
|
||||
func (s *Server) goScanConnection(connection net.Conn) {
|
||||
scanner := bufio.NewScanner(connection)
|
||||
if sf := s.format.GetSplitFunc(); sf != nil {
|
||||
scanner.Split(sf)
|
||||
}
|
||||
|
||||
remoteAddr := connection.RemoteAddr()
|
||||
var client string
|
||||
if remoteAddr != nil {
|
||||
client = remoteAddr.String()
|
||||
}
|
||||
|
||||
tlsPeer := ""
|
||||
if tlsConn, ok := connection.(*tls.Conn); ok {
|
||||
// Handshake now so we get the TLS peer information
|
||||
if err := tlsConn.Handshake(); err != nil {
|
||||
connection.Close()
|
||||
return
|
||||
}
|
||||
if s.tlsPeerNameFunc != nil {
|
||||
var ok bool
|
||||
tlsPeer, ok = s.tlsPeerNameFunc(tlsConn)
|
||||
if !ok {
|
||||
connection.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var scanCloser *ScanCloser
|
||||
scanCloser = &ScanCloser{scanner, connection}
|
||||
|
||||
s.wait.Add(1)
|
||||
go s.scan(scanCloser, client, tlsPeer)
|
||||
}
|
||||
|
||||
func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-s.doneTcp:
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
if s.readTimeoutMilliseconds > 0 {
|
||||
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
|
||||
}
|
||||
if scanCloser.Scan() {
|
||||
s.parser([]byte(scanCloser.Text()), client, tlsPeer)
|
||||
} else {
|
||||
break loop
|
||||
}
|
||||
}
|
||||
scanCloser.closer.Close()
|
||||
|
||||
s.wait.Done()
|
||||
}
|
||||
|
||||
func (s *Server) parser(line []byte, client string, tlsPeer string) {
|
||||
parser := s.format.GetParser(line)
|
||||
err := parser.Parse()
|
||||
if err != nil {
|
||||
s.lastError = err
|
||||
}
|
||||
|
||||
logParts := parser.Dump()
|
||||
logParts["client"] = client
|
||||
if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) {
|
||||
if i := strings.Index(client, ":"); i > 1 {
|
||||
logParts["hostname"] = client[:i]
|
||||
} else {
|
||||
logParts["hostname"] = client
|
||||
}
|
||||
}
|
||||
logParts["tls_peer"] = tlsPeer
|
||||
|
||||
s.handler.Handle(logParts, int64(len(line)), err)
|
||||
}
|
||||
|
||||
//Returns the last error
|
||||
func (s *Server) GetLastError() error {
|
||||
return s.lastError
|
||||
}
|
||||
|
||||
//Kill the server
|
||||
func (s *Server) Kill() error {
|
||||
for _, connection := range s.connections {
|
||||
err := connection.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, listener := range s.listeners {
|
||||
err := listener.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Only need to close channel once to broadcast to all waiting
|
||||
if s.doneTcp != nil {
|
||||
close(s.doneTcp)
|
||||
}
|
||||
if s.datagramChannel != nil {
|
||||
close(s.datagramChannel)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//Waits until the server stops
|
||||
func (s *Server) Wait() {
|
||||
s.wait.Wait()
|
||||
}
|
||||
|
||||
type TimeoutCloser interface {
|
||||
Close() error
|
||||
SetReadDeadline(t time.Time) error
|
||||
}
|
||||
|
||||
type ScanCloser struct {
|
||||
*bufio.Scanner
|
||||
closer TimeoutCloser
|
||||
}
|
||||
|
||||
type DatagramMessage struct {
|
||||
message []byte
|
||||
client string
|
||||
}
|
||||
|
||||
func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) {
|
||||
s.wait.Add(1)
|
||||
go func() {
|
||||
defer s.wait.Done()
|
||||
for {
|
||||
buf := s.datagramPool.Get().([]byte)
|
||||
n, addr, err := packetconn.ReadFrom(buf)
|
||||
if err == nil {
|
||||
// Ignore trailing control characters and NULs
|
||||
for ; (n > 0) && (buf[n-1] < 32); n-- {
|
||||
}
|
||||
if n > 0 {
|
||||
var address string
|
||||
if addr != nil {
|
||||
address = addr.String()
|
||||
}
|
||||
s.datagramChannel <- DatagramMessage{buf[:n], address}
|
||||
}
|
||||
} else {
|
||||
// there has been an error. Either the server has been killed
|
||||
// or may be getting a transitory error due to (e.g.) the
|
||||
// interface being shutdown in which case sleep() to avoid busy wait.
|
||||
opError, ok := err.(*net.OpError)
|
||||
if (ok) && !opError.Temporary() && !opError.Timeout() {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) goParseDatagrams() {
|
||||
s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize)
|
||||
|
||||
s.wait.Add(1)
|
||||
go func() {
|
||||
defer s.wait.Done()
|
||||
for {
|
||||
select {
|
||||
case msg, ok := (<-s.datagramChannel):
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if sf := s.format.GetSplitFunc(); sf != nil {
|
||||
if _, token, err := sf(msg.message, true); err == nil {
|
||||
s.parser(token, msg.client, "")
|
||||
}
|
||||
} else {
|
||||
s.parser(msg.message, msg.client, "")
|
||||
}
|
||||
s.datagramPool.Put(msg.message[:cap(msg.message)])
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue