69 Commits
v0.1 ... main

Author SHA1 Message Date
pigwin
caeb7651d3 v1.1.0 2026-02-12 11:00:03 +00:00
pigwin
21a133fc57 nowait 2026-02-12 10:50:29 +00:00
pigwin
a4c4159688 v1.0.4 2026-02-12 10:34:27 +00:00
pigwin
f2b984b6f9 Forkort tid mellom hent 2026-02-12 10:34:04 +00:00
pigwin
348b3cd5b7 v1.0.3 2026-02-12 09:30:43 +00:00
pigwin
4c94654e0a v1.0.3 - oppdatering av eksterne pakker 2026-02-12 09:30:13 +00:00
pigwin
5b840950c8 v1.0.2
come on plz work for ducks sake
2025-12-23 22:51:20 +01:00
pigwin
4c72bb1d4c v1.0.2
v1.0.2 
Stor V ikke funke lel
2025-12-23 22:45:00 +01:00
pigwin
ea36b60233 v1.0.2 2025-12-23 21:44:06 +00:00
pigwin
201811e2d3 V1.0.2
V1.0.2 merge for da real timeing :)
2025-12-23 21:44:31 +01:00
pigwin
bcdc24e47f oppdatere print log 2025-12-23 20:42:13 +00:00
pigwin
4bb49e944f start end cool 2025-12-23 20:41:45 +00:00
pigwin
fbb20d576b v1.0.1 2025-12-23 17:14:43 +00:00
pigwin
823701c698 v1.0.0
V1.0
2025-12-23 18:06:08 +01:00
pigwin
00e6667bc0 v1.0.0 2025-12-23 16:44:24 +00:00
pigwin
9b433cdd57 dont diez plz 2025-12-23 14:40:53 +00:00
pigwin
c106267d76 retry stuffs?
plz workz
2025-12-23 14:34:25 +00:00
pigwin
b95fbb477d v num 2025-12-23 14:11:30 +00:00
pigwin
7424600a8b worth a shot 2025-12-23 13:24:20 +00:00
pigwin
5171dcf4f6 old folder
tmp
2025-12-23 13:03:56 +00:00
pigwin
0324912eb4 tstttttt 2025-12-23 12:46:30 +00:00
pigwin
c45723a5b0 v0.2.1 2025-01-11 21:09:27 +01:00
pigwin
779530f036 v0.2.1
added changes
2025-01-11 21:03:15 +01:00
pigwin
fd43b3e4bb nvm that esitmated via part 2025-01-11 19:51:41 +00:00
pigwin
bed666bd81 use valkey for Estimated Vehicle 2025-01-11 18:30:53 +00:00
pigwin
c09ea5784a woops forgot that part 2025-01-11 17:38:34 +00:00
pigwin
8af34a9ab3 InsertOrUpdateRecordedCall use valkey :) 2025-01-11 17:26:43 +00:00
pigwin
5bdc2acd81 Merge pull request #4 from pigwin-3/main
sync
2025-01-11 17:59:40 +01:00
pigwin
3058d98a67 v0.2.0 2025-01-09 20:23:38 +00:00
pigwin
21b3bba164 v0.2
just for the github action to run
2025-01-09 21:17:25 +01:00
pigwin
c08bcb8432 Merge pull request #3 from pigwin-3/dev
v0.2
2025-01-09 21:14:34 +01:00
pigwin
14af72959e almost release v0.2 2025-01-09 20:13:55 +00:00
pigwin
afd6b12acf chat gpt said this would work and i trust it like 2 % 2025-01-09 19:54:11 +00:00
pigwin
d996411adf inprove reporting???!?!!!!!. 2025-01-09 19:28:10 +00:00
pigwin
083a267b2a WTF hash match not do the match thing lol 2025-01-09 19:06:51 +00:00
pigwin
fe72a507e1 testing 2025-01-09 18:59:46 +00:00
pigwin
5203208fe7 hash validation and database insertion logic 2025-01-09 18:05:17 +00:00
pigwin
2141c5f168 hehe 2025-01-09 17:56:41 +00:00
pigwin
9919c159f2 gotta fix that valkey shit later 2025-01-07 20:39:47 +00:00
pigwin
07838da0ad lolololololollollollol 2025-01-07 20:33:29 +00:00
pigwin
b8a2a5837f hehehehehe mek valke wurke 2025-01-07 20:13:23 +00:00
pigwin
df0b5135bd wtf so mutch pain cus i named a package wrong lol 2025-01-07 19:51:14 +00:00
pigwin
a2c1766dd1 refactor: improve connection string formatting and enhance logging in ConnectToPostgreSQL function 2025-01-07 18:54:49 +00:00
pigwin
97a6506a65 valkey testy 2025-01-07 18:36:27 +00:00
pigwin
ba558803ff add password configuration to Valkey settings 2025-01-07 18:28:47 +00:00
pigwin
c6fc0070cf add Valkey configuration and connection management 2025-01-06 20:47:55 +00:00
pigwin
c456bdecdb hash for valky or reddis or smth else 2025-01-06 15:32:04 +00:00
pigwin
1020dacf79 add connection pool settings in ConnectToPostgreSQL function 2025-01-04 22:07:05 +00:00
pigwin
6c20858280 remove database connection pool settings from ConnectToPostgreSQL function 2025-01-04 21:30:01 +00:00
pigwin
119898acc5 Merge branch 'test' 2025-01-04 21:27:15 +00:00
pigwin
51fb986710 didnt work well 2025-01-04 21:26:09 +00:00
pigwin
3c1b84197a idk i just hope it works 2025-01-04 21:15:45 +00:00
pigwin
75e007603f chat gpt pls dp me help, yess 2025-01-04 20:57:01 +00:00
pigwin
a50ef5b899 i have no idea wtf im doin 2025-01-04 20:53:24 +00:00
pigwin
def3a9c38c add time package import in db.go 2025-01-04 20:30:05 +00:00
pigwin
c1992f7616 configure database connection settings for improved performance 2025-01-04 20:27:01 +00:00
pigwin
a274810818 Update docker-image.yml 2025-01-04 21:24:23 +01:00
pigwin
42b75360c4 test 2025-01-04 20:19:39 +00:00
pigwin
c99f22b131 lets undo that????? 2025-01-04 20:14:51 +00:00
pigwin
7b96214476 goroutines??? trying to make go fast lol 2025-01-04 20:11:30 +00:00
pigwin
c12ec02270 enhance PostgreSQL connection handling with connection pool settings 2025-01-04 17:07:41 +00:00
pigwin
6db5b12d7b update Dockerfile to use Go version 1.23.4 2025-01-04 16:06:19 +00:00
pigwin
8f372b4213 add log package import in data.go 2025-01-04 16:04:13 +00:00
pigwin
ccbf9c8d72 update Go version to 1.23.4 and add logging for data fetching 2025-01-04 16:01:17 +00:00
pigwin
fb4862c078 v0.1.2 2024-12-30 21:49:22 +00:00
pigwin
f3dbd4505f md documentation so i dont forget imedietly 2024-12-30 21:35:19 +00:00
pigwin
2462c6057e log more 2024-12-30 21:16:45 +00:00
pigwin
cda95aa4f2 v0.1.1 2024-12-30 21:03:41 +00:00
pigwin
b7e448df8d v0.1 2024-12-29 19:55:27 +01:00
17 changed files with 1235 additions and 532 deletions

