Skip to content

Commit 1e7b9a9

Browse files
committed
replace class Connection with own implementation
1 parent 65601eb commit 1e7b9a9

File tree

3 files changed

+158
-1
lines changed

3 files changed

+158
-1
lines changed

src/bin/server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use bytes::Bytes;
22
use dashmap::DashMap;
3-
use mini_redis::{Connection, Frame};
3+
use mini_redis::Frame;
4+
use redis_rs::connection::Connection;
45
use std::sync::Arc;
56
use tokio::net::{TcpListener, TcpStream};
67
use tracing::{debug, info, instrument};

src/connection.rs

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use mini_redis::frame::Error::Incomplete;
2+
use mini_redis::Frame;
3+
use crate::Result;
4+
use std::io::Cursor;
5+
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufWriter};
6+
use tokio::net::TcpStream;
7+
use bytes::{BytesMut, Buf};
8+
9+
pub struct Connection {
10+
stream: BufWriter<TcpStream>, // use BufWriter to optimize write performance
11+
// buffer: Vec<u8>,
12+
// cursor: usize,
13+
buffer: BytesMut,
14+
}
15+
16+
impl Connection {
17+
/// create a new connection
18+
pub fn new(stream: TcpStream) -> Connection {
19+
Connection {
20+
stream: BufWriter::new(stream),
21+
// buffer: vec![0; 4 * 1024],
22+
// cursor: 0,
23+
buffer: BytesMut::with_capacity(4 * 1024),
24+
}
25+
}
26+
27+
/// read
28+
/// return Some(Frame) if a frame is parsed
29+
/// or None if the connection is closed
30+
pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
31+
loop {
32+
// Attempt to parse a frame from the buffered data.
33+
// If enough data has been buffered, the frame is returned.
34+
if let Some(frame) = self.parse_frame()? {
35+
return Ok(Some(frame));
36+
}
37+
38+
// Dead code: use vector and cursor instead of BytesMut
39+
// // make sure there is capacity to read more data
40+
// // grow the buffer if not
41+
// if self.buffer.len() == self.cursor {
42+
// self.buffer.resize(self.cursor * 2, 0);
43+
// }
44+
45+
// // read data from the socket
46+
// // and write it to the buffer starting at the current cursor
47+
// let n = self.stream.read(&mut self.buffer[self.cursor..]).await?;
48+
49+
// if 0 == n {
50+
// // the remote closed the connection
51+
// if self.cursor == 0 {
52+
// return Ok(None);
53+
// } else {
54+
// return Err("connection reset by peer".into());
55+
// }
56+
// } else {
57+
// self.cursor += n;
58+
// }
59+
60+
if 0 == self.stream.read_buf(&mut self.buffer).await? {
61+
if self.buffer.is_empty() {
62+
return Ok(None);
63+
} else {
64+
return Err("connection reset by peer".into());
65+
}
66+
}
67+
}
68+
}
69+
70+
/// write frame into connection
71+
pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
72+
match frame {
73+
Frame::Array(frames) => {
74+
self.stream.write_all(b"*").await?;
75+
self.stream
76+
.write_all(frames.len().to_string().as_bytes())
77+
.await?;
78+
self.stream.write_all(b"\r\n").await?;
79+
for frame in frames {
80+
self.write_single_frame(frame).await?;
81+
}
82+
}
83+
_ => self.write_single_frame(frame).await?,
84+
}
85+
86+
self.stream.flush().await
87+
}
88+
89+
/// write single frame into connection
90+
async fn write_single_frame(&mut self, frame: &Frame) -> io::Result<()> {
91+
match frame {
92+
Frame::Simple(s) => {
93+
self.stream.write_all(b"+").await?;
94+
self.stream.write_all(s.as_bytes()).await?;
95+
self.stream.write_all(b"\r\n").await?;
96+
}
97+
Frame::Error(e) => {
98+
self.stream.write_all(b"-").await?;
99+
self.stream.write_all(e.as_bytes()).await?;
100+
self.stream.write_all(b"\r\n").await?;
101+
}
102+
Frame::Integer(i) => {
103+
self.stream.write_all(b":").await?;
104+
self.stream.write_all(i.to_string().as_bytes()).await?;
105+
self.stream.write_all(b"\r\n").await?;
106+
}
107+
Frame::Null => {
108+
self.stream.write_all(b"$-1\r\n").await?;
109+
}
110+
Frame::Bulk(data) => {
111+
self.stream.write_all(b"$").await?;
112+
self.stream
113+
.write_all(data.len().to_string().as_bytes())
114+
.await?;
115+
self.stream.write_all(b"\r\n").await?;
116+
self.stream.write_all(data).await?;
117+
self.stream.write_all(b"\r\n").await?;
118+
}
119+
_ => {
120+
unreachable!();
121+
}
122+
}
123+
124+
Ok(())
125+
}
126+
127+
/// parse frame from buffer
128+
/// return Some(Frame) if a frame is parsed
129+
/// or None if the buffer does not contain a complete frame
130+
fn parse_frame(&mut self) -> Result<Option<Frame>> {
131+
// Create a new Parser with the buffer
132+
let mut buf = Cursor::new(&self.buffer[..]);
133+
134+
// check if a frame can be parsed
135+
match Frame::check(&mut buf) {
136+
Ok(_) => {
137+
// Update the cursor based on how many bytes were parsed
138+
let len = buf.position() as usize;
139+
buf.set_position(0);
140+
141+
let frame = Frame::parse(&mut buf)?;
142+
// discard the frame from the buffer
143+
// self.cursor -= len;
144+
self.buffer.advance(len);
145+
Ok(Some(frame))
146+
}
147+
Err(Incomplete) => Ok(None),
148+
Err(e) => Err(e.into()),
149+
}
150+
}
151+
}

src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod connection;
2+
3+
// refer to module mini_redis
4+
pub type Error = Box<dyn std::error::Error + Send + Sync>;
5+
pub type Result<T> = std::result::Result<T, Error>;

0 commit comments

Comments
 (0)