50 Commits

Author SHA1 Message Date
pigwin-3
497b5a6e86 yep 2025-08-15 16:12:43 +02:00
pigwin-3
8b56bf8370 increase time till deletion to 90 min 2025-06-07 14:18:48 +02:00
pigwin
c45723a5b0 v0.2.1 2025-01-11 21:09:27 +01:00
pigwin
779530f036 v0.2.1
added changes
2025-01-11 21:03:15 +01:00
pigwin
fd43b3e4bb nvm that esitmated via part 2025-01-11 19:51:41 +00:00
pigwin
bed666bd81 use valkey for Estimated Vehicle 2025-01-11 18:30:53 +00:00
pigwin
c09ea5784a woops forgot that part 2025-01-11 17:38:34 +00:00
pigwin
8af34a9ab3 InsertOrUpdateRecordedCall use valkey :) 2025-01-11 17:26:43 +00:00
pigwin
5bdc2acd81 Merge pull request #4 from pigwin-3/main
sync
2025-01-11 17:59:40 +01:00
pigwin
3058d98a67 v0.2.0 2025-01-09 20:23:38 +00:00
pigwin
21b3bba164 v0.2
just for the github action to run
2025-01-09 21:17:25 +01:00
pigwin
c08bcb8432 Merge pull request #3 from pigwin-3/dev
v0.2
2025-01-09 21:14:34 +01:00
pigwin
14af72959e almost release v0.2 2025-01-09 20:13:55 +00:00
pigwin
afd6b12acf chat gpt said this would work and i trust it like 2 % 2025-01-09 19:54:11 +00:00
pigwin
d996411adf inprove reporting???!?!!!!!. 2025-01-09 19:28:10 +00:00
pigwin
083a267b2a WTF hash match not do the match thing lol 2025-01-09 19:06:51 +00:00
pigwin
fe72a507e1 testing 2025-01-09 18:59:46 +00:00
pigwin
5203208fe7 hash validation and database insertion logic 2025-01-09 18:05:17 +00:00
pigwin
2141c5f168 hehe 2025-01-09 17:56:41 +00:00
pigwin
9919c159f2 gotta fix that valkey shit later 2025-01-07 20:39:47 +00:00
pigwin
07838da0ad lolololololollollollol 2025-01-07 20:33:29 +00:00
pigwin
b8a2a5837f hehehehehe mek valke wurke 2025-01-07 20:13:23 +00:00
pigwin
df0b5135bd wtf so mutch pain cus i named a package wrong lol 2025-01-07 19:51:14 +00:00
pigwin
a2c1766dd1 refactor: improve connection string formatting and enhance logging in ConnectToPostgreSQL function 2025-01-07 18:54:49 +00:00
pigwin
97a6506a65 valkey testy 2025-01-07 18:36:27 +00:00
pigwin
ba558803ff add password configuration to Valkey settings 2025-01-07 18:28:47 +00:00
pigwin
c6fc0070cf add Valkey configuration and connection management 2025-01-06 20:47:55 +00:00
pigwin
c456bdecdb hash for valky or reddis or smth else 2025-01-06 15:32:04 +00:00
pigwin
1020dacf79 add connection pool settings in ConnectToPostgreSQL function 2025-01-04 22:07:05 +00:00
pigwin
6c20858280 remove database connection pool settings from ConnectToPostgreSQL function 2025-01-04 21:30:01 +00:00
pigwin
119898acc5 Merge branch 'test' 2025-01-04 21:27:15 +00:00
pigwin
51fb986710 didnt work well 2025-01-04 21:26:09 +00:00
pigwin
3c1b84197a idk i just hope it works 2025-01-04 21:15:45 +00:00
pigwin
75e007603f chat gpt pls dp me help, yess 2025-01-04 20:57:01 +00:00
pigwin
a50ef5b899 i have no idea wtf im doin 2025-01-04 20:53:24 +00:00
pigwin
def3a9c38c add time package import in db.go 2025-01-04 20:30:05 +00:00
pigwin
c1992f7616 configure database connection settings for improved performance 2025-01-04 20:27:01 +00:00
pigwin
a274810818 Update docker-image.yml 2025-01-04 21:24:23 +01:00
pigwin
42b75360c4 test 2025-01-04 20:19:39 +00:00
pigwin
c99f22b131 lets undo that????? 2025-01-04 20:14:51 +00:00
pigwin
7b96214476 goroutines??? trying to make go fast lol 2025-01-04 20:11:30 +00:00
pigwin
c12ec02270 enhance PostgreSQL connection handling with connection pool settings 2025-01-04 17:07:41 +00:00
pigwin
6db5b12d7b update Dockerfile to use Go version 1.23.4 2025-01-04 16:06:19 +00:00
pigwin
8f372b4213 add log package import in data.go 2025-01-04 16:04:13 +00:00
pigwin
ccbf9c8d72 update Go version to 1.23.4 and add logging for data fetching 2025-01-04 16:01:17 +00:00
pigwin
fb4862c078 v0.1.2 2024-12-30 21:49:22 +00:00
pigwin
f3dbd4505f md documentation so i dont forget imedietly 2024-12-30 21:35:19 +00:00
pigwin
2462c6057e log more 2024-12-30 21:16:45 +00:00
pigwin
cda95aa4f2 v0.1.1 2024-12-30 21:03:41 +00:00
pigwin
b7e448df8d v0.1 2024-12-29 19:55:27 +01:00
16 changed files with 600 additions and 121 deletions