View File

@@ -4,6 +4,7 @@ on:
push:
branches:
- main
- dev
jobs:
build:
@@ -26,26 +27,32 @@ jobs:
id: timestamp
run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
- name: Get the commit tag
id: get-tag
run: echo "GIT_TAG=$(git describe --tags --exact-match 2>/dev/null || echo 'no-tag')" >> $GITHUB_ENV
- name: Get commit version
id: commit-version
run: |
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs)
echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
echo "Version match: $COMMIT_MSG"
echo "VERSION=${COMMIT_MSG}" >> $GITHUB_ENV
else
echo "No version match, defaulting to 'dev'"
echo "VERSION=dev" >> $GITHUB_ENV
fi
- name: Build Docker image
run: |
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
docker build -t ti1:dev-${{ env.TIMESTAMP }} .
else
docker build -t ti1:latest -t ti1:${{ env.GIT_TAG }} .
fi
docker build -t ti1:${{ env.VERSION }} .
- name: Push Docker image
run: |
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
docker tag ti1:dev-${{ env.TIMESTAMP }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
else
docker tag ti1:latest ${{ secrets.DOCKER_USERNAME }}/ti1:latest
docker tag ti1:${{ env.GIT_TAG }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:latest
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
# Always push to 'dev' tag
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev
# If the version is valid, also push that specific version tag
if [[ "${{ env.VERSION }}" != "dev" ]]; then
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
fi

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image
FROM golang:1.22.1
FROM golang:1.26.0
# Set the Current Working Directory inside the container
WORKDIR /app

160
README.md
View File

@@ -1,22 +1,164 @@
# TI1
The best thing to happen since yesterday at 3 pm
The best thing to happen since yesterday at 2:56 pm
## Usage
To use this project, you can pull the Docker image from Docker Hub and run it using the following commands:
Start with getting Docker then do the following:
### Pull the Docker Image
### Create the setup files
Create a `docker-compose.yml`
```yaml
services:
db:
image: postgres:17.2
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
volumes:
- ./postgres_data:/var/lib/postgresql/data # Store data in the current directory
- ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
networks:
- app-network
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
interval: 10s
retries: 5
restart: always # Ensure the container always restarts
```sh
docker pull pigwin1/ti1:latest
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- ./valkey_data:/data
networks:
- app-network
restart: always # Ensure the container always restarts
ti1-container:
image: pigwin1/ti1:dev
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: ti1
DB_PASSWORD: ti1password
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy # Wait until the db service is healthy
valkey:
condition: service_started # Wait until the valkey service is started
networks:
- app-network
restart: always # Ensure the container always restarts
networks:
app-network:
driver: bridge
volumes:
postgres_data:
driver: local
valkey_data:
driver: local
```
### Run the Docker Container
```sh
docker run -d --name ti1-container -e DB_HOST=<your_db_host> -e DB_PORT=<your_db_port> -e DB_USER=<your_db_user> -e DB_PASSWORD=<your_db_password> -e DB_NAME=<your_db_name> -e DB_SSLMODE=<your_db_sslmode> pigwin1/ti1:latest
Create `init.sql`
```sql
-- Check if 'post' user exists; create if not
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
ALTER ROLE post WITH SUPERUSER;
END IF;
END
$$;
-- Check if 'ti1' user exists; create if not
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO ti1;
-- Grant the ti1 user the necessary permissions on the public schema
GRANT USAGE, CREATE ON SCHEMA public TO ti1;
-- Grant all permissions (SELECT, INSERT, UPDATE, DELETE, etc.) on all existing tables in the public schema
GRANT ALL ON ALL TABLES IN SCHEMA public TO ti1;
-- Grant all permissions on all existing sequences in the public schema
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO ti1;
-- Grant all permissions on all functions in the public schema
GRANT ALL ON ALL FUNCTIONS IN SCHEMA public TO ti1;
-- Ensure that the ti1 user will have access to new tables, sequences, and functions created in the public schema
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ti1;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ti1;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON FUNCTIONS TO ti1;
-- Optionally, grant full permissions on the entire database to ti1 (if needed)
-- GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
END IF;
END
$$;
```
Remember to change the password values
### Run the Docker Containers
```sh
docker compose up -d
```
### edit the postgress config (optinal)
open the config file
```sh
nano postgres_data/postgresql.conf
```
Change the following values
```conf
listen_addresses = '*'
max_connections = 200
shared_buffers = 16GB
work_mem = 256MB
maintenance_work_mem = 2GB
dynamic_shared_memory_type = posix
max_wal_size = 1GB
min_wal_size = 80MB
```
set these to what makes most sense for you
These values should also be set bet not necessarily changed
```conf
log_timezone = 'Etc/UTC'
datestyle = 'iso, mdy'
timezone = 'Etc/UTC'
lc_messages = 'en_US.utf8'
lc_monetary = 'en_US.utf8'
lc_numeric = 'en_US.utf8'
lc_time = 'en_US.utf8'
default_text_search_config = 'pg_catalog.english'
```
Replace `<your_db_host>`, `<your_db_port>`, `<your_db_user>`, `<your_db_password>`, `<your_db_name>`, and `<your_db_sslmode>` with your actual database configuration values.
### Docker Hub Repository
You can find the Docker image on Docker Hub at the following link:

View File

@@ -7,5 +7,12 @@
"dbname": "ti1",
"sslmode": "disable"
},
"valkey": {
"host": "127.0.0.1",
"port": "6379",
"max_conns": 100,
"timeout_ms": 2000,
"password": "the_valkey_password"
},
"temp": "value"
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
)
type Config struct {
@@ -15,6 +16,13 @@ type Config struct {
DBName string `json:"dbname"`
SSLMode string `json:"sslmode"`
} `json:"database"`
Valkey struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
} `json:"valkey"`
Temp string `json:"temp"`
}
@@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) {
if temp := os.Getenv("TEMP"); temp != "" {
config.Temp = temp
}
//log.Println("Temp value:", config.Temp)
// Override Valkey settings with environment variables
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
config.Valkey.Host = valkeyHost
}
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
config.Valkey.Port = valkeyPort
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.Valkey.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.Valkey.TimeoutMs = val
}
}
return config, nil
}

