/**
 * Native PyPI CLI Testing
 * Tests the PyPI registry implementation using pip and twine CLI tools
 */

import { expect, tap } from '@git.zone/tstest/tapbundle';
import { tapNodeTools } from '@git.zone/tstest/tapbundle_serverside';
import { SmartRegistry } from '../ts/index.js';
import { createTestRegistry, createTestTokens, createPythonWheel, createPythonSdist } from './helpers/registry.js';
import type { IRequestContext, IResponse } from '../ts/core/interfaces.core.js';
import * as http from 'http';
import * as url from 'url';
import * as fs from 'fs';
import * as path from 'path';

// Test context (populated by the setup test, consumed by the later tests and the cleanup task)
let registry: SmartRegistry;
let server: http.Server;
let registryUrl: string;
let registryPort: number;
let pypiToken: string;
let testDir: string;
let pipHome: string;
let hasPip = false;
let hasTwine = false;

/**
 * Create HTTP server wrapper around SmartRegistry.
 *
 * Every incoming request is fully buffered, converted into an
 * `IRequestContext`, passed to `registryInstance.handleRequest`, and the
 * returned `IResponse` is written back to the socket.
 *
 * @param registryInstance registry that handles each request
 * @param port TCP port to listen on
 * @returns the listening server and its `http://localhost:<port>` base URL;
 *          the promise rejects if the server emits an `error` event
 */
async function createHttpServer(
  registryInstance: SmartRegistry,
  port: number
): Promise<{ server: http.Server; url: string }> {
  return new Promise((resolve, reject) => {
    const httpServer = http.createServer(async (req, res) => {
      try {
        // Parse request
        const parsedUrl = url.parse(req.url || '', true);
        const pathname = parsedUrl.pathname || '/';
        const query = parsedUrl.query;

        // Read body
        const chunks: Buffer[] = [];
        for await (const chunk of req) {
          chunks.push(chunk);
        }
        const bodyBuffer = Buffer.concat(chunks);

        // Parse body based on content type; `body` stays undefined for empty payloads
        let body: any;
        if (bodyBuffer.length > 0) {
          const contentType = req.headers['content-type'] || '';
          if (contentType.includes('application/json')) {
            try {
              body = JSON.parse(bodyBuffer.toString('utf-8'));
            } catch (error) {
              // Invalid JSON: fall back to the raw bytes instead of failing the request
              body = bodyBuffer;
            }
          } else if (contentType.includes('multipart/form-data')) {
            // For multipart, pass raw buffer
            body = bodyBuffer;
          } else {
            body = bodyBuffer;
          }
        }

        // Convert to IRequestContext
        const context: IRequestContext = {
          method: req.method || 'GET',
          path: pathname,
          headers: req.headers as Record<string, string>,
          query: query as Record<string, string>,
          body: body,
          rawBody: bodyBuffer,
        };

        // Handle request
        const response: IResponse = await registryInstance.handleRequest(context);

        // Convert IResponse to HTTP response
        res.statusCode = response.status;

        // Set headers
        for (const [key, value] of Object.entries(response.headers || {})) {
          res.setHeader(key, value);
        }

        // Send body: buffers and strings are written as-is, anything else is JSON-encoded
        if (response.body) {
          if (Buffer.isBuffer(response.body)) {
            res.end(response.body);
          } else if (typeof response.body === 'string') {
            res.end(response.body);
          } else {
            res.setHeader('Content-Type', 'application/json');
            res.end(JSON.stringify(response.body));
          }
        } else {
          res.end();
        }
      } catch (error) {
        console.error('Server error:', error);
        res.statusCode = 500;
        res.setHeader('Content-Type', 'application/json');
        res.end(JSON.stringify({ error: 'INTERNAL_ERROR', message: String(error) }));
      }
    });

    httpServer.listen(port, () => {
      const serverUrl = `http://localhost:${port}`;
      resolve({ server: httpServer, url: serverUrl });
    });

    httpServer.on('error', reject);
  });
}

/**
 * Setup .pypirc for twine authentication.
 *
 * Writes `<pipHomeArg>/.pypirc` declaring a single `testpypi` index server
 * that points at `<serverUrl>/pypi`, then restricts the file mode to 0600.
 *
 * @param token value written as the `password` entry
 * @param pipHomeArg directory that receives the file (created if missing)
 * @param serverUrl base URL of the registry HTTP server
 * @returns absolute or relative path of the written `.pypirc` (as joined from `pipHomeArg`)
 */
