6 Commits
v1.0 ... dev

Author SHA1 Message Date
pigwin
1bcbbf6f44 dette var jeg dårlig på 2025-02-08 14:58:08 +00:00
pigwin
8f973853bd XD 2025-02-08 14:55:11 +00:00
pigwin
6632c38c0c stay code :P 2025-02-08 14:48:20 +00:00
pigwin
8c0bd734c6 Update config file path in main.go 2025-02-08 12:27:21 +00:00
pigwin
087b0ec637 me feal dum 2025-02-08 12:23:27 +00:00
pigwin
e90e35cfbc Add dataset ID and excluded dataset IDz 2025-02-08 12:04:39 +00:00
19 changed files with 522 additions and 1138 deletions

View File

@@ -30,7 +30,7 @@ jobs:
- name: Get commit version - name: Get commit version
id: commit-version id: commit-version
run: | run: |
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs) COMMIT_MSG=$(git log -1 --pretty=%B)
echo "Commit message: $COMMIT_MSG" # Debugging output echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats # Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
@@ -55,4 +55,4 @@ jobs:
if [[ "${{ env.VERSION }}" != "dev" ]]; then if [[ "${{ env.VERSION }}" != "dev" ]]; then
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
fi fi

View File

@@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf
Change the following values Change the following values
```conf ```conf
listen_addresses = '*' listen_addresses = '*'
max_connections = 200 max_connections = 100
shared_buffers = 16GB shared_buffers = 16GB
work_mem = 256MB work_mem = 256MB
maintenance_work_mem = 2GB maintenance_work_mem = 2GB

View File

@@ -8,11 +8,13 @@
"sslmode": "disable" "sslmode": "disable"
}, },
"valkey": { "valkey": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": "6379", "port": "6379",
"max_conns": 100, "max_conns": 50,
"timeout_ms": 2000, "timeout_ms": 5000,
"password": "the_valkey_password" "password": "the_valkey_password"
}, },
"temp": "value" "temp": "value",
"dataset_id": "",
"excluded_dataset_ids": ""
} }

View File

@@ -7,6 +7,8 @@ import (
"strconv" "strconv"
) )
const configFilePath = "config/conf.json"
type Config struct { type Config struct {
Database struct { Database struct {
Host string `json:"host"` Host string `json:"host"`
@@ -21,14 +23,16 @@ type Config struct {
Port string `json:"port"` Port string `json:"port"`
MaxConns int `json:"max_conns"` MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"` TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line Password string `json:"password"`
} `json:"valkey"` } `json:"valkey"`
Temp string `json:"temp"` Temp string `json:"temp"`
DatasetId string `json:"dataset_id"`
ExcludedDatasetIds string `json:"excluded_dataset_ids"`
} }
func LoadConfig(file string) (Config, error) { func LoadConfig() (Config, error) {
var config Config var config Config
configFile, err := os.Open(file) configFile, err := os.Open(configFilePath)
if err != nil { if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err) return config, fmt.Errorf("failed to open config file: %w", err)
} }
@@ -80,5 +84,13 @@ func LoadConfig(file string) (Config, error) {
} }
} }
// Override datasetId and excludedDatasetIds with environment variables
if datasetId := os.Getenv("DATASET_ID"); datasetId != "" {
config.DatasetId = datasetId
}
if excludedDatasetIds := os.Getenv("EXCLUDED_DATASET_IDS"); excludedDatasetIds != "" {
config.ExcludedDatasetIds = excludedDatasetIds
}
return config, nil return config, nil
} }

View File

