59 Commits
v0.1.2 ... v1.0

Author SHA1 Message Date
pigwin
228a971257 Merge remote-tracking branch 'origin/main' into v1.0 2025-12-23 21:49:57 +00:00
pigwin
0c9c3653f4 v1.0.2 2025-12-23 21:49:39 +00:00
pigwin
4c72bb1d4c v1.0.2
v1.0.2 
Stor V ikke funke lel
2025-12-23 22:45:00 +01:00
pigwin
ea36b60233 v1.0.2 2025-12-23 21:44:06 +00:00
pigwin
201811e2d3 V1.0.2
V1.0.2 merge for da real timeing :)
2025-12-23 21:44:31 +01:00
pigwin
bcdc24e47f oppdatere print log 2025-12-23 20:42:13 +00:00
pigwin
4bb49e944f start end cool 2025-12-23 20:41:45 +00:00
pigwin
fbb20d576b v1.0.1 2025-12-23 17:14:43 +00:00
pigwin
823701c698 v1.0.0
V1.0
2025-12-23 18:06:08 +01:00
pigwin
00e6667bc0 v1.0.0 2025-12-23 16:44:24 +00:00
pigwin
9b433cdd57 dont diez plz 2025-12-23 14:40:53 +00:00
pigwin
c106267d76 retry stuffs?
plz workz
2025-12-23 14:34:25 +00:00
pigwin
b95fbb477d v num 2025-12-23 14:11:30 +00:00
pigwin
7424600a8b worth a shot 2025-12-23 13:24:20 +00:00
pigwin
5171dcf4f6 old folder
tmp
2025-12-23 13:03:56 +00:00
pigwin
0324912eb4 tstttttt 2025-12-23 12:46:30 +00:00
pigwin
c45723a5b0 v0.2.1 2025-01-11 21:09:27 +01:00
pigwin
779530f036 v0.2.1
added changes
2025-01-11 21:03:15 +01:00
pigwin
fd43b3e4bb nvm that esitmated via part 2025-01-11 19:51:41 +00:00
pigwin
bed666bd81 use valkey for Estimated Vehicle 2025-01-11 18:30:53 +00:00
pigwin
c09ea5784a woops forgot that part 2025-01-11 17:38:34 +00:00
pigwin
8af34a9ab3 InsertOrUpdateRecordedCall use valkey :) 2025-01-11 17:26:43 +00:00
pigwin
5bdc2acd81 Merge pull request #4 from pigwin-3/main
sync
2025-01-11 17:59:40 +01:00
pigwin
3058d98a67 v0.2.0 2025-01-09 20:23:38 +00:00
pigwin
21b3bba164 v0.2
just for the github action to run
2025-01-09 21:17:25 +01:00
pigwin
c08bcb8432 Merge pull request #3 from pigwin-3/dev
v0.2
2025-01-09 21:14:34 +01:00
pigwin
14af72959e almost release v0.2 2025-01-09 20:13:55 +00:00
pigwin
afd6b12acf chat gpt said this would work and i trust it like 2 % 2025-01-09 19:54:11 +00:00
pigwin
d996411adf inprove reporting???!?!!!!!. 2025-01-09 19:28:10 +00:00
pigwin
083a267b2a WTF hash match not do the match thing lol 2025-01-09 19:06:51 +00:00
pigwin
fe72a507e1 testing 2025-01-09 18:59:46 +00:00
pigwin
5203208fe7 hash validation and database insertion logic 2025-01-09 18:05:17 +00:00
pigwin
2141c5f168 hehe 2025-01-09 17:56:41 +00:00
pigwin
9919c159f2 gotta fix that valkey shit later 2025-01-07 20:39:47 +00:00
pigwin
07838da0ad lolololololollollollol 2025-01-07 20:33:29 +00:00
pigwin
b8a2a5837f hehehehehe mek valke wurke 2025-01-07 20:13:23 +00:00
pigwin
df0b5135bd wtf so mutch pain cus i named a package wrong lol 2025-01-07 19:51:14 +00:00
pigwin
a2c1766dd1 refactor: improve connection string formatting and enhance logging in ConnectToPostgreSQL function 2025-01-07 18:54:49 +00:00
pigwin
97a6506a65 valkey testy 2025-01-07 18:36:27 +00:00
pigwin
ba558803ff add password configuration to Valkey settings 2025-01-07 18:28:47 +00:00
pigwin
c6fc0070cf add Valkey configuration and connection management 2025-01-06 20:47:55 +00:00
pigwin
c456bdecdb hash for valky or reddis or smth else 2025-01-06 15:32:04 +00:00
pigwin
1020dacf79 add connection pool settings in ConnectToPostgreSQL function 2025-01-04 22:07:05 +00:00
pigwin
6c20858280 remove database connection pool settings from ConnectToPostgreSQL function 2025-01-04 21:30:01 +00:00
pigwin
119898acc5 Merge branch 'test' 2025-01-04 21:27:15 +00:00
pigwin
51fb986710 didnt work well 2025-01-04 21:26:09 +00:00
pigwin
3c1b84197a idk i just hope it works 2025-01-04 21:15:45 +00:00
pigwin
75e007603f chat gpt pls dp me help, yess 2025-01-04 20:57:01 +00:00
pigwin
a50ef5b899 i have no idea wtf im doin 2025-01-04 20:53:24 +00:00
pigwin
def3a9c38c add time package import in db.go 2025-01-04 20:30:05 +00:00
pigwin
c1992f7616 configure database connection settings for improved performance 2025-01-04 20:27:01 +00:00
pigwin
a274810818 Update docker-image.yml 2025-01-04 21:24:23 +01:00
pigwin
42b75360c4 test 2025-01-04 20:19:39 +00:00
pigwin
c99f22b131 lets undo that????? 2025-01-04 20:14:51 +00:00
pigwin
7b96214476 goroutines??? trying to make go fast lol 2025-01-04 20:11:30 +00:00
pigwin
c12ec02270 enhance PostgreSQL connection handling with connection pool settings 2025-01-04 17:07:41 +00:00
pigwin
6db5b12d7b update Dockerfile to use Go version 1.23.4 2025-01-04 16:06:19 +00:00
pigwin
8f372b4213 add log package import in data.go 2025-01-04 16:04:13 +00:00
pigwin
ccbf9c8d72 update Go version to 1.23.4 and add logging for data fetching 2025-01-04 16:01:17 +00:00
22 changed files with 1465 additions and 513 deletions

