feat(mailer-smtp): implement in-process SMTP server and management IPC integration
This commit is contained in:
@@ -6,9 +6,16 @@
|
||||
//! integration with `@push.rocks/smartrust` from TypeScript
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use dashmap::DashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::{self, BufRead, Write};
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use mailer_smtp::connection::{
|
||||
AuthResult, CallbackRegistry, ConnectionEvent, EmailProcessingResult,
|
||||
};
|
||||
|
||||
/// mailer-bin: Rust-powered email security tools
|
||||
#[derive(Parser)]
|
||||
@@ -105,6 +112,43 @@ struct IpcEvent {
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
// --- Pending callbacks for correlation-ID based reverse calls ---
|
||||
|
||||
/// Stores oneshot senders for pending email processing and auth callbacks.
|
||||
struct PendingCallbacks {
|
||||
email: DashMap<String, oneshot::Sender<EmailProcessingResult>>,
|
||||
auth: DashMap<String, oneshot::Sender<AuthResult>>,
|
||||
}
|
||||
|
||||
impl PendingCallbacks {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
email: DashMap::new(),
|
||||
auth: DashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CallbackRegistry for PendingCallbacks {
|
||||
fn register_email_callback(
|
||||
&self,
|
||||
correlation_id: &str,
|
||||
) -> oneshot::Receiver<EmailProcessingResult> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.email.insert(correlation_id.to_string(), tx);
|
||||
rx
|
||||
}
|
||||
|
||||
fn register_auth_callback(
|
||||
&self,
|
||||
correlation_id: &str,
|
||||
) -> oneshot::Receiver<AuthResult> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.auth.insert(correlation_id.to_string(), tx);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let cli = Cli::parse();
|
||||
|
||||
@@ -278,7 +322,17 @@ fn main() {
|
||||
|
||||
use std::io::Read;
|
||||
|
||||
/// Shared state for the management mode.
|
||||
struct ManagementState {
|
||||
callbacks: Arc<PendingCallbacks>,
|
||||
smtp_handle: Option<mailer_smtp::server::SmtpServerHandle>,
|
||||
smtp_event_rx: Option<tokio::sync::mpsc::Receiver<ConnectionEvent>>,
|
||||
}
|
||||
|
||||
/// Run in management/IPC mode for smartrust bridge.
|
||||
///
|
||||
/// This mode supports both request/response IPC (existing commands) and
|
||||
/// long-running SMTP server with event-based callbacks.
|
||||
fn run_management_mode() {
|
||||
// Signal readiness
|
||||
let ready_event = IpcEvent {
|
||||
@@ -294,39 +348,153 @@ fn run_management_mode() {
|
||||
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
let stdin = io::stdin();
|
||||
for line in stdin.lock().lines() {
|
||||
let line = match line {
|
||||
Ok(l) => l,
|
||||
Err(_) => break,
|
||||
};
|
||||
let callbacks = Arc::new(PendingCallbacks::new());
|
||||
let mut state = ManagementState {
|
||||
callbacks: callbacks.clone(),
|
||||
smtp_handle: None,
|
||||
smtp_event_rx: None,
|
||||
};
|
||||
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
// We need to read stdin in a separate thread (blocking I/O)
|
||||
// and process commands + SMTP events in the tokio runtime.
|
||||
let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel::<String>(256);
|
||||
|
||||
let req: IpcRequest = match serde_json::from_str(&line) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let resp = IpcResponse {
|
||||
id: "unknown".to_string(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Invalid request: {}", e)),
|
||||
};
|
||||
println!("{}", serde_json::to_string(&resp).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
continue;
|
||||
// Spawn stdin reader thread
|
||||
std::thread::spawn(move || {
|
||||
let stdin = io::stdin();
|
||||
for line in stdin.lock().lines() {
|
||||
match line {
|
||||
Ok(l) if !l.trim().is_empty() => {
|
||||
if cmd_tx.blocking_send(l).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(_) => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
let response = rt.block_on(handle_ipc_request(&req));
|
||||
println!("{}", serde_json::to_string(&response).unwrap());
|
||||
io::stdout().flush().unwrap();
|
||||
rt.block_on(async {
|
||||
loop {
|
||||
// Select between stdin commands and SMTP server events
|
||||
tokio::select! {
|
||||
cmd = cmd_rx.recv() => {
|
||||
match cmd {
|
||||
Some(line) => {
|
||||
let req: IpcRequest = match serde_json::from_str(&line) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let resp = IpcResponse {
|
||||
id: "unknown".to_string(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Invalid request: {}", e)),
|
||||
};
|
||||
emit_line(&serde_json::to_string(&resp).unwrap());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let response = handle_ipc_request(&req, &mut state).await;
|
||||
emit_line(&serde_json::to_string(&response).unwrap());
|
||||
}
|
||||
None => {
|
||||
// stdin closed — shut down
|
||||
if let Some(handle) = state.smtp_handle.take() {
|
||||
handle.shutdown().await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
event = async {
|
||||
if let Some(rx) = &mut state.smtp_event_rx {
|
||||
rx.recv().await
|
||||
} else {
|
||||
// No SMTP server running — wait forever (yields to other branch)
|
||||
std::future::pending::<Option<ConnectionEvent>>().await
|
||||
}
|
||||
} => {
|
||||
if let Some(event) = event {
|
||||
handle_smtp_event(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Emit a line to stdout and flush.
|
||||
fn emit_line(line: &str) {
|
||||
let stdout = io::stdout();
|
||||
let mut handle = stdout.lock();
|
||||
let _ = writeln!(handle, "{}", line);
|
||||
let _ = handle.flush();
|
||||
}
|
||||
|
||||
/// Emit an IPC event to stdout.
|
||||
fn emit_event(event_name: &str, data: serde_json::Value) {
|
||||
let event = IpcEvent {
|
||||
event: event_name.to_string(),
|
||||
data,
|
||||
};
|
||||
emit_line(&serde_json::to_string(&event).unwrap());
|
||||
}
|
||||
|
||||
/// Handle a connection event from the SMTP server.
|
||||
fn handle_smtp_event(event: ConnectionEvent) {
|
||||
match event {
|
||||
ConnectionEvent::EmailReceived {
|
||||
correlation_id,
|
||||
session_id,
|
||||
mail_from,
|
||||
rcpt_to,
|
||||
data,
|
||||
remote_addr,
|
||||
client_hostname,
|
||||
secure,
|
||||
authenticated_user,
|
||||
security_results,
|
||||
} => {
|
||||
emit_event(
|
||||
"emailReceived",
|
||||
serde_json::json!({
|
||||
"correlationId": correlation_id,
|
||||
"sessionId": session_id,
|
||||
"mailFrom": mail_from,
|
||||
"rcptTo": rcpt_to,
|
||||
"data": data,
|
||||
"remoteAddr": remote_addr,
|
||||
"clientHostname": client_hostname,
|
||||
"secure": secure,
|
||||
"authenticatedUser": authenticated_user,
|
||||
"securityResults": security_results,
|
||||
}),
|
||||
);
|
||||
}
|
||||
ConnectionEvent::AuthRequest {
|
||||
correlation_id,
|
||||
session_id,
|
||||
username,
|
||||
password,
|
||||
remote_addr,
|
||||
} => {
|
||||
emit_event(
|
||||
"authRequest",
|
||||
serde_json::json!({
|
||||
"correlationId": correlation_id,
|
||||
"sessionId": session_id,
|
||||
"username": username,
|
||||
"password": password,
|
||||
"remoteAddr": remote_addr,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ipc_request(req: &IpcRequest) -> IpcResponse {
|
||||
async fn handle_ipc_request(req: &IpcRequest, state: &mut ManagementState) -> IpcResponse {
|
||||
match req.method.as_str() {
|
||||
"ping" => IpcResponse {
|
||||
id: req.id.clone(),
|
||||
@@ -636,6 +804,35 @@ async fn handle_ipc_request(req: &IpcRequest) -> IpcResponse {
|
||||
}
|
||||
}
|
||||
|
||||
// --- SMTP Server lifecycle commands ---
|
||||
|
||||
"startSmtpServer" => {
|
||||
handle_start_smtp_server(req, state).await
|
||||
}
|
||||
|
||||
"stopSmtpServer" => {
|
||||
handle_stop_smtp_server(req, state).await
|
||||
}
|
||||
|
||||
"emailProcessingResult" => {
|
||||
handle_email_processing_result(req, state)
|
||||
}
|
||||
|
||||
"authResult" => {
|
||||
handle_auth_result(req, state)
|
||||
}
|
||||
|
||||
"configureRateLimits" => {
|
||||
// Rate limit configuration is set at startSmtpServer time.
|
||||
// This command allows runtime updates, but for now we acknowledge it.
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"configured": true})),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
|
||||
_ => IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: false,
|
||||
@@ -644,3 +841,214 @@ async fn handle_ipc_request(req: &IpcRequest) -> IpcResponse {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle startSmtpServer IPC command.
|
||||
async fn handle_start_smtp_server(req: &IpcRequest, state: &mut ManagementState) -> IpcResponse {
|
||||
// Stop existing server if running
|
||||
if let Some(handle) = state.smtp_handle.take() {
|
||||
handle.shutdown().await;
|
||||
}
|
||||
|
||||
// Parse config from params
|
||||
let config = match parse_smtp_config(&req.params) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
return IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Invalid config: {}", e)),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// Parse optional rate limit config
|
||||
let rate_config = req.params.get("rateLimits").and_then(|v| {
|
||||
serde_json::from_value::<mailer_smtp::rate_limiter::RateLimitConfig>(v.clone()).ok()
|
||||
});
|
||||
|
||||
match mailer_smtp::server::start_server(config, state.callbacks.clone(), rate_config).await {
|
||||
Ok((handle, event_rx)) => {
|
||||
state.smtp_handle = Some(handle);
|
||||
state.smtp_event_rx = Some(event_rx);
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"started": true})),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
Err(e) => IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!("Failed to start SMTP server: {}", e)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle stopSmtpServer IPC command.
|
||||
async fn handle_stop_smtp_server(req: &IpcRequest, state: &mut ManagementState) -> IpcResponse {
|
||||
if let Some(handle) = state.smtp_handle.take() {
|
||||
handle.shutdown().await;
|
||||
state.smtp_event_rx = None;
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"stopped": true})),
|
||||
error: None,
|
||||
}
|
||||
} else {
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"stopped": true, "wasRunning": false})),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle emailProcessingResult IPC command — resolves a pending email callback.
|
||||
fn handle_email_processing_result(req: &IpcRequest, state: &ManagementState) -> IpcResponse {
|
||||
let correlation_id = req
|
||||
.params
|
||||
.get("correlationId")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("");
|
||||
|
||||
let result = EmailProcessingResult {
|
||||
accepted: req.params.get("accepted").and_then(|v| v.as_bool()).unwrap_or(false),
|
||||
smtp_code: req.params.get("smtpCode").and_then(|v| v.as_u64()).map(|v| v as u16),
|
||||
smtp_message: req
|
||||
.params
|
||||
.get("smtpMessage")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from),
|
||||
};
|
||||
|
||||
if let Some((_, tx)) = state.callbacks.email.remove(correlation_id) {
|
||||
let _ = tx.send(result);
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"resolved": true})),
|
||||
error: None,
|
||||
}
|
||||
} else {
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!(
|
||||
"No pending callback for correlationId: {}",
|
||||
correlation_id
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle authResult IPC command — resolves a pending auth callback.
|
||||
fn handle_auth_result(req: &IpcRequest, state: &ManagementState) -> IpcResponse {
|
||||
let correlation_id = req
|
||||
.params
|
||||
.get("correlationId")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("");
|
||||
|
||||
let result = AuthResult {
|
||||
success: req.params.get("success").and_then(|v| v.as_bool()).unwrap_or(false),
|
||||
message: req
|
||||
.params
|
||||
.get("message")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from),
|
||||
};
|
||||
|
||||
if let Some((_, tx)) = state.callbacks.auth.remove(correlation_id) {
|
||||
let _ = tx.send(result);
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: true,
|
||||
result: Some(serde_json::json!({"resolved": true})),
|
||||
error: None,
|
||||
}
|
||||
} else {
|
||||
IpcResponse {
|
||||
id: req.id.clone(),
|
||||
success: false,
|
||||
result: None,
|
||||
error: Some(format!(
|
||||
"No pending auth callback for correlationId: {}",
|
||||
correlation_id
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse SmtpServerConfig from IPC params JSON.
|
||||
fn parse_smtp_config(
|
||||
params: &serde_json::Value,
|
||||
) -> Result<mailer_smtp::config::SmtpServerConfig, String> {
|
||||
let mut config = mailer_smtp::config::SmtpServerConfig::default();
|
||||
|
||||
if let Some(hostname) = params.get("hostname").and_then(|v| v.as_str()) {
|
||||
config.hostname = hostname.to_string();
|
||||
}
|
||||
|
||||
if let Some(ports) = params.get("ports").and_then(|v| v.as_array()) {
|
||||
config.ports = ports
|
||||
.iter()
|
||||
.filter_map(|v| v.as_u64().map(|p| p as u16))
|
||||
.collect();
|
||||
}
|
||||
|
||||
if let Some(secure_port) = params.get("securePort").and_then(|v| v.as_u64()) {
|
||||
config.secure_port = Some(secure_port as u16);
|
||||
}
|
||||
|
||||
if let Some(cert) = params.get("tlsCertPem").and_then(|v| v.as_str()) {
|
||||
config.tls_cert_pem = Some(cert.to_string());
|
||||
}
|
||||
|
||||
if let Some(key) = params.get("tlsKeyPem").and_then(|v| v.as_str()) {
|
||||
config.tls_key_pem = Some(key.to_string());
|
||||
}
|
||||
|
||||
if let Some(size) = params.get("maxMessageSize").and_then(|v| v.as_u64()) {
|
||||
config.max_message_size = size;
|
||||
}
|
||||
|
||||
if let Some(conns) = params.get("maxConnections").and_then(|v| v.as_u64()) {
|
||||
config.max_connections = conns as u32;
|
||||
}
|
||||
|
||||
if let Some(rcpts) = params.get("maxRecipients").and_then(|v| v.as_u64()) {
|
||||
config.max_recipients = rcpts as u32;
|
||||
}
|
||||
|
||||
if let Some(timeout) = params.get("connectionTimeoutSecs").and_then(|v| v.as_u64()) {
|
||||
config.connection_timeout_secs = timeout;
|
||||
}
|
||||
|
||||
if let Some(timeout) = params.get("dataTimeoutSecs").and_then(|v| v.as_u64()) {
|
||||
config.data_timeout_secs = timeout;
|
||||
}
|
||||
|
||||
if let Some(auth) = params.get("authEnabled").and_then(|v| v.as_bool()) {
|
||||
config.auth_enabled = auth;
|
||||
}
|
||||
|
||||
if let Some(failures) = params.get("maxAuthFailures").and_then(|v| v.as_u64()) {
|
||||
config.max_auth_failures = failures as u32;
|
||||
}
|
||||
|
||||
if let Some(timeout) = params.get("socketTimeoutSecs").and_then(|v| v.as_u64()) {
|
||||
config.socket_timeout_secs = timeout;
|
||||
}
|
||||
|
||||
if let Some(timeout) = params.get("processingTimeoutSecs").and_then(|v| v.as_u64()) {
|
||||
config.processing_timeout_secs = timeout;
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user