diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index eab65d30a..2d70734a3 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -44,8 +44,8 @@ pub struct Responses { } pub struct CopyBothHandles { - pub(crate) stream_receiver: mpsc::Receiver>, - pub(crate) sink_sender: mpsc::Sender, + pub(crate) stream_receiver: tokio::sync::mpsc::Receiver>, + pub(crate) sink_sender: tokio::sync::mpsc::Sender, } impl Responses { @@ -124,8 +124,8 @@ impl InnerClient { pub fn start_copy_both(&self) -> Result { let (sender, receiver) = mpsc::channel(16); - let (stream_sender, stream_receiver) = mpsc::channel(16); - let (sink_sender, sink_receiver) = mpsc::channel(16); + let (stream_sender, stream_receiver) = tokio::sync::mpsc::channel(16); + let (sink_sender, sink_receiver) = tokio::sync::mpsc::channel(16); let responses = Responses { receiver, diff --git a/tokio-postgres/src/copy_both.rs b/tokio-postgres/src/copy_both.rs index e8c2103b0..ee110bfe0 100644 --- a/tokio-postgres/src/copy_both.rs +++ b/tokio-postgres/src/copy_both.rs @@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::{simple_query, Error}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_channel::mpsc; use futures_util::{ready, Sink, SinkExt, Stream, StreamExt}; use log::debug; use pin_project_lite::pin_project; @@ -12,6 +11,8 @@ use postgres_protocol::message::frontend::CopyData; use std::marker::{PhantomData, PhantomPinned}; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::sync::mpsc; +use tokio_util::sync::PollSender; /// The state machine of CopyBothReceiver /// @@ -70,7 +71,7 @@ pub struct CopyBothReceiver { /// Receiver of frontend messages sent by the user using sink_receiver: mpsc::Receiver, /// Sender of CopyData contents to be consumed by the user using - stream_sender: mpsc::Sender>, + stream_sender: PollSender>, /// The current state of the subprotocol state: CopyBothState, /// Holds a buffered message until we are ready to send it to the user's stream @@ -86,7 +87,7 @@ impl CopyBothReceiver { CopyBothReceiver { responses, sink_receiver, - stream_sender, + stream_sender: PollSender::new(stream_sender), state: CopyBothState::Setup, buffered_message: None, } @@ -108,10 +109,10 @@ impl CopyBothReceiver { // Deliver the buffered message (if any) to the user to ensure we can potentially // buffer a new one in response to a server message if let Some(message) = self.buffered_message.take() { - match self.stream_sender.poll_ready(cx) { + match self.stream_sender.poll_ready_unpin(cx) { Poll::Ready(_) => { // If the receiver has hung up we'll just drop the message - let _ = self.stream_sender.start_send(message); + let _ = self.stream_sender.start_send_unpin(message); } Poll::Pending => { // Stash the message and try again later @@ -147,7 +148,7 @@ impl CopyBothReceiver { match self.state { CopyNone => self.state = CopyComplete, CopyComplete => { - self.stream_sender.close_channel(); + self.stream_sender.close(); self.sink_receiver.close(); self.state = CommandComplete; } @@ -168,7 +169,7 @@ impl CopyBothReceiver { Some(Ok(Message::ReadyForQuery(_))) => match self.state { CommandComplete => { self.sink_receiver.close(); - self.stream_sender.close_channel(); + self.stream_sender.close(); } _ => self.unexpected_message(), }, @@ -190,7 +191,7 @@ impl Stream for CopyBothReceiver { match self.poll_backend(cx) { Poll::Ready(()) => Poll::Ready(None), Poll::Pending => match self.state { - Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_next_unpin(cx)) { + Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_recv(cx)) { Some(msg) => Poll::Ready(Some(msg)), None => match self.state { // The user has cancelled their interest to this CopyBoth query but we're @@ -252,9 +253,7 @@ pin_project! { /// } /// ``` pub struct CopyBothDuplex { - #[pin] - sink_sender: mpsc::Sender, - #[pin] + sink_sender: PollSender, stream_receiver: mpsc::Receiver>, buf: BytesMut, #[pin] @@ -267,7 +266,7 @@ impl Stream for CopyBothDuplex { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(self.project().stream_receiver.poll_next(cx)) { + Poll::Ready(match ready!(self.project().stream_receiver.poll_recv(cx)) { Some(Ok(Message::CopyData(body))) => Some(Ok(body.into_bytes())), Some(Ok(_)) => Some(Err(Error::unexpected_message())), Some(Err(err)) => Some(Err(err)), @@ -285,7 +284,7 @@ where fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project() .sink_sender - .poll_ready(cx) + .poll_ready_unpin(cx) .map_err(|_| Error::closed()) } @@ -309,30 +308,28 @@ where let data = CopyData::new(data).map_err(Error::encode)?; this.sink_sender - .start_send(FrontendMessage::CopyData(data)) + .start_send_unpin(FrontendMessage::CopyData(data)) .map_err(|_| Error::closed()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); + let this = self.project(); if !this.buf.is_empty() { - ready!(this.sink_sender.as_mut().poll_ready(cx)).map_err(|_| Error::closed())?; + ready!(this.sink_sender.poll_ready_unpin(cx)).map_err(|_| Error::closed())?; let data: Box = Box::new(this.buf.split().freeze()); let data = CopyData::new(data).map_err(Error::encode)?; this.sink_sender - .as_mut() - .start_send(FrontendMessage::CopyData(data)) + .start_send_unpin(FrontendMessage::CopyData(data)) .map_err(|_| Error::closed())?; } - - this.sink_sender.poll_flush(cx).map_err(|_| Error::closed()) + Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush(cx))?; - let mut this = self.as_mut().project(); - this.sink_sender.disconnect(); + let this = self.as_mut().project(); + this.sink_sender.close(); Poll::Ready(Ok(())) } } @@ -356,14 +353,14 @@ where .await .map_err(|_| Error::closed())?; - match handles.stream_receiver.next().await.transpose()? { + match handles.stream_receiver.recv().await.transpose()? { Some(Message::CopyBothResponse(_)) => {} _ => return Err(Error::unexpected_message()), } Ok(CopyBothDuplex { stream_receiver: handles.stream_receiver, - sink_sender: handles.sink_sender, + sink_sender: PollSender::new(handles.sink_sender), buf: BytesMut::new(), _p: PhantomPinned, _p2: PhantomData,