evergreen/
util.rs

1use crate::EgResult;
2use crate::EgValue;
3use json::JsonValue;
4use rand::Rng;
5use socket2::{Domain, Socket, Type};
6use std::collections::HashSet;
7use std::fs;
8use std::net::{SocketAddr, TcpListener};
9use std::path::Path;
10use std::thread;
11use std::time::Duration;
12use std::time::Instant;
13
/// Placeholder text returned by `stringify_params` in place of the real
/// parameters when the method name matches a log-protected prefix.
pub const REDACTED_PARAMS_STR: &str = "**PARAMS REDACTED**";
15
// Typical value for SOMAXCONN.
// Passed as the listen() backlog by tcp_listener().
const CONNECT_TCP_BACKLOG: i32 = 128;
18
/// Returns the ID of the calling thread as a u64.
///
/// The number is extracted from the `Debug` rendering of the thread's
/// ID (e.g. "ThreadId(123)").  If no number can be extracted, 0 is
/// returned.
///
/// Eventually this will not be needed.
/// <https://doc.rust-lang.org/stable/std/thread/struct.ThreadId.html#method.as_u64>
/// <https://github.com/rust-lang/rust/pull/110738>
pub fn thread_id() -> u64 {
    let rendered = format!("{:?}", thread::current().id());

    rendered
        // The numeric portion sits between the parentheses.
        .split(|c: char| c == '(' || c == ')')
        .nth(1)
        .and_then(|digits| digits.parse::<u64>().ok())
        .unwrap_or(0)
}
37
38/// Returns a string of random numbers of the requested length
39///
40/// Any `size` value that exceeds about 20 will consist wholly of
41/// zeroes along the first portion of the string.
42///
43/// ```
44/// use evergreen::util;
45/// let n = util::random_number(12);
46/// assert_eq!(n.len(), 12);
47/// ```
48pub fn random_number(size: u8) -> String {
49    let mut rng = rand::thread_rng();
50    let num: u64 = rng.gen_range(0..u64::MAX);
51    format!("{:0width$}", num, width = size as usize)[0..size as usize].to_string()
52}
53
54/// Converts a JSON number or string to an isize if possible
55///
56/// ```
57/// use evergreen::util;
58/// use json;
59/// let v = json::from(-123);
60/// assert_eq!(util::json_isize(&v), Some(-123));
61/// let v = json::from("hello");
62/// assert_eq!(util::json_isize(&v), None);
63/// ```
64pub fn json_isize(value: &JsonValue) -> Option<isize> {
65    if let Some(i) = value.as_isize() {
66        return Some(i);
67    } else if let Some(s) = value.as_str() {
68        if let Ok(i2) = s.parse::<isize>() {
69            return Some(i2);
70        }
71    };
72
73    None
74}
75
76/// Converts a JSON number or string to an usize if possible
77/// ```
78/// use evergreen::util;
79/// use json;
80/// let v = json::from(-123);
81/// assert_eq!(util::json_usize(&v), None);
82/// let v = json::from("hello");
83/// assert_eq!(util::json_usize(&v), None);
84/// let v = json::from(12321);
85/// assert_eq!(util::json_usize(&v), Some(12321));
86/// ```
87pub fn json_usize(value: &JsonValue) -> Option<usize> {
88    if let Some(i) = value.as_usize() {
89        return Some(i);
90    } else if let Some(s) = value.as_str() {
91        if let Ok(i2) = s.parse::<usize>() {
92            return Some(i2);
93        }
94    };
95
96    None
97}
98
/// Countdown timer with whole-second resolution.
///
/// ```
/// use evergreen::util;
///
/// let t = util::Timer::new(60);
/// assert!(!t.done());
/// assert!(t.remaining() > 0);
/// assert_eq!(t.duration(), 60);
///
/// let t = util::Timer::new(0);
/// assert!(t.done());
/// assert!(t.remaining() == 0);
/// assert_eq!(t.duration(), 0);
///
/// ```
pub struct Timer {
    /// Length of the countdown in seconds.  The timer reports
    /// itself as done once this many seconds have elapsed since
    /// `start_time`.
    duration: u64,

