idk i just hope it works

This commit is contained in:
pigwin
2025-01-04 21:15:45 +00:00
parent 75e007603f
commit 3c1b84197a

View File

@@ -1,459 +1,463 @@
package export package export
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"strings" "strings"
"sync" "sync"
"ti1/config" "ti1/config"
"ti1/data" "ti1/data"
"ti1/database" "ti1/database"
) )
func DBData(data *data.Data) { func DBData(data *data.Data) {
fmt.Println(data.ServiceDelivery.ResponseTimestamp) fmt.Println(data.ServiceDelivery.ResponseTimestamp)
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
db, err := config.ConnectToPostgreSQL() db, err := config.ConnectToPostgreSQL()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer db.Close() defer db.Close()
// Get service id aka sid // Get service id aka sid
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime) sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("SID:", sid) fmt.Println("SID:", sid)
// counters // counters
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
var mu sync.Mutex var mu sync.Mutex
// Create a worker pool // Create a worker pool
numWorkers := 10 numWorkers := 10
jobs := make(chan []interface{}, numWorkers) jobs := make(chan []interface{}, numWorkers)
results := make(chan struct { results := make(chan struct {
action string action string
id int id int
err error err error
}, numWorkers) }, numWorkers)
var wg sync.WaitGroup var wg sync.WaitGroup
// Worker function // Worker function
worker := func() { worker := func() {
defer wg.Done() defer wg.Done()
for values := range jobs { for values := range jobs {
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
results <- struct { results <- struct {
action string action string
id int id int
err error err error
}{action, id, err} }{action, id, err}
} }
} }
// Start workers // Start workers
for i := 0; i < numWorkers; i++ { for i := 0; i < numWorkers; i++ {
wg.Add(1) wg.Add(1)
go worker() go worker()
} }
// Send jobs to workers // Send jobs to workers
go func() { go func() {
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedVehicleJourney { for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
var values []interface{} var values []interface{}
var datedVehicleJourneyRef, otherJson string var datedVehicleJourneyRef, otherJson string
values = append(values, sid) values = append(values, sid)
values = append(values, journey.RecordedAtTime) values = append(values, journey.RecordedAtTime)
values = append(values, journey.LineRef) values = append(values, journey.LineRef)
values = append(values, strings.ToLower(journey.DirectionRef)) values = append(values, strings.ToLower(journey.DirectionRef))
values = append(values, journey.DataSource) values = append(values, journey.DataSource)
if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" { if journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef datedVehicleJourneyRef = journey.FramedVehicleJourneyRef.DatedVehicleJourneyRef
} else if journey.DatedVehicleJourneyRef != "" { } else if journey.DatedVehicleJourneyRef != "" {
datedVehicleJourneyRef = journey.DatedVehicleJourneyRef datedVehicleJourneyRef = journey.DatedVehicleJourneyRef
} else { } else {
datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode datedVehicleJourneyRef = "evj." + journey.EstimatedVehicleJourneyCode
} }
values = append(values, datedVehicleJourneyRef) values = append(values, datedVehicleJourneyRef)
values = append(values, journey.VehicleMode) values = append(values, journey.VehicleMode)
values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef) values = append(values, journey.FramedVehicleJourneyRef.DataFrameRef)
values = append(values, journey.OriginRef) values = append(values, journey.OriginRef)
values = append(values, journey.DestinationRef) values = append(values, journey.DestinationRef)
values = append(values, journey.OperatorRef) values = append(values, journey.OperatorRef)
values = append(values, journey.VehicleRef) values = append(values, journey.VehicleRef)
values = append(values, journey.Cancellation) values = append(values, journey.Cancellation)
jsonObject := make(map[string]interface{}) jsonObject := make(map[string]interface{})
if journey.OriginName != "" { if journey.OriginName != "" {
jsonObject["OriginName"] = journey.OriginName jsonObject["OriginName"] = journey.OriginName
} }
if journey.DestinationName != "" { if journey.DestinationName != "" {
jsonObject["DestinationName"] = journey.DestinationName jsonObject["DestinationName"] = journey.DestinationName
} }
if journey.ProductCategoryRef != "" { if journey.ProductCategoryRef != "" {
jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef jsonObject["ProductCategoryRef"] = journey.ProductCategoryRef
} }
if journey.ServiceFeatureRef != "" { if journey.ServiceFeatureRef != "" {
jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef jsonObject["ServiceFeatureRef"] = journey.ServiceFeatureRef
} }
if journey.Monitored != "" { if journey.Monitored != "" {
jsonObject["Monitored"] = journey.Monitored jsonObject["Monitored"] = journey.Monitored
} }
if journey.JourneyPatternRef != "" { if journey.JourneyPatternRef != "" {
jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef jsonObject["JourneyPatternRef"] = journey.JourneyPatternRef
} }
if journey.JourneyPatternName != "" { if journey.JourneyPatternName != "" {
jsonObject["JourneyPatternName"] = journey.JourneyPatternName jsonObject["JourneyPatternName"] = journey.JourneyPatternName
} }
if journey.PublishedLineName != "" { if journey.PublishedLineName != "" {
jsonObject["PublishedLineName"] = journey.PublishedLineName jsonObject["PublishedLineName"] = journey.PublishedLineName
} }
if journey.DirectionName != "" { if journey.DirectionName != "" {
jsonObject["DirectionName"] = journey.DirectionName jsonObject["DirectionName"] = journey.DirectionName
} }
if journey.OriginAimedDepartureTime != "" { if journey.OriginAimedDepartureTime != "" {
jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime jsonObject["OriginAimedDepartureTime"] = journey.OriginAimedDepartureTime
} }
if journey.DestinationAimedArrivalTime != "" { if journey.DestinationAimedArrivalTime != "" {
jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime jsonObject["DestinationAimedArrivalTime"] = journey.DestinationAimedArrivalTime
} }
if journey.BlockRef != "" { if journey.BlockRef != "" {
jsonObject["BlockRef"] = journey.BlockRef jsonObject["BlockRef"] = journey.BlockRef
} }
if journey.VehicleJourneyRef != "" { if journey.VehicleJourneyRef != "" {
jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef jsonObject["VehicleJourneyRef"] = journey.VehicleJourneyRef
} }
if journey.Occupancy != "" { if journey.Occupancy != "" {
jsonObject["Occupancy"] = journey.Occupancy jsonObject["Occupancy"] = journey.Occupancy
} }
if journey.DestinationDisplayAtOrigin != "" { if journey.DestinationDisplayAtOrigin != "" {
jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin jsonObject["DestinationDisplayAtOrigin"] = journey.DestinationDisplayAtOrigin
} }
if journey.ExtraJourney != "" { if journey.ExtraJourney != "" {
jsonObject["ExtraJourney"] = journey.ExtraJourney jsonObject["ExtraJourney"] = journey.ExtraJourney
} }
if journey.RouteRef != "" { if journey.RouteRef != "" {
jsonObject["RouteRef"] = journey.RouteRef jsonObject["RouteRef"] = journey.RouteRef
} }
if journey.GroupOfLinesRef != "" { if journey.GroupOfLinesRef != "" {
jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef jsonObject["GroupOfLinesRef"] = journey.GroupOfLinesRef
} }
if journey.ExternalLineRef != "" { if journey.ExternalLineRef != "" {
jsonObject["ExternalLineRef"] = journey.ExternalLineRef jsonObject["ExternalLineRef"] = journey.ExternalLineRef
} }
if journey.InCongestion != "" { if journey.InCongestion != "" {
jsonObject["InCongestion"] = journey.InCongestion jsonObject["InCongestion"] = journey.InCongestion
} }
if journey.PredictionInaccurate != "" { if journey.PredictionInaccurate != "" {
jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate jsonObject["PredictionInaccurate"] = journey.PredictionInaccurate
} }
if journey.JourneyNote != "" { if journey.JourneyNote != "" {
jsonObject["JourneyNote"] = journey.JourneyNote jsonObject["JourneyNote"] = journey.JourneyNote
} }
if journey.Via.PlaceName != "" { if journey.Via.PlaceName != "" {
jsonObject["Via"] = journey.Via.PlaceName jsonObject["Via"] = journey.Via.PlaceName
}
jsonString, err := json.Marshal(jsonObject)
if err != nil {
log.Fatal(err)
}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update the record and get the id
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
if err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
continue
} else {
if 1 == 0 {
fmt.Printf("Action: %s, ID: %d\n", action, id)
}
} }
jsonString, err := json.Marshal(jsonObject) // Add the missing code here
if err != nil { for _, estimatedCall := range journey.EstimatedCalls {
log.Fatal(err) for _, call := range estimatedCall.EstimatedCall {
} var estimatedValues []interface{}
otherJson = string(jsonString)
values = append(values, otherJson)
// Insert or update the record and get the id //1 estimatedvehiclejourney
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values) estimatedValues = append(estimatedValues, id)
if err != nil { //2 order
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err) estimatedValues = append(estimatedValues, call.Order)
continue //3 stoppointref
} estimatedValues = append(estimatedValues, call.StopPointRef)
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation)
// Add the missing code here //9 estimated_data (JSON)
for _, estimatedCall := range journey.EstimatedCalls { estimatedJsonObject := make(map[string]interface{})
for _, call := range estimatedCall.EstimatedCall { // data already logged
var estimatedValues []interface{} if call.ExpectedDepartureTime != "" {
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
}
if call.ExpectedArrivalTime != "" {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime
}
if call.Cancellation != "" {
estimatedJsonObject["Cancellation"] = call.Cancellation
}
// The rest
if call.StopPointName != "" {
estimatedJsonObject["StopPointName"] = call.StopPointName
}
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
//1 estimatedvehiclejourney // Convert the JSON object to a JSON string
estimatedValues = append(estimatedValues, id) jsonString, err := json.Marshal(estimatedJsonObject)
//2 order if err != nil {
estimatedValues = append(estimatedValues, call.Order) log.Fatal(err)
//3 stoppointref }
estimatedValues = append(estimatedValues, call.StopPointRef) estimatedValues = append(estimatedValues, string(jsonString))
//4 aimeddeparturetime
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
//8 cancellation
estimatedValues = append(estimatedValues, call.Cancellation)
//9 estimated_data (JSON) // Insert or update the record
estimatedJsonObject := make(map[string]interface{}) stringValues := make([]string, len(estimatedValues))
// data already logged for i, v := range stringValues {
if call.ExpectedDepartureTime != "" { stringValues[i] = fmt.Sprintf("%v", v)
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime }
} interfaceValues := make([]interface{}, len(stringValues))
if call.ExpectedArrivalTime != "" { for i, v := range stringValues {
estimatedJsonObject["ExpectedArrivalTime"] = call.ExpectedArrivalTime interfaceValues[i] = v
} }
if call.Cancellation != "" { id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues)
estimatedJsonObject["Cancellation"] = call.Cancellation if err != nil {
} fmt.Printf("Error inserting/updating estimated call: %v\n", err)
// The rest } else {
if call.StopPointName != "" { if 1 == 0 {
estimatedJsonObject["StopPointName"] = call.StopPointName fmt.Printf("Action: %s, ID: %d\n", action, id)
} }
if call.RequestStop != "" {
estimatedJsonObject["RequestStop"] = call.RequestStop
}
if call.DepartureStatus != "" {
estimatedJsonObject["DepartureStatus"] = call.DepartureStatus
}
if call.DeparturePlatformName != "" {
estimatedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.DepartureBoardingActivity != "" {
estimatedJsonObject["DepartureBoardingActivity"] = call.DepartureBoardingActivity
}
if call.DepartureStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.AimedQuayRef"] = call.DepartureStopAssignment.AimedQuayRef
}
if call.DepartureStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ExpectedQuayRef"] = call.DepartureStopAssignment.ExpectedQuayRef
}
if call.DepartureStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["DepartureStopAssignment.ActualQuayRef"] = call.DepartureStopAssignment.ActualQuayRef
}
if call.Extensions.StopsAtAirport != "" {
estimatedJsonObject["Extensions.StopsAtAirport"] = call.Extensions.StopsAtAirport
}
if call.ArrivalStatus != "" {
estimatedJsonObject["ArrivalStatus"] = call.ArrivalStatus
}
if call.ArrivalPlatformName != "" {
estimatedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
}
if call.ArrivalBoardingActivity != "" {
estimatedJsonObject["ArrivalBoardingActivity"] = call.ArrivalBoardingActivity
}
if call.ArrivalStopAssignment.AimedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.AimedQuayRef"] = call.ArrivalStopAssignment.AimedQuayRef
}
if call.ArrivalStopAssignment.ExpectedQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ExpectedQuayRef"] = call.ArrivalStopAssignment.ExpectedQuayRef
}
if call.ArrivalStopAssignment.ActualQuayRef != "" {
estimatedJsonObject["ArrivalStopAssignment.ActualQuayRef"] = call.ArrivalStopAssignment.ActualQuayRef
}
if call.CallNote != "" {
estimatedJsonObject["CallNote"] = call.CallNote
}
if call.DestinationDisplay != "" {
estimatedJsonObject["DestinationDisplay"] = call.DestinationDisplay
}
if call.ExpectedDeparturePredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedDeparturePredictionQuality.PredictionLevel"] = call.ExpectedDeparturePredictionQuality.PredictionLevel
}
if call.ExpectedArrivalPredictionQuality.PredictionLevel != "" {
estimatedJsonObject["ExpectedArrivalPredictionQuality.PredictionLevel"] = call.ExpectedArrivalPredictionQuality.PredictionLevel
}
if call.TimingPoint != "" {
estimatedJsonObject["TimingPoint"] = call.TimingPoint
}
if call.SituationRef != "" {
estimatedJsonObject["SituationRef"] = call.SituationRef
}
if call.PredictionInaccurate != "" {
estimatedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
estimatedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string if action == "insert" {
jsonString, err := json.Marshal(estimatedJsonObject) estimatedCallInsertCount++
if err != nil { } else if action == "update" {
log.Fatal(err) estimatedCallUpdateCount++
} }
estimatedValues = append(estimatedValues, string(jsonString)) }
}
}
for _, recordedCall := range journey.RecordedCalls {
for _, call := range recordedCall.RecordedCall {
var recordedValues []interface{}
// Insert or update the record //1 estimatedvehiclejourney
stringValues := make([]string, len(estimatedValues)) recordedValues = append(recordedValues, id)
for i, v := range stringValues { //2 order
stringValues[i] = fmt.Sprintf("%v", v) recordedValues = append(recordedValues, call.Order)
} //3 stoppointref
interfaceValues := make([]interface{}, len(stringValues)) recordedValues = append(recordedValues, call.StopPointRef)
for i, v := range stringValues { //4 aimeddeparturetime
interfaceValues[i] = v recordedValues = append(recordedValues, call.AimedDepartureTime)
} //5 expecteddeparturetime
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues) recordedValues = append(recordedValues, call.ExpectedDepartureTime)
if err != nil { //6 aimedarrivaltime
fmt.Printf("Error inserting/updating estimated call: %v\n", err) recordedValues = append(recordedValues, call.AimedArrivalTime)
} else { //7 expectedarrivaltime
if 1 == 0 { recordedValues = append(recordedValues, call.ExpectedArrivalTime)
fmt.Printf("Action: %s, ID: %d\n", action, id) //8 cancellation
} recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime)
if action == "insert" { //11 recorded_data (JSON)
estimatedCallInsertCount++ recordedJsonObject := make(map[string]interface{})
} else if action == "update" { if call.StopPointName != "" {
estimatedCallUpdateCount++ recordedJsonObject["StopPointName"] = call.StopPointName
} }
} if call.ArrivalPlatformName != "" {
} recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName
} }
for _, recordedCall := range journey.RecordedCalls { if call.DeparturePlatformName != "" {
for _, call := range recordedCall.RecordedCall { recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
var recordedValues []interface{} }
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
//1 estimatedvehiclejourney // Convert the JSON object to a JSON string
recordedValues = append(recordedValues, id) jsonString, err := json.Marshal(recordedJsonObject)
//2 order if err != nil {
recordedValues = append(recordedValues, call.Order) log.Fatal(err)
//3 stoppointref }
recordedValues = append(recordedValues, call.StopPointRef) recordedValues = append(recordedValues, string(jsonString))
//4 aimeddeparturetime
recordedValues = append(recordedValues, call.AimedDepartureTime)
//5 expecteddeparturetime
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
//6 aimedarrivaltime
recordedValues = append(recordedValues, call.AimedArrivalTime)
//7 expectedarrivaltime
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
//8 cancellation
recordedValues = append(recordedValues, call.Cancellation)
//9 actualdeparturetime
recordedValues = append(recordedValues, call.ActualDepartureTime)
//10 actualarrivaltime
recordedValues = append(recordedValues, call.ActualArrivalTime)
//11 recorded_data (JSON) // Insert or update the record
recordedJsonObject := make(map[string]interface{}) stringValues := make([]string, len(recordedValues))
if call.StopPointName != "" { for i, v := range stringValues {
recordedJsonObject["StopPointName"] = call.StopPointName stringValues[i] = fmt.Sprintf("%v", v)
} }
if call.ArrivalPlatformName != "" { interfaceValues := make([]interface{}, len(stringValues))
recordedJsonObject["ArrivalPlatformName"] = call.ArrivalPlatformName for i, v := range stringValues {
} interfaceValues[i] = v
if call.DeparturePlatformName != "" { }
recordedJsonObject["DeparturePlatformName"] = call.DeparturePlatformName
}
if call.PredictionInaccurate != "" {
recordedJsonObject["PredictionInaccurate"] = call.PredictionInaccurate
}
if call.Occupancy != "" {
recordedJsonObject["Occupancy"] = call.Occupancy
}
// Convert the JSON object to a JSON string id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues)
jsonString, err := json.Marshal(recordedJsonObject) if err != nil {
if err != nil { fmt.Printf("Error inserting/updating recorded call: %v\n", err)
log.Fatal(err) } else {
} if 1 == 0 {
recordedValues = append(recordedValues, string(jsonString)) fmt.Printf("Action: %s, ID: %d\n", action, id)
}
// Insert or update the record if action == "insert" {
stringValues := make([]string, len(recordedValues)) recordedCallInsertCount++
for i, v := range stringValues { //fmt.Printf("Action: %s, ID: %d\n", action, id)
stringValues[i] = fmt.Sprintf("%v", v) } else if action == "update" {
} recordedCallUpdateCount++
interfaceValues := make([]interface{}, len(stringValues)) }
for i, v := range stringValues { }
interfaceValues[i] = v }
} }
}
close(jobs)
}()
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues) // Collect results
if err != nil { go func() {
fmt.Printf("Error inserting/updating recorded call: %v\n", err) for result := range results {
} else { if result.err != nil {
if 1 == 0 { fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", result.err)
fmt.Printf("Action: %s, ID: %d\n", action, id) } else {
} mu.Lock()
if result.action == "insert" {
insertCount++
} else if result.action == "update" {
updateCount++
}
totalCount = insertCount + updateCount
mu.Unlock()
if action == "insert" { if totalCount%1000 == 0 {
recordedCallInsertCount++ fmt.Printf(
//fmt.Printf("Action: %s, ID: %d\n", action, id) "Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
} else if action == "update" { insertCount,
recordedCallUpdateCount++ updateCount,
} totalCount,
} estimatedCallInsertCount,
} estimatedCallUpdateCount,
} recordedCallInsertCount,
} recordedCallUpdateCount,
close(jobs) )
}() }
}
}
}()
// Collect results wg.Wait()
go func() { close(results)
for result := range results {
if result.err != nil {
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", result.err)
} else {
mu.Lock()
if result.action == "insert" {
insertCount++
} else if result.action == "update" {
updateCount++
}
totalCount = insertCount + updateCount
mu.Unlock()
if totalCount%1000 == 0 { fmt.Printf(
fmt.Printf( "DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", insertCount,
insertCount, updateCount,
updateCount, totalCount,
totalCount, estimatedCallInsertCount,
estimatedCallInsertCount, estimatedCallUpdateCount,
estimatedCallUpdateCount, recordedCallInsertCount,
recordedCallInsertCount, recordedCallUpdateCount,
recordedCallUpdateCount, )
)
}
}
}
}()
wg.Wait() serviceDeliveryJsonObject := make(map[string]interface{})
close(results) serviceDeliveryJsonObject["Inserts"] = insertCount
serviceDeliveryJsonObject["Updates"] = updateCount
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
fmt.Printf( serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n", if err != nil {
insertCount, log.Fatal(err)
updateCount, }
totalCount,
estimatedCallInsertCount,
estimatedCallUpdateCount,
recordedCallInsertCount,
recordedCallUpdateCount,
)
serviceDeliveryJsonObject := make(map[string]interface{}) err = database.UpdateServiceDeliveryData(db, sid, string(serviceDeliveryJsonString))
serviceDeliveryJsonObject["Inserts"] = insertCount if err != nil {
serviceDeliveryJsonObject["Updates"] = updateCount log.Fatal(err)
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount }
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
if err != nil {
log.Fatal(err)
}
err = database.UpdateServiceDeliveryData(db, sid, string(serviceDeliveryJsonString))
if err != nil {
log.Fatal(err)
}
} }