21 Commits

Author SHA1 Message Date
pigwin
caeb7651d3 v1.1.0 2026-02-12 11:00:03 +00:00
pigwin
21a133fc57 nowait 2026-02-12 10:50:29 +00:00
pigwin
a4c4159688 v1.0.4 2026-02-12 10:34:27 +00:00
pigwin
f2b984b6f9 Forkort tid mellom hent 2026-02-12 10:34:04 +00:00
pigwin
348b3cd5b7 v1.0.3 2026-02-12 09:30:43 +00:00
pigwin
4c94654e0a v1.0.3 - oppdatering av eksterne pakker 2026-02-12 09:30:13 +00:00
pigwin
5b840950c8 v1.0.2
come on plz work for ducks sake
2025-12-23 22:51:20 +01:00
pigwin
4c72bb1d4c v1.0.2
v1.0.2 
Stor V ikke funke lel
2025-12-23 22:45:00 +01:00
pigwin
ea36b60233 v1.0.2 2025-12-23 21:44:06 +00:00
pigwin
201811e2d3 V1.0.2
V1.0.2 merge for da real timeing :)
2025-12-23 21:44:31 +01:00
pigwin
bcdc24e47f oppdatere print log 2025-12-23 20:42:13 +00:00
pigwin
4bb49e944f start end cool 2025-12-23 20:41:45 +00:00
pigwin
fbb20d576b v1.0.1 2025-12-23 17:14:43 +00:00
pigwin
823701c698 v1.0.0
V1.0
2025-12-23 18:06:08 +01:00
pigwin
00e6667bc0 v1.0.0 2025-12-23 16:44:24 +00:00
pigwin
9b433cdd57 dont diez plz 2025-12-23 14:40:53 +00:00
pigwin
c106267d76 retry stuffs?
plz workz
2025-12-23 14:34:25 +00:00
pigwin
b95fbb477d v num 2025-12-23 14:11:30 +00:00
pigwin
7424600a8b worth a shot 2025-12-23 13:24:20 +00:00
pigwin
5171dcf4f6 old folder
tmp
2025-12-23 13:03:56 +00:00
pigwin
0324912eb4 tstttttt 2025-12-23 12:46:30 +00:00
14 changed files with 762 additions and 475 deletions

View File

@@ -30,7 +30,7 @@ jobs:
- name: Get commit version - name: Get commit version
id: commit-version id: commit-version
run: | run: |
COMMIT_MSG=$(git log -1 --pretty=%B) COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs)
echo "Commit message: $COMMIT_MSG" # Debugging output echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats # Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image # Use the official Golang image as the base image
FROM golang:1.23.4 FROM golang:1.26.0
# Set the Current Working Directory inside the container # Set the Current Working Directory inside the container
WORKDIR /app WORKDIR /app

View File

@@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf
Change the following values Change the following values
```conf ```conf
listen_addresses = '*' listen_addresses = '*'
max_connections = 100 max_connections = 200
shared_buffers = 16GB shared_buffers = 16GB
work_mem = 256MB work_mem = 256MB
maintenance_work_mem = 2GB maintenance_work_mem = 2GB

View File

@@ -10,8 +10,8 @@
"valkey": { "valkey": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": "6379", "port": "6379",
"max_conns": 50, "max_conns": 100,
"timeout_ms": 5000, "timeout_ms": 2000,
"password": "the_valkey_password" "password": "the_valkey_password"
}, },
"temp": "value" "temp": "value"

View File

@@ -27,10 +27,11 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
// Set connection pool settings // Set connection pool settings for high concurrency
db.SetMaxOpenConns(25) // Maximum number of open connections to the database db.SetMaxOpenConns(50) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D") fmt.Println("Connection to PostgreSQL opened successfully :D")

View File

