use super::error::Error;
use super::spec;
use super::Message;
use deunicode::deunicode;
use std::fmt;
use std::io::prelude::*;
use std::net::{Shutdown, TcpStream};
use std::str;
use std::time::Duration;
const READ_BUFSIZE: usize = 256;
pub struct Connection {
tcp_stream: TcpStream,
ascii: bool,
log_prefix: Option<String>,
}
impl fmt::Display for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(log_prefix) = self.log_prefix.as_ref() {
write!(f, "{log_prefix} ")
} else {
write!(f, "")
}
}
}
impl Connection {
pub fn new(sip_host: &str) -> Result<Self, Error> {
log::debug!("Connection::new() connecting to: {}", sip_host);
match TcpStream::connect(sip_host) {
Ok(stream) => Ok(Connection {
tcp_stream: stream,
ascii: false,
log_prefix: None,
}),
Err(s) => {
log::error!("Connection::new() failed: {s}");
Err(Error::NetworkError(s.to_string()))
}
}
}
pub fn from_stream(tcp_stream: TcpStream) -> Self {
Connection {
ascii: false,
tcp_stream,
log_prefix: None,
}
}
pub fn set_log_prefix(&mut self, prefix: impl Into<String>) {
self.log_prefix = Some(prefix.into());
}
pub fn set_ascii(&mut self, ascii: bool) {
self.ascii = ascii;
}
pub fn disconnect(&self) -> Result<(), Error> {
log::debug!("{self}Connection::disconnect()");
match self.tcp_stream.shutdown(Shutdown::Both) {
Ok(_) => Ok(()),
Err(s) => {
log::info!("{self}disconnect() failed: {s}");
Err(Error::NetworkError(s.to_string()))
}
}
}
pub fn send(&mut self, msg: &Message) -> Result<(), Error> {
let mut msg_sip = msg.to_sip();
if self.ascii {
msg_sip = deunicode(&msg_sip).replace('\n', "");
}
log::info!("{self}OUTBOUND: {}", msg_sip);
msg_sip.push(spec::LINE_TERMINATOR);
match self.tcp_stream.write_all(msg_sip.as_bytes()) {
Ok(_) => Ok(()),
Err(s) => {
log::error!("{self}send() failed: {}", s);
Err(Error::NetworkError(s.to_string()))
}
}
}
pub fn send_with_timeout(&mut self, msg: &Message, timeout: u64) -> Result<(), Error> {
let time = Some(Duration::from_secs(timeout));
if let Err(e) = self.tcp_stream.set_write_timeout(time) {
log::error!("{self}Cannot set TCP write timeout: timeout={timeout} {e}");
return Err(Error::NetworkError(e.to_string()));
}
let result = self.send(msg);
if let Err(e) = self.tcp_stream.set_write_timeout(None) {
log::error!("{self}Cannot set TCP write timeout: {e}");
return Err(Error::NetworkError(e.to_string()));
}
result
}
pub fn recv(&mut self) -> Result<Message, Error> {
self.recv_internal()?
.ok_or_else(|| Error::NetworkError("Receive timed out on vanilla recv()?".to_string()))
}
pub fn recv_with_timeout(&mut self, timeout: u64) -> Result<Option<Message>, Error> {
let time = Some(Duration::from_secs(timeout));
if let Err(e) = self.tcp_stream.set_read_timeout(time) {
log::error!("{self}Cannot set TCP read timeout: timeout={timeout} {e}");
return Err(Error::NetworkError(e.to_string()));
}
let result = self.recv_internal();
if let Err(e) = self.tcp_stream.set_read_timeout(None) {
log::error!("{self}Cannot set TCP read timeout: {e}");
return Err(Error::NetworkError(e.to_string()));
}
result
}
fn recv_internal(&mut self) -> Result<Option<Message>, Error> {
let mut text = String::from("");
loop {
let mut buf: [u8; READ_BUFSIZE] = [0; READ_BUFSIZE];
let num_bytes = match self.tcp_stream.read(&mut buf) {
Ok(num) => num,
Err(e) => match e.kind() {
std::io::ErrorKind::WouldBlock => {
log::trace!("{self}SIP tcp read timed out. Returning None");
return Ok(None);
}
std::io::ErrorKind::ConnectionReset => {
log::info!("{self}remote disconnected in recv()");
return Err(Error::NetworkError(e.to_string()));
}
_ => {
log::error!("{self}recv() failed: {e}");
return Err(Error::NetworkError(e.to_string()));
}
},
};
if num_bytes == 0 {
break;
}
let chunk = match str::from_utf8(&buf) {
Ok(s) => s,
Err(s) => {
log::error!("{self}recv() got non-utf data: {}", s);
return Err(Error::MessageFormatError);
}
};
text += chunk;
if text.contains(spec::LINE_TERMINATOR) {
break;
}
}
if text.is_empty() {
log::debug!("{self}Reading TCP stream returned 0 bytes");
return Err(Error::NoResponseError);
}
let mut parts = text.split(spec::LINE_TERMINATOR);
match parts.next() {
Some(s) => {
let msg = Message::from_sip(s)?;
log::info!("{self}INBOUND: {}", msg.to_sip_redacted());
Ok(Some(msg))
}
None => Err(Error::MessageFormatError),
}
}
pub fn sendrecv(&mut self, msg: &Message) -> Result<Message, Error> {
self.send(msg)?;
self.recv()
}
}