didnt work well
This commit is contained in:
10
config/db.go
10
config/db.go
@@ -4,7 +4,6 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
|
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
@@ -29,15 +28,6 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
|
|||||||
|
|
||||||
fmt.Println("Connection to PostgreSQL opened successfully :D")
|
fmt.Println("Connection to PostgreSQL opened successfully :D")
|
||||||
|
|
||||||
// Set the maximum number of open connections to 20
|
|
||||||
db.SetMaxOpenConns(20)
|
|
||||||
|
|
||||||
// Set the maximum number of idle connections to 10
|
|
||||||
db.SetMaxIdleConns(10)
|
|
||||||
|
|
||||||
// Set the maximum connection lifetime to 1 hour
|
|
||||||
db.SetConnMaxLifetime(1 * time.Hour)
|
|
||||||
|
|
||||||
// Ping database to verify connection
|
// Ping database to verify connection
|
||||||
err = db.Ping()
|
err = db.Ping()
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"ti1/config"
|
"ti1/config"
|
||||||
"ti1/data"
|
"ti1/data"
|
||||||
"ti1/database"
|
"ti1/database"
|
||||||
@@ -30,40 +29,7 @@ func DBData(data *data.Data) {
|
|||||||
|
|
||||||
// counters
|
// counters
|
||||||
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
|
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
|
||||||
var mu sync.Mutex
|
|
||||||
|
|
||||||
// Create a worker pool
|
|
||||||
numWorkers := 10
|
|
||||||
jobs := make(chan []interface{}, numWorkers)
|
|
||||||
results := make(chan struct {
|
|
||||||
action string
|
|
||||||
id int
|
|
||||||
err error
|
|
||||||
}, numWorkers)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Worker function
|
|
||||||
worker := func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for values := range jobs {
|
|
||||||
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
|
||||||
results <- struct {
|
|
||||||
action string
|
|
||||||
id int
|
|
||||||
err error
|
|
||||||
}{action, id, err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start workers
|
|
||||||
for i := 0; i < numWorkers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go worker()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send jobs to workers
|
|
||||||
go func() {
|
|
||||||
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
|
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
|
||||||
var values []interface{}
|
var values []interface{}
|
||||||
var datedVehicleJourneyRef, otherJson string
|
var datedVehicleJourneyRef, otherJson string
|
||||||
@@ -71,6 +37,7 @@ func DBData(data *data.Data) {
|
|||||||
values = append(values, sid)
|
values = append(values, sid)
|
||||||
values = append(values, journey.RecordedAtTime)
|
values = append(values, journey.RecordedAtTime)
|
||||||
values = append(values, journey.LineRef)
|
values = append(values, journey.LineRef)
|
||||||
|
//had to add to lowercase cus some values vary in case and it was causing duplicates
|
||||||
values = append(values, strings.ToLower(journey.DirectionRef))
|
values = append(values, strings.ToLower(journey.DirectionRef))
|
||||||
values = append(values, journey.DataSource)
|
values = append(values, journey.DataSource)
|
||||||
|
|
||||||
@@ -91,7 +58,10 @@ func DBData(data *data.Data) {
|
|||||||
values = append(values, journey.VehicleRef)
|
values = append(values, journey.VehicleRef)
|
||||||
values = append(values, journey.Cancellation)
|
values = append(values, journey.Cancellation)
|
||||||
|
|
||||||
|
// Create a map to hold the JSON object for the current journey
|
||||||
jsonObject := make(map[string]interface{})
|
jsonObject := make(map[string]interface{})
|
||||||
|
|
||||||
|
// Add relevant fields to the JSON object
|
||||||
if journey.OriginName != "" {
|
if journey.OriginName != "" {
|
||||||
jsonObject["OriginName"] = journey.OriginName
|
jsonObject["OriginName"] = journey.OriginName
|
||||||
}
|
}
|
||||||
@@ -162,6 +132,7 @@ func DBData(data *data.Data) {
|
|||||||
jsonObject["Via"] = journey.Via.PlaceName
|
jsonObject["Via"] = journey.Via.PlaceName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert the JSON object to a JSON string
|
||||||
jsonString, err := json.Marshal(jsonObject)
|
jsonString, err := json.Marshal(jsonObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@@ -169,18 +140,37 @@ func DBData(data *data.Data) {
|
|||||||
otherJson = string(jsonString)
|
otherJson = string(jsonString)
|
||||||
values = append(values, otherJson)
|
values = append(values, otherJson)
|
||||||
|
|
||||||
// Insert or update the record and get the id
|
// Insert or update the record
|
||||||
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
||||||
continue
|
|
||||||
} else {
|
} else {
|
||||||
if 1 == 0 {
|
if 1 == 0 {
|
||||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if action == "insert" {
|
||||||
|
insertCount++
|
||||||
|
} else if action == "update" {
|
||||||
|
updateCount++
|
||||||
|
}
|
||||||
|
totalCount = insertCount + updateCount
|
||||||
|
|
||||||
|
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
|
||||||
|
if totalCount%1000 == 0 {
|
||||||
|
fmt.Printf(
|
||||||
|
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
||||||
|
insertCount,
|
||||||
|
updateCount,
|
||||||
|
totalCount,
|
||||||
|
estimatedCallInsertCount,
|
||||||
|
estimatedCallUpdateCount,
|
||||||
|
recordedCallInsertCount,
|
||||||
|
recordedCallUpdateCount,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the missing code here
|
|
||||||
for _, estimatedCall := range journey.EstimatedCalls {
|
for _, estimatedCall := range journey.EstimatedCalls {
|
||||||
for _, call := range estimatedCall.EstimatedCall {
|
for _, call := range estimatedCall.EstimatedCall {
|
||||||
var estimatedValues []interface{}
|
var estimatedValues []interface{}
|
||||||
@@ -204,7 +194,7 @@ func DBData(data *data.Data) {
|
|||||||
|
|
||||||
//9 estimated_data (JSON)
|
//9 estimated_data (JSON)
|
||||||
estimatedJsonObject := make(map[string]interface{})
|
estimatedJsonObject := make(map[string]interface{})
|
||||||
// data already logged
|
// data allrady loged
|
||||||
if call.ExpectedDepartureTime != "" {
|
if call.ExpectedDepartureTime != "" {
|
||||||
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
||||||
}
|
}
|
||||||
@@ -294,7 +284,7 @@ func DBData(data *data.Data) {
|
|||||||
|
|
||||||
// Insert or update the record
|
// Insert or update the record
|
||||||
stringValues := make([]string, len(estimatedValues))
|
stringValues := make([]string, len(estimatedValues))
|
||||||
for i, v := range stringValues {
|
for i, v := range estimatedValues {
|
||||||
stringValues[i] = fmt.Sprintf("%v", v)
|
stringValues[i] = fmt.Sprintf("%v", v)
|
||||||
}
|
}
|
||||||
interfaceValues := make([]interface{}, len(stringValues))
|
interfaceValues := make([]interface{}, len(stringValues))
|
||||||
@@ -369,7 +359,7 @@ func DBData(data *data.Data) {
|
|||||||
|
|
||||||
// Insert or update the record
|
// Insert or update the record
|
||||||
stringValues := make([]string, len(recordedValues))
|
stringValues := make([]string, len(recordedValues))
|
||||||
for i, v := range stringValues {
|
for i, v := range recordedValues {
|
||||||
stringValues[i] = fmt.Sprintf("%v", v)
|
stringValues[i] = fmt.Sprintf("%v", v)
|
||||||
}
|
}
|
||||||
interfaceValues := make([]interface{}, len(stringValues))
|
interfaceValues := make([]interface{}, len(stringValues))
|
||||||
@@ -394,44 +384,8 @@ func DBData(data *data.Data) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
close(jobs)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Collect results
|
|
||||||
go func() {
|
|
||||||
for result := range results {
|
|
||||||
if result.err != nil {
|
|
||||||
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", result.err)
|
|
||||||
} else {
|
|
||||||
mu.Lock()
|
|
||||||
if result.action == "insert" {
|
|
||||||
insertCount++
|
|
||||||
} else if result.action == "update" {
|
|
||||||
updateCount++
|
|
||||||
}
|
}
|
||||||
totalCount = insertCount + updateCount
|
|
||||||
mu.Unlock()
|
|
||||||
|
|
||||||
if totalCount%1000 == 0 {
|
|
||||||
fmt.Printf(
|
|
||||||
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
|
||||||
insertCount,
|
|
||||||
updateCount,
|
|
||||||
totalCount,
|
|
||||||
estimatedCallInsertCount,
|
|
||||||
estimatedCallUpdateCount,
|
|
||||||
recordedCallInsertCount,
|
|
||||||
recordedCallUpdateCount,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
close(results)
|
|
||||||
|
|
||||||
fmt.Printf(
|
fmt.Printf(
|
||||||
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
||||||
insertCount,
|
insertCount,
|
||||||
@@ -442,8 +396,10 @@ func DBData(data *data.Data) {
|
|||||||
recordedCallInsertCount,
|
recordedCallInsertCount,
|
||||||
recordedCallUpdateCount,
|
recordedCallUpdateCount,
|
||||||
)
|
)
|
||||||
|
// Create map to hold JSON
|
||||||
serviceDeliveryJsonObject := make(map[string]interface{})
|
serviceDeliveryJsonObject := make(map[string]interface{})
|
||||||
|
|
||||||
|
// Add fields to JSON
|
||||||
serviceDeliveryJsonObject["Inserts"] = insertCount
|
serviceDeliveryJsonObject["Inserts"] = insertCount
|
||||||
serviceDeliveryJsonObject["Updates"] = updateCount
|
serviceDeliveryJsonObject["Updates"] = updateCount
|
||||||
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
|
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
|
||||||
@@ -451,11 +407,13 @@ func DBData(data *data.Data) {
|
|||||||
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
|
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
|
||||||
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
|
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
|
||||||
|
|
||||||
|
// Convert JSON object to JSON string
|
||||||
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update ServiceDelivery data in database
|
||||||
err = database.UpdateServiceDeliveryData(db, sid, string(serviceDeliveryJsonString))
|
err = database.UpdateServiceDeliveryData(db, sid, string(serviceDeliveryJsonString))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|||||||
Reference in New Issue
Block a user