View File

@@ -4,6 +4,7 @@ on:
push:
branches:
- main
- dev
jobs:
build:
@@ -29,7 +30,7 @@ jobs:
- name: Get commit version
id: commit-version
run: |
COMMIT_MSG=$(git log -1 --pretty=%B)
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs)
echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image
FROM golang:1.22.1
FROM golang:1.23.4
# Set the Current Working Directory inside the container
WORKDIR /app

View File

@@ -1,6 +1,6 @@
# TI1
The best thing to happen since yesterday at 3 pm
The best thing to happen since yesterday at 2:56 pm
## Usage
@@ -15,7 +15,7 @@ services:
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: Root Password
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
@@ -30,19 +30,37 @@ services:
retries: 5
restart: always # Ensure the container always restarts
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- ./valkey_data:/data
networks:
- app-network
restart: always # Ensure the container always restarts
ti1-container:
image: pigwin1/ti1:v0.1.1
image: pigwin1/ti1:dev
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: ti1
DB_PASSWORD: ti1 password
DB_PASSWORD: ti1password
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy # Wait until the db service is healthy
valkey:
condition: service_started # Wait until the valkey service is started
networks:
- app-network
restart: always # Ensure the container always restarts
@@ -54,6 +72,8 @@ networks:
volumes:
postgres_data:
driver: local
valkey_data:
driver: local
```
Create `init.sql`
@@ -62,7 +82,7 @@ Create `init.sql`
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
CREATE ROLE post WITH LOGIN PASSWORD 'post password';
CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
ALTER ROLE post WITH SUPERUSER;
END IF;
@@ -73,7 +93,7 @@ $$;
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1 password';
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;
@@ -118,7 +138,7 @@ nano postgres_data/postgresql.conf
Change the following values
```conf
listen_addresses = '*'
max_connections = 100
max_connections = 200
shared_buffers = 16GB
work_mem = 256MB
maintenance_work_mem = 2GB

