diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index 74acbda..92a423b 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -5,6 +5,7 @@ on: branches: - main - dev + - v1.0 jobs: build: @@ -47,12 +48,18 @@ jobs: - name: Push Docker image run: | - # Always push to 'dev' tag - docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev - docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev + # If on v1.0 branch, push to 'testingv1.0' tag + if [[ "${{ github.ref }}" == "refs/heads/v1.0" ]]; then + docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:testingv1.0 + docker push ${{ secrets.DOCKER_USERNAME }}/ti1:testingv1.0 + else + # Always push to 'dev' tag for main and dev branches + docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev + docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev - # If the version is valid, also push that specific version tag - if [[ "${{ env.VERSION }}" != "dev" ]]; then - docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} - docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} + # If the version is valid, also push that specific version tag + if [[ "${{ env.VERSION }}" != "dev" ]]; then + docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} + docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }} + fi fi diff --git a/README.md b/README.md index b9f20e4..51704eb 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf Change the following values ```conf listen_addresses = '*' -max_connections = 100 +max_connections = 200 shared_buffers = 16GB work_mem = 256MB maintenance_work_mem = 2GB diff --git a/config/conf.json b/config/conf.json index 59041f1..cd4db0b 100644 --- a/config/conf.json +++ b/config/conf.json @@ -10,8 +10,8 @@ "valkey": { "host": "127.0.0.1", "port": "6379", - "max_conns": 50, - "timeout_ms": 5000, + "max_conns": 100, + "timeout_ms": 2000, "password": "the_valkey_password" }, "temp": "value" diff --git a/config/db.go b/config/db.go index 51b8ef1..b25999b 100644 --- a/config/db.go +++ b/config/db.go @@ -27,10 +27,11 @@ func ConnectToPostgreSQL() (*sql.DB, error) { return nil, err } - // Set connection pool settings - db.SetMaxOpenConns(25) // Maximum number of open connections to the database - db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool - db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + // Set connection pool settings for high concurrency + db.SetMaxOpenConns(50) // Maximum number of open connections to the database + db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle fmt.Println("Connection to PostgreSQL opened successfully :D") diff --git a/data/data.go b/data/data.go index b7ddee9..15bb300 100644 --- a/data/data.go +++ b/data/data.go @@ -1,9 +1,12 @@ package data import ( - "log" + "crypto/tls" "encoding/xml" + "fmt" + "log" "net/http" + "time" ) type Data struct { @@ -127,23 +130,86 @@ type Data struct { } func FetchData(timestamp string) (*Data, error) { - client := &http.Client{} + // Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + MaxIdleConns: 10, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + DisableCompression: false, + ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 30 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + client := &http.Client{ + Transport: transport, + Timeout: 180 * time.Second, // 3 minute timeout for large datasets + } + requestorId := "ti1-" + timestamp - url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId - log.Println("Fetching data from URL:", url) - resp, err := client.Get(url) + + // Retry logic for transient failures + var resp *http.Response + var err error + var data *Data + maxRetries := 3 + + for attempt := 1; attempt <= maxRetries; attempt++ { + log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url) + + resp, err = client.Get(url) + if err != nil { + log.Printf("Request failed: %v", err) + if attempt < maxRetries { + waitTime := time.Duration(attempt*2) * time.Second + log.Printf("Retrying in %v...", waitTime) + time.Sleep(waitTime) + } + continue + } + + // Check HTTP status code + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode) + log.Printf("%v", err) + if attempt < maxRetries { + waitTime := time.Duration(attempt*2) * time.Second + log.Printf("Retrying in %v...", waitTime) + time.Sleep(waitTime) + } + continue + } + + // Try to decode the response + data = &Data{} + decoder := xml.NewDecoder(resp.Body) + err = decoder.Decode(data) + resp.Body.Close() + + if err != nil { + log.Printf("Failed to decode XML: %v", err) + if attempt < maxRetries { + waitTime := time.Duration(attempt*2) * time.Second + log.Printf("Retrying in %v...", waitTime) + time.Sleep(waitTime) + } + continue + } + + // Success! + log.Printf("Successfully fetched and decoded data") + return data, nil + } + + // All retries failed if err != nil { return nil, err } - defer resp.Body.Close() - - data := &Data{} - decoder := xml.NewDecoder(resp.Body) - err = decoder.Decode(data) - if err != nil { - return nil, err - } - - return data, nil + return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries) } diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 0321ca7..d390f05 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -6,11 +6,18 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" "ti1/valki" "github.com/valkey-io/valkey-go" ) +type CallResult struct { + ID int + Action string + Error error +} + func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { // Replace empty strings with nil for timestamp fields for i, v := range values { @@ -28,19 +35,15 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } hash := md5.Sum([]byte(valuesString)) hashString := hex.EncodeToString(hash[:]) - //fmt.Println("HashString:", hashString) estimatedVehicleJourneyID := values[0] orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) - //fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) - - var err error // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) } // Check if the retrieved value matches the original MD5 hash @@ -64,26 +67,60 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter estimated_data = EXCLUDED.estimated_data RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) } var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing statement: %w", err) } return id, action, nil - } else { - //fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) - return 0, "none", nil } + return 0, "none", nil +} + +// BatchInsertEstimatedCalls processes multiple estimated calls concurrently +func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]CallResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient) + results[idx] = CallResult{ + ID: id, + Action: action, + Error: err, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil } diff --git a/database/EstimatedVehicleJourney.go b/database/EstimatedVehicleJourney.go index 4efa54a..02bc217 100644 --- a/database/EstimatedVehicleJourney.go +++ b/database/EstimatedVehicleJourney.go @@ -1,41 +1,135 @@ package database import ( + "context" "database/sql" "fmt" + "sync" ) +type EVJResult struct { + ID int + Action string + Error error + Index int // To maintain order +} + +// PreparedStatements holds reusable prepared statements +type PreparedStatements struct { + evjStmt *sql.Stmt + ecStmt *sql.Stmt + rcStmt *sql.Stmt + mu sync.Mutex +} + +func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) { + evjQuery := ` + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + + evjStmt, err := db.Prepare(evjQuery) + if err != nil { + return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err) + } + + return &PreparedStatements{ + evjStmt: evjStmt, + }, nil +} + +func (ps *PreparedStatements) Close() { + if ps.evjStmt != nil { + ps.evjStmt.Close() + } + if ps.ecStmt != nil { + ps.ecStmt.Close() + } + if ps.rcStmt != nil { + ps.rcStmt.Close() + } +} + func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { query := ` - INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) - ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) - DO UPDATE SET - servicedelivery = EXCLUDED.servicedelivery, - recordedattime = EXCLUDED.recordedattime, - vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), - dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), - originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), - destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), - operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), - vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), - cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), - other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() - var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err := db.QueryRow(query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing EVJ statement: %w", err) } return id, action, nil } + +// BatchInsertEVJ processes multiple EVJ inserts concurrently +func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]EVJResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx]) + results[idx] = EVJResult{ + ID: id, + Action: action, + Error: err, + Index: idx, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil +} diff --git a/database/RecordedCall.go b/database/RecordedCall.go index 667236d..3e7845a 100644 --- a/database/RecordedCall.go +++ b/database/RecordedCall.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" "ti1/valki" "github.com/valkey-io/valkey-go" @@ -33,12 +34,10 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) - var err error - // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) } // Check if the retrieved value matches the original MD5 hash @@ -65,25 +64,60 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf recorded_data = EXCLUDED.recorded_data RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) } var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing statement: %w", err) } return id, action, nil - } else { - return 0, "none", nil } + return 0, "none", nil +} + +// BatchInsertRecordedCalls processes multiple recorded calls concurrently +func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]CallResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient) + results[idx] = CallResult{ + ID: id, + Action: action, + Error: err, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil } diff --git a/database/ServiceDeliveryDB.go b/database/ServiceDeliveryDB.go index a3f03e7..82f046f 100644 --- a/database/ServiceDeliveryDB.go +++ b/database/ServiceDeliveryDB.go @@ -6,25 +6,18 @@ import ( ) func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { - fmt.Println("Inserting ServiceDelivery...") var id int - err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) if err != nil { - fmt.Println(err) - return 0, err + return 0, fmt.Errorf("failed to insert service delivery: %w", err) } - //fmt.Println("ServiceDelivery inserted successfully! (", id, ")") return id, nil } func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { - fmt.Println("Updating ServiceDelivery data...") _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) if err != nil { - fmt.Println(err) - return err + return fmt.Errorf("failed to update service delivery data: %w", err) } - fmt.Println("Finished with this ServiceDelivery!") return nil } diff --git a/databaseold/EstimatedCall.go b/databaseold/EstimatedCall.go new file mode 100644 index 0000000..46c71df --- /dev/null +++ b/databaseold/EstimatedCall.go @@ -0,0 +1,89 @@ +package databaseold + +import ( + "context" + "crypto/md5" + "database/sql" + "encoding/hex" + "fmt" + "ti1/valki" + + "github.com/valkey-io/valkey-go" +) + +func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { + // Replace empty strings with nil for timestamp fields + for i, v := range values { + if str, ok := v.(string); ok && str == "" { + values[i] = nil + } + } + + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) + } + } + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) + //fmt.Println("HashString:", hashString) + + estimatedVehicleJourneyID := values[0] + orderID := values[1] + key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) + //fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) + + var err error + + // Get the MD5 hash from Valkey + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) + if err != nil { + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + query := ` + INSERT INTO calls ( + estimatedvehiclejourney, "order", stoppointref, + aimeddeparturetime, expecteddeparturetime, + aimedarrivaltime, expectedarrivaltime, + cancellation, estimated_data + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) + ON CONFLICT (estimatedvehiclejourney, "order") + DO UPDATE SET + stoppointref = EXCLUDED.stoppointref, + aimeddeparturetime = EXCLUDED.aimeddeparturetime, + expecteddeparturetime = EXCLUDED.expecteddeparturetime, + aimedarrivaltime = EXCLUDED.aimedarrivaltime, + expectedarrivaltime = EXCLUDED.expectedarrivaltime, + cancellation = EXCLUDED.cancellation, + estimated_data = EXCLUDED.estimated_data + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + //fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) + return 0, "none", nil + } +} diff --git a/databaseold/EstimatedVehicleJourney.go b/databaseold/EstimatedVehicleJourney.go new file mode 100644 index 0000000..027e5bb --- /dev/null +++ b/databaseold/EstimatedVehicleJourney.go @@ -0,0 +1,41 @@ +package databaseold + +import ( + "database/sql" + "fmt" +) + +func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { + query := ` + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + + return id, action, nil +} diff --git a/databaseold/RecordedCall.go b/databaseold/RecordedCall.go new file mode 100644 index 0000000..96c3e21 --- /dev/null +++ b/databaseold/RecordedCall.go @@ -0,0 +1,89 @@ +package databaseold + +import ( + "context" + "crypto/md5" + "database/sql" + "encoding/hex" + "fmt" + "ti1/valki" + + "github.com/valkey-io/valkey-go" +) + +func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { + // Replace empty strings with nil for timestamp fields + for i, v := range values { + if str, ok := v.(string); ok && str == "" { + values[i] = nil + } + } + + // Convert values to a single string and hash it using MD5 + var valuesString string + for _, v := range values { + if v != nil { + valuesString += fmt.Sprintf("%v", v) + } + } + hash := md5.Sum([]byte(valuesString)) + hashString := hex.EncodeToString(hash[:]) + + estimatedVehicleJourneyID := values[0] + orderID := values[1] + key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) + + var err error + + // Get the MD5 hash from Valkey + retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) + if err != nil { + return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + } + + // Check if the retrieved value matches the original MD5 hash + if retrievedHash != hashString { + query := ` + INSERT INTO calls ( + estimatedvehiclejourney, "order", stoppointref, + aimeddeparturetime, expecteddeparturetime, + aimedarrivaltime, expectedarrivaltime, + cancellation, actualdeparturetime, actualarrivaltime, + recorded_data + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) + ON CONFLICT (estimatedvehiclejourney, "order") + DO UPDATE SET + stoppointref = EXCLUDED.stoppointref, + aimeddeparturetime = EXCLUDED.aimeddeparturetime, + expecteddeparturetime = EXCLUDED.expecteddeparturetime, + aimedarrivaltime = EXCLUDED.aimedarrivaltime, + expectedarrivaltime = EXCLUDED.expectedarrivaltime, + cancellation = EXCLUDED.cancellation, + actualdeparturetime = EXCLUDED.actualdeparturetime, + actualarrivaltime = EXCLUDED.actualarrivaltime, + recorded_data = EXCLUDED.recorded_data + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + stmt, err := db.Prepare(query) + if err != nil { + return 0, "", fmt.Errorf("error preparing statement: %v", err) + } + defer stmt.Close() + + err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) + if err != nil { + return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + } + + var action string + var id int + err = stmt.QueryRow(values...).Scan(&action, &id) + if err != nil { + return 0, "", fmt.Errorf("error executing statement: %v", err) + } + return id, action, nil + } else { + return 0, "none", nil + } +} diff --git a/databaseold/ServiceDeliveryDB.go b/databaseold/ServiceDeliveryDB.go new file mode 100644 index 0000000..c8f8493 --- /dev/null +++ b/databaseold/ServiceDeliveryDB.go @@ -0,0 +1,30 @@ +package databaseold + +import ( + "database/sql" + "fmt" +) + +func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { + fmt.Println("Inserting ServiceDelivery...") + var id int + + err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) + if err != nil { + fmt.Println(err) + return 0, err + } + //fmt.Println("ServiceDelivery inserted successfully! (", id, ")") + return id, nil +} + +func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { + fmt.Println("Updating ServiceDelivery data...") + _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) + if err != nil { + fmt.Println(err) + return err + } + fmt.Println("Finished with this ServiceDelivery!") + return nil +} diff --git a/databaseold/SetupDB.go b/databaseold/SetupDB.go new file mode 100644 index 0000000..b52a324 --- /dev/null +++ b/databaseold/SetupDB.go @@ -0,0 +1,140 @@ +package databaseold + +import ( + "fmt" + "ti1/config" +) + +func SetupDB() error { + fmt.Println("Setting up the database...") + + // Connect to PostgreSQL + db, err := config.ConnectToPostgreSQL() + if err != nil { + return fmt.Errorf("failed to connect to database: %w", err) + } + defer config.DisconnectFromPostgreSQL(db) + + // Create sequences if they do not exist + sequences := []string{ + "CREATE SEQUENCE IF NOT EXISTS public.calls_id_seq", + "CREATE SEQUENCE IF NOT EXISTS public.estimatedvehiclejourney_id_seq", + "CREATE SEQUENCE IF NOT EXISTS public.servicedelivery_id_seq", + } + + for _, seq := range sequences { + _, err := db.Exec(seq) + if err != nil { + return fmt.Errorf("failed to create sequence: %w", err) + } + } + + // Check if tables exist and have the correct structure + tables := map[string]string{ + "calls": `CREATE TABLE IF NOT EXISTS public.calls ( + id BIGINT PRIMARY KEY DEFAULT nextval('public.calls_id_seq'), + estimatedvehiclejourney BIGINT, + "order" INTEGER, + stoppointref VARCHAR, + aimeddeparturetime TIMESTAMP, + expecteddeparturetime TIMESTAMP, + aimedarrivaltime TIMESTAMP, + expectedarrivaltime TIMESTAMP, + cancellation VARCHAR, + actualdeparturetime TIMESTAMP, + actualarrivaltime TIMESTAMP, + estimated_data JSON, + recorded_data JSON + );`, + "estimatedvehiclejourney": `CREATE TABLE IF NOT EXISTS public.estimatedvehiclejourney ( + id BIGINT PRIMARY KEY DEFAULT nextval('public.estimatedvehiclejourney_id_seq'), + servicedelivery INTEGER, + recordedattime TIMESTAMP, + lineref VARCHAR, + directionref VARCHAR, + datasource VARCHAR, + datedvehiclejourneyref VARCHAR, + vehiclemode VARCHAR, + dataframeref VARCHAR, + originref VARCHAR, + destinationref VARCHAR, + operatorref VARCHAR, + vehicleref VARCHAR, + cancellation VARCHAR, + other JSON, + firstservicedelivery INTEGER + );`, + "servicedelivery": `CREATE TABLE IF NOT EXISTS public.servicedelivery ( + id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'), + responsetimestamp TIMESTAMPTZ, + recordedattime TIMESTAMPTZ, + data JSON + );`, + } + + for table, createStmt := range tables { + var exists bool + err := db.QueryRow(fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s')", table)).Scan(&exists) + if err != nil { + return fmt.Errorf("failed to check if table %s exists: %w", table, err) + } + + if !exists { + _, err := db.Exec(createStmt) + if err != nil { + return fmt.Errorf("failed to create table %s: %w", table, err) + } + fmt.Printf("Table %s created successfully!\n", table) + } else { + fmt.Printf("Table %s already exists.\n", table) + } + } + + // Check if the unique constraint exists before adding it to calls table + var constraintExists bool + err = db.QueryRow(` + SELECT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'unique_estimatedvehiclejourney_order' + ); + `).Scan(&constraintExists) + if err != nil { + return fmt.Errorf("failed to check if unique constraint exists: %w", err) + } + + if !constraintExists { + _, err = db.Exec(`ALTER TABLE calls ADD CONSTRAINT unique_estimatedvehiclejourney_order UNIQUE (estimatedvehiclejourney, "order");`) + if err != nil { + return fmt.Errorf("failed to add unique constraint to calls table: %w", err) + } + fmt.Println("Unique constraint added to calls table.") + } else { + fmt.Println("Unique constraint already exists on calls table.") + } + + // Check if the unique constraint exists before adding it to estimatedvehiclejourney table + err = db.QueryRow(` + SELECT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'unique_lineref_directionref_datasource_datedvehiclejourneyref' + ); + `).Scan(&constraintExists) + if err != nil { + return fmt.Errorf("failed to check if unique constraint exists: %w", err) + } + + if !constraintExists { + _, err = db.Exec(`ALTER TABLE estimatedvehiclejourney ADD CONSTRAINT unique_lineref_directionref_datasource_datedvehiclejourneyref UNIQUE (lineref, directionref, datasource, datedvehiclejourneyref);`) + if err != nil { + return fmt.Errorf("failed to add unique constraint to estimatedvehiclejourney table: %w", err) + } + fmt.Println("Unique constraint added to estimatedvehiclejourney table.") + } else { + fmt.Println("Unique constraint already exists on estimatedvehiclejourney table.") + } + + fmt.Println("Database setup is good!") + return nil +} diff --git a/export/database.go b/export/database.go index cf817eb..c5871af 100644 --- a/export/database.go +++ b/export/database.go @@ -6,12 +6,20 @@ import ( "fmt" "log" "strings" + "sync" + "sync/atomic" "ti1/config" "ti1/data" "ti1/database" ) +// DBData is the main entry point for data processing func DBData(data *data.Data) { + DBDataOptimized(data) +} + +// DBDataOptimized processes data with concurrent workers for better performance +func DBDataOptimized(data *data.Data) { fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) @@ -37,395 +45,431 @@ func DBData(data *data.Data) { } fmt.Println("SID:", sid) - // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int + // Atomic counters for thread-safe counting + var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64 - for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { - var values []interface{} - var datedVehicleJourneyRef, otherJson string - - values = append(values, sid) - values = append(values, journey.RecordedAtTime) - values = append(values, journey.LineRef) - //had to add to lowercase cus some values vary in case and it was causing duplicates - values = append(values, strings.ToLower(journey.DirectionRef)) - values = append(values, journey.DataSource) - - if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { - datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef - } else if journey.DatedVehicleJourneyRef != "" { - datedVehicleJourneyRef = journey.DatedVehicleJourneyRef - } else { - datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode - } - values = append(values, datedVehicleJourneyRef) - - values = append(values, journey.VehicleMode) - values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef) - values = append(values, journey.OriginRef) - values = append(values, journey.DestinationRef) - values = append(values, journey.OperatorRef) - values = append(values, journey.VehicleRef) - values = append(values, journey.Cancellation) - - // Create a map to hold the JSON object for the current journey - jsonObject := make(map[string]interface{}) - - // Add relevant fields to the JSON object - if journey.OriginName != "" { - jsonObject["OriginName"] = journey.OriginName - } - if journey.DestinationName != "" { - jsonObject["DestinationName"] = journey.DestinationName - } - if journey.ProductCategoryRef != "" { - jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef - } - if journey.ServiceFeatureRef != "" { - jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef - } - if journey.Monitored != "" { - jsonObject["Monitored"] = journey.Monitored - } - if journey.JourneyPatternRef != "" { - jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef - } - if journey.JourneyPatternName != "" { - jsonObject["JourneyPatternName"] = journey.JourneyPatternName - } - if journey.PublishedLineName != "" { - jsonObject["PublishedLineName"] = journey.PublishedLineName - } - if journey.DirectionName != "" { - jsonObject["DirectionName"] = journey.DirectionName - } - if journey.OriginAimedDepartureTime != "" { - jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime - } - if journey.DestinationAimedArrivalTime != "" { - jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime - } - if journey.BlockRef != "" { - jsonObject["BlockRef"] = journey.BlockRef - } - if journey.VehicleJourneyRef != "" { - jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef - } - if journey.Occupancy != "" { - jsonObject["Occupancy"] = journey.Occupancy - } - if journey.DestinationDisplayAtOrigin != "" { - jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin - } - if journey.ExtraJourney != "" { - jsonObject["ExtraJourney"] = journey.ExtraJourney - } - if journey.RouteRef != "" { - jsonObject["RouteRef"] = journey.RouteRef - } - if journey.GroupOfLinesRef != "" { - jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef - } - if journey.ExternalLineRef != "" { - jsonObject["ExternalLineRef"] = journey.ExternalLineRef - } - if journey.InCongestion != "" { - jsonObject["InCongestion"] = journey.InCongestion - } - if journey.PredictionInaccurate != "" { - jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate - } - if journey.JourneyNote != "" { - jsonObject["JourneyNote"] = journey.JourneyNote - } - if journey.Via.PlaceName != "" { - jsonObject["Via"] = journey.Via.PlaceName - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(jsonObject) - if err != nil { - log.Fatal(err) - } - otherJson = string(jsonString) - values = append(values, otherJson) - - // Insert or update the record - id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) - if err != nil { - fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - insertCount++ - } else if action == "update" { - updateCount++ - } - totalCount = insertCount + updateCount - - //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) - if totalCount%1000 == 0 { - fmt.Printf( - "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", - insertCount, - updateCount, - totalCount, - estimatedCallInsertCount, - estimatedCallUpdateCount, - estimatedCallNoneCount, - recordedCallInsertCount, - recordedCallUpdateCount, - recordedCallNoneCount, - ) - } - } - - for _, estimatedCall := range journey.EstimatedCalls { - for _, call := range estimatedCall.EstimatedCall { - var estimatedValues []interface{} - - //1 estimatedvehiclejourney - estimatedValues = append(estimatedValues, id) - //2 order - estimatedValues = append(estimatedValues, call.Order) - //3 stoppointref - estimatedValues = append(estimatedValues, call.StopPointRef) - //4 aimeddeparturetime - estimatedValues = append(estimatedValues, call.AimedDepartureTime) - //5 expecteddeparturetime - estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) - //6 aimedarrivaltime - estimatedValues = append(estimatedValues, call.AimedArrivalTime) - //7 expectedarrivaltime - estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) - //8 cancellation - estimatedValues = append(estimatedValues, call.Cancellation) - - //9 estimated_data (JSON) - estimatedJsonObject := make(map[string]interface{}) - // data allrady loged - if call.ExpectedDepartureTime != "" { - estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime - } - if call.ExpectedArrivalTime != "" { - estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime - } - if call.Cancellation != "" { - estimatedJsonObject["Cancellation"] = call.Cancellation - } - // The rest - if call.StopPointName != "" { - estimatedJsonObject["StopPointName"] = call.StopPointName - } - if call.RequestStop != "" { - estimatedJsonObject["RequestStop"] = call.RequestStop - } - if call.DepartureStatus != "" { - estimatedJsonObject["DepartureStatus"] = call.DepartureStatus - } - if call.DeparturePlatformName != "" { - estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName - } - if call.DepartureBoardingActivity != "" { - estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity - } - if call.DepartureStopAssignment.AimedQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef - } - if call.DepartureStopAssignment.ExpectedQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef - } - if call.DepartureStopAssignment.ActualQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef - } - if call.Extensions.StopsAtAirport != "" { - estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport - } - if call.ArrivalStatus != "" { - estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus - } - if call.ArrivalPlatformName != "" { - estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName - } - if call.ArrivalBoardingActivity != "" { - estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity - } - if call.ArrivalStopAssignment.AimedQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef - } - if call.ArrivalStopAssignment.ExpectedQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef - } - if call.ArrivalStopAssignment.ActualQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef - } - if call.CallNote != "" { - estimatedJsonObject["CallNote"] = call.CallNote - } - if call.DestinationDisplay != "" { - estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay - } - if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" { - estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel - } - if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" { - estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel - } - if call.TimingPoint != "" { - estimatedJsonObject["TimingPoint"] = call.TimingPoint - } - if call.SituationRef != "" { - estimatedJsonObject["SituationRef"] = call.SituationRef - } - if call.PredictionInaccurate != "" { - estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate - } - if call.Occupancy != "" { - estimatedJsonObject["Occupancy"] = call.Occupancy - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(estimatedJsonObject) - if err != nil { - log.Fatal(err) - } - estimatedValues = append(estimatedValues, string(jsonString)) - - // Insert or update the record - stringValues := make([]string, len(estimatedValues)) - for i, v := range estimatedValues { - stringValues[i] = fmt.Sprintf("%v", v) - } - interfaceValues := make([]interface{}, len(stringValues)) - for i, v := range stringValues { - interfaceValues[i] = v - } - id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient) - if err != nil { - log.Fatalf("Failed to insert or update estimated call: %v", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - estimatedCallInsertCount++ - } else if action == "update" { - estimatedCallUpdateCount++ - } else if action == "none" { - estimatedCallNoneCount++ - } - } - } - } - for _, recordedCall := range journey.RecordedCalls { - for _, call := range recordedCall.RecordedCall { - var recordedValues []interface{} - - //1 estimatedvehiclejourney - recordedValues = append(recordedValues, id) - //2 order - recordedValues = append(recordedValues, call.Order) - //3 stoppointref - recordedValues = append(recordedValues, call.StopPointRef) - //4 aimeddeparturetime - recordedValues = append(recordedValues, call.AimedDepartureTime) - //5 expecteddeparturetime - recordedValues = append(recordedValues, call.ExpectedDepartureTime) - //6 aimedarrivaltime - recordedValues = append(recordedValues, call.AimedArrivalTime) - //7 expectedarrivaltime - recordedValues = append(recordedValues, call.ExpectedArrivalTime) - //8 cancellation - recordedValues = append(recordedValues, call.Cancellation) - //9 actualdeparturetime - recordedValues = append(recordedValues, call.ActualDepartureTime) - //10 actualarrivaltime - recordedValues = append(recordedValues, call.ActualArrivalTime) - - //11 recorded_data (JSON) - recordedJsonObject := make(map[string]interface{}) - if call.StopPointName != "" { - recordedJsonObject["StopPointName"] = call.StopPointName - } - if call.ArrivalPlatformName != "" { - recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName - } - if call.DeparturePlatformName != "" { - recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName - } - if call.PredictionInaccurate != "" { - recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate - } - if call.Occupancy != "" { - recordedJsonObject["Occupancy"] = call.Occupancy - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(recordedJsonObject) - if err != nil { - log.Fatal(err) - } - recordedValues = append(recordedValues, string(jsonString)) - - // Insert or update the record - stringValues := make([]string, len(recordedValues)) - for i, v := range recordedValues { - stringValues[i] = fmt.Sprintf("%v", v) - } - interfaceValues := make([]interface{}, len(stringValues)) - for i, v := range stringValues { - interfaceValues[i] = v - } - - id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient) - if err != nil { - fmt.Printf("Error inserting/updating recorded call: %v\n", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - recordedCallInsertCount++ - //fmt.Printf("Action: %s, ID: %d\n", action, id) - } else if action == "update" { - recordedCallUpdateCount++ - } else if action == "none" { - recordedCallNoneCount++ - } - } - } - } + journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney + totalJourneys := len(journeys) + fmt.Printf("Processing %d journeys...\n", totalJourneys) + // Job structures + type evjJob struct { + index int } + + type callJob struct { + evjID int + values []interface{} + } + + // Channels + workerCount := 20 // Adjust based on your database and CPU + evjJobs := make(chan evjJob, workerCount*2) + estimatedCallJobs := make(chan callJob, workerCount*10) + recordedCallJobs := make(chan callJob, workerCount*10) + + var wg sync.WaitGroup + var callWg sync.WaitGroup + + // Start Estimated Call workers + for w := 0; w < workerCount; w++ { + callWg.Add(1) + go func() { + defer callWg.Done() + for job := range estimatedCallJobs { + id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient) + if err != nil { + log.Printf("Error inserting/updating estimated call: %v\n", err) + continue + } + if action == "insert" { + atomic.AddInt64(&estimatedCallInsertCount, 1) + } else if action == "update" { + atomic.AddInt64(&estimatedCallUpdateCount, 1) + } else if action == "none" { + atomic.AddInt64(&estimatedCallNoneCount, 1) + } + _ = id + } + }() + } + + // Start Recorded Call workers + for w := 0; w < workerCount; w++ { + callWg.Add(1) + go func() { + defer callWg.Done() + for job := range recordedCallJobs { + id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient) + if err != nil { + log.Printf("Error inserting/updating recorded call: %v\n", err) + continue + } + if action == "insert" { + atomic.AddInt64(&recordedCallInsertCount, 1) + } else if action == "update" { + atomic.AddInt64(&recordedCallUpdateCount, 1) + } else if action == "none" { + atomic.AddInt64(&recordedCallNoneCount, 1) + } + _ = id + } + }() + } + + // Start EVJ workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range evjJobs { + journey := &journeys[job.index] + + // Prepare values + var values []interface{} + var datedVehicleJourneyRef, otherJson string + + values = append(values, sid) + values = append(values, journey.RecordedAtTime) + values = append(values, journey.LineRef) + values = append(values, strings.ToLower(journey.DirectionRef)) + values = append(values, journey.DataSource) + + if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { + datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef + } else if journey.DatedVehicleJourneyRef != "" { + datedVehicleJourneyRef = journey.DatedVehicleJourneyRef + } else { + datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode + } + values = append(values, datedVehicleJourneyRef) + + values = append(values, journey.VehicleMode) + values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef) + values = append(values, journey.OriginRef) + values = append(values, journey.DestinationRef) + values = append(values, journey.OperatorRef) + values = append(values, journey.VehicleRef) + values = append(values, journey.Cancellation) + + // Create JSON object + jsonObject := make(map[string]interface{}) + if journey.OriginName != "" { + jsonObject["OriginName"] = journey.OriginName + } + if journey.DestinationName != "" { + jsonObject["DestinationName"] = journey.DestinationName + } + if journey.ProductCategoryRef != "" { + jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef + } + if journey.ServiceFeatureRef != "" { + jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef + } + if journey.Monitored != "" { + jsonObject["Monitored"] = journey.Monitored + } + if journey.JourneyPatternRef != "" { + jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef + } + if journey.JourneyPatternName != "" { + jsonObject["JourneyPatternName"] = journey.JourneyPatternName + } + if journey.PublishedLineName != "" { + jsonObject["PublishedLineName"] = journey.PublishedLineName + } + if journey.DirectionName != "" { + jsonObject["DirectionName"] = journey.DirectionName + } + if journey.OriginAimedDepartureTime != "" { + jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime + } + if journey.DestinationAimedArrivalTime != "" { + jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime + } + if journey.BlockRef != "" { + jsonObject["BlockRef"] = journey.BlockRef + } + if journey.VehicleJourneyRef != "" { + jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef + } + if journey.Occupancy != "" { + jsonObject["Occupancy"] = journey.Occupancy + } + if journey.DestinationDisplayAtOrigin != "" { + jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin + } + if journey.ExtraJourney != "" { + jsonObject["ExtraJourney"] = journey.ExtraJourney + } + if journey.RouteRef != "" { + jsonObject["RouteRef"] = journey.RouteRef + } + if journey.GroupOfLinesRef != "" { + jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef + } + if journey.ExternalLineRef != "" { + jsonObject["ExternalLineRef"] = journey.ExternalLineRef + } + if journey.InCongestion != "" { + jsonObject["InCongestion"] = journey.InCongestion + } + if journey.PredictionInaccurate != "" { + jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate + } + if journey.JourneyNote != "" { + jsonObject["JourneyNote"] = journey.JourneyNote + } + if journey.Via.PlaceName != "" { + jsonObject["Via"] = journey.Via.PlaceName + } + + jsonString, err := json.Marshal(jsonObject) + if err != nil { + log.Printf("Error marshaling JSON: %v\n", err) + continue + } + otherJson = string(jsonString) + values = append(values, otherJson) + + // Insert or update EVJ + id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) + if err != nil { + log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) + continue + } + + if action == "insert" { + atomic.AddInt64(&insertCount, 1) + } else if action == "update" { + atomic.AddInt64(&updateCount, 1) + } + + // Progress reporting + total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0) + if total%1000 == 0 { + fmt.Printf( + "EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n", + atomic.LoadInt64(&insertCount), + atomic.LoadInt64(&updateCount), + total, + atomic.LoadInt64(&estimatedCallInsertCount), + atomic.LoadInt64(&estimatedCallUpdateCount), + atomic.LoadInt64(&estimatedCallNoneCount), + atomic.LoadInt64(&recordedCallInsertCount), + atomic.LoadInt64(&recordedCallUpdateCount), + atomic.LoadInt64(&recordedCallNoneCount), + ) + } + + // Process Estimated Calls + for _, estimatedCall := range journey.EstimatedCalls { + for _, call := range estimatedCall.EstimatedCall { + var estimatedValues []interface{} + + estimatedValues = append(estimatedValues, id) + estimatedValues = append(estimatedValues, call.Order) + estimatedValues = append(estimatedValues, call.StopPointRef) + estimatedValues = append(estimatedValues, call.AimedDepartureTime) + estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) + estimatedValues = append(estimatedValues, call.AimedArrivalTime) + estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) + estimatedValues = append(estimatedValues, call.Cancellation) + + // estimated_data JSON + estimatedJsonObject := make(map[string]interface{}) + if call.ExpectedDepartureTime != "" { + estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime + } + if call.ExpectedArrivalTime != "" { + estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime + } + if call.Cancellation != "" { + estimatedJsonObject["Cancellation"] = call.Cancellation + } + if call.StopPointName != "" { + estimatedJsonObject["StopPointName"] = call.StopPointName + } + if call.RequestStop != "" { + estimatedJsonObject["RequestStop"] = call.RequestStop + } + if call.DepartureStatus != "" { + estimatedJsonObject["DepartureStatus"] = call.DepartureStatus + } + if call.DeparturePlatformName != "" { + estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName + } + if call.DepartureBoardingActivity != "" { + estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity + } + if call.DepartureStopAssignment.AimedQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef + } + if call.DepartureStopAssignment.ExpectedQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef + } + if call.DepartureStopAssignment.ActualQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef + } + if call.Extensions.StopsAtAirport != "" { + estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport + } + if call.ArrivalStatus != "" { + estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus + } + if call.ArrivalPlatformName != "" { + estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName + } + if call.ArrivalBoardingActivity != "" { + estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity + } + if call.ArrivalStopAssignment.AimedQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef + } + if call.ArrivalStopAssignment.ExpectedQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef + } + if call.ArrivalStopAssignment.ActualQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef + } + if call.CallNote != "" { + estimatedJsonObject["CallNote"] = call.CallNote + } + if call.DestinationDisplay != "" { + estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay + } + if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" { + estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel + } + if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" { + estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel + } + if call.TimingPoint != "" { + estimatedJsonObject["TimingPoint"] = call.TimingPoint + } + if call.SituationRef != "" { + estimatedJsonObject["SituationRef"] = call.SituationRef + } + if call.PredictionInaccurate != "" { + estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate + } + if call.Occupancy != "" { + estimatedJsonObject["Occupancy"] = call.Occupancy + } + + jsonString, err := json.Marshal(estimatedJsonObject) + if err != nil { + log.Printf("Error marshaling estimated call JSON: %v\n", err) + continue + } + estimatedValues = append(estimatedValues, string(jsonString)) + + // Convert to string values + interfaceValues := make([]interface{}, len(estimatedValues)) + for i, v := range estimatedValues { + interfaceValues[i] = fmt.Sprintf("%v", v) + } + + // Send to worker pool + estimatedCallJobs <- callJob{evjID: id, values: interfaceValues} + } + } + + // Process Recorded Calls + for _, recordedCall := range journey.RecordedCalls { + for _, call := range recordedCall.RecordedCall { + var recordedValues []interface{} + + recordedValues = append(recordedValues, id) + recordedValues = append(recordedValues, call.Order) + recordedValues = append(recordedValues, call.StopPointRef) + recordedValues = append(recordedValues, call.AimedDepartureTime) + recordedValues = append(recordedValues, call.ExpectedDepartureTime) + recordedValues = append(recordedValues, call.AimedArrivalTime) + recordedValues = append(recordedValues, call.ExpectedArrivalTime) + recordedValues = append(recordedValues, call.Cancellation) + recordedValues = append(recordedValues, call.ActualDepartureTime) + recordedValues = append(recordedValues, call.ActualArrivalTime) + + // recorded_data JSON + recordedJsonObject := make(map[string]interface{}) + if call.StopPointName != "" { + recordedJsonObject["StopPointName"] = call.StopPointName + } + if call.ArrivalPlatformName != "" { + recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName + } + if call.DeparturePlatformName != "" { + recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName + } + if call.PredictionInaccurate != "" { + recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate + } + if call.Occupancy != "" { + recordedJsonObject["Occupancy"] = call.Occupancy + } + + jsonString, err := json.Marshal(recordedJsonObject) + if err != nil { + log.Printf("Error marshaling recorded call JSON: %v\n", err) + continue + } + recordedValues = append(recordedValues, string(jsonString)) + + // Convert to string values + interfaceValues := make([]interface{}, len(recordedValues)) + for i, v := range recordedValues { + interfaceValues[i] = fmt.Sprintf("%v", v) + } + + // Send to worker pool + recordedCallJobs <- callJob{evjID: id, values: interfaceValues} + } + } + } + }() + } + + // Send all EVJ jobs + for i := range journeys { + evjJobs <- evjJob{index: i} + } + close(evjJobs) + + // Wait for EVJ processing to complete + wg.Wait() + + // Close call job channels and wait for call processing to complete + close(estimatedCallJobs) + close(recordedCallJobs) + callWg.Wait() + + // Print final stats fmt.Printf( - "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", - insertCount, - updateCount, - totalCount, - estimatedCallInsertCount, - estimatedCallUpdateCount, - estimatedCallNoneCount, - recordedCallInsertCount, - recordedCallUpdateCount, - recordedCallNoneCount, + "\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+ + " EstimatedCalls - I: %d U: %d N: %d\n"+ + " RecordedCalls - I: %d U: %d N: %d\n", + atomic.LoadInt64(&insertCount), + atomic.LoadInt64(&updateCount), + atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount), + atomic.LoadInt64(&estimatedCallInsertCount), + atomic.LoadInt64(&estimatedCallUpdateCount), + atomic.LoadInt64(&estimatedCallNoneCount), + atomic.LoadInt64(&recordedCallInsertCount), + atomic.LoadInt64(&recordedCallUpdateCount), + atomic.LoadInt64(&recordedCallNoneCount), ) + // Create map to hold JSON serviceDeliveryJsonObject := make(map[string]interface{}) - - // Add fields to JSON - serviceDeliveryJsonObject["Inserts"] = insertCount - serviceDeliveryJsonObject["Updates"] = updateCount - serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount - serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount - serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount - serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount - serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount - serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount + serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount) + serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount) + serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount) + serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount) + serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount) + serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount) + serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount) + serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount) // Convert JSON object to JSON string serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) @@ -438,4 +482,6 @@ func DBData(data *data.Data) { if err != nil { log.Fatal(err) } + + fmt.Println("Finished with this ServiceDelivery!") } diff --git a/main.go b/main.go index a540855..cdd201a 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( ) func main() { - log.Println("ti1 v0.2.1") + log.Println("ti1 testing v1.0.0") log.Println("Starting...") // Setup the database