evergreen/util.rs
1use crate::EgResult;
2use crate::EgValue;
3use json::JsonValue;
4use rand::Rng;
5use socket2::{Domain, Socket, Type};
6use std::collections::HashSet;
7use std::fs;
8use std::net::{SocketAddr, TcpListener};
9use std::path::Path;
10use std::thread;
11use std::time::Duration;
12use std::time::Instant;
13
/// Placeholder text returned by `stringify_params()` in place of the real
/// parameter list when the method name matches a log-protected prefix.
14pub const REDACTED_PARAMS_STR: &str = "**PARAMS REDACTED**";
15
16// Backlog handed to `listen()` by `tcp_listener()`.
// 128 is the typical value for SOMAXCONN.
17const CONNECT_TCP_BACKLOG: i32 = 128;
18
/// Numeric ID of the calling thread.
///
/// The value is extracted from the `Debug` rendering of
/// [`std::thread::ThreadId`]; `0` is returned if that rendering cannot
/// be parsed as a number.
///
/// This is a stopgap until `ThreadId::as_u64()` is stabilized.
/// <https://doc.rust-lang.org/stable/std/thread/struct.ThreadId.html#method.as_u64>
/// <https://github.com/rust-lang/rust/pull/110738>
pub fn thread_id() -> u64 {
    // Debug output looks like "ThreadId(123)"; the number is the text
    // between the parentheses.
    let rendered = format!("{:?}", thread::current().id());

    rendered
        .split(['(', ')'])
        .nth(1)
        .and_then(|digits| digits.parse::<u64>().ok())
        .unwrap_or(0)
}
37
38/// Returns a string of random numbers of the requested length
39///
40/// Any `size` value that exceeds about 20 will consist wholly of
41/// zeroes along the first portion of the string.
42///
43/// ```
44/// use evergreen::util;
45/// let n = util::random_number(12);
46/// assert_eq!(n.len(), 12);
47/// ```
48pub fn random_number(size: u8) -> String {
49 let mut rng = rand::thread_rng();
50 let num: u64 = rng.gen_range(0..u64::MAX);
51 format!("{:0width$}", num, width = size as usize)[0..size as usize].to_string()
52}
53
54/// Converts a JSON number or string to an isize if possible
55///
56/// ```
57/// use evergreen::util;
58/// use json;
59/// let v = json::from(-123);
60/// assert_eq!(util::json_isize(&v), Some(-123));
61/// let v = json::from("hello");
62/// assert_eq!(util::json_isize(&v), None);
63/// ```
64pub fn json_isize(value: &JsonValue) -> Option<isize> {
65 if let Some(i) = value.as_isize() {
66 return Some(i);
67 } else if let Some(s) = value.as_str() {
68 if let Ok(i2) = s.parse::<isize>() {
69 return Some(i2);
70 }
71 };
72
73 None
74}
75
76/// Converts a JSON number or string to an usize if possible
77/// ```
78/// use evergreen::util;
79/// use json;
80/// let v = json::from(-123);
81/// assert_eq!(util::json_usize(&v), None);
82/// let v = json::from("hello");
83/// assert_eq!(util::json_usize(&v), None);
84/// let v = json::from(12321);
85/// assert_eq!(util::json_usize(&v), Some(12321));
86/// ```
87pub fn json_usize(value: &JsonValue) -> Option<usize> {
88 if let Some(i) = value.as_usize() {
89 return Some(i);
90 } else if let Some(s) = value.as_str() {
91 if let Ok(i2) = s.parse::<usize>() {
92 return Some(i2);
93 }
94 };
95
96 None
97}
98
/// Countdown timer with whole-second resolution.
///
/// ```
/// use evergreen::util;
///
/// let t = util::Timer::new(60);
/// assert!(!t.done());
/// assert!(t.remaining() > 0);
/// assert_eq!(t.duration(), 60);
///
/// let t = util::Timer::new(0);
/// assert!(t.done());
/// assert!(t.remaining() == 0);
/// assert_eq!(t.duration(), 0);
///
/// ```
pub struct Timer {
    /// Length of the countdown in seconds.  The timer is "done" once
    /// this many seconds have elapsed since `start_time`.
    duration: u64,