function setupPypirc(
  token: string,
  pipHomeArg: string,
  serverUrl: string
): string {
  fs.mkdirSync(pipHomeArg, { recursive: true });

  // INI format: one key per line, otherwise the file cannot be parsed
  const pypircContent = `[distutils]
index-servers =
    testpypi

[testpypi]
repository = ${serverUrl}/pypi
username = testuser
password = ${token}
`;

  const pypircPath = path.join(pipHomeArg, '.pypirc');
  fs.writeFileSync(pypircPath, pypircContent, 'utf-8');

  // Set restrictive permissions
  fs.chmodSync(pypircPath, 0o600);

  return pypircPath;
}

/**
 * Setup pip.conf for pip to use our registry.
 *
 * Writes `<pipHomeArg>/pip/pip.conf` with the registry's simple index as
 * `index-url`, `localhost` as trusted host and pypi.org as extra index.
 *
 * @param token currently unused; kept for signature compatibility
 * @param pipHomeArg directory under which the `pip/` config folder is created
 * @param serverUrl base URL of the registry HTTP server
 * @param port currently unused; kept for signature compatibility
 * @returns path of the written `pip.conf`
 */
function setupPipConf(
  token: string,
  pipHomeArg: string,
  serverUrl: string,
  port: number
): string {
  fs.mkdirSync(pipHomeArg, { recursive: true });

  // INI format: one key per line, otherwise the file cannot be parsed
  const pipConfContent = `[global]
index-url = ${serverUrl}/pypi/simple/
trusted-host = localhost
extra-index-url = https://pypi.org/simple/
`;

  const pipDir = path.join(pipHomeArg, 'pip');
  fs.mkdirSync(pipDir, { recursive: true });

  const pipConfPath = path.join(pipDir, 'pip.conf');
  fs.writeFileSync(pipConfPath, pipConfContent, 'utf-8');

  return pipConfPath;
}

/**
 * Create a test Python package wheel file.
 *
 * The file name replaces `-` with `_` in the package name and uses the
 * `py3-none-any` tag: `<name>-<version>-py3-none-any.whl`.
 *
 * @param packageName project name passed to `createPythonWheel`
 * @param version version string passed to `createPythonWheel`
 * @param targetDir existing directory to write the wheel into
 * @returns path of the written wheel file
 */
async function createTestWheelFile(
  packageName: string,
  version: string,
  targetDir: string
): Promise<string> {
  const wheelData = await createPythonWheel(packageName, version);
  const normalizedName = packageName.replace(/-/g, '_');
  const wheelFilename = `${normalizedName}-${version}-py3-none-any.whl`;
  const wheelPath = path.join(targetDir, wheelFilename);

  fs.writeFileSync(wheelPath, wheelData);

  return wheelPath;
}

/**
 * Create a test Python package sdist file.
 *
 * The file is named `<packageName>-<version>.tar.gz`.
 *
 * @param packageName project name passed to `createPythonSdist`
 * @param version version string passed to `createPythonSdist`
 * @param targetDir existing directory to write the archive into
 * @returns path of the written sdist file
 */
async function createTestSdistFile(
  packageName: string,
  version: string,
  targetDir: string
): Promise<string> {
  const sdistData = await createPythonSdist(packageName, version);
  const sdistFilename = `${packageName}-${version}.tar.gz`;
  const sdistPath = path.join(targetDir, sdistFilename);

  fs.writeFileSync(sdistPath, sdistData);

  return sdistPath;
}

/**
 * Run pip command with custom config.
 *
 * Executes `pip <command>` from `cwd` with `PIP_CONFIG_FILE` pointing at
 * `<pipHome>/pip/pip.conf`. Never throws: a rejected command is mapped to a
 * result object whose `exitCode` falls back to 1.
 *
 * @param command pip arguments (everything after `pip`)
 * @param cwd working directory for the command
 * @returns captured stdout, stderr and exit code
 */
async function runPipCommand(
  command: string,
  cwd: string
): Promise<{ stdout: string; stderr: string; exitCode: number }> {
  const pipConfDir = path.join(pipHome, 'pip');
  const fullCommand = `cd "${cwd}" && PIP_CONFIG_FILE="${path.join(pipConfDir, 'pip.conf')}" pip ${command}`;

  try {
    const result = await tapNodeTools.runCommand(fullCommand);
    return {
      stdout: result.stdout || '',
      stderr: result.stderr || '',
      exitCode: result.exitCode || 0,
    };
  } catch (error: any) {
    return {
      stdout: error.stdout || '',
      stderr: error.stderr || String(error),
      exitCode: error.exitCode || 1,
    };
  }
}