View File

@@ -4,6 +4,7 @@ on:
push: push:
branches: branches:
- main - main
- dev
jobs: jobs:
build: build:
@@ -26,26 +27,32 @@ jobs:
id: timestamp id: timestamp
run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
- name: Get the commit tag - name: Get commit version
id: get-tag id: commit-version
run: echo "GIT_TAG=$(git describe --tags --exact-match 2>/dev/null || echo 'no-tag')" >> $GITHUB_ENV run: |
COMMIT_MSG=$(git log -1 --pretty=%B)
echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
echo "Version match: $COMMIT_MSG"
echo "VERSION=${COMMIT_MSG}" >> $GITHUB_ENV
else
echo "No version match, defaulting to 'dev'"
echo "VERSION=dev" >> $GITHUB_ENV
fi
- name: Build Docker image - name: Build Docker image
run: | run: |
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then docker build -t ti1:${{ env.VERSION }} .
docker build -t ti1:dev-${{ env.TIMESTAMP }} .
else
docker build -t ti1:latest -t ti1:${{ env.GIT_TAG }} .
fi
- name: Push Docker image - name: Push Docker image
run: | run: |
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then # Always push to 'dev' tag
docker tag ti1:dev-${{ env.TIMESTAMP }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }} docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }} docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev
else
docker tag ti1:latest ${{ secrets.DOCKER_USERNAME }}/ti1:latest # If the version is valid, also push that specific version tag
docker tag ti1:${{ env.GIT_TAG }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }} if [[ "${{ env.VERSION }}" != "dev" ]]; then
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:latest docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }} docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
fi fi

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image # Use the official Golang image as the base image
FROM golang:1.22.1 FROM golang:1.23.4
# Set the Current Working Directory inside the container # Set the Current Working Directory inside the container
WORKDIR /app WORKDIR /app

160
README.md
View File

