6 Commits
main ... dev

Author SHA1 Message Date
pigwin
1bcbbf6f44 dette var jeg dårlig på 2025-02-08 14:58:08 +00:00
pigwin
8f973853bd XD 2025-02-08 14:55:11 +00:00
pigwin
6632c38c0c stay code :P 2025-02-08 14:48:20 +00:00
pigwin
8c0bd734c6 Update config file path in main.go 2025-02-08 12:27:21 +00:00
pigwin
087b0ec637 me feal dum 2025-02-08 12:23:27 +00:00
pigwin
e90e35cfbc Add dataset ID and excluded dataset IDz 2025-02-08 12:04:39 +00:00
17 changed files with 530 additions and 763 deletions

View File

@@ -30,7 +30,7 @@ jobs:
- name: Get commit version - name: Get commit version
id: commit-version id: commit-version
run: | run: |
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs) COMMIT_MSG=$(git log -1 --pretty=%B)
echo "Commit message: $COMMIT_MSG" # Debugging output echo "Commit message: $COMMIT_MSG" # Debugging output
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats # Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then

View File

@@ -1,5 +1,5 @@
# Use the official Golang image as the base image # Use the official Golang image as the base image
FROM golang:1.26.0 FROM golang:1.23.4
# Set the Current Working Directory inside the container # Set the Current Working Directory inside the container
WORKDIR /app WORKDIR /app

View File

@@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf
Change the following values Change the following values
```conf ```conf
listen_addresses = '*' listen_addresses = '*'
max_connections = 200 max_connections = 100
shared_buffers = 16GB shared_buffers = 16GB
work_mem = 256MB work_mem = 256MB
maintenance_work_mem = 2GB maintenance_work_mem = 2GB

View File

@@ -10,9 +10,11 @@
"valkey": { "valkey": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": "6379", "port": "6379",
"max_conns": 100, "max_conns": 50,
"timeout_ms": 2000, "timeout_ms": 5000,
"password": "the_valkey_password" "password": "the_valkey_password"
}, },
"temp": "value" "temp": "value",
"dataset_id": "",
"excluded_dataset_ids": ""
} }

View File

@@ -7,6 +7,8 @@ import (
"strconv" "strconv"
) )
const configFilePath = "config/conf.json"
type Config struct { type Config struct {
Database struct { Database struct {
Host string `json:"host"` Host string `json:"host"`
@@ -21,14 +23,16 @@ type Config struct {
Port string `json:"port"` Port string `json:"port"`
MaxConns int `json:"max_conns"` MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"` TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line Password string `json:"password"`
} `json:"valkey"` } `json:"valkey"`
Temp string `json:"temp"` Temp string `json:"temp"`
DatasetId string `json:"dataset_id"`
ExcludedDatasetIds string `json:"excluded_dataset_ids"`
} }
func LoadConfig(file string) (Config, error) { func LoadConfig() (Config, error) {
var config Config var config Config
configFile, err := os.Open(file) configFile, err := os.Open(configFilePath)
if err != nil { if err != nil {
return config, fmt.Errorf("failed to open config file: %w", err) return config, fmt.Errorf("failed to open config file: %w", err)
} }
@@ -80,5 +84,13 @@ func LoadConfig(file string) (Config, error) {
} }
} }
// Override datasetId and excludedDatasetIds with environment variables
if datasetId := os.Getenv("DATASET_ID"); datasetId != "" {
config.DatasetId = datasetId
}
if excludedDatasetIds := os.Getenv("EXCLUDED_DATASET_IDS"); excludedDatasetIds != "" {
config.ExcludedDatasetIds = excludedDatasetIds
}
return config, nil return config, nil
} }

View File

