evergreen/osrf/
bus.rs

1use crate::osrf::addr::BusAddress;
2use crate::osrf::conf;
3use crate::osrf::logging::Logger;
4use crate::osrf::message::TransportMessage;
5use crate::util;
6use crate::EgResult;
7use redis::{Commands, ConnectionAddr, ConnectionInfo, ProtocolVersion, RedisConnectionInfo};
8use std::fmt;
9
10/// Manages a Redis connection.
11pub struct Bus {
12    connection: redis::Connection,
13
14    /// Every bus connection has a unique client address.
15    address: BusAddress,
16
17    /// Name of the router running on our primary domain.
18    router_name: String,
19
20    /// Some clients don't need the IDL and all its classes to function
21    /// (e.g. the router).  Using raw_data_mode allows for transport
22    /// messages to be parsed and serialized without concern for
23    /// IDL-classed information stored in the message.
24    raw_data_mode: bool,
25}
26
27impl Bus {
28    pub fn new(config: &conf::BusClient) -> EgResult<Self> {
29        let info = Bus::connection_info(config)?;
30
31        log::trace!("Bus::new() connecting to {:?}", info);
32
33        let client = redis::Client::open(info)
34            .map_err(|e| format!("Error opening Redis connection: {e}"))?;
35
36        let connection = client
37            .get_connection()
38            .map_err(|e| format!("Bus connect error: {e}"))?;
39
40        let username = config.username();
41        let domain = config.domain().name();
42        let addr = BusAddress::for_client(username, domain);
43
44        let bus = Bus {
45            connection,
46            raw_data_mode: false,
47            address: addr,
48            router_name: config.router_name().to_string(),
49        };
50
51        Ok(bus)
52    }
53
54    pub fn set_raw_data_mode(&mut self, on: bool) {
55        self.raw_data_mode = on;
56    }
57
58    /// Generates the Redis connection Info
59    ///
60    /// Builds the connection info by hand because it gives us more
61    /// flexibility/control than compiling a URL string.
62    fn connection_info(config: &conf::BusClient) -> EgResult<ConnectionInfo> {
63        let redis_con = RedisConnectionInfo {
64            db: 0,
65            username: Some(config.username().to_string()),
66            password: Some(config.password().to_string()),
67            protocol: ProtocolVersion::RESP3,
68        };
69
70        let domain = config.domain();
71        let con_addr = ConnectionAddr::Tcp(domain.name().to_string(), domain.port());
72
73        Ok(ConnectionInfo {
74            addr: con_addr,
75            redis: redis_con,
76        })
77    }
78
79    /// The unique bus address for this bus connection.
80    pub fn address(&self) -> &BusAddress {
81        &self.address
82    }
83
84    /// Apply a new bus address
85    pub fn set_address(&mut self, addr: &BusAddress) {
86        self.address = addr.clone();
87    }
88
89    /// Generates a new BusAddress and applies it to this Bus.
90    pub fn generate_address(&mut self) {
91        self.address = BusAddress::for_client(self.username(), self.domain());
92    }
93
94    /// The name of the router running on our primary domain.
95    pub fn router_name(&self) -> &str {
96        &self.router_name
97    }
98
99    /// Our primary domain
100    pub fn domain(&self) -> &str {
101        self.address().domain()
102    }
103
104    pub fn username(&self) -> &str {
105        self.address().username()
106    }
107
108    pub fn connection(&mut self) -> &mut redis::Connection {
109        &mut self.connection
110    }
111
112    /// Returns at most one String pulled from the queue or None if the
113    /// pop times out or is interrupted.
114    ///
115    /// The string will be whole, unparsed JSON string.
116    fn recv_one_chunk(
117        &mut self,
118        timeout: u64,
119        recipient: Option<&str>,
120    ) -> EgResult<Option<String>> {
121        let recipient = match recipient {
122            Some(s) => s.to_string(),
123            None => self.address().as_str().to_string(),
124        };
125
126        let value: String;
127
128        if timeout == 0 {
129            // non-blocking
130
131            // LPOP returns a scalar response.
132            value = match self.connection().lpop(&recipient, None) {
133                Ok(c) => c,
134                Err(e) => match e.kind() {
135                    redis::ErrorKind::TypeError => {
136                        // Will read a Nil value on timeout.  That's OK.
137                        return Ok(None);
138                    }
139                    _ => return Err(format!("recv_one_chunk failed: {e}").into()),
140                },
141            };
142        } else {
143            let mut resp: Vec<String> = self
144                .connection()
145                .blpop(&recipient, timeout as f64) // TODO
146                .map_err(|e| format!("Redis blpop error recipient={recipient}: {e}"))?;
147
148            if resp.len() > 1 {
149                // BLPOP returns the name of the popped list and the value.
150                // resp = [key, value]
151                value = resp.remove(1);
152            } else {
153                // No message received
154                return Ok(None);
155            }
156        }
157
158        log::trace!("recv_one_value() pulled from bus: {}", value);
159
160        Ok(Some(value))
161    }
162
163    /// Returns at most one JSON value pulled from the queue or None if
164    /// the list pop times out or the pop is interrupted by a signal.
165    fn recv_one_value(
166        &mut self,
167        timeout: u64,
168        recipient: Option<&str>,
169    ) -> EgResult<Option<json::JsonValue>> {
170        let json_string = match self.recv_one_chunk(timeout, recipient)? {
171            Some(s) => s,
172            None => {
173                return Ok(None);
174            }
175        };
176
177        log::trace!("{self} read json from the bus: {json_string}");
178
179        match json::parse(&json_string) {
180            Ok(json_val) => Ok(Some(json_val)),
181            Err(err) => Err(format!("Error parsing JSON: {err:?}").into()),
182        }
183    }
184
185    /// Returns at most one JSON value pulled from the queue.
186    ///
187    /// Keeps trying until a value is returned or the timeout is exceeded.
188    ///
189    /// # Arguments
190    ///
191    /// * `timeout` - Time in seconds to wait for a value.
192    ///   0 means do not block.
193    pub fn recv_json_value(
194        &mut self,
195        timeout: u64,
196        recipient: Option<&str>,
197    ) -> EgResult<Option<json::JsonValue>> {
198        let mut option: Option<json::JsonValue>;
199
200        match timeout {
201            // See if any data is ready now
202            0 => return self.recv_one_value(timeout, recipient),
203
204            // Keep trying until we have a result or exhaust the timeout.
205            _ => {
206                let timer = util::Timer::new(timeout);
207
208                while !timer.done() {
209                    option = self.recv_one_value(timer.remaining(), recipient)?;
210
211                    if option.is_some() {
212                        return Ok(option);
213                    }
214                }
215            }
216        }
217
218        Ok(None)
219    }
220
221    /// Returns at most one TransportMessage.
222    ///
223    /// Keeps trying until a value is returned or the timeout is exceeded.
224    ///
225    /// Avoids exiting with an error on receipt of invalid data from the
226    /// network, since exiting early can result in leaving additional
227    /// (streamed) invalid messages on the bus for later retrieval,
228    /// because presumably the original client request exited instead of
229    /// processing all of the messages.
230    ///
231    /// Invalid messages left on the bus can result in later failures
232    /// for unrelated requests as the old invalid messages are pulled
233    /// and parsed, resulting in additional early exits.
234    ///
235    /// Instead, act as if no response was received.  This is still an
236    /// error condition that requries repair, but this way the impact is
237    /// limited to the failed request.
238    ///
239    /// This condition was seen in the wild when introspecting Perl
240    /// services, which contains unknown "__c" message classes.
241    ///
242    /// # Arguments
243    ///
244    /// * `timeout` - Time in seconds to wait for a response.
245    ///   0 means do not block.
246    /// * `recipient` - Optionally specify the name of the destination
247    ///   queue/stream.  This overrides using the bus-specific
248    ///   bus address as the recipient.
249    pub fn recv(
250        &mut self,
251        timeout: u64,
252        recipient: Option<&str>,
253    ) -> EgResult<Option<TransportMessage>> {
254        let json_op = self.recv_json_value(timeout, recipient)?;
255
256        if let Some(jv) = json_op {
257            match TransportMessage::from_json_value(jv, self.raw_data_mode) {
258                Ok(v) => Ok(Some(v)),
259                Err(e) => {
260                    log::error!("Error translating JSON value into EgValue: {e}");
261                    Ok(None)
262                }
263            }
264        } else {
265            Ok(None)
266        }
267    }
268
269    /// Send a TransportMessage to the "to" value in the message.
270    pub fn send(&mut self, msg: TransportMessage) -> EgResult<()> {
271        self.send_internal(msg, None)
272    }
273
274    /// Send a TransportMessage to the specified BusAddress, regardless
275    /// of what value is in the msg.to() field.
276    pub fn send_to(&mut self, msg: TransportMessage, recipient: &str) -> EgResult<()> {
277        self.send_internal(msg, Some(recipient))
278    }
279
280    /// Sends a TransportMessage to the specified BusAddress, regardless
281    /// of what value is in the msg.to() field.
282    fn send_internal(&mut self, msg: TransportMessage, recipient: Option<&str>) -> EgResult<()> {
283        let mut json_val = msg.into_json_value();
284
285        // Play a little inside baseball here and tag the message
286        // with our log trace.  This way the layers above don't have
287        // to worry about it.
288        json_val["osrf_xid"] = json::from(Logger::get_log_trace());
289
290        // Similarly, this allows us to avoid an unnecessary clone
291        // on the recipient if it resides in the now-moved source message.
292        // json_val["to"].as_str() is guaranteed here, because it's a
293        // requirement for TransportMessage.
294        let recipient = recipient.unwrap_or(json_val["to"].as_str().unwrap());
295
296        let json_str = json_val.dump();
297
298        log::trace!("send() writing chunk to={}: {}", recipient, json_str);
299
300        let res: Result<i32, _> = self.connection().rpush(recipient, json_str);
301
302        if let Err(e) = res {
303            return Err(format!("Error in send() {e}").into());
304        }
305
306        Ok(())
307    }
308
309    /// Returns a list of keys that match the provided pattern.
310    pub fn keys(&mut self, pattern: &str) -> EgResult<Vec<String>> {
311        let res: Result<Vec<String>, _> = self.connection().keys(pattern);
312
313        if let Err(e) = res {
314            return Err(format!("Error in keys(): {e}").into());
315        }
316
317        Ok(res.unwrap())
318    }
319
320    /// Returns the length of the array specified by 'key'.
321    pub fn llen(&mut self, key: &str) -> EgResult<i32> {
322        let res: Result<i32, _> = self.connection().llen(key);
323
324        if let Err(e) = res {
325            return Err(format!("Error in llen(): {e}").into());
326        }
327
328        Ok(res.unwrap())
329    }
330
331    /// Returns the time-to-live (in seconds) of the specified key.
332    ///
333    /// Return -1 if no expire time is set, -2 if no such key exists.
334    pub fn ttl(&mut self, key: &str) -> EgResult<i32> {
335        let res: Result<i32, _> = self.connection().ttl(key);
336
337        if let Err(e) = res {
338            return Err(format!("Error in ttl(): {e}").into());
339        }
340
341        Ok(res.unwrap())
342    }
343
344    /// Returns an array slice as a Vec of Strings.
345    pub fn lrange(&mut self, key: &str, start: isize, stop: isize) -> EgResult<Vec<String>> {
346        let res: Result<Vec<String>, _> = self.connection().lrange(key, start, stop);
347
348        if let Err(e) = res {
349            return Err(format!("Error in lrange(): {e}").into());
350        }
351
352        Ok(res.unwrap())
353    }
354
355    /// Set the expire time on the specified key to 'timeout' seconds from now.
356    pub fn set_key_timeout(&mut self, key: &str, timeout: i64) -> EgResult<i32> {
357        let res: Result<i32, _> = self.connection().expire(key, timeout);
358
359        if let Err(ref e) = res {
360            Err(format!("Error in set_key_timeout(): {e}"))?;
361        }
362
363        let val = res.unwrap();
364        Ok(val)
365    }
366
367    /// Remove all pending data from the recipient queue.
368    pub fn clear_bus(&mut self) -> EgResult<()> {
369        let stream = self.address().as_str().to_string(); // mut borrow
370        let res: Result<i32, _> = self.connection().del(stream);
371
372        if let Err(e) = res {
373            return Err(format!("Error in queue clear(): {e}").into());
374        }
375
376        Ok(())
377    }
378}
379
380/// Good for debugging / logging
381impl fmt::Display for Bus {
382    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383        write!(f, "Bus {}", self.address().as_str())
384    }
385}
386
387/// Every Bus instance has a unique address which will never be used
388/// again.  When this bus instance is dropped, remove any remaining
389/// messages destined for this address since otherwise they will linger.
390impl Drop for Bus {
391    /// Similar to clear_bus but avoids any logging / error reporting.
392    fn drop(&mut self) {
393        let stream = self.address().as_str().to_string();
394        let res: Result<i32, _> = self.connection().del(&stream);
395        res.ok();
396    }
397}