Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
228a971257 | ||
|
|
0c9c3653f4 | ||
|
|
4c72bb1d4c | ||
|
|
ea36b60233 | ||
|
|
201811e2d3 | ||
|
|
bcdc24e47f | ||
|
|
4bb49e944f | ||
|
|
fbb20d576b | ||
|
|
823701c698 | ||
|
|
00e6667bc0 | ||
|
|
9b433cdd57 | ||
|
|
c106267d76 | ||
|
|
b95fbb477d | ||
|
|
7424600a8b | ||
|
|
5171dcf4f6 | ||
|
|
0324912eb4 |
4
.github/workflows/docker-image.yml
vendored
4
.github/workflows/docker-image.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
- name: Get commit version
|
||||
id: commit-version
|
||||
run: |
|
||||
COMMIT_MSG=$(git log -1 --pretty=%B)
|
||||
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs)
|
||||
echo "Commit message: $COMMIT_MSG" # Debugging output
|
||||
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
|
||||
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
|
||||
@@ -55,4 +55,4 @@ jobs:
|
||||
if [[ "${{ env.VERSION }}" != "dev" ]]; then
|
||||
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||
fi
|
||||
fi
|
||||
@@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf
|
||||
Change the following values
|
||||
```conf
|
||||
listen_addresses = '*'
|
||||
max_connections = 100
|
||||
max_connections = 200
|
||||
shared_buffers = 16GB
|
||||
work_mem = 256MB
|
||||
maintenance_work_mem = 2GB
|
||||
|
||||
@@ -8,13 +8,11 @@
|
||||
"sslmode": "disable"
|
||||
},
|
||||
"valkey": {
|
||||
"host": "127.0.0.1",
|
||||
"port": "6379",
|
||||
"max_conns": 50,
|
||||
"timeout_ms": 5000,
|
||||
"password": "the_valkey_password"
|
||||
},
|
||||
"temp": "value",
|
||||
"dataset_id": "",
|
||||
"excluded_dataset_ids": ""
|
||||
"host": "127.0.0.1",
|
||||
"port": "6379",
|
||||
"max_conns": 100,
|
||||
"timeout_ms": 2000,
|
||||
"password": "the_valkey_password"
|
||||
},
|
||||
"temp": "value"
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const configFilePath = "config/conf.json"
|
||||
|
||||
type Config struct {
|
||||
Database struct {
|
||||
Host string `json:"host"`
|
||||
@@ -23,16 +21,14 @@ type Config struct {
|
||||
Port string `json:"port"`
|
||||
MaxConns int `json:"max_conns"`
|
||||
TimeoutMs int `json:"timeout_ms"`
|
||||
Password string `json:"password"`
|
||||
Password string `json:"password"` // Add this line
|
||||
} `json:"valkey"`
|
||||
Temp string `json:"temp"`
|
||||
DatasetId string `json:"dataset_id"`
|
||||
ExcludedDatasetIds string `json:"excluded_dataset_ids"`
|
||||
Temp string `json:"temp"`
|
||||
}
|
||||
|
||||
func LoadConfig() (Config, error) {
|
||||
func LoadConfig(file string) (Config, error) {
|
||||
var config Config
|
||||
configFile, err := os.Open(configFilePath)
|
||||
configFile, err := os.Open(file)
|
||||
if err != nil {
|
||||
return config, fmt.Errorf("failed to open config file: %w", err)
|
||||
}
|
||||
@@ -84,13 +80,5 @@ func LoadConfig() (Config, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Override datasetId and excludedDatasetIds with environment variables
|
||||
if datasetId := os.Getenv("DATASET_ID"); datasetId != "" {
|
||||
config.DatasetId = datasetId
|
||||
}
|
||||
if excludedDatasetIds := os.Getenv("EXCLUDED_DATASET_IDS"); excludedDatasetIds != "" {
|
||||
config.ExcludedDatasetIds = excludedDatasetIds
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
13
config/db.go
13
config/db.go
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
func ConnectToPostgreSQL() (*sql.DB, error) {
|
||||
fmt.Println("Connecting to PostgreSQL...")
|
||||
config, err := LoadConfig()
|
||||
config, err := LoadConfig("config/conf.json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -27,10 +27,11 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set connection pool settings
|
||||
db.SetMaxOpenConns(25) // Maximum number of open connections to the database
|
||||
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
|
||||
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
|
||||
// Set connection pool settings for high concurrency
|
||||
db.SetMaxOpenConns(50) // Maximum number of open connections to the database
|
||||
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
|
||||
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
|
||||
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
|
||||
|
||||
fmt.Println("Connection to PostgreSQL opened successfully :D")
|
||||
|
||||
@@ -57,7 +58,7 @@ func DisconnectFromPostgreSQL(db *sql.DB) error {
|
||||
}
|
||||
|
||||
func PrintDBConfig() {
|
||||
config, err := LoadConfig()
|
||||
config, err := LoadConfig("config/conf.json")
|
||||
if err != nil {
|
||||
fmt.Println("Error loading config:", err)
|
||||
return
|
||||
|
||||
@@ -17,7 +17,7 @@ type ValkeyConfig struct {
|
||||
Port string `json:"port"`
|
||||
MaxConns int `json:"max_conns"`
|
||||
TimeoutMs int `json:"timeout_ms"`
|
||||
Password string `json:"password"`
|
||||
Password string `json:"password"` // Add this line
|
||||
}
|
||||
|
||||
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
|
||||
@@ -56,9 +56,9 @@ func LoadValkeyConfig(file string) (ValkeyConfig, error) {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func ConnectToValkey() (valkey.Client, error) {
|
||||
func ConnectToValkey(configPath string) (valkey.Client, error) {
|
||||
fmt.Println("Loading configuration...")
|
||||
config, err := LoadConfig()
|
||||
config, err := LoadConfig(configPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
98
data/data.go
98
data/data.go
@@ -1,10 +1,12 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Data struct {
|
||||
@@ -127,31 +129,87 @@ type Data struct {
|
||||
} `xml:"ServiceDelivery"`
|
||||
}
|
||||
|
||||
func FetchData(timestamp, datasetId, excludedDatasetIds string) (*Data, error) {
|
||||
client := &http.Client{}
|
||||
func FetchData(timestamp string) (*Data, error) {
|
||||
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
},
|
||||
MaxIdleConns: 10,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
DisableCompression: false,
|
||||
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
|
||||
}
|
||||
|
||||
requestorId := "ti1-" + timestamp
|
||||
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
|
||||
|
||||
baseURL := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
|
||||
// Retry logic for transient failures
|
||||
var resp *http.Response
|
||||
var err error
|
||||
var data *Data
|
||||
maxRetries := 3
|
||||
|
||||
if datasetId != "" {
|
||||
baseURL += "&datasetId=" + datasetId
|
||||
} else if excludedDatasetIds != "" {
|
||||
baseURL += "&excludedDatasetIds=" + strings.ReplaceAll(excludedDatasetIds, ",", "&excludedDatasetIds=")
|
||||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
|
||||
|
||||
resp, err = client.Get(url)
|
||||
if err != nil {
|
||||
log.Printf("Request failed: %v", err)
|
||||
if attempt < maxRetries {
|
||||
waitTime := time.Duration(attempt*2) * time.Second
|
||||
log.Printf("Retrying in %v...", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Check HTTP status code
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
|
||||
log.Printf("%v", err)
|
||||
if attempt < maxRetries {
|
||||
waitTime := time.Duration(attempt*2) * time.Second
|
||||
log.Printf("Retrying in %v...", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to decode the response
|
||||
data = &Data{}
|
||||
decoder := xml.NewDecoder(resp.Body)
|
||||
err = decoder.Decode(data)
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Failed to decode XML: %v", err)
|
||||
if attempt < maxRetries {
|
||||
waitTime := time.Duration(attempt*2) * time.Second
|
||||
log.Printf("Retrying in %v...", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Success!
|
||||
log.Printf("Successfully fetched and decoded data")
|
||||
return data, nil
|
||||
}
|
||||
|
||||
log.Println("Fetching data from URL:", baseURL)
|
||||
resp, err := client.Get(baseURL)
|
||||
// All retries failed
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data := &Data{}
|
||||
decoder := xml.NewDecoder(resp.Body)
|
||||
err = decoder.Decode(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries)
|
||||
}
|
||||
|
||||
@@ -6,11 +6,18 @@ import (
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
type CallResult struct {
|
||||
ID int
|
||||
Action string
|
||||
Error error
|
||||
}
|
||||
|
||||
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||
// Replace empty strings with nil for timestamp fields
|
||||
for i, v := range values {
|
||||
@@ -28,19 +35,15 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
|
||||
}
|
||||
hash := md5.Sum([]byte(valuesString))
|
||||
hashString := hex.EncodeToString(hash[:])
|
||||
//fmt.Println("HashString:", hashString)
|
||||
|
||||
estimatedVehicleJourneyID := values[0]
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
@@ -64,26 +67,60 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
|
||||
estimated_data = EXCLUDED.estimated_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
return 0, "", fmt.Errorf("error executing statement: %w", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
|
||||
return 0, "none", nil
|
||||
}
|
||||
return 0, "none", nil
|
||||
}
|
||||
|
||||
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
|
||||
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
|
||||
if len(batch) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
results := make([]CallResult, len(batch))
|
||||
jobs := make(chan int, len(batch))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for idx := range jobs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
|
||||
results[idx] = CallResult{
|
||||
ID: id,
|
||||
Action: action,
|
||||
Error: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send jobs
|
||||
for i := range batch {
|
||||
jobs <- i
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
wg.Wait()
|
||||
return results, nil
|
||||
}
|
||||
|
||||
@@ -1,41 +1,135 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type EVJResult struct {
|
||||
ID int
|
||||
Action string
|
||||
Error error
|
||||
Index int // To maintain order
|
||||
}
|
||||
|
||||
// PreparedStatements holds reusable prepared statements
|
||||
type PreparedStatements struct {
|
||||
evjStmt *sql.Stmt
|
||||
ecStmt *sql.Stmt
|
||||
rcStmt *sql.Stmt
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
|
||||
evjQuery := `
|
||||
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
|
||||
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
|
||||
DO UPDATE SET
|
||||
servicedelivery = EXCLUDED.servicedelivery,
|
||||
recordedattime = EXCLUDED.recordedattime,
|
||||
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
|
||||
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
|
||||
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
|
||||
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
|
||||
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
|
||||
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
|
||||
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
|
||||
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
|
||||
evjStmt, err := db.Prepare(evjQuery)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
|
||||
}
|
||||
|
||||
return &PreparedStatements{
|
||||
evjStmt: evjStmt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ps *PreparedStatements) Close() {
|
||||
if ps.evjStmt != nil {
|
||||
ps.evjStmt.Close()
|
||||
}
|
||||
if ps.ecStmt != nil {
|
||||
ps.ecStmt.Close()
|
||||
}
|
||||
if ps.rcStmt != nil {
|
||||
ps.rcStmt.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
|
||||
query := `
|
||||
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
|
||||
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
|
||||
DO UPDATE SET
|
||||
servicedelivery = EXCLUDED.servicedelivery,
|
||||
recordedattime = EXCLUDED.recordedattime,
|
||||
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
|
||||
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
|
||||
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
|
||||
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
|
||||
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
|
||||
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
|
||||
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
|
||||
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
|
||||
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
|
||||
DO UPDATE SET
|
||||
servicedelivery = EXCLUDED.servicedelivery,
|
||||
recordedattime = EXCLUDED.recordedattime,
|
||||
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
|
||||
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
|
||||
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
|
||||
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
|
||||
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
|
||||
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
|
||||
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
|
||||
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
err := db.QueryRow(query, values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err)
|
||||
}
|
||||
|
||||
return id, action, nil
|
||||
}
|
||||
|
||||
// BatchInsertEVJ processes multiple EVJ inserts concurrently
|
||||
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
|
||||
if len(batch) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
results := make([]EVJResult, len(batch))
|
||||
jobs := make(chan int, len(batch))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for idx := range jobs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
|
||||
results[idx] = EVJResult{
|
||||
ID: id,
|
||||
Action: action,
|
||||
Error: err,
|
||||
Index: idx,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send jobs
|
||||
for i := range batch {
|
||||
jobs <- i
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
wg.Wait()
|
||||
return results, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
@@ -33,12 +34,10 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
@@ -65,25 +64,60 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
|
||||
recorded_data = EXCLUDED.recorded_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
return 0, "", fmt.Errorf("error executing statement: %w", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
return 0, "none", nil
|
||||
}
|
||||
return 0, "none", nil
|
||||
}
|
||||
|
||||
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
|
||||
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
|
||||
if len(batch) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
results := make([]CallResult, len(batch))
|
||||
jobs := make(chan int, len(batch))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for idx := range jobs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
|
||||
results[idx] = CallResult{
|
||||
ID: id,
|
||||
Action: action,
|
||||
Error: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send jobs
|
||||
for i := range batch {
|
||||
jobs <- i
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
wg.Wait()
|
||||
return results, nil
|
||||
}
|
||||
|
||||
@@ -3,52 +3,21 @@ package database
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"ti1/config"
|
||||
)
|
||||
|
||||
func GetDatasetVariable(config config.Config) string {
|
||||
if config.DatasetId != "" {
|
||||
fmt.Println(config.DatasetId)
|
||||
return config.DatasetId
|
||||
} else if config.ExcludedDatasetIds != "" {
|
||||
result := "EX." + config.ExcludedDatasetIds
|
||||
fmt.Println(result)
|
||||
return result
|
||||
}
|
||||
fmt.Println("")
|
||||
return ""
|
||||
}
|
||||
|
||||
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
|
||||
fmt.Println("Inserting ServiceDelivery...")
|
||||
var id int
|
||||
|
||||
// Load configuration
|
||||
config, err := config.LoadConfig()
|
||||
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
|
||||
if err != nil {
|
||||
fmt.Println("Error loading config:", err)
|
||||
return 0, err
|
||||
return 0, fmt.Errorf("failed to insert service delivery: %w", err)
|
||||
}
|
||||
|
||||
// Get dataset variable
|
||||
datasetVariable := GetDatasetVariable(config)
|
||||
|
||||
err = db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime, Source) VALUES ($1, $2, $3) RETURNING ID", responseTimestamp, recordedAtTime, datasetVariable).Scan(&id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return 0, err
|
||||
}
|
||||
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
|
||||
fmt.Println("Updating ServiceDelivery data...")
|
||||
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return err
|
||||
return fmt.Errorf("failed to update service delivery data: %w", err)
|
||||
}
|
||||
fmt.Println("Finished with this ServiceDelivery!")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -68,7 +68,6 @@ func SetupDB() error {
|
||||
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
|
||||
responsetimestamp TIMESTAMPTZ,
|
||||
recordedattime TIMESTAMPTZ,
|
||||
source VARCHAR,
|
||||
data JSON
|
||||
);`,
|
||||
}
|
||||
|
||||
89
databaseold/EstimatedCall.go
Normal file
89
databaseold/EstimatedCall.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package databaseold
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||
// Replace empty strings with nil for timestamp fields
|
||||
for i, v := range values {
|
||||
if str, ok := v.(string); ok && str == "" {
|
||||
values[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Convert values to a single string and hash it using MD5
|
||||
var valuesString string
|
||||
for _, v := range values {
|
||||
if v != nil {
|
||||
valuesString += fmt.Sprintf("%v", v)
|
||||
}
|
||||
}
|
||||
hash := md5.Sum([]byte(valuesString))
|
||||
hashString := hex.EncodeToString(hash[:])
|
||||
//fmt.Println("HashString:", hashString)
|
||||
|
||||
estimatedVehicleJourneyID := values[0]
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
if retrievedHash != hashString {
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, estimated_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
estimated_data = EXCLUDED.estimated_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
|
||||
return 0, "none", nil
|
||||
}
|
||||
}
|
||||
41
databaseold/EstimatedVehicleJourney.go
Normal file
41
databaseold/EstimatedVehicleJourney.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package databaseold
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
|
||||
query := `
|
||||
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
|
||||
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
|
||||
DO UPDATE SET
|
||||
servicedelivery = EXCLUDED.servicedelivery,
|
||||
recordedattime = EXCLUDED.recordedattime,
|
||||
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
|
||||
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
|
||||
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
|
||||
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
|
||||
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
|
||||
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
|
||||
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
|
||||
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
|
||||
return id, action, nil
|
||||
}
|
||||
89
databaseold/RecordedCall.go
Normal file
89
databaseold/RecordedCall.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package databaseold
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||
// Replace empty strings with nil for timestamp fields
|
||||
for i, v := range values {
|
||||
if str, ok := v.(string); ok && str == "" {
|
||||
values[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Convert values to a single string and hash it using MD5
|
||||
var valuesString string
|
||||
for _, v := range values {
|
||||
if v != nil {
|
||||
valuesString += fmt.Sprintf("%v", v)
|
||||
}
|
||||
}
|
||||
hash := md5.Sum([]byte(valuesString))
|
||||
hashString := hex.EncodeToString(hash[:])
|
||||
|
||||
estimatedVehicleJourneyID := values[0]
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
if retrievedHash != hashString {
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, actualdeparturetime, actualarrivaltime,
|
||||
recorded_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
actualdeparturetime = EXCLUDED.actualdeparturetime,
|
||||
actualarrivaltime = EXCLUDED.actualarrivaltime,
|
||||
recorded_data = EXCLUDED.recorded_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
return 0, "none", nil
|
||||
}
|
||||
}
|
||||
30
databaseold/ServiceDeliveryDB.go
Normal file
30
databaseold/ServiceDeliveryDB.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package databaseold
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
|
||||
fmt.Println("Inserting ServiceDelivery...")
|
||||
var id int
|
||||
|
||||
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return 0, err
|
||||
}
|
||||
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
|
||||
fmt.Println("Updating ServiceDelivery data...")
|
||||
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return err
|
||||
}
|
||||
fmt.Println("Finished with this ServiceDelivery!")
|
||||
return nil
|
||||
}
|
||||
140
databaseold/SetupDB.go
Normal file
140
databaseold/SetupDB.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package databaseold
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"ti1/config"
|
||||
)
|
||||
|
||||
func SetupDB() error {
|
||||
fmt.Println("Setting up the database...")
|
||||
|
||||
// Connect to PostgreSQL
|
||||
db, err := config.ConnectToPostgreSQL()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to database: %w", err)
|
||||
}
|
||||
defer config.DisconnectFromPostgreSQL(db)
|
||||
|
||||
// Create sequences if they do not exist
|
||||
sequences := []string{
|
||||
"CREATE SEQUENCE IF NOT EXISTS public.calls_id_seq",
|
||||
"CREATE SEQUENCE IF NOT EXISTS public.estimatedvehiclejourney_id_seq",
|
||||
"CREATE SEQUENCE IF NOT EXISTS public.servicedelivery_id_seq",
|
||||
}
|
||||
|
||||
for _, seq := range sequences {
|
||||
_, err := db.Exec(seq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create sequence: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if tables exist and have the correct structure
|
||||
tables := map[string]string{
|
||||
"calls": `CREATE TABLE IF NOT EXISTS public.calls (
|
||||
id BIGINT PRIMARY KEY DEFAULT nextval('public.calls_id_seq'),
|
||||
estimatedvehiclejourney BIGINT,
|
||||
"order" INTEGER,
|
||||
stoppointref VARCHAR,
|
||||
aimeddeparturetime TIMESTAMP,
|
||||
expecteddeparturetime TIMESTAMP,
|
||||
aimedarrivaltime TIMESTAMP,
|
||||
expectedarrivaltime TIMESTAMP,
|
||||
cancellation VARCHAR,
|
||||
actualdeparturetime TIMESTAMP,
|
||||
actualarrivaltime TIMESTAMP,
|
||||
estimated_data JSON,
|
||||
recorded_data JSON
|
||||
);`,
|
||||
"estimatedvehiclejourney": `CREATE TABLE IF NOT EXISTS public.estimatedvehiclejourney (
|
||||
id BIGINT PRIMARY KEY DEFAULT nextval('public.estimatedvehiclejourney_id_seq'),
|
||||
servicedelivery INTEGER,
|
||||
recordedattime TIMESTAMP,
|
||||
lineref VARCHAR,
|
||||
directionref VARCHAR,
|
||||
datasource VARCHAR,
|
||||
datedvehiclejourneyref VARCHAR,
|
||||
vehiclemode VARCHAR,
|
||||
dataframeref VARCHAR,
|
||||
originref VARCHAR,
|
||||
destinationref VARCHAR,
|
||||
operatorref VARCHAR,
|
||||
vehicleref VARCHAR,
|
||||
cancellation VARCHAR,
|
||||
other JSON,
|
||||
firstservicedelivery INTEGER
|
||||
);`,
|
||||
"servicedelivery": `CREATE TABLE IF NOT EXISTS public.servicedelivery (
|
||||
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
|
||||
responsetimestamp TIMESTAMPTZ,
|
||||
recordedattime TIMESTAMPTZ,
|
||||
data JSON
|
||||
);`,
|
||||
}
|
||||
|
||||
for table, createStmt := range tables {
|
||||
var exists bool
|
||||
err := db.QueryRow(fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s')", table)).Scan(&exists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if table %s exists: %w", table, err)
|
||||
}
|
||||
|
||||
if !exists {
|
||||
_, err := db.Exec(createStmt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create table %s: %w", table, err)
|
||||
}
|
||||
fmt.Printf("Table %s created successfully!\n", table)
|
||||
} else {
|
||||
fmt.Printf("Table %s already exists.\n", table)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the unique constraint exists before adding it to calls table
|
||||
var constraintExists bool
|
||||
err = db.QueryRow(`
|
||||
SELECT EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_constraint
|
||||
WHERE conname = 'unique_estimatedvehiclejourney_order'
|
||||
);
|
||||
`).Scan(&constraintExists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if unique constraint exists: %w", err)
|
||||
}
|
||||
|
||||
if !constraintExists {
|
||||
_, err = db.Exec(`ALTER TABLE calls ADD CONSTRAINT unique_estimatedvehiclejourney_order UNIQUE (estimatedvehiclejourney, "order");`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add unique constraint to calls table: %w", err)
|
||||
}
|
||||
fmt.Println("Unique constraint added to calls table.")
|
||||
} else {
|
||||
fmt.Println("Unique constraint already exists on calls table.")
|
||||
}
|
||||
|
||||
// Check if the unique constraint exists before adding it to estimatedvehiclejourney table
|
||||
err = db.QueryRow(`
|
||||
SELECT EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_constraint
|
||||
WHERE conname = 'unique_lineref_directionref_datasource_datedvehiclejourneyref'
|
||||
);
|
||||
`).Scan(&constraintExists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if unique constraint exists: %w", err)
|
||||
}
|
||||
|
||||
if !constraintExists {
|
||||
_, err = db.Exec(`ALTER TABLE estimatedvehiclejourney ADD CONSTRAINT unique_lineref_directionref_datasource_datedvehiclejourneyref UNIQUE (lineref, directionref, datasource, datedvehiclejourneyref);`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add unique constraint to estimatedvehiclejourney table: %w", err)
|
||||
}
|
||||
fmt.Println("Unique constraint added to estimatedvehiclejourney table.")
|
||||
} else {
|
||||
fmt.Println("Unique constraint already exists on estimatedvehiclejourney table.")
|
||||
}
|
||||
|
||||
fmt.Println("Database setup is good!")
|
||||
return nil
|
||||
}
|
||||
@@ -6,12 +6,21 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"ti1/config"
|
||||
"ti1/data"
|
||||
"ti1/database"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DBData is the main entry point for data processing
|
||||
func DBData(data *data.Data) {
|
||||
DBDataOptimized(data)
|
||||
}
|
||||
|
||||
// DBDataOptimized processes data with concurrent workers for better performance
|
||||
func DBDataOptimized(data *data.Data) {
|
||||
fmt.Println(data.ServiceDelivery.ResponseTimestamp)
|
||||
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
||||
|
||||
@@ -22,7 +31,7 @@ func DBData(data *data.Data) {
|
||||
defer db.Close()
|
||||
|
||||
// Connect to Valkey
|
||||
valkeyClient, err := config.ConnectToValkey()
|
||||
valkeyClient, err := config.ConnectToValkey("config/conf.json")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to Valkey: %v", err)
|
||||
}
|
||||
@@ -37,395 +46,440 @@ func DBData(data *data.Data) {
|
||||
}
|
||||
fmt.Println("SID:", sid)
|
||||
|
||||
// counters
|
||||
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
|
||||
// Record start time
|
||||
startTime := time.Now()
|
||||
|
||||
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
|
||||
var values []interface{}
|
||||
var datedVehicleJourneyRef, otherJson string
|
||||
// Atomic counters for thread-safe counting
|
||||
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
|
||||
|
||||
values = append(values, sid)
|
||||
values = append(values, journey.RecordedAtTime)
|
||||
values = append(values, journey.LineRef)
|
||||
//had to add to lowercase cus some values vary in case and it was causing duplicates
|
||||
values = append(values, strings.ToLower(journey.DirectionRef))
|
||||
values = append(values, journey.DataSource)
|
||||
|
||||
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
|
||||
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
|
||||
} else if journey.DatedVehicleJourneyRef != "" {
|
||||
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
|
||||
} else {
|
||||
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
|
||||
}
|
||||
values = append(values, datedVehicleJourneyRef)
|
||||
|
||||
values = append(values, journey.VehicleMode)
|
||||
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
|
||||
values = append(values, journey.OriginRef)
|
||||
values = append(values, journey.DestinationRef)
|
||||
values = append(values, journey.OperatorRef)
|
||||
values = append(values, journey.VehicleRef)
|
||||
values = append(values, journey.Cancellation)
|
||||
|
||||
// Create a map to hold the JSON object for the current journey
|
||||
jsonObject := make(map[string]interface{})
|
||||
|
||||
// Add relevant fields to the JSON object
|
||||
if journey.OriginName != "" {
|
||||
jsonObject["OriginName"] = journey.OriginName
|
||||
}
|
||||
if journey.DestinationName != "" {
|
||||
jsonObject["DestinationName"] = journey.DestinationName
|
||||
}
|
||||
if journey.ProductCategoryRef != "" {
|
||||
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
|
||||
}
|
||||
if journey.ServiceFeatureRef != "" {
|
||||
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
|
||||
}
|
||||
if journey.Monitored != "" {
|
||||
jsonObject["Monitored"] = journey.Monitored
|
||||
}
|
||||
if journey.JourneyPatternRef != "" {
|
||||
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
|
||||
}
|
||||
if journey.JourneyPatternName != "" {
|
||||
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
|
||||
}
|
||||
if journey.PublishedLineName != "" {
|
||||
jsonObject["PublishedLineName"] = journey.PublishedLineName
|
||||
}
|
||||
if journey.DirectionName != "" {
|
||||
jsonObject["DirectionName"] = journey.DirectionName
|
||||
}
|
||||
if journey.OriginAimedDepartureTime != "" {
|
||||
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
|
||||
}
|
||||
if journey.DestinationAimedArrivalTime != "" {
|
||||
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
|
||||
}
|
||||
if journey.BlockRef != "" {
|
||||
jsonObject["BlockRef"] = journey.BlockRef
|
||||
}
|
||||
if journey.VehicleJourneyRef != "" {
|
||||
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
|
||||
}
|
||||
if journey.Occupancy != "" {
|
||||
jsonObject["Occupancy"] = journey.Occupancy
|
||||
}
|
||||
if journey.DestinationDisplayAtOrigin != "" {
|
||||
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
|
||||
}
|
||||
if journey.ExtraJourney != "" {
|
||||
jsonObject["ExtraJourney"] = journey.ExtraJourney
|
||||
}
|
||||
if journey.RouteRef != "" {
|
||||
jsonObject["RouteRef"] = journey.RouteRef
|
||||
}
|
||||
if journey.GroupOfLinesRef != "" {
|
||||
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
|
||||
}
|
||||
if journey.ExternalLineRef != "" {
|
||||
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
|
||||
}
|
||||
if journey.InCongestion != "" {
|
||||
jsonObject["InCongestion"] = journey.InCongestion
|
||||
}
|
||||
if journey.PredictionInaccurate != "" {
|
||||
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
|
||||
}
|
||||
if journey.JourneyNote != "" {
|
||||
jsonObject["JourneyNote"] = journey.JourneyNote
|
||||
}
|
||||
if journey.Via.PlaceName != "" {
|
||||
jsonObject["Via"] = journey.Via.PlaceName
|
||||
}
|
||||
|
||||
// Convert the JSON object to a JSON string
|
||||
jsonString, err := json.Marshal(jsonObject)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
otherJson = string(jsonString)
|
||||
values = append(values, otherJson)
|
||||
|
||||
// Insert or update the record
|
||||
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
||||
if err != nil {
|
||||
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
||||
} else {
|
||||
if 1 == 0 {
|
||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
}
|
||||
|
||||
if action == "insert" {
|
||||
insertCount++
|
||||
} else if action == "update" {
|
||||
updateCount++
|
||||
}
|
||||
totalCount = insertCount + updateCount
|
||||
|
||||
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
|
||||
if totalCount%1000 == 0 {
|
||||
fmt.Printf(
|
||||
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
|
||||
insertCount,
|
||||
updateCount,
|
||||
totalCount,
|
||||
estimatedCallInsertCount,
|
||||
estimatedCallUpdateCount,
|
||||
estimatedCallNoneCount,
|
||||
recordedCallInsertCount,
|
||||
recordedCallUpdateCount,
|
||||
recordedCallNoneCount,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
for _, estimatedCall := range journey.EstimatedCalls {
|
||||
for _, call := range estimatedCall.EstimatedCall {
|
||||
var estimatedValues []interface{}
|
||||
|
||||
//1 estimatedvehiclejourney
|
||||
estimatedValues = append(estimatedValues, id)
|
||||
//2 order
|
||||
estimatedValues = append(estimatedValues, call.Order)
|
||||
//3 stoppointref
|
||||
estimatedValues = append(estimatedValues, call.StopPointRef)
|
||||
//4 aimeddeparturetime
|
||||
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
|
||||
//5 expecteddeparturetime
|
||||
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
|
||||
//6 aimedarrivaltime
|
||||
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
|
||||
//7 expectedarrivaltime
|
||||
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
|
||||
//8 cancellation
|
||||
estimatedValues = append(estimatedValues, call.Cancellation)
|
||||
|
||||
//9 estimated_data (JSON)
|
||||
estimatedJsonObject := make(map[string]interface{})
|
||||
// data allrady loged
|
||||
if call.ExpectedDepartureTime != "" {
|
||||
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
||||
}
|
||||
if call.ExpectedArrivalTime != "" {
|
||||
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
|
||||
}
|
||||
if call.Cancellation != "" {
|
||||
estimatedJsonObject["Cancellation"] = call.Cancellation
|
||||
}
|
||||
// The rest
|
||||
if call.StopPointName != "" {
|
||||
estimatedJsonObject["StopPointName"] = call.StopPointName
|
||||
}
|
||||
if call.RequestStop != "" {
|
||||
estimatedJsonObject["RequestStop"] = call.RequestStop
|
||||
}
|
||||
if call.DepartureStatus != "" {
|
||||
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
|
||||
}
|
||||
if call.DeparturePlatformName != "" {
|
||||
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
|
||||
}
|
||||
if call.DepartureBoardingActivity != "" {
|
||||
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
|
||||
}
|
||||
if call.DepartureStopAssignment.AimedQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
|
||||
}
|
||||
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
|
||||
}
|
||||
if call.DepartureStopAssignment.ActualQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
|
||||
}
|
||||
if call.Extensions.StopsAtAirport != "" {
|
||||
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
|
||||
}
|
||||
if call.ArrivalStatus != "" {
|
||||
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
|
||||
}
|
||||
if call.ArrivalPlatformName != "" {
|
||||
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
|
||||
}
|
||||
if call.ArrivalBoardingActivity != "" {
|
||||
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
|
||||
}
|
||||
if call.ArrivalStopAssignment.AimedQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
|
||||
}
|
||||
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
|
||||
}
|
||||
if call.ArrivalStopAssignment.ActualQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
|
||||
}
|
||||
if call.CallNote != "" {
|
||||
estimatedJsonObject["CallNote"] = call.CallNote
|
||||
}
|
||||
if call.DestinationDisplay != "" {
|
||||
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
|
||||
}
|
||||
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
|
||||
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
|
||||
}
|
||||
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
|
||||
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
|
||||
}
|
||||
if call.TimingPoint != "" {
|
||||
estimatedJsonObject["TimingPoint"] = call.TimingPoint
|
||||
}
|
||||
if call.SituationRef != "" {
|
||||
estimatedJsonObject["SituationRef"] = call.SituationRef
|
||||
}
|
||||
if call.PredictionInaccurate != "" {
|
||||
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
|
||||
}
|
||||
if call.Occupancy != "" {
|
||||
estimatedJsonObject["Occupancy"] = call.Occupancy
|
||||
}
|
||||
|
||||
// Convert the JSON object to a JSON string
|
||||
jsonString, err := json.Marshal(estimatedJsonObject)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
estimatedValues = append(estimatedValues, string(jsonString))
|
||||
|
||||
// Insert or update the record
|
||||
stringValues := make([]string, len(estimatedValues))
|
||||
for i, v := range estimatedValues {
|
||||
stringValues[i] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
interfaceValues := make([]interface{}, len(stringValues))
|
||||
for i, v := range stringValues {
|
||||
interfaceValues[i] = v
|
||||
}
|
||||
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to insert or update estimated call: %v", err)
|
||||
} else {
|
||||
if 1 == 0 {
|
||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
}
|
||||
|
||||
if action == "insert" {
|
||||
estimatedCallInsertCount++
|
||||
} else if action == "update" {
|
||||
estimatedCallUpdateCount++
|
||||
} else if action == "none" {
|
||||
estimatedCallNoneCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, recordedCall := range journey.RecordedCalls {
|
||||
for _, call := range recordedCall.RecordedCall {
|
||||
var recordedValues []interface{}
|
||||
|
||||
//1 estimatedvehiclejourney
|
||||
recordedValues = append(recordedValues, id)
|
||||
//2 order
|
||||
recordedValues = append(recordedValues, call.Order)
|
||||
//3 stoppointref
|
||||
recordedValues = append(recordedValues, call.StopPointRef)
|
||||
//4 aimeddeparturetime
|
||||
recordedValues = append(recordedValues, call.AimedDepartureTime)
|
||||
//5 expecteddeparturetime
|
||||
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
|
||||
//6 aimedarrivaltime
|
||||
recordedValues = append(recordedValues, call.AimedArrivalTime)
|
||||
//7 expectedarrivaltime
|
||||
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
|
||||
//8 cancellation
|
||||
recordedValues = append(recordedValues, call.Cancellation)
|
||||
//9 actualdeparturetime
|
||||
recordedValues = append(recordedValues, call.ActualDepartureTime)
|
||||
//10 actualarrivaltime
|
||||
recordedValues = append(recordedValues, call.ActualArrivalTime)
|
||||
|
||||
//11 recorded_data (JSON)
|
||||
recordedJsonObject := make(map[string]interface{})
|
||||
if call.StopPointName != "" {
|
||||
recordedJsonObject["StopPointName"] = call.StopPointName
|
||||
}
|
||||
if call.ArrivalPlatformName != "" {
|
||||
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
|
||||
}
|
||||
if call.DeparturePlatformName != "" {
|
||||
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
|
||||
}
|
||||
if call.PredictionInaccurate != "" {
|
||||
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
|
||||
}
|
||||
if call.Occupancy != "" {
|
||||
recordedJsonObject["Occupancy"] = call.Occupancy
|
||||
}
|
||||
|
||||
// Convert the JSON object to a JSON string
|
||||
jsonString, err := json.Marshal(recordedJsonObject)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
recordedValues = append(recordedValues, string(jsonString))
|
||||
|
||||
// Insert or update the record
|
||||
stringValues := make([]string, len(recordedValues))
|
||||
for i, v := range recordedValues {
|
||||
stringValues[i] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
interfaceValues := make([]interface{}, len(stringValues))
|
||||
for i, v := range stringValues {
|
||||
interfaceValues[i] = v
|
||||
}
|
||||
|
||||
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
|
||||
if err != nil {
|
||||
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
|
||||
} else {
|
||||
if 1 == 0 {
|
||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
}
|
||||
|
||||
if action == "insert" {
|
||||
recordedCallInsertCount++
|
||||
//fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
} else if action == "update" {
|
||||
recordedCallUpdateCount++
|
||||
} else if action == "none" {
|
||||
recordedCallNoneCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
|
||||
totalJourneys := len(journeys)
|
||||
fmt.Printf("Processing %d journeys...\n", totalJourneys)
|
||||
|
||||
// Job structures
|
||||
type evjJob struct {
|
||||
index int
|
||||
}
|
||||
|
||||
type callJob struct {
|
||||
evjID int
|
||||
values []interface{}
|
||||
}
|
||||
|
||||
// Channels
|
||||
workerCount := 20 // Adjust based on your database and CPU
|
||||
evjJobs := make(chan evjJob, workerCount*2)
|
||||
estimatedCallJobs := make(chan callJob, workerCount*10)
|
||||
recordedCallJobs := make(chan callJob, workerCount*10)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var callWg sync.WaitGroup
|
||||
|
||||
// Start Estimated Call workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
callWg.Add(1)
|
||||
go func() {
|
||||
defer callWg.Done()
|
||||
for job := range estimatedCallJobs {
|
||||
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting/updating estimated call: %v\n", err)
|
||||
continue
|
||||
}
|
||||
if action == "insert" {
|
||||
atomic.AddInt64(&estimatedCallInsertCount, 1)
|
||||
} else if action == "update" {
|
||||
atomic.AddInt64(&estimatedCallUpdateCount, 1)
|
||||
} else if action == "none" {
|
||||
atomic.AddInt64(&estimatedCallNoneCount, 1)
|
||||
}
|
||||
_ = id
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Start Recorded Call workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
callWg.Add(1)
|
||||
go func() {
|
||||
defer callWg.Done()
|
||||
for job := range recordedCallJobs {
|
||||
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting/updating recorded call: %v\n", err)
|
||||
continue
|
||||
}
|
||||
if action == "insert" {
|
||||
atomic.AddInt64(&recordedCallInsertCount, 1)
|
||||
} else if action == "update" {
|
||||
atomic.AddInt64(&recordedCallUpdateCount, 1)
|
||||
} else if action == "none" {
|
||||
atomic.AddInt64(&recordedCallNoneCount, 1)
|
||||
}
|
||||
_ = id
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Start EVJ workers
|
||||
for w := 0; w < workerCount; w++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for job := range evjJobs {
|
||||
journey := &journeys[job.index]
|
||||
|
||||
// Prepare values
|
||||
var values []interface{}
|
||||
var datedVehicleJourneyRef, otherJson string
|
||||
|
||||
values = append(values, sid)
|
||||
values = append(values, journey.RecordedAtTime)
|
||||
values = append(values, journey.LineRef)
|
||||
values = append(values, strings.ToLower(journey.DirectionRef))
|
||||
values = append(values, journey.DataSource)
|
||||
|
||||
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
|
||||
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
|
||||
} else if journey.DatedVehicleJourneyRef != "" {
|
||||
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
|
||||
} else {
|
||||
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
|
||||
}
|
||||
values = append(values, datedVehicleJourneyRef)
|
||||
|
||||
values = append(values, journey.VehicleMode)
|
||||
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
|
||||
values = append(values, journey.OriginRef)
|
||||
values = append(values, journey.DestinationRef)
|
||||
values = append(values, journey.OperatorRef)
|
||||
values = append(values, journey.VehicleRef)
|
||||
values = append(values, journey.Cancellation)
|
||||
|
||||
// Create JSON object
|
||||
jsonObject := make(map[string]interface{})
|
||||
if journey.OriginName != "" {
|
||||
jsonObject["OriginName"] = journey.OriginName
|
||||
}
|
||||
if journey.DestinationName != "" {
|
||||
jsonObject["DestinationName"] = journey.DestinationName
|
||||
}
|
||||
if journey.ProductCategoryRef != "" {
|
||||
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
|
||||
}
|
||||
if journey.ServiceFeatureRef != "" {
|
||||
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
|
||||
}
|
||||
if journey.Monitored != "" {
|
||||
jsonObject["Monitored"] = journey.Monitored
|
||||
}
|
||||
if journey.JourneyPatternRef != "" {
|
||||
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
|
||||
}
|
||||
if journey.JourneyPatternName != "" {
|
||||
jsonObject["JourneyPatternName"] = journey.JourneyPatternName
|
||||
}
|
||||
if journey.PublishedLineName != "" {
|
||||
jsonObject["PublishedLineName"] = journey.PublishedLineName
|
||||
}
|
||||
if journey.DirectionName != "" {
|
||||
jsonObject["DirectionName"] = journey.DirectionName
|
||||
}
|
||||
if journey.OriginAimedDepartureTime != "" {
|
||||
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
|
||||
}
|
||||
if journey.DestinationAimedArrivalTime != "" {
|
||||
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
|
||||
}
|
||||
if journey.BlockRef != "" {
|
||||
jsonObject["BlockRef"] = journey.BlockRef
|
||||
}
|
||||
if journey.VehicleJourneyRef != "" {
|
||||
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
|
||||
}
|
||||
if journey.Occupancy != "" {
|
||||
jsonObject["Occupancy"] = journey.Occupancy
|
||||
}
|
||||
if journey.DestinationDisplayAtOrigin != "" {
|
||||
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
|
||||
}
|
||||
if journey.ExtraJourney != "" {
|
||||
jsonObject["ExtraJourney"] = journey.ExtraJourney
|
||||
}
|
||||
if journey.RouteRef != "" {
|
||||
jsonObject["RouteRef"] = journey.RouteRef
|
||||
}
|
||||
if journey.GroupOfLinesRef != "" {
|
||||
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
|
||||
}
|
||||
if journey.ExternalLineRef != "" {
|
||||
jsonObject["ExternalLineRef"] = journey.ExternalLineRef
|
||||
}
|
||||
if journey.InCongestion != "" {
|
||||
jsonObject["InCongestion"] = journey.InCongestion
|
||||
}
|
||||
if journey.PredictionInaccurate != "" {
|
||||
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
|
||||
}
|
||||
if journey.JourneyNote != "" {
|
||||
jsonObject["JourneyNote"] = journey.JourneyNote
|
||||
}
|
||||
if journey.Via.PlaceName != "" {
|
||||
jsonObject["Via"] = journey.Via.PlaceName
|
||||
}
|
||||
|
||||
jsonString, err := json.Marshal(jsonObject)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling JSON: %v\n", err)
|
||||
continue
|
||||
}
|
||||
otherJson = string(jsonString)
|
||||
values = append(values, otherJson)
|
||||
|
||||
// Insert or update EVJ
|
||||
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
||||
if err != nil {
|
||||
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if action == "insert" {
|
||||
atomic.AddInt64(&insertCount, 1)
|
||||
} else if action == "update" {
|
||||
atomic.AddInt64(&updateCount, 1)
|
||||
}
|
||||
|
||||
// Progress reporting
|
||||
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
|
||||
if total%1000 == 0 {
|
||||
fmt.Printf(
|
||||
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
|
||||
atomic.LoadInt64(&insertCount),
|
||||
atomic.LoadInt64(&updateCount),
|
||||
total,
|
||||
atomic.LoadInt64(&estimatedCallInsertCount),
|
||||
atomic.LoadInt64(&estimatedCallUpdateCount),
|
||||
atomic.LoadInt64(&estimatedCallNoneCount),
|
||||
atomic.LoadInt64(&recordedCallInsertCount),
|
||||
atomic.LoadInt64(&recordedCallUpdateCount),
|
||||
atomic.LoadInt64(&recordedCallNoneCount),
|
||||
)
|
||||
}
|
||||
|
||||
// Process Estimated Calls
|
||||
for _, estimatedCall := range journey.EstimatedCalls {
|
||||
for _, call := range estimatedCall.EstimatedCall {
|
||||
var estimatedValues []interface{}
|
||||
|
||||
estimatedValues = append(estimatedValues, id)
|
||||
estimatedValues = append(estimatedValues, call.Order)
|
||||
estimatedValues = append(estimatedValues, call.StopPointRef)
|
||||
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
|
||||
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
|
||||
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
|
||||
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
|
||||
estimatedValues = append(estimatedValues, call.Cancellation)
|
||||
|
||||
// estimated_data JSON
|
||||
estimatedJsonObject := make(map[string]interface{})
|
||||
if call.ExpectedDepartureTime != "" {
|
||||
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
||||
}
|
||||
if call.ExpectedArrivalTime != "" {
|
||||
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
|
||||
}
|
||||
if call.Cancellation != "" {
|
||||
estimatedJsonObject["Cancellation"] = call.Cancellation
|
||||
}
|
||||
if call.StopPointName != "" {
|
||||
estimatedJsonObject["StopPointName"] = call.StopPointName
|
||||
}
|
||||
if call.RequestStop != "" {
|
||||
estimatedJsonObject["RequestStop"] = call.RequestStop
|
||||
}
|
||||
if call.DepartureStatus != "" {
|
||||
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
|
||||
}
|
||||
if call.DeparturePlatformName != "" {
|
||||
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
|
||||
}
|
||||
if call.DepartureBoardingActivity != "" {
|
||||
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
|
||||
}
|
||||
if call.DepartureStopAssignment.AimedQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
|
||||
}
|
||||
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
|
||||
}
|
||||
if call.DepartureStopAssignment.ActualQuayRef != "" {
|
||||
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
|
||||
}
|
||||
if call.Extensions.StopsAtAirport != "" {
|
||||
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
|
||||
}
|
||||
if call.ArrivalStatus != "" {
|
||||
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
|
||||
}
|
||||
if call.ArrivalPlatformName != "" {
|
||||
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
|
||||
}
|
||||
if call.ArrivalBoardingActivity != "" {
|
||||
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
|
||||
}
|
||||
if call.ArrivalStopAssignment.AimedQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
|
||||
}
|
||||
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
|
||||
}
|
||||
if call.ArrivalStopAssignment.ActualQuayRef != "" {
|
||||
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
|
||||
}
|
||||
if call.CallNote != "" {
|
||||
estimatedJsonObject["CallNote"] = call.CallNote
|
||||
}
|
||||
if call.DestinationDisplay != "" {
|
||||
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
|
||||
}
|
||||
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
|
||||
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
|
||||
}
|
||||
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
|
||||
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
|
||||
}
|
||||
if call.TimingPoint != "" {
|
||||
estimatedJsonObject["TimingPoint"] = call.TimingPoint
|
||||
}
|
||||
if call.SituationRef != "" {
|
||||
estimatedJsonObject["SituationRef"] = call.SituationRef
|
||||
}
|
||||
if call.PredictionInaccurate != "" {
|
||||
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
|
||||
}
|
||||
if call.Occupancy != "" {
|
||||
estimatedJsonObject["Occupancy"] = call.Occupancy
|
||||
}
|
||||
|
||||
jsonString, err := json.Marshal(estimatedJsonObject)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling estimated call JSON: %v\n", err)
|
||||
continue
|
||||
}
|
||||
estimatedValues = append(estimatedValues, string(jsonString))
|
||||
|
||||
// Convert to string values
|
||||
interfaceValues := make([]interface{}, len(estimatedValues))
|
||||
for i, v := range estimatedValues {
|
||||
interfaceValues[i] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
|
||||
// Send to worker pool
|
||||
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
|
||||
}
|
||||
}
|
||||
|
||||
// Process Recorded Calls
|
||||
for _, recordedCall := range journey.RecordedCalls {
|
||||
for _, call := range recordedCall.RecordedCall {
|
||||
var recordedValues []interface{}
|
||||
|
||||
recordedValues = append(recordedValues, id)
|
||||
recordedValues = append(recordedValues, call.Order)
|
||||
recordedValues = append(recordedValues, call.StopPointRef)
|
||||
recordedValues = append(recordedValues, call.AimedDepartureTime)
|
||||
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
|
||||
recordedValues = append(recordedValues, call.AimedArrivalTime)
|
||||
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
|
||||
recordedValues = append(recordedValues, call.Cancellation)
|
||||
recordedValues = append(recordedValues, call.ActualDepartureTime)
|
||||
recordedValues = append(recordedValues, call.ActualArrivalTime)
|
||||
|
||||
// recorded_data JSON
|
||||
recordedJsonObject := make(map[string]interface{})
|
||||
if call.StopPointName != "" {
|
||||
recordedJsonObject["StopPointName"] = call.StopPointName
|
||||
}
|
||||
if call.ArrivalPlatformName != "" {
|
||||
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
|
||||
}
|
||||
if call.DeparturePlatformName != "" {
|
||||
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
|
||||
}
|
||||
if call.PredictionInaccurate != "" {
|
||||
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
|
||||
}
|
||||
if call.Occupancy != "" {
|
||||
recordedJsonObject["Occupancy"] = call.Occupancy
|
||||
}
|
||||
|
||||
jsonString, err := json.Marshal(recordedJsonObject)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling recorded call JSON: %v\n", err)
|
||||
continue
|
||||
}
|
||||
recordedValues = append(recordedValues, string(jsonString))
|
||||
|
||||
// Convert to string values
|
||||
interfaceValues := make([]interface{}, len(recordedValues))
|
||||
for i, v := range recordedValues {
|
||||
interfaceValues[i] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
|
||||
// Send to worker pool
|
||||
recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send all EVJ jobs
|
||||
for i := range journeys {
|
||||
evjJobs <- evjJob{index: i}
|
||||
}
|
||||
close(evjJobs)
|
||||
|
||||
// Wait for EVJ processing to complete
|
||||
wg.Wait()
|
||||
|
||||
// Close call job channels and wait for call processing to complete
|
||||
close(estimatedCallJobs)
|
||||
close(recordedCallJobs)
|
||||
callWg.Wait()
|
||||
|
||||
// Record end time
|
||||
endTime := time.Now()
|
||||
|
||||
// Print final stats
|
||||
fmt.Printf(
|
||||
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
|
||||
insertCount,
|
||||
updateCount,
|
||||
totalCount,
|
||||
estimatedCallInsertCount,
|
||||
estimatedCallUpdateCount,
|
||||
estimatedCallNoneCount,
|
||||
recordedCallInsertCount,
|
||||
recordedCallUpdateCount,
|
||||
recordedCallNoneCount,
|
||||
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+
|
||||
" EstimatedCalls - I: %d U: %d N: %d\n"+
|
||||
" RecordedCalls - I: %d U: %d N: %d\n",
|
||||
atomic.LoadInt64(&insertCount),
|
||||
atomic.LoadInt64(&updateCount),
|
||||
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount),
|
||||
atomic.LoadInt64(&estimatedCallInsertCount),
|
||||
atomic.LoadInt64(&estimatedCallUpdateCount),
|
||||
atomic.LoadInt64(&estimatedCallNoneCount),
|
||||
atomic.LoadInt64(&recordedCallInsertCount),
|
||||
atomic.LoadInt64(&recordedCallUpdateCount),
|
||||
atomic.LoadInt64(&recordedCallNoneCount),
|
||||
)
|
||||
|
||||
// Create map to hold JSON
|
||||
serviceDeliveryJsonObject := make(map[string]interface{})
|
||||
|
||||
// Add fields to JSON
|
||||
serviceDeliveryJsonObject["Inserts"] = insertCount
|
||||
serviceDeliveryJsonObject["Updates"] = updateCount
|
||||
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
|
||||
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
|
||||
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
|
||||
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
|
||||
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
|
||||
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
|
||||
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
|
||||
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount)
|
||||
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount)
|
||||
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount)
|
||||
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount)
|
||||
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount)
|
||||
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount)
|
||||
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount)
|
||||
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339)
|
||||
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339)
|
||||
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
|
||||
|
||||
// Convert JSON object to JSON string
|
||||
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
||||
@@ -438,4 +492,6 @@ func DBData(data *data.Data) {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Println("Finished with this ServiceDelivery!")
|
||||
}
|
||||
|
||||
13
main.go
13
main.go
@@ -2,7 +2,6 @@ package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"ti1/config"
|
||||
"ti1/data"
|
||||
"ti1/database"
|
||||
"ti1/export"
|
||||
@@ -10,17 +9,11 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Println("ti1 v0.2.1")
|
||||
log.Println("ti1 v1.0.2")
|
||||
log.Println("Starting...")
|
||||
|
||||
// Load configuration
|
||||
cfg, err := config.LoadConfig()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to load config: %v", err)
|
||||
}
|
||||
|
||||
// Setup the database
|
||||
err = database.SetupDB()
|
||||
err := database.SetupDB()
|
||||
if err != nil {
|
||||
log.Fatalf("Database setup failed: %v", err)
|
||||
}
|
||||
@@ -32,7 +25,7 @@ func main() {
|
||||
for {
|
||||
start := time.Now()
|
||||
|
||||
data, err := data.FetchData(starttimestamp, cfg.DatasetId, cfg.ExcludedDatasetIds)
|
||||
data, err := data.FetchData(starttimestamp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user