From 25fea3956d309b1974625a07a7a977376ed7ffe7 Mon Sep 17 00:00:00 2001 From: Ryan Gonzalez Date: Sat, 19 Aug 2023 17:21:58 -0500 Subject: [PATCH] Switch the disk format to MessagePack Variable-length integers, in particular, are useful for saving space. --- shard.yml | 2 + src/alys.cr | 152 +++++++++++++++-------- src/alys/VERSION | 2 +- src/alys/msgpack.cr | 269 ++++++++++++++++++++++++++++++++++++++++ tools/alys_converter.cr | 75 ++--------- 5 files changed, 386 insertions(+), 114 deletions(-) create mode 100644 src/alys/msgpack.cr diff --git a/shard.yml b/shard.yml index 70a44c0..88eb9a7 100644 --- a/shard.yml +++ b/shard.yml @@ -12,6 +12,8 @@ executables: - alys_converter dependencies: + msgpack: + github: crystal-community/msgpack-crystal protobuf: github: jeromegn/protobuf.cr commit: ce742d987caa7af2108cc7de00aaec435004c6a5 diff --git a/src/alys.cr b/src/alys.cr index f6a4dbe..9a4df6d 100644 --- a/src/alys.cr +++ b/src/alys.cr @@ -22,6 +22,8 @@ # will have a file & name string of the embedded length immediately following # it. 
+require "msgpack"
+require "./alys/msgpack.cr"
 require "./alys/overrides.cr"

 {{ `"#{__DIR__}/../tools/detect_libunwind.sh" --print-code` }}
@@ -118,29 +120,79 @@ module Alys::Internal

   PROTOCOL_VERSION = {{ read_file("#{__DIR__}/alys/VERSION").strip.id + "u32" }}

-  @[Packed]
-  record Header,
-    name : UInt8[4],
-    version : UInt32
+  record Header, name : String, version : UInt32 do # file prologue: format name followed by protocol version
+    def self.new(unpacker : MessagePack::Unpacker) : self
+      Header.new unpacker.read_string, UInt32.new(unpacker) # read in the order to_msgpack writes
+    end
+
+    def to_msgpack(packer) # emits the fields as consecutive top-level values (no array/map wrapper)
+      packer.write name
+      packer.write version
+    end
+  end

-  @[Packed]
   record Frame,
     ip : UInt64,
     line : UInt16,
-    file_len : UInt16,
-    name_len : UInt16
+    file : Bytes,
+    name : Bytes do # file/name now travel inline as binaries instead of length-prefixed trailers
+    def self.new(unpacker : MessagePack::Unpacker) : self
+      Frame.new UInt64.new(unpacker), # ip
+        UInt16.new(unpacker), # line
+        unpacker.read_bytes, # file
+        unpacker.read_bytes # name
+    end
+
+    def to_msgpack(packer) # field order must match self.new above
+      packer.write ip
+      packer.write line
+      packer.write file
+      packer.write name
+    end
+  end

-  record AllocEventExtra, size : UInt64
-  record ReallocEventExtra, prev_addr : UInt64
+  record AllocEventExtra, size : UInt64 do # written after an Event by record_alloc
+    def self.new(unpacker : MessagePack::Unpacker) : self
+      AllocEventExtra.new UInt64.new(unpacker)
+    end
+
+    def to_msgpack(packer)
+      packer.write size
+    end
+  end
+
+  record ReallocEventExtra, prev_addr : UInt64 do # written only when a previous address was supplied
+    def self.new(unpacker : MessagePack::Unpacker) : self
+      ReallocEventExtra.new UInt64.new(unpacker)
+    end
+
+    def to_msgpack(packer)
+      packer.write prev_addr
+    end
+  end

-  @[Packed]
   record Event,
     kind : EventKind,
     seconds : Int64,
     nanos : Int32,
