BREAKING CHANGE(smtp-client): Replace the legacy TypeScript SMTP client with a new Rust-based SMTP client and IPC bridge for outbound delivery
This commit is contained in:
503
rust/crates/mailer-smtp/src/client/pool.rs
Normal file
503
rust/crates/mailer-smtp/src/client/pool.rs
Normal file
@@ -0,0 +1,503 @@
|
||||
//! Connection pooling for the SMTP client.
|
||||
//!
|
||||
//! Manages reusable connections per destination `host:port`.
|
||||
|
||||
use super::config::SmtpClientConfig;
|
||||
use super::connection::{connect_plain, connect_tls, ClientSmtpStream};
|
||||
use super::error::SmtpClientError;
|
||||
use super::protocol::{self, EhloCapabilities};
|
||||
use dashmap::DashMap;
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, info};
|
||||
|
||||
/// Maximum age of a pooled connection (5 minutes).
|
||||
const MAX_CONNECTION_AGE_SECS: u64 = 300;
|
||||
|
||||
/// Maximum idle time before a connection is reaped (30 seconds).
|
||||
const MAX_IDLE_SECS: u64 = 30;
|
||||
|
||||
/// Maximum messages per pooled connection before it's recycled.
|
||||
const MAX_MESSAGES_PER_CONNECTION: u32 = 100;
|
||||
|
||||
/// A pooled SMTP connection.
|
||||
pub struct PooledConnection {
|
||||
pub stream: ClientSmtpStream,
|
||||
pub capabilities: EhloCapabilities,
|
||||
pub created_at: Instant,
|
||||
pub last_used: Instant,
|
||||
pub message_count: u32,
|
||||
pub idle: bool,
|
||||
}
|
||||
|
||||
/// Check if a pooled connection is stale (too old, too many messages, or idle too long).
|
||||
fn is_connection_stale(conn: &PooledConnection) -> bool {
|
||||
conn.created_at.elapsed().as_secs() > MAX_CONNECTION_AGE_SECS
|
||||
|| conn.message_count >= MAX_MESSAGES_PER_CONNECTION
|
||||
|| (conn.idle && conn.last_used.elapsed().as_secs() > MAX_IDLE_SECS)
|
||||
}
|
||||
|
||||
/// Per-destination connection pool.
|
||||
pub struct ConnectionPool {
|
||||
connections: Vec<PooledConnection>,
|
||||
max_connections: usize,
|
||||
config: SmtpClientConfig,
|
||||
}
|
||||
|
||||
impl ConnectionPool {
|
||||
fn new(config: SmtpClientConfig) -> Self {
|
||||
let max_connections = config.max_pool_connections;
|
||||
Self {
|
||||
connections: Vec::new(),
|
||||
max_connections,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an idle connection or create a new one.
|
||||
async fn acquire(&mut self) -> Result<PooledConnection, SmtpClientError> {
|
||||
// Remove stale connections first
|
||||
self.cleanup_stale();
|
||||
|
||||
// Find an idle connection
|
||||
if let Some(idx) = self
|
||||
.connections
|
||||
.iter()
|
||||
.position(|c| c.idle && !is_connection_stale(c))
|
||||
{
|
||||
let mut conn = self.connections.remove(idx);
|
||||
conn.idle = false;
|
||||
conn.last_used = Instant::now();
|
||||
debug!(
|
||||
"Reusing pooled connection (age={}s, msgs={})",
|
||||
conn.created_at.elapsed().as_secs(),
|
||||
conn.message_count
|
||||
);
|
||||
return Ok(conn);
|
||||
}
|
||||
|
||||
// Check if we can create a new connection
|
||||
if self.connections.len() >= self.max_connections {
|
||||
return Err(SmtpClientError::PoolExhausted {
|
||||
message: format!(
|
||||
"Pool for {} is at max capacity ({})",
|
||||
self.config.effective_pool_key(),
|
||||
self.max_connections
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Create a new connection
|
||||
self.create_connection().await
|
||||
}
|
||||
|
||||
/// Return a connection to the pool (or close it if it's expired).
|
||||
fn release(&mut self, mut conn: PooledConnection) {
|
||||
conn.message_count += 1;
|
||||
conn.last_used = Instant::now();
|
||||
conn.idle = true;
|
||||
|
||||
// Don't return if it's stale
|
||||
if is_connection_stale(&conn) || self.connections.len() >= self.max_connections {
|
||||
debug!("Discarding stale/excess pooled connection");
|
||||
// Drop the connection (stream will be closed)
|
||||
return;
|
||||
}
|
||||
|
||||
self.connections.push(conn);
|
||||
}
|
||||
|
||||
/// Create a fresh SMTP connection and complete the handshake.
|
||||
async fn create_connection(&self) -> Result<PooledConnection, SmtpClientError> {
|
||||
let mut stream = if self.config.secure {
|
||||
connect_tls(
|
||||
&self.config.host,
|
||||
self.config.port,
|
||||
self.config.connection_timeout_secs,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
connect_plain(
|
||||
&self.config.host,
|
||||
self.config.port,
|
||||
self.config.connection_timeout_secs,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
// Read greeting
|
||||
protocol::read_greeting(&mut stream, self.config.socket_timeout_secs).await?;
|
||||
|
||||
// Send EHLO
|
||||
let mut capabilities =
|
||||
protocol::send_ehlo(&mut stream, &self.config.domain, self.config.socket_timeout_secs)
|
||||
.await?;
|
||||
|
||||
// STARTTLS if available and not already secure
|
||||
if !self.config.secure && capabilities.starttls {
|
||||
protocol::send_starttls(&mut stream, self.config.socket_timeout_secs).await?;
|
||||
stream =
|
||||
super::connection::upgrade_to_tls(stream, &self.config.host).await?;
|
||||
|
||||
// Re-EHLO after STARTTLS — use updated capabilities for auth
|
||||
capabilities = protocol::send_ehlo(
|
||||
&mut stream,
|
||||
&self.config.domain,
|
||||
self.config.socket_timeout_secs,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Authenticate if credentials provided
|
||||
if let Some(auth) = &self.config.auth {
|
||||
protocol::authenticate(
|
||||
&mut stream,
|
||||
auth,
|
||||
&capabilities,
|
||||
self.config.socket_timeout_secs,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"New SMTP connection to {} established",
|
||||
self.config.effective_pool_key()
|
||||
);
|
||||
|
||||
Ok(PooledConnection {
|
||||
stream,
|
||||
capabilities,
|
||||
created_at: Instant::now(),
|
||||
last_used: Instant::now(),
|
||||
message_count: 0,
|
||||
idle: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn cleanup_stale(&mut self) {
|
||||
self.connections.retain(|c| !is_connection_stale(c));
|
||||
}
|
||||
|
||||
/// Number of connections in the pool.
|
||||
fn total(&self) -> usize {
|
||||
self.connections.len()
|
||||
}
|
||||
|
||||
/// Number of idle connections.
|
||||
fn idle_count(&self) -> usize {
|
||||
self.connections.iter().filter(|c| c.idle).count()
|
||||
}
|
||||
|
||||
/// Close all connections.
|
||||
fn close_all(&mut self) {
|
||||
self.connections.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Status report for a single pool.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct PoolStatus {
|
||||
pub total: usize,
|
||||
pub active: usize,
|
||||
pub idle: usize,
|
||||
}
|
||||
|
||||
/// Manages connection pools for multiple SMTP destinations.
|
||||
pub struct SmtpClientManager {
|
||||
pools: DashMap<String, Arc<Mutex<ConnectionPool>>>,
|
||||
}
|
||||
|
||||
impl SmtpClientManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
pools: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a pool for the given config.
|
||||
fn get_pool(&self, config: &SmtpClientConfig) -> Arc<Mutex<ConnectionPool>> {
|
||||
let key = config.effective_pool_key();
|
||||
self.pools
|
||||
.entry(key)
|
||||
.or_insert_with(|| Arc::new(Mutex::new(ConnectionPool::new(config.clone()))))
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Acquire a connection from the pool, send a message, and release it.
|
||||
pub async fn send_message(
|
||||
&self,
|
||||
config: &SmtpClientConfig,
|
||||
sender: &str,
|
||||
recipients: &[String],
|
||||
message: &[u8],
|
||||
) -> Result<SmtpSendResult, SmtpClientError> {
|
||||
let pool_arc = self.get_pool(config);
|
||||
let mut pool = pool_arc.lock().await;
|
||||
|
||||
let mut conn = pool.acquire().await?;
|
||||
drop(pool); // Release the pool lock while we do network I/O
|
||||
|
||||
// Reset server state if reusing a connection that has already sent messages
|
||||
if conn.message_count > 0 {
|
||||
protocol::send_rset(&mut conn.stream, config.socket_timeout_secs).await?;
|
||||
}
|
||||
|
||||
// Perform the SMTP transaction
|
||||
let result =
|
||||
Self::perform_send(&mut conn.stream, sender, recipients, message, config).await;
|
||||
|
||||
// Re-acquire the pool lock and release the connection
|
||||
let mut pool = pool_arc.lock().await;
|
||||
match &result {
|
||||
Ok(_) => pool.release(conn),
|
||||
Err(_) => {
|
||||
// Don't return failed connections to the pool
|
||||
debug!("Discarding connection after send failure");
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Perform the SMTP send transaction on a connected stream.
|
||||
async fn perform_send(
|
||||
stream: &mut ClientSmtpStream,
|
||||
sender: &str,
|
||||
recipients: &[String],
|
||||
message: &[u8],
|
||||
config: &SmtpClientConfig,
|
||||
) -> Result<SmtpSendResult, SmtpClientError> {
|
||||
let timeout_secs = config.socket_timeout_secs;
|
||||
|
||||
// MAIL FROM
|
||||
protocol::send_mail_from(stream, sender, timeout_secs).await?;
|
||||
|
||||
// RCPT TO for each recipient
|
||||
let mut accepted = Vec::new();
|
||||
let mut rejected = Vec::new();
|
||||
|
||||
for rcpt in recipients {
|
||||
match protocol::send_rcpt_to(stream, rcpt, timeout_secs).await {
|
||||
Ok(resp) => {
|
||||
if resp.is_success() {
|
||||
accepted.push(rcpt.clone());
|
||||
} else {
|
||||
rejected.push(rcpt.clone());
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
rejected.push(rcpt.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no recipients were accepted, fail
|
||||
if accepted.is_empty() {
|
||||
return Err(SmtpClientError::ProtocolError {
|
||||
code: 550,
|
||||
message: "All recipients were rejected".into(),
|
||||
});
|
||||
}
|
||||
|
||||
// DATA
|
||||
let data_resp = protocol::send_data(stream, message, timeout_secs).await?;
|
||||
|
||||
// Extract message ID from the response if present
|
||||
let message_id = data_resp
|
||||
.lines
|
||||
.iter()
|
||||
.find_map(|line| {
|
||||
// Look for a pattern like "queued as XXXX" or message-id
|
||||
if line.contains("queued") || line.contains("id=") {
|
||||
Some(line.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
Ok(SmtpSendResult {
|
||||
accepted,
|
||||
rejected,
|
||||
message_id,
|
||||
response: data_resp.full_message(),
|
||||
envelope: SmtpEnvelope {
|
||||
from: sender.to_string(),
|
||||
to: recipients.to_vec(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Verify connectivity to an SMTP server (connect, EHLO, QUIT).
|
||||
pub async fn verify_connection(
|
||||
&self,
|
||||
config: &SmtpClientConfig,
|
||||
) -> Result<SmtpVerifyResult, SmtpClientError> {
|
||||
let mut stream = if config.secure {
|
||||
connect_tls(
|
||||
&config.host,
|
||||
config.port,
|
||||
config.connection_timeout_secs,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
connect_plain(
|
||||
&config.host,
|
||||
config.port,
|
||||
config.connection_timeout_secs,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
let greeting = protocol::read_greeting(&mut stream, config.socket_timeout_secs).await?;
|
||||
let caps =
|
||||
protocol::send_ehlo(&mut stream, &config.domain, config.socket_timeout_secs).await?;
|
||||
let _ = protocol::send_quit(&mut stream, config.socket_timeout_secs).await;
|
||||
|
||||
Ok(SmtpVerifyResult {
|
||||
reachable: true,
|
||||
greeting: Some(greeting.full_message()),
|
||||
capabilities: Some(caps.extensions),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get status of all pools.
|
||||
pub fn pool_status(&self) -> std::collections::HashMap<String, PoolStatus> {
|
||||
let mut result = std::collections::HashMap::new();
|
||||
|
||||
for entry in self.pools.iter() {
|
||||
let key = entry.key().clone();
|
||||
// Try to get the lock without blocking — if locked, report as active
|
||||
match entry.value().try_lock() {
|
||||
Ok(pool) => {
|
||||
let total = pool.total();
|
||||
let idle = pool.idle_count();
|
||||
result.insert(
|
||||
key,
|
||||
PoolStatus {
|
||||
total,
|
||||
active: total - idle,
|
||||
idle,
|
||||
},
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
// Pool is in use; report as busy
|
||||
result.insert(
|
||||
key,
|
||||
PoolStatus {
|
||||
total: 0,
|
||||
active: 1,
|
||||
idle: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Close a specific pool.
|
||||
pub async fn close_pool(&self, key: &str) {
|
||||
if let Some(pool_ref) = self.pools.get(key) {
|
||||
let mut pool = pool_ref.lock().await;
|
||||
pool.close_all();
|
||||
}
|
||||
self.pools.remove(key);
|
||||
}
|
||||
|
||||
/// Close all pools.
|
||||
pub async fn close_all_pools(&self) {
|
||||
let keys: Vec<String> = self.pools.iter().map(|e| e.key().clone()).collect();
|
||||
for key in keys {
|
||||
self.close_pool(&key).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of sending an email via SMTP.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SmtpSendResult {
|
||||
pub accepted: Vec<String>,
|
||||
pub rejected: Vec<String>,
|
||||
#[serde(rename = "messageId")]
|
||||
pub message_id: Option<String>,
|
||||
pub response: String,
|
||||
pub envelope: SmtpEnvelope,
|
||||
}
|
||||
|
||||
/// SMTP envelope (sender + recipients).
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SmtpEnvelope {
|
||||
pub from: String,
|
||||
pub to: Vec<String>,
|
||||
}
|
||||
|
||||
/// Result of verifying an SMTP connection.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SmtpVerifyResult {
|
||||
pub reachable: bool,
|
||||
pub greeting: Option<String>,
|
||||
pub capabilities: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_pool_status_serialization() {
|
||||
let status = PoolStatus {
|
||||
total: 5,
|
||||
active: 2,
|
||||
idle: 3,
|
||||
};
|
||||
let json = serde_json::to_string(&status).unwrap();
|
||||
assert!(json.contains("\"total\":5"));
|
||||
assert!(json.contains("\"active\":2"));
|
||||
assert!(json.contains("\"idle\":3"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_result_serialization() {
|
||||
let result = SmtpSendResult {
|
||||
accepted: vec!["a@b.com".into()],
|
||||
rejected: vec![],
|
||||
message_id: Some("abc123".into()),
|
||||
response: "250 OK".into(),
|
||||
envelope: SmtpEnvelope {
|
||||
from: "from@test.com".into(),
|
||||
to: vec!["a@b.com".into()],
|
||||
},
|
||||
};
|
||||
let json = serde_json::to_string(&result).unwrap();
|
||||
assert!(json.contains("\"messageId\":\"abc123\""));
|
||||
assert!(json.contains("\"accepted\":[\"a@b.com\"]"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_verify_result_serialization() {
|
||||
let result = SmtpVerifyResult {
|
||||
reachable: true,
|
||||
greeting: Some("220 mail.example.com".into()),
|
||||
capabilities: Some(vec!["SIZE 10485760".into(), "STARTTLS".into()]),
|
||||
};
|
||||
let json = serde_json::to_string(&result).unwrap();
|
||||
assert!(json.contains("\"reachable\":true"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_smtp_client_manager_new() {
|
||||
let mgr = SmtpClientManager::new();
|
||||
assert!(mgr.pool_status().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_close_all_empty() {
|
||||
let mgr = SmtpClientManager::new();
|
||||
mgr.close_all_pools().await;
|
||||
assert!(mgr.pool_status().is_empty());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user