diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index c657c2b..74acbda 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -4,7 +4,7 @@ on: push: branches: - main - - test + - dev jobs: build: diff --git a/README.md b/README.md index bb6e0a1..02c41e2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # TI1 -The best thing to happen since yesterday at 3 pm +The best thing to happen since yesterday at 2:57 pm ## Usage @@ -15,7 +15,7 @@ services: container_name: postgres-db environment: POSTGRES_USER: postgres - POSTGRES_PASSWORD: Root Password + POSTGRES_PASSWORD: RootPassword POSTGRES_DB: ti1 ports: - "5432:5432" @@ -29,31 +29,51 @@ services: interval: 10s retries: 5 restart: always # Ensure the container always restarts - + + valkey: + image: valkey/valkey:latest + container_name: valkey + environment: + VALKEY_PASSWORD: the_valkey_password + ports: + - "6379:6379" + volumes: + - ./valkey_data:/data + networks: + - app-network + restart: always # Ensure the container always restarts + ti1-container: - image: pigwin1/ti1:v0.1.1 + image: pigwin1/ti1:dev container_name: ti1-container environment: DB_HOST: db DB_PORT: 5432 DB_USER: ti1 - DB_PASSWORD: ti1 password + DB_PASSWORD: ti1password DB_NAME: ti1 DB_SSLMODE: disable + VALKEY_HOST: valkey + VALKEY_PORT: 6379 + VALKEY_PASSWORD: the_valkey_password depends_on: db: condition: service_healthy # Wait until the db service is healthy + valkey: + condition: service_started # Wait until the valkey service is started networks: - app-network restart: always # Ensure the container always restarts - + networks: app-network: driver: bridge - + volumes: postgres_data: driver: local + valkey_data: + driver: local ``` Create `init.sql` @@ -62,7 +82,7 @@ Create `init.sql` DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN - CREATE ROLE post WITH LOGIN PASSWORD 'post password'; + CREATE ROLE post WITH LOGIN PASSWORD 'postpassword'; GRANT ALL PRIVILEGES ON DATABASE ti1 TO post; ALTER ROLE post WITH SUPERUSER; END IF; @@ -73,7 +93,7 @@ $$; DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN - CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1 password'; + CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password'; GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1; GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1; diff --git a/config/conf.json b/config/conf.json index eefb956..b68d93a 100644 --- a/config/conf.json +++ b/config/conf.json @@ -7,5 +7,12 @@ "dbname": "ti1", "sslmode": "disable" }, + "valkey": { + "host": "127.0.0.1", + "port": "6379", + "max_conns": 50, + "timeout_ms": 5000, + "password": "the_valkey_password" + }, "temp": "value" } \ No newline at end of file diff --git a/config/config.go b/config/config.go index f59e215..79d212e 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" ) type Config struct { @@ -15,6 +16,13 @@ type Config struct { DBName string `json:"dbname"` SSLMode string `json:"sslmode"` } `json:"database"` + Valkey struct { + Host string `json:"host"` + Port string `json:"port"` + MaxConns int `json:"max_conns"` + TimeoutMs int `json:"timeout_ms"` + Password string `json:"password"` // Add this line + } `json:"valkey"` Temp string `json:"temp"` } @@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) { if temp := os.Getenv("TEMP"); temp != "" { config.Temp = temp } - //log.Println("Temp value:", config.Temp) + + // Override Valkey settings with environment variables + if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" { + config.Valkey.Host = valkeyHost + } + if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" { + config.Valkey.Port = valkeyPort + } + if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" { + if val, err := strconv.Atoi(maxConns); err == nil { + config.Valkey.MaxConns = val + } + } + if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" { + if val, err := strconv.Atoi(timeoutMs); err == nil { + config.Valkey.TimeoutMs = val + } + } return config, nil } diff --git a/config/db.go b/config/db.go index ac747fc..51b8ef1 100644 --- a/config/db.go +++ b/config/db.go @@ -4,41 +4,47 @@ import ( "database/sql" "fmt" "log" + "time" _ "github.com/lib/pq" ) func ConnectToPostgreSQL() (*sql.DB, error) { - fmt.Println("Connecting to PostgreSQL...") - config, err := LoadConfig("config/conf.json") - if err != nil { - return nil, err - } + fmt.Println("Connecting to PostgreSQL...") + config, err := LoadConfig("config/conf.json") + if err != nil { + return nil, err + } - fmt.Println("Configuration loaded successfully!") + fmt.Println("Configuration loaded successfully!") - connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", - config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) + connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s", + config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) - // Open connection to database - db, err := sql.Open("postgres", connStr) - if err != nil { - return nil, err - } + // Open connection to database + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, err + } + + // Set connection pool settings + db.SetMaxOpenConns(25) // Maximum number of open connections to the database + db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused fmt.Println("Connection to PostgreSQL opened successfully :D") - // Ping database to verify connection - err = db.Ping() + // Ping database to verify connection + err = db.Ping() - fmt.Println(err) - if err != nil { - return nil, err - } + fmt.Println(err) + if err != nil { + return nil, err + } - log.Println("Connected to PostgreSQL!") + log.Println("Connected to PostgreSQL!") - return db, nil + return db, nil } func DisconnectFromPostgreSQL(db *sql.DB) error { diff --git a/config/valkey.go b/config/valkey.go new file mode 100644 index 0000000..778bf6d --- /dev/null +++ b/config/valkey.go @@ -0,0 +1,100 @@ +package config + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/valkey-io/valkey-go" +) + +type ValkeyConfig struct { + Host string `json:"host"` + Port string `json:"port"` + MaxConns int `json:"max_conns"` + TimeoutMs int `json:"timeout_ms"` + Password string `json:"password"` // Add this line +} + +func LoadValkeyConfig(file string) (ValkeyConfig, error) { + var config ValkeyConfig + configFile, err := os.Open(file) + if err != nil { + return config, fmt.Errorf("failed to open config file: %w", err) + } + defer configFile.Close() + + if err := json.NewDecoder(configFile).Decode(&config); err != nil { + return config, fmt.Errorf("failed to parse Valkey config: %w", err) + } + + // Override with environment variables if set + if host := os.Getenv("VALKEY_HOST"); host != "" { + config.Host = host + } + if port := os.Getenv("VALKEY_PORT"); port != "" { + config.Port = port + } + if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" { + if val, err := strconv.Atoi(maxConns); err == nil { + config.MaxConns = val + } + } + if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" { + if val, err := strconv.Atoi(timeoutMs); err == nil { + config.TimeoutMs = val + } + } + if password := os.Getenv("VALKEY_PASSWORD"); password != "" { + config.Password = password + } + + return config, nil +} + +func ConnectToValkey(configPath string) (valkey.Client, error) { + fmt.Println("Loading configuration...") + config, err := LoadConfig(configPath) + if err != nil { + return nil, fmt.Errorf("failed to load config: %v", err) + } + fmt.Println("Configuration loaded successfully!") + + valkeyConfig := config.Valkey + + // Setup Valkey client options + options := valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)}, + Password: valkeyConfig.Password, + // Additional options can be added here if required + } + + fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port) + client, err := valkey.NewClient(options) + if err != nil { + return nil, fmt.Errorf("failed to connect to Valkey: %v", err) + } + + // Optionally, perform a ping to validate the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs)) + defer cancel() + + if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil { + client.Close() + return nil, fmt.Errorf("failed to ping Valkey: %v", err) + } + + log.Println("Connected to Valkey successfully!") + return client, nil +} + +func DisconnectFromValkey(client valkey.Client) error { + fmt.Println("Disconnecting from Valkey...") + client.Close() + log.Println("Disconnected from Valkey successfully!") + return nil +} diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index ac7cf3b..0321ca7 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -1,11 +1,17 @@ package database import ( + "context" + "crypto/md5" "database/sql" + "encoding/hex" "fmt" + "ti1/valki" + + "github.com/valkey-io/valkey-go" ) -func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) { +func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { // Replace empty strings with nil for timestamp fields for i, v := range values { if str, ok := v.(string); ok && str == "" { @@ -13,43 +19,71 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, } } - query := ` - INSERT INTO calls ( - estimatedvehiclejourney, "order", stoppointref, - aimeddeparturetime, expecteddeparturetime, - aimedarrivaltime, expectedarrivaltime, - cancellation, estimated_data - ) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) - ON CONFLICT (estimatedvehiclejourney, "order") - DO UPDATE SET - stoppointref = EXCLUDED.stoppointref, - aimeddeparturetime = EXCLUDED.aimeddeparturetime, - expecteddeparturetime = EXCLUDED.expecteddeparturetime, - aimedarrivaltime = EXCLUDED.aimedarrivaltime, - expectedarrivaltime = EXCLUDED.expectedarrivaltime, - cancellation = EXCLUDED.cancellation, - estimated_data = EXCLUDED.estimated_data - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; - ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() - - var action string - var id int - err = stmt.QueryRow(values...).Scan(&action, &id) - if err != nil { - if 1 == 0 { - fmt.Println("Executing query:", query) - for i, v := range values { - fmt.Printf("Value %d: (%v)\n", i+1, v) - } - + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) } - return 0, "", fmt.Errorf("error executing statement: %v", err) } - return id, action, nil + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) + //fmt.Println("HashString:", hashString) + + estimatedVehicleJourneyID := values[0] + orderID := values[1] + key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) + //fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + + var err error + + // Get the MD5 hash from Valkey + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) + if err != nil { + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + query := ` + INSERT INTO calls ( + estimatedvehiclejourney, "order", stoppointref, + aimeddeparturetime, expecteddeparturetime, + aimedarrivaltime, expectedarrivaltime, + cancellation, estimated_data + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) + ON CONFLICT (estimatedvehiclejourney, "order") + DO UPDATE SET + stoppointref = EXCLUDED.stoppointref, + aimeddeparturetime = EXCLUDED.aimeddeparturetime, + expecteddeparturetime = EXCLUDED.expecteddeparturetime, + aimedarrivaltime = EXCLUDED.aimedarrivaltime, + expectedarrivaltime = EXCLUDED.expectedarrivaltime, + cancellation = EXCLUDED.cancellation, + estimated_data = EXCLUDED.estimated_data + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + //fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) + return 0, "none", nil + } } diff --git a/export/database.go b/export/database.go index 4b5c9fe..6789c17 100644 --- a/export/database.go +++ b/export/database.go @@ -1,6 +1,7 @@ package export import ( + "context" "encoding/json" "fmt" "log" @@ -20,6 +21,15 @@ func DBData(data *data.Data) { } defer db.Close() + // Connect to Valkey + valkeyClient, err := config.ConnectToValkey("config/conf.json") + if err != nil { + log.Fatalf("Failed to connect to Valkey: %v", err) + } + defer config.DisconnectFromValkey(valkeyClient) + + ctx := context.Background() + // Get service id aka sid sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) if err != nil { @@ -28,7 +38,7 @@ func DBData(data *data.Data) { fmt.Println("SID:", sid) // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int + var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount int for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { var values []interface{} @@ -159,12 +169,13 @@ func DBData(data *data.Data) { //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) if totalCount%1000 == 0 { fmt.Printf( - "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", + "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, + estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, ) @@ -291,9 +302,9 @@ func DBData(data *data.Data) { for i, v := range stringValues { interfaceValues[i] = v } - id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues) + id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient) if err != nil { - fmt.Printf("Error inserting/updating estimated call: %v\n", err) + log.Fatalf("Failed to insert or update estimated call: %v", err) } else { if 1 == 0 { fmt.Printf("Action: %s, ID: %d\n", action, id) @@ -303,6 +314,8 @@ func DBData(data *data.Data) { estimatedCallInsertCount++ } else if action == "update" { estimatedCallUpdateCount++ + } else if action == "none" { + estimatedCallNoneCount++ } } } @@ -387,12 +400,13 @@ func DBData(data *data.Data) { } fmt.Printf( - "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", + "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d\n", insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, + estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, ) @@ -404,6 +418,7 @@ func DBData(data *data.Data) { serviceDeliveryJsonObject["Updates"] = updateCount serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount + serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount @@ -418,4 +433,4 @@ func DBData(data *data.Data) { if err != nil { log.Fatal(err) } -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index 2e16502..4db7bea 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,8 @@ module ti1 go 1.23.4 require github.com/lib/pq v1.10.9 + +require ( + github.com/valkey-io/valkey-go v1.0.52 // indirect + golang.org/x/sys v0.24.0 // indirect +) diff --git a/go.sum b/go.sum index aeddeae..4ba6c43 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= +github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/main.go b/main.go index fbff506..a76df14 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( ) func main() { + log.Println("ti1 v0.2") log.Println("Starting...") // Setup the database diff --git a/valki/commands.go b/valki/commands.go new file mode 100644 index 0000000..a17746f --- /dev/null +++ b/valki/commands.go @@ -0,0 +1,26 @@ +package valki + +import ( + "context" + "fmt" + "time" + + "github.com/valkey-io/valkey-go" +) + +func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error { + err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error() + if err != nil { + return fmt.Errorf("failed to set value in Valkey: %v", err) + } + return nil +} + +func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) { + value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() + if err != nil { + return "hehe", nil + //return "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + return value, nil +}