    /// Moment the countdown began (or was last reset).
    start_time: Instant,
}

impl Timer {
    /// Creates a timer for `duration` seconds that starts counting
    /// immediately.
    pub fn new(duration: u64) -> Timer {
        let start_time = Instant::now();
        Timer {
            duration,
            start_time,
        }
    }

    /// Restarts the countdown from now, keeping the same duration.
    pub fn reset(&mut self) {
        self.start_time = Instant::now();
    }

    /// Whole seconds left before the timer is done; 0 once the
    /// duration has fully elapsed.
    pub fn remaining(&self) -> u64 {
        let spent = self.start_time.elapsed().as_secs();
        self.duration.saturating_sub(spent)
    }

    /// Configured length of the timer in seconds.
    pub fn duration(&self) -> u64 {
        self.duration
    }

    /// True once no time remains.
    pub fn done(&self) -> bool {
        self.remaining() == 0
    }
}
149
150/// Creates a (JSON) String verion of a list of method parameters,
151/// replacing params with a generic REDACTED message for log-protected
152/// methods.
153///
154/// ```
155/// use evergreen::util;
156/// let method = "opensrf.system.private.stuff";
157/// let log_protect = vec!["opensrf.system.private".to_string()];
158/// let params = vec![];
159///
160/// let s = util::stringify_params(method, ¶ms, &log_protect);
161/// assert_eq!(s.as_str(), util::REDACTED_PARAMS_STR);
162/// ```
163pub fn stringify_params(method: &str, params: &[EgValue], log_protect: &[String]) -> String {
164 // Check if the method should be protected
165 let is_protected = log_protect.iter().any(|m| method.starts_with(m));
166
167 if is_protected {
168 REDACTED_PARAMS_STR.to_string()
169 } else {
170 params
171 .iter()
172 // EgValue.dump() consumes the value, hence the clone.
173 .map(|p| p.clone().dump())
174 .collect::<Vec<_>>()
175 .join(" ")
176 }
177}
178
/// Turns a PG array string (e.g. '{1,23,456}') into a list of unique ints.
///
/// Braces are discarded, whitespace around each element is ignored, and
/// anything that does not parse as an `i64` (e.g. `NULL`) is skipped.
/// Values are returned in the order they first appear in the input.
///
/// ```
/// let res = evergreen::util::pg_unpack_int_array("{1,23,NULL,23,456}");
/// assert_eq!(res, vec![1,23,456]);
/// ```
///
pub fn pg_unpack_int_array(array: &str) -> Vec<i64> {
    let cleaned = array.replace(['{', '}'], "");

    // Tracks values already emitted so duplicates are dropped while the
    // first-seen order is preserved.
    let mut seen: HashSet<i64> = HashSet::new();

    cleaned
        .split(',')
        // We only care about int-ish things.
        .filter_map(|s| s.trim().parse::<i64>().ok())
        .filter(|n| seen.insert(*n))
        .collect()
}
198
/// A `limit` / `offset` pair.
///
/// NOTE(review): presumably used for paging through result sets --
/// confirm against callers.
#[derive(Debug, Clone, PartialEq)]
pub struct Pager {
    limit: usize,
    offset: usize,
}

impl Pager {
    /// Creates a pager with the given limit and offset.
    pub fn new(limit: usize, offset: usize) -> Self {
        Self { limit, offset }
    }

