~shabbyrobe/dogdump

5e858b1e56ce23f5adf101d4fd36d4991ed67253 — Blake Williams 3 months ago e8bb9c3
Dump in API trace format
A cmd/dogdump/apitrace.go => cmd/dogdump/apitrace.go +74 -0
@@ 0,0 1,74 @@
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"sync"

	"go.shabbyrobe.org/dogdump/dogapi"
	"go.shabbyrobe.org/dogdump/dogproto"
	"go.shabbyrobe.org/dogdump/dogtrace"
)

type APITraceWriter struct {
	handleError func(err error) (abort error)

	f   *os.File
	enc *json.Encoder
	mu  sync.Mutex
}

var _ dogtrace.Handler = (&APITraceWriter{}).Handle

func newAPITraceWriter(file string, handleError func(err error) (abort error)) (*APITraceWriter, error) {
	if handleError == nil {
		handleError = func(err error) (abort error) {
			log.Println(err)
			return nil
		}
	}

	f, err := os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return nil, err
	}

	return &APITraceWriter{
		handleError: handleError,
		f:           f,
		enc:         json.NewEncoder(f),
	}, nil
}

func (w *APITraceWriter) Close() error {
	return w.f.Close()
}

func (w *APITraceWriter) encode(trace dogapi.Trace) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	var traceWrapper struct {
		Trace dogapi.Trace `json:"trace"`
	}
	traceWrapper.Trace = trace

	return w.enc.Encode(traceWrapper)
}

func (w *APITraceWriter) Handle(ctx context.Context, spanLists dogproto.SpanLists) (abort error) {
	traces, err := dogapi.TracesFromInputSpanLists(spanLists)
	if err != nil {
		return w.handleError(err)
	}

	for _, trace := range traces {
		if err := w.encode(trace); err != nil {
			return err
		}
	}

	return nil
}

