import { Subject, timer, race, of } from "rxjs";
import {
  switchAll,
  catchError,
  retry,
  tap,
  concatMap,
  delay,
  filter,
  take,
  retryWhen,
} from "rxjs/operators";

import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { getSiteConfigs } from "./site-config";

const WS_SITE = getSiteConfigs().WS_SITE;

export default class WebSocket {
  private socket$: WebSocketSubject<any> | undefined;

  private messagesSubject$ = new Subject() as any;
  public messages$ = this.messagesSubject$.pipe(
    switchAll(),
    catchError((e) => {
      throw e;
    })
  );

  private heartbeatSub: any;

  private token: string | undefined;

  connect(token: string) {
    this.token = token;
    if (!this.socket$ || this.socket$.closed) {
      this.socket$ = this.getNewWebSocket();
      const messages = this.socket$.pipe(
        retryWhen((errors) =>
          errors.pipe(
            tap(() => console.log("retry")),
            delay(20000)
          )
        )
      );
      this.messagesSubject$.next(messages);
    }
  }

  sendMessage(msg: any) {
    this.socket$!.next(msg);
  }

  close() {
    this.socket$!.complete();
    this.socket$ = undefined;
  }

  getNewWebSocket() {
    if (this.heartbeatSub) this.heartbeatSub.unsubscribe();

    const url = WS_SITE + `?token=${this.token}`;

    return webSocket({
      url,
      deserializer: (val) => {
        if (val.data === "H") return "H";
        else return JSON.parse(val.data);
      },
      serializer: (val) => {
        if (val === "H") {
          return "H";
        } else return JSON.stringify(val);
      },
      openObserver: {
        next: () => {
          //interval 45s, 等待15s，累计一分钟心跳
          console.log("[DataService]: send H" + new Date().toLocaleString());
          this.heartbeatSub = timer(0, 15000)
            .pipe(
              tap(() => {
                this.sendMessage("H");
              }),
              concatMap(() =>
                race(
                  of("timeout").pipe(delay(15000)),
                  this.socket$!.pipe(
                    filter((msg) => msg === "H"),
                    take(1)
                  )
                )
              )
            )
            .subscribe(
              (resp) => {
                if (resp === "timeout") {
                  console.log("[DataService]: timeout");
                  this.heartbeatSub.unsubscribe();
                }
              },
              (err) => {
                this.heartbeatSub.unsubscribe();
              }
            );
        },
      },
      closeObserver: {
        next: () => {
          console.log("[DataService]: connection closed");
          this.heartbeatSub?.unsubscribe();
        },
      },
    });
  }
}