-    addr : UInt64
+    addr : UInt64 do
+    def self.new(unpacker : MessagePack::Unpacker) : self
+      Event.new EventKind.new(unpacker), # kind
+        Int64.new(unpacker), # seconds
+        Int32.new(unpacker), # nanos
+        UInt64.new(unpacker) # addr
+    end
+
+    def to_msgpack(packer)
+      packer.write kind.value # the enum is stored as its integer value
+      packer.write seconds
+      packer.write nanos
+      packer.write addr
+    end
+  end

-  # Not IO because I don't need to implement 'read'.
+  # Not IO because that introduces a circular dependency where IO methods
+  # require the current fiber to be initialized which requires the GC.
   class EventlessFd
     # useful for debugging
     getter pos
@@ -159,11 +211,22 @@ module Alys::Internal
       LibC.close @fd if @autoclose
     end

-    def write(bytes : Bytes)
+    def write_byte(byte : UInt8) # single-byte entry point used by MsgPacker
+      write Bytes.new(pointerof(byte), 1) # view the argument's own storage as a 1-byte slice
+    end
+
+    def write_bytes(value : T, format : IO::ByteFormat) forall T # fixed-width value in the given byte order
+      buffer = uninitialized T # scratch space of sizeof(T) bytes, filled by format.encode below
+      bytes = Bytes.new pointerof(buffer).as(UInt8*), sizeof(T)
+      format.encode value, bytes
+      write bytes
+    end
+
+    def write(slice : Bytes) : Nil # keeps calling LibC.write until the whole slice has been written
       total_written = 0u64
-      while total_written < bytes.size
-        written = LibC.write @fd, bytes.to_unsafe + total_written,
-          bytes.size - total_written
+      while total_written < slice.size # NOTE(review): the EAGAIN test below reads `errno = Errno::EAGAIN` (assignment) - confirm `==` was intended
+        written = LibC.write @fd, slice.to_unsafe + total_written,
+          slice.size - total_written # bytes still outstanding
         if written == -1
           errno = Errno.value
           if errno = Errno::EAGAIN
@@ -178,38 +241,32 @@ module Alys::Internal
         @pos += written
       end
     end
-
-    def write_value(value : Value)
-      bytes = Bytes.new pointerof(value).as(Pointer(UInt8)),
-        sizeof(typeof(value))
-      write bytes
-    end
   end

   def self.bytes_from_cstr(cstr : LibC::Char*) : Bytes
     Bytes.new cstr, LibC.strlen(cstr)
   end

-  @@fd : EventlessFd? = nil
+  @@packer : MsgPacker? = nil # nil while recording is disabled

   def self.enabled?
-    @@fd != nil
+    @@packer != nil
   end

   def self.enable(file : String)
-    raise "Already enabled" if @@fd != nil
+    raise "Already enabled" if @@packer != nil

     Exception::CallStack.load_debug_info

     fd = EventlessFd.open file
+    packer = MsgPacker.new fd # every record from here on is serialised through this packer

-    name = StaticArray['A', 'L', 'Y', 'S'].map { |c| c.bytes[0] }
-    header = Header.new name: name, version: PROTOCOL_VERSION
-    fd.write_value header
+    header = Header.new name: "ALYS", version: PROTOCOL_VERSION
+    header.to_msgpack packer # the header is the first thing written to the file

     # Don't save it until down here, since the previous lines would have started
-    # allocations that would be written to this fd.
-    @@fd = fd
+    # allocations that would be written to this packer.
+    @@packer = packer
   end

   @@in_write = false
@@ -217,7 +274,7 @@ module Alys::Internal
   private def self.in_write
     if @@in_write
       @@in_write = false
-      @@fd = nil
+      @@packer = nil # turn recording off before raising
       raise "[ALYS] alloc called within handler!\n"
     end

@@ -227,7 +284,7 @@ module Alys::Internal
   end

   def self.record_finalize(mem : Void*)
-    return unless fd = @@fd
+    return unless packer = @@packer # nothing to do unless recording is enabled

     self.in_write do
       time = Time.monotonic
@@ -235,19 +292,19 @@ module Alys::Internal
         seconds: time.to_i,
         nanos: time.nanoseconds,
         addr: mem.address