View File

@@ -7,5 +7,12 @@
"dbname": "ti1",
"sslmode": "disable"
},
"valkey": {
"host": "127.0.0.1",
"port": "6379",
"max_conns": 100,
"timeout_ms": 2000,
"password": "the_valkey_password"
},
"temp": "value"
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
)
type Config struct {
@@ -15,6 +16,13 @@ type Config struct {
DBName string `json:"dbname"`
SSLMode string `json:"sslmode"`
} `json:"database"`
Valkey struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
} `json:"valkey"`
Temp string `json:"temp"`
}
@@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) {
if temp := os.Getenv("TEMP"); temp != "" {
config.Temp = temp
}
//log.Println("Temp value:", config.Temp)
// Override Valkey settings with environment variables
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
config.Valkey.Host = valkeyHost
}
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
config.Valkey.Port = valkeyPort
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.Valkey.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.Valkey.TimeoutMs = val
}
}
return config, nil
}

View File

@@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
@@ -17,7 +18,7 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Configuration loaded successfully!")
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
// Open connection to database
@@ -26,7 +27,13 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err
}
fmt.Println("Connection to PostgreSQL opened successfully!")
// Set connection pool settings for high concurrency
db.SetMaxOpenConns(50) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D")
// Ping database to verify connection
err = db.Ping()

100
config/valkey.go Normal file
View File

@@ -0,0 +1,100 @@
package config
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/valkey-io/valkey-go"
)
type ValkeyConfig struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
}
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
var config ValkeyConfig
configFile, err := os.Open(file)
if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err)
}
defer configFile.Close()
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
}
// Override with environment variables if set
if host := os.Getenv("VALKEY_HOST"); host != "" {
config.Host = host
}
if port := os.Getenv("VALKEY_PORT"); port != "" {
config.Port = port
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.TimeoutMs = val
}
}
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
config.Password = password
}
return config, nil
}
func ConnectToValkey(configPath string) (valkey.Client, error) {
fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err)
}
fmt.Println("Configuration loaded successfully!")
valkeyConfig := config.Valkey
// Setup Valkey client options
options := valkey.ClientOption{
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
Password: valkeyConfig.Password,
// Additional options can be added here if required
}
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
client, err := valkey.NewClient(options)
if err != nil {
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
}
// Optionally, perform a ping to validate the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
defer cancel()
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
client.Close()
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
}
log.Println("Connected to Valkey successfully!")
return client, nil
}
func DisconnectFromValkey(client valkey.Client) error {
fmt.Println("Disconnecting from Valkey...")
client.Close()
log.Println("Disconnected from Valkey successfully!")
return nil
}

View File

@@ -1,8 +1,12 @@
package data
import (
"crypto/tls"
"encoding/xml"
"fmt"
"log"
"net/http"
"time"
)
type Data struct {
@@ -126,22 +130,86 @@ type Data struct {
}
func FetchData(timestamp string) (*Data, error) {
client := &http.Client{}
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors
transport := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
resp, err := client.Get(url)
// Retry logic for transient failures
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
resp, err = client.Get(url)
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
}
// All retries failed
if err != nil {
return nil, err
}
defer resp.Body.Close()
data := &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
if err != nil {
return nil, err
}
return data, nil
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries)
}

View File

