Files
smartmta/rust/crates/mailer-smtp/src/client/pool.rs

516 lines
16 KiB
Rust

//! Connection pooling for the SMTP client.
//!
//! Manages reusable connections per destination `host:port`.
use super::config::SmtpClientConfig;
use super::connection::{connect_plain, connect_tls, ClientSmtpStream};
use super::error::SmtpClientError;
use super::protocol::{self, EhloCapabilities};
use dashmap::DashMap;
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
use tracing::{debug, info};
/// Maximum age of a pooled connection (5 minutes).
const MAX_CONNECTION_AGE_SECS: u64 = 300;
/// Maximum idle time before a connection is reaped (30 seconds).
const MAX_IDLE_SECS: u64 = 30;
/// Maximum messages per pooled connection before it's recycled.
const MAX_MESSAGES_PER_CONNECTION: u32 = 100;
/// A pooled SMTP connection.
pub struct PooledConnection {
pub stream: ClientSmtpStream,
pub capabilities: EhloCapabilities,
pub created_at: Instant,
pub last_used: Instant,
pub message_count: u32,
pub idle: bool,
}
/// Check if a pooled connection is stale (too old, too many messages, or idle too long).
fn is_connection_stale(conn: &PooledConnection) -> bool {
conn.created_at.elapsed().as_secs() > MAX_CONNECTION_AGE_SECS
|| conn.message_count >= MAX_MESSAGES_PER_CONNECTION
|| (conn.idle && conn.last_used.elapsed().as_secs() > MAX_IDLE_SECS)
}
/// Per-destination connection pool.
pub struct ConnectionPool {
connections: Vec<PooledConnection>,
max_connections: usize,
config: SmtpClientConfig,
}
impl ConnectionPool {
fn new(config: SmtpClientConfig) -> Self {
let max_connections = config.max_pool_connections;
Self {
connections: Vec::new(),
max_connections,
config,
}
}
/// Get an idle connection or create a new one.
async fn acquire(&mut self) -> Result<PooledConnection, SmtpClientError> {
// Remove stale connections first
self.cleanup_stale();
// Find an idle connection
if let Some(idx) = self
.connections
.iter()
.position(|c| c.idle && !is_connection_stale(c))
{
let mut conn = self.connections.remove(idx);
conn.idle = false;
conn.last_used = Instant::now();
debug!(
"Reusing pooled connection (age={}s, msgs={})",
conn.created_at.elapsed().as_secs(),
conn.message_count
);
return Ok(conn);
}
// Check if we can create a new connection
if self.connections.len() >= self.max_connections {
return Err(SmtpClientError::PoolExhausted {
message: format!(
"Pool for {} is at max capacity ({})",
self.config.effective_pool_key(),
self.max_connections
),
});
}
// Create a new connection
self.create_connection().await
}
/// Return a connection to the pool (or close it if it's expired).
fn release(&mut self, mut conn: PooledConnection) {
conn.message_count += 1;
conn.last_used = Instant::now();
conn.idle = true;
// Don't return if it's stale
if is_connection_stale(&conn) || self.connections.len() >= self.max_connections {
debug!("Discarding stale/excess pooled connection");
// Drop the connection (stream will be closed)
return;
}
self.connections.push(conn);
}
/// Create a fresh SMTP connection and complete the handshake.
async fn create_connection(&self) -> Result<PooledConnection, SmtpClientError> {
let mut stream = if self.config.secure {
connect_tls(
&self.config.host,
self.config.port,
self.config.connection_timeout_secs,
self.config.tls_opportunistic,
)
.await?
} else {
connect_plain(
&self.config.host,
self.config.port,
self.config.connection_timeout_secs,
)
.await?
};
// Read greeting
protocol::read_greeting(&mut stream, self.config.socket_timeout_secs).await?;
// Send EHLO
let mut capabilities =
protocol::send_ehlo(&mut stream, &self.config.domain, self.config.socket_timeout_secs)
.await?;
// STARTTLS if available and not already secure
if !self.config.secure && capabilities.starttls {
protocol::send_starttls(&mut stream, self.config.socket_timeout_secs).await?;
stream =
super::connection::upgrade_to_tls(stream, &self.config.host, self.config.tls_opportunistic).await?;
// Re-EHLO after STARTTLS — use updated capabilities for auth
capabilities = protocol::send_ehlo(
&mut stream,
&self.config.domain,
self.config.socket_timeout_secs,
)
.await?;
}
// Authenticate if credentials provided
if let Some(auth) = &self.config.auth {
protocol::authenticate(
&mut stream,
auth,
&capabilities,
self.config.socket_timeout_secs,
)
.await?;
}
info!(
"New SMTP connection to {} established",
self.config.effective_pool_key()
);
Ok(PooledConnection {
stream,
capabilities,
created_at: Instant::now(),
last_used: Instant::now(),
message_count: 0,
idle: false,
})
}
fn cleanup_stale(&mut self) {
self.connections.retain(|c| !is_connection_stale(c));
}
/// Number of connections in the pool.
fn total(&self) -> usize {
self.connections.len()
}
/// Number of idle connections.
fn idle_count(&self) -> usize {
self.connections.iter().filter(|c| c.idle).count()
}
/// Close all connections.
fn close_all(&mut self) {
self.connections.clear();
}
}
/// Status report for a single pool.
#[derive(Debug, Clone, Serialize)]
pub struct PoolStatus {
pub total: usize,
pub active: usize,
pub idle: usize,
}
/// Manages connection pools for multiple SMTP destinations.
pub struct SmtpClientManager {
pools: DashMap<String, Arc<Mutex<ConnectionPool>>>,
}
impl SmtpClientManager {
pub fn new() -> Self {
Self {
pools: DashMap::new(),
}
}
/// Get or create a pool for the given config.
fn get_pool(&self, config: &SmtpClientConfig) -> Arc<Mutex<ConnectionPool>> {
let key = config.effective_pool_key();
self.pools
.entry(key)
.or_insert_with(|| Arc::new(Mutex::new(ConnectionPool::new(config.clone()))))
.clone()
}
/// Acquire a connection from the pool, send a message, and release it.
pub async fn send_message(
&self,
config: &SmtpClientConfig,
sender: &str,
recipients: &[String],
message: &[u8],
) -> Result<SmtpSendResult, SmtpClientError> {
let pool_arc = self.get_pool(config);
let mut pool = pool_arc.lock().await;
let mut conn = pool.acquire().await?;
drop(pool); // Release the pool lock while we do network I/O
// Reset server state if reusing a connection that has already sent messages
if conn.message_count > 0 {
protocol::send_rset(&mut conn.stream, config.socket_timeout_secs).await?;
}
// Perform the SMTP transaction (use pipelining if server supports it)
let pipelining = conn.capabilities.pipelining;
let result =
Self::perform_send(&mut conn.stream, sender, recipients, message, config, pipelining).await;
// Re-acquire the pool lock and release the connection
let mut pool = pool_arc.lock().await;
match &result {
Ok(_) => pool.release(conn),
Err(_) => {
// Don't return failed connections to the pool
debug!("Discarding connection after send failure");
}
}
result
}
/// Perform the SMTP send transaction on a connected stream.
async fn perform_send(
stream: &mut ClientSmtpStream,
sender: &str,
recipients: &[String],
message: &[u8],
config: &SmtpClientConfig,
pipelining: bool,
) -> Result<SmtpSendResult, SmtpClientError> {
let timeout_secs = config.socket_timeout_secs;
let (accepted, rejected) = if pipelining {
// Use pipelined envelope: MAIL FROM + all RCPT TO in one batch
let (_mail_ok, acc, rej) = protocol::send_pipelined_envelope(
stream, sender, recipients, timeout_secs,
).await?;
(acc, rej)
} else {
// Sequential: MAIL FROM, then each RCPT TO
protocol::send_mail_from(stream, sender, timeout_secs).await?;
let mut accepted = Vec::new();
let mut rejected = Vec::new();
for rcpt in recipients {
match protocol::send_rcpt_to(stream, rcpt, timeout_secs).await {
Ok(resp) => {
if resp.is_success() {
accepted.push(rcpt.clone());
} else {
rejected.push(rcpt.clone());
}
}
Err(_) => {
rejected.push(rcpt.clone());
}
}
}
(accepted, rejected)
};
// If no recipients were accepted, fail
if accepted.is_empty() {
return Err(SmtpClientError::ProtocolError {
code: 550,
message: "All recipients were rejected".into(),
});
}
// DATA
let data_resp = protocol::send_data(stream, message, timeout_secs).await?;
// Extract message ID from the response if present
let message_id = data_resp
.lines
.iter()
.find_map(|line| {
// Look for a pattern like "queued as XXXX" or message-id
if line.contains("queued") || line.contains("id=") {
Some(line.clone())
} else {
None
}
});
Ok(SmtpSendResult {
accepted,
rejected,
message_id,
response: data_resp.full_message(),
envelope: SmtpEnvelope {
from: sender.to_string(),
to: recipients.to_vec(),
},
})
}
/// Verify connectivity to an SMTP server (connect, EHLO, QUIT).
pub async fn verify_connection(
&self,
config: &SmtpClientConfig,
) -> Result<SmtpVerifyResult, SmtpClientError> {
let mut stream = if config.secure {
connect_tls(
&config.host,
config.port,
config.connection_timeout_secs,
config.tls_opportunistic,
)
.await?
} else {
connect_plain(
&config.host,
config.port,
config.connection_timeout_secs,
)
.await?
};
let greeting = protocol::read_greeting(&mut stream, config.socket_timeout_secs).await?;
let caps =
protocol::send_ehlo(&mut stream, &config.domain, config.socket_timeout_secs).await?;
let _ = protocol::send_quit(&mut stream, config.socket_timeout_secs).await;
Ok(SmtpVerifyResult {
reachable: true,
greeting: Some(greeting.full_message()),
capabilities: Some(caps.extensions),
})
}
/// Get status of all pools.
pub fn pool_status(&self) -> std::collections::HashMap<String, PoolStatus> {
let mut result = std::collections::HashMap::new();
for entry in self.pools.iter() {
let key = entry.key().clone();
// Try to get the lock without blocking — if locked, report as active
match entry.value().try_lock() {
Ok(pool) => {
let total = pool.total();
let idle = pool.idle_count();
result.insert(
key,
PoolStatus {
total,
active: total - idle,
idle,
},
);
}
Err(_) => {
// Pool is in use; report as busy
result.insert(
key,
PoolStatus {
total: 0,
active: 1,
idle: 0,
},
);
}
}
}
result
}
/// Close a specific pool.
pub async fn close_pool(&self, key: &str) {
if let Some(pool_ref) = self.pools.get(key) {
let mut pool = pool_ref.lock().await;
pool.close_all();
}
self.pools.remove(key);
}
/// Close all pools.
pub async fn close_all_pools(&self) {
let keys: Vec<String> = self.pools.iter().map(|e| e.key().clone()).collect();
for key in keys {
self.close_pool(&key).await;
}
}
}
/// Result of sending an email via SMTP.
#[derive(Debug, Clone, Serialize)]
pub struct SmtpSendResult {
pub accepted: Vec<String>,
pub rejected: Vec<String>,
#[serde(rename = "messageId")]
pub message_id: Option<String>,
pub response: String,
pub envelope: SmtpEnvelope,
}
/// SMTP envelope (sender + recipients).
#[derive(Debug, Clone, Serialize)]
pub struct SmtpEnvelope {
pub from: String,
pub to: Vec<String>,
}
/// Result of verifying an SMTP connection.
#[derive(Debug, Clone, Serialize)]
pub struct SmtpVerifyResult {
pub reachable: bool,
pub greeting: Option<String>,
pub capabilities: Option<Vec<String>>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_status_serialization() {
let status = PoolStatus {
total: 5,
active: 2,
idle: 3,
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("\"total\":5"));
assert!(json.contains("\"active\":2"));
assert!(json.contains("\"idle\":3"));
}
#[test]
fn test_send_result_serialization() {
let result = SmtpSendResult {
accepted: vec!["a@b.com".into()],
rejected: vec![],
message_id: Some("abc123".into()),
response: "250 OK".into(),
envelope: SmtpEnvelope {
from: "from@test.com".into(),
to: vec!["a@b.com".into()],
},
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("\"messageId\":\"abc123\""));
assert!(json.contains("\"accepted\":[\"a@b.com\"]"));
}
#[test]
fn test_verify_result_serialization() {
let result = SmtpVerifyResult {
reachable: true,
greeting: Some("220 mail.example.com".into()),
capabilities: Some(vec!["SIZE 10485760".into(), "STARTTLS".into()]),
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("\"reachable\":true"));
}
#[test]
fn test_smtp_client_manager_new() {
let mgr = SmtpClientManager::new();
assert!(mgr.pool_status().is_empty());
}
#[tokio::test]
async fn test_close_all_empty() {
let mgr = SmtpClientManager::new();
mgr.close_all_pools().await;
assert!(mgr.pool_status().is_empty());
}
}