45 Commits

Author SHA1 Message Date
pigwin-3
497b5a6e86 yep 2025-08-15 16:12:43 +02:00
pigwin-3
8b56bf8370 increase time till deletion to 90 min 2025-06-07 14:18:48 +02:00
pigwin
c45723a5b0 v0.2.1 2025-01-11 21:09:27 +01:00
pigwin
779530f036 v0.2.1
added changes
2025-01-11 21:03:15 +01:00
pigwin
fd43b3e4bb nvm that esitmated via part 2025-01-11 19:51:41 +00:00
pigwin
bed666bd81 use valkey for Estimated Vehicle 2025-01-11 18:30:53 +00:00
pigwin
c09ea5784a woops forgot that part 2025-01-11 17:38:34 +00:00
pigwin
8af34a9ab3 InsertOrUpdateRecordedCall use valkey :) 2025-01-11 17:26:43 +00:00
pigwin
5bdc2acd81 Merge pull request #4 from pigwin-3/main
sync
2025-01-11 17:59:40 +01:00
pigwin
3058d98a67 v0.2.0 2025-01-09 20:23:38 +00:00
pigwin
21b3bba164 v0.2
just for the github action to run
2025-01-09 21:17:25 +01:00
pigwin
c08bcb8432 Merge pull request #3 from pigwin-3/dev
v0.2
2025-01-09 21:14:34 +01:00
pigwin
14af72959e almost release v0.2 2025-01-09 20:13:55 +00:00
pigwin
afd6b12acf chat gpt said this would work and i trust it like 2 % 2025-01-09 19:54:11 +00:00
pigwin
d996411adf inprove reporting???!?!!!!!. 2025-01-09 19:28:10 +00:00
pigwin
083a267b2a WTF hash match not do the match thing lol 2025-01-09 19:06:51 +00:00
pigwin
fe72a507e1 testing 2025-01-09 18:59:46 +00:00
pigwin
5203208fe7 hash validation and database insertion logic 2025-01-09 18:05:17 +00:00
pigwin
2141c5f168 hehe 2025-01-09 17:56:41 +00:00
pigwin
9919c159f2 gotta fix that valkey shit later 2025-01-07 20:39:47 +00:00
pigwin
07838da0ad lolololololollollollol 2025-01-07 20:33:29 +00:00
pigwin
b8a2a5837f hehehehehe mek valke wurke 2025-01-07 20:13:23 +00:00
pigwin
df0b5135bd wtf so mutch pain cus i named a package wrong lol 2025-01-07 19:51:14 +00:00
pigwin
a2c1766dd1 refactor: improve connection string formatting and enhance logging in ConnectToPostgreSQL function 2025-01-07 18:54:49 +00:00
pigwin
97a6506a65 valkey testy 2025-01-07 18:36:27 +00:00
pigwin
ba558803ff add password configuration to Valkey settings 2025-01-07 18:28:47 +00:00
pigwin
c6fc0070cf add Valkey configuration and connection management 2025-01-06 20:47:55 +00:00
pigwin
c456bdecdb hash for valky or reddis or smth else 2025-01-06 15:32:04 +00:00
pigwin
1020dacf79 add connection pool settings in ConnectToPostgreSQL function 2025-01-04 22:07:05 +00:00
pigwin
6c20858280 remove database connection pool settings from ConnectToPostgreSQL function 2025-01-04 21:30:01 +00:00
pigwin
119898acc5 Merge branch 'test' 2025-01-04 21:27:15 +00:00
pigwin
51fb986710 didnt work well 2025-01-04 21:26:09 +00:00
pigwin
3c1b84197a idk i just hope it works 2025-01-04 21:15:45 +00:00
pigwin
75e007603f chat gpt pls dp me help, yess 2025-01-04 20:57:01 +00:00
pigwin
a50ef5b899 i have no idea wtf im doin 2025-01-04 20:53:24 +00:00
pigwin
def3a9c38c add time package import in db.go 2025-01-04 20:30:05 +00:00
pigwin
c1992f7616 configure database connection settings for improved performance 2025-01-04 20:27:01 +00:00
pigwin
a274810818 Update docker-image.yml 2025-01-04 21:24:23 +01:00
pigwin
42b75360c4 test 2025-01-04 20:19:39 +00:00
pigwin
c99f22b131 lets undo that????? 2025-01-04 20:14:51 +00:00
pigwin
7b96214476 goroutines??? trying to make go fast lol 2025-01-04 20:11:30 +00:00
pigwin
c12ec02270 enhance PostgreSQL connection handling with connection pool settings 2025-01-04 17:07:41 +00:00
pigwin
6db5b12d7b update Dockerfile to use Go version 1.23.4 2025-01-04 16:06:19 +00:00
pigwin
8f372b4213 add log package import in data.go 2025-01-04 16:04:13 +00:00
pigwin
ccbf9c8d72 update Go version to 1.23.4 and add logging for data fetching 2025-01-04 16:01:17 +00:00
16 changed files with 445 additions and 100 deletions