    /// Instant from which elapsed time is measured.
    start_time: Instant,
}

impl Timer {
    /// Creates a timer of `duration` seconds that starts counting now.
    pub fn new(duration: u64) -> Timer {
        let start_time = Instant::now();
        Timer {
            duration,
            start_time,
        }
    }

    /// Restarts the countdown from the current moment.
    pub fn reset(&mut self) {
        self.start_time = Instant::now();
    }

    /// Whole seconds left before the timer is done; never below 0.
    pub fn remaining(&self) -> u64 {
        let elapsed_secs = self.start_time.elapsed().as_secs();
        self.duration.saturating_sub(elapsed_secs)
    }

    /// Configured length of the timer in seconds.
    pub fn duration(&self) -> u64 {
        self.duration
    }

    /// True once no time remains.
    pub fn done(&self) -> bool {
        self.remaining() == 0
    }
}
149
150/// Creates a (JSON) String verion of a list of method parameters,
151/// replacing params with a generic REDACTED message for log-protected
152/// methods.
153///
154/// ```
155/// use evergreen::util;
156/// let method = "opensrf.system.private.stuff";
157/// let log_protect = vec!["opensrf.system.private".to_string()];
158/// let params = vec![];
159///
160/// let s = util::stringify_params(method, &params, &log_protect);
161/// assert_eq!(s.as_str(), util::REDACTED_PARAMS_STR);
162/// ```
163pub fn stringify_params(method: &str, params: &[EgValue], log_protect: &[String]) -> String {
164    // Check if the method should be protected
165    let is_protected = log_protect.iter().any(|m| method.starts_with(m));
166
167    if is_protected {
168        REDACTED_PARAMS_STR.to_string()
169    } else {
170        params
171            .iter()
172            // EgValue.dump() consumes the value, hence the clone.
173            .map(|p| p.clone().dump())
174            .collect::<Vec<_>>()
175            .join(" ")
176    }
177}
178
/// Converts a PG array string (e.g. '{1,23,456}') into a list of
/// unique integers.
///
/// Curly braces are discarded and the remainder is split on commas.
/// Entries that do not parse as an i64 (e.g. NULL) are skipped.  The
/// order of the returned values is unspecified.
///
/// ```
/// let mut res = evergreen::util::pg_unpack_int_array("{1,23,NULL,23,456}");
/// res.sort();
/// assert_eq!(res, vec![1,23,456]);
/// ```
///
pub fn pg_unpack_int_array(array: &str) -> Vec<i64> {
    let without_braces: String = array
        .chars()
        .filter(|c| !matches!(c, '{' | '}'))
        .collect();

    // A set takes care of de-duplication.
    let mut unique: HashSet<i64> = HashSet::new();

    for token in without_braces.split(',') {
        // We only care about int-ish things.
        if let Ok(num) = token.parse::<i64>() {
            unique.insert(num);
        }
    }

    unique.into_iter().collect()
}
198
/// Limit/offset pair describing one page of results.
#[derive(Debug, Clone, PartialEq)]
pub struct Pager {
    limit: usize,
    offset: usize,
}

impl Pager {
    /// Creates a pager with the provided limit and offset.
    pub fn new(limit: usize, offset: usize) -> Self {
        Self { limit, offset }
    }