View File

@@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
@@ -17,7 +18,7 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Configuration loaded successfully!")
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
// Open connection to database
@@ -26,7 +27,13 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err
}
fmt.Println("Connection to PostgreSQL opened successfully!")
// Set connection pool settings for high concurrency
db.SetMaxOpenConns(50) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D")
// Ping database to verify connection
err = db.Ping()

100
config/valkey.go Normal file
View File

@@ -0,0 +1,100 @@
package config
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/valkey-io/valkey-go"
)
type ValkeyConfig struct {
Host string `json:"host"`
Port string `json:"port"`
MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line
}
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
var config ValkeyConfig
configFile, err := os.Open(file)
if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err)
}
defer configFile.Close()
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
}
// Override with environment variables if set
if host := os.Getenv("VALKEY_HOST"); host != "" {
config.Host = host
}
if port := os.Getenv("VALKEY_PORT"); port != "" {
config.Port = port
}
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
if val, err := strconv.Atoi(maxConns); err == nil {
config.MaxConns = val
}
}
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
if val, err := strconv.Atoi(timeoutMs); err == nil {
config.TimeoutMs = val
}
}
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
config.Password = password
}
return config, nil
}
func ConnectToValkey(configPath string) (valkey.Client, error) {
fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err)
}
fmt.Println("Configuration loaded successfully!")
valkeyConfig := config.Valkey
// Setup Valkey client options
options := valkey.ClientOption{
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
Password: valkeyConfig.Password,
// Additional options can be added here if required
}
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
client, err := valkey.NewClient(options)
if err != nil {
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
}
// Optionally, perform a ping to validate the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
defer cancel()
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
client.Close()
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
}
log.Println("Connected to Valkey successfully!")
return client, nil
}
func DisconnectFromValkey(client valkey.Client) error {
fmt.Println("Disconnecting from Valkey...")
client.Close()
log.Println("Disconnected from Valkey successfully!")
return nil
}

