2 Commits

Author SHA1 Message Date
pigwin-3
497b5a6e86 yep 2025-08-15 16:12:43 +02:00
pigwin-3
8b56bf8370 increase time till deletion to 90 min 2025-06-07 14:18:48 +02:00
16 changed files with 522 additions and 746 deletions

View File

@@ -30,7 +30,7 @@ jobs:
- name: Get commit version - name: Get commit version
id: commit-version id: commit-version
run: | run: |
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs) COMMIT_MSG=$(git log -1 --pretty=%B)
echo "Commit message: $COMMIT_MSG" # Debugging output echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats # Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image # Use the official Golang image as the base image
FROM golang:1.26.0 FROM golang:1.23.4
# Set the Current Working Directory inside the container # Set the Current Working Directory inside the container
WORKDIR /app WORKDIR /app

View File

@@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf
Change the following values Change the following values
```conf ```conf
listen_addresses = '*' listen_addresses = '*'
max_connections = 200 max_connections = 100
shared_buffers = 16GB shared_buffers = 16GB
work_mem = 256MB work_mem = 256MB
maintenance_work_mem = 2GB maintenance_work_mem = 2GB

View File

@@ -10,8 +10,8 @@
"valkey": { "valkey": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": "6379", "port": "6379",
"max_conns": 100, "max_conns": 50,
"timeout_ms": 2000, "timeout_ms": 5000,
"password": "the_valkey_password" "password": "the_valkey_password"
}, },
"temp": "value" "temp": "value"

View File

@@ -27,11 +27,10 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
// Set connection pool settings for high concurrency // Set connection pool settings
db.SetMaxOpenConns(50) // Maximum number of open connections to the database db.SetMaxOpenConns(25) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D") fmt.Println("Connection to PostgreSQL opened successfully :D")

View File

@@ -1,12 +1,9 @@
package data package data
import ( import (
"crypto/tls"
"encoding/xml"
"fmt"
"log" "log"
"encoding/xml"
"net/http" "net/http"
"time"
) )
type Data struct { type Data struct {
@@ -130,86 +127,23 @@ type Data struct {
} }
func FetchData(timestamp string) (*Data, error) { func FetchData(timestamp string) (*Data, error) {
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors client := &http.Client{}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
log.Println("Fetching data from URL:", url)
// Retry logic for transient failures resp, err := client.Get(url)
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
resp, err = client.Get(url)
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
}
// All retries failed
if err != nil { if err != nil {
return nil, err return nil, err
} }
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries) defer resp.Body.Close()
data := &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
if err != nil {
return nil, err
}
return data, nil
} }

View File

