sip2/
connection.rs

1use super::error::Error;
2use super::spec;
3use super::Message;
4use deunicode::deunicode;
5use std::fmt;
6use std::io::prelude::*;
7use std::net::{Shutdown, TcpStream};
8use std::str;
9use std::time::Duration;
10
// Size, in bytes, of the buffer handed to each individual socket read
// while receiving a message.
const READ_BUFSIZE: usize = 256;
13
/// Manages a TCP connection to a SIP server and handles message sending
/// and receiving.
///
/// When formatted with `{}` a `Connection` renders its log prefix (if
/// any), which is how the log calls in this file tag their output.
pub struct Connection {
    /// The underlying socket all messages are written to and read from.
    tcp_stream: TcpStream,

    /// If set, outbound messages are transliterated to ASCII (and any
    /// newlines produced by the transliteration are removed) before
    /// they are sent.
    ascii: bool,

    /// Optional text prepended to log messages emitted by this connection.
    log_prefix: Option<String>,
}
24
25impl fmt::Display for Connection {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        if let Some(log_prefix) = self.log_prefix.as_ref() {
28            write!(f, "{log_prefix} ")
29        } else {
30            write!(f, "")
31        }
32    }
33}
34
35impl Connection {
36    /// Creates a new SIP client and opens the TCP connection to the server
37    ///
38    /// * `sip_host` - SIP server host/ip and port
39    /// * E.g. "127.0.0.1:6001"
40    ///
41    /// ```
42    /// use sip2::Connection;
43    /// assert_eq!(Connection::new("JUNK0+..-*z$@").is_err(), true);
44    /// ```
45    pub fn new(sip_host: &str) -> Result<Self, Error> {
46        log::debug!("Connection::new() connecting to: {}", sip_host);
47
48        match TcpStream::connect(sip_host) {
49            Ok(stream) => Ok(Connection {
50                tcp_stream: stream,
51                ascii: false,
52                log_prefix: None,
53            }),
54            Err(s) => {
55                log::error!("Connection::new() failed: {s}");
56                Err(Error::NetworkError(s.to_string()))
57            }
58        }
59    }
60
61    /// Create a new SIP connection from an existing TCP stream.
62    pub fn from_stream(tcp_stream: TcpStream) -> Self {
63        Connection {
64            ascii: false,
65            tcp_stream,
66            log_prefix: None,
67        }
68    }
69
70    /// Add a string that will be prepended to all log:: calls where
71    /// a self exists.
72    pub fn set_log_prefix(&mut self, prefix: impl Into<String>) {
73        self.log_prefix = Some(prefix.into());
74    }
75
76    /// Set the ascii flag
77    pub fn set_ascii(&mut self, ascii: bool) {
78        self.ascii = ascii;
79    }
80
81    /// Shutdown the TCP connection with the SIP server.
82    pub fn disconnect(&self) -> Result<(), Error> {
83        log::debug!("{self}Connection::disconnect()");
84
85        match self.tcp_stream.shutdown(Shutdown::Both) {
86            Ok(_) => Ok(()),
87            Err(s) => {
88                // Disconnect will fail if the other end already disconnected.
89                log::debug!("{self}disconnect() failed: {s}");
90                Err(Error::NetworkError(s.to_string()))
91            }
92        }
93    }
94
95    /// Send a SIP message
96    pub fn send(&mut self, msg: &Message) -> Result<(), Error> {
97        let mut msg_sip = msg.to_sip();
98
99        if self.ascii {
100            // https://crates.io/crates/deunicode
101            // "Some transliterations do produce \n characters."
102            msg_sip = deunicode(&msg_sip).replace('\n', "");
103        }
104
105        // No need to redact here since SIP replies do not include passwords.
106        log::info!("{self}OUTBOUND: {}", msg_sip);
107
108        msg_sip.push(spec::LINE_TERMINATOR);
109
110        match self.tcp_stream.write_all(msg_sip.as_bytes()) {
111            Ok(_) => Ok(()),
112            Err(s) => {
113                log::error!("{self}send() failed: {}", s);
114                Err(Error::NetworkError(s.to_string()))
115            }
116        }
117    }
118
119    /// Send a message with a write timeout.
120    ///
121    /// Returns Err() if the send/write times out.  Clears the TCP socket
122    /// timeout upon completion.
123    pub fn send_with_timeout(&mut self, msg: &Message, timeout: u64) -> Result<(), Error> {
124        let time = Some(Duration::from_secs(timeout));
125
126        if let Err(e) = self.tcp_stream.set_write_timeout(time) {
127            log::error!("{self}Cannot set TCP write timeout: timeout={timeout} {e}");
128            return Err(Error::NetworkError(e.to_string()));
129        }
130
131        let result = self.send(msg);
132
133        // Clear the timeout
134        if let Err(e) = self.tcp_stream.set_write_timeout(None) {
135            log::error!("{self}Cannot set TCP write timeout: {e}");
136            return Err(Error::NetworkError(e.to_string()));
137        }
138
139        result
140    }
141
142    /// Receive a SIP response.
143    ///
144    /// Blocks until a response is received.
145    pub fn recv(&mut self) -> Result<Message, Error> {
146        self.recv_internal()?
147            .ok_or_else(|| Error::NetworkError("Receive timed out on vanilla recv()?".to_string()))
148    }
149
150    /// Receive a message, waiting at most `timeout` seconds.  Clears the
151    /// TCP socket timeout upon completion.
152    pub fn recv_with_timeout(&mut self, timeout: u64) -> Result<Option<Message>, Error> {
153        let time = Some(Duration::from_secs(timeout));
154
155        if let Err(e) = self.tcp_stream.set_read_timeout(time) {
156            log::error!("{self}Cannot set TCP read timeout: timeout={timeout} {e}");
157            return Err(Error::NetworkError(e.to_string()));
158        }
159
160        let result = self.recv_internal();
161
162        // Clear the timeout
163        if let Err(e) = self.tcp_stream.set_read_timeout(None) {
164            log::error!("{self}Cannot set TCP read timeout: {e}");
165            return Err(Error::NetworkError(e.to_string()));
166        }
167
168        result
169    }
170
171    /// Do the actual receiving from the socket.
172    fn recv_internal(&mut self) -> Result<Option<Message>, Error> {
173        let mut text = String::from("");
174
175        loop {
176            let mut buf: [u8; READ_BUFSIZE] = [0; READ_BUFSIZE];
177
178            let num_bytes = match self.tcp_stream.read(&mut buf) {
179                Ok(num) => num,
180                Err(e) => match e.kind() {
181                    std::io::ErrorKind::WouldBlock => {
182                        log::trace!("{self}SIP tcp read timed out.  Returning None");
183                        return Ok(None);
184                    }
185                    std::io::ErrorKind::ConnectionReset => {
186                        log::info!("{self}remote disconnected in recv()");
187                        return Err(Error::NetworkError(e.to_string()));
188                    }
189                    _ => {
190                        log::error!("{self}recv() failed: {e}");
191                        return Err(Error::NetworkError(e.to_string()));
192                    }
193                },
194            };
195
196            if num_bytes == 0 {
197                break;
198            }
199
200            let chunk = match str::from_utf8(&buf) {
201                Ok(s) => s,
202                Err(s) => {
203                    log::error!("{self}recv() got non-utf data: {}", s);
204                    return Err(Error::MessageFormatError);
205                }
206            };
207
208            text += chunk;
209
210            if text.contains(spec::LINE_TERMINATOR) {
211                // We've read a whole message.
212                break;
213            }
214        }
215
216        if text.is_empty() {
217            // Receiving no content here indicates either an error
218            // or the client simply disconnected.
219            log::debug!("{self}Reading TCP stream returned 0 bytes");
220            return Err(Error::NoResponseError);
221        }
222
223        // SIP requests should always arrive one at a time.  Discard the
224        // line/message terminator and any data that exists beyond it.
225        let mut parts = text.split(spec::LINE_TERMINATOR);
226
227        match parts.next() {
228            Some(s) => {
229                let msg = Message::from_sip(s)?;
230                log::info!("{self}INBOUND: {}", msg.to_sip_redacted());
231                Ok(Some(msg))
232            }
233            None => Err(Error::MessageFormatError),
234        }
235    }
236
237    /// Shortcut for:  self.send(msg); resp = self.recv();
238    pub fn sendrecv(&mut self, msg: &Message) -> Result<Message, Error> {
239        self.send(msg)?;
240        self.recv()
241    }
242}