-      fd.write_value event
+      event.to_msgpack packer
     end
   end

   private record BacktraceHandlerData,
     settings : BacktraceSettings,
-    fd : EventlessFd,
+    packer : MsgPacker, # handed to the unwind callback through its data pointer
     count : UInt32 = 0

   @[AlwaysInline]
   def self.record_alloc(mem : Void*, size : UInt64,
                         prev_addr : Void* = Pointer(Void).null)
-    return unless fd = @@fd
+    return unless packer = @@packer # nothing to do unless recording is enabled

     backtrace_settings = Alys.backtrace_settings

@@ -263,22 +320,22 @@ module Alys::Internal
         seconds: time.to_i,
         nanos: time.nanoseconds,
         addr: mem.address
-      fd.write_value event
+      event.to_msgpack packer

       extra = AllocEventExtra.new size: size.as(UInt64)
-      fd.write_value extra
+      extra.to_msgpack packer # the allocation size always follows the event

       if !prev_addr.null?
         extra = ReallocEventExtra.new prev_addr: prev_addr.address
-        fd.write_value extra
+        extra.to_msgpack packer # the previous address follows only when one was given
       end

       if backtrace_settings.type_ != BacktraceType::None
-        backtrace_data = BacktraceHandlerData.new backtrace_settings, fd
+        backtrace_data = BacktraceHandlerData.new backtrace_settings, packer

         LibUnwind.backtrace ->(ctx, data) {
           bt_data = data.as Pointer(BacktraceHandlerData)
-          our_fd = bt_data.value.fd
+          our_packer = bt_data.value.packer
           settings = bt_data.value.settings

           ip = LibUnwind.get_ip(ctx).as UInt64
@@ -301,17 +358,11 @@ module Alys::Internal
             file, line, _ = Exception::CallStack.alys_decode_line_number ip
           end

-          file_len = file.try(&.size) || 0
-          name_len = name.try(&.size) || 0
-
           frame = Frame.new ip: ip,
             line: line.to_u16,
-            file_len: file_len.to_u16,
-            name_len: name_len.to_u16
-
-          our_fd.write_value frame
-          our_fd.write file if file
-          our_fd.write name if name
+            file: file || Bytes.empty, # a missing file or name is written as an empty binary
+            name: name || Bytes.empty
+          frame.to_msgpack our_packer

           if limit = bt_data.value.settings.limit
             return LibUnwind::ReasonCode::END_OF_STACK if bt_data.value.count >= limit
@@ -322,8 +373,7 @@ module Alys::Internal
         }, pointerof(backtrace_data)
       end

-      fd.write_value Frame.new(ip: 0u16, line: 0u16, file_len: 0u16,
-        name_len: 0u16)
+      packer.write nil # a nil value ends the frame list (replaces the old all-zero Frame sentinel)
     end
   end
 end
