From 7424600a8b7bd65972a40ba4213936954ca0a129 Mon Sep 17 00:00:00 2001 From: pigwin Date: Tue, 23 Dec 2025 13:24:20 +0000 Subject: [PATCH] worth a shot --- .github/workflows/docker-image.yml | 6 +- README.md | 2 +- config/conf.json | 4 +- config/db.go | 9 +- database/EstimatedCall.go | 69 ++- database/EstimatedVehicleJourney.go | 140 ++++- database/RecordedCall.go | 60 ++- database/ServiceDeliveryDB.go | 11 +- export/database.go | 810 +++++++++++++++------------- 9 files changed, 658 insertions(+), 453 deletions(-) diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index 7473f71..92a423b 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -48,10 +48,10 @@ jobs: - name: Push Docker image run: | - # If on v1.0 branch, push to 'testing.v1.0' tag + # If on v1.0 branch, push to 'testingv1.0' tag if [[ "${{ github.ref }}" == "refs/heads/v1.0" ]]; then - docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:testing-v1.0 - docker push ${{ secrets.DOCKER_USERNAME }}/ti1:testing.v1.0 + docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:testingv1.0 + docker push ${{ secrets.DOCKER_USERNAME }}/ti1:testingv1.0 else # Always push to 'dev' tag for main and dev branches docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev diff --git a/README.md b/README.md index b9f20e4..51704eb 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ nano postgres_data/postgresql.conf Change the following values ```conf listen_addresses = '*' -max_connections = 100 +max_connections = 200 shared_buffers = 16GB work_mem = 256MB maintenance_work_mem = 2GB diff --git a/config/conf.json b/config/conf.json index 59041f1..5ce5c90 100644 --- a/config/conf.json +++ b/config/conf.json @@ -10,8 +10,8 @@ "valkey": { "host": "127.0.0.1", "port": "6379", - "max_conns": 50, - "timeout_ms": 5000, + "max_conns": 110, + "timeout_ms": 2000, "password": "the_valkey_password" }, "temp": "value" diff --git a/config/db.go b/config/db.go index 51b8ef1..2197db1 100644 --- a/config/db.go +++ b/config/db.go @@ -27,10 +27,11 @@ func ConnectToPostgreSQL() (*sql.DB, error) { return nil, err } - // Set connection pool settings - db.SetMaxOpenConns(25) // Maximum number of open connections to the database - db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool - db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + // Set connection pool settings for high concurrency + db.SetMaxOpenConns(100) // Maximum number of open connections to the database + db.SetMaxIdleConns(50) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused + db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle fmt.Println("Connection to PostgreSQL opened successfully :D") diff --git a/database/EstimatedCall.go b/database/EstimatedCall.go index 0321ca7..d390f05 100644 --- a/database/EstimatedCall.go +++ b/database/EstimatedCall.go @@ -6,11 +6,18 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" "ti1/valki" "github.com/valkey-io/valkey-go" ) +type CallResult struct { + ID int + Action string + Error error +} + func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) { // Replace empty strings with nil for timestamp fields for i, v := range values { @@ -28,19 +35,15 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter } hash := md5.Sum([]byte(valuesString)) hashString := hex.EncodeToString(hash[:]) - //fmt.Println("HashString:", hashString) estimatedVehicleJourneyID := values[0] orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) - //fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID) - - var err error // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) } // Check if the retrieved value matches the original MD5 hash @@ -64,26 +67,60 @@ func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []inter estimated_data = EXCLUDED.estimated_data RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) } var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing statement: %w", err) } return id, action, nil - } else { - //fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash) - return 0, "none", nil } + return 0, "none", nil +} + +// BatchInsertEstimatedCalls processes multiple estimated calls concurrently +func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]CallResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient) + results[idx] = CallResult{ + ID: id, + Action: action, + Error: err, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil } diff --git a/database/EstimatedVehicleJourney.go b/database/EstimatedVehicleJourney.go index 4efa54a..02bc217 100644 --- a/database/EstimatedVehicleJourney.go +++ b/database/EstimatedVehicleJourney.go @@ -1,41 +1,135 @@ package database import ( + "context" "database/sql" "fmt" + "sync" ) +type EVJResult struct { + ID int + Action string + Error error + Index int // To maintain order +} + +// PreparedStatements holds reusable prepared statements +type PreparedStatements struct { + evjStmt *sql.Stmt + ecStmt *sql.Stmt + rcStmt *sql.Stmt + mu sync.Mutex +} + +func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) { + evjQuery := ` + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + ` + + evjStmt, err := db.Prepare(evjQuery) + if err != nil { + return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err) + } + + return &PreparedStatements{ + evjStmt: evjStmt, + }, nil +} + +func (ps *PreparedStatements) Close() { + if ps.evjStmt != nil { + ps.evjStmt.Close() + } + if ps.ecStmt != nil { + ps.ecStmt.Close() + } + if ps.rcStmt != nil { + ps.rcStmt.Close() + } +} + func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) { query := ` - INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) - ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) - DO UPDATE SET - servicedelivery = EXCLUDED.servicedelivery, - recordedattime = EXCLUDED.recordedattime, - vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), - dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), - originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), - destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), - operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), - vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), - cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), - other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) - RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; + INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1) + ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref) + DO UPDATE SET + servicedelivery = EXCLUDED.servicedelivery, + recordedattime = EXCLUDED.recordedattime, + vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode), + dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref), + originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref), + destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref), + operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref), + vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref), + cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation), + other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other) + RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() - var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err := db.QueryRow(query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing EVJ statement: %w", err) } return id, action, nil } + +// BatchInsertEVJ processes multiple EVJ inserts concurrently +func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]EVJResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx]) + results[idx] = EVJResult{ + ID: id, + Action: action, + Error: err, + Index: idx, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil +} diff --git a/database/RecordedCall.go b/database/RecordedCall.go index 667236d..3e7845a 100644 --- a/database/RecordedCall.go +++ b/database/RecordedCall.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" "ti1/valki" "github.com/valkey-io/valkey-go" @@ -33,12 +34,10 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf orderID := values[1] key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID) - var err error - // Get the MD5 hash from Valkey retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key) if err != nil { - return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err) + return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err) } // Check if the retrieved value matches the original MD5 hash @@ -65,25 +64,60 @@ func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interf recorded_data = EXCLUDED.recorded_data RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id; ` - stmt, err := db.Prepare(query) - if err != nil { - return 0, "", fmt.Errorf("error preparing statement: %v", err) - } - defer stmt.Close() err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString) if err != nil { - return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err) + return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err) } var action string var id int - err = stmt.QueryRow(values...).Scan(&action, &id) + err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id) if err != nil { - return 0, "", fmt.Errorf("error executing statement: %v", err) + return 0, "", fmt.Errorf("error executing statement: %w", err) } return id, action, nil - } else { - return 0, "none", nil } + return 0, "none", nil +} + +// BatchInsertRecordedCalls processes multiple recorded calls concurrently +func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) { + if len(batch) == 0 { + return nil, nil + } + + results := make([]CallResult, len(batch)) + jobs := make(chan int, len(batch)) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + select { + case <-ctx.Done(): + return + default: + id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient) + results[idx] = CallResult{ + ID: id, + Action: action, + Error: err, + } + } + } + }() + } + + // Send jobs + for i := range batch { + jobs <- i + } + close(jobs) + + wg.Wait() + return results, nil } diff --git a/database/ServiceDeliveryDB.go b/database/ServiceDeliveryDB.go index a3f03e7..82f046f 100644 --- a/database/ServiceDeliveryDB.go +++ b/database/ServiceDeliveryDB.go @@ -6,25 +6,18 @@ import ( ) func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) { - fmt.Println("Inserting ServiceDelivery...") var id int - err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id) if err != nil { - fmt.Println(err) - return 0, err + return 0, fmt.Errorf("failed to insert service delivery: %w", err) } - //fmt.Println("ServiceDelivery inserted successfully! (", id, ")") return id, nil } func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error { - fmt.Println("Updating ServiceDelivery data...") _, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id) if err != nil { - fmt.Println(err) - return err + return fmt.Errorf("failed to update service delivery data: %w", err) } - fmt.Println("Finished with this ServiceDelivery!") return nil } diff --git a/export/database.go b/export/database.go index cf817eb..c5871af 100644 --- a/export/database.go +++ b/export/database.go @@ -6,12 +6,20 @@ import ( "fmt" "log" "strings" + "sync" + "sync/atomic" "ti1/config" "ti1/data" "ti1/database" ) +// DBData is the main entry point for data processing func DBData(data *data.Data) { + DBDataOptimized(data) +} + +// DBDataOptimized processes data with concurrent workers for better performance +func DBDataOptimized(data *data.Data) { fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) @@ -37,395 +45,431 @@ func DBData(data *data.Data) { } fmt.Println("SID:", sid) - // counters - var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int + // Atomic counters for thread-safe counting + var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64 - for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney { - var values []interface{} - var datedVehicleJourneyRef, otherJson string - - values = append(values, sid) - values = append(values, journey.RecordedAtTime) - values = append(values, journey.LineRef) - //had to add to lowercase cus some values vary in case and it was causing duplicates - values = append(values, strings.ToLower(journey.DirectionRef)) - values = append(values, journey.DataSource) - - if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { - datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef - } else if journey.DatedVehicleJourneyRef != "" { - datedVehicleJourneyRef = journey.DatedVehicleJourneyRef - } else { - datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode - } - values = append(values, datedVehicleJourneyRef) - - values = append(values, journey.VehicleMode) - values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef) - values = append(values, journey.OriginRef) - values = append(values, journey.DestinationRef) - values = append(values, journey.OperatorRef) - values = append(values, journey.VehicleRef) - values = append(values, journey.Cancellation) - - // Create a map to hold the JSON object for the current journey - jsonObject := make(map[string]interface{}) - - // Add relevant fields to the JSON object - if journey.OriginName != "" { - jsonObject["OriginName"] = journey.OriginName - } - if journey.DestinationName != "" { - jsonObject["DestinationName"] = journey.DestinationName - } - if journey.ProductCategoryRef != "" { - jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef - } - if journey.ServiceFeatureRef != "" { - jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef - } - if journey.Monitored != "" { - jsonObject["Monitored"] = journey.Monitored - } - if journey.JourneyPatternRef != "" { - jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef - } - if journey.JourneyPatternName != "" { - jsonObject["JourneyPatternName"] = journey.JourneyPatternName - } - if journey.PublishedLineName != "" { - jsonObject["PublishedLineName"] = journey.PublishedLineName - } - if journey.DirectionName != "" { - jsonObject["DirectionName"] = journey.DirectionName - } - if journey.OriginAimedDepartureTime != "" { - jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime - } - if journey.DestinationAimedArrivalTime != "" { - jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime - } - if journey.BlockRef != "" { - jsonObject["BlockRef"] = journey.BlockRef - } - if journey.VehicleJourneyRef != "" { - jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef - } - if journey.Occupancy != "" { - jsonObject["Occupancy"] = journey.Occupancy - } - if journey.DestinationDisplayAtOrigin != "" { - jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin - } - if journey.ExtraJourney != "" { - jsonObject["ExtraJourney"] = journey.ExtraJourney - } - if journey.RouteRef != "" { - jsonObject["RouteRef"] = journey.RouteRef - } - if journey.GroupOfLinesRef != "" { - jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef - } - if journey.ExternalLineRef != "" { - jsonObject["ExternalLineRef"] = journey.ExternalLineRef - } - if journey.InCongestion != "" { - jsonObject["InCongestion"] = journey.InCongestion - } - if journey.PredictionInaccurate != "" { - jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate - } - if journey.JourneyNote != "" { - jsonObject["JourneyNote"] = journey.JourneyNote - } - if journey.Via.PlaceName != "" { - jsonObject["Via"] = journey.Via.PlaceName - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(jsonObject) - if err != nil { - log.Fatal(err) - } - otherJson = string(jsonString) - values = append(values, otherJson) - - // Insert or update the record - id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) - if err != nil { - fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - insertCount++ - } else if action == "update" { - updateCount++ - } - totalCount = insertCount + updateCount - - //fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount) - if totalCount%1000 == 0 { - fmt.Printf( - "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", - insertCount, - updateCount, - totalCount, - estimatedCallInsertCount, - estimatedCallUpdateCount, - estimatedCallNoneCount, - recordedCallInsertCount, - recordedCallUpdateCount, - recordedCallNoneCount, - ) - } - } - - for _, estimatedCall := range journey.EstimatedCalls { - for _, call := range estimatedCall.EstimatedCall { - var estimatedValues []interface{} - - //1 estimatedvehiclejourney - estimatedValues = append(estimatedValues, id) - //2 order - estimatedValues = append(estimatedValues, call.Order) - //3 stoppointref - estimatedValues = append(estimatedValues, call.StopPointRef) - //4 aimeddeparturetime - estimatedValues = append(estimatedValues, call.AimedDepartureTime) - //5 expecteddeparturetime - estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) - //6 aimedarrivaltime - estimatedValues = append(estimatedValues, call.AimedArrivalTime) - //7 expectedarrivaltime - estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) - //8 cancellation - estimatedValues = append(estimatedValues, call.Cancellation) - - //9 estimated_data (JSON) - estimatedJsonObject := make(map[string]interface{}) - // data allrady loged - if call.ExpectedDepartureTime != "" { - estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime - } - if call.ExpectedArrivalTime != "" { - estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime - } - if call.Cancellation != "" { - estimatedJsonObject["Cancellation"] = call.Cancellation - } - // The rest - if call.StopPointName != "" { - estimatedJsonObject["StopPointName"] = call.StopPointName - } - if call.RequestStop != "" { - estimatedJsonObject["RequestStop"] = call.RequestStop - } - if call.DepartureStatus != "" { - estimatedJsonObject["DepartureStatus"] = call.DepartureStatus - } - if call.DeparturePlatformName != "" { - estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName - } - if call.DepartureBoardingActivity != "" { - estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity - } - if call.DepartureStopAssignment.AimedQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef - } - if call.DepartureStopAssignment.ExpectedQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef - } - if call.DepartureStopAssignment.ActualQuayRef != "" { - estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef - } - if call.Extensions.StopsAtAirport != "" { - estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport - } - if call.ArrivalStatus != "" { - estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus - } - if call.ArrivalPlatformName != "" { - estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName - } - if call.ArrivalBoardingActivity != "" { - estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity - } - if call.ArrivalStopAssignment.AimedQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef - } - if call.ArrivalStopAssignment.ExpectedQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef - } - if call.ArrivalStopAssignment.ActualQuayRef != "" { - estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef - } - if call.CallNote != "" { - estimatedJsonObject["CallNote"] = call.CallNote - } - if call.DestinationDisplay != "" { - estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay - } - if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" { - estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel - } - if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" { - estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel - } - if call.TimingPoint != "" { - estimatedJsonObject["TimingPoint"] = call.TimingPoint - } - if call.SituationRef != "" { - estimatedJsonObject["SituationRef"] = call.SituationRef - } - if call.PredictionInaccurate != "" { - estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate - } - if call.Occupancy != "" { - estimatedJsonObject["Occupancy"] = call.Occupancy - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(estimatedJsonObject) - if err != nil { - log.Fatal(err) - } - estimatedValues = append(estimatedValues, string(jsonString)) - - // Insert or update the record - stringValues := make([]string, len(estimatedValues)) - for i, v := range estimatedValues { - stringValues[i] = fmt.Sprintf("%v", v) - } - interfaceValues := make([]interface{}, len(stringValues)) - for i, v := range stringValues { - interfaceValues[i] = v - } - id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient) - if err != nil { - log.Fatalf("Failed to insert or update estimated call: %v", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - estimatedCallInsertCount++ - } else if action == "update" { - estimatedCallUpdateCount++ - } else if action == "none" { - estimatedCallNoneCount++ - } - } - } - } - for _, recordedCall := range journey.RecordedCalls { - for _, call := range recordedCall.RecordedCall { - var recordedValues []interface{} - - //1 estimatedvehiclejourney - recordedValues = append(recordedValues, id) - //2 order - recordedValues = append(recordedValues, call.Order) - //3 stoppointref - recordedValues = append(recordedValues, call.StopPointRef) - //4 aimeddeparturetime - recordedValues = append(recordedValues, call.AimedDepartureTime) - //5 expecteddeparturetime - recordedValues = append(recordedValues, call.ExpectedDepartureTime) - //6 aimedarrivaltime - recordedValues = append(recordedValues, call.AimedArrivalTime) - //7 expectedarrivaltime - recordedValues = append(recordedValues, call.ExpectedArrivalTime) - //8 cancellation - recordedValues = append(recordedValues, call.Cancellation) - //9 actualdeparturetime - recordedValues = append(recordedValues, call.ActualDepartureTime) - //10 actualarrivaltime - recordedValues = append(recordedValues, call.ActualArrivalTime) - - //11 recorded_data (JSON) - recordedJsonObject := make(map[string]interface{}) - if call.StopPointName != "" { - recordedJsonObject["StopPointName"] = call.StopPointName - } - if call.ArrivalPlatformName != "" { - recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName - } - if call.DeparturePlatformName != "" { - recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName - } - if call.PredictionInaccurate != "" { - recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate - } - if call.Occupancy != "" { - recordedJsonObject["Occupancy"] = call.Occupancy - } - - // Convert the JSON object to a JSON string - jsonString, err := json.Marshal(recordedJsonObject) - if err != nil { - log.Fatal(err) - } - recordedValues = append(recordedValues, string(jsonString)) - - // Insert or update the record - stringValues := make([]string, len(recordedValues)) - for i, v := range recordedValues { - stringValues[i] = fmt.Sprintf("%v", v) - } - interfaceValues := make([]interface{}, len(stringValues)) - for i, v := range stringValues { - interfaceValues[i] = v - } - - id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient) - if err != nil { - fmt.Printf("Error inserting/updating recorded call: %v\n", err) - } else { - if 1 == 0 { - fmt.Printf("Action: %s, ID: %d\n", action, id) - } - - if action == "insert" { - recordedCallInsertCount++ - //fmt.Printf("Action: %s, ID: %d\n", action, id) - } else if action == "update" { - recordedCallUpdateCount++ - } else if action == "none" { - recordedCallNoneCount++ - } - } - } - } + journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney + totalJourneys := len(journeys) + fmt.Printf("Processing %d journeys...\n", totalJourneys) + // Job structures + type evjJob struct { + index int } + + type callJob struct { + evjID int + values []interface{} + } + + // Channels + workerCount := 20 // Adjust based on your database and CPU + evjJobs := make(chan evjJob, workerCount*2) + estimatedCallJobs := make(chan callJob, workerCount*10) + recordedCallJobs := make(chan callJob, workerCount*10) + + var wg sync.WaitGroup + var callWg sync.WaitGroup + + // Start Estimated Call workers + for w := 0; w < workerCount; w++ { + callWg.Add(1) + go func() { + defer callWg.Done() + for job := range estimatedCallJobs { + id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient) + if err != nil { + log.Printf("Error inserting/updating estimated call: %v\n", err) + continue + } + if action == "insert" { + atomic.AddInt64(&estimatedCallInsertCount, 1) + } else if action == "update" { + atomic.AddInt64(&estimatedCallUpdateCount, 1) + } else if action == "none" { + atomic.AddInt64(&estimatedCallNoneCount, 1) + } + _ = id + } + }() + } + + // Start Recorded Call workers + for w := 0; w < workerCount; w++ { + callWg.Add(1) + go func() { + defer callWg.Done() + for job := range recordedCallJobs { + id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient) + if err != nil { + log.Printf("Error inserting/updating recorded call: %v\n", err) + continue + } + if action == "insert" { + atomic.AddInt64(&recordedCallInsertCount, 1) + } else if action == "update" { + atomic.AddInt64(&recordedCallUpdateCount, 1) + } else if action == "none" { + atomic.AddInt64(&recordedCallNoneCount, 1) + } + _ = id + } + }() + } + + // Start EVJ workers + for w := 0; w < workerCount; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range evjJobs { + journey := &journeys[job.index] + + // Prepare values + var values []interface{} + var datedVehicleJourneyRef, otherJson string + + values = append(values, sid) + values = append(values, journey.RecordedAtTime) + values = append(values, journey.LineRef) + values = append(values, strings.ToLower(journey.DirectionRef)) + values = append(values, journey.DataSource) + + if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { + datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef + } else if journey.DatedVehicleJourneyRef != "" { + datedVehicleJourneyRef = journey.DatedVehicleJourneyRef + } else { + datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode + } + values = append(values, datedVehicleJourneyRef) + + values = append(values, journey.VehicleMode) + values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef) + values = append(values, journey.OriginRef) + values = append(values, journey.DestinationRef) + values = append(values, journey.OperatorRef) + values = append(values, journey.VehicleRef) + values = append(values, journey.Cancellation) + + // Create JSON object + jsonObject := make(map[string]interface{}) + if journey.OriginName != "" { + jsonObject["OriginName"] = journey.OriginName + } + if journey.DestinationName != "" { + jsonObject["DestinationName"] = journey.DestinationName + } + if journey.ProductCategoryRef != "" { + jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef + } + if journey.ServiceFeatureRef != "" { + jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef + } + if journey.Monitored != "" { + jsonObject["Monitored"] = journey.Monitored + } + if journey.JourneyPatternRef != "" { + jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef + } + if journey.JourneyPatternName != "" { + jsonObject["JourneyPatternName"] = journey.JourneyPatternName + } + if journey.PublishedLineName != "" { + jsonObject["PublishedLineName"] = journey.PublishedLineName + } + if journey.DirectionName != "" { + jsonObject["DirectionName"] = journey.DirectionName + } + if journey.OriginAimedDepartureTime != "" { + jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime + } + if journey.DestinationAimedArrivalTime != "" { + jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime + } + if journey.BlockRef != "" { + jsonObject["BlockRef"] = journey.BlockRef + } + if journey.VehicleJourneyRef != "" { + jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef + } + if journey.Occupancy != "" { + jsonObject["Occupancy"] = journey.Occupancy + } + if journey.DestinationDisplayAtOrigin != "" { + jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin + } + if journey.ExtraJourney != "" { + jsonObject["ExtraJourney"] = journey.ExtraJourney + } + if journey.RouteRef != "" { + jsonObject["RouteRef"] = journey.RouteRef + } + if journey.GroupOfLinesRef != "" { + jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef + } + if journey.ExternalLineRef != "" { + jsonObject["ExternalLineRef"] = journey.ExternalLineRef + } + if journey.InCongestion != "" { + jsonObject["InCongestion"] = journey.InCongestion + } + if journey.PredictionInaccurate != "" { + jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate + } + if journey.JourneyNote != "" { + jsonObject["JourneyNote"] = journey.JourneyNote + } + if journey.Via.PlaceName != "" { + jsonObject["Via"] = journey.Via.PlaceName + } + + jsonString, err := json.Marshal(jsonObject) + if err != nil { + log.Printf("Error marshaling JSON: %v\n", err) + continue + } + otherJson = string(jsonString) + values = append(values, otherJson) + + // Insert or update EVJ + id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) + if err != nil { + log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) + continue + } + + if action == "insert" { + atomic.AddInt64(&insertCount, 1) + } else if action == "update" { + atomic.AddInt64(&updateCount, 1) + } + + // Progress reporting + total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0) + if total%1000 == 0 { + fmt.Printf( + "EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n", + atomic.LoadInt64(&insertCount), + atomic.LoadInt64(&updateCount), + total, + atomic.LoadInt64(&estimatedCallInsertCount), + atomic.LoadInt64(&estimatedCallUpdateCount), + atomic.LoadInt64(&estimatedCallNoneCount), + atomic.LoadInt64(&recordedCallInsertCount), + atomic.LoadInt64(&recordedCallUpdateCount), + atomic.LoadInt64(&recordedCallNoneCount), + ) + } + + // Process Estimated Calls + for _, estimatedCall := range journey.EstimatedCalls { + for _, call := range estimatedCall.EstimatedCall { + var estimatedValues []interface{} + + estimatedValues = append(estimatedValues, id) + estimatedValues = append(estimatedValues, call.Order) + estimatedValues = append(estimatedValues, call.StopPointRef) + estimatedValues = append(estimatedValues, call.AimedDepartureTime) + estimatedValues = append(estimatedValues, call.ExpectedDepartureTime) + estimatedValues = append(estimatedValues, call.AimedArrivalTime) + estimatedValues = append(estimatedValues, call.ExpectedArrivalTime) + estimatedValues = append(estimatedValues, call.Cancellation) + + // estimated_data JSON + estimatedJsonObject := make(map[string]interface{}) + if call.ExpectedDepartureTime != "" { + estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime + } + if call.ExpectedArrivalTime != "" { + estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime + } + if call.Cancellation != "" { + estimatedJsonObject["Cancellation"] = call.Cancellation + } + if call.StopPointName != "" { + estimatedJsonObject["StopPointName"] = call.StopPointName + } + if call.RequestStop != "" { + estimatedJsonObject["RequestStop"] = call.RequestStop + } + if call.DepartureStatus != "" { + estimatedJsonObject["DepartureStatus"] = call.DepartureStatus + } + if call.DeparturePlatformName != "" { + estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName + } + if call.DepartureBoardingActivity != "" { + estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity + } + if call.DepartureStopAssignment.AimedQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef + } + if call.DepartureStopAssignment.ExpectedQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef + } + if call.DepartureStopAssignment.ActualQuayRef != "" { + estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef + } + if call.Extensions.StopsAtAirport != "" { + estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport + } + if call.ArrivalStatus != "" { + estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus + } + if call.ArrivalPlatformName != "" { + estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName + } + if call.ArrivalBoardingActivity != "" { + estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity + } + if call.ArrivalStopAssignment.AimedQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef + } + if call.ArrivalStopAssignment.ExpectedQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef + } + if call.ArrivalStopAssignment.ActualQuayRef != "" { + estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef + } + if call.CallNote != "" { + estimatedJsonObject["CallNote"] = call.CallNote + } + if call.DestinationDisplay != "" { + estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay + } + if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" { + estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel + } + if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" { + estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel + } + if call.TimingPoint != "" { + estimatedJsonObject["TimingPoint"] = call.TimingPoint + } + if call.SituationRef != "" { + estimatedJsonObject["SituationRef"] = call.SituationRef + } + if call.PredictionInaccurate != "" { + estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate + } + if call.Occupancy != "" { + estimatedJsonObject["Occupancy"] = call.Occupancy + } + + jsonString, err := json.Marshal(estimatedJsonObject) + if err != nil { + log.Printf("Error marshaling estimated call JSON: %v\n", err) + continue + } + estimatedValues = append(estimatedValues, string(jsonString)) + + // Convert to string values + interfaceValues := make([]interface{}, len(estimatedValues)) + for i, v := range estimatedValues { + interfaceValues[i] = fmt.Sprintf("%v", v) + } + + // Send to worker pool + estimatedCallJobs <- callJob{evjID: id, values: interfaceValues} + } + } + + // Process Recorded Calls + for _, recordedCall := range journey.RecordedCalls { + for _, call := range recordedCall.RecordedCall { + var recordedValues []interface{} + + recordedValues = append(recordedValues, id) + recordedValues = append(recordedValues, call.Order) + recordedValues = append(recordedValues, call.StopPointRef) + recordedValues = append(recordedValues, call.AimedDepartureTime) + recordedValues = append(recordedValues, call.ExpectedDepartureTime) + recordedValues = append(recordedValues, call.AimedArrivalTime) + recordedValues = append(recordedValues, call.ExpectedArrivalTime) + recordedValues = append(recordedValues, call.Cancellation) + recordedValues = append(recordedValues, call.ActualDepartureTime) + recordedValues = append(recordedValues, call.ActualArrivalTime) + + // recorded_data JSON + recordedJsonObject := make(map[string]interface{}) + if call.StopPointName != "" { + recordedJsonObject["StopPointName"] = call.StopPointName + } + if call.ArrivalPlatformName != "" { + recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName + } + if call.DeparturePlatformName != "" { + recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName + } + if call.PredictionInaccurate != "" { + recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate + } + if call.Occupancy != "" { + recordedJsonObject["Occupancy"] = call.Occupancy + } + + jsonString, err := json.Marshal(recordedJsonObject) + if err != nil { + log.Printf("Error marshaling recorded call JSON: %v\n", err) + continue + } + recordedValues = append(recordedValues, string(jsonString)) + + // Convert to string values + interfaceValues := make([]interface{}, len(recordedValues)) + for i, v := range recordedValues { + interfaceValues[i] = fmt.Sprintf("%v", v) + } + + // Send to worker pool + recordedCallJobs <- callJob{evjID: id, values: interfaceValues} + } + } + } + }() + } + + // Send all EVJ jobs + for i := range journeys { + evjJobs <- evjJob{index: i} + } + close(evjJobs) + + // Wait for EVJ processing to complete + wg.Wait() + + // Close call job channels and wait for call processing to complete + close(estimatedCallJobs) + close(recordedCallJobs) + callWg.Wait() + + // Print final stats fmt.Printf( - "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n", - insertCount, - updateCount, - totalCount, - estimatedCallInsertCount, - estimatedCallUpdateCount, - estimatedCallNoneCount, - recordedCallInsertCount, - recordedCallUpdateCount, - recordedCallNoneCount, + "\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+ + " EstimatedCalls - I: %d U: %d N: %d\n"+ + " RecordedCalls - I: %d U: %d N: %d\n", + atomic.LoadInt64(&insertCount), + atomic.LoadInt64(&updateCount), + atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount), + atomic.LoadInt64(&estimatedCallInsertCount), + atomic.LoadInt64(&estimatedCallUpdateCount), + atomic.LoadInt64(&estimatedCallNoneCount), + atomic.LoadInt64(&recordedCallInsertCount), + atomic.LoadInt64(&recordedCallUpdateCount), + atomic.LoadInt64(&recordedCallNoneCount), ) + // Create map to hold JSON serviceDeliveryJsonObject := make(map[string]interface{}) - - // Add fields to JSON - serviceDeliveryJsonObject["Inserts"] = insertCount - serviceDeliveryJsonObject["Updates"] = updateCount - serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount - serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount - serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount - serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount - serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount - serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount + serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount) + serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount) + serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount) + serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount) + serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount) + serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount) + serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount) + serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount) // Convert JSON object to JSON string serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject) @@ -438,4 +482,6 @@ func DBData(data *data.Data) { if err != nil { log.Fatal(err) } + + fmt.Println("Finished with this ServiceDelivery!") }