View File

@@ -4,6 +4,7 @@ on:
push: push:
branches: branches:
- main - main
- dev
jobs: jobs:
build: build:

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image # Use the official Golang image as the base image
FROM golang:1.22.1 FROM golang:1.23.4
# Set the Current Working Directory inside the container # Set the Current Working Directory inside the container
WORKDIR /app WORKDIR /app

View File

@@ -1,6 +1,6 @@
# TI1 # TI1
The best thing to happen since yesterday at 3 pm The best thing to happen since yesterday at 2:56 pm
## Usage ## Usage
@@ -15,7 +15,7 @@ services:
container_name: postgres-db container_name: postgres-db
environment: environment:
POSTGRES_USER: postgres POSTGRES_USER: postgres
POSTGRES_PASSWORD: Root Password POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1 POSTGRES_DB: ti1
ports: ports:
- "5432:5432" - "5432:5432"
@@ -30,19 +30,37 @@ services:
retries: 5 retries: 5
restart: always # Ensure the container always restarts restart: always # Ensure the container always restarts
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- ./valkey_data:/data
networks:
- app-network
restart: always # Ensure the container always restarts
ti1-container: ti1-container:
image: pigwin1/ti1:v0.1.1 image: pigwin1/ti1:dev
container_name: ti1-container container_name: ti1-container
environment: environment:
DB_HOST: db DB_HOST: db
DB_PORT: 5432 DB_PORT: 5432
DB_USER: ti1 DB_USER: ti1
DB_PASSWORD: ti1 password DB_PASSWORD: ti1password
DB_NAME: ti1 DB_NAME: ti1
DB_SSLMODE: disable DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on: depends_on:
db: db:
condition: service_healthy # Wait until the db service is healthy condition: service_healthy # Wait until the db service is healthy
valkey:
condition: service_started # Wait until the valkey service is started
networks: networks:
- app-network - app-network
restart: always # Ensure the container always restarts restart: always # Ensure the container always restarts
@@ -54,6 +72,8 @@ networks:
volumes: volumes:
postgres_data: postgres_data:
driver: local driver: local
valkey_data:
driver: local
``` ```
Create `init.sql` Create `init.sql`
@@ -62,7 +82,7 @@ Create `init.sql`
DO $$ DO $$
BEGIN BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
CREATE ROLE post WITH LOGIN PASSWORD 'post password'; CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post; GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
ALTER ROLE post WITH SUPERUSER; ALTER ROLE post WITH SUPERUSER;
END IF; END IF;
@@ -73,7 +93,7 @@ $$;
DO $$ DO $$
BEGIN BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1 password'; CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1; GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1; GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;

View File