@@ -1,11 +1,24 @@
package database
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) {
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
@@ -13,43 +26,101 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
}
}
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
}
var action string
var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err)
}
return id, action, nil
}
return 0, "none", nil
}
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -1,41 +1,135 @@
package database
import (
"context"
"database/sql"
"fmt"
"sync"
)
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
err := db.QueryRow(query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err)
}
return id, action, nil
}
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -1,11 +1,18 @@
package database
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) {
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
@@ -13,46 +20,104 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
}
}
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
}
var action string
var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err)
}
return id, action, nil
}
return 0, "none", nil
}
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,25 +6,18 @@ import (
)
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
if err != nil {
fmt.Println(err)
return 0, err
return 0, fmt.Errorf("failed to insert service delivery: %w", err)
}
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil
}
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil {
fmt.Println(err)
return err
return fmt.Errorf("failed to update service delivery data: %w", err)
}
fmt.Println("Finished with this ServiceDelivery!")
return nil
}

View File

@@ -0,0 +1,89 @@
package databaseold
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
values[i] = nil
}
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil
}
}

View File

@@ -0,0 +1,41 @@
package databaseold
import (
"database/sql"
"fmt"
)
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
}

View File

@@ -0,0 +1,89 @@
package databaseold
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
values[i] = nil
}
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
return 0, "none", nil
}
}

View File

@@ -0,0 +1,30 @@
package databaseold
import (
"database/sql"
"fmt"
)
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
if err != nil {
fmt.Println(err)
return 0, err
}
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil
}
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println("Finished with this ServiceDelivery!")
return nil
}

140
databaseold/SetupDB.go Normal file
View File

@@ -0,0 +1,140 @@
package databaseold
import (
"fmt"
"ti1/config"
)
func SetupDB() error {
fmt.Println("Setting up the database...")
// Connect to PostgreSQL
db, err := config.ConnectToPostgreSQL()
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
defer config.DisconnectFromPostgreSQL(db)
// Create sequences if they do not exist
sequences := []string{
"CREATE SEQUENCE IF NOT EXISTS public.calls_id_seq",
"CREATE SEQUENCE IF NOT EXISTS public.estimatedvehiclejourney_id_seq",
"CREATE SEQUENCE IF NOT EXISTS public.servicedelivery_id_seq",
}
for _, seq := range sequences {
_, err := db.Exec(seq)
if err != nil {
return fmt.Errorf("failed to create sequence: %w", err)
}
}
// Check if tables exist and have the correct structure
tables := map[string]string{
"calls": `CREATE TABLE IF NOT EXISTS public.calls (
id BIGINT PRIMARY KEY DEFAULT nextval('public.calls_id_seq'),
estimatedvehiclejourney BIGINT,
"order" INTEGER,
stoppointref VARCHAR,
aimeddeparturetime TIMESTAMP,
expecteddeparturetime TIMESTAMP,
aimedarrivaltime TIMESTAMP,
expectedarrivaltime TIMESTAMP,
cancellation VARCHAR,
actualdeparturetime TIMESTAMP,
actualarrivaltime TIMESTAMP,
estimated_data JSON,
recorded_data JSON
);`,
"estimatedvehiclejourney": `CREATE TABLE IF NOT EXISTS public.estimatedvehiclejourney (
id BIGINT PRIMARY KEY DEFAULT nextval('public.estimatedvehiclejourney_id_seq'),
servicedelivery INTEGER,
recordedattime TIMESTAMP,
lineref VARCHAR,
directionref VARCHAR,
datasource VARCHAR,
datedvehiclejourneyref VARCHAR,
vehiclemode VARCHAR,
dataframeref VARCHAR,
originref VARCHAR,
destinationref VARCHAR,
operatorref VARCHAR,
vehicleref VARCHAR,
cancellation VARCHAR,
other JSON,
firstservicedelivery INTEGER
);`,
"servicedelivery": `CREATE TABLE IF NOT EXISTS public.servicedelivery (
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
responsetimestamp TIMESTAMPTZ,
recordedattime TIMESTAMPTZ,
data JSON
);`,
}
for table, createStmt := range tables {
var exists bool
err := db.QueryRow(fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s')", table)).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check if table %s exists: %w", table, err)
}
if !exists {
_, err := db.Exec(createStmt)
if err != nil {
return fmt.Errorf("failed to create table %s: %w", table, err)
}
fmt.Printf("Table %s created successfully!\n", table)
} else {
fmt.Printf("Table %s already exists.\n", table)
}
}
// Check if the unique constraint exists before adding it to calls table
var constraintExists bool
err = db.QueryRow(`
SELECT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'unique_estimatedvehiclejourney_order'
);
`).Scan(&constraintExists)
if err != nil {
return fmt.Errorf("failed to check if unique constraint exists: %w", err)
}
if !constraintExists {
_, err = db.Exec(`ALTER TABLE calls ADD CONSTRAINT unique_estimatedvehiclejourney_order UNIQUE (estimatedvehiclejourney, "order");`)
if err != nil {
return fmt.Errorf("failed to add unique constraint to calls table: %w", err)
}
fmt.Println("Unique constraint added to calls table.")
} else {
fmt.Println("Unique constraint already exists on calls table.")
}
// Check if the unique constraint exists before adding it to estimatedvehiclejourney table
err = db.QueryRow(`
SELECT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'unique_lineref_directionref_datasource_datedvehiclejourneyref'
);
`).Scan(&constraintExists)
if err != nil {
return fmt.Errorf("failed to check if unique constraint exists: %w", err)
}
if !constraintExists {
_, err = db.Exec(`ALTER TABLE estimatedvehiclejourney ADD CONSTRAINT unique_lineref_directionref_datasource_datedvehiclejourneyref UNIQUE (lineref, directionref, datasource, datedvehiclejourneyref);`)
if err != nil {
return fmt.Errorf("failed to add unique constraint to estimatedvehiclejourney table: %w", err)
}
fmt.Println("Unique constraint added to estimatedvehiclejourney table.")
} else {
fmt.Println("Unique constraint already exists on estimatedvehiclejourney table.")
}
fmt.Println("Database setup is good!")
return nil
}