@@ -11,7 +11,7 @@ import (
func ConnectToPostgreSQL() (*sql.DB, error) { func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Connecting to PostgreSQL...") fmt.Println("Connecting to PostgreSQL...")
config, err := LoadConfig("config/conf.json") config, err := LoadConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -27,11 +27,10 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
// Set connection pool settings for high concurrency // Set connection pool settings
db.SetMaxOpenConns(50) // Maximum number of open connections to the database db.SetMaxOpenConns(25) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D") fmt.Println("Connection to PostgreSQL opened successfully :D")
@@ -58,7 +57,7 @@ func DisconnectFromPostgreSQL(db *sql.DB) error {
} }
func PrintDBConfig() { func PrintDBConfig() {
config, err := LoadConfig("config/conf.json") config, err := LoadConfig()
if err != nil { if err != nil {
fmt.Println("Error loading config:", err) fmt.Println("Error loading config:", err)
return return

View File

@@ -17,7 +17,7 @@ type ValkeyConfig struct {
Port string `json:"port"` Port string `json:"port"`
MaxConns int `json:"max_conns"` MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"` TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line Password string `json:"password"`
} }
func LoadValkeyConfig(file string) (ValkeyConfig, error) { func LoadValkeyConfig(file string) (ValkeyConfig, error) {
@@ -56,9 +56,9 @@ func LoadValkeyConfig(file string) (ValkeyConfig, error) {
return config, nil return config, nil
} }
func ConnectToValkey(configPath string) (valkey.Client, error) { func ConnectToValkey() (valkey.Client, error) {
fmt.Println("Loading configuration...") fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath) config, err := LoadConfig()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err) return nil, fmt.Errorf("failed to load config: %v", err)
} }

View File

@@ -1,12 +1,10 @@
package data package data
import ( import (
"crypto/tls"
"encoding/xml" "encoding/xml"
"fmt"
"log" "log"
"net/http" "net/http"
"time" "strings"
) )
type Data struct { type Data struct {
@@ -129,87 +127,31 @@ type Data struct {
} `xml:"ServiceDelivery"` } `xml:"ServiceDelivery"`
} }
func FetchData(timestamp string) (*Data, error) { func FetchData(timestamp, datasetId, excludedDatasetIds string) (*Data, error) {
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors client := &http.Client{}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
// Retry logic for transient failures baseURL := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ { if datasetId != "" {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url) baseURL += "&datasetId=" + datasetId
} else if excludedDatasetIds != "" {
resp, err = client.Get(url) baseURL += "&excludedDatasetIds=" + strings.ReplaceAll(excludedDatasetIds, ",", "&excludedDatasetIds=")
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
} }
// All retries failed log.Println("Fetching data from URL:", baseURL)
resp, err := client.Get(baseURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries) defer resp.Body.Close()
data := &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
if err != nil {
return nil, err
}
return data, nil
} }

View File

@@ -6,18 +6,11 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
) )
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
@@ -35,15 +28,19 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
} }
hash := md5.Sum([]byte(valuesString)) hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:]) hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0] estimatedVehicleJourneyID := values[0]
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -67,60 +64,26 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
estimated_data = EXCLUDED.estimated_data estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil
} }
return 0, "none", nil
}
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -1,135 +1,41 @@
package database package database
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"sync"
) )
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := ` query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery, servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime, recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string var action string
var id int var id int
err := db.QueryRow(query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} }
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,7 +6,6 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
@@ -34,10 +33,12 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -64,60 +65,25 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
recorded_data = EXCLUDED.recorded_data recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} else {
return 0, "none", nil
} }
return 0, "none", nil
}
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -3,21 +3,52 @@ package database
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"ti1/config"
) )
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { func GetDatasetVariable(config config.Config) string {
var id int if config.DatasetId != "" {
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) fmt.Println(config.DatasetId)
if err != nil { return config.DatasetId
return 0, fmt.Errorf("failed to insert service delivery: %w", err) } else if config.ExcludedDatasetIds != "" {
result := "EX." + config.ExcludedDatasetIds
fmt.Println(result)
return result
} }
fmt.Println("")
return ""
}
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int
// Load configuration
config, err := config.LoadConfig()
if err != nil {
fmt.Println("Error loading config:", err)
return 0, err
}
// Get dataset variable
datasetVariable := GetDatasetVariable(config)
err = db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime, Source) VALUES ($1, $2, $3) RETURNING ID", responseTimestamp, recordedAtTime, datasetVariable).Scan(&id)
if err != nil {
fmt.Println(err)
return 0, err
}
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil return id, nil
} }
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to update service delivery data: %w", err) fmt.Println(err)
return err
} }
fmt.Println("Finished with this ServiceDelivery!")
return nil return nil
} }

View File

@@ -68,6 +68,7 @@ func SetupDB() error {
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'), id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
responsetimestamp TIMESTAMPTZ, responsetimestamp TIMESTAMPTZ,
recordedattime TIMESTAMPTZ, recordedattime TIMESTAMPTZ,
source VARCHAR,
data JSON data JSON
);`, );`,
} }

View File

@@ -1,89 +0,0 @@
package databaseold
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
values[i] = nil
}
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, estimated_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil
}
}

View File

@@ -1,41 +0,0 @@
package databaseold
import (
"database/sql"
"fmt"
)
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
}

View File

@@ -1,89 +0,0 @@
package databaseold
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"ti1/valki"
"github.com/valkey-io/valkey-go"
)
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields
for i, v := range values {
if str, ok := v.(string); ok && str == "" {
values[i] = nil
}
}
// Convert values to a single string and hash it using MD5
var valuesString string
for _, v := range values {
if v != nil {
valuesString += fmt.Sprintf("%v", v)
}
}
hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:])
estimatedVehicleJourneyID := values[0]
orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
}
// Check if the retrieved value matches the original MD5 hash
if retrievedHash != hashString {
query := `
INSERT INTO calls (
estimatedvehiclejourney, "order", stoppointref,
aimeddeparturetime, expecteddeparturetime,
aimedarrivaltime, expectedarrivaltime,
cancellation, actualdeparturetime, actualarrivaltime,
recorded_data
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (estimatedvehiclejourney, "order")
DO UPDATE SET
stoppointref = EXCLUDED.stoppointref,
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
cancellation = EXCLUDED.cancellation,
actualdeparturetime = EXCLUDED.actualdeparturetime,
actualarrivaltime = EXCLUDED.actualarrivaltime,
recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
}
var action string
var id int
err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil {
return 0, "", fmt.Errorf("error executing statement: %v", err)
}
return id, action, nil
} else {
return 0, "none", nil
}
}

