From 8af34a9ab361d0430aa89309b9535436744b1f7a Mon Sep 17 00:00:00 2001 From: pigwin Date: Sat, 11 Jan 2025 17:26:43 +0000 Subject: [PATCH 1/4] InsertOrUpdateRecordedCall use valkey :) --- database/RecordedCall.go | 113 +++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 41 deletions(-) diff --git a/database/RecordedCall.go b/database/RecordedCall.go index 6cd1136..667236d 100644 --- a/database/RecordedCall.go +++ b/database/RecordedCall.go @@ -1,11 +1,17 @@ package database import ( + "context" + "crypto/md5" "database/sql" + "encoding/hex" "fmt" + "ti1/valki" + + "github.com/valkey-io/valkey-go" ) -func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) { +func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { // Replace empty strings with nil for timestamp fields for i, v := range values { if str, ok := v.(string); ok && str == "" { @@ -13,46 +19,71 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, } } - query := ` - INSERT INTO calls ( - estimatedvehiclejourney, "order", stoppointref, - aimeddeparturetime, expecteddeparturetime, - aimedarrivaltime, expectedarrivaltime, - cancellation, actualdeparturetime, actualarrivaltime, - recorded_data - ) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) - ON CONFLICT (estimatedvehiclejourney, "order") - DO UPDATE SET - stoppointref = EXCLUDED.stoppointref, - aimeddeparturetime = EXCLUDED.aimeddeparturetime, - expecteddeparturetime = EXCLUDED.expecteddeparturetime, - aimedarrivaltime = EXCLUDED.aimedarrivaltime, - expectedarrivaltime = EXCLUDED.expectedarrivaltime, - cancellation = EXCLUDED.cancellation, - actualdeparturetime = EXCLUDED.actualdeparturetime, - actualarrivaltime = EXCLUDED.actualarrivaltime, - recorded_data = EXCLUDED.recorded_data - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; - ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() - - var action string - var id int - err = stmt.QueryRow(values...).Scan(&action, &id) - if err != nil { - if 1 == 0 { - fmt.Println("Executing query:", query) - for i, v := range values { - fmt.Printf("Value %d: (%v)\n", i+1, v) - } - + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) } - return 0, "", fmt.Errorf("error executing statement: %v", err) } - return id, action, nil + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) + + estimatedVehicleJourneyID := values[0] + orderID := values[1] + key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) + + var err error + + // Get the MD5 hash from Valkey + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) + if err != nil { + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + query := ` + INSERT INTO calls ( + estimatedvehiclejourney, "order", stoppointref, + aimeddeparturetime, expecteddeparturetime, + aimedarrivaltime, expectedarrivaltime, + cancellation, actualdeparturetime, actualarrivaltime, + recorded_data + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) + ON CONFLICT (estimatedvehiclejourney, "order") + DO UPDATE SET + stoppointref = EXCLUDED.stoppointref, + aimeddeparturetime = EXCLUDED.aimeddeparturetime, + expecteddeparturetime = EXCLUDED.expecteddeparturetime, + aimedarrivaltime = EXCLUDED.aimedarrivaltime, + expectedarrivaltime = EXCLUDED.expectedarrivaltime, + cancellation = EXCLUDED.cancellation, + actualdeparturetime = EXCLUDED.actualdeparturetime, + actualarrivaltime = EXCLUDED.actualarrivaltime, + recorded_data = EXCLUDED.recorded_data + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + return 0, "none", nil + } } From c09ea5784af806752d97bcad908383855e4f337f Mon Sep 17 00:00:00 2001 From: pigwin Date: Sat, 11 Jan 2025 17:38:34 +0000 Subject: [PATCH 2/4] woops forgot that part --- export/database.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/export/database.go b/export/database.go index 6789c17..cf817eb 100644 --- a/export/database.go +++ b/export/database.go @@ -38,7 +38,7 @@ func DBData(data *data.Data) { fmt.Println("SID:", sid) // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount int + var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { var values []interface{} @@ -169,7 +169,7 @@ func DBData(data *data.Data) { //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) if totalCount%1000 == 0 { fmt.Printf( - "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", + "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", insertCount, updateCount, totalCount, @@ -178,6 +178,7 @@ func DBData(data *data.Data) { estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, + recordedCallNoneCount, ) } } @@ -380,7 +381,7 @@ func DBData(data *data.Data) { interfaceValues[i] = v } - id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues) + id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient) if err != nil { fmt.Printf("Error inserting/updating recorded call: %v\n", err) } else { @@ -393,6 +394,8 @@ func DBData(data *data.Data) { //fmt.Printf("Action: %s, ID: %d\n", action, id) } else if action == "update" { recordedCallUpdateCount++ + } else if action == "none" { + recordedCallNoneCount++ } } } @@ -400,7 +403,7 @@ func DBData(data *data.Data) { } fmt.Printf( - "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", + "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", insertCount, updateCount, totalCount, @@ -409,6 +412,7 @@ func DBData(data *data.Data) { estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, + recordedCallNoneCount, ) // Create map to hold JSON serviceDeliveryJsonObject := make(map[string]interface{}) @@ -421,6 +425,7 @@ func DBData(data *data.Data) { serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount + serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount // Convert JSON object to JSON string serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) From bed666bd81d453b358301c2c94895f386a4d5949 Mon Sep 17 00:00:00 2001 From: pigwin Date: Sat, 11 Jan 2025 18:30:53 +0000 Subject: [PATCH 3/4] use valkey for Estimated Vehicle --- database/EstimatedVehicleJourney.go | 92 ++++++++++++++++++++--------- export/database.go | 8 ++- 2 files changed, 70 insertions(+), 30 deletions(-) diff --git a/database/EstimatedVehicleJourney.go b/database/EstimatedVehicleJourney.go index 4efa54a..b04924b 100644 --- a/database/EstimatedVehicleJourney.go +++ b/database/EstimatedVehicleJourney.go @@ -1,41 +1,79 @@ package database import ( + "context" + "crypto/md5" "database/sql" + "encoding/hex" "fmt" + "ti1/valki" + + "github.com/valkey-io/valkey-go" ) -func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { - query := ` - INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) - ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) - DO UPDATE SET - servicedelivery = EXCLUDED.servicedelivery, - recordedattime = EXCLUDED.recordedattime, - vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), - dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), - originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), - destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), - operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), - vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), - cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), - other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; - ` +func InsertOrUpdateEstimatedVehicleJourney(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { + // Generate a key using lineref, directionref, datasource, and datedvehiclejourneyref + lineref := values[2] + directionref := values[3] + datasource := values[4] + datedvehiclejourneyref := values[5] + key := fmt.Sprintf("%v.%v.%v.%v", lineref, directionref, datasource, datedvehiclejourneyref) - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) + } } - defer stmt.Close() + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) - var action string - var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + // Get the MD5 hash from Valkey + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) } - return id, action, nil + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + query := ` + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + return 0, "none", nil + } } diff --git a/export/database.go b/export/database.go index cf817eb..68e59c8 100644 --- a/export/database.go +++ b/export/database.go @@ -38,7 +38,7 @@ func DBData(data *data.Data) { fmt.Println("SID:", sid) // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int + var insertCount, updateCount, noneCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { var values []interface{} @@ -151,7 +151,7 @@ func DBData(data *data.Data) { values = append(values, otherJson) // Insert or update the record - id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) + id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(ctx, db, values, valkeyClient) if err != nil { fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) } else { @@ -163,8 +163,10 @@ func DBData(data *data.Data) { insertCount++ } else if action == "update" { updateCount++ + } else if action == "none" { + noneCount++ } - totalCount = insertCount + updateCount + totalCount = insertCount + updateCount + noneCount //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) if totalCount%1000 == 0 { From fd43b3e4bb747fdd4bd2510be1fc008474c060df Mon Sep 17 00:00:00 2001 From: pigwin Date: Sat, 11 Jan 2025 19:51:41 +0000 Subject: [PATCH 4/4] nvm that esitmated via part --- database/EstimatedVehicleJourney.go | 96 +++++++++-------------------- export/database.go | 8 +-- 2 files changed, 32 insertions(+), 72 deletions(-) diff --git a/database/EstimatedVehicleJourney.go b/database/EstimatedVehicleJourney.go index b04924b..4efa54a 100644 --- a/database/EstimatedVehicleJourney.go +++ b/database/EstimatedVehicleJourney.go @@ -1,79 +1,41 @@ package database import ( - "context" - "crypto/md5" "database/sql" - "encoding/hex" "fmt" - "ti1/valki" - - "github.com/valkey-io/valkey-go" ) -func InsertOrUpdateEstimatedVehicleJourney(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { - // Generate a key using lineref, directionref, datasource, and datedvehiclejourneyref - lineref := values[2] - directionref := values[3] - datasource := values[4] - datedvehiclejourneyref := values[5] - key := fmt.Sprintf("%v.%v.%v.%v", lineref, directionref, datasource, datedvehiclejourneyref) +func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { + query := ` + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` - // Convert values to a single string and hash it using MD5 - var valuesString string - for _, v := range values { - if v != nil { - valuesString += fmt.Sprintf("%v", v) - } - } - hash := md5.Sum([]byte(valuesString)) - hashString := hex.EncodeToString(hash[:]) - - // Get the MD5 hash from Valkey - retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) + stmt, err := db.Prepare(query) if err != nil { - return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) } - // Check if the retrieved value matches the original MD5 hash - if retrievedHash != hashString { - query := ` - INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) - ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) - DO UPDATE SET - servicedelivery = EXCLUDED.servicedelivery, - recordedattime = EXCLUDED.recordedattime, - vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), - dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), - originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), - destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), - operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), - vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), - cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), - other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; - ` - - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() - - err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) - if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) - } - - var action string - var id int - err = stmt.QueryRow(values...).Scan(&action, &id) - if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) - } - return id, action, nil - } else { - return 0, "none", nil - } + return id, action, nil } diff --git a/export/database.go b/export/database.go index 68e59c8..cf817eb 100644 --- a/export/database.go +++ b/export/database.go @@ -38,7 +38,7 @@ func DBData(data *data.Data) { fmt.Println("SID:", sid) // counters - var insertCount, updateCount, noneCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int + var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { var values []interface{} @@ -151,7 +151,7 @@ func DBData(data *data.Data) { values = append(values, otherJson) // Insert or update the record - id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(ctx, db, values, valkeyClient) + id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) if err != nil { fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) } else { @@ -163,10 +163,8 @@ func DBData(data *data.Data) { insertCount++ } else if action == "update" { updateCount++ - } else if action == "none" { - noneCount++ } - totalCount = insertCount + updateCount + noneCount + totalCount = insertCount + updateCount //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) if totalCount%1000 == 0 {