diff options
Diffstat (limited to 'logging/host_linux.go')
| -rw-r--r-- | logging/host_linux.go | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/logging/host_linux.go b/logging/host_linux.go new file mode 100644 index 0000000..8617cd0 --- /dev/null +++ b/logging/host_linux.go @@ -0,0 +1,211 @@ +//go:build linux + +package logging + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "log/slog" + "net" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "syscall" + + netkit "go.sudomsg.com/kit/net" + "golang.org/x/sys/unix" +) + +type journalSink struct { + Level slog.Level + Tag string + Conn net.Conn + Encoder Encoder + mu sync.Mutex + ReplaceAttr func(groups []string, a slog.Attr) slog.Attr +} + +var _ Sink = &journalSink{} + +func newJournalSink(ctx context.Context, level slog.Level, tag string, encoder Encoder, replaceAttr func(groups []string, a slog.Attr) slog.Attr) (*journalSink, error) { + conn, err := netkit.Dial(ctx, netkit.NetUnixDatagram, "/run/systemd/journal/socket") + if err != nil { + return nil, fmt.Errorf("failed to connect to journald: %w", err) + } + + if encoder == nil { + encoder = MessageEncoder + } + + if tag == "" { + tag = filepath.Base(os.Args[0]) + } + + if tag == "" { + tag = "kit" + } + + return &journalSink{ + Level: level, + Tag: tag, + Conn: conn, + Encoder: encoder, + ReplaceAttr: replaceAttr, + }, nil +} + +func (s *journalSink) Enabled(level slog.Level) bool { + return s.Level <= level +} + +func (s *journalSink) Append(r slog.Record) error { + fields := map[string]string{ + "SYSLOG_IDENTIFIER": s.Tag, + } + + it := RecordAll(r, func(groups []string, a slog.Attr) slog.Attr { + var key string + switch a.Key { + case slog.LevelKey: + key = "PRIORITY" + val := a.Value.Any().(slog.Leveler).Level() + fields["PRIORTY"] = fmt.Sprintf("%d", mapLevelToSyslog(val)) + return slog.Attr{} + case slog.TimeKey: + fields["SYSLOG_TIMESTAMP"] = a.Value.String() + return slog.Attr{} + case slog.SourceKey: + pc := uintptr(a.Value.Uint64()) + fs := runtime.CallersFrames([]uintptr{pc}) + f, _ := fs.Next() + + fields["CODE_FUNC"] = f.Func.Name() + fields["CODE_FILE"] = f.File + fields["CODE_LINE"] = strconv.Itoa(f.Line) + return slog.Attr{} + default: + key = sanitizeKey(strings.Join(append(groups, a.Key), "_")) + } + + fields[key] = a.Value.String() + return a + }) + + msg, err := s.Encoder(it) + if err != nil { + return fmt.Errorf("encoding failed: %w", err) + } + + fields["MESSAGE"] = msg + return s.write(fields) +} + +func (s *journalSink) write(fields map[string]string) error { + var buf bytes.Buffer + + for k, v := range fields { + if strings.ContainsRune(v, '\n') { + fmt.Fprintln(&buf, k) + binary.Write(&buf, binary.LittleEndian, uint64(len(v))) + fmt.Fprintln(&buf, v) + } else { + fmt.Fprintf(&buf, "%v=%v\n", k, v) + } + } + + s.mu.Lock() + defer s.mu.Unlock() + + if buf.Len() < 65536 { + _, err := s.Conn.Write(buf.Bytes()) + return err + } + + return s.writeMemfd(buf.Bytes()) +} + +func (s *journalSink) writeMemfd(data []byte) error { + fd, err := unix.MemfdCreate("journal_payload", unix.MFD_CLOEXEC|unix.MFD_ALLOW_SEALING) + if err != nil { + return fmt.Errorf("memfd_create failed: %w", err) + } + defer unix.Close(fd) + + _, err = unix.Write(fd, data) + if err != nil { + return err + } + + _, err = unix.FcntlInt(uintptr(fd), unix.F_ADD_SEALS, + unix.F_SEAL_SHRINK|unix.F_SEAL_GROW|unix.F_SEAL_WRITE|unix.F_SEAL_SEAL) + if err != nil { + return err + } + + unixConn, ok := s.Conn.(*net.UnixConn) + if !ok { + return fmt.Errorf("connection is not a unix socket") + } + + rights := unix.UnixRights(fd) + + _, _, err = unixConn.WriteMsgUnix(nil, rights, nil) + return err +} + +func sanitizeKey(k string) string { + k = strings.TrimLeft(strings.ToUpper(k), "_") + var res strings.Builder + for _, r := range k { + if (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' { + res.WriteRune(r) + } else { + res.WriteRune('_') + } + } + return res.String() +} + +func isJournalAvailable() bool { + const socketPath = "/run/systemd/journal/socket" + + fi, err := os.Stat(socketPath) + if err != nil { + return false + } + + return fi.Mode()&os.ModeSocket != 0 +} + +func NewHostSink(ctx context.Context, level slog.Level, tag string, encoder Encoder, replaceAttr func(groups []string, a slog.Attr) slog.Attr) (Sink, error) { + if isJournalAvailable() { + return newJournalSink(ctx, level, tag, encoder, replaceAttr) + } + + return DialSyslog(ctx, netkit.NetUnixDatagram, "", encoder, SyslogOptions{ + Tag: tag, + Facility: FacilityUser, + Level: level, + ReplaceAttr: replaceAttr, + }) +} + +func IsDaemonManaged() bool { + stream := os.Getenv("JOURNAL_STREAM") + if stream == "" { + return false + } + + var stat syscall.Stat_t + if err := syscall.Fstat(2, &stat); err != nil { + return false + } + + expected := fmt.Sprintf("%d:%d", stat.Dev, stat.Ino) + return stream == expected +} |
