import { TrackPoint } from "@chartedsails/sailing-data";
import {
  Builder,
  Float64,
  makeBuilder,
  makeData,
  RecordBatch,
  Schema,
  Struct,
  Table,
  TimestampMillisecond,
} from "apache-arrow";
import {
  SailingArrowSchema,
  SailingArrowSchemaType,
} from "./SailingArrowSchema";

/**
 * A builder for a SailingArrowTable. It will dynamically add new columns as
 * they appear the trackpoints.
 *
 * Methods allow the caller to get the current schema and the data as a
 * RecordBatch or as a Table.
 *
 * You can also force the schema, in which case it will stick to the provided
 * schema and insert nulls (for variables with no values) or ignore variables
 * that are not in the schema.
 *
 */
export class SailingArrowBuilder {
  private finalized = false;
  private builders: Partial<{ [k in keyof SailingArrowSchemaType]: Builder }> &
    Pick<
      { [k in keyof SailingArrowSchemaType]: Builder },
      "time" | "latitude" | "longitude"
    > = {
    time: makeBuilder({
      type: new TimestampMillisecond(),
    }),
    longitude: makeBuilder({
      type: new Float64(),
      nullValues: [NaN, undefined, null],
    }),
    latitude: makeBuilder({
      type: new Float64(),
      nullValues: [NaN, undefined, null],
    }),
  };

  public constructor(
    private schema: Schema<SailingArrowSchemaType> | undefined = undefined
  ) {}

  public append(point: TrackPoint) {
    if (this.finalized) {
      throw new Error(`builder already finalized.`);
    }

    this.builders["time"] = this.builders["time"].append(point.time);
    this.builders["longitude"].append(point.position?.[0]);
    this.builders["latitude"].append(point.position?.[1]);

    // Add other columns
    for (const key of Object.keys(point) as Array<
      keyof SailingArrowSchemaType
    >) {
      // We already handled these two columns - Position is a special case.
      if (["time", "position"].includes(key)) {
        continue;
      }
      // Create a new column if needed
      if (!this.builders[key]) {
        // If we have a forced schema and the new column is not in it then we skip this key.
        if (this.schema && !this.schema.fields.find((f) => f.name === key)) {
          continue;
        }

        const type = SailingArrowSchema.fields.find(
          (f) => f.name === key
        )?.type;
        // Skip this key if it's not defined in SailingArrowSchema
        if (!type) {
          continue;
        }

        // Make a builder and pad with nulls so that this vector will have the same length as the other one
        this.builders[key] = makeBuilder({
          type,
          nullValues: [NaN, undefined, null],
        });
        // length -1 because we already appended the time but not the current value
        for (let i = 0; i < this.builders["time"].length - 1; i++) {
          this.builders[key]!.append(null);
        }
      }

      this.builders[key]!.append(point[key as keyof TrackPoint]);
    }

    // We need to keep all the builders in sync so that they have the same length.
    // Append nulls to fields that were not defined in this trackpoint.
    const nullFields = Object.keys(this.builders).filter(
      (field) =>
        !Object.keys(point).includes(field) &&
        !["time", "latitude", "longitude"].includes(field)
    ) as Array<keyof SailingArrowSchemaType>;
    for (const key of nullFields) {
      this.builders[key]!.append(null);
    }
  }

  public length() {
    return this.builders["time"].length;
  }

  public getSchema() {
    // Return the schema given to the constructor or generate a new one based on what data we have seen.
    if (this.schema) {
      return this.schema;
    } else {
      return SailingArrowSchema.select(
        Object.keys(this.builders) as Array<keyof SailingArrowSchemaType>
      );
    }
  }

  public toRecordBatch() {
    if (this.finalized) {
      throw new Error(`builder already finalized.`);
    }
    this.finalized = true;
    const schema = this.getSchema();

    const type = new Struct<SailingArrowSchemaType>(schema.fields);
    const children = schema.fields.map((f) => {
      const dataBlobs =
        this.builders[f.name as keyof SailingArrowSchemaType]?.toVector().data;

      // If we did not see any of these values make a null vector.
      if (!dataBlobs) {
        return makeData({
          type: SailingArrowSchema.fields.find((sf) => sf.name === f.name)!
            .type,
          length: this.builders["time"].length,
          nullCount: this.builders["time"].length,
        });
      }

      if (!dataBlobs || dataBlobs.length > 1) {
        throw new Error(
          `Vector returned by builder has invalid count of data blobs (field: ${f.name} length: ${dataBlobs?.length})`
        );
      }
      return dataBlobs[0];
    });

    const data = makeData<typeof type>({
      type,
      children,
      // TODO: Try setting property offset here instead of filling with empty values.
    });
    let batch = new RecordBatch(schema, data);
    return batch;
  }

  public toTable() {
    return new Table(this.getSchema(), [this.toRecordBatch()]);
  }
}
