1use crate::osrf::addr::BusAddress;
2use crate::osrf::conf;
3use crate::osrf::logging::Logger;
4use crate::osrf::message::TransportMessage;
5use crate::util;
6use crate::EgResult;
7use redis::{Commands, ConnectionAddr, ConnectionInfo, ProtocolVersion, RedisConnectionInfo};
8use std::fmt;
9
10pub struct Bus {
12 connection: redis::Connection,
13
14 address: BusAddress,
16
17 router_name: String,
19
20 raw_data_mode: bool,
25}
26
27impl Bus {
28 pub fn new(config: &conf::BusClient) -> EgResult<Self> {
29 let info = Bus::connection_info(config)?;
30
31 log::trace!("Bus::new() connecting to {:?}", info);
32
33 let client = redis::Client::open(info)
34 .map_err(|e| format!("Error opening Redis connection: {e}"))?;
35
36 let connection = client
37 .get_connection()
38 .map_err(|e| format!("Bus connect error: {e}"))?;
39
40 let username = config.username();
41 let domain = config.domain().name();
42 let addr = BusAddress::for_client(username, domain);
43
44 let bus = Bus {
45 connection,
46 raw_data_mode: false,
47 address: addr,
48 router_name: config.router_name().to_string(),
49 };
50
51 Ok(bus)
52 }
53
54 pub fn set_raw_data_mode(&mut self, on: bool) {
55 self.raw_data_mode = on;
56 }
57
58 fn connection_info(config: &conf::BusClient) -> EgResult<ConnectionInfo> {
63 let redis_con = RedisConnectionInfo {
64 db: 0,
65 username: Some(config.username().to_string()),
66 password: Some(config.password().to_string()),
67 protocol: ProtocolVersion::RESP3,
68 };
69
70 let domain = config.domain();
71 let con_addr = ConnectionAddr::Tcp(domain.name().to_string(), domain.port());
72
73 Ok(ConnectionInfo {
74 addr: con_addr,
75 redis: redis_con,
76 })
77 }
78
79 pub fn address(&self) -> &BusAddress {
81 &self.address
82 }
83
84 pub fn set_address(&mut self, addr: &BusAddress) {
86 self.address = addr.clone();
87 }
88
89 pub fn generate_address(&mut self) {
91 self.address = BusAddress::for_client(self.username(), self.domain());
92 }
93
94 pub fn router_name(&self) -> &str {
96 &self.router_name
97 }
98
99 pub fn domain(&self) -> &str {
101 self.address().domain()
102 }
103
104 pub fn username(&self) -> &str {
105 self.address().username()
106 }
107
108 pub fn connection(&mut self) -> &mut redis::Connection {
109 &mut self.connection
110 }
111
112 fn recv_one_chunk(
117 &mut self,
118 timeout: u64,
119 recipient: Option<&str>,
120 ) -> EgResult<Option<String>> {
121 let recipient = match recipient {
122 Some(s) => s.to_string(),
123 None => self.address().as_str().to_string(),
124 };
125
126 let value: String;
127
128 if timeout == 0 {
129 value = match self.connection().lpop(&recipient, None) {
133 Ok(c) => c,
134 Err(e) => match e.kind() {
135 redis::ErrorKind::TypeError => {
136 return Ok(None);
138 }
139 _ => return Err(format!("recv_one_chunk failed: {e}").into()),
140 },
141 };
142 } else {
143 let mut resp: Vec<String> = self
144 .connection()
145 .blpop(&recipient, timeout as f64) .map_err(|e| format!("Redis blpop error recipient={recipient}: {e}"))?;
147
148 if resp.len() > 1 {
149 value = resp.remove(1);
152 } else {
153 return Ok(None);
155 }
156 }
157
158 log::trace!("recv_one_value() pulled from bus: {}", value);
159
160 Ok(Some(value))
161 }
162
163 fn recv_one_value(
166 &mut self,
167 timeout: u64,
168 recipient: Option<&str>,
169 ) -> EgResult<Option<json::JsonValue>> {
170 let json_string = match self.recv_one_chunk(timeout, recipient)? {
171 Some(s) => s,
172 None => {
173 return Ok(None);
174 }
175 };
176
177 log::trace!("{self} read json from the bus: {json_string}");
178
179 match json::parse(&json_string) {
180 Ok(json_val) => Ok(Some(json_val)),
181 Err(err) => Err(format!("Error parsing JSON: {err:?}").into()),
182 }
183 }
184
185 pub fn recv_json_value(
194 &mut self,
195 timeout: u64,
196 recipient: Option<&str>,
197 ) -> EgResult<Option<json::JsonValue>> {
198 let mut option: Option<json::JsonValue>;
199
200 match timeout {
201 0 => return self.recv_one_value(timeout, recipient),
203
204 _ => {
206 let timer = util::Timer::new(timeout);
207
208 while !timer.done() {
209 option = self.recv_one_value(timer.remaining(), recipient)?;
210
211 if option.is_some() {
212 return Ok(option);
213 }
214 }
215 }
216 }
217
218 Ok(None)
219 }
220
221 pub fn recv(
250 &mut self,
251 timeout: u64,
252 recipient: Option<&str>,
253 ) -> EgResult<Option<TransportMessage>> {
254 let json_op = self.recv_json_value(timeout, recipient)?;
255
256 if let Some(jv) = json_op {
257 match TransportMessage::from_json_value(jv, self.raw_data_mode) {
258 Ok(v) => Ok(Some(v)),
259 Err(e) => {
260 log::error!("Error translating JSON value into EgValue: {e}");
261 Ok(None)
262 }
263 }
264 } else {
265 Ok(None)
266 }
267 }
268
269 pub fn send(&mut self, msg: TransportMessage) -> EgResult<()> {
271 self.send_internal(msg, None)
272 }
273
274 pub fn send_to(&mut self, msg: TransportMessage, recipient: &str) -> EgResult<()> {
277 self.send_internal(msg, Some(recipient))
278 }
279
280 fn send_internal(&mut self, msg: TransportMessage, recipient: Option<&str>) -> EgResult<()> {
283 let mut json_val = msg.into_json_value();
284
285 json_val["osrf_xid"] = json::from(Logger::get_log_trace());
289
290 let recipient = recipient.unwrap_or(json_val["to"].as_str().unwrap());
295
296 let json_str = json_val.dump();
297
298 log::trace!("send() writing chunk to={}: {}", recipient, json_str);
299
300 let res: Result<i32, _> = self.connection().rpush(recipient, json_str);
301
302 if let Err(e) = res {
303 return Err(format!("Error in send() {e}").into());
304 }
305
306 Ok(())
307 }
308
309 pub fn keys(&mut self, pattern: &str) -> EgResult<Vec<String>> {
311 let res: Result<Vec<String>, _> = self.connection().keys(pattern);
312
313 if let Err(e) = res {
314 return Err(format!("Error in keys(): {e}").into());
315 }
316
317 Ok(res.unwrap())
318 }
319
320 pub fn llen(&mut self, key: &str) -> EgResult<i32> {
322 let res: Result<i32, _> = self.connection().llen(key);
323
324 if let Err(e) = res {
325 return Err(format!("Error in llen(): {e}").into());
326 }
327
328 Ok(res.unwrap())
329 }
330
331 pub fn ttl(&mut self, key: &str) -> EgResult<i32> {
335 let res: Result<i32, _> = self.connection().ttl(key);
336
337 if let Err(e) = res {
338 return Err(format!("Error in ttl(): {e}").into());
339 }
340
341 Ok(res.unwrap())
342 }
343
344 pub fn lrange(&mut self, key: &str, start: isize, stop: isize) -> EgResult<Vec<String>> {
346 let res: Result<Vec<String>, _> = self.connection().lrange(key, start, stop);
347
348 if let Err(e) = res {
349 return Err(format!("Error in lrange(): {e}").into());
350 }
351
352 Ok(res.unwrap())
353 }
354
355 pub fn set_key_timeout(&mut self, key: &str, timeout: i64) -> EgResult<i32> {
357 let res: Result<i32, _> = self.connection().expire(key, timeout);
358
359 if let Err(ref e) = res {
360 Err(format!("Error in set_key_timeout(): {e}"))?;
361 }
362
363 let val = res.unwrap();
364 Ok(val)
365 }
366
367 pub fn clear_bus(&mut self) -> EgResult<()> {
369 let stream = self.address().as_str().to_string(); let res: Result<i32, _> = self.connection().del(stream);
371
372 if let Err(e) = res {
373 return Err(format!("Error in queue clear(): {e}").into());
374 }
375
376 Ok(())
377 }
378}
379
380impl fmt::Display for Bus {
382 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383 write!(f, "Bus {}", self.address().as_str())
384 }
385}
386
387impl Drop for Bus {
391 fn drop(&mut self) {
393 let stream = self.address().as_str().to_string();
394 let res: Result<i32, _> = self.connection().del(&stream);
395 res.ok();
396 }
397}