1use super::error::Error;
2use super::spec;
3use super::Message;
4use deunicode::deunicode;
5use std::fmt;
6use std::io::prelude::*;
7use std::net::{Shutdown, TcpStream};
8use std::str;
9use std::time::Duration;
10
11const READ_BUFSIZE: usize = 256;
13
14pub struct Connection {
17 tcp_stream: TcpStream,
18
19 ascii: bool,
21
22 log_prefix: Option<String>,
23}
24
25impl fmt::Display for Connection {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 if let Some(log_prefix) = self.log_prefix.as_ref() {
28 write!(f, "{log_prefix} ")
29 } else {
30 write!(f, "")
31 }
32 }
33}
34
35impl Connection {
36 pub fn new(sip_host: &str) -> Result<Self, Error> {
46 log::debug!("Connection::new() connecting to: {}", sip_host);
47
48 match TcpStream::connect(sip_host) {
49 Ok(stream) => Ok(Connection {
50 tcp_stream: stream,
51 ascii: false,
52 log_prefix: None,
53 }),
54 Err(s) => {
55 log::error!("Connection::new() failed: {s}");
56 Err(Error::NetworkError(s.to_string()))
57 }
58 }
59 }
60
61 pub fn from_stream(tcp_stream: TcpStream) -> Self {
63 Connection {
64 ascii: false,
65 tcp_stream,
66 log_prefix: None,
67 }
68 }
69
70 pub fn set_log_prefix(&mut self, prefix: impl Into<String>) {
73 self.log_prefix = Some(prefix.into());
74 }
75
76 pub fn set_ascii(&mut self, ascii: bool) {
78 self.ascii = ascii;
79 }
80
81 pub fn disconnect(&self) -> Result<(), Error> {
83 log::debug!("{self}Connection::disconnect()");
84
85 match self.tcp_stream.shutdown(Shutdown::Both) {
86 Ok(_) => Ok(()),
87 Err(s) => {
88 log::debug!("{self}disconnect() failed: {s}");
90 Err(Error::NetworkError(s.to_string()))
91 }
92 }
93 }
94
95 pub fn send(&mut self, msg: &Message) -> Result<(), Error> {
97 let mut msg_sip = msg.to_sip();
98
99 if self.ascii {
100 msg_sip = deunicode(&msg_sip).replace('\n', "");
103 }
104
105 log::info!("{self}OUTBOUND: {}", msg_sip);
107
108 msg_sip.push(spec::LINE_TERMINATOR);
109
110 match self.tcp_stream.write_all(msg_sip.as_bytes()) {
111 Ok(_) => Ok(()),
112 Err(s) => {
113 log::error!("{self}send() failed: {}", s);
114 Err(Error::NetworkError(s.to_string()))
115 }
116 }
117 }
118
119 pub fn send_with_timeout(&mut self, msg: &Message, timeout: u64) -> Result<(), Error> {
124 let time = Some(Duration::from_secs(timeout));
125
126 if let Err(e) = self.tcp_stream.set_write_timeout(time) {
127 log::error!("{self}Cannot set TCP write timeout: timeout={timeout} {e}");
128 return Err(Error::NetworkError(e.to_string()));
129 }
130
131 let result = self.send(msg);
132
133 if let Err(e) = self.tcp_stream.set_write_timeout(None) {
135 log::error!("{self}Cannot set TCP write timeout: {e}");
136 return Err(Error::NetworkError(e.to_string()));
137 }
138
139 result
140 }
141
142 pub fn recv(&mut self) -> Result<Message, Error> {
146 self.recv_internal()?
147 .ok_or_else(|| Error::NetworkError("Receive timed out on vanilla recv()?".to_string()))
148 }
149
150 pub fn recv_with_timeout(&mut self, timeout: u64) -> Result<Option<Message>, Error> {
153 let time = Some(Duration::from_secs(timeout));
154
155 if let Err(e) = self.tcp_stream.set_read_timeout(time) {
156 log::error!("{self}Cannot set TCP read timeout: timeout={timeout} {e}");
157 return Err(Error::NetworkError(e.to_string()));
158 }
159
160 let result = self.recv_internal();
161
162 if let Err(e) = self.tcp_stream.set_read_timeout(None) {
164 log::error!("{self}Cannot set TCP read timeout: {e}");
165 return Err(Error::NetworkError(e.to_string()));
166 }
167
168 result
169 }
170
171 fn recv_internal(&mut self) -> Result<Option<Message>, Error> {
173 let mut text = String::from("");
174
175 loop {
176 let mut buf: [u8; READ_BUFSIZE] = [0; READ_BUFSIZE];
177
178 let num_bytes = match self.tcp_stream.read(&mut buf) {
179 Ok(num) => num,
180 Err(e) => match e.kind() {
181 std::io::ErrorKind::WouldBlock => {
182 log::trace!("{self}SIP tcp read timed out. Returning None");
183 return Ok(None);
184 }
185 std::io::ErrorKind::ConnectionReset => {
186 log::info!("{self}remote disconnected in recv()");
187 return Err(Error::NetworkError(e.to_string()));
188 }
189 _ => {
190 log::error!("{self}recv() failed: {e}");
191 return Err(Error::NetworkError(e.to_string()));
192 }
193 },
194 };
195
196 if num_bytes == 0 {
197 break;
198 }
199
200 let chunk = match str::from_utf8(&buf) {
201 Ok(s) => s,
202 Err(s) => {
203 log::error!("{self}recv() got non-utf data: {}", s);
204 return Err(Error::MessageFormatError);
205 }
206 };
207
208 text += chunk;
209
210 if text.contains(spec::LINE_TERMINATOR) {
211 break;
213 }
214 }
215
216 if text.is_empty() {
217 log::debug!("{self}Reading TCP stream returned 0 bytes");
220 return Err(Error::NoResponseError);
221 }
222
223 let mut parts = text.split(spec::LINE_TERMINATOR);
226
227 match parts.next() {
228 Some(s) => {
229 let msg = Message::from_sip(s)?;
230 log::info!("{self}INBOUND: {}", msg.to_sip_redacted());
231 Ok(Some(msg))
232 }
233 None => Err(Error::MessageFormatError),
234 }
235 }
236
237 pub fn sendrecv(&mut self, msg: &Message) -> Result<Message, Error> {
239 self.send(msg)?;
240 self.recv()
241 }
242}