M cmd/dogdump/cmd_serve.go => cmd/dogdump/cmd_serve.go +18 -1
@@ 30,6 30,7 @@ func run() (rerr error) {
	var showMetrics, showTraces, showLogs bool
	var metricsInfluxHost, metricsInfluxToken, metricsInfluxOrg, metricsInfluxBucket string
	var traceOTLPHost string
	var traceAPIDumpFile string

	var flags = flag.NewFlagSet("", 0)
	flags.StringVar(&dumpFileName, "file", "", "Log ndjson/jsonl to this file. Will print to stdout if not passed.")


@@ 52,6 53,9 @@ func run() (rerr error) {
	// Datadog trace OTLP sink:
	flags.StringVar(&traceOTLPHost, "trace.otlp.host", "", "Send traces to OTLP host")

	// Datadog trace API format sink:
	flags.StringVar(&traceAPIDumpFile, "trace.api.file", "", "Dump traces in a jsonl file in the format of /api/v1/trace")

	if err := flags.Parse(os.Args[1:]); err != nil {
		return err
	}


@@ 113,7 117,7 @@ func run() (rerr error) {

	if tracesAddr != "" {
		n++
		errgrp.Go(func() error {
		errgrp.Go(func() (rerr error) {
			if traceOTLPHost != "" {
				log.Printf("shipping traces to otlp server at %s", traceOTLPHost)
				endpoint := doggotel.DefaultEndpointFromHost(false, traceOTLPHost)


@@ 124,6 128,19 @@ func run() (rerr error) {
				tracesHandler = dogtrace.MultiHandler(tracesHandler, dog2OTLPHandler.HandleTrace)
			}

			if traceAPIDumpFile != "" {
				h, err := newAPITraceWriter(traceAPIDumpFile, nil)
				if err != nil {
					return err
				}
				defer func() {
					if cerr := h.Close(); cerr != nil && rerr == nil {
						rerr = cerr
					}
				}()
				tracesHandler = dogtrace.MultiHandler(tracesHandler, h.Handle)
			}

			srv := http.Server{
				Handler: dogtrace.HTTPHandler(tracesHandler, dumper.Error),
			}

A dogapi/trace.go => dogapi/trace.go +188 -0
@@ 0,0 1,188 @@
package dogapi

import (
	"encoding/json"
	"fmt"
	"maps"
	"math"
	"strconv"
	"time"

	"go.shabbyrobe.org/dogdump/dogproto"
)

type (
	SpanID  string
	TraceID string
)

// Trace describes the response format for the datadog REST API:
// https://app.datadoghq.com/api/v1/trace/<id>
//
// It's _just_ different enough, and with a different-enough purpose, to dogproto to merit
// duplication.
type Trace struct {
	RootID SpanID          `json:"root_id"`
	Spans  map[SpanID]Span `json:"spans"`
}

type Span struct {
	TraceID         TraceID            `json:"trace_id"`
	SpanID          SpanID             `json:"span_id"`
	ParentID        SpanID             `json:"parent_id"`
	Start           FloatTime          `json:"start"`
	End             FloatTime          `json:"end"`
	Duration        time.Duration      `json:"duration"`
	Type            string             `json:"type"`
	Service         string             `json:"service"`
	Name            string             `json:"name"`
	Resource        string             `json:"resource"`
	ResourceHash    string             `json:"resource_hash"`
	HostID          int64              `json:"host_id"`
	HostName        string             `json:"hostname"`
	Env             string             `json:"env"`
	HostGroups      []string           `json:"host_groups"`
	Meta            map[string]string  `json:"meta"`
	Metrics         map[string]float64 `json:"metrics"`
	IngestionReason string             `json:"ingestion_reason"`
	Metadata        any                `json:"metadata"` // Unsure how to type this
	ChildIDs        []SpanID           `json:"children_ids"`
}

func TracesFromInputSpanLists(input dogproto.SpanLists) (map[TraceID]Trace, error) {
	type key struct {
		TraceID TraceID
		SpanID  SpanID
	}

	traces := map[TraceID]Trace{}
	parents := map[key]SpanID{}
	children := map[key][]SpanID{}

	for _, inputSpanList := range input {
		for _, inputSpan := range inputSpanList {
			span, err := SpanFromInputTrace(inputSpan)
			if err != nil {
				return nil, err
			}

			k := key{span.TraceID, span.SpanID}
			if _, ok := parents[k]; ok {
				return nil, fmt.Errorf("dogapi: duplicate span id %q", k)
			}

			if span.ParentID != "" {
				parents[k] = span.ParentID

				pk := key{span.TraceID, span.ParentID}
				if _, ok := children[pk]; !ok {
					children[pk] = []SpanID{span.SpanID}
				} else {
					children[pk] = append(children[pk], span.SpanID)
				}
			}

			if _, ok := traces[span.TraceID]; !ok {
				traces[span.TraceID] = Trace{
					Spans: map[SpanID]Span{},
				}
			}
			traces[span.TraceID].Spans[span.SpanID] = span
		}
	}

	for traceID, trace := range traces {
		var root SpanID
		var seen = map[key]struct{}{}
		for idx, span := range trace.Spans {
			k := key{span.TraceID, span.SpanID}
			if span.ParentID != "" {
				root = span.ParentID
			}

			span.ChildIDs = children[k]
			if span.ChildIDs == nil {
				span.ChildIDs = []SpanID{}
			}
			trace.Spans[idx] = span
		}

		for {
			k := key{traceID, root}
			next, ok := parents[k]
			if !ok {
				break
			} else if _, ok := seen[k]; ok {
				return nil, fmt.Errorf("dogap: parent loop in trace")
			}
			root = next
			seen[k] = struct{}{}
		}

		trace.RootID = root
		traces[traceID] = trace
	}

	return traces, nil
}

func SpanFromInputTrace(input dogproto.Span) (Span, error) {
	if input.Error != 0 {
		return Span{}, fmt.Errorf("dogapi: input trace contained an error: %d", input.Error)
	}

	s := Span{
		TraceID:  TraceID(strconv.FormatUint(input.TraceID, 10)),
		SpanID:   SpanID(strconv.FormatUint(input.SpanID, 10)),
		Start:    FloatTime(input.StartTime()),
		End:      FloatTime(input.EndTime()),
		Duration: input.Duration,
		Type:     ptrVal(input.Type),
		Service:  input.Service,
		Name:     input.Name,
		Resource: input.Resource,
		Meta:     maps.Clone(input.Meta),
		Metrics:  maps.Clone(input.Metrics),

		// No equivalent:
		// ResourceHash: input.ResourceHash,
		// HostID: input.HostID,
		// HostName: input.HostName,
		// Env             string             `json:"env"`
		// HostGroups      []string           `json:"host_groups"`
		// IngestionReason string             `json:"ingestion_reason"`
		// Metadata        any                `json:"metadata"` // Unsure how to type this
		// ChildIDs     []SpanID              `json:"children_ids"`
	}

	if input.ParentID > 0 {
		s.ParentID = SpanID(strconv.FormatUint(input.ParentID, 10))
	}

	return s, nil
}

func ptrVal[T any](p *T) (v T) {
	if p == nil {
		return v
	}
	return *p
}

type FloatTime time.Time

func (ft FloatTime) MarshalJSON() ([]byte, error) {
	tm := time.Time(ft)
	sec := float64(tm.Unix()) + (float64(tm.Nanosecond()) / 1000000000.0)
	return json.Marshal(sec)
}

func (ft *FloatTime) UnmarshalJSON(bts []byte) error {
	var f float64
	if err := json.Unmarshal(bts, &f); err != nil {
		return err
	}
	tm := time.Unix(int64(f), int64(math.Mod(f, 1)*1000000000))
	*ft = FloatTime(tm)
	return nil
}

M doggotel/trace.go => doggotel/trace.go +1 -1
@@ 36,7 36,7 @@ func ConvertTraceToJSON(spanLists dogproto.SpanLists) (*JSONTrace, error) {
				Name:              otlpName(span),
				Attributes:        buildAttributes(span),
				StartTimeUnixNano: uint64(span.StartUnixNano),
				EndTimeUnixNano:   uint64(span.StartUnixNano) + uint64(span.DurationNsec),
				EndTimeUnixNano:   uint64(span.StartUnixNano) + uint64(span.Duration),
			}
			outScopeSpan.Spans = append(outScopeSpan.Spans, outSpan)
			outResourceSpan.ScopeSpans = append(outResourceSpan.ScopeSpans, outScopeSpan)

A dogproto/config.go => dogproto/config.go +23 -0
@@ 0,0 1,23 @@
package dogproto

// JSON response to 'GET /v0.7/config', sponged out of DDAgentFeaturesDiscovery.java
// in https://github.com/DataDog/dd-trace-java/
type ConfigResponse struct {
	Version string `json:"version"`
	Config  Config `json:"config"`

	// Must not be nil.
	// These are just paths, not hosts:
	// - https://github.com/DataDog/dd-trace-java/blob/0f986a71765d0bad85ed4e5c3bc6ef44b8cb78fe/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java#L39
	// - https://github.com/DataDog/dd-trace-java/blob/0f986a71765d0bad85ed4e5c3bc6ef44b8cb78fe/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java#L59C46-L59C65
	// In the java version, whether or not metrics or tracing are enabled is contingent on
	// whether those paths are found in here.
	Endpoints []string `json:"endpoints"`

	ClientDropP0s bool `json:"client_drop_p0s,omitempty"`
}

type Config struct {
	// A port <= 0 should instruct Java to skip this and use an internal default
	StatsDPort int `json:"statsd_port"`
}

M dogproto/traceproto.go => dogproto/traceproto.go +9 -21
@@ 3,6 3,8 @@ package dogproto
//go:generate go run github.com/tinylib/msgp@latest

import (
	"time"

	"github.com/tinylib/msgp/msgp"
)



@@ 21,9 23,9 @@ type Span struct {
	Service       string             `msg:"service" json:"service"`
	Resource      string             `msg:"resource" json:"resource"`
	Type          *string            `msg:"type" json:"type"`
	StartUnixNano int64              `msg:"start" json:"start"`
	DurationNsec  int64              `msg:"duration" json:"duration"`
	Meta          map[string]string  `msg:"meta omitempty" json:"meta,omitempty"`
	StartUnixNano int64              `msg:"start" json:"start"` // NOTE: This is a float of 'seconds' in /api/v1/trace, but a unixnano here
	Duration      time.Duration      `msg:"duration" json:"duration"`
	Meta          map[string]string  `msg:"meta,omitempty" json:"meta,omitempty"`
	Metrics       map[string]float64 `msg:"metrics,omitempty" json:"metrics,omitempty"`
	SpanID        uint64             `msg:"span_id" json:"spanId"`
	TraceID       uint64             `msg:"trace_id" json:"traceId"`


@@ 31,24 33,10 @@ type Span struct {
	Error         int32              `msg:"error" json:"error"`
}

// JSON response to 'GET /v0.7/config', sponged out of DDAgentFeaturesDiscovery.java
// in https://github.com/DataDog/dd-trace-java/
type ConfigResponse struct {
	Version string `json:"version"`
	Config  Config `json:"config"`

	// Must not be nil.
	// These are just paths, not hosts:
	// - https://github.com/DataDog/dd-trace-java/blob/0f986a71765d0bad85ed4e5c3bc6ef44b8cb78fe/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java#L39
	// - https://github.com/DataDog/dd-trace-java/blob/0f986a71765d0bad85ed4e5c3bc6ef44b8cb78fe/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java#L59C46-L59C65
	// In the java version, whether or not metrics or tracing are enabled is contingent on
	// whether those paths are found in here.
	Endpoints []string `json:"endpoints"`

	ClientDropP0s bool `json:"client_drop_p0s,omitempty"`
func (span Span) StartTime() time.Time {
	return time.Unix(0, span.StartUnixNano)
}

type Config struct {
	// A port <= 0 should instruct Java to skip this and use an internal default
	StatsDPort int `json:"statsd_port"`
func (span Span) EndTime() time.Time {
	return time.Unix(0, span.StartUnixNano+int64(span.Duration))
}

M dogproto/traceproto_gen.go => dogproto/traceproto_gen.go +43 -31
@@ 67,12 67,12 @@ func (z *Span) DecodeMsg(dc *msgp.Reader) (err error) {
				return
			}
		case "duration":
			z.DurationNsec, err = dc.ReadInt64()
			z.Duration, err = dc.ReadDuration()
			if err != nil {
				err = msgp.WrapError(err, "DurationNsec")
				err = msgp.WrapError(err, "Duration")
				return
			}
		case "meta omitempty":
		case "meta":
			var zb0002 uint32
			zb0002, err = dc.ReadMapHeader()
			if err != nil {


@@ 173,6 173,10 @@ func (z *Span) EncodeMsg(en *msgp.Writer) (err error) {
	zb0001Len := uint32(12)
	var zb0001Mask uint16 /* 12 bits */
	_ = zb0001Mask
	if z.Meta == nil {
		zb0001Len--
		zb0001Mask |= 0x40
	}
	if z.Metrics == nil {
		zb0001Len--
		zb0001Mask |= 0x80


@@ 247,32 251,34 @@ func (z *Span) EncodeMsg(en *msgp.Writer) (err error) {
	if err != nil {
		return
	}
	err = en.WriteInt64(z.DurationNsec)
	if err != nil {
		err = msgp.WrapError(err, "DurationNsec")
		return
	}
	// write "meta omitempty"
	err = en.Append(0xae, 0x6d, 0x65, 0x74, 0x61, 0x20, 0x6f, 0x6d, 0x69, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x79)
	if err != nil {
		return
	}
	err = en.WriteMapHeader(uint32(len(z.Meta)))
	err = en.WriteDuration(z.Duration)
	if err != nil {
		err = msgp.WrapError(err, "Meta")
		err = msgp.WrapError(err, "Duration")
		return
	}
	for za0001, za0002 := range z.Meta {
		err = en.WriteString(za0001)
	if (zb0001Mask & 0x40) == 0 { // if not empty
		// write "meta"
		err = en.Append(0xa4, 0x6d, 0x65, 0x74, 0x61)
		if err != nil {
			err = msgp.WrapError(err, "Meta")
			return
		}
		err = en.WriteString(za0002)
		err = en.WriteMapHeader(uint32(len(z.Meta)))
		if err != nil {
			err = msgp.WrapError(err, "Meta", za0001)
			err = msgp.WrapError(err, "Meta")
			return
		}
		for za0001, za0002 := range z.Meta {
			err = en.WriteString(za0001)
			if err != nil {
				err = msgp.WrapError(err, "Meta")
				return
			}
			err = en.WriteString(za0002)
			if err != nil {
				err = msgp.WrapError(err, "Meta", za0001)
				return
			}
		}
	}
	if (zb0001Mask & 0x80) == 0 { // if not empty
		// write "metrics"


@@ 348,6 354,10 @@ func (z *Span) MarshalMsg(b []byte) (o []byte, err error) {
	zb0001Len := uint32(12)
	var zb0001Mask uint16 /* 12 bits */
	_ = zb0001Mask
	if z.Meta == nil {
		zb0001Len--
		zb0001Mask |= 0x40
	}
	if z.Metrics == nil {
		zb0001Len--
		zb0001Mask |= 0x80


@@ 378,13 388,15 @@ func (z *Span) MarshalMsg(b []byte) (o []byte, err error) {
	o = msgp.AppendInt64(o, z.StartUnixNano)
	// string "duration"
	o = append(o, 0xa8, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e)
	o = msgp.AppendInt64(o, z.DurationNsec)
	// string "meta omitempty"
	o = append(o, 0xae, 0x6d, 0x65, 0x74, 0x61, 0x20, 0x6f, 0x6d, 0x69, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x79)
	o = msgp.AppendMapHeader(o, uint32(len(z.Meta)))
	for za0001, za0002 := range z.Meta {
		o = msgp.AppendString(o, za0001)
		o = msgp.AppendString(o, za0002)
	o = msgp.AppendDuration(o, z.Duration)
	if (zb0001Mask & 0x40) == 0 { // if not empty
		// string "meta"
		o = append(o, 0xa4, 0x6d, 0x65, 0x74, 0x61)
		o = msgp.AppendMapHeader(o, uint32(len(z.Meta)))
		for za0001, za0002 := range z.Meta {
			o = msgp.AppendString(o, za0001)
			o = msgp.AppendString(o, za0002)
		}
	}
	if (zb0001Mask & 0x80) == 0 { // if not empty
		// string "metrics"


@@ 470,12 482,12 @@ func (z *Span) UnmarshalMsg(bts []byte) (o []byte, err error) {
				return
			}
		case "duration":
			z.DurationNsec, bts, err = msgp.ReadInt64Bytes(bts)
			z.Duration, bts, err = msgp.ReadDurationBytes(bts)
			if err != nil {
				err = msgp.WrapError(err, "DurationNsec")
				err = msgp.WrapError(err, "Duration")
				return
			}
		case "meta omitempty":
		case "meta":
			var zb0002 uint32
			zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
			if err != nil {


@@ 579,7 591,7 @@ func (z *Span) Msgsize() (s int) {
	} else {
		s += msgp.StringPrefixSize + len(*z.Type)
	}
	s += 6 + msgp.Int64Size + 9 + msgp.Int64Size + 15 + msgp.MapHeaderSize
	s += 6 + msgp.Int64Size + 9 + msgp.DurationSize + 5 + msgp.MapHeaderSize
	if z.Meta != nil {
		for za0001, za0002 := range z.Meta {
			_ = za0002