Files
ti1/database/RecordedCall.go
2025-12-23 13:24:20 +00:00

124 lines
3.4 KiB
Go

package database
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
// InsertOrUpdateRecordedCall upserts one recorded call into the calls table,
// skipping the database write when the payload's fingerprint matches the one
// stored in Valkey.
//
// values holds the positional query parameters in column order:
// estimatedvehiclejourney, "order", stoppointref, aimeddeparturetime,
// expecteddeparturetime, aimedarrivaltime, expectedarrivaltime, cancellation,
// actualdeparturetime, actualarrivaltime, recorded_data. Empty strings are
// replaced with nil in place (the caller's slice is modified) so they are
// passed to the driver as nil.
//
// The fingerprint is an MD5 digest of the values, stored in Valkey under the
// key "<values[0]>.<values[1]>". Each value is length-prefixed and nil gets
// its own marker, so payloads that differ only in where a field boundary or
// a nil falls no longer share a fingerprint. Fingerprints written by the
// earlier separator-less format will not match, which causes one extra
// upsert per key.
//
// It returns the row id and the action ("insert" or "update") when a write
// happened, (0, "none", nil) when the fingerprint was unchanged, and
// (0, "", err) on failure. The fingerprint is stored only after the database
// write succeeds, so a failed write is retried on the next call instead of
// being skipped. If storing the fingerprint fails, the row has already been
// written and the error is still returned.
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
	// values[0] and values[1] build the cache key; guard against a panic.
	if len(values) < 2 {
		return 0, "", fmt.Errorf("recorded call requires at least journey and order values, got %d", len(values))
	}

	// Replace empty strings with nil.
	for i, v := range values {
		if str, ok := v.(string); ok && str == "" {
			values[i] = nil
		}
	}

	// Stream an unambiguous encoding of the values into MD5. Writes to a
	// hash.Hash never return an error, so the results are not checked.
	hasher := md5.New()
	for _, v := range values {
		if v == nil {
			fmt.Fprint(hasher, "-|")
			continue
		}
		s := fmt.Sprintf("%v", v)
		fmt.Fprintf(hasher, "%d:%s|", len(s), s)
	}
	hashString := hex.EncodeToString(hasher.Sum(nil))

	key := fmt.Sprintf("%v.%v", values[0], values[1])

	// Get the previously stored fingerprint from Valkey.
	retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
	if err != nil {
		return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
	}
	if retrievedHash == hashString {
		// Same fingerprint as the last recorded write: nothing to do.
		return 0, "none", nil
	}

	// The RETURNING clause reports 'insert' when xmax = 0 and 'update'
	// otherwise, together with the row id.
	query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`

	var action string
	var id int
	if err := db.QueryRowContext(ctx, query, values...).Scan(&action, &id); err != nil {
		return 0, "", fmt.Errorf("error executing statement: %w", err)
	}

	// Record the fingerprint only now that the row is written; storing it
	// before the write would make a failed write look already applied.
	if err := valki.SetValkeyValue(ctx, valkeyClient, key, hashString); err != nil {
		return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
	}

	return id, action, nil
}
// BatchInsertRecordedCalls runs InsertOrUpdateRecordedCall for every entry of
// batch on a pool of worker goroutines and returns one CallResult per entry,
// stored at the same index as its input.
//
// workerCount is the number of goroutines to use. It is clamped to the range
// [1, len(batch)], so a zero or negative value still processes the batch
// instead of silently returning empty results.
//
// Failures are reported per entry in CallResult.Error. Entries that were not
// started because ctx was already done carry ctx.Err() in their Error field
// rather than looking like zero-valued successes. The function-level error is
// always nil. An empty batch yields (nil, nil).
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
	if len(batch) == 0 {
		return nil, nil
	}

	// Without at least one worker nothing would consume the queue; more
	// workers than entries would only sit idle.
	if workerCount < 1 {
		workerCount = 1
	}
	if workerCount > len(batch) {
		workerCount = len(batch)
	}

	results := make([]CallResult, len(batch))

	// The queue is buffered to hold every index, so it can be filled and
	// closed before any worker starts.
	jobs := make(chan int, len(batch))
	for i := range batch {
		jobs <- i
	}
	close(jobs)

	var wg sync.WaitGroup
	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range jobs {
				// After cancellation keep draining the queue so every
				// remaining entry is marked with the context error.
				if err := ctx.Err(); err != nil {
					results[idx] = CallResult{Error: err}
					continue
				}
				id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
				// Each index is handed to exactly one worker, so these
				// writes never touch the same element.
				results[idx] = CallResult{
					ID:     id,
					Action: action,
					Error:  err,
				}
			}
		}()
	}

	wg.Wait()
	return results, nil
}