View File

@@ -1,30 +0,0 @@
package databaseold
import (
"database/sql"
"fmt"
)
// InsertServiceDelivery inserts a new row into public.ServiceDelivery with the
// given response and recorded timestamps and returns the generated row ID.
//
// The error is wrapped with context and returned, not printed here, so it is
// handled at exactly one layer (the caller decides whether to log).
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
	fmt.Println("Inserting ServiceDelivery...")
	var id int
	err := db.QueryRow(
		"INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID",
		responseTimestamp, recordedAtTime,
	).Scan(&id)
	if err != nil {
		return 0, fmt.Errorf("inserting ServiceDelivery: %w", err)
	}
	return id, nil
}
// UpdateServiceDeliveryData stores the aggregated JSON payload on an existing
// public.ServiceDelivery row identified by id.
//
// The error is wrapped with context and returned instead of being printed and
// returned (the previous double-handling), so callers log it exactly once.
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
	fmt.Println("Updating ServiceDelivery data...")
	if _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id); err != nil {
		return fmt.Errorf("updating ServiceDelivery %d: %w", id, err)
	}
	fmt.Println("Finished with this ServiceDelivery!")
	return nil
}

View File

@@ -1,140 +0,0 @@
package databaseold
import (
"fmt"
"ti1/config"
)
// SetupDB ensures the sequences, tables, and unique constraints that the
// ingest pipeline depends on exist in PostgreSQL. It is idempotent: every
// statement is guarded by IF NOT EXISTS or an explicit catalog check, so it
// is safe to run on every startup.
func SetupDB() error {
	fmt.Println("Setting up the database...")

	// Connect to PostgreSQL
	db, err := config.ConnectToPostgreSQL()
	if err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	defer config.DisconnectFromPostgreSQL(db)

	// Sequences backing the tables' ID columns.
	sequences := []string{
		"CREATE SEQUENCE IF NOT EXISTS public.calls_id_seq",
		"CREATE SEQUENCE IF NOT EXISTS public.estimatedvehiclejourney_id_seq",
		"CREATE SEQUENCE IF NOT EXISTS public.servicedelivery_id_seq",
	}
	for _, seq := range sequences {
		if _, err := db.Exec(seq); err != nil {
			return fmt.Errorf("failed to create sequence: %w", err)
		}
	}

	// Create each table only when it is not already present.
	tables := map[string]string{
		"calls": `CREATE TABLE IF NOT EXISTS public.calls (
			id BIGINT PRIMARY KEY DEFAULT nextval('public.calls_id_seq'),
			estimatedvehiclejourney BIGINT,
			"order" INTEGER,
			stoppointref VARCHAR,
			aimeddeparturetime TIMESTAMP,
			expecteddeparturetime TIMESTAMP,
			aimedarrivaltime TIMESTAMP,
			expectedarrivaltime TIMESTAMP,
			cancellation VARCHAR,
			actualdeparturetime TIMESTAMP,
			actualarrivaltime TIMESTAMP,
			estimated_data JSON,
			recorded_data JSON
		);`,
		"estimatedvehiclejourney": `CREATE TABLE IF NOT EXISTS public.estimatedvehiclejourney (
			id BIGINT PRIMARY KEY DEFAULT nextval('public.estimatedvehiclejourney_id_seq'),
			servicedelivery INTEGER,
			recordedattime TIMESTAMP,
			lineref VARCHAR,
			directionref VARCHAR,
			datasource VARCHAR,
			datedvehiclejourneyref VARCHAR,
			vehiclemode VARCHAR,
			dataframeref VARCHAR,
			originref VARCHAR,
			destinationref VARCHAR,
			operatorref VARCHAR,
			vehicleref VARCHAR,
			cancellation VARCHAR,
			other JSON,
			firstservicedelivery INTEGER
		);`,
		"servicedelivery": `CREATE TABLE IF NOT EXISTS public.servicedelivery (
			id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
			responsetimestamp TIMESTAMPTZ,
			recordedattime TIMESTAMPTZ,
			data JSON
		);`,
	}
	for table, createStmt := range tables {
		var exists bool
		// Parameterized query ($1): never interpolate identifiers into SQL text.
		err := db.QueryRow(
			"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
			table,
		).Scan(&exists)
		if err != nil {
			return fmt.Errorf("failed to check if table %s exists: %w", table, err)
		}
		if !exists {
			if _, err := db.Exec(createStmt); err != nil {
				return fmt.Errorf("failed to create table %s: %w", table, err)
			}
			fmt.Printf("Table %s created successfully!\n", table)
		} else {
			fmt.Printf("Table %s already exists.\n", table)
		}
	}

	// Unique constraints required by the ON CONFLICT upserts elsewhere in the
	// pipeline. The existence check/add sequence is shared via ensureConstraint.
	constraints := []struct {
		name  string // conname in pg_constraint
		table string // table named in user-facing messages
		alter string // statement that adds the constraint
	}{
		{
			name:  "unique_estimatedvehiclejourney_order",
			table: "calls",
			alter: `ALTER TABLE calls ADD CONSTRAINT unique_estimatedvehiclejourney_order UNIQUE (estimatedvehiclejourney, "order");`,
		},
		{
			name:  "unique_lineref_directionref_datasource_datedvehiclejourneyref",
			table: "estimatedvehiclejourney",
			alter: `ALTER TABLE estimatedvehiclejourney ADD CONSTRAINT unique_lineref_directionref_datasource_datedvehiclejourneyref UNIQUE (lineref, directionref, datasource, datedvehiclejourneyref);`,
		},
	}
	for _, c := range constraints {
		if err := ensureConstraint(db, c.name, c.table, c.alter); err != nil {
			return err
		}
	}

	fmt.Println("Database setup is good!")
	return nil
}