    /// The configured limit.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// The configured offset.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Sets both limit and offset back to 0.
    pub fn reset(&mut self) {
        *self = Self::new(0, 0);
    }
}
220
/// Subtracts `b` from `a`.
///
/// Both operands are scaled up by 100 before the subtraction and the
/// result is scaled back down afterwards, with the intent of
/// compensating for common floating point math problems.
pub fn fpdiff(a: f64, b: f64) -> f64 {
    const SCALE: f64 = 100.00;

    let scaled_a = a * SCALE;
    let scaled_b = b * SCALE;

    (scaled_a - scaled_b) / SCALE
}
226
/// Adds `b` to `a`.
///
/// Both operands are scaled up by 100 before the addition and the
/// result is scaled back down afterwards, with the intent of
/// compensating for common floating point math problems.
pub fn fpsum(a: f64, b: f64) -> f64 {
    const SCALE: f64 = 100.00;

    let scaled_a = a * SCALE;
    let scaled_b = b * SCALE;

    (scaled_a + scaled_b) / SCALE
}
232
233/// "check", "create", "delete" a lockfile
234pub fn lockfile(path: &str, action: &str) -> EgResult<bool> {
235    match action {
236        "check" => match Path::new(path).try_exists() {
237            Ok(b) => Ok(b),
238            Err(e) => Err(e.to_string().into()),
239        },
240        "create" => {
241            // create() truncates.  create_new() is still experimental.
242            // So check manually first.
243
244            if lockfile(path, "check")? {
245                return Err(format!("Lockfile already exists: {path}").into());
246            }
247
248            match fs::File::create(path) {
249                Ok(_) => Ok(true),
250                Err(e) => Err(e.to_string().into()),
251            }
252        }
253        "delete" => match fs::remove_file(path) {
254            Ok(_) => Ok(true),
255            Err(e) => Err(e.to_string().into()),
256        },
257        _ => Err(format!("Invalid lockfile action: {action}").into()),
258    }
259}
260
261/// Bind to the provided host:port while applying a read timeout to the
262/// TcpListener.
263///
264/// Applying a timeout to the TcpListener allows TCP servers to
265/// periodically stop listening for new connections and perform
266/// housekeeping duties (check for signals, etc.)
267///
268/// If you don't need a read timeout, the standard TcpListener::bind()
269/// approach should suffice.
270///
271/// * `address` - Bind and listen at this address
272/// * `port` - Bind and listen at this port.
273/// * `read_timeout` - Read timeout in seconds applied to the listening socket.
274///
275/// Example:
276///
277///
278/// ```text
279/// loop {
280///    let mut tcp_listener = eg::util::tcp_listener("127.0.0.1:9898", 5)?;
281///
282///    let client_stream = match self.tcp_listener.accept() {
283///        Ok(stream, _addr) => stream,
284///        Err(e) => match e.kind() {
285///            std::io::ErrorKind::WouldBlock => {
286///                // Read timed out.  This is OK.
287///                self.check_for_signals_and_stuff();
288///                continue;
289///            },
290///            _ => {
291///                // Some other error occurred.
292///                eprintln!("TCP accept error {e}");
293///                break;
294///            }
295///        }
296///    }
297/// }
298/// ```
299pub fn tcp_listener(bind: &str, read_timeout: u64) -> EgResult<TcpListener> {
300    let socket = Socket::new(Domain::IPV4, Type::STREAM, None)
301        .map_err(|e| format!("Socket::new() failed with {e}"))?;
302
303    // When we stop/start the service, the address may briefly linger
304    // from open (idle) client connections.
305    socket
306        .set_reuse_address(true)
307        .map_err(|e| format!("Error setting reuse address: {e}"))?;
308
309    let address: SocketAddr = bind
310        .parse()
311        .map_err(|e| format!("Error parsing listen address: {bind}: {e}"))?;
312
313    socket
314        .bind(&address.into())
315        .map_err(|e| format!("Error binding to address: {bind}: {e}"))?;
316
317    socket
318        .listen(CONNECT_TCP_BACKLOG)
319        .map_err(|e| format!("Error listending on socket {bind}: {e}"))?;
320
321    // We need a read timeout so we can wake periodically to check
322    // for shutdown signals.
323    let polltime = Duration::from_secs(read_timeout);
324
325    socket
326        .set_read_timeout(Some(polltime))
327        .map_err(|e| format!("Error setting socket read_timeout: {e}"))?;
328
329    Ok(socket.into())
330}