@@ -1,22 +1,164 @@
# TI1 # TI1
The best thing to happen since yesterday at 3 pm The best thing to happen since yesterday at 2:56 pm
## Usage ## Usage
To use this project, you can pull the Docker image from Docker Hub and run it using the following commands: Start with getting Docker then do the following:
### Pull the Docker Image ### Create the setup files
Create a `docker-compose.yml`
```yaml
services:
db:
image: postgres:17.2
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
volumes:
- ./postgres_data:/var/lib/postgresql/data # Store data in the current directory
- ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
networks:
- app-network
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
interval: 10s
retries: 5
restart: always # Ensure the container always restarts
```sh valkey:
docker pull pigwin1/ti1:latest image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- ./valkey_data:/data
networks:
- app-network
restart: always # Ensure the container always restarts
ti1-container:
image: pigwin1/ti1:dev
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: ti1
DB_PASSWORD: ti1password
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy # Wait until the db service is healthy
valkey:
condition: service_started # Wait until the valkey service is started
networks:
- app-network
restart: always # Ensure the container always restarts
networks:
app-network:
driver: bridge
volumes:
postgres_data:
driver: local
valkey_data:
driver: local
``` ```
### Run the Docker Container Create `init.sql`
```sh ```sql
docker run -d --name ti1-container -e DB_HOST=<your_db_host> -e DB_PORT=<your_db_port> -e DB_USER=<your_db_user> -e DB_PASSWORD=<your_db_password> -e DB_NAME=<your_db_name> -e DB_SSLMODE=<your_db_sslmode> pigwin1/ti1:latest -- Check if 'post' user exists; create if not
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
ALTER ROLE post WITH SUPERUSER;
END IF;
END
$$;
-- Check if 'ti1' user exists; create if not
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO ti1;
-- Grant the ti1 user the necessary permissions on the public schema
GRANT USAGE, CREATE ON SCHEMA public TO ti1;
-- Grant all permissions (SELECT, INSERT, UPDATE, DELETE, etc.) on all existing tables in the public schema
GRANT ALL ON ALL TABLES IN SCHEMA public TO ti1;
-- Grant all permissions on all existing sequences in the public schema
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO ti1;
-- Grant all permissions on all functions in the public schema
GRANT ALL ON ALL FUNCTIONS IN SCHEMA public TO ti1;
-- Ensure that the ti1 user will have access to new tables, sequences, and functions created in the public schema
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ti1;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ti1;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON FUNCTIONS TO ti1;
-- Optionally, grant full permissions on the entire database to ti1 (if needed)
-- GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
END IF;
END
$$;
```
Remember to change the password values
### Run the Docker Containers
```sh
docker compose up -d
```
### edit the postgress config (optinal)
open the config file
```sh
nano postgres_data/postgresql.conf
```
Change the following values
```conf
listen_addresses = '*'
max_connections = 100
shared_buffers = 16GB
work_mem = 256MB
maintenance_work_mem = 2GB
dynamic_shared_memory_type = posix
max_wal_size = 1GB
min_wal_size = 80MB
```
set these to what makes most sense for you
These values should also be set bet not necessarily changed
```conf
log_timezone = 'Etc/UTC'
datestyle = 'iso, mdy'
timezone = 'Etc/UTC'
lc_messages = 'en_US.utf8'
lc_monetary = 'en_US.utf8'
lc_numeric = 'en_US.utf8'
lc_time = 'en_US.utf8'
default_text_search_config = 'pg_catalog.english'
``` ```
Replace `<your_db_host>`, `<your_db_port>`, `<your_db_user>`, `<your_db_password>`, `<your_db_name>`, and `<your_db_sslmode>` with your actual database configuration values.
### Docker Hub Repository ### Docker Hub Repository
You can find the Docker image on Docker Hub at the following link: You can find the Docker image on Docker Hub at the following link:

View File

@@ -7,5 +7,12 @@
"dbname": "ti1", "dbname": "ti1",
"sslmode": "disable" "sslmode": "disable"
}, },
"valkey": {
"host": "127.0.0.1",
"port": "6379",
"max_conns": 50,
"timeout_ms": 5000,
"password": "the_valkey_password"
},
"temp": "value" "temp": "value"
} }

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"strconv"
) )
type Config struct { type Config struct {
@@ -15,6 +16,13 @@ type Config struct {
DBName string `json:"dbname"` DBName string `json:"dbname"`
SSLMode string `json:"sslmode"` SSLMode string `json:"sslmode"`
} `json:"database"` } `json:"database"`
Valkey struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
} `json:"valkey"`
Temp string `json:"temp"` Temp string `json:"temp"`
} }
@@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) {
if temp := os.Getenv("TEMP"); temp != "" { if temp := os.Getenv("TEMP"); temp != "" {
config.Temp = temp config.Temp = temp
} }
//log.Println("Temp value:", config.Temp)
// Override Valkey settings with environment variables
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
config.Valkey.Host = valkeyHost
}
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
config.Valkey.Port = valkeyPort
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.Valkey.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.Valkey.TimeoutMs = val
}
}
return config, nil return config, nil
} }

View File