diff --git a/src/alys/VERSION b/src/alys/VERSION
index 274dc78..20206c9 100644
--- a/src/alys/VERSION
+++ b/src/alys/VERSION
@@ -1 +1 @@
-2022090801
+2023081901
diff --git a/src/alys/msgpack.cr b/src/alys/msgpack.cr
new file mode 100644
index 0000000..3063ccd
--- /dev/null
+++ b/src/alys/msgpack.cr
@@ -0,0 +1,269 @@
+# This file was largely taken from the msgpack-crystal source code:
+# https://github.com/crystal-community/msgpack-crystal/blob/v1.3.4/src/message_pack/packer.cr#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+struct Alys::Internal::MsgPacker
+  # Avoid creating any exceptions while writing.
+  private def invalid_length
+    LibC.printf "%s\n", "[ALYS:MsgPacker] invalid length"
+    LibC.abort
+  end
+
+  # Take an EventlessFd instead of IO.
+  def initialize(@io : EventlessFd)
+  end
+
+  def write(value : Nil)
+    write_byte(0xC0) # nil
+    self
+  end
+
+  def write(value : Bool)
+    write_byte(value ? 0xC3_u8 : 0xC2_u8) # true : false
+    self
+  end
+
+  def write_string_start(bytesize) # writes only the str header; the caller writes the payload
+    case bytesize
+    when (0x00..0x1F)
+      write_byte(0xA0_u8 + bytesize.to_u8) # fixstr: length lives in the low 5 bits
+    when (0x0000..0xFF)
+      write_byte(0xD9) # str 8
+      write_value(bytesize.to_u8)
+    when (0x0000..0xFFFF)
+      write_byte(0xDA) # str 16
+      write_value(bytesize.to_u16)
+    when (0x00000000..0xFFFFFFFF)
+      write_byte(0xDB) # str 32
+      write_value(bytesize.to_u32)
+    else
+      invalid_length
+    end
+    self
+  end
+
+  def write_binary_start(bytesize) # writes only the bin header; the caller writes the payload
+    case bytesize
+    when (0x0000..0xFF)
+      # bin8
+      write_byte(0xC4)
+      write_value(bytesize.to_u8)
+    when (0x0000..0xFFFF)
+      # bin16
+      write_byte(0xC5)
+      write_value(bytesize.to_u16)
+    when (0x00000000..0xFFFFFFFF)
+      # bin32
+      write_byte(0xC6)
+      write_value(bytesize.to_u32)
+    else
+      invalid_length
+    end
+    self
+  end
+
+  def write(value : String)
+    write_string_start(value.bytesize)
+    write_slice(value.to_slice)
+    self
+  end
+
+  def write(value : Bytes)
+    write_binary_start(value.bytesize)
+    write_slice(value)
+    self
+  end
+
+  def write(value : Symbol)
+    write(value.to_s)
+  end
+
+  def write(value : Float32 | Float64)
+    case value
+    when Float32
+      write_byte(0xCA) # float 32
+    when Float64
+      write_byte(0xCB) # float 64
+    end
+    write_value(value)
+    self
+  end
+
+  def write(value : Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64) # picks the narrowest encoding that fits
+    if value >= 0
+      if 0x7F.to_u8 >= value
+        write_byte(value.to_u8) # positive fixint: the value is its own tag byte
+      elsif UInt8::MAX >= value
+        write_byte(0xCC) # uint 8
+        write_byte(value.to_u8)
+      elsif UInt16::MAX >= value
+        write_byte(0xCD) # uint 16
+        write_value(value.to_u16)
+      elsif UInt32::MAX >= value
+        write_byte(0xCE) # uint 32
+        write_value(value.to_u32)
+      else
+        write_byte(0xCF) # uint 64
+        write_value(value.to_u64)
+      end
+    else
+      if -0x20.to_i8 <= value
+        v = value.to_i8
+        write_byte(v.to_u8!) # negative fixint: two's-complement byte 0xE0..0xFF
+      elsif Int8::MIN <= value
+        write_byte(0xD0) # int 8
+        v = value.to_i8
+        write_byte(v.to_u8!)
+      elsif Int16::MIN <= value
+        write_byte(0xD1) # int 16
+        write_value(value.to_i16)
+      elsif Int32::MIN <= value
+        write_byte(0xD2) # int 32
+        write_value(value.to_i32)
+      else
+        write_byte(0xD3) # int 64
+        write_value(value.to_i64)
+      end
+    end
+    self
+  end
+
+  def write(value : Hash)
+    write_hash_start(value.size)
+
+    value.each do |key, value|
+      self.write(key)
+      self.write(value)
+    end
+
+    self
+  end
+
+  def write_hash_start(length) # length is the number of key/value pairs
+    case length
+    when (0x00..0x0F)
+      write_byte(0x80_u8 + length.to_u8) # fixmap
+    when (0x0000..0xFFFF)
+      write_byte(0xDE) # map 16
+      write_value(length.to_u16)
+    when (0x00000000..0xFFFFFFFF)
+      write_byte(0xDF) # map 32
+      write_value(length.to_u32)
+    else
+      invalid_length
+    end
+    self
+  end
+
+  def write(value : Array)
+    write_array_start(value.size)
+    value.each { |item| self.write(item) }
+    self
+  end
+
+  def write_array_start(length)
+    case length
+    when (0x00..0x0F)
+      write_byte(0x90_u8 + length.to_u8) # fixarray
+    when (0x0000..0xFFFF)
+      write_byte(0xDC) # array 16
+      write_value(length.to_u16)
+    when (0x00000000..0xFFFFFFFF)
+      write_byte(0xDD) # array 32
+      write_value(length.to_u32)
+    else
+      invalid_length
+    end
+    self
+  end
+
+  def write(value : Tuple)
+    write_array_start(value.size)
+    value.each { |item| self.write(item) }
+    self
+  end
+
+  def write_ext_start(bytesize) # the exact sizes 1/2/4/8/16 are matched first and use the fixext tags
+    case bytesize
+    when 1
+      write_byte(0xD4) # fixext 1
+    when 2
+      write_byte(0xD5) # fixext 2
+    when 4
+      write_byte(0xD6) # fixext 4
+    when 8
+      write_byte(0xD7) # fixext 8
+    when 16
+      write_byte(0xD8) # fixext 16
+    when 0x00..0xFF
+      write_byte(0xC7) # ext 8
+      write_byte(bytesize.to_u8)
+    when 0x0000..0xFFFF
+      write_byte(0xC8) # ext 16
+      write_value(bytesize.to_u16)
+    when 0x00000000..0xFFFFFFFF
+      write_byte(0xC9) # ext 32
+      write_value(bytesize.to_u32)
+    else
+      invalid_length
+    end
+    self
+  end
+
+  def write_ext(type_id : Int8, bytes : Bytes)
+    write_ext_start(bytes.size)
+    write_byte(type_id.to_u8!)
+    write_slice(bytes.to_slice)
+  end
+
+  def write_ext(type_id : Int8) # NOTE(review): builds an IO::Memory, unlike the other writers here - confirm it is never reached from the allocation hooks
+    io = IO::Memory.new
+    yield(io)
+    write_ext(type_id, io.to_slice)
+  end
+
+  private def write_byte(byte : UInt8)
+    @io.write_byte(byte)
+  end
+
+  private def write_value(value) # multi-byte values are always written big-endian
+    @io.write_bytes(value, IO::ByteFormat::BigEndian)
+  end
+
+  private def write_slice(slice)
+    @io.write(slice)
+  end
+
+  def to_slice
+    io = @io
+    if io.responds_to?(:to_slice)
+      io.to_slice
+    else
+      raise MessagePack::Error.new("to slice not implemented for io type: #{typeof(io)}") # qualified: a bare `Error` has no definition in this namespace
+    end
+  end
+
+  def to_s
+    @io.to_s
+  end
+
+  def bytes
+    @io.to_s.bytes
+  end
+end
diff --git a/tools/alys_converter.cr b/tools/alys_converter.cr
index e416d55..6bc65f5 100644
--- a/tools/alys_converter.cr
+++ b/tools/alys_converter.cr
@@ -9,27 +9,6 @@ require "compress/gzip"
 require "json"
 require "option_parser"