View File

@@ -1,8 +1,12 @@
package data
import (
"crypto/tls"
"encoding/xml"
"fmt"
"log"
"net/http"
"time"
)
type Data struct {
@@ -125,21 +129,87 @@ type Data struct {
} `xml:"ServiceDelivery"`
}
func FetchData() (*Data, error) {
client := &http.Client{}
func FetchData(timestamp string) (*Data, error) {
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors
transport := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
resp, err := client.Get("https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000")
client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
// Retry logic for transient failures
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
resp, err = client.Get(url)
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
}
// All retries failed
if err != nil {
return nil, err
}
defer resp.Body.Close()
data := &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
if err != nil {
return nil, err
}
return data, nil
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries)
}

View File

@@ -1,11 +1,24 @@
package database
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) {
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
@@ -13,43 +26,101 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
}
}
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
}
var action string
var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err)
}
return id, action, nil
}
return 0, "none", nil
}
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -1,41 +1,135 @@
package database
import (
"context"
"database/sql"
"fmt"
"sync"
)
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
err := db.QueryRow(query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err)
}
return id, action, nil
}
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -1,11 +1,18 @@
package database
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) {
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
@@ -13,46 +20,104 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
}
}
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
if 1 == 0 {
fmt.Println("Executing query:", query)
for i, v := range values {
fmt.Printf("Value %d: (%v)\n", i+1, v)
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
}
var action string
var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err)
}
return id, action, nil
}
return 0, "none", nil
}
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,25 +6,18 @@ import (
)
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
if err != nil {
fmt.Println(err)
return 0, err
return 0, fmt.Errorf("failed to insert service delivery: %w", err)
}
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil
}
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil {
fmt.Println(err)
return err
return fmt.Errorf("failed to update service delivery data: %w", err)
}
fmt.Println("Finished with this ServiceDelivery!")
return nil
}

