import ReconnectingEventSource from 'reconnecting-eventsource';
import { BehaviorSubject, fromEvent, Observable, pairwise, startWith, switchMap } from 'rxjs';
import { distinctUntilChanged, filter, map } from 'rxjs/operators';
import { userService } from './user.service';

/**
 * Opens a new reconnecting Server-Sent Events connection to the dashboard
 * channel endpoint.
 *
 * Every call constructs a fresh `ReconnectingEventSource`; the caller owns the
 * returned instance and is responsible for closing it.
 *
 * @returns A newly constructed event source for `/api/info/sse?channel=dashboard`.
 */
function eventSourceFactory(): ReconnectingEventSource {
  const url = '/api/info/sse?channel=dashboard';
  const maxRetryTimeMs = 15 * 1000;

  // TODO: this doesn't work, figure out how to pass headers
  const requestHeaders = {
    connection: 'keep-alive',
    'x-request-channel': 'dashboard',
  };

  const options = {
    headers: requestHeaders,
    max_retry_time: maxRetryTimeMs,
    withCredentials: true,
  };

  // The options are cast to `any`, presumably because `headers` is not part of
  // the library's typed configuration — confirm before tightening the type.
  return new ReconnectingEventSource(url, options as any);
}

/**
 * Owns the application's Server-Sent Events connection and exposes its named
 * events as typed observables.
 *
 * The connection follows `userService.user$`: whenever the user id changes, a
 * new event source is created (or `null` is published when there is no user
 * id) and the previously created event source is closed.
 */
class SseService {
  /** The currently active event source, or `null` when there is none. */
  private readonly eventSource$ = new BehaviorSubject<ReconnectingEventSource | null>(null);

  /**
   * Maps user-id changes to event sources and closes the previous event source
   * each time a new value is produced.
   */
  private readonly eventSourceProducer$: Observable<ReconnectingEventSource | null> = userService.user$.pipe(
    map((user) => user?.id),
    // Only react when the id itself changes, not on every user emission.
    distinctUntilChanged(),
    map((userId) => (userId ? eventSourceFactory() : null)),
    // `pairwise` stays silent until it has seen two values. Without a seed,
    // the first event source would be created but never published, and would
    // only be closed (unused) on the next user change.
    startWith(null),
    pairwise(),
    map(([prevEventSource, eventSource]) => {
      if (prevEventSource) {
        try {
          prevEventSource.close();
        } catch (error) {
          // Best effort: a failure to close must not block the new connection.
          console.warn(error, 'failed to close previous event source');
        }
      }

      return eventSource;
    }),
  );

  /** Starts forwarding produced event sources into `eventSource$`. */
  constructor() {
    this.eventSourceProducer$.subscribe({
      next: (eventSource) => {
        this.eventSource$.next(eventSource);
      },
    });
  }

  /**
   * Streams the JSON-decoded payloads of the named SSE event.
   *
   * The returned observable re-attaches its listener whenever the active event
   * source is replaced, and emits nothing while there is no event source.
   * Messages whose `data` is not valid JSON are logged and skipped instead of
   * erroring the stream, so one malformed payload does not end the
   * subscription.
   *
   * @param eventName Name of the SSE event to listen for.
   * @returns An observable of payloads cast to `T`; the shape is not validated
   *   at runtime.
   */
  getEvents<T extends object>(eventName: string): Observable<T> {
    return this.eventSource$.pipe(
      switchMap((eventSource: ReconnectingEventSource | null) => {
        if (!eventSource) {
          // An observable with no subscribe logic: never emits, never completes.
          return new Observable<T>();
        }

        return fromEvent(eventSource, eventName).pipe(
          map((event: MessageEvent<string>): T | undefined => {
            try {
              const data = JSON.parse(event.data) as T;
              console.debug(`SSE event '${event.type}'`, event, data);

              return data;
            } catch (error) {
              console.error(`failed to parse SSE event '${event.type}'`, error, event);

              return undefined;
            }
          }),
          // Drop the messages that failed to parse above.
          filter((data): data is T => data !== undefined),
        );
      }),
    );
  }
}

/**
 * Shared module-level `SseService` instance.
 *
 * Constructing it has a side effect at import time: the constructor subscribes
 * to the service's event-source producer, which is derived from
 * `userService.user$`.
 */
export const sseService = new SseService();
