1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use tokio_core::io::{EasyBuf, Codec, Io, Framed};
use tokio_proto::multiplex::{self, RequestId};
use byteorder::{self, ByteOrder};
use std::marker::PhantomData;
use std::io;
const SIZE_OF_REQID: usize = 8;
#[derive(Debug, Default, Clone)]
pub struct RequestIdFieldProto<B, C> {
base: C,
_byteorder: PhantomData<B>,
}
impl<B, C> RequestIdFieldProto<B, C> where C: Codec + Clone {
pub fn new(base: C) -> Self {
RequestIdFieldProto {
base: base,
_byteorder: PhantomData,
}
}
}
impl<B, C, T> multiplex::ClientProto<T> for RequestIdFieldProto<B, C>
where C: Codec + Clone + 'static,
B: byteorder::ByteOrder + 'static,
T: Io + 'static
{
type Request = C::Out;
type Response = C::In;
type Error = io::Error;
type Transport = Framed<T, RequestIdFieldCodec<B, C>>;
type BindTransport = io::Result<Self::Transport>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
Ok(io.framed(RequestIdFieldCodec::<B, C>::new(self.base.clone())))
}
}
impl<B, C, T> multiplex::ServerProto<T> for RequestIdFieldProto<B, C>
where C: Codec + Clone + 'static,
B: byteorder::ByteOrder + 'static,
T: Io + 'static
{
type Request = C::In;
type Response = C::Out;
type Error = io::Error;
type Transport = Framed<T, RequestIdFieldCodec<B, C>>;
type BindTransport = io::Result<Self::Transport>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
Ok(io.framed(RequestIdFieldCodec::<B, C>::new(self.base.clone())))
}
}
#[derive(Debug, Clone, Default)]
pub struct RequestIdFieldCodec<B, C> {
base: C,
reqid: Option<RequestId>,
_byteorder: PhantomData<B>,
}
impl<B, C> RequestIdFieldCodec<B, C> {
pub fn new(base: C) -> Self {
RequestIdFieldCodec {
base: base,
reqid: None,
_byteorder: PhantomData,
}
}
}
impl<B, C> Codec for RequestIdFieldCodec<B, C>
where B: ByteOrder, C: Codec
{
type In = (RequestId, C::In);
type Out = (RequestId, C::Out);
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<(RequestId, C::In)>> {
let reqid = if let Some(id) = self.reqid.take() {
id
} else {
if buf.len() < SIZE_OF_REQID {
return Ok(None);
}
B::read_u64(buf.drain_to(SIZE_OF_REQID).as_slice())
};
match self.base.decode(buf) {
Ok(Some(msg)) => Ok(Some((reqid, msg))),
Ok(None) => {
self.reqid = Some(reqid);
Ok(None)
}
Err(e) => Err(e),
}
}
fn encode(&mut self, (reqid, msg): (RequestId, C::Out), buf: &mut Vec<u8>) -> io::Result<()> {
let mut arr = [0u8; SIZE_OF_REQID];
B::write_u64(&mut arr, reqid);
buf.extend_from_slice(&arr[..]);
self.base.encode(msg, buf)
}
}