@@ -1,9 +1,12 @@
package data package data
import ( import (
"log" "crypto/tls"
"encoding/xml" "encoding/xml"
"fmt"
"log"
"net/http" "net/http"
"time"
) )
type Data struct { type Data struct {
@@ -127,23 +130,86 @@ type Data struct {
} }
func FetchData(timestamp string) (*Data, error) { func FetchData(timestamp string) (*Data, error) {
client := &http.Client{} // Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors
requestorId := "ti1-" + timestamp transport := &http.Transport{
TLSClientConfig: &tls.Config{
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId MinVersion: tls.VersionTLS12,
log.Println("Fetching data from URL:", url) },
resp, err := client.Get(url) MaxIdleConns: 10,
if err != nil { MaxIdleConnsPerHost: 10,
return nil, err IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
} }
defer resp.Body.Close()
data := &Data{} client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
// Retry logic for transient failures
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
resp, err = client.Get(url)
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body) decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data) err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
}
// All retries failed
if err != nil { if err != nil {
return nil, err return nil, err
} }
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries)
return data, nil
} }

View File

@@ -6,11 +6,18 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
) )
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
@@ -28,19 +35,15 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
} }
hash := md5.Sum([]byte(valuesString)) hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:]) hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0] estimatedVehicleJourneyID := values[0]
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -64,26 +67,60 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
estimated_data = EXCLUDED.estimated_data estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
} }
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %w", err)
} }
return id, action, nil return id, action, nil
} else { }
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -1,10 +1,68 @@
package database package database
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"sync"
) )
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := ` query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
@@ -24,18 +82,54 @@ func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (in
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err := db.QueryRow(query, values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing EVJ statement: %w", err)
} }
return id, action, nil return id, action, nil
} }
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,6 +6,7 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
@@ -33,12 +34,10 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -65,25 +64,60 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
recorded_data = EXCLUDED.recorded_data recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
} }
var action string var action string
var id int var id int
err = stmt.QueryRow(values...).Scan(&action, &id) err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err) return 0, "", fmt.Errorf("error executing statement: %w", err)
} }
return id, action, nil return id, action, nil
} else { }
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -6,25 +6,18 @@ import (
) )
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int var id int
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
if err != nil { if err != nil {
fmt.Println(err) return 0, fmt.Errorf("failed to insert service delivery: %w", err)
return 0, err
} }
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil return id, nil
} }
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("failed to update service delivery data: %w", err)
return err
} }
fmt.Println("Finished with this ServiceDelivery!")
return nil return nil
} }

View File

@@ -6,12 +6,21 @@ import (
"fmt" "fmt"
"log" "log"
"strings" "strings"
"sync"
"sync/atomic"
"ti1/config" "ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"time"
) )
// DBData is the main entry point for data processing
func DBData(data *data.Data) { func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -37,17 +46,96 @@ func DBData(data *data.Data) {
} }
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // Record start time
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int startTime := time.Now()
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { // Atomic counters for thread-safe counting
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
totalJourneys := len(journeys)
fmt.Printf("Processing %d journeys...\n", totalJourneys)
// Job structures
type evjJob struct {
index int
}
type callJob struct {
evjID int
values []interface{}
}
// Channels
workerCount := 20 // Adjust based on your database and CPU
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup
var callWg sync.WaitGroup
// Start Estimated Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range estimatedCallJobs {
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&estimatedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&estimatedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start Recorded Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start EVJ workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range evjJobs {
journey := &journeys[job.index]
// Prepare values
var values []interface{} var values []interface{}
var datedVehicleJourneyRef, otherJson string var datedVehicleJourneyRef, otherJson string
values = append(values, sid) values = append(values, sid)
values = append(values, journey.RecordedAtTime) values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef) values = append(values, journey.LineRef)
//had to add to lowercase cus some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef)) values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource) values = append(values, journey.DataSource)
@@ -68,10 +156,8 @@ func DBData(data *data.Data) {
values = append(values, journey.VehicleRef) values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation) values = append(values, journey.Cancellation)
// Create a map to hold the JSON object for the current journey // Create JSON object
jsonObject := make(map[string]interface{}) jsonObject := make(map[string]interface{})
// Add relevant fields to the JSON object
if journey.OriginName != "" { if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName jsonObject["OriginName"] = journey.OriginName
} }
@@ -142,71 +228,60 @@ func DBData(data *data.Data) {
jsonObject["Via"] = journey.Via.PlaceName jsonObject["Via"] = journey.Via.PlaceName
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(jsonObject) jsonString, err := json.Marshal(jsonObject)
if err != nil { if err != nil {
log.Fatal(err) log.Printf("Error marshaling JSON: %v\n", err)
continue
} }
otherJson = string(jsonString) otherJson = string(jsonString)
values = append(values, otherJson) values = append(values, otherJson)
// Insert or update the record // Insert or update EVJ
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil { if err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
} else { continue
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
if action == "insert" { if action == "insert" {
insertCount++ atomic.AddInt64(&insertCount, 1)
} else if action == "update" { } else if action == "update" {
updateCount++ atomic.AddInt64(&updateCount, 1)
} }
totalCount = insertCount + updateCount
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) // Progress reporting
if totalCount%1000 == 0 { total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
if total%1000 == 0 {
fmt.Printf( fmt.Printf(
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", "EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
insertCount, atomic.LoadInt64(&insertCount),
updateCount, atomic.LoadInt64(&updateCount),
totalCount, total,
estimatedCallInsertCount, atomic.LoadInt64(&estimatedCallInsertCount),
estimatedCallUpdateCount, atomic.LoadInt64(&estimatedCallUpdateCount),
estimatedCallNoneCount, atomic.LoadInt64(&estimatedCallNoneCount),
recordedCallInsertCount, atomic.LoadInt64(&recordedCallInsertCount),
recordedCallUpdateCount, atomic.LoadInt64(&recordedCallUpdateCount),
recordedCallNoneCount, atomic.LoadInt64(&recordedCallNoneCount),
) )
} }
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls { for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall { for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{} var estimatedValues []interface{}
//1 estimatedvehiclejourney
estimatedValues = append(estimatedValues, id) estimatedValues = append(estimatedValues, id)
//2 order
estimatedValues = append(estimatedValues, call.Order) estimatedValues = append(estimatedValues, call.Order)
//3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef) estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime) estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime) estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation) estimatedValues = append(estimatedValues, call.Cancellation)
//9 estimated_data (JSON) // estimated_data JSON
estimatedJsonObject := make(map[string]interface{}) estimatedJsonObject := make(map[string]interface{})
// data allrady loged
if call.ExpectedDepartureTime != "" { if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
} }
@@ -216,7 +291,6 @@ func DBData(data *data.Data) {
if call.Cancellation != "" { if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation estimatedJsonObject["Cancellation"] = call.Cancellation
} }
// The rest
if call.StopPointName != "" { if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName estimatedJsonObject["StopPointName"] = call.StopPointName
} }
@@ -287,66 +361,41 @@ func DBData(data *data.Data) {
estimatedJsonObject["Occupancy"] = call.Occupancy estimatedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject) jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil { if err != nil {
log.Fatal(err) log.Printf("Error marshaling estimated call JSON: %v\n", err)
continue
} }
estimatedValues = append(estimatedValues, string(jsonString)) estimatedValues = append(estimatedValues, string(jsonString))
// Insert or update the record // Convert to string values
stringValues := make([]string, len(estimatedValues)) interfaceValues := make([]interface{}, len(estimatedValues))
for i, v := range estimatedValues { for i, v := range estimatedValues {
stringValues[i] = fmt.Sprintf("%v", v) interfaceValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil {
log.Fatalf("Failed to insert or update estimated call: %v", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
if action == "insert" { // Send to worker pool
estimatedCallInsertCount++ estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
} else if action == "update" {
estimatedCallUpdateCount++
} else if action == "none" {
estimatedCallNoneCount++
}
}
} }
} }
// Process Recorded Calls
for _, recordedCall := range journey.RecordedCalls { for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall { for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{} var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id) recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order) recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef) recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime) recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime) recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime) recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime) recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation) recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime) recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime) recordedValues = append(recordedValues, call.ActualArrivalTime)
//11 recorded_data (JSON) // recorded_data JSON
recordedJsonObject := make(map[string]interface{}) recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" { if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName recordedJsonObject["StopPointName"] = call.StopPointName
@@ -364,68 +413,73 @@ func DBData(data *data.Data) {
recordedJsonObject["Occupancy"] = call.Occupancy recordedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject) jsonString, err := json.Marshal(recordedJsonObject)
if err != nil { if err != nil {
log.Fatal(err) log.Printf("Error marshaling recorded call JSON: %v\n", err)
continue
} }
recordedValues = append(recordedValues, string(jsonString)) recordedValues = append(recordedValues, string(jsonString))
// Insert or update the record // Convert to string values
stringValues := make([]string, len(recordedValues)) interfaceValues := make([]interface{}, len(recordedValues))
for i, v := range recordedValues { for i, v := range recordedValues {
stringValues[i] = fmt.Sprintf("%v", v) interfaceValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
} }
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient) // Send to worker pool
if err != nil { recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
fmt.Printf("Error inserting/updating recorded call: %v\n", err) }
} else { }
if 1 == 0 { }
fmt.Printf("Action: %s, ID: %d\n", action, id) }()
} }
if action == "insert" { // Send all EVJ jobs
recordedCallInsertCount++ for i := range journeys {
//fmt.Printf("Action: %s, ID: %d\n", action, id) evjJobs <- evjJob{index: i}
} else if action == "update" {
recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
}
}
}
} }
close(evjJobs)
} // Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf( fmt.Printf(
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", "\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+
insertCount, " EstimatedCalls - I: %d U: %d N: %d\n"+
updateCount, " RecordedCalls - I: %d U: %d N: %d\n",
totalCount, atomic.LoadInt64(&insertCount),
estimatedCallInsertCount, atomic.LoadInt64(&updateCount),
estimatedCallUpdateCount, atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount),
estimatedCallNoneCount, atomic.LoadInt64(&estimatedCallInsertCount),
recordedCallInsertCount, atomic.LoadInt64(&estimatedCallUpdateCount),
recordedCallUpdateCount, atomic.LoadInt64(&estimatedCallNoneCount),
recordedCallNoneCount, atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
// Add fields to JSON serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount)
serviceDeliveryJsonObject["Inserts"] = insertCount serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount)
serviceDeliveryJsonObject["Updates"] = updateCount serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount)
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount)
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount)
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount)
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount)
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339)
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339)
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -438,4 +492,6 @@ func DBData(data *data.Data) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("Finished with this ServiceDelivery!")
} }

10
go.mod
View File

@@ -1,10 +1,10 @@
module ti1 module ti1
go 1.23.4 go 1.26.0
require github.com/lib/pq v1.10.9
require ( require (
github.com/valkey-io/valkey-go v1.0.52 // indirect github.com/lib/pq v1.11.2
golang.org/x/sys v0.24.0 // indirect github.com/valkey-io/valkey-go v1.0.71
) )
require golang.org/x/sys v0.41.0 // indirect

6
go.sum
View File

@@ -1,6 +1,12 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
github.com/valkey-io/valkey-go v1.0.71 h1:tuKjGVLd7/I8CyUwqAq5EaD7isxQdlvJzXo3jS8pZW0=
github.com/valkey-io/valkey-go v1.0.71/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

View File

@@ -9,7 +9,7 @@ import (
) )
func main() { func main() {
log.Println("ti1 v0.2.1") log.Println("ti1 v1.1.0")
log.Println("Starting...") log.Println("Starting...")
// Setup the database // Setup the database
@@ -34,9 +34,9 @@ func main() {
log.Println("finished in", time.Since(start)) log.Println("finished in", time.Since(start))
elapsed := time.Since(start) elapsed := time.Since(start)
if elapsed < 5*time.Minute { if elapsed < 20*time.Second {
log.Printf("starting again in %v", 5*time.Minute-elapsed) log.Printf("starting again in %v", 20*time.Second-elapsed)
time.Sleep(5*time.Minute - elapsed) time.Sleep(20*time.Second - elapsed)
} }
} }
} }