import * as Api from "@libs/proto/core/notification/v1/notification_api";
import * as Notification from "@libs/proto/core/notification/v1/notification";
import { ClientReadableStream } from "grpc-web";
import { Injectable } from "@angular/core";
import { MainService } from "@libs/main.service";
import { Product } from "@libs/models/product";
import { Bid } from "@libs/models/bid";
import { fromEvent, ReplaySubject, Subscription } from "rxjs";
import { v4 as uuidv4 } from "uuid";
import { AuthorizationStreamInterceptor } from "@libs/util/interceptor-stream";

/**
 * Application-wide service that keeps a single gRPC-web server stream open to
 * the notification API and republishes incoming events through RxJS subjects.
 *
 * At most one stream is open at a time: starting a new subscription cancels
 * the previous one. The stream is reopened whenever a `tokenRenewed` DOM event
 * is dispatched on `document`.
 */
@Injectable({
  providedIn: "root",
})
export class GrpcNotificationService {
  private client: Api.core.notification.v1.NotificationAPIClient;

  /** Replays the most recent product carried by a product-changed event. */
  productChangedEvent$: ReplaySubject<Product> = new ReplaySubject<Product>(1);
  /** Replays the most recent bid-received event payload. */
  bidChangedEvent$: ReplaySubject<any> = new ReplaySubject<any>(1);
  /** Handle of the currently (or most recently) opened notification stream. */
  notificationSubs: ClientReadableStream<Api.core.notification.v1.SubscribeNotificationResponse>;
  /** True between opening a stream and cancelling it via `stopStreamNotifications`. */
  isStreamActive = false;
  /** Subscription to the `tokenRenewed` DOM event; released in `ngOnDestroy`. */
  tokenRenewalSubscription: Subscription;

  /**
   * Reopens the most recently requested subscription. Defaults to the full
   * (product + bid) subscription so a token renewal that happens before any
   * explicit init call behaves as it always did.
   */
  private reopenStream: () => void = () => this.initStreamNotifications();

  constructor(public mainService: MainService) {
    this.client = new Api.core.notification.v1.NotificationAPIClient(mainService.endpointBase, null, {
      streamInterceptors: [new AuthorizationStreamInterceptor()],
    });
    this.listenForTokenRenewal();
  }

  /**
   * Restarts the stream whenever a `tokenRenewed` event fires on `document`.
   * The same subscription set that was requested last is reopened, so a
   * bid-only consumer is not silently upgraded to the full subscription.
   */
  private listenForTokenRenewal(): void {
    this.tokenRenewalSubscription = fromEvent(document, "tokenRenewed").subscribe(() => {
      console.log("Token renewed, restarting stream...");
      this.stopStreamNotifications();
      this.reopenStream();
    });
  }

  /** Releases the DOM event subscription and cancels any open stream. */
  ngOnDestroy(): void {
    this.tokenRenewalSubscription?.unsubscribe();
    this.stopStreamNotifications();
  }

  /** Opens a stream subscribed to both product-changed and bid-received events. */
  initStreamNotifications(): void {
    this.reopenStream = () => this.initStreamNotifications();
    this.openStream([
      new Notification.core.notification.v1.SubscribeNotification({
        productChangedEvent: new Notification.core.notification.v1.ProductChangedEvent(),
      }),
      new Notification.core.notification.v1.SubscribeNotification({
        bidReceivedEvent: new Notification.core.notification.v1.BidReceivedEvent(),
      }),
    ]);
  }

  /** Opens a stream subscribed to bid-received events only. */
  initStreamBidReceived(): void {
    this.reopenStream = () => this.initStreamBidReceived();
    this.openStream([
      new Notification.core.notification.v1.SubscribeNotification({
        bidReceivedEvent: new Notification.core.notification.v1.BidReceivedEvent(),
      }),
    ]);
  }

  /**
   * Cancels any stream that is still open, then subscribes with a fresh
   * request id and wires up the data/error handlers.
   *
   * @param notification Event subscriptions to include in the request.
   */
  private openStream(notification: Notification.core.notification.v1.SubscribeNotification[]): void {
    // Without this, replacing `notificationSubs` would orphan the previous
    // stream: it could no longer be cancelled and would keep feeding handleData.
    this.stopStreamNotifications();

    const request = new Api.core.notification.v1.SubscribeNotificationRequest({
      id: uuidv4(),
      notification,
    });
    this.notificationSubs = this.client.Subscribe(request, {});
    this.isStreamActive = true;
    this.notificationSubs.on("data", this.handleData);
    this.notificationSubs.on("error", this.handleError);
  }

  /**
   * Routes a streamed response to the matching subject. Responses that carry
   * no `notification` payload are ignored instead of throwing inside the
   * stream callback.
   */
  private handleData = (response) => {
    const notification = response?.toObject()?.notification;
    if (!notification) {
      return;
    }
    const changedProduct = notification.productChangedEvent?.product;
    if (changedProduct) {
      this.productChangedEvent$.next(changedProduct);
    }
    const receivedBid = notification.bidReceivedEvent;
    if (receivedBid) {
      this.bidChangedEvent$.next(receivedBid);
    }
  };

  /** Logs stream errors; no reconnect is attempted here. */
  private handleError = (error: unknown) => {
    console.error("Stream error:", error);
  };

  /** Cancels the open stream, if any, and marks the service as inactive. */
  stopStreamNotifications(): void {
    if (this.notificationSubs && this.isStreamActive) {
      this.notificationSubs.cancel();
      this.isStreamActive = false;
    }
  }
}