@@ -11,7 +11,7 @@ import (
func ConnectToPostgreSQL() (*sql.DB, error) { func ConnectToPostgreSQL() (*sql.DB, error) {
fmt.Println("Connecting to PostgreSQL...") fmt.Println("Connecting to PostgreSQL...")
config, err := LoadConfig("config/conf.json") config, err := LoadConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -27,11 +27,10 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
return nil, err return nil, err
} }
// Set connection pool settings for high concurrency // Set connection pool settings
db.SetMaxOpenConns(50) // Maximum number of open connections to the database db.SetMaxOpenConns(25) // Maximum number of open connections to the database
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
fmt.Println("Connection to PostgreSQL opened successfully :D") fmt.Println("Connection to PostgreSQL opened successfully :D")
@@ -58,7 +57,7 @@ func DisconnectFromPostgreSQL(db *sql.DB) error {
} }
func PrintDBConfig() { func PrintDBConfig() {
config, err := LoadConfig("config/conf.json") config, err := LoadConfig()
if err != nil { if err != nil {
fmt.Println("Error loading config:", err) fmt.Println("Error loading config:", err)
return return

View File

@@ -17,7 +17,7 @@ type ValkeyConfig struct {
Port string `json:"port"` Port string `json:"port"`
MaxConns int `json:"max_conns"` MaxConns int `json:"max_conns"`
TimeoutMs int `json:"timeout_ms"` TimeoutMs int `json:"timeout_ms"`
Password string `json:"password"` // Add this line Password string `json:"password"`
} }
func LoadValkeyConfig(file string) (ValkeyConfig, error) { func LoadValkeyConfig(file string) (ValkeyConfig, error) {
@@ -56,9 +56,9 @@ func LoadValkeyConfig(file string) (ValkeyConfig, error) {
return config, nil return config, nil
} }
func ConnectToValkey(configPath string) (valkey.Client, error) { func ConnectToValkey() (valkey.Client, error) {
fmt.Println("Loading configuration...") fmt.Println("Loading configuration...")
config, err := LoadConfig(configPath) config, err := LoadConfig()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load config: %v", err) return nil, fmt.Errorf("failed to load config: %v", err)
} }

View File

@@ -1,12 +1,10 @@
package data package data
import ( import (
"crypto/tls"
"encoding/xml" "encoding/xml"
"fmt"
"log" "log"
"net/http" "net/http"
"time" "strings"
) )
type Data struct { type Data struct {
@@ -129,87 +127,31 @@ type Data struct {
} `xml:"ServiceDelivery"` } `xml:"ServiceDelivery"`
} }
func FetchData(timestamp string) (*Data, error) { func FetchData(timestamp, datasetId, excludedDatasetIds string) (*Data, error) {
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors client := &http.Client{}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
}
requestorId := "ti1-" + timestamp requestorId := "ti1-" + timestamp
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
// Retry logic for transient failures baseURL := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
var resp *http.Response
var err error
var data *Data
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ { if datasetId != "" {
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url) baseURL += "&datasetId=" + datasetId
} else if excludedDatasetIds != "" {
resp, err = client.Get(url) baseURL += "&excludedDatasetIds=" + strings.ReplaceAll(excludedDatasetIds, ",", "&excludedDatasetIds=")
if err != nil {
log.Printf("Request failed: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
} }
// Check HTTP status code log.Println("Fetching data from URL:", baseURL)
if resp.StatusCode != http.StatusOK { resp, err := client.Get(baseURL)
resp.Body.Close()
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
log.Printf("%v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Try to decode the response
data = &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
resp.Body.Close()
if err != nil {
log.Printf("Failed to decode XML: %v", err)
if attempt < maxRetries {
waitTime := time.Duration(attempt*2) * time.Second
log.Printf("Retrying in %v...", waitTime)
time.Sleep(waitTime)
}
continue
}
// Success!
log.Printf("Successfully fetched and decoded data")
return data, nil
}
// All retries failed
if err != nil { if err != nil {
return nil, err return nil, err
} }
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries) defer resp.Body.Close()
data := &Data{}
decoder := xml.NewDecoder(resp.Body)
err = decoder.Decode(data)
if err != nil {
return nil, err
}
return data, nil
} }

View File

@@ -6,18 +6,11 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
) )
type CallResult struct {
ID int
Action string
Error error
}
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
// Replace empty strings with nil for timestamp fields // Replace empty strings with nil for timestamp fields
for i, v := range values { for i, v := range values {
@@ -35,15 +28,19 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
} }
hash := md5.Sum([]byte(valuesString)) hash := md5.Sum([]byte(valuesString))
hashString := hex.EncodeToString(hash[:]) hashString := hex.EncodeToString(hash[:])
//fmt.Println("HashString:", hashString)
estimatedVehicleJourneyID := values[0] estimatedVehicleJourneyID := values[0]
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -67,60 +64,26 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter
estimated_data = EXCLUDED.estimated_data estimated_data = EXCLUDED.estimated_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} } else {
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -1,68 +1,10 @@
package database package database
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"sync"
) )
type EVJResult struct {
ID int
Action string
Error error
Index int // To maintain order
}
// PreparedStatements holds reusable prepared statements
type PreparedStatements struct {
evjStmt *sql.Stmt
ecStmt *sql.Stmt
rcStmt *sql.Stmt
mu sync.Mutex
}
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
evjQuery := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
DO UPDATE SET
servicedelivery = EXCLUDED.servicedelivery,
recordedattime = EXCLUDED.recordedattime,
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
`
evjStmt, err := db.Prepare(evjQuery)
if err != nil {
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
}
return &PreparedStatements{
evjStmt: evjStmt,
}, nil
}
func (ps *PreparedStatements) Close() {
if ps.evjStmt != nil {
ps.evjStmt.Close()
}
if ps.ecStmt != nil {
ps.ecStmt.Close()
}
if ps.rcStmt != nil {
ps.rcStmt.Close()
}
}
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
query := ` query := `
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
@@ -82,54 +24,18 @@ func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (in
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
var action string var action string
var id int var id int
err := db.QueryRow(query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} }
// BatchInsertEVJ processes multiple EVJ inserts concurrently
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]EVJResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
results[idx] = EVJResult{
ID: id,
Action: action,
Error: err,
Index: idx,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
}

View File

@@ -6,7 +6,6 @@ import (
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
"ti1/valki" "ti1/valki"
"github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go"
@@ -34,10 +33,12 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
orderID := values[1] orderID := values[1]
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
var err error
// Get the MD5 hash from Valkey // Get the MD5 hash from Valkey
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
} }
// Check if the retrieved value matches the original MD5 hash // Check if the retrieved value matches the original MD5 hash
@@ -64,60 +65,25 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf
recorded_data = EXCLUDED.recorded_data recorded_data = EXCLUDED.recorded_data
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
` `
stmt, err := db.Prepare(query)
if err != nil {
return 0, "", fmt.Errorf("error preparing statement: %v", err)
}
defer stmt.Close()
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
} }
var action string var action string
var id int var id int
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) err = stmt.QueryRow(values...).Scan(&action, &id)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("error executing statement: %w", err) return 0, "", fmt.Errorf("error executing statement: %v", err)
} }
return id, action, nil return id, action, nil
} } else {
return 0, "none", nil return 0, "none", nil
} }
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
if len(batch) == 0 {
return nil, nil
}
results := make([]CallResult, len(batch))
jobs := make(chan int, len(batch))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range jobs {
select {
case <-ctx.Done():
return
default:
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
results[idx] = CallResult{
ID: id,
Action: action,
Error: err,
}
}
}
}()
}
// Send jobs
for i := range batch {
jobs <- i
}
close(jobs)
wg.Wait()
return results, nil
} }

View File

@@ -3,21 +3,52 @@ package database
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"ti1/config"
) )
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { func GetDatasetVariable(config config.Config) string {
var id int if config.DatasetId != "" {
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) fmt.Println(config.DatasetId)
if err != nil { return config.DatasetId
return 0, fmt.Errorf("failed to insert service delivery: %w", err) } else if config.ExcludedDatasetIds != "" {
result := "EX." + config.ExcludedDatasetIds
fmt.Println(result)
return result
} }
fmt.Println("")
return ""
}
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
fmt.Println("Inserting ServiceDelivery...")
var id int
// Load configuration
config, err := config.LoadConfig()
if err != nil {
fmt.Println("Error loading config:", err)
return 0, err
}
// Get dataset variable
datasetVariable := GetDatasetVariable(config)
err = db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime, Source) VALUES ($1, $2, $3) RETURNING ID", responseTimestamp, recordedAtTime, datasetVariable).Scan(&id)
if err != nil {
fmt.Println(err)
return 0, err
}
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
return id, nil return id, nil
} }
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
fmt.Println("Updating ServiceDelivery data...")
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to update service delivery data: %w", err) fmt.Println(err)
return err
} }
fmt.Println("Finished with this ServiceDelivery!")
return nil return nil
} }

View File

@@ -68,6 +68,7 @@ func SetupDB() error {
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'), id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
responsetimestamp TIMESTAMPTZ, responsetimestamp TIMESTAMPTZ,
recordedattime TIMESTAMPTZ, recordedattime TIMESTAMPTZ,
source VARCHAR,
data JSON data JSON
);`, );`,
} }

View File

@@ -6,21 +6,12 @@ import (
"fmt" "fmt"
"log" "log"
"strings" "strings"
"sync"
"sync/atomic"
"ti1/config" "ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"time"
) )
// DBData is the main entry point for data processing
func DBData(data *data.Data) { func DBData(data *data.Data) {
DBDataOptimized(data)
}
// DBDataOptimized processes data with concurrent workers for better performance
func DBDataOptimized(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
@@ -31,7 +22,7 @@ func DBDataOptimized(data *data.Data) {
defer db.Close() defer db.Close()
// Connect to Valkey // Connect to Valkey
valkeyClient, err := config.ConnectToValkey("config/conf.json") valkeyClient, err := config.ConnectToValkey()
if err != nil { if err != nil {
log.Fatalf("Failed to connect to Valkey: %v", err) log.Fatalf("Failed to connect to Valkey: %v", err)
} }
@@ -46,96 +37,17 @@ func DBDataOptimized(data *data.Data) {
} }
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// Record start time // counters
startTime := time.Now() var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
// Atomic counters for thread-safe counting for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
totalJourneys := len(journeys)
fmt.Printf("Processing %d journeys...\n", totalJourneys)
// Job structures
type evjJob struct {
index int
}
type callJob struct {
evjID int
values []interface{}
}
// Channels
workerCount := 20 // Adjust based on your database and CPU
evjJobs := make(chan evjJob, workerCount*2)
estimatedCallJobs := make(chan callJob, workerCount*10)
recordedCallJobs := make(chan callJob, workerCount*10)
var wg sync.WaitGroup
var callWg sync.WaitGroup
// Start Estimated Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range estimatedCallJobs {
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating estimated call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&estimatedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&estimatedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&estimatedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start Recorded Call workers
for w := 0; w < workerCount; w++ {
callWg.Add(1)
go func() {
defer callWg.Done()
for job := range recordedCallJobs {
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
if err != nil {
log.Printf("Error inserting/updating recorded call: %v\n", err)
continue
}
if action == "insert" {
atomic.AddInt64(&recordedCallInsertCount, 1)
} else if action == "update" {
atomic.AddInt64(&recordedCallUpdateCount, 1)
} else if action == "none" {
atomic.AddInt64(&recordedCallNoneCount, 1)
}
_ = id
}
}()
}
// Start EVJ workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range evjJobs {
journey := &journeys[job.index]
// Prepare values
var values []interface{} var values []interface{}
var datedVehicleJourneyRef, otherJson string var datedVehicleJourneyRef, otherJson string
values = append(values, sid) values = append(values, sid)
values = append(values, journey.RecordedAtTime) values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef) values = append(values, journey.LineRef)
//had to add to lowercase cus some values vary in case and it was causing duplicates
values = append(values, strings.ToLower(journey.DirectionRef)) values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource) values = append(values, journey.DataSource)
@@ -156,8 +68,10 @@ func DBDataOptimized(data *data.Data) {
values = append(values, journey.VehicleRef) values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation) values = append(values, journey.Cancellation)
// Create JSON object // Create a map to hold the JSON object for the current journey
jsonObject := make(map[string]interface{}) jsonObject := make(map[string]interface{})
// Add relevant fields to the JSON object
if journey.OriginName != "" { if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName jsonObject["OriginName"] = journey.OriginName
} }
@@ -228,60 +142,71 @@ func DBDataOptimized(data *data.Data) {
jsonObject["Via"] = journey.Via.PlaceName jsonObject["Via"] = journey.Via.PlaceName
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(jsonObject) jsonString, err := json.Marshal(jsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling JSON: %v\n", err) log.Fatal(err)
continue
} }
otherJson = string(jsonString) otherJson = string(jsonString)
values = append(values, otherJson) values = append(values, otherJson)
// Insert or update EVJ // Insert or update the record
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil { if err != nil {
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue } else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
if action == "insert" { if action == "insert" {
atomic.AddInt64(&insertCount, 1) insertCount++
} else if action == "update" { } else if action == "update" {
atomic.AddInt64(&updateCount, 1) updateCount++
} }
totalCount = insertCount + updateCount
// Progress reporting //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0) if totalCount%1000 == 0 {
if total%1000 == 0 {
fmt.Printf( fmt.Printf(
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n", "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
atomic.LoadInt64(&insertCount), insertCount,
atomic.LoadInt64(&updateCount), updateCount,
total, totalCount,
atomic.LoadInt64(&estimatedCallInsertCount), estimatedCallInsertCount,
atomic.LoadInt64(&estimatedCallUpdateCount), estimatedCallUpdateCount,
atomic.LoadInt64(&estimatedCallNoneCount), estimatedCallNoneCount,
atomic.LoadInt64(&recordedCallInsertCount), recordedCallInsertCount,
atomic.LoadInt64(&recordedCallUpdateCount), recordedCallUpdateCount,
atomic.LoadInt64(&recordedCallNoneCount), recordedCallNoneCount,
) )
} }
}
// Process Estimated Calls
for _, estimatedCall := range journey.EstimatedCalls { for _, estimatedCall := range journey.EstimatedCalls {
for _, call := range estimatedCall.EstimatedCall { for _, call := range estimatedCall.EstimatedCall {
var estimatedValues []interface{} var estimatedValues []interface{}
//1 estimatedvehiclejourney
estimatedValues = append(estimatedValues, id) estimatedValues = append(estimatedValues, id)
//2 order
estimatedValues = append(estimatedValues, call.Order) estimatedValues = append(estimatedValues, call.Order)
//3 stoppointref
estimatedValues = append(estimatedValues, call.StopPointRef) estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime) estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime) estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation) estimatedValues = append(estimatedValues, call.Cancellation)
// estimated_data JSON //9 estimated_data (JSON)
estimatedJsonObject := make(map[string]interface{}) estimatedJsonObject := make(map[string]interface{})
// data allrady loged
if call.ExpectedDepartureTime != "" { if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
} }
@@ -291,6 +216,7 @@ func DBDataOptimized(data *data.Data) {
if call.Cancellation != "" { if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation estimatedJsonObject["Cancellation"] = call.Cancellation
} }
// The rest
if call.StopPointName != "" { if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName estimatedJsonObject["StopPointName"] = call.StopPointName
} }
@@ -361,41 +287,66 @@ func DBDataOptimized(data *data.Data) {
estimatedJsonObject["Occupancy"] = call.Occupancy estimatedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(estimatedJsonObject) jsonString, err := json.Marshal(estimatedJsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling estimated call JSON: %v\n", err) log.Fatal(err)
continue
} }
estimatedValues = append(estimatedValues, string(jsonString)) estimatedValues = append(estimatedValues, string(jsonString))
// Convert to string values // Insert or update the record
interfaceValues := make([]interface{}, len(estimatedValues)) stringValues := make([]string, len(estimatedValues))
for i, v := range estimatedValues { for i, v := range estimatedValues {
interfaceValues[i] = fmt.Sprintf("%v", v) stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
}
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
if err != nil {
log.Fatalf("Failed to insert or update estimated call: %v", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
// Send to worker pool if action == "insert" {
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues} estimatedCallInsertCount++
} else if action == "update" {
estimatedCallUpdateCount++
} else if action == "none" {
estimatedCallNoneCount++
}
}
} }
} }
// Process Recorded Calls
for _, recordedCall := range journey.RecordedCalls { for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall { for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{} var recordedValues []interface{}
//1 estimatedvehiclejourney
recordedValues = append(recordedValues, id) recordedValues = append(recordedValues, id)
//2 order
recordedValues = append(recordedValues, call.Order) recordedValues = append(recordedValues, call.Order)
//3 stoppointref
recordedValues = append(recordedValues, call.StopPointRef) recordedValues = append(recordedValues, call.StopPointRef)
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime) recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime) recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime) recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime) recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation) recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime) recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime) recordedValues = append(recordedValues, call.ActualArrivalTime)
// recorded_data JSON //11 recorded_data (JSON)
recordedJsonObject := make(map[string]interface{}) recordedJsonObject := make(map[string]interface{})
if call.StopPointName != "" { if call.StopPointName != "" {
recordedJsonObject["StopPointName"] = call.StopPointName recordedJsonObject["StopPointName"] = call.StopPointName
@@ -413,73 +364,68 @@ func DBDataOptimized(data *data.Data) {
recordedJsonObject["Occupancy"] = call.Occupancy recordedJsonObject["Occupancy"] = call.Occupancy
} }
// Convert the JSON object to a JSON string
jsonString, err := json.Marshal(recordedJsonObject) jsonString, err := json.Marshal(recordedJsonObject)
if err != nil { if err != nil {
log.Printf("Error marshaling recorded call JSON: %v\n", err) log.Fatal(err)
continue
} }
recordedValues = append(recordedValues, string(jsonString)) recordedValues = append(recordedValues, string(jsonString))
// Convert to string values // Insert or update the record
interfaceValues := make([]interface{}, len(recordedValues)) stringValues := make([]string, len(recordedValues))
for i, v := range recordedValues { for i, v := range recordedValues {
interfaceValues[i] = fmt.Sprintf("%v", v) stringValues[i] = fmt.Sprintf("%v", v)
}
interfaceValues := make([]interface{}, len(stringValues))
for i, v := range stringValues {
interfaceValues[i] = v
} }
// Send to worker pool id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
recordedCallJobs <- callJob{evjID: id, values: interfaceValues} if err != nil {
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
if action == "insert" {
recordedCallInsertCount++
//fmt.Printf("Action: %s, ID: %d\n", action, id)
} else if action == "update" {
recordedCallUpdateCount++
} else if action == "none" {
recordedCallNoneCount++
} }
} }
} }
}()
} }
// Send all EVJ jobs
for i := range journeys {
evjJobs <- evjJob{index: i}
} }
close(evjJobs)
// Wait for EVJ processing to complete
wg.Wait()
// Close call job channels and wait for call processing to complete
close(estimatedCallJobs)
close(recordedCallJobs)
callWg.Wait()
// Record end time
endTime := time.Now()
// Print final stats
fmt.Printf( fmt.Printf(
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+ "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
" EstimatedCalls - I: %d U: %d N: %d\n"+ insertCount,
" RecordedCalls - I: %d U: %d N: %d\n", updateCount,
atomic.LoadInt64(&insertCount), totalCount,
atomic.LoadInt64(&updateCount), estimatedCallInsertCount,
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount), estimatedCallUpdateCount,
atomic.LoadInt64(&estimatedCallInsertCount), estimatedCallNoneCount,
atomic.LoadInt64(&estimatedCallUpdateCount), recordedCallInsertCount,
atomic.LoadInt64(&estimatedCallNoneCount), recordedCallUpdateCount,
atomic.LoadInt64(&recordedCallInsertCount), recordedCallNoneCount,
atomic.LoadInt64(&recordedCallUpdateCount),
atomic.LoadInt64(&recordedCallNoneCount),
) )
// Create map to hold JSON // Create map to hold JSON
serviceDeliveryJsonObject := make(map[string]interface{}) serviceDeliveryJsonObject := make(map[string]interface{})
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount) // Add fields to JSON
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount) serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount) serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount) serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount) serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount) serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount) serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339) serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
// Convert JSON object to JSON string // Convert JSON object to JSON string
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
@@ -492,6 +438,4 @@ func DBDataOptimized(data *data.Data) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("Finished with this ServiceDelivery!")
} }

10
go.mod
View File

@@ -1,10 +1,10 @@
module ti1 module ti1
go 1.26.0 go 1.23.4
require github.com/lib/pq v1.10.9
require ( require (
github.com/lib/pq v1.11.2 github.com/valkey-io/valkey-go v1.0.52 // indirect
github.com/valkey-io/valkey-go v1.0.71 golang.org/x/sys v0.24.0 // indirect
) )
require golang.org/x/sys v0.41.0 // indirect

6
go.sum
View File

@@ -1,12 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
github.com/valkey-io/valkey-go v1.0.71 h1:tuKjGVLd7/I8CyUwqAq5EaD7isxQdlvJzXo3jS8pZW0=
github.com/valkey-io/valkey-go v1.0.71/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

19
main.go
View File

@@ -2,6 +2,7 @@ package main
import ( import (
"log" "log"
"ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
"ti1/export" "ti1/export"
@@ -9,11 +10,17 @@ import (
) )
func main() { func main() {
log.Println("ti1 v1.1.0") log.Println("ti1 v0.2.1")
log.Println("Starting...") log.Println("Starting...")
// Load configuration
cfg, err := config.LoadConfig()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Setup the database // Setup the database
err := database.SetupDB() err = database.SetupDB()
if err != nil { if err != nil {
log.Fatalf("Database setup failed: %v", err) log.Fatalf("Database setup failed: %v", err)
} }
@@ -25,7 +32,7 @@ func main() {
for { for {
start := time.Now() start := time.Now()
data, err := data.FetchData(starttimestamp) data, err := data.FetchData(starttimestamp, cfg.DatasetId, cfg.ExcludedDatasetIds)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -34,9 +41,9 @@ func main() {
log.Println("finished in", time.Since(start)) log.Println("finished in", time.Since(start))
elapsed := time.Since(start) elapsed := time.Since(start)
if elapsed < 20*time.Second { if elapsed < 5*time.Minute {
log.Printf("starting again in %v", 20*time.Second-elapsed) log.Printf("starting again in %v", 5*time.Minute-elapsed)
time.Sleep(20*time.Second - elapsed) time.Sleep(5*time.Minute - elapsed)
} }
} }
} }