@@ -7,5 +7,12 @@
"dbname": "ti1", "dbname": "ti1",
"sslmode": "disable" "sslmode": "disable"
}, },
"valkey": {
"host": "127.0.0.1",
"port": "6379",
"max_conns": 50,
"timeout_ms": 5000,
"password": "the_valkey_password"
},
"temp": "value" "temp": "value"
} }

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"strconv"
) )
type Config struct { type Config struct {
@@ -15,6 +16,13 @@ type Config struct {
DBName string `json:"dbname"` DBName string `json:"dbname"`
SSLMode string `json:"sslmode"` SSLMode string `json:"sslmode"`
} `json:"database"` } `json:"database"`
Valkey struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
} `json:"valkey"`
Temp string `json:"temp"` Temp string `json:"temp"`
} }
@@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) {
if temp := os.Getenv("TEMP"); temp != "" { if temp := os.Getenv("TEMP"); temp != "" {
config.Temp = temp config.Temp = temp
} }
//log.Println("Temp value:", config.Temp)
// Override Valkey settings with environment variables
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
config.Valkey.Host = valkeyHost
}
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
config.Valkey.Port = valkeyPort
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.Valkey.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.Valkey.TimeoutMs = val
}
}
return config, nil return config, nil
} }

View File

@@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
@@ -17,7 +18,7 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Configuration loaded successfully!") fmt.Println("Configuration loaded successfully!")
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
// Open connection to database // Open connection to database
@@ -26,7 +27,12 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
fmt.Println("Connection to PostgreSQL opened successfully!") // Set connection pool settings
db.SetMaxOpenConns(25) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
fmt.Println("Connection to PostgreSQL opened successfully :D")
// Ping database to verify connection // Ping database to verify connection
err = db.Ping() err = db.Ping()

100
config/valkey.go Normal file
View File

@@ -0,0 +1,100 @@
package config
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/valkey-io/valkey-go"
)
type ValkeyConfig struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
}
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
var config ValkeyConfig
configFile, err := os.Open(file)
if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err)
}
defer configFile.Close()
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
}
// Override with environment variables if set
if host := os.Getenv("VALKEY_HOST"); host != "" {
config.Host = host
}
if port := os.Getenv("VALKEY_PORT"); port != "" {
config.Port = port
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.TimeoutMs = val
}
}
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
config.Password = password
}
return config, nil
}
func ConnectToValkey(configPath string) (valkey.Client, error) {
fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err)
}
fmt.Println("Configuration loaded successfully!")
valkeyConfig := config.Valkey
// Setup Valkey client options
options := valkey.ClientOption{
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
Password: valkeyConfig.Password,
// Additional options can be added here if required
}
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
client, err := valkey.NewClient(options)
if err != nil {
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
}
// Optionally, perform a ping to validate the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
defer cancel()
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
client.Close()
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
}
log.Println("Connected to Valkey successfully!")
return client, nil
}
func DisconnectFromValkey(client valkey.Client) error {
fmt.Println("Disconnecting from Valkey...")
client.Close()
log.Println("Disconnected from Valkey successfully!")
return nil
}

View File

@@ -1,6 +1,7 @@
package data package data
import ( import (
"log"
"encoding/xml" "encoding/xml"
"net/http" "net/http"
) )
@@ -130,6 +131,7 @@ func FetchData(timestamp string) (*Data, error) {
requestorId := "ti1-" + timestamp requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
log.Println("Fetching data from URL:", url)
resp, err := client.Get(url) resp, err := client.Get(url)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -1,11 +1,17 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
if str, ok := v.(string); ok && str == "" { if str, ok := v.(string); ok && str == "" {
@@ -13,6 +19,32 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
} }
} }
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := ` query := `
INSERT INTO calls ( INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref, estimatedvehiclejourney, "order", stoppointref,
@@ -38,18 +70,20 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
} }
defer stmt.Close() defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
}
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil
}
} }

View File

@@ -1,11 +1,17 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
if str, ok := v.(string); ok && str == "" { if str, ok := v.(string); ok && str == "" {
@@ -13,6 +19,30 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
} }
} }
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := ` query := `
INSERT INTO calls ( INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref, estimatedvehiclejourney, "order", stoppointref,
@@ -41,18 +71,19 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
} }
defer stmt.Close() defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
}
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
return 0, "none", nil
}
} }

61
docker-compose.yaml Normal file
View File