// ensureConstraint adds a uniqueness constraint via alterStmt unless
// pg_constraint already lists a constraint with that name. table is used only
// for progress/error messages.
func ensureConstraint(db *sql.DB, name, table, alterStmt string) error {
	var exists bool
	err := db.QueryRow(
		"SELECT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = $1)",
		name,
	).Scan(&exists)
	if err != nil {
		return fmt.Errorf("failed to check if unique constraint exists: %w", err)
	}
	if exists {
		fmt.Printf("Unique constraint already exists on %s table.\n", table)
		return nil
	}
	if _, err := db.Exec(alterStmt); err != nil {
		return fmt.Errorf("failed to add unique constraint to %s table: %w", table, err)
	}
	fmt.Printf("Unique constraint added to %s table.\n", table)
	return nil
}

View File

@@ -6,21 +6,12 @@ import (
"fmt" "fmt"
"log" "log"
"strings" "strings"
"sync"
"sync/atomic"
"ti1/config" "ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"time"
) )
// DBData is the main entry point for data processing
func DBData(data *data.Data) { func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -31,7 +22,7 @@ func DBDataOptimized(data *data.Data) {
defer db.Close() defer db.Close()
// Connect to Valkey // Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json") valkeyClient, err := config.ConnectToValkey()
if err != nil { if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err) log.Fatalf("Failed to connect to Valkey: %v", err)
} }
@@ -46,440 +37,395 @@ func DBDataOptimized(data *data.Data) {
} }
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// Record start time // counters
startTime := time.Now() var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
// Atomic counters for thread-safe counting for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64 var values []interface{}
var datedVehicleJourneyRef, otherJson string
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney values = append(values, sid)
totalJourneys := len(journeys) values = append(values, journey.RecordedAtTime)
fmt.Printf("Processing %d journeys...\n", totalJourneys) values = append(values, journey.LineRef)
//had to add to lowercase cus some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource)
// Job structures if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
type evjJob struct { datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
index int } else if journey.DatedVehicleJourneyRef != "" {
} datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
}
values = append(values, datedVehicleJourneyRef)
type callJob struct { values = append(values, journey.VehicleMode)
evjID int values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values []interface{} values = append(values, journey.OriginRef)
} values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Channels // Create a map to hold the JSON object for the current journey
workerCount := 20 // Adjust based on your database and CPU jsonObject := make(map[string]interface{})
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup // Add relevant fields to the JSON object
var callWg sync.WaitGroup if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
// Start Estimated Call workers // Convert the JSON object to a JSON string
for w := 0; w < workerCount; w++ { jsonString, err := json.Marshal(jsonObject)
callWg.Add(1) if err != nil {
go func() { log.Fatal(err)
defer callWg.Done() }
for job := range estimatedCallJobs { otherJson = string(jsonString)
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient) values = append(values, otherJson)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err) // Insert or update the record
continue id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
} if err != nil {
if action == "insert" { fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
atomic.AddInt64(&estimatedCallInsertCount, 1) } else {
} else if action == "update" { if 1 == 0 {
atomic.AddInt64(&estimatedCallUpdateCount, 1) fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
} }
}()
}
// Start Recorded Call workers if action == "insert" {
for w := 0; w < workerCount; w++ { insertCount++
callWg.Add(1) } else if action == "update" {
go func() { updateCount++
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
} }
}() totalCount = insertCount + updateCount
}
// Start EVJ workers //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
for w := 0; w < workerCount; w++ { if totalCount%1000 == 0 {
wg.Add(1) fmt.Printf(
go func() { "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
defer wg.Done() insertCount,
for job := range evjJobs { updateCount,
journey := &journeys[job.index] totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
estimatedCallNoneCount,
recordedCallInsertCount,
recordedCallUpdateCount,
recordedCallNoneCount,
)
}
}
// Prepare values for _, estimatedCall := range journey.EstimatedCalls {
var values []interface{} for _, call := range estimatedCall.EstimatedCall {
var datedVehicleJourneyRef, otherJson string var estimatedValues []interface{}
values = append(values, sid) //1 estimatedvehiclejourney
values = append(values, journey.RecordedAtTime) estimatedValues = append(estimatedValues, id)
values = append(values, journey.LineRef) //2 order
values = append(values, strings.ToLower(journey.DirectionRef)) estimatedValues = append(estimatedValues, call.Order)
values = append(values, journey.DataSource) //3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { //9 estimated_data (JSON)
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef estimatedJsonObject := make(map[string]interface{})
} else if journey.DatedVehicleJourneyRef != "" { // data allrady loged
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
// The rest
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Fatal(err)
}
estimatedValues = append(estimatedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(estimatedValues))
for i, v := range estimatedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil {
log.Fatalf("Failed to insert or update estimated call: %v", err)
} else { } else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode if 1 == 0 {
} fmt.Printf("Action: %s, ID: %d\n", action, id)
values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation)
// Create JSON object
jsonObject := make(map[string]interface{})
if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName
}
if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName
}
if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
}
if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
}
if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored
}
if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
}
if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
}
if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName
}
if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName
}
if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
}
if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
}
if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef
}
if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
}
if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy
}
if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
}
if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney
}
if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef
}
if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
}
if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
}
if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion
}
if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
}
if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote
}
if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName
}
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Printf("Error marshaling JSON: %v\n", err)
continue
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update EVJ
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&insertCount, 1)
} else if action == "update" {
atomic.AddInt64(&updateCount, 1)
}
// Progress reporting
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
if total%1000 == 0 {
fmt.Printf(
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount),
atomic.LoadInt64(&updateCount),
total,
atomic.LoadInt64(&estimatedCallInsertCount),
atomic.LoadInt64(&estimatedCallUpdateCount),
atomic.LoadInt64(&estimatedCallNoneCount),
atomic.LoadInt64(&recordedCallInsertCount),
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
)
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{}
estimatedValues = append(estimatedValues, id)
estimatedValues = append(estimatedValues, call.Order)
estimatedValues = append(estimatedValues, call.StopPointRef)
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
estimatedValues = append(estimatedValues, call.Cancellation)
// estimated_data JSON
estimatedJsonObject := make(map[string]interface{})
if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil {
log.Printf("Error marshaling estimated call JSON: %v\n", err)
continue
}
estimatedValues = append(estimatedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(estimatedValues))
for i, v := range estimatedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
} }
}
// Process Recorded Calls if action == "insert" {
for _, recordedCall := range journey.RecordedCalls { estimatedCallInsertCount++
for _, call := range recordedCall.RecordedCall { } else if action == "update" {
var recordedValues []interface{} estimatedCallUpdateCount++
} else if action == "none" {
recordedValues = append(recordedValues, id) estimatedCallNoneCount++
recordedValues = append(recordedValues, call.Order)
recordedValues = append(recordedValues, call.StopPointRef)
recordedValues = append(recordedValues, call.AimedDepartureTime)
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
recordedValues = append(recordedValues, call.AimedArrivalTime)
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
recordedValues = append(recordedValues, call.Cancellation)
recordedValues = append(recordedValues, call.ActualDepartureTime)
recordedValues = append(recordedValues, call.ActualArrivalTime)
// recorded_data JSON
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Printf("Error marshaling recorded call JSON: %v\n", err)
continue
}
recordedValues = append(recordedValues, string(jsonString))
// Convert to string values
interfaceValues := make([]interface{}, len(recordedValues))
for i, v := range recordedValues {
interfaceValues[i] = fmt.Sprintf("%v", v)
}
// Send to worker pool
recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
} }
} }
} }
}() }
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime)
//11 recorded_data (JSON)
recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName
}
if call.ArrivalPlatformName != "" {
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.DeparturePlatformName != "" {
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject)
if err != nil {
log.Fatal(err)
}
recordedValues = append(recordedValues, string(jsonString))
// Insert or update the record
stringValues := make([]string, len(recordedValues))
for i, v := range recordedValues {
stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
recordedCallInsertCount++
//fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" {
recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
}
}
}
}
} }
// Send all EVJ jobs
for i := range journeys {
evjJobs <- evjJob{index: i}
}
close(evjJobs)
// Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf( fmt.Printf(
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+ "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
" EstimatedCalls - I: %d U: %d N: %d\n"+ insertCount,
" RecordedCalls - I: %d U: %d N: %d\n", updateCount,
atomic.LoadInt64(&insertCount), totalCount,
atomic.LoadInt64(&updateCount), estimatedCallInsertCount,
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount), estimatedCallUpdateCount,
atomic.LoadInt64(&estimatedCallInsertCount), estimatedCallNoneCount,
atomic.LoadInt64(&estimatedCallUpdateCount), recordedCallInsertCount,
atomic.LoadInt64(&estimatedCallNoneCount), recordedCallUpdateCount,
atomic.LoadInt64(&recordedCallInsertCount), recordedCallNoneCount,
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount) // Add fields to JSON
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount) serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount) serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount) serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount) serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount) serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount) serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -492,6 +438,4 @@ func DBDataOptimized(data *data.Data) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("Finished with this ServiceDelivery!")
} }

13
main.go
View File

@@ -2,6 +2,7 @@ package main
import ( import (
"log" "log"
"ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"ti1/export" "ti1/export"
@@ -9,11 +10,17 @@ import (
) )
func main() { func main() {
log.Println("ti1 v1.0.2") log.Println("ti1 v0.2.1")
log.Println("Starting...") log.Println("Starting...")
// Load configuration
cfg, err := config.LoadConfig()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Setup the database // Setup the database
err := database.SetupDB() err = database.SetupDB()
if err != nil { if err != nil {
log.Fatalf("Database setup failed: %v", err) log.Fatalf("Database setup failed: %v", err)
} }
@@ -25,7 +32,7 @@ func main() {
for { for {
start := time.Now() start := time.Now()
data, err := data.FetchData(starttimestamp) data, err := data.FetchData(starttimestamp, cfg.DatasetId, cfg.ExcludedDatasetIds)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }