import { Injectable, inject } from '@angular/core'; import { HttpClient } from '@angular/common/http'; import { BehaviorSubject, combineLatest, distinctUntilChanged, debounceTime, map, switchMap, filter, shareReplay, from, of } from 'rxjs'; import type { ProviderId } from '../core/providers/provider-registry'; import type { SuggestionItemV1, SearchResponseV1 } from './api.v1'; import { YtAdapter } from './adapters/yt'; import { DmAdapter } from './adapters/dm'; import { TwAdapter } from './adapters/tw'; import { PtAdapter } from './adapters/pt'; import { OdAdapter } from './adapters/od'; import { RuAdapter } from './adapters/ru'; import type { ProviderAdapter, ProviderSearchParams, SearchResult } from './models'; import { VideoItem } from 'src/app/shared/models/video-item.model'; export type SuggestionItem = SuggestionItemV1; export type SearchResponse = SearchResponseV1; type CacheKey = string; interface CacheEntry { t: number; data: SearchResult; } @Injectable({ providedIn: 'root' }) export class SearchService { private http = inject(HttpClient); // Subjects for params from UI/URL readonly q$ = new BehaviorSubject(''); readonly providers$ = new BehaviorSubject('all'); readonly page$ = new BehaviorSubject(1); readonly pageSize$ = new BehaviorSubject(24); readonly sort$ = new BehaviorSubject<'relevance' | 'date' | 'views' | 'duration'>('relevance'); // In-memory cache 60s per (provider, q, params) private cache = new Map>(); private cacheTtlMs = 60_000; // Adapters registry private adapters: Record> = { yt: new YtAdapter(this.http), dm: new DmAdapter(this.http), tw: new TwAdapter(this.http), pt: new PtAdapter(this.http), od: new OdAdapter(this.http), ru: new RuAdapter(this.http) }; readonly params$ = combineLatest([this.q$, this.providers$, this.page$, this.pageSize$, this.sort$]).pipe( debounceTime(120), distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)) ); // Public request stream: aggregates groups by provider, runs adapters in parallel with timeout/abort/retry readonly request$ = this.params$.pipe( filter(([q]) => typeof q === 'string' && q.trim().length >= 2), switchMap(([q, prov, page, pageSize, sort]) => from(this.runAdapters({ q: String(q), pageToken: String(page || 1), sort: (sort as any) || 'relevance', // global filter knobs could be added here: time/length/type in the future }, prov))) , // Map provider results to the format expected by the UI map((byProvider) => { const groups = Object.entries(byProvider).reduce((acc, [pid, result]) => { acc[pid as ProviderId] = result.items; return acc; }, {} as Record); const providers = Object.keys(byProvider) as ProviderId[]; const resp: SearchResponse = { q: this.q$.value, providers, groups }; return resp; }), shareReplay(1) ); // Orchestrate all active providers in parallel with timeout(8s), retry(1) and abort on param change. private async runAdapters(params: ProviderSearchParams, prov: ProviderId[] | 'all'): Promise>> { const active: ProviderId[] = prov === 'all' ? ['yt','dm','tw','pt','od','ru'] : (Array.isArray(prov) ? prov : []); const controller = new AbortController(); const signal = controller.signal; const tasks = active.map(async (pid) => { const adapter = this.adapters[pid]; if (!adapter) return [pid, { items: [] } as SearchResult] as const; const key: CacheKey = `${pid}|${params.q}|${params.pageToken}|${params.sort}`; const now = Date.now(); const cached = this.cache.get(key); if (cached && (now - cached.t) < this.cacheTtlMs) { return [pid, cached.data] as const; } const perProviderTimeout = 8000; const withTimeout = (p: Promise): Promise => new Promise((resolve, reject) => { const tid = setTimeout(() => reject(new Error('timeout')), perProviderTimeout); p.then(v => { clearTimeout(tid); resolve(v); }).catch(e => { clearTimeout(tid); reject(e); }); }); const attempt = async (): Promise> => withTimeout(adapter.search(params, signal)); try { const res: SearchResult = await attempt().catch(async (e) => { // retry once on network-like errors if (e && (e.name === 'AbortError' || String(e.message || '').includes('abort'))) throw e; try { return await attempt(); } catch (err) { throw err; } }); this.cache.set(key, { t: Date.now(), data: res }); return [pid, res] as const; } catch { // Swallow errors per provider, return empty return [pid, { items: [] } as SearchResult] as const; } }); const results = await Promise.all(tasks); return Object.fromEntries(results) as Record>; } // Convenience setter helpers setQuery(q: string) { this.q$.next(q || ''); } setProviders(list: ProviderId[] | 'all') { this.providers$.next(list && (Array.isArray(list) ? list : 'all')); } setPage(page: number) { this.page$.next(Math.max(1, Math.floor(page || 1))); } setPageSize(size: number) { this.pageSize$.next(Math.min(50, Math.max(1, Math.floor(size || 24)))); } setSort(sort: 'relevance' | 'date' | 'views' | 'duration') { this.sort$.next(sort || 'relevance'); } }