View File

@@ -1,16 +1,26 @@
package export
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"ti1/config"
"ti1/data"
"ti1/database"
"time"
)
// DBData is the main entry point for data processing
func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -20,6 +30,15 @@ func DBData(data *data.Data) {
}
defer db.Close()
// Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json")
if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err)
}
defer config.DisconnectFromValkey(valkeyClient)
ctx := context.Background()
// Get service id aka sid
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
if err != nil {
@@ -27,385 +46,440 @@ func DBData(data *data.Data) {
}
fmt.Println("SID:", sid)
// counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
// Record start time
startTime := time.Now()
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{}
var datedVehicleJourneyRef, otherJson string
// Atomic counters for thread-safe counting
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
values = append(values, sid)
values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef)
//had to add to lowercase cus some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
} else if journey.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
}
values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Create a map to hold the JSON object for the current journey
jsonObject := make(map[string]interface{})
// Add relevant fields to the JSON object
if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Fatal(err)
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update the record
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
insertCount++
} else if action == "update" {
updateCount++
}
totalCount = insertCount + updateCount
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 {
fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
insertCount,
updateCount,
totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
recordedCallInsertCount,
recordedCallUpdateCount,
)
}
}
for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{}
//1 estimatedvehiclejourney
estimatedValues = append(estimatedValues, id)
//2 order
estimatedValues = append(estimatedValues, call.Order)
//3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation)
//9 estimated_data (JSON)
estimatedJsonObject := make(map[string]interface{})
// data allrady loged
if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
// The rest
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Fatal(err)
}
estimatedValues = append(estimatedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(estimatedValues))
for i, v := range estimatedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues)
if err != nil {
fmt.Printf("Error inserting/updating estimated call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
estimatedCallInsertCount++
} else if action == "update" {
estimatedCallUpdateCount++
}
}
}
}
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime)
//11 recorded_data (JSON)
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Fatal(err)
}
recordedValues = append(recordedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(recordedValues))
for i, v := range recordedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues)
if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
recordedCallInsertCount++
//fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" {
recordedCallUpdateCount++
}
}
}
}
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
totalJourneys := len(journeys)
fmt.Printf("Processing %d journeys...\n", totalJourneys)
// Job structures
type evjJob struct {
index int
}
type callJob struct {
evjID int
values []interface{}
}
// Channels
workerCount := 20 // Adjust based on your database and CPU
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup
var callWg sync.WaitGroup
// Start Estimated Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range estimatedCallJobs {
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&estimatedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&estimatedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start Recorded Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start EVJ workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range evjJobs {
journey := &journeys[job.index]
// Prepare values
var values []interface{}
var datedVehicleJourneyRef, otherJson string
values = append(values, sid)
values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef)
values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
} else if journey.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
}
values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Create JSON object
jsonObject := make(map[string]interface{})
if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Printf("Error marshaling JSON: %v\n", err)
continue
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update EVJ
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&insertCount, 1)
} else if action == "update" {
atomic.AddInt64(&updateCount, 1)
}
// Progress reporting
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
if total%1000 == 0 {
fmt.Printf(
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount),
atomic.LoadInt64(&updateCount),
total,
atomic.LoadInt64(&estimatedCallInsertCount),
atomic.LoadInt64(&estimatedCallUpdateCount),
atomic.LoadInt64(&estimatedCallNoneCount),
atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
)
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{}
estimatedValues = append(estimatedValues, id)
estimatedValues = append(estimatedValues, call.Order)
estimatedValues = append(estimatedValues, call.StopPointRef)
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
estimatedValues = append(estimatedValues, call.Cancellation)
// estimated_data JSON
estimatedJsonObject := make(map[string]interface{})
if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Printf("Error marshaling estimated call JSON: %v\n", err)
continue
}
estimatedValues = append(estimatedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(estimatedValues))
for i, v := range estimatedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
}
}
// Process Recorded Calls
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
recordedValues = append(recordedValues, id)
recordedValues = append(recordedValues, call.Order)
recordedValues = append(recordedValues, call.StopPointRef)
recordedValues = append(recordedValues, call.AimedDepartureTime)
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
recordedValues = append(recordedValues, call.AimedArrivalTime)
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
recordedValues = append(recordedValues, call.Cancellation)
recordedValues = append(recordedValues, call.ActualDepartureTime)
recordedValues = append(recordedValues, call.ActualArrivalTime)
// recorded_data JSON
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Printf("Error marshaling recorded call JSON: %v\n", err)
continue
}
recordedValues = append(recordedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(recordedValues))
for i, v := range recordedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
}
}
}
}()
}
// Send all EVJ jobs
for i := range journeys {
evjJobs <- evjJob{index: i}
}
close(evjJobs)
// Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
insertCount,
updateCount,
totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
recordedCallInsertCount,
recordedCallUpdateCount,
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+
" EstimatedCalls - I: %d U: %d N: %d\n"+
" RecordedCalls - I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount),
atomic.LoadInt64(&updateCount),
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount),
atomic.LoadInt64(&estimatedCallInsertCount),
atomic.LoadInt64(&estimatedCallUpdateCount),
atomic.LoadInt64(&estimatedCallNoneCount),
atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
)
// Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{})
// Add fields to JSON
serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount)
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount)
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount)
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount)
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount)
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount)
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount)
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339)
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339)
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -418,4 +492,6 @@ func DBData(data *data.Data) {
if err != nil {
log.Fatal(err)
}
fmt.Println("Finished with this ServiceDelivery!")
}

7
go.mod
View File

@@ -1,5 +1,10 @@
module ti1
go 1.22.1
go 1.23.4
require github.com/lib/pq v1.10.9
require (
github.com/valkey-io/valkey-go v1.0.52 // indirect
golang.org/x/sys v0.24.0 // indirect
)

4
go.sum
View File

@@ -1,2 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

View File

@@ -9,6 +9,7 @@ import (
)
func main() {
log.Println("ti1 v1.0.2")
log.Println("Starting...")
// Setup the database

26
valki/commands.go Normal file
View File

@@ -0,0 +1,26 @@
package valki
import (
"context"
"fmt"
"time"
"github.com/valkey-io/valkey-go"
)
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error()
if err != nil {
return fmt.Errorf("failed to set value in Valkey: %v", err)
}
return nil
}
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if err != nil {
return "hehe", nil
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
return value, nil
}