/**
 * Run twine command with custom config.
 *
 * Executes `twine <command> --config-file <pipHome>/.pypirc` from `cwd`.
 * Never throws: a rejected command is mapped to a result object whose
 * `exitCode` falls back to 1.
 *
 * @param command twine arguments (everything after `twine`)
 * @param cwd working directory for the command
 * @returns captured stdout, stderr and exit code
 */
async function runTwineCommand(
  command: string,
  cwd: string
): Promise<{ stdout: string; stderr: string; exitCode: number }> {
  const pypircPath = path.join(pipHome, '.pypirc');
  const fullCommand = `cd "${cwd}" && twine ${command} --config-file "${pypircPath}"`;

  try {
    const result = await tapNodeTools.runCommand(fullCommand);
    return {
      stdout: result.stdout || '',
      stderr: result.stderr || '',
      exitCode: result.exitCode || 0,
    };
  } catch (error: any) {
    return {
      stdout: error.stdout || '',
      stderr: error.stderr || String(error),
      exitCode: error.exitCode || 1,
    };
  }
}

/**
 * Cleanup test directory.
 *
 * Recursively removes `dir` if it exists; does nothing otherwise.
 *
 * @param dir directory to delete
 */
function cleanupTestDir(dir: string): void {
  if (fs.existsSync(dir)) {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

// ========================================================================
// TESTS
// ========================================================================

tap.test('PyPI CLI: should verify pip is installed', async () => {
  // Best-effort probe: any failure here only leaves `hasPip` false
  try {
    const result = await tapNodeTools.runCommand('pip --version');
    console.log('pip version output:', result.stdout.substring(0, 200));
    hasPip = result.exitCode === 0;
    expect(result.exitCode).toEqual(0);
  } catch (error) {
    console.log('pip CLI not available');
  }
});

tap.test('PyPI CLI: should verify twine is installed', async () => {
  // Best-effort probe: any failure here only leaves `hasTwine` false
  try {
    const result = await tapNodeTools.runCommand('twine --version');
    console.log('twine version output:', result.stdout.substring(0, 200));
    hasTwine = result.exitCode === 0;
    expect(result.exitCode).toEqual(0);
  } catch (error) {
    console.log('twine CLI not available');
  }

  if (!hasPip && !hasTwine) {
    console.log('Neither pip nor twine available, skipping native CLI tests');
    tap.skip.test('PyPI CLI: remaining tests skipped - no CLI tools available');
    return;
  }
});

tap.test('PyPI CLI: should setup registry and HTTP server', async () => {
  // Create registry
  registry = await createTestRegistry();
  const tokens = await createTestTokens(registry);
  pypiToken = tokens.pypiToken;

  expect(registry).toBeInstanceOf(SmartRegistry);
  expect(pypiToken).toBeTypeOf('string');

  // Use port 39000 (avoids conflicts with other tests)
  registryPort = 39000;
  const serverSetup = await createHttpServer(registry, registryPort);
  server = serverSetup.server;
  registryUrl = serverSetup.url;

  expect(server).toBeDefined();
  expect(registryUrl).toEqual(`http://localhost:${registryPort}`);

  // Setup test directory (start from a clean slate)
  testDir = path.join(process.cwd(), '.nogit', 'test-pypi-cli');
  cleanupTestDir(testDir);
  fs.mkdirSync(testDir, { recursive: true });

  // Setup pip/pypi home directory
  pipHome = path.join(testDir, '.pip');
  fs.mkdirSync(pipHome, { recursive: true });

  // Setup .pypirc for twine
  const pypircPath = setupPypirc(pypiToken, pipHome, registryUrl);
  expect(fs.existsSync(pypircPath)).toEqual(true);

  // Setup pip.conf
  const pipConfPath = setupPipConf(pypiToken, pipHome, registryUrl, registryPort);
  expect(fs.existsSync(pipConfPath)).toEqual(true);
});

tap.test('PyPI CLI: should verify server is responding', async () => {
  // Check server is up by doing a direct HTTP request to simple index
  const response = await fetch(`${registryUrl}/pypi/simple/`);
  expect(response.status).toBeGreaterThanOrEqual(200);
  expect(response.status).toBeLessThan(500);
});

tap.test('PyPI CLI: should upload wheel with twine', async () => {
  if (!hasTwine) {
    console.log('Skipping twine test - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';
  const version = '1.0.0';
  const wheelPath = await createTestWheelFile(packageName, version, testDir);

  expect(fs.existsSync(wheelPath)).toEqual(true);

  const result = await runTwineCommand(
    `upload --repository testpypi "${wheelPath}"`,
    testDir
  );
  console.log('twine upload output:', result.stdout);
  console.log('twine upload stderr:', result.stderr);

  expect(result.exitCode).toEqual(0);
});

tap.test('PyPI CLI: should verify package in simple index', async () => {
  if (!hasTwine) {
    console.log('Skipping - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';
  const response = await fetch(`${registryUrl}/pypi/simple/${packageName}/`);
  expect(response.status).toEqual(200);

  const html = await response.text();
  expect(html).toContain('1.0.0');
});

tap.test('PyPI CLI: should upload sdist with twine', async () => {
  if (!hasTwine) {
    console.log('Skipping twine test - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';
  const version = '1.1.0';
  const sdistPath = await createTestSdistFile(packageName, version, testDir);

  expect(fs.existsSync(sdistPath)).toEqual(true);

  const result = await runTwineCommand(
    `upload --repository testpypi "${sdistPath}"`,
    testDir
  );
  console.log('twine upload sdist output:', result.stdout);
  console.log('twine upload sdist stderr:', result.stderr);

  expect(result.exitCode).toEqual(0);
});

tap.test('PyPI CLI: should list all versions in simple index', async () => {
  if (!hasTwine) {
    console.log('Skipping - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';
  const response = await fetch(`${registryUrl}/pypi/simple/${packageName}/`);
  expect(response.status).toEqual(200);

  const html = await response.text();
  expect(html).toContain('1.0.0');
  expect(html).toContain('1.1.0');
});

tap.test('PyPI CLI: should get JSON metadata', async () => {
  if (!hasTwine) {
    console.log('Skipping - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';
  const response = await fetch(`${registryUrl}/pypi/pypi/${packageName}/json`);
  expect(response.status).toEqual(200);

  const metadata = await response.json();
  expect(metadata.info).toBeDefined();
  expect(metadata.info.name).toEqual(packageName);
  expect(metadata.releases).toBeDefined();
  expect(metadata.releases['1.0.0']).toBeDefined();
});

tap.test('PyPI CLI: should download package with pip', async () => {
  if (!hasPip || !hasTwine) {
    console.log('Skipping pip download test - pip or twine not available');
    return;
  }

  const downloadDir = path.join(testDir, 'downloads');
  fs.mkdirSync(downloadDir, { recursive: true });

  // Download (not install) the package
  const result = await runPipCommand(
    `download test-pypi-pkg==1.0.0 --dest "${downloadDir}" --no-deps`,
    testDir
  );
  console.log('pip download output:', result.stdout);
  console.log('pip download stderr:', result.stderr);

  // pip download may fail if the package doesn't meet pip's requirements
  // Just check it doesn't crash
  expect(result.exitCode).toBeLessThanOrEqual(1);
});

tap.test('PyPI CLI: should search for packages via API', async () => {
  // The queried package only exists after the twine upload tests ran,
  // so skip under the same condition as those tests.
  if (!hasTwine) {
    console.log('Skipping - twine not available');
    return;
  }

  const packageName = 'test-pypi-pkg';

  // Use the JSON API to search/list
  const response = await fetch(`${registryUrl}/pypi/pypi/${packageName}/json`);
  expect(response.status).toEqual(200);

  const metadata = await response.json();
  expect(metadata.info.name).toEqual(packageName);
});

tap.test('PyPI CLI: should fail upload without auth', async () => {
  if (!hasTwine) {
    console.log('Skipping twine test - twine not available');
    return;
  }

  const packageName = 'unauth-pkg';
  const version = '1.0.0';
  const wheelPath = await createTestWheelFile(packageName, version, testDir);

  // Create a pypirc without proper credentials
  const badPypircPath = path.join(testDir, '.bad-pypirc');
  fs.writeFileSync(badPypircPath, `[distutils]
index-servers =
    badpypi

[badpypi]
repository = ${registryUrl}/pypi
username = baduser
password = badtoken
`, 'utf-8');

  const fullCommand = `cd "${testDir}" && twine upload --repository badpypi "${wheelPath}" --config-file "${badPypircPath}"`;

  try {
    const result = await tapNodeTools.runCommand(fullCommand);
    // Should fail
    expect(result.exitCode).not.toEqual(0);
  } catch (error: any) {
    // Expected to fail
    expect(error.exitCode || 1).not.toEqual(0);
  }
});

tap.postTask('cleanup pypi cli tests', async () => {
  // Stop server
  if (server) {
    await new Promise<void>((resolve) => {
      server.close(() => resolve());
    });
  }

  // Cleanup test directory
  if (testDir) {
    cleanupTestDir(testDir);
  }

  // Destroy registry
  if (registry) {
    registry.destroy();
  }
});

export default tap.start();