use valkey for Estimated Vehicle

This commit is contained in:
pigwin
2025-01-11 18:30:53 +00:00
parent c09ea5784a
commit bed666bd81
2 changed files with 70 additions and 30 deletions

View File

@@ -1,41 +1,79 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedVehicleJourney(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
query := ` // Generate a key using lineref, directionref, datasource, and datedvehiclejourneyref
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) lineref := values[2]
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) directionref := values[3]
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) datasource := values[4]
DO UPDATE SET datedvehiclejourneyref := values[5]
servicedelivery = EXCLUDED.servicedelivery, key := fmt.Sprintf("%v.%v.%v.%v", lineref, directionref, datasource, datedvehiclejourneyref)
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query) // Convert values to a single string and hash it using MD5
if err != nil { var valuesString string
return 0, "", fmt.Errorf("error preparing statement: %v", err) for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
} }
defer stmt.Close() hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
var action string // Get the MD5 hash from Valkey
var id int retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
return id, action, nil // Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
return 0, "none", nil
}
} }

View File

@@ -38,7 +38,7 @@ func DBData(data *data.Data) {
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int var insertCount, updateCount, noneCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{} var values []interface{}
@@ -151,7 +151,7 @@ func DBData(data *data.Data) {
values = append(values, otherJson) values = append(values, otherJson)
// Insert or update the record // Insert or update the record
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(ctx, db, values, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
} else { } else {
@@ -163,8 +163,10 @@ func DBData(data *data.Data) {
insertCount++ insertCount++
} else if action == "update" { } else if action == "update" {
updateCount++ updateCount++
} else if action == "none" {
noneCount++
} }
totalCount = insertCount + updateCount totalCount = insertCount + updateCount + noneCount
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 { if totalCount%1000 == 0 {