View File

@@ -1,16 +1,26 @@
package export
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"ti1/config"
"ti1/data"
"ti1/database"
"time"
)
// DBData is the main entry point for data processing
func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -20,6 +30,15 @@ func DBData(data *data.Data) {
}
defer db.Close()
// Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json")
if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err)
}
defer config.DisconnectFromValkey(valkeyClient)
ctx := context.Background()
// Get service id aka sid
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
if err != nil {
@@ -27,385 +46,440 @@ func DBData(data *data.Data) {
}
fmt.Println("SID:", sid)
// counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
// Record start time
startTime := time.Now()
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{}
var datedVehicleJourneyRef, otherJson string
// Atomic counters for thread-safe counting
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
values = append(values, sid)
values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef)
//had to add to lowercase cus some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
} else if journey.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
}
values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Create a map to hold the JSON object for the current journey
jsonObject := make(map[string]interface{})
// Add relevant fields to the JSON object
if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Fatal(err)
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update the record
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
insertCount++
} else if action == "update" {
updateCount++
}
totalCount = insertCount + updateCount
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
if totalCount%1000 == 0 {
fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
insertCount,
updateCount,
totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
recordedCallInsertCount,
recordedCallUpdateCount,
)
}
}
for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{}
//1 estimatedvehiclejourney
estimatedValues = append(estimatedValues, id)
//2 order
estimatedValues = append(estimatedValues, call.Order)
//3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation)
//9 estimated_data (JSON)
estimatedJsonObject := make(map[string]interface{})
// data allrady loged
if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
// The rest
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Fatal(err)
}
estimatedValues = append(estimatedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(estimatedValues))
for i, v := range estimatedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues)
if err != nil {
fmt.Printf("Error inserting/updating estimated call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
estimatedCallInsertCount++
} else if action == "update" {
estimatedCallUpdateCount++
}
}
}
}
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime)
//11 recorded_data (JSON)
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Fatal(err)
}
recordedValues = append(recordedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(recordedValues))
for i, v := range recordedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues)
if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
recordedCallInsertCount++
//fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" {
recordedCallUpdateCount++
}
}
}
}
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
totalJourneys := len(journeys)
fmt.Printf("Processing %d journeys...\n", totalJourneys)
// Job structures
type evjJob struct {
index int
}
type callJob struct {
evjID int
values []interface{}
}
// Channels
workerCount := 20 // Adjust based on your database and CPU
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup
var callWg sync.WaitGroup
// Start Estimated Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range estimatedCallJobs {
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&estimatedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&estimatedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start Recorded Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start EVJ workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range evjJobs {
journey := &journeys[job.index]
// Prepare values
var values []interface{}
var datedVehicleJourneyRef, otherJson string
values = append(values, sid)
values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef)
values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
} else if journey.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
}
values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Create JSON object
jsonObject := make(map[string]interface{})
if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Printf("Error marshaling JSON: %v\n", err)
continue
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update EVJ
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&insertCount, 1)
} else if action == "update" {
atomic.AddInt64(&updateCount, 1)
}
// Progress reporting
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
if total%1000 == 0 {
fmt.Printf(
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount),
atomic.LoadInt64(&updateCount),
total,
atomic.LoadInt64(&estimatedCallInsertCount),
atomic.LoadInt64(&estimatedCallUpdateCount),
atomic.LoadInt64(&estimatedCallNoneCount),
atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
)
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{}
estimatedValues = append(estimatedValues, id)
estimatedValues = append(estimatedValues, call.Order)
estimatedValues = append(estimatedValues, call.StopPointRef)
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
estimatedValues = append(estimatedValues, call.Cancellation)
// estimated_data JSON
estimatedJsonObject := make(map[string]interface{})
if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Printf("Error marshaling estimated call JSON: %v\n", err)
continue
}
estimatedValues = append(estimatedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(estimatedValues))
for i, v := range estimatedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
}
}
// Process Recorded Calls
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
recordedValues = append(recordedValues, id)
recordedValues = append(recordedValues, call.Order)
recordedValues = append(recordedValues, call.StopPointRef)
recordedValues = append(recordedValues, call.AimedDepartureTime)
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
recordedValues = append(recordedValues, call.AimedArrivalTime)
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
recordedValues = append(recordedValues, call.Cancellation)
recordedValues = append(recordedValues, call.ActualDepartureTime)
recordedValues = append(recordedValues, call.ActualArrivalTime)
// recorded_data JSON
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Printf("Error marshaling recorded call JSON: %v\n", err)
continue
}
recordedValues = append(recordedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(recordedValues))
for i, v := range recordedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
}
}
}
}()
}
// Send all EVJ jobs
for i := range journeys {
evjJobs <- evjJob{index: i}
}
close(evjJobs)
// Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
insertCount,
updateCount,
totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
recordedCallInsertCount,
recordedCallUpdateCount,
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+
" EstimatedCalls - I: %d U: %d N: %d\n"+
" RecordedCalls - I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount),
atomic.LoadInt64(&updateCount),
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount),
atomic.LoadInt64(&estimatedCallInsertCount),
atomic.LoadInt64(&estimatedCallUpdateCount),
atomic.LoadInt64(&estimatedCallNoneCount),
atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
)
// Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{})
// Add fields to JSON
serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount)
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount)
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount)
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount)
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount)
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount)
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount)
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339)
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339)
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -418,4 +492,6 @@ func DBData(data *data.Data) {
if err != nil {
log.Fatal(err)
}
fmt.Println("Finished with this ServiceDelivery!")
}