@@ -6,18 +6,11 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
) )
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
@@ -35,15 +28,19 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
} }
hash := md5.Sum([]byte(valuesString)) hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:]) hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0] estimatedVehicleJourneyID := values[0]
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -67,60 +64,26 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
estimated_data = EXCLUDED.estimated_data estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} } else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -1,68 +1,10 @@
package database package database
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"sync"
) )
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := ` query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
@@ -82,54 +24,18 @@ func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (in
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string var action string
var id int var id int
err := db.QueryRow(query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} }
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,7 +6,6 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
@@ -34,10 +33,12 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -64,60 +65,25 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
recorded_data = EXCLUDED.recorded_data recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} } else {
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -6,18 +6,25 @@ import (
) )
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int var id int
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to insert service delivery: %w", err) fmt.Println(err)
return 0, err
} }
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil return id, nil
} }
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to update service delivery data: %w", err) fmt.Println(err)
return err
} }
fmt.Println("Finished with this ServiceDelivery!")
return nil return nil
} }

61
docker-compose.yaml Normal file
View File

@@ -0,0 +1,61 @@
services:
db:
image: postgres:17.2
container_name: postgres-db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: RootPassword
POSTGRES_DB: ti1
ports:
- "5432:5432"
volumes:
- /tmp/ti1/postgres_data:/var/lib/postgresql/data:z
- /tmp/ti1/init.sql:/docker-entrypoint-initdb.d/init.sql:ro,z
networks:
- app-network
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
interval: 10s
retries: 5
restart: always
valkey:
image: valkey/valkey:latest
container_name: valkey
environment:
VALKEY_PASSWORD: the_valkey_password
ports:
- "6379:6379"
volumes:
- /tmp/ti1/valkey_data:/data:z
networks:
- app-network
restart: always
ti1-container:
build:
context: .
dockerfile: Dockerfile
container_name: ti1-container
environment:
DB_HOST: db
DB_PORT: 5432
DB_USER: postgres
DB_PASSWORD: RootPassword
DB_NAME: ti1
DB_SSLMODE: disable
VALKEY_HOST: valkey
VALKEY_PORT: 6379
VALKEY_PASSWORD: the_valkey_password
depends_on:
db:
condition: service_healthy
valkey:
condition: service_started
networks:
- app-network
restart: always
networks:
app-network:
driver: bridge

View File

@@ -6,21 +6,12 @@ import (
"fmt" "fmt"
"log" "log"
"strings" "strings"
"sync"
"sync/atomic"
"ti1/config" "ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"time"
) )
// DBData is the main entry point for data processing
func DBData(data *data.Data) { func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -46,96 +37,17 @@ func DBDataOptimized(data *data.Data) {
} }
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// Record start time // counters
startTime := time.Now() var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
// Atomic counters for thread-safe counting for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
totalJourneys := len(journeys)
fmt.Printf("Processing %d journeys...\n", totalJourneys)
// Job structures
type evjJob struct {
index int
}
type callJob struct {
evjID int
values []interface{}
}
// Channels
workerCount := 20 // Adjust based on your database and CPU
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup
var callWg sync.WaitGroup
// Start Estimated Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range estimatedCallJobs {
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&estimatedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&estimatedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start Recorded Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start EVJ workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range evjJobs {
journey := &journeys[job.index]
// Prepare values
var values []interface{} var values []interface{}
var datedVehicleJourneyRef, otherJson string var datedVehicleJourneyRef, otherJson string
values = append(values, sid) values = append(values, sid)
values = append(values, journey.RecordedAtTime) values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef) values = append(values, journey.LineRef)
//had to convert to lowercase because some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef)) values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource) values = append(values, journey.DataSource)
@@ -156,8 +68,10 @@ func DBDataOptimized(data *data.Data) {
values = append(values, journey.VehicleRef) values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation) values = append(values, journey.Cancellation)
// Create JSON object // Create a map to hold the JSON object for the current journey
jsonObject := make(map[string]interface{}) jsonObject := make(map[string]interface{})
// Add relevant fields to the JSON object
if journey.OriginName != "" { if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName jsonObject["OriginName"] = journey.OriginName
} }
@@ -228,60 +142,71 @@ func DBDataOptimized(data *data.Data) {
jsonObject["Via"] = journey.Via.PlaceName jsonObject["Via"] = journey.Via.PlaceName
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(jsonObject) jsonString, err := json.Marshal(jsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling JSON: %v\n", err) log.Fatal(err)
continue
} }
otherJson = string(jsonString) otherJson = string(jsonString)
values = append(values, otherJson) values = append(values, otherJson)
// Insert or update EVJ // Insert or update the record
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil { if err != nil {
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue } else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
if action == "insert" { if action == "insert" {
atomic.AddInt64(&insertCount, 1) insertCount++
} else if action == "update" { } else if action == "update" {
atomic.AddInt64(&updateCount, 1) updateCount++
} }
totalCount = insertCount + updateCount
// Progress reporting //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0) if totalCount%1000 == 0 {
if total%1000 == 0 {
fmt.Printf( fmt.Printf(
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n", "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount), insertCount,
atomic.LoadInt64(&updateCount), updateCount,
total, totalCount,
atomic.LoadInt64(&estimatedCallInsertCount), estimatedCallInsertCount,
atomic.LoadInt64(&estimatedCallUpdateCount), estimatedCallUpdateCount,
atomic.LoadInt64(&estimatedCallNoneCount), estimatedCallNoneCount,
atomic.LoadInt64(&recordedCallInsertCount), recordedCallInsertCount,
atomic.LoadInt64(&recordedCallUpdateCount), recordedCallUpdateCount,
atomic.LoadInt64(&recordedCallNoneCount), recordedCallNoneCount,
) )
} }
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls { for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall { for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{} var estimatedValues []interface{}
//1 estimatedvehiclejourney
estimatedValues = append(estimatedValues, id) estimatedValues = append(estimatedValues, id)
//2 order
estimatedValues = append(estimatedValues, call.Order) estimatedValues = append(estimatedValues, call.Order)
//3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef) estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime) estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime) estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation) estimatedValues = append(estimatedValues, call.Cancellation)
// estimated_data JSON //9 estimated_data (JSON)
estimatedJsonObject := make(map[string]interface{}) estimatedJsonObject := make(map[string]interface{})
// data already logged
if call.ExpectedDepartureTime != "" { if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
} }
@@ -291,6 +216,7 @@ func DBDataOptimized(data *data.Data) {
if call.Cancellation != "" { if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation estimatedJsonObject["Cancellation"] = call.Cancellation
} }
// The rest
if call.StopPointName != "" { if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName estimatedJsonObject["StopPointName"] = call.StopPointName
} }
@@ -361,41 +287,66 @@ func DBDataOptimized(data *data.Data) {
estimatedJsonObject["Occupancy"] = call.Occupancy estimatedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject) jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling estimated call JSON: %v\n", err) log.Fatal(err)
continue
} }
estimatedValues = append(estimatedValues, string(jsonString)) estimatedValues = append(estimatedValues, string(jsonString))
// Convert to string values // Insert or update the record
interfaceValues := make([]interface{}, len(estimatedValues)) stringValues := make([]string, len(estimatedValues))
for i, v := range estimatedValues { for i, v := range estimatedValues {
interfaceValues[i] = fmt.Sprintf("%v", v) stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil {
log.Fatalf("Failed to insert or update estimated call: %v", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
// Send to worker pool if action == "insert" {
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues} estimatedCallInsertCount++
} else if action == "update" {
estimatedCallUpdateCount++
} else if action == "none" {
estimatedCallNoneCount++
}
}
} }
} }
// Process Recorded Calls
for _, recordedCall := range journey.RecordedCalls { for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall { for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{} var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id) recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order) recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef) recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime) recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime) recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime) recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime) recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation) recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime) recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime) recordedValues = append(recordedValues, call.ActualArrivalTime)
// recorded_data JSON //11 recorded_data (JSON)
recordedJsonObject := make(map[string]interface{}) recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" { if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName recordedJsonObject["StopPointName"] = call.StopPointName
@@ -413,73 +364,68 @@ func DBDataOptimized(data *data.Data) {
recordedJsonObject["Occupancy"] = call.Occupancy recordedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject) jsonString, err := json.Marshal(recordedJsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling recorded call JSON: %v\n", err) log.Fatal(err)
continue
} }
recordedValues = append(recordedValues, string(jsonString)) recordedValues = append(recordedValues, string(jsonString))
// Convert to string values // Insert or update the record
interfaceValues := make([]interface{}, len(recordedValues)) stringValues := make([]string, len(recordedValues))
for i, v := range recordedValues { for i, v := range recordedValues {
interfaceValues[i] = fmt.Sprintf("%v", v) stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
} }
// Send to worker pool id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
recordedCallJobs <- callJob{evjID: id, values: interfaceValues} if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
recordedCallInsertCount++
//fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" {
recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
} }
} }
} }
}()
} }
// Send all EVJ jobs
for i := range journeys {
evjJobs <- evjJob{index: i}
} }
close(evjJobs)
// Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf( fmt.Printf(
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+ "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
" EstimatedCalls - I: %d U: %d N: %d\n"+ insertCount,
" RecordedCalls - I: %d U: %d N: %d\n", updateCount,
atomic.LoadInt64(&insertCount), totalCount,
atomic.LoadInt64(&updateCount), estimatedCallInsertCount,
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount), estimatedCallUpdateCount,
atomic.LoadInt64(&estimatedCallInsertCount), estimatedCallNoneCount,
atomic.LoadInt64(&estimatedCallUpdateCount), recordedCallInsertCount,
atomic.LoadInt64(&estimatedCallNoneCount), recordedCallUpdateCount,
atomic.LoadInt64(&recordedCallInsertCount), recordedCallNoneCount,
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount) // Add fields to JSON
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount) serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount) serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount) serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount) serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount) serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount) serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -492,6 +438,4 @@ func DBDataOptimized(data *data.Data) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("Finished with this ServiceDelivery!")
} }

10
go.mod
View File

@@ -1,10 +1,10 @@
module ti1 module ti1
go 1.26.0 go 1.23.4
require github.com/lib/pq v1.10.9
require ( require (
github.com/lib/pq v1.11.2 github.com/valkey-io/valkey-go v1.0.52 // indirect
github.com/valkey-io/valkey-go v1.0.71 golang.org/x/sys v0.24.0 // indirect
) )
require golang.org/x/sys v0.41.0 // indirect

6
go.sum
View File

@@ -1,12 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
github.com/valkey-io/valkey-go v1.0.71 h1:tuKjGVLd7/I8CyUwqAq5EaD7isxQdlvJzXo3jS8pZW0=
github.com/valkey-io/valkey-go v1.0.71/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

View File

@@ -9,7 +9,7 @@ import (
) )
func main() { func main() {
log.Println("ti1 v1.1.0") log.Println("ti1 v0.2.1")
log.Println("Starting...") log.Println("Starting...")
// Setup the database // Setup the database
@@ -34,9 +34,9 @@ func main() {
log.Println("finished in", time.Since(start)) log.Println("finished in", time.Since(start))
elapsed := time.Since(start) elapsed := time.Since(start)
if elapsed < 20*time.Second { if elapsed < 5*time.Minute {
log.Printf("starting again in %v", 20*time.Second-elapsed) log.Printf("starting again in %v", 5*time.Minute-elapsed)
time.Sleep(20*time.Second - elapsed) time.Sleep(5*time.Minute - elapsed)
} }
} }
} }

View File

@@ -9,13 +9,15 @@ import (
) )
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error { func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error() err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(90*time.Minute).Build()).Error()
if err != nil { if err != nil {
return fmt.Errorf("failed to set value in Valkey: %v", err) return fmt.Errorf("failed to set value in Valkey: %v", err)
} }
return nil return nil
} }
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) { func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if err != nil { if err != nil {