@@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
@@ -17,7 +18,7 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Configuration loaded successfully!") fmt.Println("Configuration loaded successfully!")
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode) config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
// Open connection to database // Open connection to database
@@ -26,7 +27,12 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
fmt.Println("Connection to PostgreSQL opened successfully!") // Set connection pool settings
db.SetMaxOpenConns(25) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
fmt.Println("Connection to PostgreSQL opened successfully :D")
// Ping database to verify connection // Ping database to verify connection
err = db.Ping() err = db.Ping()

100
config/valkey.go Normal file
View File

@@ -0,0 +1,100 @@
package config
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/valkey-io/valkey-go"
)
type ValkeyConfig struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
}
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
var config ValkeyConfig
configFile, err := os.Open(file)
if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err)
}
defer configFile.Close()
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
}
// Override with environment variables if set
if host := os.Getenv("VALKEY_HOST"); host != "" {
config.Host = host
}
if port := os.Getenv("VALKEY_PORT"); port != "" {
config.Port = port
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.TimeoutMs = val
}
}
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
config.Password = password
}
return config, nil
}
func ConnectToValkey(configPath string) (valkey.Client, error) {
fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err)
}
fmt.Println("Configuration loaded successfully!")
valkeyConfig := config.Valkey
// Setup Valkey client options
options := valkey.ClientOption{
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
Password: valkeyConfig.Password,
// Additional options can be added here if required
}
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
client, err := valkey.NewClient(options)
if err != nil {
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
}
// Optionally, perform a ping to validate the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
defer cancel()
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
client.Close()
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
}
log.Println("Connected to Valkey successfully!")
return client, nil
}
func DisconnectFromValkey(client valkey.Client) error {
fmt.Println("Disconnecting from Valkey...")
client.Close()
log.Println("Disconnected from Valkey successfully!")
return nil
}

View File

@@ -1,6 +1,7 @@
package data package data
import ( import (
"log"
"encoding/xml" "encoding/xml"
"net/http" "net/http"
) )
@@ -125,10 +126,13 @@ type Data struct {
} `xml:"ServiceDelivery"` } `xml:"ServiceDelivery"`
} }
func FetchData() (*Data, error) { func FetchData(timestamp string) (*Data, error) {
client := &http.Client{} client := &http.Client{}
requestorId := "ti1-" + timestamp
resp, err := client.Get("https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000") url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
log.Println("Fetching data from URL:", url)
resp, err := client.Get(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -1,11 +1,17 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
if str, ok := v.(string); ok && str == "" { if str, ok := v.(string); ok && str == "" {
@@ -13,6 +19,32 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
} }
} }
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := ` query := `
INSERT INTO calls ( INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref, estimatedvehiclejourney, "order", stoppointref,
@@ -38,18 +70,20 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
} }
defer stmt.Close() defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
}
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil
}
} }

View File

@@ -1,11 +1,17 @@
package database package database
import ( import (
"context"
"crypto/md5"
"database/sql" "database/sql"
"encoding/hex"
"fmt" "fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
) )
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
if str, ok := v.(string); ok && str == "" { if str, ok := v.(string); ok && str == "" {
@@ -13,6 +19,30 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
} }
} }
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := ` query := `
INSERT INTO calls ( INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref, estimatedvehiclejourney, "order", stoppointref,
@@ -41,18 +71,19 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
} }
defer stmt.Close() defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
}
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
return 0, "none", nil
}
} }

61
docker-compose.yaml Normal file
View File

@@ -0,0 +1,61 @@
services:
db:
image: postgres:17.2
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
volumes:
- /tmp/ti1/postgres_data:/var/lib/postgresql/data:z
- /tmp/ti1/init.sql:/docker-entrypoint-initdb.d/init.sql:ro,z
networks:
- app-network
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
interval: 10s
retries: 5
restart: always
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- /tmp/ti1/valkey_data:/data:z
networks:
- app-network
restart: always
ti1-container:
build:
context: .
dockerfile: Dockerfile
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: postgres
DB_PASSWORD: RootPassword
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy
valkey:
condition: service_started
networks:
- app-network
restart: always
networks:
app-network:
driver: bridge

View File