-module ReadUtils
-  def self.bytes(io : IO, size : Int)
-    slice = Bytes.new size
-    io.read_fully slice
-    slice
-  end
-
-  def self.string(io : IO, size : Int)
-    slice = bytes io, size
-    String.new slice
-  end
-end
-
-module ReadStructFromIO
-  macro included
-    def self.from_io(%io : IO)
-      ::ReadUtils.bytes(%io, sizeof({{@type}})).to_unsafe.as(Pointer({{@type}})).value
-    end
-  end
-end
-
 PROTOCOL_VERSION = Alys::Internal::PROTOCOL_VERSION

 alias EventKind = Alys::Internal::EventKind
@@ -39,35 +18,6 @@ alias RawAllocEventExtra = Alys::Internal::AllocEventExtra
 alias RawReallocEventExtra = Alys::Internal::ReallocEventExtra
 alias
RawFrame = Alys::Internal::Frame

-@[Packed]
-struct RawHeader
-  include ReadStructFromIO
-end
-
-struct RawEvent
-  include ReadStructFromIO
-end
-
-struct RawAllocEventExtra
-  include ReadStructFromIO
-end
-
-struct RawReallocEventExtra
-  include ReadStructFromIO
-end
-
-struct RawFrame
-  include ReadStructFromIO
-
-  def self.read_all(io : IO)
-    loop do
-      frame = from_io io
-      return if frame.ip == 0
-      yield frame
-    end
-  end
-end
-
 class ExecutableSymbol
   property name : String?
   property file : String?
