Files
smartmta/rust/crates/mailer-smtp/src/connection.rs

1024 lines
33 KiB
Rust
Raw Normal View History

//! Per-connection SMTP handler.
//!
//! Manages the read/write loop for a single SMTP connection.
//! Dispatches parsed commands, handles DATA mode, and manages
//! authentication flow.
use crate::command::{parse_command, AuthMechanism, ParseError, SmtpCommand};
use crate::config::SmtpServerConfig;
use crate::data::{DataAccumulator, DataAction};
use crate::rate_limiter::RateLimiter;
use crate::response::{build_capabilities, SmtpResponse};
use crate::session::{AuthState, SmtpSession};
use crate::validation;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};
use tokio_rustls::server::TlsStream;
use tracing::{debug, info, warn};
/// Events emitted by a connection handler to the server.
#[derive(Debug, Serialize, Deserialize)]
pub enum ConnectionEvent {
/// A complete email has been received and needs processing.
EmailReceived {
correlation_id: String,
session_id: String,
mail_from: String,
rcpt_to: Vec<String>,
/// Base64-encoded raw message for inline, or file path for large messages.
data: EmailData,
remote_addr: String,
client_hostname: Option<String>,
secure: bool,
authenticated_user: Option<String>,
/// In-process security results (DKIM, SPF, DMARC, content scan).
security_results: Option<serde_json::Value>,
},
/// An authentication request that needs TS validation.
AuthRequest {
correlation_id: String,
session_id: String,
username: String,
password: String,
remote_addr: String,
},
}
/// How email data is transported from Rust to TS.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum EmailData {
/// Inline base64-encoded data (for messages <= 256KB).
#[serde(rename = "inline")]
Inline { base64: String },
/// Path to a temp file containing the raw message (for large messages).
#[serde(rename = "file")]
File { path: String },
}
/// Result of TS processing an email.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailProcessingResult {
pub accepted: bool,
pub smtp_code: Option<u16>,
pub smtp_message: Option<String>,
}
/// Result of TS processing an auth request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthResult {
pub success: bool,
pub message: Option<String>,
}
/// Abstraction over plain and TLS streams.
pub enum SmtpStream {
Plain(BufReader<TcpStream>),
Tls(BufReader<TlsStream<TcpStream>>),
}
impl SmtpStream {
/// Read a line from the stream (up to max_line_length bytes).
pub async fn read_line(&mut self, buf: &mut String, max_len: usize) -> std::io::Result<usize> {
match self {
SmtpStream::Plain(reader) => {
let result = reader.read_line(buf).await?;
if buf.len() > max_len {
buf.truncate(max_len);
}
Ok(result)
}
SmtpStream::Tls(reader) => {
let result = reader.read_line(buf).await?;
if buf.len() > max_len {
buf.truncate(max_len);
}
Ok(result)
}
}
}
/// Read bytes from the stream into a buffer.
pub async fn read_chunk(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
use tokio::io::AsyncReadExt;
match self {
SmtpStream::Plain(reader) => reader.read(buf).await,
SmtpStream::Tls(reader) => reader.read(buf).await,
}
}
/// Write bytes to the stream.
pub async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
match self {
SmtpStream::Plain(reader) => reader.get_mut().write_all(buf).await,
SmtpStream::Tls(reader) => reader.get_mut().write_all(buf).await,
}
}
/// Flush the write buffer.
pub async fn flush(&mut self) -> std::io::Result<()> {
match self {
SmtpStream::Plain(reader) => reader.get_mut().flush().await,
SmtpStream::Tls(reader) => reader.get_mut().flush().await,
}
}
/// Unwrap to get the raw TcpStream for STARTTLS upgrade.
/// Only works on Plain streams.
pub fn into_tcp_stream(self) -> Option<TcpStream> {
match self {
SmtpStream::Plain(reader) => Some(reader.into_inner()),
SmtpStream::Tls(_) => None,
}
}
}
/// Handle a single SMTP connection.
///
/// This is the main entry point spawned for each incoming connection.
pub async fn handle_connection(
mut stream: SmtpStream,
config: Arc<SmtpServerConfig>,
rate_limiter: Arc<RateLimiter>,
event_tx: mpsc::Sender<ConnectionEvent>,
callback_register: Arc<dyn CallbackRegistry + Send + Sync>,
tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
remote_addr: String,
is_secure: bool,
) {
let mut session = SmtpSession::new(remote_addr.clone(), is_secure);
// Check IP rate limit
if !rate_limiter.check_connection(&remote_addr) {
let resp = SmtpResponse::service_unavailable(
&config.hostname,
"Too many connections from your IP",
);
let _ = stream.write_all(&resp.to_bytes()).await;
let _ = stream.flush().await;
info!(
session_id = %session.id,
remote_addr = %remote_addr,
"Connection rejected: rate limit exceeded"
);
return;
}
// Send greeting
let greeting = SmtpResponse::greeting(&config.hostname);
if stream.write_all(&greeting.to_bytes()).await.is_err() {
return;
}
if stream.flush().await.is_err() {
return;
}
let socket_timeout = Duration::from_secs(config.socket_timeout_secs);
loop {
let mut line = String::new();
let read_result = timeout(socket_timeout, stream.read_line(&mut line, 4096)).await;
match read_result {
Err(_) => {
// Timeout
let resp = SmtpResponse::service_unavailable(
&config.hostname,
"Connection timed out",
);
let _ = stream.write_all(&resp.to_bytes()).await;
let _ = stream.flush().await;
debug!(session_id = %session.id, "Connection timed out");
break;
}
Ok(Err(e)) => {
debug!(session_id = %session.id, error = %e, "Read error");
break;
}
Ok(Ok(0)) => {
debug!(session_id = %session.id, "Client disconnected");
break;
}
Ok(Ok(_)) => {
// Process command
let response = process_line(
&line,
&mut session,
&mut stream,
&config,
&rate_limiter,
&event_tx,
callback_register.as_ref(),
&tls_acceptor,
)
.await;
match response {
LineResult::Response(resp) => {
if stream.write_all(&resp.to_bytes()).await.is_err() {
break;
}
if stream.flush().await.is_err() {
break;
}
}
LineResult::Quit(resp) => {
let _ = stream.write_all(&resp.to_bytes()).await;
let _ = stream.flush().await;
break;
}
LineResult::StartTlsSignal => {
// Send 220 Ready response
let resp = SmtpResponse::new(220, "Ready to start TLS");
if stream.write_all(&resp.to_bytes()).await.is_err() {
break;
}
if stream.flush().await.is_err() {
break;
}
// Extract TCP stream and upgrade
if let Some(tcp_stream) = stream.into_tcp_stream() {
if let Some(acceptor) = &tls_acceptor {
match acceptor.accept(tcp_stream).await {
Ok(tls_stream) => {
stream = SmtpStream::Tls(BufReader::new(tls_stream));
session.secure = true;
// Client must re-EHLO after STARTTLS
session.state = crate::state::SmtpState::Connected;
session.client_hostname = None;
session.esmtp = false;
session.auth_state = AuthState::None;
session.envelope = Default::default();
debug!(session_id = %session.id, "TLS upgrade successful");
}
Err(e) => {
warn!(session_id = %session.id, error = %e, "TLS handshake failed");
break;
}
}
} else {
break;
}
} else {
// Already TLS — shouldn't happen
break;
}
}
LineResult::NoResponse => {}
LineResult::Disconnect => {
break;
}
}
}
}
}
info!(
session_id = %session.id,
remote_addr = %remote_addr,
messages = session.message_count,
"Connection closed"
);
}
/// Result of processing a single input line.
enum LineResult {
/// Send this response to the client.
Response(SmtpResponse),
/// Send this response then close the connection.
Quit(SmtpResponse),
/// Signal that STARTTLS should be performed (main loop sends 220 and upgrades).
StartTlsSignal,
/// No response needed (handled internally).
NoResponse,
/// Disconnect immediately.
Disconnect,
}
/// Trait for registering and resolving correlation-ID callbacks.
pub trait CallbackRegistry: Send + Sync {
/// Register a callback for email processing and return a receiver.
fn register_email_callback(
&self,
correlation_id: &str,
) -> oneshot::Receiver<EmailProcessingResult>;
/// Register a callback for auth and return a receiver.
fn register_auth_callback(
&self,
correlation_id: &str,
) -> oneshot::Receiver<AuthResult>;
}
/// Process a single input line from the client.
async fn process_line(
line: &str,
session: &mut SmtpSession,
stream: &mut SmtpStream,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
tls_acceptor: &Option<Arc<tokio_rustls::TlsAcceptor>>,
) -> LineResult {
// Handle AUTH intermediate states (waiting for username/password)
match &session.auth_state {
AuthState::WaitingForUsername => {
return handle_auth_username(line.trim(), session);
}
AuthState::WaitingForPassword { .. } => {
return handle_auth_password(
line.trim(),
session,
config,
rate_limiter,
event_tx,
callback_registry,
)
.await;
}
_ => {}
}
// Parse SMTP command
let cmd = match parse_command(line) {
Ok(cmd) => cmd,
Err(ParseError::Empty) => return LineResult::NoResponse,
Err(_) => {
if session.record_invalid_command() {
return LineResult::Quit(SmtpResponse::service_unavailable(
&config.hostname,
"Too many invalid commands",
));
}
return LineResult::Response(SmtpResponse::syntax_error());
}
};
match cmd {
SmtpCommand::Ehlo(hostname) => handle_ehlo(hostname, true, session, config),
SmtpCommand::Helo(hostname) => handle_ehlo(hostname, false, session, config),
SmtpCommand::MailFrom { address, params } => {
handle_mail_from(address, params, session, config, rate_limiter)
}
SmtpCommand::RcptTo { address, params } => {
handle_rcpt_to(address, params, session, config)
}
SmtpCommand::Data => {
handle_data(session, stream, config, event_tx, callback_registry).await
}
SmtpCommand::Rset => {
session.reset_transaction();
LineResult::Response(SmtpResponse::ok("OK"))
}
SmtpCommand::Noop => LineResult::Response(SmtpResponse::ok("OK")),
SmtpCommand::Quit => {
LineResult::Quit(SmtpResponse::closing(&config.hostname))
}
SmtpCommand::StartTls => {
handle_starttls(session, config, tls_acceptor)
}
SmtpCommand::Auth {
mechanism,
initial_response,
} => {
handle_auth(
mechanism,
initial_response,
session,
config,
rate_limiter,
event_tx,
callback_registry,
)
.await
}
SmtpCommand::Help(_) => {
LineResult::Response(SmtpResponse::new(
214,
"EHLO HELO MAIL RCPT DATA RSET NOOP QUIT STARTTLS AUTH HELP VRFY",
))
}
SmtpCommand::Vrfy(_) => {
LineResult::Response(SmtpResponse::new(252, "Cannot VRFY user"))
}
SmtpCommand::Expn(_) => {
LineResult::Response(SmtpResponse::not_implemented())
}
}
}
/// Handle EHLO/HELO command.
fn handle_ehlo(
hostname: String,
esmtp: bool,
session: &mut SmtpSession,
config: &SmtpServerConfig,
) -> LineResult {
if !validation::is_valid_ehlo_hostname(&hostname) {
return LineResult::Response(SmtpResponse::param_error(
"Invalid hostname",
));
}
session.reset_for_ehlo(hostname, esmtp);
if esmtp {
let caps = build_capabilities(
config.max_message_size,
config.has_tls(),
session.secure,
config.auth_enabled,
);
LineResult::Response(SmtpResponse::ehlo_response(&config.hostname, &caps))
} else {
LineResult::Response(SmtpResponse::ok(format!(
"{} Hello",
config.hostname
)))
}
}
/// Handle MAIL FROM command.
fn handle_mail_from(
address: String,
params: std::collections::HashMap<String, Option<String>>,
session: &mut SmtpSession,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
) -> LineResult {
if !session.state.can_mail_from() {
return LineResult::Response(SmtpResponse::bad_sequence(
"Send EHLO/HELO first",
));
}
if !validation::is_valid_smtp_address(&address) {
return LineResult::Response(SmtpResponse::param_error(
"Invalid sender address",
));
}
// Check SIZE param
if let Some(Some(size_str)) = params.get("SIZE") {
match validation::validate_size_param(size_str, config.max_message_size) {
Ok(_) => {}
Err(msg) => return LineResult::Response(SmtpResponse::new(552, msg)),
}
}
// Rate limit check for sender
if !address.is_empty() && !rate_limiter.check_message(&address) {
return LineResult::Response(SmtpResponse::temp_failure(
"Too many messages from this sender, try again later",
));
}
session.envelope.mail_from = address;
session.envelope.declared_size = params
.get("SIZE")
.and_then(|v| v.as_ref())
.and_then(|s| s.parse().ok());
session.envelope.body_type = params
.get("BODY")
.and_then(|v| v.clone());
match session.state.transition_mail_from() {
Ok(new_state) => {
session.state = new_state;
LineResult::Response(SmtpResponse::ok("OK"))
}
Err(_) => LineResult::Response(SmtpResponse::bad_sequence(
"Bad sequence of commands",
)),
}
}
/// Handle RCPT TO command.
fn handle_rcpt_to(
address: String,
_params: std::collections::HashMap<String, Option<String>>,
session: &mut SmtpSession,
config: &SmtpServerConfig,
) -> LineResult {
if !session.state.can_rcpt_to() {
return LineResult::Response(SmtpResponse::bad_sequence(
"Send MAIL FROM first",
));
}
if !validation::is_valid_smtp_address(&address) || address.is_empty() {
return LineResult::Response(SmtpResponse::param_error(
"Invalid recipient address",
));
}
if session.envelope.rcpt_to.len() >= config.max_recipients as usize {
return LineResult::Response(SmtpResponse::new(
452,
"Too many recipients",
));
}
session.envelope.rcpt_to.push(address);
match session.state.transition_rcpt_to() {
Ok(new_state) => {
session.state = new_state;
LineResult::Response(SmtpResponse::ok("OK"))
}
Err(_) => LineResult::Response(SmtpResponse::bad_sequence(
"Bad sequence of commands",
)),
}
}
/// Handle DATA command: switch to data mode, accumulate, then emit event.
async fn handle_data(
session: &mut SmtpSession,
stream: &mut SmtpStream,
config: &SmtpServerConfig,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
) -> LineResult {
if !session.state.can_data() {
return LineResult::Response(SmtpResponse::bad_sequence(
"Send RCPT TO first",
));
}
// Transition to DATA state
session.state = match session.state.transition_data() {
Ok(s) => s,
Err(_) => {
return LineResult::Response(SmtpResponse::bad_sequence(
"Bad sequence of commands",
));
}
};
// Send 354
let start_resp = SmtpResponse::start_data();
if stream.write_all(&start_resp.to_bytes()).await.is_err() {
return LineResult::Disconnect;
}
if stream.flush().await.is_err() {
return LineResult::Disconnect;
}
// Accumulate data
let mut accumulator = DataAccumulator::new(config.max_message_size);
let data_timeout = Duration::from_secs(config.data_timeout_secs);
let mut buf = [0u8; 8192];
loop {
let read_result = timeout(data_timeout, stream.read_chunk(&mut buf)).await;
match read_result {
Err(_) => {
// Data timeout
return LineResult::Quit(SmtpResponse::service_unavailable(
&config.hostname,
"Data timeout",
));
}
Ok(Err(_)) => return LineResult::Disconnect,
Ok(Ok(0)) => return LineResult::Disconnect,
Ok(Ok(n)) => {
match accumulator.process_chunk(&buf[..n]) {
DataAction::Continue => continue,
DataAction::SizeExceeded => {
// Must still read until end-of-data to stay in sync
session.state = crate::state::SmtpState::Greeted;
session.envelope = Default::default();
return LineResult::Response(SmtpResponse::size_exceeded(
config.max_message_size,
));
}
DataAction::Complete => break,
}
}
}
}
// Data complete — prepare for delivery
let raw_message = accumulator.into_message().unwrap_or_default();
let correlation_id = uuid::Uuid::new_v4().to_string();
// Determine transport: inline base64 or temp file
let email_data = if raw_message.len() <= 256 * 1024 {
EmailData::Inline {
base64: BASE64.encode(&raw_message),
}
} else {
// Write to temp file
let tmp_path = format!("/tmp/mailer-smtp-{}.eml", &correlation_id);
match tokio::fs::write(&tmp_path, &raw_message).await {
Ok(_) => EmailData::File { path: tmp_path },
Err(e) => {
warn!(error = %e, "Failed to write temp file for large email");
// Fall back to inline
EmailData::Inline {
base64: BASE64.encode(&raw_message),
}
}
}
};
// Register callback before sending event
let rx = callback_registry.register_email_callback(&correlation_id);
// Send event to TS
let event = ConnectionEvent::EmailReceived {
correlation_id: correlation_id.clone(),
session_id: session.id.clone(),
mail_from: session.envelope.mail_from.clone(),
rcpt_to: session.envelope.rcpt_to.clone(),
data: email_data,
remote_addr: session.remote_addr.clone(),
client_hostname: session.client_hostname.clone(),
secure: session.secure,
authenticated_user: session.authenticated_user().map(|s| s.to_string()),
security_results: None, // Will be populated by server.rs when in-process security is added
};
if event_tx.send(event).await.is_err() {
warn!("Failed to send emailReceived event");
return LineResult::Response(SmtpResponse::local_error(
"Internal processing error",
));
}
// Wait for TS response with timeout
let processing_timeout = Duration::from_secs(config.processing_timeout_secs);
let result = match timeout(processing_timeout, rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => {
warn!(correlation_id = %correlation_id, "Callback channel dropped");
EmailProcessingResult {
accepted: false,
smtp_code: Some(451),
smtp_message: Some("Processing error".into()),
}
}
Err(_) => {
warn!(correlation_id = %correlation_id, "Processing timeout");
EmailProcessingResult {
accepted: false,
smtp_code: Some(451),
smtp_message: Some("Processing timeout".into()),
}
}
};
// Reset transaction state
session.envelope = Default::default();
let _ = session.state.transition_finished();
session.state = crate::state::SmtpState::Finished;
session.record_message();
if result.accepted {
LineResult::Response(SmtpResponse::ok(
result.smtp_message.unwrap_or_else(|| "Message accepted".into()),
))
} else {
let code = result.smtp_code.unwrap_or(550);
let msg = result
.smtp_message
.unwrap_or_else(|| "Message rejected".into());
LineResult::Response(SmtpResponse::new(code, msg))
}
}
/// Handle STARTTLS command.
///
/// Returns `StartTlsSignal` to indicate the main loop should send 220 and
/// perform the TLS upgrade. The main loop handles the stream swap.
fn handle_starttls(
session: &SmtpSession,
config: &SmtpServerConfig,
tls_acceptor: &Option<Arc<tokio_rustls::TlsAcceptor>>,
) -> LineResult {
if session.secure {
return LineResult::Response(SmtpResponse::bad_sequence(
"Already using TLS",
));
}
if !session.state.can_starttls() {
return LineResult::Response(SmtpResponse::bad_sequence(
"STARTTLS not allowed in current state",
));
}
if tls_acceptor.is_none() || !config.has_tls() {
return LineResult::Response(SmtpResponse::new(
454,
"TLS not available",
));
}
// Signal the main loop to perform TLS upgrade.
// The main loop will: send 220, extract TCP stream, do TLS handshake.
LineResult::StartTlsSignal
}
/// Handle AUTH command.
async fn handle_auth(
mechanism: AuthMechanism,
initial_response: Option<String>,
session: &mut SmtpSession,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
) -> LineResult {
if !config.auth_enabled {
return LineResult::Response(SmtpResponse::not_implemented());
}
if session.is_authenticated() {
return LineResult::Response(SmtpResponse::bad_sequence(
"Already authenticated",
));
}
if !session.state.can_auth() {
return LineResult::Response(SmtpResponse::bad_sequence(
"Send EHLO first",
));
}
match mechanism {
AuthMechanism::Plain => {
if let Some(response) = initial_response {
// Decode and validate immediately
return process_auth_plain(
&response,
session,
config,
rate_limiter,
event_tx,
callback_registry,
)
.await;
}
// No initial response — send challenge for credentials
session.auth_state = AuthState::WaitingForUsername;
// For PLAIN, we use an empty challenge
LineResult::Response(SmtpResponse::auth_challenge(""))
}
AuthMechanism::Login => {
if let Some(response) = initial_response {
// The initial response is the username (base64)
match BASE64.decode(response.as_bytes()) {
Ok(decoded) => {
let username = String::from_utf8_lossy(&decoded).to_string();
session.auth_state = AuthState::WaitingForPassword { username };
// Send password prompt (base64 of "Password:")
LineResult::Response(SmtpResponse::auth_challenge(
&BASE64.encode(b"Password:"),
))
}
Err(_) => LineResult::Response(SmtpResponse::param_error(
"Invalid base64 encoding",
)),
}
} else {
session.auth_state = AuthState::WaitingForUsername;
// Send username prompt (base64 of "Username:")
LineResult::Response(SmtpResponse::auth_challenge(
&BASE64.encode(b"Username:"),
))
}
}
}
}
/// Handle username input during LOGIN auth flow.
fn handle_auth_username(line: &str, session: &mut SmtpSession) -> LineResult {
// Cancel auth if client sends "*"
if line == "*" {
session.auth_state = AuthState::None;
return LineResult::Response(SmtpResponse::new(501, "Authentication cancelled"));
}
match BASE64.decode(line.as_bytes()) {
Ok(decoded) => {
let username = String::from_utf8_lossy(&decoded).to_string();
session.auth_state = AuthState::WaitingForPassword { username };
LineResult::Response(SmtpResponse::auth_challenge(
&BASE64.encode(b"Password:"),
))
}
Err(_) => {
session.auth_state = AuthState::None;
LineResult::Response(SmtpResponse::param_error(
"Invalid base64 encoding",
))
}
}
}
/// Handle password input during LOGIN auth flow.
async fn handle_auth_password(
line: &str,
session: &mut SmtpSession,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
) -> LineResult {
// Cancel auth if client sends "*"
if line == "*" {
session.auth_state = AuthState::None;
return LineResult::Response(SmtpResponse::new(501, "Authentication cancelled"));
}
let username = match &session.auth_state {
AuthState::WaitingForPassword { username } => username.clone(),
_ => {
session.auth_state = AuthState::None;
return LineResult::Response(SmtpResponse::bad_sequence("Unexpected auth state"));
}
};
let password = match BASE64.decode(line.as_bytes()) {
Ok(decoded) => String::from_utf8_lossy(&decoded).to_string(),
Err(_) => {
session.auth_state = AuthState::None;
return LineResult::Response(SmtpResponse::param_error(
"Invalid base64 encoding",
));
}
};
validate_credentials(
&username,
&password,
session,
config,
rate_limiter,
event_tx,
callback_registry,
)
.await
}
/// Process AUTH PLAIN credentials (base64-encoded "\0username\0password").
async fn process_auth_plain(
base64_data: &str,
session: &mut SmtpSession,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
) -> LineResult {
let decoded = match BASE64.decode(base64_data.as_bytes()) {
Ok(d) => d,
Err(_) => {
return LineResult::Response(SmtpResponse::param_error(
"Invalid base64 encoding",
));
}
};
// PLAIN format: \0username\0password
let parts: Vec<&[u8]> = decoded.splitn(3, |&b| b == 0).collect();
if parts.len() < 3 {
return LineResult::Response(SmtpResponse::param_error(
"Invalid PLAIN auth format",
));
}
let username = String::from_utf8_lossy(parts[1]).to_string();
let password = String::from_utf8_lossy(parts[2]).to_string();
validate_credentials(
&username,
&password,
session,
config,
rate_limiter,
event_tx,
callback_registry,
)
.await
}
/// Validate credentials by sending authRequest to TS and waiting for response.
async fn validate_credentials(
username: &str,
password: &str,
session: &mut SmtpSession,
config: &SmtpServerConfig,
rate_limiter: &RateLimiter,
event_tx: &mpsc::Sender<ConnectionEvent>,
callback_registry: &dyn CallbackRegistry,
) -> LineResult {
let correlation_id = uuid::Uuid::new_v4().to_string();
// Register callback before sending event
let rx = callback_registry.register_auth_callback(&correlation_id);
let event = ConnectionEvent::AuthRequest {
correlation_id: correlation_id.clone(),
session_id: session.id.clone(),
username: username.to_string(),
password: password.to_string(),
remote_addr: session.remote_addr.clone(),
};
if event_tx.send(event).await.is_err() {
session.auth_state = AuthState::None;
return LineResult::Response(SmtpResponse::local_error(
"Internal processing error",
));
}
// Wait for TS response
let auth_timeout = Duration::from_secs(5);
let result = match timeout(auth_timeout, rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => AuthResult {
success: false,
message: Some("Auth processing error".into()),
},
Err(_) => AuthResult {
success: false,
message: Some("Auth timeout".into()),
},
};
if result.success {
session.auth_state = AuthState::Authenticated {
username: username.to_string(),
};
LineResult::Response(SmtpResponse::auth_success())
} else {
session.auth_state = AuthState::None;
let exceeded = session.record_auth_failure(config.max_auth_failures);
if exceeded {
if !rate_limiter.check_auth_failure(&session.remote_addr) {
return LineResult::Quit(SmtpResponse::service_unavailable(
&config.hostname,
"Too many authentication failures",
));
}
return LineResult::Quit(SmtpResponse::service_unavailable(
&config.hostname,
"Too many authentication failures",
));
}
LineResult::Response(SmtpResponse::auth_failed())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_email_data_serialization() {
let data = EmailData::Inline {
base64: "dGVzdA==".into(),
};
let json = serde_json::to_string(&data).unwrap();
assert!(json.contains("inline"));
let data = EmailData::File {
path: "/tmp/test.eml".into(),
};
let json = serde_json::to_string(&data).unwrap();
assert!(json.contains("file"));
}
#[test]
fn test_processing_result_serialization() {
let result = EmailProcessingResult {
accepted: true,
smtp_code: Some(250),
smtp_message: Some("OK".into()),
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("accepted"));
}
}