~nickbp/originz

ref: d94f181c5fbfdf0f62b40ea46d3f301f4ec681e5 originz/src/codec/decoder.rs -rw-r--r-- 7.7 KiB
d94f181cNick Parker Backport current benchmark to older code 1 year, 8 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#![deny(warnings)]

use anyhow::{bail, Context, Result};

use crate::codec::message;
use crate::fbs::dns_message_generated::{Header, Message, MessageArgs, Question, Resource, OPT};

/// Decodes a wire DNS message, turning it into a parsed `Message` object.
pub struct DNSMessageDecoder<'a> {
    /// The current offset in buf. We do this manually in order to avoid changes to buf's offsets,
    /// as required to support label compression.
    offset: usize,

    /// The expected count of questions, answers, authorities, and additionals to be parsed.
    expected_counts: message::RecordCounts,

    header: Option<flatbuffers::WIPOffset<Header<'a>>>,
    opt: Option<flatbuffers::WIPOffset<OPT<'a>>>,
    question: Vec<flatbuffers::WIPOffset<Question<'a>>>,
    answer: Vec<flatbuffers::WIPOffset<Resource<'a>>>,
    authority: Vec<flatbuffers::WIPOffset<Resource<'a>>>,
    additional: Vec<flatbuffers::WIPOffset<Resource<'a>>>,
}

impl<'a> DNSMessageDecoder<'a> {
    /// Returns a `DNSMessageDecoder` for decoding DNS messages.
    pub fn new() -> DNSMessageDecoder<'a> {
        DNSMessageDecoder {
            offset: 0,
            expected_counts: message::RecordCounts::new(),
            header: None,
            opt: None,
            question: Vec::new(),
            answer: Vec::new(),
            authority: Vec::new(),
            additional: Vec::new(),
        }
    }

    /// Decodes data from the provided `BytesMut`.
    /// - Returns `Ok(true)` if the data has been parsed successfully and is available in `fb_builder`.
    /// - Returns `Ok(false)` if more data is needed via additional calls to `decode()`.
    pub fn decode(
        &mut self,
        buf: &[u8],
        fb_builder: &mut flatbuffers::FlatBufferBuilder<'a>,
    ) -> Result<bool> {
        // Note that we avoid calling split_* against buf and instead keep track of our read offset locally.
        // This is because buf needs to be 'untainted' by any offset shifts in case label compression is being used.

        if !self.header.is_some() {
            // Haven't consumed a header yet, try for that first.
            match message::read_header(fb_builder, buf, &mut self.offset)? {
                Some((header, record_counts, truncated)) => {
                    if truncated {
                        // Header content says message is truncated.
                        // Don't bother waiting for more data because it won't be coming.
                        // Upstream should check for header.truncated in the result.
                        self.flush_result(fb_builder);
                        return Ok(false);
                    } else {
                        self.expected_counts = record_counts;
                        self.header = Some(header);
                    }
                }
                None => return Ok(false),
            }
        }

        while self.question.len() < self.expected_counts.question as usize {
            // Expecting more questions, try to get them
            match message::read_question(fb_builder, buf, &mut self.offset)
                .context("Failed to read question resource")?
            {
                Some(question) => {
                    self.question.push(question);
                }
                None => return Ok(false),
            }
        }

        while self.answer.len() < self.expected_counts.answer as usize {
            // Expecting more answer resources, try to get them
            match message::read_resource_non_opt(fb_builder, buf, &mut self.offset)
                .context("Failed to read answer resource")?
            {
                Some(answer) => {
                    self.answer.push(answer);
                }
                None => return Ok(false),
            }
        }

        while self.authority.len() < self.expected_counts.authority as usize {
            // Expecting more authority resources, try to get them
            match message::read_resource_non_opt(fb_builder, buf, &mut self.offset)
                .context("Failed to read authority resource")?
            {
                Some(authority) => {
                    self.authority.push(authority);
                }
                None => return Ok(false),
            }
        }

        // Ensure any OPT record is counted against the expected number of additional records
        let mut opt_count = match self.opt {
            Some(_opt) => 1,
            None => 0,
        };
        while self.additional.len() + opt_count < self.expected_counts.additional as usize {
            // Expecting more additional resources, try to get them
            // Use special handling for OPT resources, when they are encountered
            match message::read_resource_name_type(buf, &mut self.offset)
                .context("Failed to read additional resource name and type")?
            {
                Some((name, message::OPT_RESOURCE_TYPE)) => {
                    if name != "." {
                        bail!("Expected OPT resource to have name '.', but was: {}", name);
                    }
                    if opt_count != 0 {
                        bail!("Message has multiple OPT resources");
                    }
                    match message::read_resource_remainder_opt(fb_builder, buf, &mut self.offset) {
                        Ok(Some(opt)) => {
                            self.opt = Some(opt);
                            opt_count += 1;
                        }
                        Ok(None) => return Ok(false),
                        Err(e) => return Err(e),
                    }
                }
                Some((name, resource_type)) => {
                    match message::read_resource_remainder(
                        fb_builder,
                        buf,
                        &mut self.offset,
                        name,
                        resource_type,
                    ) {
                        Ok(Some(additional)) => self.additional.push(additional),
                        Ok(None) => return Ok(false),
                        Err(e) => return Err(e),
                    }
                }
                None => return Ok(false),
            }
        }

        self.flush_result(fb_builder);

        Ok(true)
    }

    /// Writes the accumulated content to fb_builder and clears local state.
    fn flush_result(&mut self, fb_builder: &mut flatbuffers::FlatBufferBuilder<'a>) {
        let question = match self.question.is_empty() {
            true => None,
            false => Some(fb_builder.create_vector(self.question.as_slice())),
        };
        let answer = match self.answer.is_empty() {
            true => None,
            false => Some(fb_builder.create_vector(self.answer.as_slice())),
        };
        let authority = match self.authority.is_empty() {
            true => None,
            false => Some(fb_builder.create_vector(self.authority.as_slice())),
        };
        let additional = match self.additional.is_empty() {
            true => None,
            false => Some(fb_builder.create_vector(self.additional.as_slice())),
        };

        let message_offset = Message::create(
            fb_builder,
            &MessageArgs {
                header: self.header,
                opt: self.opt,
                question,
                answer,
                authority,
                additional,
            },
        );
        fb_builder.finish_minimal(message_offset);

        self.clear();
    }

    /// Clears local state. This must be called before reusing the decoder for a new message.
    fn clear(&mut self) {
        self.offset = 0;
        self.expected_counts = message::RecordCounts::new();
        self.header = None;
        self.question.clear();
        self.answer.clear();
        self.authority.clear();
        self.additional.clear();
    }
}