From 1020dacf79ee7b28c28bf2b80ed693703099bd50 Mon Sep 17 00:00:00 2001 From: pigwin Date: Sat, 4 Jan 2025 22:07:05 +0000 Subject: [PATCH 01/17] add connection pool settings in ConnectToPostgreSQL function --- .github/workflows/docker-image.yml | 2 +- config/db.go | 6 ++++++ export/database.go | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index c657c2b..74acbda 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -4,7 +4,7 @@ on: push: branches: - main - - test + - dev jobs: build: diff --git a/config/db.go b/config/db.go index ac747fc..762c46a 100644 --- a/config/db.go +++ b/config/db.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "log" + "time" _ "github.com/lib/pq" ) @@ -26,6 +27,11 @@ func ConnectToPostgreSQL() (*sql.DB, error) { return nil, err } + // Set connection pool settings + db.SetMaxOpenConns(25) // Maximum number of open connections to the database + db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + fmt.Println("Connection to PostgreSQL opened successfully :D") // Ping database to verify connection diff --git a/export/database.go b/export/database.go index 4b5c9fe..ec5fcd6 100644 --- a/export/database.go +++ b/export/database.go @@ -418,4 +418,4 @@ func DBData(data *data.Data) { if err != nil { log.Fatal(err) } -} \ No newline at end of file +} From c456bdecdb4c79737aef8d7909576799ac2cc039 Mon Sep 17 00:00:00 2001 From: pigwin Date: Mon, 6 Jan 2025 15:32:04 +0000 Subject: [PATCH 02/17] hash for valky or reddis or smth else --- database/EstimatedCall.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index ac7cf3b..56c797b 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -1,7 +1,9 @@ package database import ( + "crypto/md5" "database/sql" + "encoding/hex" "fmt" ) @@ -13,6 +15,17 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, } } + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) + } + } + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) + println(hashString) + query := ` INSERT INTO calls ( estimatedvehiclejourney, "order", stoppointref, From c6fc0070cf851bb6db01fb9685884b3511a77529 Mon Sep 17 00:00:00 2001 From: pigwin Date: Mon, 6 Jan 2025 20:47:55 +0000 Subject: [PATCH 03/17] add Valkey configuration and connection management --- config/conf.json | 6 +++ config/config.go | 26 ++++++++++- config/valkey.go | 93 +++++++++++++++++++++++++++++++++++++++ database/EstimatedCall.go | 4 ++ go.mod | 5 +++ go.sum | 4 ++ valkey/commands.go | 26 +++++++++++ 7 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 config/valkey.go create mode 100644 valkey/commands.go diff --git a/config/conf.json b/config/conf.json index eefb956..f1b5b1f 100644 --- a/config/conf.json +++ b/config/conf.json @@ -7,5 +7,11 @@ "dbname": "ti1", "sslmode": "disable" }, + "valkey": { + "host": "127.0.0.1", + "port": "6379", + "max_conns": 50, + "timeout_ms": 5000 + }, "temp": "value" } \ No newline at end of file diff --git a/config/config.go b/config/config.go index f59e215..1561e79 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" ) type Config struct { @@ -15,6 +16,12 @@ type Config struct { DBName string `json:"dbname"` SSLMode string `json:"sslmode"` } `json:"database"` + Valkey struct { + Host string `json:"host"` + Port string `json:"port"` + MaxConns int `json:"max_conns"` + TimeoutMs int `json:"timeout_ms"` + } `json:"valkey"` Temp string `json:"temp"` } @@ -53,7 +60,24 @@ func LoadConfig(file string) (Config, error) { if temp := os.Getenv("TEMP"); temp != "" { config.Temp = temp } - //log.Println("Temp value:", config.Temp) + + // Override Valkey settings with environment variables + if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" { + config.Valkey.Host = valkeyHost + } + if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" { + config.Valkey.Port = valkeyPort + } + if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" { + if val, err := strconv.Atoi(maxConns); err == nil { + config.Valkey.MaxConns = val + } + } + if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" { + if val, err := strconv.Atoi(timeoutMs); err == nil { + config.Valkey.TimeoutMs = val + } + } return config, nil } diff --git a/config/valkey.go b/config/valkey.go new file mode 100644 index 0000000..343e2b6 --- /dev/null +++ b/config/valkey.go @@ -0,0 +1,93 @@ +package config + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/valkey-io/valkey-go" +) + +type ValkeyConfig struct { + Host string `json:"host"` + Port string `json:"port"` + MaxConns int `json:"max_conns"` + TimeoutMs int `json:"timeout_ms"` +} + +func LoadValkeyConfig(file string) (ValkeyConfig, error) { + var config ValkeyConfig + configFile, err := os.Open(file) + if err != nil { + return config, fmt.Errorf("failed to open config file: %w", err) + } + defer configFile.Close() + + if err := json.NewDecoder(configFile).Decode(&config); err != nil { + return config, fmt.Errorf("failed to parse Valkey config: %w", err) + } + + // Override with environment variables if set + if host := os.Getenv("VALKEY_HOST"); host != "" { + config.Host = host + } + if port := os.Getenv("VALKEY_PORT"); port != "" { + config.Port = port + } + if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" { + if val, err := strconv.Atoi(maxConns); err == nil { + config.MaxConns = val + } + } + if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" { + if val, err := strconv.Atoi(timeoutMs); err == nil { + config.TimeoutMs = val + } + } + + return config, nil +} + +func ConnectToValkey(configPath string) (valkey.Client, error) { + fmt.Println("Loading Valkey configuration...") + valkeyConfig, err := LoadValkeyConfig(configPath) + if err != nil { + return nil, fmt.Errorf("failed to load Valkey config: %v", err) + } + fmt.Println("Valkey configuration loaded successfully!") + + // Setup Valkey client options + options := valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)}, + // Additional options can be added here if required + } + + fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port) + client, err := valkey.NewClient(options) + if err != nil { + return nil, fmt.Errorf("failed to connect to Valkey: %v", err) + } + + // Optionally, perform a ping to validate the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs)) + defer cancel() + + if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil { + client.Close() + return nil, fmt.Errorf("failed to ping Valkey: %v", err) + } + + log.Println("Connected to Valkey successfully!") + return client, nil +} + +func DisconnectFromValkey(client valkey.Client) error { + fmt.Println("Disconnecting from Valkey...") + client.Close() + log.Println("Disconnected from Valkey successfully!") + return nil +} diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 56c797b..6cd627c 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -26,6 +26,10 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, hashString := hex.EncodeToString(hash[:]) println(hashString) + estimatedVehicleJourneyID := values[0] + orderID := values[1] + fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + query := ` INSERT INTO calls ( estimatedvehiclejourney, "order", stoppointref, diff --git a/go.mod b/go.mod index 2e16502..4db7bea 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,8 @@ module ti1 go 1.23.4 require github.com/lib/pq v1.10.9 + +require ( + github.com/valkey-io/valkey-go v1.0.52 // indirect + golang.org/x/sys v0.24.0 // indirect +) diff --git a/go.sum b/go.sum index aeddeae..4ba6c43 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= +github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/valkey/commands.go b/valkey/commands.go new file mode 100644 index 0000000..97e1126 --- /dev/null +++ b/valkey/commands.go @@ -0,0 +1,26 @@ +package valkey + +import ( + "context" + "fmt" + + "github.com/valkey-io/valkey-go" +) + +func SetValkeyValue(client valkey.Client, key, value string) error { + ctx := context.Background() + err := client.Do(ctx, client.B().Set().Key(key).Value(value).Build()).Error() + if err != nil { + return fmt.Errorf("failed to set value in Valkey: %v", err) + } + return nil +} + +func GetValkeyValue(client valkey.Client, key string) (string, error) { + ctx := context.Background() + value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() + if err != nil { + return "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + return value, nil +} From ba558803ff9c6f0b93f61f4df7b7f2f18b20704c Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 18:28:47 +0000 Subject: [PATCH 04/17] add password configuration to Valkey settings --- config/conf.json | 3 ++- config/config.go | 1 + config/valkey.go | 15 +++++++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/config/conf.json b/config/conf.json index f1b5b1f..b68d93a 100644 --- a/config/conf.json +++ b/config/conf.json @@ -11,7 +11,8 @@ "host": "127.0.0.1", "port": "6379", "max_conns": 50, - "timeout_ms": 5000 + "timeout_ms": 5000, + "password": "the_valkey_password" }, "temp": "value" } \ No newline at end of file diff --git a/config/config.go b/config/config.go index 1561e79..79d212e 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ type Config struct { Port string `json:"port"` MaxConns int `json:"max_conns"` TimeoutMs int `json:"timeout_ms"` + Password string `json:"password"` // Add this line } `json:"valkey"` Temp string `json:"temp"` } diff --git a/config/valkey.go b/config/valkey.go index 343e2b6..778bf6d 100644 --- a/config/valkey.go +++ b/config/valkey.go @@ -17,6 +17,7 @@ type ValkeyConfig struct { Port string `json:"port"` MaxConns int `json:"max_conns"` TimeoutMs int `json:"timeout_ms"` + Password string `json:"password"` // Add this line } func LoadValkeyConfig(file string) (ValkeyConfig, error) { @@ -48,21 +49,27 @@ func LoadValkeyConfig(file string) (ValkeyConfig, error) { config.TimeoutMs = val } } + if password := os.Getenv("VALKEY_PASSWORD"); password != "" { + config.Password = password + } return config, nil } func ConnectToValkey(configPath string) (valkey.Client, error) { - fmt.Println("Loading Valkey configuration...") - valkeyConfig, err := LoadValkeyConfig(configPath) + fmt.Println("Loading configuration...") + config, err := LoadConfig(configPath) if err != nil { - return nil, fmt.Errorf("failed to load Valkey config: %v", err) + return nil, fmt.Errorf("failed to load config: %v", err) } - fmt.Println("Valkey configuration loaded successfully!") + fmt.Println("Configuration loaded successfully!") + + valkeyConfig := config.Valkey // Setup Valkey client options options := valkey.ClientOption{ InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)}, + Password: valkeyConfig.Password, // Additional options can be added here if required } From 97a6506a6582f438d2c351a020b628b74f177612 Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 18:36:27 +0000 Subject: [PATCH 05/17] valkey testy --- database/EstimatedCall.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 6cd627c..f3ef798 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -5,6 +5,9 @@ import ( "database/sql" "encoding/hex" "fmt" + "log" + "ti1/config" + "ti1/valkey" ) func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) { @@ -28,8 +31,35 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, estimatedVehicleJourneyID := values[0] orderID := values[1] + key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + // Connect to Valkey + valkeyClient, err := config.ConnectToValkey("config/conf.json") + if err != nil { + log.Fatalf("Failed to connect to Valkey: %v", err) + } + defer config.DisconnectFromValkey(valkeyClient) + + // Set the MD5 hash in Valkey + err = valkey.SetValkeyValue(valkeyClient, key, hashString) + if err != nil { + log.Fatalf("Failed to set value in Valkey: %v", err) + } + + // Get the MD5 hash from Valkey + retrievedHash, err := valkey.GetValkeyValue(valkeyClient, key) + if err != nil { + log.Fatalf("Failed to get value from Valkey: %v", err) + } + + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + log.Fatalf("Retrieved hash does not match the original hash. Original: %s, Retrieved: %s", hashString, retrievedHash) + } else { + fmt.Println("Retrieved hash matches the original hash.") + } + query := ` INSERT INTO calls ( estimatedvehiclejourney, "order", stoppointref, From a2c1766dd1f2ced9a1b5b749243b7b349d5e6fa3 Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 18:54:49 +0000 Subject: [PATCH 06/17] refactor: improve connection string formatting and enhance logging in ConnectToPostgreSQL function --- config/db.go | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/config/db.go b/config/db.go index 762c46a..51b8ef1 100644 --- a/config/db.go +++ b/config/db.go @@ -10,41 +10,41 @@ import ( ) func ConnectToPostgreSQL() (*sql.DB, error) { - fmt.Println("Connecting to PostgreSQL...") - config, err := LoadConfig("config/conf.json") - if err != nil { - return nil, err - } + fmt.Println("Connecting to PostgreSQL...") + config, err := LoadConfig("config/conf.json") + if err != nil { + return nil, err + } - fmt.Println("Configuration loaded successfully!") + fmt.Println("Configuration loaded successfully!") - connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", - config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) + connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s", + config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) - // Open connection to database - db, err := sql.Open("postgres", connStr) - if err != nil { - return nil, err - } + // Open connection to database + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, err + } - // Set connection pool settings - db.SetMaxOpenConns(25) // Maximum number of open connections to the database - db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool - db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + // Set connection pool settings + db.SetMaxOpenConns(25) // Maximum number of open connections to the database + db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused fmt.Println("Connection to PostgreSQL opened successfully :D") - // Ping database to verify connection - err = db.Ping() + // Ping database to verify connection + err = db.Ping() - fmt.Println(err) - if err != nil { - return nil, err - } + fmt.Println(err) + if err != nil { + return nil, err + } - log.Println("Connected to PostgreSQL!") + log.Println("Connected to PostgreSQL!") - return db, nil + return db, nil } func DisconnectFromPostgreSQL(db *sql.DB) error { From df0b5135bd0127fcadf8f57583aa44524bca0064 Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 19:51:14 +0000 Subject: [PATCH 07/17] wtf so mutch pain cus i named a package wrong lol --- database/EstimatedCall.go | 38 +++++++++++------------------------ export/database.go | 14 +++++++++++-- {valkey => valki}/commands.go | 8 +++----- 3 files changed, 27 insertions(+), 33 deletions(-) rename {valkey => valki}/commands.go (66%) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index f3ef798..1156d4d 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -1,16 +1,17 @@ package database import ( + "context" "crypto/md5" "database/sql" "encoding/hex" "fmt" - "log" - "ti1/config" - "ti1/valkey" + "ti1/valki" + + "github.com/valkey-io/valkey-go" ) -func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) { +func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { // Replace empty strings with nil for timestamp fields for i, v := range values { if str, ok := v.(string); ok && str == "" { @@ -27,38 +28,30 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, } hash := md5.Sum([]byte(valuesString)) hashString := hex.EncodeToString(hash[:]) - println(hashString) + fmt.Println("HashString:", hashString) estimatedVehicleJourneyID := values[0] orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) - // Connect to Valkey - valkeyClient, err := config.ConnectToValkey("config/conf.json") - if err != nil { - log.Fatalf("Failed to connect to Valkey: %v", err) - } - defer config.DisconnectFromValkey(valkeyClient) - // Set the MD5 hash in Valkey - err = valkey.SetValkeyValue(valkeyClient, key, hashString) + err := valki.SetValkeyValue(ctx, valkeyClient, key, hashString) if err != nil { - log.Fatalf("Failed to set value in Valkey: %v", err) + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) } // Get the MD5 hash from Valkey - retrievedHash, err := valkey.GetValkeyValue(valkeyClient, key) + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - log.Fatalf("Failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) } // Check if the retrieved value matches the original MD5 hash if retrievedHash != hashString { - log.Fatalf("Retrieved hash does not match the original hash. Original: %s, Retrieved: %s", hashString, retrievedHash) - } else { - fmt.Println("Retrieved hash matches the original hash.") + return 0, "", fmt.Errorf("hash mismatch: original %s, retrieved %s", hashString, retrievedHash) } + fmt.Println("Retrieved hash matches the original hash.") query := ` INSERT INTO calls ( @@ -89,13 +82,6 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, var id int err = stmt.QueryRow(values...).Scan(&action, &id) if err != nil { - if 1 == 0 { - fmt.Println("Executing query:", query) - for i, v := range values { - fmt.Printf("Value %d: (%v)\n", i+1, v) - } - - } return 0, "", fmt.Errorf("error executing statement: %v", err) } return id, action, nil diff --git a/export/database.go b/export/database.go index ec5fcd6..c6fd439 100644 --- a/export/database.go +++ b/export/database.go @@ -1,6 +1,7 @@ package export import ( + "context" "encoding/json" "fmt" "log" @@ -20,6 +21,15 @@ func DBData(data *data.Data) { } defer db.Close() + // Connect to Valkey + valkeyClient, err := config.ConnectToValkey("config/conf.json") + if err != nil { + log.Fatalf("Failed to connect to Valkey: %v", err) + } + defer config.DisconnectFromValkey(valkeyClient) + + ctx := context.Background() + // Get service id aka sid sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) if err != nil { @@ -291,9 +301,9 @@ func DBData(data *data.Data) { for i, v := range stringValues { interfaceValues[i] = v } - id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues) + id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient) if err != nil { - fmt.Printf("Error inserting/updating estimated call: %v\n", err) + log.Fatalf("Failed to insert or update estimated call: %v", err) } else { if 1 == 0 { fmt.Printf("Action: %s, ID: %d\n", action, id) diff --git a/valkey/commands.go b/valki/commands.go similarity index 66% rename from valkey/commands.go rename to valki/commands.go index 97e1126..88fd482 100644 --- a/valkey/commands.go +++ b/valki/commands.go @@ -1,4 +1,4 @@ -package valkey +package valki import ( "context" @@ -7,8 +7,7 @@ import ( "github.com/valkey-io/valkey-go" ) -func SetValkeyValue(client valkey.Client, key, value string) error { - ctx := context.Background() +func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error { err := client.Do(ctx, client.B().Set().Key(key).Value(value).Build()).Error() if err != nil { return fmt.Errorf("failed to set value in Valkey: %v", err) @@ -16,8 +15,7 @@ func SetValkeyValue(client valkey.Client, key, value string) error { return nil } -func GetValkeyValue(client valkey.Client, key string) (string, error) { - ctx := context.Background() +func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) { value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() if err != nil { return "", fmt.Errorf("failed to get value from Valkey: %v", err) From b8a2a5837f019a3529526031110db22c43b7f16a Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 20:13:23 +0000 Subject: [PATCH 08/17] hehehehehe mek valke wurke --- database/EstimatedCall.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 1156d4d..5d79520 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -35,12 +35,6 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) - // Set the MD5 hash in Valkey - err := valki.SetValkeyValue(ctx, valkeyClient, key, hashString) - if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) - } - // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { @@ -48,10 +42,10 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } // Check if the retrieved value matches the original MD5 hash - if retrievedHash != hashString { - return 0, "", fmt.Errorf("hash mismatch: original %s, retrieved %s", hashString, retrievedHash) + if retrievedHash == hashString { + fmt.Println("Retrieved hash matches the original hash. No update needed.") + return 0, "no_update", nil } - fmt.Println("Retrieved hash matches the original hash.") query := ` INSERT INTO calls ( @@ -84,5 +78,14 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter if err != nil { return 0, "", fmt.Errorf("error executing statement: %v", err) } + + // If the record was inserted or updated, set the new hash in Valkey + if action == "insert" || action == "update" { + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + } + return id, action, nil } From 07838da0aded72259d1946db6448b85de246e942 Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 20:33:29 +0000 Subject: [PATCH 09/17] lolololololollollollol --- database/EstimatedCall.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 5d79520..f11dda1 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -42,9 +42,15 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } // Check if the retrieved value matches the original MD5 hash - if retrievedHash == hashString { - fmt.Println("Retrieved hash matches the original hash. No update needed.") - return 0, "no_update", nil + if retrievedHash != hashString { + return 0, "", fmt.Errorf("hash mismatch: original %s, retrieved %s", hashString, retrievedHash) + } + fmt.Println("Retrieved hash matches the original hash.") + + // Set the MD5 hash in Valkey + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) } query := ` @@ -78,14 +84,5 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter if err != nil { return 0, "", fmt.Errorf("error executing statement: %v", err) } - - // If the record was inserted or updated, set the new hash in Valkey - if action == "insert" || action == "update" { - err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) - if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) - } - } - return id, action, nil } From 9919c159f272c2d1176669d5e9402bad370ad5bc Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 7 Jan 2025 20:39:47 +0000 Subject: [PATCH 10/17] gotta fix that valkey shit later --- database/EstimatedCall.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index f11dda1..2efaf50 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -35,6 +35,14 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + var err error + + // Set the MD5 hash in Valkey + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { @@ -47,12 +55,6 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } fmt.Println("Retrieved hash matches the original hash.") - // Set the MD5 hash in Valkey - err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) - if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) - } - query := ` INSERT INTO calls ( estimatedvehiclejourney, "order", stoppointref, From 2141c5f1687477c0268b5c262b68c5409fddbac9 Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 17:56:41 +0000 Subject: [PATCH 11/17] hehe --- valki/commands.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/valki/commands.go b/valki/commands.go index 88fd482..3195fc4 100644 --- a/valki/commands.go +++ b/valki/commands.go @@ -18,7 +18,8 @@ func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) { value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() if err != nil { - return "", fmt.Errorf("failed to get value from Valkey: %v", err) + return "hehe", nil + //return "", fmt.Errorf("failed to get value from Valkey: %v", err) } return value, nil } From 5203208fe70f52d31233d6f9d07774f1c13fef2b Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 18:05:17 +0000 Subject: [PATCH 12/17] hash validation and database insertion logic --- database/EstimatedCall.go | 77 +++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 2efaf50..6d6110e 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -37,12 +37,6 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter var err error - // Set the MD5 hash in Valkey - err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) - if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) - } - // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { @@ -51,40 +45,45 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter // Check if the retrieved value matches the original MD5 hash if retrievedHash != hashString { - return 0, "", fmt.Errorf("hash mismatch: original %s, retrieved %s", hashString, retrievedHash) - } - fmt.Println("Retrieved hash matches the original hash.") + query := ` + INSERT INTO calls ( + estimatedvehiclejourney, "order", stoppointref, + aimeddeparturetime, expecteddeparturetime, + aimedarrivaltime, expectedarrivaltime, + cancellation, estimated_data + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) + ON CONFLICT (estimatedvehiclejourney, "order") + DO UPDATE SET + stoppointref = EXCLUDED.stoppointref, + aimeddeparturetime = EXCLUDED.aimeddeparturetime, + expecteddeparturetime = EXCLUDED.expecteddeparturetime, + aimedarrivaltime = EXCLUDED.aimedarrivaltime, + expectedarrivaltime = EXCLUDED.expectedarrivaltime, + cancellation = EXCLUDED.cancellation, + estimated_data = EXCLUDED.estimated_data + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() - query := ` - INSERT INTO calls ( - estimatedvehiclejourney, "order", stoppointref, - aimeddeparturetime, expecteddeparturetime, - aimedarrivaltime, expectedarrivaltime, - cancellation, estimated_data - ) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) - ON CONFLICT (estimatedvehiclejourney, "order") - DO UPDATE SET - stoppointref = EXCLUDED.stoppointref, - aimeddeparturetime = EXCLUDED.aimeddeparturetime, - expecteddeparturetime = EXCLUDED.expecteddeparturetime, - aimedarrivaltime = EXCLUDED.aimedarrivaltime, - expectedarrivaltime = EXCLUDED.expectedarrivaltime, - cancellation = EXCLUDED.cancellation, - estimated_data = EXCLUDED.estimated_data - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; - ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } - var action string - var id int - err = stmt.QueryRow(values...).Scan(&action, &id) - if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + fmt.Println("Hashes match") + return 0, "no update", nil } - return id, action, nil } From fe72a507e165a5fcbabd8d8e2f9164d3e9b805a5 Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 18:59:46 +0000 Subject: [PATCH 13/17] testing --- database/EstimatedCall.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 6d6110e..84a4508 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -28,12 +28,12 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } hash := md5.Sum([]byte(valuesString)) hashString := hex.EncodeToString(hash[:]) - fmt.Println("HashString:", hashString) + //fmt.Println("HashString:", hashString) estimatedVehicleJourneyID := values[0] orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) - fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + //fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) var err error @@ -42,6 +42,7 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter if err != nil { return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) } + fmt.Printf("Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) // Check if the retrieved value matches the original MD5 hash if retrievedHash != hashString { From 083a267b2a09979cbd4cb9fe2c538d5b4e603ace Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 19:06:51 +0000 Subject: [PATCH 14/17] WTF hash match not do the match thing lol --- database/EstimatedCall.go | 3 +-- main.go | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 84a4508..691ab11 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -42,7 +42,6 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter if err != nil { return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) } - fmt.Printf("Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) // Check if the retrieved value matches the original MD5 hash if retrievedHash != hashString { @@ -84,7 +83,7 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } return id, action, nil } else { - fmt.Println("Hashes match") + fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) return 0, "no update", nil } } diff --git a/main.go b/main.go index fbff506..f215944 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( ) func main() { + log.Println("ti1 v0.1.2 - 09.01...") log.Println("Starting...") // Setup the database From d996411adfc599da8b62d4e573aaa74d2c577f01 Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 19:28:10 +0000 Subject: [PATCH 15/17] inprove reporting???!?!!!!!. --- database/EstimatedCall.go | 4 ++-- export/database.go | 10 +++++++--- main.go | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 691ab11..0321ca7 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -83,7 +83,7 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } return id, action, nil } else { - fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) - return 0, "no update", nil + //fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) + return 0, "none", nil } } diff --git a/export/database.go b/export/database.go index c6fd439..3841499 100644 --- a/export/database.go +++ b/export/database.go @@ -38,7 +38,7 @@ func DBData(data *data.Data) { fmt.Println("SID:", sid) // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int + var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount int for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { var values []interface{} @@ -169,12 +169,13 @@ func DBData(data *data.Data) { //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) if totalCount%1000 == 0 { fmt.Printf( - "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", + "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, + estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, ) @@ -313,6 +314,8 @@ func DBData(data *data.Data) { estimatedCallInsertCount++ } else if action == "update" { estimatedCallUpdateCount++ + } else if action == "none" { + estimatedCallNoneCount++ } } } @@ -397,12 +400,13 @@ func DBData(data *data.Data) { } fmt.Printf( - "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", + "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, + estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, ) diff --git a/main.go b/main.go index f215944..2000e0b 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( ) func main() { - log.Println("ti1 v0.1.2 - 09.01...") + log.Println("ti1 v0.1.2 - 09.01.2025..") log.Println("Starting...") // Setup the database From afd6b12acf6cb5ed0d130294441d45110bdc36e3 Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 19:54:11 +0000 Subject: [PATCH 16/17] chat gpt said this would work and i trust it like 2 % --- export/database.go | 1 + valki/commands.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/export/database.go b/export/database.go index 3841499..6789c17 100644 --- a/export/database.go +++ b/export/database.go @@ -418,6 +418,7 @@ func DBData(data *data.Data) { serviceDeliveryJsonObject["Updates"] = updateCount serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount + serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount diff --git a/valki/commands.go b/valki/commands.go index 3195fc4..a17746f 100644 --- a/valki/commands.go +++ b/valki/commands.go @@ -3,12 +3,13 @@ package valki import ( "context" "fmt" + "time" "github.com/valkey-io/valkey-go" ) func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error { - err := client.Do(ctx, client.B().Set().Key(key).Value(value).Build()).Error() + err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error() if err != nil { return fmt.Errorf("failed to set value in Valkey: %v", err) } From 14af72959ed618ac006d98b0ad5e7a6a6b45633d Mon Sep 17 00:00:00 2001 From: pigwin Date: Thu, 9 Jan 2025 20:13:55 +0000 Subject: [PATCH 17/17] almost release v0.2 --- README.md | 38 +++++++++++++++++++++++++++++--------- main.go | 2 +- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index bb6e0a1..02c41e2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # TI1 -The best thing to happen since yesterday at 3 pm +The best thing to happen since yesterday at 2:57 pm ## Usage @@ -15,7 +15,7 @@ services: container_name: postgres-db environment: POSTGRES_USER: postgres - POSTGRES_PASSWORD: Root Password + POSTGRES_PASSWORD: RootPassword POSTGRES_DB: ti1 ports: - "5432:5432" @@ -29,31 +29,51 @@ services: interval: 10s retries: 5 restart: always # Ensure the container always restarts - + + valkey: + image: valkey/valkey:latest + container_name: valkey + environment: + VALKEY_PASSWORD: the_valkey_password + ports: + - "6379:6379" + volumes: + - ./valkey_data:/data + networks: + - app-network + restart: always # Ensure the container always restarts + ti1-container: - image: pigwin1/ti1:v0.1.1 + image: pigwin1/ti1:dev container_name: ti1-container environment: DB_HOST: db DB_PORT: 5432 DB_USER: ti1 - DB_PASSWORD: ti1 password + DB_PASSWORD: ti1password DB_NAME: ti1 DB_SSLMODE: disable + VALKEY_HOST: valkey + VALKEY_PORT: 6379 + VALKEY_PASSWORD: the_valkey_password depends_on: db: condition: service_healthy # Wait until the db service is healthy + valkey: + condition: service_started # Wait until the valkey service is started networks: - app-network restart: always # Ensure the container always restarts - + networks: app-network: driver: bridge - + volumes: postgres_data: driver: local + valkey_data: + driver: local ``` Create `init.sql` @@ -62,7 +82,7 @@ Create `init.sql` DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN - CREATE ROLE post WITH LOGIN PASSWORD 'post password'; + CREATE ROLE post WITH LOGIN PASSWORD 'postpassword'; GRANT ALL PRIVILEGES ON DATABASE ti1 TO post; ALTER ROLE post WITH SUPERUSER; END IF; @@ -73,7 +93,7 @@ $$; DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN - CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1 password'; + CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password'; GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1; GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1; diff --git a/main.go b/main.go index 2000e0b..a76df14 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( ) func main() { - log.Println("ti1 v0.1.2 - 09.01.2025..") + log.Println("ti1 v0.2") log.Println("Starting...") // Setup the database