1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//! Fixed-length protocol.

use tokio_core::io::{Codec, Io, EasyBuf, Framed};
use tokio_proto::pipeline::{ServerProto, ClientProto};
use std::io;

/// A protocol such that frames are continuous and have the same specified length.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FixedLengthProto {
    pub length: usize,
}

impl FixedLengthProto {
    pub fn new(length: usize) -> FixedLengthProto {
        FixedLengthProto { length: length }
    }

    fn codec(&self) -> FixedLengthCodec {
        FixedLengthCodec { length: self.length }
    }
}

impl<T: Io + 'static> ServerProto<T> for FixedLengthProto {
    type Request = Vec<u8>;
    type Response = Vec<u8>;
    type Error = io::Error;
    type Transport = Framed<T, FixedLengthCodec>;
    type BindTransport = io::Result<Self::Transport>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(self.codec()))
    }
}

impl<T: Io + 'static> ClientProto<T> for FixedLengthProto {
    type Request = Vec<u8>;
    type Response = Vec<u8>;
    type Error = io::Error;
    type Transport = Framed<T, FixedLengthCodec>;
    type BindTransport = io::Result<Self::Transport>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(self.codec()))
    }
}

/// Protocol codec used by [`FixedLengthProto`](./struct.FixedLengthProto.html).
#[derive(Debug, Clone)]
pub struct FixedLengthCodec {
    length: usize,
}

impl FixedLengthCodec {
    pub fn new(length: usize) -> FixedLengthCodec {
        FixedLengthCodec { length: length }
    }

    pub fn length(&self) -> usize {
        self.length
    }
}

impl Codec for FixedLengthCodec {
    type In = Vec<u8>;
    type Out = Vec<u8>;

    #[inline]
    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Vec<u8>>> {
        Ok(if buf.len() >= self.length {
            let bs = buf.drain_to(self.length);
            Some(bs.as_slice().to_vec())
        } else {
            None
        })
    }

    #[inline]
    fn encode(&mut self, item: Vec<u8>, buf: &mut Vec<u8>) -> io::Result<()> {
        assert_eq!(item.len(), self.length);
        buf.extend_from_slice(&item);
        Ok(())
    }
}

#[test]
fn test_fixed_length() {
    let mut p = FixedLengthCodec { length: 5 };

    let mut buf = EasyBuf::new();
    buf.get_mut().extend_from_slice(b"abcdefghijkl");

    assert_eq!(p.decode(&mut buf).unwrap(), Some(b"abcde".to_vec()));
    assert_eq!(p.decode(&mut buf).unwrap(), Some(b"fghij".to_vec()));
    assert!(p.decode(&mut buf).unwrap().is_none());

    buf.get_mut().extend_from_slice(b"mno");

    assert_eq!(p.decode(&mut buf).unwrap(), Some(b"klmno".to_vec()));
}