aboutsummaryrefslogtreecommitdiffstats
path: root/logging/host_linux.go
diff options
context:
space:
mode:
Diffstat (limited to 'logging/host_linux.go')
-rw-r--r--logging/host_linux.go211
1 files changed, 211 insertions, 0 deletions
diff --git a/logging/host_linux.go b/logging/host_linux.go
new file mode 100644
index 0000000..8617cd0
--- /dev/null
+++ b/logging/host_linux.go
@@ -0,0 +1,211 @@
+//go:build linux
+
+package logging
+
+import (
+ "bytes"
+ "context"
+ "encoding/binary"
+ "fmt"
+ "log/slog"
+ "net"
+ "os"
+ "path/filepath"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+
+ netkit "go.sudomsg.com/kit/net"
+ "golang.org/x/sys/unix"
+)
+
+type journalSink struct {
+ Level slog.Level
+ Tag string
+ Conn net.Conn
+ Encoder Encoder
+ mu sync.Mutex
+ ReplaceAttr func(groups []string, a slog.Attr) slog.Attr
+}
+
+var _ Sink = &journalSink{}
+
+func newJournalSink(ctx context.Context, level slog.Level, tag string, encoder Encoder, replaceAttr func(groups []string, a slog.Attr) slog.Attr) (*journalSink, error) {
+ conn, err := netkit.Dial(ctx, netkit.NetUnixDatagram, "/run/systemd/journal/socket")
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to journald: %w", err)
+ }
+
+ if encoder == nil {
+ encoder = MessageEncoder
+ }
+
+ if tag == "" {
+ tag = filepath.Base(os.Args[0])
+ }
+
+ if tag == "" {
+ tag = "kit"
+ }
+
+ return &journalSink{
+ Level: level,
+ Tag: tag,
+ Conn: conn,
+ Encoder: encoder,
+ ReplaceAttr: replaceAttr,
+ }, nil
+}
+
+func (s *journalSink) Enabled(level slog.Level) bool {
+ return s.Level <= level
+}
+
+func (s *journalSink) Append(r slog.Record) error {
+ fields := map[string]string{
+ "SYSLOG_IDENTIFIER": s.Tag,
+ }
+
+ it := RecordAll(r, func(groups []string, a slog.Attr) slog.Attr {
+ var key string
+ switch a.Key {
+ case slog.LevelKey:
+ key = "PRIORITY"
+ val := a.Value.Any().(slog.Leveler).Level()
+ fields["PRIORTY"] = fmt.Sprintf("%d", mapLevelToSyslog(val))
+ return slog.Attr{}
+ case slog.TimeKey:
+ fields["SYSLOG_TIMESTAMP"] = a.Value.String()
+ return slog.Attr{}
+ case slog.SourceKey:
+ pc := uintptr(a.Value.Uint64())
+ fs := runtime.CallersFrames([]uintptr{pc})
+ f, _ := fs.Next()
+
+ fields["CODE_FUNC"] = f.Func.Name()
+ fields["CODE_FILE"] = f.File
+ fields["CODE_LINE"] = strconv.Itoa(f.Line)
+ return slog.Attr{}
+ default:
+ key = sanitizeKey(strings.Join(append(groups, a.Key), "_"))
+ }
+
+ fields[key] = a.Value.String()
+ return a
+ })
+
+ msg, err := s.Encoder(it)
+ if err != nil {
+ return fmt.Errorf("encoding failed: %w", err)
+ }
+
+ fields["MESSAGE"] = msg
+ return s.write(fields)
+}
+
+func (s *journalSink) write(fields map[string]string) error {
+ var buf bytes.Buffer
+
+ for k, v := range fields {
+ if strings.ContainsRune(v, '\n') {
+ fmt.Fprintln(&buf, k)
+ binary.Write(&buf, binary.LittleEndian, uint64(len(v)))
+ fmt.Fprintln(&buf, v)
+ } else {
+ fmt.Fprintf(&buf, "%v=%v\n", k, v)
+ }
+ }
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if buf.Len() < 65536 {
+ _, err := s.Conn.Write(buf.Bytes())
+ return err
+ }
+
+ return s.writeMemfd(buf.Bytes())
+}
+
+func (s *journalSink) writeMemfd(data []byte) error {
+ fd, err := unix.MemfdCreate("journal_payload", unix.MFD_CLOEXEC|unix.MFD_ALLOW_SEALING)
+ if err != nil {
+ return fmt.Errorf("memfd_create failed: %w", err)
+ }
+ defer unix.Close(fd)
+
+ _, err = unix.Write(fd, data)
+ if err != nil {
+ return err
+ }
+
+ _, err = unix.FcntlInt(uintptr(fd), unix.F_ADD_SEALS,
+ unix.F_SEAL_SHRINK|unix.F_SEAL_GROW|unix.F_SEAL_WRITE|unix.F_SEAL_SEAL)
+ if err != nil {
+ return err
+ }
+
+ unixConn, ok := s.Conn.(*net.UnixConn)
+ if !ok {
+ return fmt.Errorf("connection is not a unix socket")
+ }
+
+ rights := unix.UnixRights(fd)
+
+ _, _, err = unixConn.WriteMsgUnix(nil, rights, nil)
+ return err
+}
+
+func sanitizeKey(k string) string {
+ k = strings.TrimLeft(strings.ToUpper(k), "_")
+ var res strings.Builder
+ for _, r := range k {
+ if (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' {
+ res.WriteRune(r)
+ } else {
+ res.WriteRune('_')
+ }
+ }
+ return res.String()
+}
+
+func isJournalAvailable() bool {
+ const socketPath = "/run/systemd/journal/socket"
+
+ fi, err := os.Stat(socketPath)
+ if err != nil {
+ return false
+ }
+
+ return fi.Mode()&os.ModeSocket != 0
+}
+
+func NewHostSink(ctx context.Context, level slog.Level, tag string, encoder Encoder, replaceAttr func(groups []string, a slog.Attr) slog.Attr) (Sink, error) {
+ if isJournalAvailable() {
+ return newJournalSink(ctx, level, tag, encoder, replaceAttr)
+ }
+
+ return DialSyslog(ctx, netkit.NetUnixDatagram, "", encoder, SyslogOptions{
+ Tag: tag,
+ Facility: FacilityUser,
+ Level: level,
+ ReplaceAttr: replaceAttr,
+ })
+}
+
+func IsDaemonManaged() bool {
+ stream := os.Getenv("JOURNAL_STREAM")
+ if stream == "" {
+ return false
+ }
+
+ var stat syscall.Stat_t
+ if err := syscall.Fstat(2, &stat); err != nil {
+ return false
+ }
+
+ expected := fmt.Sprintf("%d:%d", stat.Dev, stat.Ino)
+ return stream == expected
+}