1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-02-22 18:33:18 +01:00
2020-09-28 02:59:57 +01:00

461 lines
16 KiB
Rust
Executable File

#![allow(unused_assignments, unused_variables, unreachable_patterns)]
use std::u8;
use derive_more::From;
use bytes::{BufMut, Bytes, BytesMut};
use bytestring::ByteString;
use uuid::Uuid;
use super::*;
use crate::errors::AmqpParseError;
use crate::codec::{self, decode_format_code, decode_list_header, Decode, DecodeFormatted, Encode};
/// A decoded AMQP frame body: one variant per frame kind, plus `Empty`.
///
/// `From` is derived through `derive_more`.
#[derive(Clone, Debug, PartialEq, From)]
pub enum Frame {
/// Selected by descriptor code 16 or symbol `amqp:open:list`.
Open(Open),
/// Selected by descriptor code 17 or symbol `amqp:begin:list`.
Begin(Begin),
/// Selected by descriptor code 18 or symbol `amqp:attach:list`.
Attach(Attach),
/// Selected by descriptor code 19 or symbol `amqp:flow:list`.
Flow(Flow),
/// Selected by descriptor code 20 or symbol `amqp:transfer:list`.
Transfer(Transfer),
/// Selected by descriptor code 21 or symbol `amqp:disposition:list`.
Disposition(Disposition),
/// Selected by descriptor code 22 or symbol `amqp:detach:list`.
Detach(Detach),
/// Selected by descriptor code 23 or symbol `amqp:end:list`.
End(End),
/// Selected by descriptor code 24 or symbol `amqp:close:list`.
Close(Close),
/// Produced when decoding empty input; has an encoded size of 0 and writes nothing.
Empty,
}
impl Frame {
    /// Returns the name of the active variant as a static string,
    /// for example `"Open"` or `"Empty"`.
    pub fn name(&self) -> &'static str {
        match *self {
            Frame::Empty => "Empty",
            Frame::Open(..) => "Open",
            Frame::Begin(..) => "Begin",
            Frame::Attach(..) => "Attach",
            Frame::Flow(..) => "Flow",
            Frame::Transfer(..) => "Transfer",
            Frame::Disposition(..) => "Disposition",
            Frame::Detach(..) => "Detach",
            Frame::End(..) => "End",
            Frame::Close(..) => "Close",
        }
    }
}
impl Decode for Frame {
fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
if input.is_empty() {
Ok((input, Frame::Empty))
} else {
let (input, fmt) = decode_format_code(input)?;
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let (input, descriptor) = Descriptor::decode(input)?;
match descriptor {
Descriptor::Ulong(16) => decode_open_inner(input).map(|(i, r)| (i, Frame::Open(r))),
Descriptor::Ulong(17) => {
decode_begin_inner(input).map(|(i, r)| (i, Frame::Begin(r)))
}
Descriptor::Ulong(18) => {
decode_attach_inner(input).map(|(i, r)| (i, Frame::Attach(r)))
}
Descriptor::Ulong(19) => decode_flow_inner(input).map(|(i, r)| (i, Frame::Flow(r))),
Descriptor::Ulong(20) => {
decode_transfer_inner(input).map(|(i, r)| (i, Frame::Transfer(r)))
}
Descriptor::Ulong(21) => {
decode_disposition_inner(input).map(|(i, r)| (i, Frame::Disposition(r)))
}
Descriptor::Ulong(22) => {
decode_detach_inner(input).map(|(i, r)| (i, Frame::Detach(r)))
}
Descriptor::Ulong(23) => decode_end_inner(input).map(|(i, r)| (i, Frame::End(r))),
Descriptor::Ulong(24) => {
decode_close_inner(input).map(|(i, r)| (i, Frame::Close(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:open:list" => {
decode_open_inner(input).map(|(i, r)| (i, Frame::Open(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:begin:list" => {
decode_begin_inner(input).map(|(i, r)| (i, Frame::Begin(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:attach:list" => {
decode_attach_inner(input).map(|(i, r)| (i, Frame::Attach(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:flow:list" => {
decode_flow_inner(input).map(|(i, r)| (i, Frame::Flow(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:transfer:list" => {
decode_transfer_inner(input).map(|(i, r)| (i, Frame::Transfer(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:disposition:list" => {
decode_disposition_inner(input).map(|(i, r)| (i, Frame::Disposition(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:detach:list" => {
decode_detach_inner(input).map(|(i, r)| (i, Frame::Detach(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:end:list" => {
decode_end_inner(input).map(|(i, r)| (i, Frame::End(r)))
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:close:list" => {
decode_close_inner(input).map(|(i, r)| (i, Frame::Close(r)))
}
_ => Err(AmqpParseError::InvalidDescriptor(descriptor)),
}
}
}
}
impl Encode for Frame {
    /// Number of bytes `encode` writes for this frame; `Frame::Empty` is 0.
    fn encoded_size(&self) -> usize {
        match self {
            Frame::Empty => 0,
            Frame::Open(open) => encoded_size_open_inner(open),
            Frame::Begin(begin) => encoded_size_begin_inner(begin),
            Frame::Attach(attach) => encoded_size_attach_inner(attach),
            Frame::Flow(flow) => encoded_size_flow_inner(flow),
            Frame::Transfer(transfer) => encoded_size_transfer_inner(transfer),
            Frame::Disposition(disposition) => encoded_size_disposition_inner(disposition),
            Frame::Detach(detach) => encoded_size_detach_inner(detach),
            Frame::End(end) => encoded_size_end_inner(end),
            Frame::Close(close) => encoded_size_close_inner(close),
        }
    }

    /// Appends the encoded frame to `buf`; `Frame::Empty` writes nothing.
    fn encode(&self, buf: &mut BytesMut) {
        match self {
            Frame::Empty => {}
            Frame::Open(open) => encode_open_inner(open, buf),
            Frame::Begin(begin) => encode_begin_inner(begin, buf),
            Frame::Attach(attach) => encode_attach_inner(attach, buf),
            Frame::Flow(flow) => encode_flow_inner(flow, buf),
            Frame::Transfer(transfer) => encode_transfer_inner(transfer, buf),
            Frame::Disposition(disposition) => encode_disposition_inner(disposition, buf),
            Frame::Detach(detach) => encode_detach_inner(detach, buf),
            Frame::End(end) => encode_end_inner(end, buf),
            Frame::Close(close) => encode_close_inner(close, buf),
        }
    }
}
{{#each defs.provides as |provide|}}
{{#if provide.described}}
// Sum type for one described "provides" group: one variant per option, each variant
// wrapping the option's own type under the same name.
#[derive(Clone, Debug, PartialEq)]
pub enum {{provide.name}} {
{{#each provide.options as |option|}}
{{option.ty}}({{option.ty}}),
{{/each}}
}
// Decodes a described value into the variant selected by its descriptor.
impl DecodeFormatted for {{provide.name}} {
fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
// NOTE(review): `validate_code!` is defined elsewhere; presumably it returns an error
// when `fmt` differs from the described-type format code - confirm.
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let (input, descriptor) = Descriptor::decode(input)?;
match descriptor {
// Arms for the numeric descriptor codes come first, then the arms for the symbolic
// names; both spellings of an option call the same `decode_*_inner` function.
{{#each provide.options as |option|}}
Descriptor::Ulong({{option.descriptor.code}}) => decode_{{snake option.ty}}_inner(input).map(|(i, r)| (i, {{provide.name}}::{{option.ty}}(r))),
{{/each}}
{{#each provide.options as |option|}}
Descriptor::Symbol(ref a) if a.as_str() == "{{option.descriptor.name}}" => decode_{{snake option.ty}}_inner(input).map(|(i, r)| (i, {{provide.name}}::{{option.ty}}(r))),
{{/each}}
// A descriptor that matches none of the options is reported back in the error.
_ => Err(AmqpParseError::InvalidDescriptor(descriptor))
}
}
}
// Encoding forwards to the `*_inner` helper of whichever option the variant holds.
impl Encode for {{provide.name}} {
// Size as computed by the option's `encoded_size_*_inner` helper.
fn encoded_size(&self) -> usize {
match *self {
{{#each provide.options as |option|}}
{{provide.name}}::{{option.ty}}(ref v) => encoded_size_{{snake option.ty}}_inner(v),
{{/each}}
}
}
// Appends the held option to `buf` through its `encode_*_inner` helper.
fn encode(&self, buf: &mut BytesMut) {
match *self {
{{#each provide.options as |option|}}
{{provide.name}}::{{option.ty}}(ref v) => encode_{{snake option.ty}}_inner(v, buf),
{{/each}}
}
}
}
{{/if}}
{{/each}}
{{#each defs.aliases as |alias|}}
// Public type alias emitted for one entry of the "aliases" definitions.
pub type {{alias.name}} = {{alias.source}};
{{/each}}
{{#each defs.enums as |enum|}}
// Field-less, `Copy` enum with one variant per item of this "enums" definition.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum {{enum.name}} {
{{#each enum.items as |item|}}
{{item.name}},
{{/each}}
}
{{#if enum.is_symbol}}
// Symbol-backed variant of the enum: conversion from a decoded `Symbol`.
impl {{enum.name}} {
// Compares the symbol's string value against each item's value; a value that is not
// listed yields `AmqpParseError::UnknownEnumOption` carrying the enum's name.
pub fn try_from(v: &Symbol) -> Result<Self, AmqpParseError> {
match v.as_str() {
{{#each enum.items as |item|}}
"{{item.value}}" => Ok({{enum.name}}::{{item.name}}),
{{/each}}
_ => Err(AmqpParseError::UnknownEnumOption("{{enum.name}}"))
}
}
}
// Decodes a `Symbol` with the given format code, then maps it to a variant via `try_from`.
impl DecodeFormatted for {{enum.name}} {
fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
let (input, base) = Symbol::decode_with_format(input, fmt)?;
Ok((input, Self::try_from(&base)?))
}
}
// Encodes the variant as the static symbol holding the item's value.
impl Encode for {{enum.name}} {
fn encoded_size(&self) -> usize {
match *self {
// NOTE(review): the `+ 2` presumably accounts for a 1-byte format code and a 1-byte
// length prefix written by `StaticSymbol::encode` - confirm against that impl.
{{#each enum.items as |item|}}
{{enum.name}}::{{item.name}} => {{item.value_len}} + 2,
{{/each}}
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
{{#each enum.items as |item|}}
{{enum.name}}::{{item.name}} => StaticSymbol("{{item.value}}").encode(buf),
{{/each}}
}
}
}
{{else}}
// Value-backed variant of the enum: conversion from the enum's underlying type.
impl {{enum.name}} {
// Matches `v` against each item's literal value; a value that is not listed yields
// `AmqpParseError::UnknownEnumOption` carrying the enum's name.
pub fn try_from(v: {{enum.ty}}) -> Result<Self, AmqpParseError> {
match v {
{{#each enum.items as |item|}}
{{item.value}} => Ok({{enum.name}}::{{item.name}}),
{{/each}}
_ => Err(AmqpParseError::UnknownEnumOption("{{enum.name}}"))
}
}
}
// Decodes the underlying type with the given format code, then maps it to a variant
// via `try_from`.
impl DecodeFormatted for {{enum.name}} {
fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
let (input, base) = {{enum.ty}}::decode_with_format(input, fmt)?;
Ok((input, Self::try_from(base)?))
}
}
// Encodes the variant by materialising the item's value as the underlying type and
// delegating to that type's own `Encode` implementation.
impl Encode for {{enum.name}} {
fn encoded_size(&self) -> usize {
match *self {
{{#each enum.items as |item|}}
{{enum.name}}::{{item.name}} => {
// The type annotation pins the literal to the enum's underlying type.
let v : {{enum.ty}} = {{item.value}};
v.encoded_size()
},
{{/each}}
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
{{#each enum.items as |item|}}
{{enum.name}}::{{item.name}} => {
// Same value as in `encoded_size`, so size and output stay in step.
let v : {{enum.ty}} = {{item.value}};
v.encode(buf);
},
{{/each}}
}
}
}
{{/if}}
{{/each}}
{{#each defs.described_restricted as |dr|}}
// Private alias: this described restricted type is represented directly by its underlying type.
type {{dr.name}} = {{dr.ty}};
// Decodes the value part by delegating to the aliased type's `decode`; no descriptor is
// read here.
fn decode_{{snake dr.name}}_inner(input: &[u8]) -> Result<(&[u8], {{dr.name}}), AmqpParseError> {
{{dr.name}}::decode(input)
}
// Size of the descriptor plus the value, matching what the `encode_*_inner` helper writes.
// NOTE(review): the constant 3 presumably equals the bytes produced by
// `Descriptor::Ulong(..).encode` - confirm against that impl.
fn encoded_size_{{snake dr.name}}_inner(dr: &{{dr.name}}) -> usize {
// descriptor size + actual size
3 + dr.encoded_size()
}
// Writes the numeric descriptor for this type, followed by the value itself.
fn encode_{{snake dr.name}}_inner(dr: &{{dr.name}}, buf: &mut BytesMut) {
Descriptor::Ulong({{dr.descriptor.code}}).encode(buf);
dr.encode(buf);
}
{{/each}}
{{#each defs.lists as |list|}}
// Struct for one "lists" definition. Optional fields are wrapped in `Option`; all other
// fields are stored by value.
#[derive(Clone, Debug, PartialEq)]
pub struct {{list.name}} {
{{#each list.fields as |field|}}
{{#if field.optional}}
pub {{field.name}}: Option<{{{field.ty}}}>,
{{else}}
pub {{field.name}}: {{{field.ty}}},
{{/if}}
{{/each}}
{{#if list.transfer}}
// Only emitted for lists flagged as transfer; filled from the bytes that follow the list.
pub body: Option<TransferBody>,
{{/if}}
}
// Read accessors (one per field, named after the field) plus the field count that is
// written into the list header when encoding.
impl {{list.name}} {
{{#each list.fields as |field|}}
{{#if field.is_str}}
{{#if field.optional}}
// Optional string-like field, exposed as `Option<&str>`.
pub fn {{field.name}}(&self) -> Option<&str> {
match self.{{field.name}} {
None => None,
Some(ref s) => Some(s.as_str())
}
}
{{else}}
// String-like field, exposed as `&str`.
pub fn {{field.name}}(&self) -> &str { self.{{field.name}}.as_str() }
{{/if}}
{{else}}
{{#if field.is_ref}}
{{#if field.optional}}
// Optional field returned by reference.
pub fn {{field.name}}(&self) -> Option<&{{{field.ty}}}> { self.{{field.name}}.as_ref() }
{{else}}
// Field returned by reference.
pub fn {{field.name}}(&self) -> &{{{field.ty}}} { &self.{{field.name}} }
{{/if}}
{{else}}
{{#if field.optional}}
// Optional field returned by value.
pub fn {{field.name}}(&self) -> Option<{{{field.ty}}}> { self.{{field.name}} }
{{else}}
// Field returned by value.
pub fn {{field.name}}(&self) -> {{{field.ty}}} { self.{{field.name}} }
{{/if}}
{{/if}}
{{/if}}
{{/each}}
{{#if list.transfer}}
// Borrowed view of the transfer body, if there is one.
pub fn body(&self) -> Option<&TransferBody> {
self.body.as_ref()
}
{{/if}}
// Rendered as `0 + 1 + 1 ...` with one `+ 1` per field, hence the `identity_op` allowance.
#[allow(clippy::identity_op)]
const FIELD_COUNT: usize = 0 {{#each list.fields as |field|}} + 1{{/each}};
}
// Decodes the list that follows a descriptor the caller has already consumed.
//
// Returns the bytes located after the list together with the populated struct. Fields are
// read in declaration order for as long as the header's element count allows; a required
// field without a default that is missing fails with `RequiredFieldOmitted`.
#[allow(unused_mut)]
fn decode_{{snake list.name}}_inner(input: &[u8]) -> Result<(&[u8], {{list.name}}), AmqpParseError> {
let (input, format) = decode_format_code(input)?;
let (input, header) = decode_list_header(input, format)?;
let size = header.size as usize;
// NOTE(review): `decode_check_len!` is defined elsewhere; presumably it bails out when fewer
// than `size` bytes are available, which would keep the split/slice below in bounds - confirm.
decode_check_len!(input, size);
{{#if list.fields}}
// From here `input` is the first `size` bytes (the list body) and `remainder` is what follows.
let (mut input, mut remainder) = input.split_at(size);
// Number of elements still unread according to the list header.
let mut count = header.count;
{{#each list.fields as |field|}}
{{#if field.optional}}
// Optional field: `None` once the encoded list has no more elements.
let {{field.name}}: Option<{{{field.ty}}}>;
if count > 0 {
let decoded = Option::<{{{field.ty}}}>::decode(input)?;
input = decoded.0;
{{field.name}} = decoded.1;
count -= 1;
}
else {
{{field.name}} = None;
}
{{else}}
// Required field: uses its default when one is defined, otherwise it must be present.
let {{field.name}}: {{{field.ty}}};
if count > 0 {
{{#if field.default}}
// Decoded as an `Option` so that a `None` result can fall back to the default.
let (in1, decoded) = Option::<{{{field.ty}}}>::decode(input)?;
{{field.name}} = decoded.unwrap_or({{field.default}});
{{else}}
let (in1, decoded) = {{{field.ty}}}::decode(input)?;
{{field.name}} = decoded;
{{/if}}
input = in1;
count -= 1;
}
else {
{{#if field.default}}
{{field.name}} = {{field.default}};
{{else}}
return Err(AmqpParseError::RequiredFieldOmitted("{{field.name}}"))
{{/if}}
}
{{/if}}
{{/each}}
{{else}}
// No fields to read: skip over the list body entirely.
let mut remainder = &input[size..];
{{/if}}
{{#if list.transfer}}
// Everything after the list is taken as the body (copied into an owned `Bytes`), and the
// remainder is emptied so nothing is handed back to the caller.
let body = if remainder.is_empty() {
None
} else {
let b = Bytes::copy_from_slice(remainder);
remainder = &[];
Some(b.into())
};
{{/if}}
Ok((remainder, {{list.name}} {
{{#each list.fields as |field|}}
{{field.name}},
{{/each}}
{{#if list.transfer}}
body
{{/if}}
}))
}
// Total encoded size: header bytes, the summed field sizes and, for transfer lists, the
// length of the body.
fn encoded_size_{{snake list.name}}_inner(list: &{{list.name}}) -> usize {
// Rendered as `0 + a.encoded_size() + ...`, hence the `identity_op` allowance.
#[allow(clippy::identity_op)]
let content_size = 0 {{#each list.fields as |field|}} + list.{{field.name}}.encoded_size(){{/each}};
// header: 0x00 0x53 <descriptor code> format_code size count
// Same threshold as the encoder: 1-byte size and count give 3 + 1 + 1 + 1 = 6 header bytes,
// 4-byte size and count give 3 + 1 + 4 + 4 = 12.
(if content_size + 1 > u8::MAX as usize { 12 } else { 6 })
+ content_size
{{#if list.transfer}}
+ list.body.as_ref().map(|b| b.len()).unwrap_or(0)
{{/if}}
}
// Writes the numeric descriptor, the list header, every field in declaration order and,
// for transfer lists, the body (when present) after the list.
fn encode_{{snake list.name}}_inner(list: &{{list.name}}, buf: &mut BytesMut) {
Descriptor::Ulong({{list.descriptor.code}}).encode(buf);
// Rendered as `0 + a.encoded_size() + ...`, hence the `identity_op` allowance.
#[allow(clippy::identity_op)]
let content_size = 0 {{#each list.fields as |field|}} + list.{{field.name}}.encoded_size(){{/each}};
// The size written to the header includes the count field, so the 1-byte form is only
// usable while content plus the 1-byte count still fits in a `u8`.
if content_size + 1 > u8::MAX as usize {
buf.put_u8(codec::FORMATCODE_LIST32);
buf.put_u32((content_size + 4) as u32); // +4 for 4 byte count
buf.put_u32({{list.name}}::FIELD_COUNT as u32);
}
else {
buf.put_u8(codec::FORMATCODE_LIST8);
buf.put_u8((content_size + 1) as u8);
buf.put_u8({{list.name}}::FIELD_COUNT as u8);
}
{{#each list.fields as |field|}}
list.{{field.name}}.encode(buf);
{{/each}}
{{#if list.transfer}}
if let Some(ref body) = list.body {
body.encode(buf)
}
{{/if}}
}
// Decodes a described value, accepting only this list's own descriptor.
impl DecodeFormatted for {{list.name}} {
fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
// NOTE(review): `validate_code!` is defined elsewhere; presumably it returns an error
// when `fmt` differs from the described-type format code - confirm.
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let (input, descriptor) = Descriptor::decode(input)?;
// The descriptor may be given as the numeric code or as the symbolic name.
let is_match = match descriptor {
Descriptor::Ulong(val) => val == {{list.descriptor.code}},
Descriptor::Symbol(ref sym) => sym.as_bytes() == b"{{list.descriptor.name}}",
};
if !is_match {
// A descriptor belonging to some other type is reported back in the error.
Err(AmqpParseError::InvalidDescriptor(descriptor))
} else {
decode_{{snake list.name}}_inner(input)
}
}
}
// `Encode` simply forwards to the free `*_inner` helpers generated for this list.
impl Encode for {{list.name}} {
fn encoded_size(&self) -> usize { encoded_size_{{snake list.name}}_inner(self) }
fn encode(&self, buf: &mut BytesMut) { encode_{{snake list.name}}_inner(self, buf) }
}
{{/each}}