feat(mailer-smtp): add in-process security pipeline for SMTP delivery (DKIM/SPF/DMARC, content scanning, IP reputation)

This commit is contained in:
2026-02-10 22:26:20 +00:00
parent 595634fb0f
commit eb2643de93
151 changed files with 477 additions and 47531 deletions

View File

@@ -5,14 +5,14 @@
//! script injection, and sensitive data patterns.
use regex::Regex;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::sync::LazyLock;
// ---------------------------------------------------------------------------
// Result types
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContentScanResult {
pub threat_score: u32,

View File

@@ -22,3 +22,4 @@ base64.workspace = true
rustls-pki-types.workspace = true
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls-pemfile = "2"
mailparse.workspace = true

View File

@@ -14,7 +14,10 @@ use crate::validation;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use hickory_resolver::TokioResolver;
use mailer_security::MessageAuthenticator;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
@@ -152,6 +155,8 @@ pub async fn handle_connection(
tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
remote_addr: String,
is_secure: bool,
authenticator: Arc<MessageAuthenticator>,
resolver: Arc<TokioResolver>,
) {
let mut session = SmtpSession::new(remote_addr.clone(), is_secure);
@@ -217,6 +222,8 @@ pub async fn handle_connection(
&event_tx,
callback_register.as_ref(),
&tls_acceptor,
&authenticator,
&resolver,
)
.await;
@@ -327,6 +334,8 @@ async fn process_line(
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
tls_acceptor: &Option<Arc<tokio_rustls::TlsAcceptor>>,
authenticator: &Arc<MessageAuthenticator>,
resolver: &Arc<TokioResolver>,
) -> LineResult {
// Handle AUTH intermediate states (waiting for username/password)
match &session.auth_state {
@@ -375,7 +384,7 @@ async fn process_line(
}
SmtpCommand::Data => {
handle_data(session, stream, config, event_tx, callback_registry).await
handle_data(session, stream, config, event_tx, callback_registry, authenticator, resolver).await
}
SmtpCommand::Rset => {
@@ -558,6 +567,8 @@ async fn handle_data(
config: &SmtpServerConfig,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
authenticator: &Arc<MessageAuthenticator>,
resolver: &Arc<TokioResolver>,
) -> LineResult {
if !session.state.can_data() {
return LineResult::Response(SmtpResponse::bad_sequence(
@@ -622,6 +633,18 @@ async fn handle_data(
let raw_message = accumulator.into_message().unwrap_or_default();
let correlation_id = uuid::Uuid::new_v4().to_string();
// --- In-process security pipeline (30s timeout) ---
let security_results = run_security_pipeline(
&raw_message,
&session.remote_addr,
session.client_hostname.as_deref().unwrap_or("unknown"),
&config.hostname,
&session.envelope.mail_from,
authenticator,
resolver,
)
.await;
// Determine transport: inline base64 or temp file
let email_data = if raw_message.len() <= 256 * 1024 {
EmailData::Inline {
@@ -656,7 +679,7 @@ async fn handle_data(
client_hostname: session.client_hostname.clone(),
secure: session.secure,
authenticated_user: session.authenticated_user().map(|s| s.to_string()),
security_results: None, // Will be populated by server.rs when in-process security is added
security_results
};
if event_tx.send(event).await.is_err() {
@@ -991,6 +1014,166 @@ async fn validate_credentials(
}
}
/// Extract MIME parts from a raw email message for content scanning.
///
/// Returns `(subject, text_body, html_body, attachment_filenames)`.
fn extract_mime_parts(raw_message: &[u8]) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
let parsed = match mailparse::parse_mail(raw_message) {
Ok(p) => p,
Err(e) => {
debug!(error = %e, "Failed to parse MIME for content scanning");
return (None, None, None, Vec::new());
}
};
// Extract Subject header
let subject = parsed
.headers
.iter()
.find(|h| h.get_key().eq_ignore_ascii_case("subject"))
.map(|h| h.get_value());
let mut text_body: Option<String> = None;
let mut html_body: Option<String> = None;
let mut attachments: Vec<String> = Vec::new();
// Walk the MIME tree
fn walk_parts(
part: &mailparse::ParsedMail<'_>,
text_body: &mut Option<String>,
html_body: &mut Option<String>,
attachments: &mut Vec<String>,
) {
let content_type = part.ctype.mimetype.to_lowercase();
let disposition = part.get_content_disposition();
// Check if this is an attachment
if disposition.disposition == mailparse::DispositionType::Attachment {
if let Some(filename) = disposition.params.get("filename") {
attachments.push(filename.clone());
}
} else if content_type == "text/plain" && text_body.is_none() {
if let Ok(body) = part.get_body() {
*text_body = Some(body);
}
} else if content_type == "text/html" && html_body.is_none() {
if let Ok(body) = part.get_body() {
*html_body = Some(body);
}
}
// Recurse into subparts
for sub in &part.subparts {
walk_parts(sub, text_body, html_body, attachments);
}
}
walk_parts(&parsed, &mut text_body, &mut html_body, &mut attachments);
(subject, text_body, html_body, attachments)
}
/// Run the full security pipeline: DKIM/SPF/DMARC + content scan + IP reputation.
///
/// Returns `Some(json_value)` on success or `None` if the pipeline fails or times out.
async fn run_security_pipeline(
raw_message: &[u8],
remote_addr: &str,
helo_domain: &str,
hostname: &str,
mail_from: &str,
authenticator: &Arc<MessageAuthenticator>,
resolver: &Arc<TokioResolver>,
) -> Option<serde_json::Value> {
let security_timeout = Duration::from_secs(30);
match timeout(security_timeout, run_security_pipeline_inner(
raw_message, remote_addr, helo_domain, hostname, mail_from, authenticator, resolver,
)).await {
Ok(Ok(value)) => {
debug!("In-process security pipeline completed");
Some(value)
}
Ok(Err(e)) => {
warn!(error = %e, "Security pipeline error — emitting event without results");
None
}
Err(_) => {
warn!("Security pipeline timed out (30s) — emitting event without results");
None
}
}
}
/// Inner implementation of the security pipeline (no timeout wrapper).
async fn run_security_pipeline_inner(
raw_message: &[u8],
remote_addr: &str,
helo_domain: &str,
hostname: &str,
mail_from: &str,
authenticator: &Arc<MessageAuthenticator>,
resolver: &Arc<TokioResolver>,
) -> std::result::Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
// Parse the remote IP address
let ip: IpAddr = remote_addr.parse().unwrap_or(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
// Run DKIM/SPF/DMARC and IP reputation concurrently
let (email_security, reputation) = tokio::join!(
mailer_security::verify_email_security(
raw_message, ip, helo_domain, hostname, mail_from, authenticator,
),
mailer_security::check_reputation(
ip, mailer_security::DEFAULT_DNSBL_SERVERS, resolver,
),
);
// Extract MIME parts for content scanning (synchronous)
let (subject, text_body, html_body, attachment_names) = extract_mime_parts(raw_message);
// Run content scan (synchronous)
let content_scan = mailer_security::content_scanner::scan_content(
subject.as_deref(),
text_body.as_deref(),
html_body.as_deref(),
&attachment_names,
);
// Build the combined results JSON
let mut results = serde_json::Map::new();
// DKIM/SPF/DMARC
match email_security {
Ok(sec) => {
results.insert("dkim".into(), serde_json::to_value(&sec.dkim)?);
results.insert("spf".into(), serde_json::to_value(&sec.spf)?);
results.insert("dmarc".into(), serde_json::to_value(&sec.dmarc)?);
}
Err(e) => {
warn!(error = %e, "Email security verification failed");
results.insert("dkim".into(), serde_json::Value::Array(vec![]));
results.insert("spf".into(), serde_json::Value::Null);
results.insert("dmarc".into(), serde_json::Value::Null);
}
}
// Content scan
results.insert("contentScan".into(), serde_json::to_value(&content_scan)?);
// IP reputation
match reputation {
Ok(rep) => {
results.insert("ipReputation".into(), serde_json::to_value(&rep)?);
}
Err(e) => {
warn!(error = %e, "IP reputation check failed");
results.insert("ipReputation".into(), serde_json::Value::Null);
}
}
Ok(serde_json::Value::Object(results))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1020,4 +1203,106 @@ mod tests {
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("accepted"));
}
#[test]
fn test_extract_mime_parts_simple() {
let raw = b"From: sender@example.com\r\n\
To: rcpt@example.com\r\n\
Subject: Test Subject\r\n\
Content-Type: text/plain\r\n\
\r\n\
Hello, this is a test body.\r\n";
let (subject, text, html, attachments) = extract_mime_parts(raw);
assert_eq!(subject.as_deref(), Some("Test Subject"));
assert!(text.is_some());
assert!(text.unwrap().contains("Hello, this is a test body."));
assert!(html.is_none());
assert!(attachments.is_empty());
}
#[test]
fn test_extract_mime_parts_multipart() {
let raw = b"From: sender@example.com\r\n\
To: rcpt@example.com\r\n\
Subject: Multipart Test\r\n\
Content-Type: multipart/mixed; boundary=\"boundary123\"\r\n\
\r\n\
--boundary123\r\n\
Content-Type: text/plain\r\n\
\r\n\
Plain text body\r\n\
--boundary123\r\n\
Content-Type: text/html\r\n\
\r\n\
<html><body>HTML body</body></html>\r\n\
--boundary123\r\n\
Content-Type: application/octet-stream\r\n\
Content-Disposition: attachment; filename=\"report.pdf\"\r\n\
\r\n\
binary data here\r\n\
--boundary123--\r\n";
let (subject, text, html, attachments) = extract_mime_parts(raw);
assert_eq!(subject.as_deref(), Some("Multipart Test"));
assert!(text.is_some());
assert!(text.unwrap().contains("Plain text body"));
assert!(html.is_some());
assert!(html.unwrap().contains("HTML body"));
assert_eq!(attachments.len(), 1);
assert_eq!(attachments[0], "report.pdf");
}
#[test]
fn test_extract_mime_parts_no_subject() {
let raw = b"From: sender@example.com\r\n\
To: rcpt@example.com\r\n\
Content-Type: text/plain\r\n\
\r\n\
Body without subject\r\n";
let (subject, text, _html, _attachments) = extract_mime_parts(raw);
assert!(subject.is_none());
assert!(text.is_some());
}
#[test]
fn test_extract_mime_parts_invalid() {
let raw = b"this is not a valid email";
let (subject, text, html, attachments) = extract_mime_parts(raw);
// Should not panic, may or may not parse partially
// The key property is that it doesn't crash
let _ = (subject, text, html, attachments);
}
#[test]
fn test_extract_mime_parts_multiple_attachments() {
let raw = b"From: sender@example.com\r\n\
To: rcpt@example.com\r\n\
Subject: Attachments\r\n\
Content-Type: multipart/mixed; boundary=\"bound\"\r\n\
\r\n\
--bound\r\n\
Content-Type: text/plain\r\n\
\r\n\
See attached\r\n\
--bound\r\n\
Content-Type: application/pdf\r\n\
Content-Disposition: attachment; filename=\"doc1.pdf\"\r\n\
\r\n\
pdf data\r\n\
--bound\r\n\
Content-Type: application/vnd.ms-excel\r\n\
Content-Disposition: attachment; filename=\"data.xlsx\"\r\n\
\r\n\
excel data\r\n\
--bound--\r\n";
let (subject, text, _html, attachments) = extract_mime_parts(raw);
assert_eq!(subject.as_deref(), Some("Attachments"));
assert!(text.is_some());
assert_eq!(attachments.len(), 2);
assert!(attachments.contains(&"doc1.pdf".to_string()));
assert!(attachments.contains(&"data.xlsx".to_string()));
}
}

View File

@@ -9,6 +9,8 @@ use crate::connection::{
};
use crate::rate_limiter::{RateLimitConfig, RateLimiter};
use hickory_resolver::TokioResolver;
use mailer_security::MessageAuthenticator;
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use std::io::BufReader;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
@@ -63,6 +65,17 @@ pub async fn start_server(
let (event_tx, event_rx) = mpsc::channel::<ConnectionEvent>(1024);
// Create shared security resources for in-process email verification
let authenticator: Arc<MessageAuthenticator> = Arc::new(
mailer_security::default_authenticator()
.map_err(|e| format!("Failed to create MessageAuthenticator: {e}"))?
);
let resolver: Arc<TokioResolver> = Arc::new(
TokioResolver::builder_tokio()
.map(|b| b.build())
.map_err(|e| format!("Failed to create TokioResolver: {e}"))?
);
// Build TLS acceptor if configured
let tls_acceptor = if config.has_tls() {
Some(Arc::new(build_tls_acceptor(&config)?))
@@ -87,6 +100,8 @@ pub async fn start_server(
callback_registry.clone(),
tls_acceptor.clone(),
false, // not implicit TLS
authenticator.clone(),
resolver.clone(),
));
handles.push(handle);
}
@@ -108,6 +123,8 @@ pub async fn start_server(
callback_registry.clone(),
tls_acceptor.clone(),
true, // implicit TLS
authenticator.clone(),
resolver.clone(),
));
handles.push(handle);
} else {
@@ -153,6 +170,8 @@ async fn accept_loop(
callback_registry: Arc<dyn CallbackRegistry + Send + Sync>,
tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
implicit_tls: bool,
authenticator: Arc<MessageAuthenticator>,
resolver: Arc<TokioResolver>,
) {
loop {
if shutdown.load(Ordering::SeqCst) {
@@ -194,6 +213,8 @@ async fn accept_loop(
let callback_registry = callback_registry.clone();
let tls_acceptor = tls_acceptor.clone();
let active_connections = active_connections.clone();
let authenticator = authenticator.clone();
let resolver = resolver.clone();
active_connections.fetch_add(1, Ordering::SeqCst);
@@ -232,6 +253,8 @@ async fn accept_loop(
tls_acceptor,
remote_addr,
implicit_tls,
authenticator,
resolver,
)
.await;