    /// Current limit value.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// Current offset value.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Sets both the limit and the offset back to zero.
    pub fn reset(&mut self) {
        *self = Self::new(0, 0);
    }
}
220
/// Subtract value b from value a while compensating for common floating
/// point math problems.
///
/// Both operands are scaled up by 100 before the subtraction and the
/// result is scaled back down afterwards.
pub fn fpdiff(a: f64, b: f64) -> f64 {
    const SCALE: f64 = 100.00;

    let scaled_a = a * SCALE;
    let scaled_b = b * SCALE;

    (scaled_a - scaled_b) / SCALE
}
226
/// Add value b to value a while compensating for common floating point
/// math problems.
///
/// Both operands are scaled up by 100 before the addition and the
/// result is scaled back down afterwards.
pub fn fpsum(a: f64, b: f64) -> f64 {
    const SCALE: f64 = 100.00;

    let scaled_a = a * SCALE;
    let scaled_b = b * SCALE;

    (scaled_a + scaled_b) / SCALE
}
232
233/// "check", "create", "delete" a lockfile
234pub fn lockfile(path: &str, action: &str) -> EgResult<bool> {
235 match action {
236 "check" => match Path::new(path).try_exists() {
237 Ok(b) => Ok(b),
238 Err(e) => Err(e.to_string().into()),
239 },
240 "create" => {
241 // create() truncates. create_new() is still experimental.
242 // So check manually first.
243
244 if lockfile(path, "check")? {
245 return Err(format!("Lockfile already exists: {path}").into());
246 }
247
248 match fs::File::create(path) {
249 Ok(_) => Ok(true),
250 Err(e) => Err(e.to_string().into()),
251 }
252 }
253 "delete" => match fs::remove_file(path) {
254 Ok(_) => Ok(true),
255 Err(e) => Err(e.to_string().into()),
256 },
257 _ => Err(format!("Invalid lockfile action: {action}").into()),
258 }
259}
260
261/// Bind to the provided host:port while applying a read timeout to the
262/// TcpListener.
263///
264/// Applying a timeout to the TcpListener allows TCP servers to
265/// periodically stop listening for new connections and perform
266/// housekeeping duties (check for signals, etc.)
267///
268/// If you don't need a read timeout, the standard TcpListener::bind()
269/// approach should suffice.
270///
271/// * `address` - Bind and listen at this address
272/// * `port` - Bind and listen at this port.
273/// * `read_timeout` - Read timeout in seconds applied to the listening socket.
274///
275/// Example:
276///
277///
278/// ```text
279/// loop {
280/// let mut tcp_listener = eg::util::tcp_listener("127.0.0.1:9898", 5)?;
281///
282/// let client_stream = match self.tcp_listener.accept() {
283/// Ok(stream, _addr) => stream,
284/// Err(e) => match e.kind() {
285/// std::io::ErrorKind::WouldBlock => {
286/// // Read timed out. This is OK.
287/// self.check_for_signals_and_stuff();
288/// continue;
289/// },
290/// _ => {
291/// // Some other error occurred.
292/// eprintln!("TCP accept error {e}");
293/// break;
294/// }
295/// }
296/// }
297/// }
298/// ```
299pub fn tcp_listener(bind: &str, read_timeout: u64) -> EgResult<TcpListener> {
300 let socket = Socket::new(Domain::IPV4, Type::STREAM, None)
301 .map_err(|e| format!("Socket::new() failed with {e}"))?;
302
303 // When we stop/start the service, the address may briefly linger
304 // from open (idle) client connections.
305 socket
306 .set_reuse_address(true)
307 .map_err(|e| format!("Error setting reuse address: {e}"))?;
308
309 let address: SocketAddr = bind
310 .parse()
311 .map_err(|e| format!("Error parsing listen address: {bind}: {e}"))?;
312
313 socket
314 .bind(&address.into())
315 .map_err(|e| format!("Error binding to address: {bind}: {e}"))?;
316
317 socket
318 .listen(CONNECT_TCP_BACKLOG)
319 .map_err(|e| format!("Error listending on socket {bind}: {e}"))?;
320
321 // We need a read timeout so we can wake periodically to check
322 // for shutdown signals.
323 let polltime = Duration::from_secs(read_timeout);
324
325 socket
326 .set_read_timeout(Some(polltime))
327 .map_err(|e| format!("Error setting socket read_timeout: {e}"))?;
328
329 Ok(socket.into())
330}