



















import { Vue, Component, Prop } from 'vue-property-decorator';
import { interval, Subject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { skip, takeUntil } from 'rxjs/operators';
import TimeSeriesChart, {
  TimeSeriesChartDataset,
  TimeSeriesDataPoint,
} from './TimeSeriesChart.vue';
import { addMinutes, subMinutes } from 'date-fns';
import { Device, DeviceEventLog } from '@/models/device/models';
import {
  DeviceDataEvent,
  DeviceEventLogEventQuery,
  DeviceEventType,
} from '@/models/device/interfaces';
import {
  EventData,
  ModelConfig,
  StreamConfig,
} from '@/apps/monitoring/interfaces';
import { PLOT_COLORS } from '../explorer/colors';
import { globalStore } from '@/store/modules/global';

@Component({
  components: {
    TimeSeriesChart,
  },
})
export default class LiveChart extends Vue {
  @Prop({ required: true }) device!: string;
  @Prop({ required: true }) appId!: string;
  destroySubject = new Subject<void>();

  deviceObject: Device | null = null;
  webSocketSubject: WebSocketSubject<DeviceDataEvent> | null = null;
  timer: ReturnType<typeof setTimeout> | null = null;
  data: TimeSeriesDataPoint[] = [];
  datasets: TimeSeriesChartDataset[] = [];
  start = subMinutes(new Date(), 10);
  end = addMinutes(new Date(), 5);
  maxDataPoints = 10_000;
  streamConfig: StreamConfig | null = null;

  get socketFacility(): string | null {
    return this.streamConfig?.webSocketFacility ?? null;
  }

  get socketUrl(): string {
    let host = window.location.host;
    if (host.startsWith('localhost')) {
      host = 'next-dms.leitwert.ch';
    }
    return `wss://${host}/ws/${this.socketFacility}?subscribe-broadcast`;
  }

  async mounted(): Promise<void> {
    await this.load();
    this.$apiv2
      .getRefreshStream()
      .pipe(takeUntil(this.destroySubject))
      .subscribe(() => {
        this.load();
      });

    // move chart every 5min
    interval(5 * 60 * 1000)
      .pipe(takeUntil(this.destroySubject))
      .subscribe(() => this.setStartAndEnd());
  }

  destroyed(): void {
    this.destroySubject.next();
    this.destroySubject.complete();
    this.webSocketSubject?.complete();
  }

  async load(): Promise<void> {
    console.log('LiveChart: load');
    try {
      this.deviceObject = await this.$apiv2.get<Device>(Device, this.device);
      this.getStreamConfig();
      this.setStartAndEnd();
      this.setDataSets();
      await this.fetchData(this.start, this.end);
      this.subscribeToWebSocket();
    } catch (error) {
      this.$errorHandler.handleError(error);
    }
  }

  getStreamConfig(): void {
    const setting = globalStore.clientAppSetting('device_models');
    if (setting?.value) {
      const deviceModels: ModelConfig[] = setting.value.device_models || [];
      let modelConfig: ModelConfig | null =
        deviceModels.find(m => m.id === this.deviceObject?.model) ?? null;
      if (modelConfig?.monitoringConfig?.streams?.length) {
        this.streamConfig = modelConfig.monitoringConfig.streams[0];
      }
    }
  }

  setStartAndEnd(): void {
    this.start = subMinutes(new Date(), 10);
    this.end = addMinutes(new Date(), 5);
  }

  setDataSets(): void {
    if (this.streamConfig) {
      this.datasets = this.streamConfig.parametersConfig.map(
        (paramConfig, index) => {
          return {
            label: `${paramConfig.label} [${paramConfig.unit}]`,
            backgroundColor: PLOT_COLORS[index],
            borderColor: PLOT_COLORS[index],
            data: [],
          };
        },
      );
    }
  }

  async fetchData(start: Date, end: Date): Promise<void> {
    console.log('LiveChart: fetchData', start, end);
    const loading = this.$buefy.loading.open({});
    try {
      const query: DeviceEventLogEventQuery = {
        device: this.device,
        order_by: 'time_asc',
        page: 1,
        page_size: 500,
        event_type: DeviceEventType.DEVICE_DATA,
        identity: this.streamConfig?.stream ?? 'no_stream',
        // request a bit more to fill chart
        start_time: subMinutes(start, 5).toISOString(),
        end_time: end.toISOString(),
      };
      const response = await DeviceEventLog.queryEvents<DeviceDataEvent>(query);
      if (response.results.length) {
        response.results.forEach(event => {
          this.processDeviceDataEvent(event);
        });
      }
    } catch (error) {
      this.$errorHandler.handleError(error);
    }
    loading.close();
  }

  subscribeToWebSocket(): void {
    this.webSocketSubject?.complete();
    this.webSocketSubject = webSocket(this.socketUrl);
    // skip(1) to skip first old event that is retained by backend
    this.webSocketSubject.pipe(skip(1)).subscribe(
      deviceDataEvent => {
        console.log(
          'Message from server',
          deviceDataEvent.identity,
          deviceDataEvent.device,
          deviceDataEvent.time,
        );
        this.processDeviceDataEvent(deviceDataEvent);
      },
      err => console.log('websocket error', err),
      () => console.log('complete'),
    );
  }

  processDeviceDataEvent(deviceDataEvent: DeviceDataEvent): void {
    if (
      this.streamConfig &&
      deviceDataEvent.identity === this.streamConfig.stream &&
      deviceDataEvent.device === this.device
    ) {
      if (deviceDataEvent?.payload?.data_json) {
        const newDataSets = [...this.datasets];
        let eventData: EventData | undefined;
        if (
          this.streamConfig.usesDataJsonString &&
          deviceDataEvent.payload.data_json
        ) {
          eventData = JSON.parse(deviceDataEvent.payload.data_json);
        } else {
          eventData = deviceDataEvent.payload.data;
        }
        if (eventData !== undefined) {
          this.streamConfig.parametersConfig.forEach(
            (paramConfig, paramIndex) => {
              if (eventData === undefined) {
                return;
              }
              const valueIndex = eventData.columns.findIndex(
                e => e === paramConfig.valueColumn,
              );
              eventData.index.forEach((timestamp, index) => {
                newDataSets[paramIndex].data.push({
                  x: timestamp as string,
                  y: eventData!.data[index][valueIndex] as number,
                });
              });
              while (newDataSets[paramIndex].data.length > this.maxDataPoints) {
                newDataSets[paramIndex].data.shift();
              }
            },
          );
        }
        this.datasets = newDataSets;
      }
    }
  }
}