9
go.mod
View File

@@ -1,5 +1,10 @@
module ti1
go 1.22.1
go 1.26.0
require github.com/lib/pq v1.10.9
require (
github.com/lib/pq v1.11.2
github.com/valkey-io/valkey-go v1.0.71
)
require golang.org/x/sys v0.41.0 // indirect

10
go.sum
View File

@@ -1,2 +1,12 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
github.com/valkey-io/valkey-go v1.0.71 h1:tuKjGVLd7/I8CyUwqAq5EaD7isxQdlvJzXo3jS8pZW0=
github.com/valkey-io/valkey-go v1.0.71/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

13
main.go
View File

@@ -9,6 +9,7 @@ import (
)
func main() {
log.Println("ti1 v1.1.0")
log.Println("Starting...")
// Setup the database
@@ -17,10 +18,14 @@ func main() {
log.Fatalf("Database setup failed: %v", err)
}
// Get the current timestamp
starttimestamp := time.Now().Format("20060102T150405")
log.Printf("Starting timestamp: %s", starttimestamp)
for {
start := time.Now()
data, err := data.FetchData()
data, err := data.FetchData(starttimestamp)
if err != nil {
log.Fatal(err)
}
@@ -29,9 +34,9 @@ func main() {
log.Println("finished in", time.Since(start))
elapsed := time.Since(start)
if elapsed < 5*time.Minute {
log.Printf("starting again in %v", 5*time.Minute-elapsed)
time.Sleep(1*time.Minute - elapsed)
if elapsed < 20*time.Second {
log.Printf("starting again in %v", 20*time.Second-elapsed)
time.Sleep(20*time.Second - elapsed)
}
}
}

26
valki/commands.go Normal file
View File

@@ -0,0 +1,26 @@
package valki
import (
"context"
"fmt"
"time"
"github.com/valkey-io/valkey-go"
)
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error()
if err != nil {
return fmt.Errorf("failed to set value in Valkey: %v", err)
}
return nil
}
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if err != nil {
return "hehe", nil
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
return value, nil
}