@@ -157,17 +107,17 @@ abstract class EventVisitor
 end

 class EventReader
-  def initialize(@source : IO, @symbolizer : Symbolizer?)
+  def initialize(@source : MessagePack::Unpacker, @symbolizer : Symbolizer?)
   end

   private def read_stack : StackTrace
     frames = [] of Tuple(RawFrame, ExecutableSymbol)

-    RawFrame.read_all(@source) do |frame|
+    while (frame = RawFrame?.new @source) # the writer ends each frame list with a nil value
       symbol = ExecutableSymbol.new
       symbol.line = frame.line if frame.line != 0
-      symbol.file = ReadUtils.string(@source, frame.file_len) if frame.file_len != 0
-      symbol.name = ReadUtils.string(@source, frame.name_len) if frame.name_len != 0
+      symbol.file = String.new frame.file unless frame.file.empty? # leave nil when absent so the symbolizer fallback below still triggers
+      symbol.name = String.new frame.name unless frame.name.empty? # an empty binary is how the writer encodes "unknown"

       if (!symbol.line || !symbol.file || !symbol.name) && (symbolizer = @symbolizer)
         if new_symbol = symbolizer.symbolize frame.ip
@@ -187,8 +137,8 @@ class EventReader

     start_time = nil

-    header = RawHeader.from_io @source
-    raise "Bad file format" if String.new(header.name.to_slice) != "ALYS"
+    header = RawHeader.new @source
+    raise "Bad file format" if header.name != "ALYS"

     if header.version != PROTOCOL_VERSION
       raise "Incompatible protocol version #{header.version} (supported: #{PROTOCOL_VERSION})"
@@ -196,8 +146,8 @@ class EventReader

     loop do
       begin
-        raw_event = RawEvent.from_io @source
-      rescue ex : IO::EOFError
+        raw_event = RawEvent.new @source
+      rescue ex : MessagePack::EofError # end of input while reading the next event ends the loop
         break
       end

@@ -210,7 +160,7 @@ class EventReader
       when EventKind::Alloc
         raise "Duplicate address: #{raw_event.addr}" if addrs_to_ids.includes? raw_event.addr

-        extra = RawAllocEventExtra.from_io @source
+        extra = RawAllocEventExtra.new @source
         addrs_to_ids[raw_event.addr] = info = AllocInfo.new
           id: next_id,
           addr: raw_event.addr,
@@ -219,8 +169,8 @@ class EventReader

         visitor.visit_alloc raw_event, time_offset, info, read_stack
       when EventKind::Realloc
-        alloc_extra = RawAllocEventExtra.from_io @source
-        realloc_extra = RawReallocEventExtra.from_io @source
+        alloc_extra = RawAllocEventExtra.new @source # the size record comes first ...
+        realloc_extra = RawReallocEventExtra.new @source # ... then the previous address, matching the writer's order
         prev_info = addrs_to_ids.delete realloc_extra.prev_addr

         override_kind = nil
@@ -559,7 +509,8 @@ def run
   end

   event = File.open source do |source_file|
-    reader = EventReader.new source_file, symbolizer
+    unpacker = MessagePack::IOUnpacker.new source_file # distinct name so the `source` passed to File.open is not rebound
+    reader = EventReader.new unpacker, symbolizer
     File.open dest, "w" do |dest_file|
       case format
       when Format::JSON
-- 
2.45.2