added changes
This commit is contained in:
pigwin
2025-01-11 21:03:15 +01:00
committed by GitHub
2 changed files with 81 additions and 45 deletions

View File

@@ -1,11 +1,17 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
if str, ok := v.(string); ok && str == "" { if str, ok := v.(string); ok && str == "" {
@@ -13,46 +19,71 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
} }
} }
query := ` // Convert values to a single string and hash it using MD5
INSERT INTO calls ( var valuesString string
estimatedvehiclejourney, "order", stoppointref, for _, v := range values {
aimeddeparturetime, expecteddeparturetime, if v != nil {
aimedarrivaltime, expectedarrivaltime, valuesString += fmt.Sprintf("%v", v)
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
} }
return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
return 0, "none", nil
}
} }

View File

@@ -38,7 +38,7 @@ func DBData(data *data.Data) {
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount int var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{} var values []interface{}
@@ -169,7 +169,7 @@ func DBData(data *data.Data) {
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 { if totalCount%1000 == 0 {
fmt.Printf( fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
@@ -178,6 +178,7 @@ func DBData(data *data.Data) {
estimatedCallNoneCount, estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
} }
} }
@@ -380,7 +381,7 @@ func DBData(data *data.Data) {
interfaceValues[i] = v interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues) id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err) fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else { } else {
@@ -393,6 +394,8 @@ func DBData(data *data.Data) {
//fmt.Printf("Action: %s, ID: %d\n", action, id) //fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" { } else if action == "update" {
recordedCallUpdateCount++ recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
} }
} }
} }
@@ -400,7 +403,7 @@ func DBData(data *data.Data) {
} }
fmt.Printf( fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
@@ -409,6 +412,7 @@ func DBData(data *data.Data) {
estimatedCallNoneCount, estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
@@ -421,6 +425,7 @@ func DBData(data *data.Data) {
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)