@@ -1,6 +1,7 @@
package export package export
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
@@ -20,6 +21,15 @@ func DBData(data *data.Data) {
} }
defer db.Close() defer db.Close()
// Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json")
if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err)
}
defer config.DisconnectFromValkey(valkeyClient)
ctx := context.Background()
// Get service id aka sid // Get service id aka sid
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
if err != nil { if err != nil {
@@ -28,7 +38,7 @@ func DBData(data *data.Data) {
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{} var values []interface{}
@@ -159,14 +169,16 @@ func DBData(data *data.Data) {
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 { if totalCount%1000 == 0 {
fmt.Printf( fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
estimatedCallInsertCount, estimatedCallInsertCount,
estimatedCallUpdateCount, estimatedCallUpdateCount,
estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
} }
} }
@@ -291,9 +303,9 @@ func DBData(data *data.Data) {
for i, v := range stringValues { for i, v := range stringValues {
interfaceValues[i] = v interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues) id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating estimated call: %v\n", err) log.Fatalf("Failed to insert or update estimated call: %v", err)
} else { } else {
if 1 == 0 { if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id) fmt.Printf("Action: %s, ID: %d\n", action, id)
@@ -303,6 +315,8 @@ func DBData(data *data.Data) {
estimatedCallInsertCount++ estimatedCallInsertCount++
} else if action == "update" { } else if action == "update" {
estimatedCallUpdateCount++ estimatedCallUpdateCount++
} else if action == "none" {
estimatedCallNoneCount++
} }
} }
} }
@@ -367,7 +381,7 @@ func DBData(data *data.Data) {
interfaceValues[i] = v interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues) id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err) fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else { } else {
@@ -380,6 +394,8 @@ func DBData(data *data.Data) {
//fmt.Printf("Action: %s, ID: %d\n", action, id) //fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" { } else if action == "update" {
recordedCallUpdateCount++ recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
} }
} }
} }
@@ -387,14 +403,16 @@ func DBData(data *data.Data) {
} }
fmt.Printf( fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
insertCount, insertCount,
updateCount, updateCount,
totalCount, totalCount,
estimatedCallInsertCount, estimatedCallInsertCount,
estimatedCallUpdateCount, estimatedCallUpdateCount,
estimatedCallNoneCount,
recordedCallInsertCount, recordedCallInsertCount,
recordedCallUpdateCount, recordedCallUpdateCount,
recordedCallNoneCount,
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
@@ -404,8 +422,10 @@ func DBData(data *data.Data) {
serviceDeliveryJsonObject["Updates"] = updateCount serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)

7
go.mod
View File

@@ -1,5 +1,10 @@
module ti1 module ti1
go 1.22.1 go 1.23.4
require github.com/lib/pq v1.10.9 require github.com/lib/pq v1.10.9
require (
github.com/valkey-io/valkey-go v1.0.52 // indirect
golang.org/x/sys v0.24.0 // indirect
)

4
go.sum
View File

@@ -1,2 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

View File

@@ -9,6 +9,7 @@ import (
) )
func main() { func main() {
log.Println("ti1 v0.2.1")
log.Println("Starting...") log.Println("Starting...")
// Setup the database // Setup the database
@@ -17,10 +18,14 @@ func main() {
log.Fatalf("Database setup failed: %v", err) log.Fatalf("Database setup failed: %v", err)
} }
// Get the current timestamp
starttimestamp := time.Now().Format("20060102T150405")
log.Printf("Starting timestamp: %s", starttimestamp)
for { for {
start := time.Now() start := time.Now()
data, err := data.FetchData() data, err := data.FetchData(starttimestamp)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -31,7 +36,7 @@ func main() {
elapsed := time.Since(start) elapsed := time.Since(start)
if elapsed < 5*time.Minute { if elapsed < 5*time.Minute {
log.Printf("starting again in %v", 5*time.Minute-elapsed) log.Printf("starting again in %v", 5*time.Minute-elapsed)
time.Sleep(1*time.Minute - elapsed) time.Sleep(5*time.Minute - elapsed)
} }
} }
} }

28
valki/commands.go Normal file
View File

@@ -0,0 +1,28 @@
package valki
import (
"context"
"fmt"
"time"
"github.com/valkey-io/valkey-go"
)
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(90*time.Minute).Build()).Error()
if err != nil {
return fmt.Errorf("failed to set value in Valkey: %v", err)
}
return nil
}
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if err != nil {
return "hehe", nil
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
return value, nil
}