@@ -0,0 +1,61 @@
services:
db:
image: postgres:17.2
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
volumes:
- /tmp/ti1/postgres_data:/var/lib/postgresql/data:z
- /tmp/ti1/init.sql:/docker-entrypoint-initdb.d/init.sql:ro,z
networks:
- app-network
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
interval: 10s
retries: 5
restart: always
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- /tmp/ti1/valkey_data:/data:z
networks:
- app-network
restart: always
ti1-container:
build:
context: .
dockerfile: Dockerfile
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: postgres
DB_PASSWORD: RootPassword
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy
valkey:
condition: service_started
networks:
- app-network
restart: always
networks:
app-network:
driver: bridge

View File

@@ -1,6 +1,7 @@
package export package export
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
@@ -20,6 +21,15 @@ func DBData(data *data.Data) {
} }
defer db.Close() defer db.Close()
// Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json")
if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err)
}
defer config.DisconnectFromValkey(valkeyClient)
ctx := context.Background()
// Get service id aka sid // Get service id aka sid
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
if err != nil { if err != nil {
@@ -28,7 +38,7 @@ func DBData(data *data.Data) {
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{} var values []interface{}
@@ -159,14 +169,16 @@ func DBData(data *data.Data) {
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 { if totalCount%1000 == 0 {
fmt.Printf( fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
estimatedCallInsertCount, estimatedCallInsertCount,
estimatedCallUpdateCount, estimatedCallUpdateCount,
estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
} }
} }
@@ -291,9 +303,9 @@ func DBData(data *data.Data) {
for i, v := range stringValues { for i, v := range stringValues {
interfaceValues[i] = v interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues) id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating estimated call: %v\n", err) log.Fatalf("Failed to insert or update estimated call: %v", err)
} else { } else {
if 1 == 0 { if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id) fmt.Printf("Action: %s, ID: %d\n", action, id)
@@ -303,6 +315,8 @@ func DBData(data *data.Data) {
estimatedCallInsertCount++ estimatedCallInsertCount++
} else if action == "update" { } else if action == "update" {
estimatedCallUpdateCount++ estimatedCallUpdateCount++
} else if action == "none" {
estimatedCallNoneCount++
} }
} }
} }
@@ -367,7 +381,7 @@ func DBData(data *data.Data) {
interfaceValues[i] = v interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues) id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err) fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else { } else {
@@ -380,6 +394,8 @@ func DBData(data *data.Data) {
//fmt.Printf("Action: %s, ID: %d\n", action, id) //fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" { } else if action == "update" {
recordedCallUpdateCount++ recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
} }
} }
} }
@@ -387,14 +403,16 @@ func DBData(data *data.Data) {
} }
fmt.Printf( fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
estimatedCallInsertCount, estimatedCallInsertCount,
estimatedCallUpdateCount, estimatedCallUpdateCount,
estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
@@ -404,8 +422,10 @@ func DBData(data *data.Data) {
serviceDeliveryJsonObject["Updates"] = updateCount serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)

7
go.mod
View File

@@ -1,5 +1,10 @@
module ti1 module ti1
go 1.22.1 go 1.23.4
require github.com/lib/pq v1.10.9 require github.com/lib/pq v1.10.9
require (
github.com/valkey-io/valkey-go v1.0.52 // indirect
golang.org/x/sys v0.24.0 // indirect
)

4
go.sum
View File

@@ -1,2 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

View File

@@ -9,6 +9,7 @@ import (
) )
func main() { func main() {
log.Println("ti1 v0.2.1")
log.Println("Starting...") log.Println("Starting...")
// Setup the database // Setup the database

28
valki/commands.go Normal file
View File

@@ -0,0 +1,28 @@
package valki
import (
"context"
"fmt"
"time"
"github.com/valkey-io/valkey-go"
)
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(90*time.Minute).Build()).Error()
if err != nil {
return fmt.Errorf("failed to set value in Valkey: %v", err)
}
return nil
}
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if err != nil {
return "hehe", nil
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
return value, nil
}