import * as plugins from './plugins.js'; export class CodeFeed { private baseUrl: string; private token?: string; private lastRunTimestamp: string; private pageLimit = 50; // Raw changelog content for the current repository private changelogContent: string = ''; // npm registry helper for published-on-npm checks private npmRegistry: plugins.smartnpm.NpmRegistry; // In-memory stateful cache of commits private enableCache: boolean = false; private cacheWindowMs?: number; private cache: plugins.interfaces.ICommitResult[] = []; // enable or disable npm publishedOnNpm checks (true by default) private enableNpmCheck: boolean = true; // return only tagged commits (false by default) private enableTaggedOnly: boolean = false; // allow/deny filters private orgAllowlist?: string[]; private orgDenylist?: string[]; constructor( baseUrl: string, token?: string, lastRunTimestamp?: string, options?: { enableCache?: boolean; cacheWindowMs?: number; enableNpmCheck?: boolean; taggedOnly?: boolean; orgAllowlist?: string[]; orgDenylist?: string[]; } ) { this.baseUrl = baseUrl; this.token = token; this.lastRunTimestamp = lastRunTimestamp ?? new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(); // configure stateful caching this.enableCache = options?.enableCache ?? false; this.cacheWindowMs = options?.cacheWindowMs; this.enableNpmCheck = options?.enableNpmCheck ?? true; this.enableTaggedOnly = options?.taggedOnly ?? false; this.orgAllowlist = options?.orgAllowlist; this.orgDenylist = options?.orgDenylist; this.cache = []; // npm registry instance for version lookups this.npmRegistry = new plugins.smartnpm.NpmRegistry(); console.log('CodeFeed initialized with last run timestamp:', this.lastRunTimestamp); } /** * Fetch all new commits (since lastRunTimestamp) across all orgs and repos. */ public async fetchAllCommitsFromInstance(): Promise { // Controlled concurrency with AsyncExecutionStack const stack = new plugins.lik.AsyncExecutionStack(); stack.setNonExclusiveMaxConcurrency(20); // determine since timestamp for this run (stateful caching) let effectiveSince = this.lastRunTimestamp; if (this.enableCache && this.cache.length > 0) { // use newest timestamp in cache to fetch only tail effectiveSince = this.cache.reduce( (max, c) => (c.timestamp > max ? c.timestamp : max), effectiveSince ); } // 1) get all organizations let orgs = await this.fetchAllOrganizations(); // apply allow/deny filters if (this.orgAllowlist && this.orgAllowlist.length > 0) { orgs = orgs.filter((o) => this.orgAllowlist!.includes(o)); } if (this.orgDenylist && this.orgDenylist.length > 0) { orgs = orgs.filter((o) => !this.orgDenylist!.includes(o)); } // 2) fetch repos per org in parallel const repoLists = await Promise.all( orgs.map((org) => stack.getNonExclusiveExecutionSlot(() => this.fetchRepositoriesForOrg(org)) ) ); // flatten to [{ owner, name }] const allRepos = orgs.flatMap((org, i) => repoLists[i].map((r) => ({ owner: org, name: r.name })) ); // 3) probe latest commit per repo and fetch full list only if new commits exist const commitJobs = allRepos.map(({ owner, name }) => stack.getNonExclusiveExecutionSlot(async () => { try { // 3a) Probe the most recent commit (limit=1) const probeResp = await this.fetchFunction( `/api/v1/repos/${owner}/${name}/commits?limit=1`, { headers: this.token ? { Authorization: `token ${this.token}` } : {} } ); if (!probeResp.ok) { throw new Error(`Probe failed for ${owner}/${name}: ${probeResp.statusText}`); } const probeData: plugins.interfaces.ICommit[] = await probeResp.json(); // If no commits or no new commits since last run, skip if ( probeData.length === 0 || new Date(probeData[0].commit.author.date).getTime() <= new Date(effectiveSince).getTime() ) { return { owner, name, commits: [] }; } // 3b) Fetch commits since last run const commits = await this.fetchRecentCommitsForRepo( owner, name, effectiveSince ); return { owner, name, commits }; } catch (e: any) { console.error(`Failed to fetch commits for ${owner}/${name}:`, e.message); return { owner, name, commits: [] }; } }) ); const commitResults = await Promise.all(commitJobs); // 4) build new commit entries with tagging, npm and changelog support const newResults: plugins.interfaces.ICommitResult[] = []; for (const { owner, name, commits } of commitResults) { // skip repos with no new commits if (commits.length === 0) { this.changelogContent = ''; continue; } // load changelog for this repo await this.loadChangelogFromRepo(owner, name); // fetch tags for this repo let taggedShas: Set; let tagNameBySha: Map; try { const tagInfo = await this.fetchTags(owner, name); taggedShas = tagInfo.shas; tagNameBySha = tagInfo.map; } catch (e: any) { console.error(`Failed to fetch tags for ${owner}/${name}:`, e.message); taggedShas = new Set(); tagNameBySha = new Map(); } // fetch npm package info only if any new commits correspond to a tag const hasTaggedCommit = commits.some((c) => taggedShas.has(c.sha)); let pkgInfo: { allVersions: Array<{ version: string }> } | null = null; if (hasTaggedCommit && this.enableNpmCheck) { try { pkgInfo = await this.npmRegistry.getPackageInfo(`@${owner}/${name}`); } catch (e: any) { console.error(`Failed to fetch package info for ${owner}/${name}:`, e.message); pkgInfo = null; } } // build commit entries for (const c of commits) { const isTagged = taggedShas.has(c.sha); // derive version from tag name if present (strip leading 'v') let versionFromTag: string | undefined; if (isTagged) { const tagName = tagNameBySha.get(c.sha); if (tagName) { versionFromTag = tagName.startsWith('v') ? tagName.substring(1) : tagName; } } const publishedOnNpm = isTagged && pkgInfo && versionFromTag ? pkgInfo.allVersions.some((v) => v.version === versionFromTag) : false; let changelogEntry: string | undefined; if (this.changelogContent) { if (versionFromTag) { changelogEntry = this.getChangelogForVersion(versionFromTag); } } newResults.push({ baseUrl: this.baseUrl, org: owner, repo: name, timestamp: c.commit.author.date, prettyAgoTime: plugins.smarttime.getMilliSecondsAsHumanReadableAgoTime( new Date(c.commit.author.date).getTime() ), hash: c.sha, commitMessage: c.commit.message, tagged: isTagged, publishedOnNpm, changelog: changelogEntry, }); } } // if caching is enabled, merge into in-memory cache and return full cache if (this.enableCache) { const existingHashes = new Set(this.cache.map((c) => c.hash)); const uniqueNew = newResults.filter((c) => !existingHashes.has(c.hash)); this.cache.push(...uniqueNew); // trim commits older than window if (this.cacheWindowMs !== undefined) { const cutoff = Date.now() - this.cacheWindowMs; this.cache = this.cache.filter((c) => new Date(c.timestamp).getTime() >= cutoff); } // advance lastRunTimestamp to now this.lastRunTimestamp = new Date().toISOString(); // sort descending by timestamp this.cache.sort((a, b) => b.timestamp.localeCompare(a.timestamp)); // apply tagged-only filter if requested if (this.enableTaggedOnly) { return this.cache.filter((c) => c.tagged === true); } return this.cache; } // no caching: apply tagged-only filter if requested // sort and dedupe const seen = new Set(); const unique = newResults.filter((c) => { if (seen.has(c.hash)) return false; seen.add(c.hash); return true; }); unique.sort((a, b) => b.timestamp.localeCompare(a.timestamp)); if (this.enableTaggedOnly) { return unique.filter((c) => c.tagged === true); } return unique; } /** * Load the changelog directly from the Gitea repository. */ private async loadChangelogFromRepo(owner: string, repo: string): Promise { const headers: Record = {}; if (this.token) headers['Authorization'] = `token ${this.token}`; const candidates = [ 'CHANGELOG.md', 'changelog.md', 'Changelog.md', 'docs/CHANGELOG.md', ]; for (const path of candidates) { const url = `/api/v1/repos/${owner}/${repo}/contents/${encodeURIComponent(path)}`; const response = await this.fetchFunction(url, { headers }); if (!response.ok) { continue; } try { const data = await response.json(); if (data && data.content) { this.changelogContent = Buffer.from(data.content, 'base64').toString('utf8'); return; } } catch { // continue trying others } } this.changelogContent = ''; } /** * Parse the changelog to find the entry for a given version. * The changelog format is assumed as: * * # Changelog * * ## - - * */ private getChangelogForVersion(version: string): string | undefined { if (!this.changelogContent) { return undefined; } const lines = this.changelogContent.split('\n'); const versionHeaderIndex = lines.findIndex((line) => line.includes(`- ${version} -`)); if (versionHeaderIndex === -1) { return undefined; } const changelogLines: string[] = []; for (let i = versionHeaderIndex + 1; i < lines.length; i++) { const line = lines[i]; // The next version header starts with `## ` if (line.startsWith('## ')) { break; } changelogLines.push(line); } return changelogLines.join('\n').trim(); } /** * Fetch all tags for a given repo and return the set of tagged commit SHAs */ private async fetchTags(owner: string, repo: string): Promise<{ shas: Set; map: Map }> { const taggedShas = new Set(); const tagNameBySha = new Map(); let page = 1; while (true) { const url = `/api/v1/repos/${owner}/${repo}/tags?limit=${this.pageLimit}&page=${page}`; const resp = await this.fetchFunction(url, { headers: this.token ? { Authorization: `token ${this.token}` } : {}, }); if (!resp.ok) { console.error(`Failed to fetch tags for ${owner}/${repo}: ${resp.status} ${resp.statusText}`); return { shas: taggedShas, map: tagNameBySha }; } const data: plugins.interfaces.ITag[] = await resp.json(); if (data.length === 0) break; for (const t of data) { const sha = t.commit?.sha; if (sha) { taggedShas.add(sha); if (t.name) tagNameBySha.set(sha, t.name); } } if (data.length < this.pageLimit) break; page++; } return { shas: taggedShas, map: tagNameBySha }; } private async fetchAllOrganizations(): Promise { const headers = this.token ? { Authorization: `token ${this.token}` } : {}; let page = 1; const orgs: string[] = []; while (true) { const resp = await this.fetchFunction(`/api/v1/orgs?limit=${this.pageLimit}&page=${page}`, { headers }); if (!resp.ok) { throw new Error(`Failed to fetch organizations: ${resp.status} ${resp.statusText}`); } const data: { username: string }[] = await resp.json(); if (data.length === 0) break; orgs.push(...data.map((o) => o.username)); if (data.length < this.pageLimit) break; page++; } return orgs; } private async fetchRepositoriesForOrg(org: string): Promise { const headers = this.token ? { Authorization: `token ${this.token}` } : {}; let page = 1; const repos: plugins.interfaces.IRepository[] = []; while (true) { const resp = await this.fetchFunction(`/api/v1/orgs/${org}/repos?limit=${this.pageLimit}&page=${page}`, { headers }); if (!resp.ok) { throw new Error(`Failed to fetch repositories for ${org}: ${resp.status} ${resp.statusText}`); } const data: plugins.interfaces.IRepository[] = await resp.json(); if (data.length === 0) break; repos.push(...data); if (data.length < this.pageLimit) break; page++; } return repos; } private async fetchRecentCommitsForRepo( owner: string, repo: string, sinceTimestamp?: string ): Promise { const since = sinceTimestamp ?? this.lastRunTimestamp; const headers = this.token ? { Authorization: `token ${this.token}` } : {}; let page = 1; const commits: plugins.interfaces.ICommit[] = []; while (true) { const url = `/api/v1/repos/${owner}/${repo}/commits?since=${encodeURIComponent(since)}&limit=${this.pageLimit}&page=${page}`; const resp = await this.fetchFunction(url, { headers }); if (!resp.ok) { throw new Error(`Failed to fetch commits for ${owner}/${repo}: ${resp.status} ${resp.statusText}`); } const data: plugins.interfaces.ICommit[] = await resp.json(); if (data.length === 0) break; commits.push(...data); if (data.length < this.pageLimit) break; page++; } return commits; } public async fetchFunction( urlArg: string, optionsArg: RequestInit = {} ): Promise { const maxAttempts = 4; let attempt = 0; let lastError: any; while (attempt < maxAttempts) { try { const resp = await fetch(`${this.baseUrl}${urlArg}`, optionsArg); // retry on 429 and 5xx if (resp.status === 429 || resp.status >= 500) { const retryAfter = Number(resp.headers.get('retry-after')); const backoffMs = retryAfter ? retryAfter * 1000 : Math.min(32000, 1000 * Math.pow(2, attempt)) + Math.floor(Math.random() * 250); await new Promise((r) => setTimeout(r, backoffMs)); attempt++; continue; } return resp; } catch (e: any) { lastError = e; const backoffMs = Math.min(32000, 1000 * Math.pow(2, attempt)) + Math.floor(Math.random() * 250); await new Promise((r) => setTimeout(r, backoffMs)); attempt++; } } throw new Error(`fetchFunction failed after retries for ${urlArg}: ${lastError?.message ?? 'unknown error'}`); } }