Compare commits
69 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
caeb7651d3 | ||
|
|
21a133fc57 | ||
|
|
a4c4159688 | ||
|
|
f2b984b6f9 | ||
|
|
348b3cd5b7 | ||
|
|
4c94654e0a | ||
|
|
5b840950c8 | ||
|
|
4c72bb1d4c | ||
|
|
ea36b60233 | ||
|
|
201811e2d3 | ||
|
|
bcdc24e47f | ||
|
|
4bb49e944f | ||
|
|
fbb20d576b | ||
|
|
823701c698 | ||
|
|
00e6667bc0 | ||
|
|
9b433cdd57 | ||
|
|
c106267d76 | ||
|
|
b95fbb477d | ||
|
|
7424600a8b | ||
|
|
5171dcf4f6 | ||
|
|
0324912eb4 | ||
|
|
c45723a5b0 | ||
|
|
779530f036 | ||
|
|
fd43b3e4bb | ||
|
|
bed666bd81 | ||
|
|
c09ea5784a | ||
|
|
8af34a9ab3 | ||
|
|
5bdc2acd81 | ||
|
|
3058d98a67 | ||
|
|
21b3bba164 | ||
|
|
c08bcb8432 | ||
|
|
14af72959e | ||
|
|
afd6b12acf | ||
|
|
d996411adf | ||
|
|
083a267b2a | ||
|
|
fe72a507e1 | ||
|
|
5203208fe7 | ||
|
|
2141c5f168 | ||
|
|
9919c159f2 | ||
|
|
07838da0ad | ||
|
|
b8a2a5837f | ||
|
|
df0b5135bd | ||
|
|
a2c1766dd1 | ||
|
|
97a6506a65 | ||
|
|
ba558803ff | ||
|
|
c6fc0070cf | ||
|
|
c456bdecdb | ||
|
|
1020dacf79 | ||
|
|
6c20858280 | ||
|
|
119898acc5 | ||
|
|
51fb986710 | ||
|
|
3c1b84197a | ||
|
|
75e007603f | ||
|
|
a50ef5b899 | ||
|
|
def3a9c38c | ||
|
|
c1992f7616 | ||
|
|
a274810818 | ||
|
|
42b75360c4 | ||
|
|
c99f22b131 | ||
|
|
7b96214476 | ||
|
|
c12ec02270 | ||
|
|
6db5b12d7b | ||
|
|
8f372b4213 | ||
|
|
ccbf9c8d72 | ||
|
|
fb4862c078 | ||
|
|
f3dbd4505f | ||
|
|
2462c6057e | ||
|
|
cda95aa4f2 | ||
|
|
b7e448df8d |
39
.github/workflows/docker-image.yml
vendored
39
.github/workflows/docker-image.yml
vendored
@@ -4,6 +4,7 @@ on:
|
|||||||
push:
|
push:
|
||||||
branches:
|
branches:
|
||||||
- main
|
- main
|
||||||
|
- dev
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
@@ -26,26 +27,32 @@ jobs:
|
|||||||
id: timestamp
|
id: timestamp
|
||||||
run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
|
run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
|
||||||
|
|
||||||
- name: Get the commit tag
|
- name: Get commit version
|
||||||
id: get-tag
|
id: commit-version
|
||||||
run: echo "GIT_TAG=$(git describe --tags --exact-match 2>/dev/null || echo 'no-tag')" >> $GITHUB_ENV
|
run: |
|
||||||
|
COMMIT_MSG=$(git log -1 --pretty=%B | head -n1 | xargs)
|
||||||
|
echo "Commit message: $COMMIT_MSG" # Debugging output
|
||||||
|
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
|
||||||
|
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
|
||||||
|
echo "Version match: $COMMIT_MSG"
|
||||||
|
echo "VERSION=${COMMIT_MSG}" >> $GITHUB_ENV
|
||||||
|
else
|
||||||
|
echo "No version match, defaulting to 'dev'"
|
||||||
|
echo "VERSION=dev" >> $GITHUB_ENV
|
||||||
|
fi
|
||||||
|
|
||||||
- name: Build Docker image
|
- name: Build Docker image
|
||||||
run: |
|
run: |
|
||||||
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
|
docker build -t ti1:${{ env.VERSION }} .
|
||||||
docker build -t ti1:dev-${{ env.TIMESTAMP }} .
|
|
||||||
else
|
|
||||||
docker build -t ti1:latest -t ti1:${{ env.GIT_TAG }} .
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: Push Docker image
|
- name: Push Docker image
|
||||||
run: |
|
run: |
|
||||||
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
|
# Always push to 'dev' tag
|
||||||
docker tag ti1:dev-${{ env.TIMESTAMP }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
|
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev
|
||||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
|
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev
|
||||||
else
|
|
||||||
docker tag ti1:latest ${{ secrets.DOCKER_USERNAME }}/ti1:latest
|
# If the version is valid, also push that specific version tag
|
||||||
docker tag ti1:${{ env.GIT_TAG }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
|
if [[ "${{ env.VERSION }}" != "dev" ]]; then
|
||||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:latest
|
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
|
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# Use the official Golang image as the base image
|
# Use the official Golang image as the base image
|
||||||
FROM golang:1.22.1
|
FROM golang:1.26.0
|
||||||
|
|
||||||
# Set the Current Working Directory inside the container
|
# Set the Current Working Directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|||||||
160
README.md
160
README.md
@@ -1,22 +1,164 @@
|
|||||||
# TI1
|
# TI1
|
||||||
|
|
||||||
The best thing to happen since yesterday at 3 pm
|
The best thing to happen since yesterday at 2:56 pm
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
To use this project, you can pull the Docker image from Docker Hub and run it using the following commands:
|
Start with getting Docker then do the following:
|
||||||
|
|
||||||
### Pull the Docker Image
|
### Create the setup files
|
||||||
|
Create a `docker-compose.yml`
|
||||||
|
```yaml
|
||||||
|
services:
|
||||||
|
db:
|
||||||
|
image: postgres:17.2
|
||||||
|
container_name: postgres-db
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_PASSWORD: RootPassword
|
||||||
|
POSTGRES_DB: ti1
|
||||||
|
ports:
|
||||||
|
- "5432:5432"
|
||||||
|
volumes:
|
||||||
|
- ./postgres_data:/var/lib/postgresql/data # Store data in the current directory
|
||||||
|
- ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
|
||||||
|
networks:
|
||||||
|
- app-network
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
|
||||||
|
interval: 10s
|
||||||
|
retries: 5
|
||||||
|
restart: always # Ensure the container always restarts
|
||||||
|
|
||||||
```sh
|
valkey:
|
||||||
docker pull pigwin1/ti1:latest
|
image: valkey/valkey:latest
|
||||||
|
container_name: valkey
|
||||||
|
environment:
|
||||||
|
VALKEY_PASSWORD: the_valkey_password
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
volumes:
|
||||||
|
- ./valkey_data:/data
|
||||||
|
networks:
|
||||||
|
- app-network
|
||||||
|
restart: always # Ensure the container always restarts
|
||||||
|
|
||||||
|
ti1-container:
|
||||||
|
image: pigwin1/ti1:dev
|
||||||
|
container_name: ti1-container
|
||||||
|
environment:
|
||||||
|
DB_HOST: db
|
||||||
|
DB_PORT: 5432
|
||||||
|
DB_USER: ti1
|
||||||
|
DB_PASSWORD: ti1password
|
||||||
|
DB_NAME: ti1
|
||||||
|
DB_SSLMODE: disable
|
||||||
|
VALKEY_HOST: valkey
|
||||||
|
VALKEY_PORT: 6379
|
||||||
|
VALKEY_PASSWORD: the_valkey_password
|
||||||
|
depends_on:
|
||||||
|
db:
|
||||||
|
condition: service_healthy # Wait until the db service is healthy
|
||||||
|
valkey:
|
||||||
|
condition: service_started # Wait until the valkey service is started
|
||||||
|
networks:
|
||||||
|
- app-network
|
||||||
|
restart: always # Ensure the container always restarts
|
||||||
|
|
||||||
|
networks:
|
||||||
|
app-network:
|
||||||
|
driver: bridge
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
driver: local
|
||||||
|
valkey_data:
|
||||||
|
driver: local
|
||||||
```
|
```
|
||||||
|
|
||||||
### Run the Docker Container
|
Create `init.sql`
|
||||||
```sh
|
```sql
|
||||||
docker run -d --name ti1-container -e DB_HOST=<your_db_host> -e DB_PORT=<your_db_port> -e DB_USER=<your_db_user> -e DB_PASSWORD=<your_db_password> -e DB_NAME=<your_db_name> -e DB_SSLMODE=<your_db_sslmode> pigwin1/ti1:latest
|
-- Check if 'post' user exists; create if not
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
|
||||||
|
CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
|
||||||
|
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
|
||||||
|
ALTER ROLE post WITH SUPERUSER;
|
||||||
|
END IF;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|
||||||
|
-- Check if 'ti1' user exists; create if not
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
|
||||||
|
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
|
||||||
|
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
|
||||||
|
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
|
||||||
|
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;
|
||||||
|
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO ti1;
|
||||||
|
-- Grant the ti1 user the necessary permissions on the public schema
|
||||||
|
GRANT USAGE, CREATE ON SCHEMA public TO ti1;
|
||||||
|
|
||||||
|
-- Grant all permissions (SELECT, INSERT, UPDATE, DELETE, etc.) on all existing tables in the public schema
|
||||||
|
GRANT ALL ON ALL TABLES IN SCHEMA public TO ti1;
|
||||||
|
|
||||||
|
-- Grant all permissions on all existing sequences in the public schema
|
||||||
|
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO ti1;
|
||||||
|
|
||||||
|
-- Grant all permissions on all functions in the public schema
|
||||||
|
GRANT ALL ON ALL FUNCTIONS IN SCHEMA public TO ti1;
|
||||||
|
|
||||||
|
-- Ensure that the ti1 user will have access to new tables, sequences, and functions created in the public schema
|
||||||
|
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ti1;
|
||||||
|
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ti1;
|
||||||
|
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON FUNCTIONS TO ti1;
|
||||||
|
|
||||||
|
-- Optionally, grant full permissions on the entire database to ti1 (if needed)
|
||||||
|
-- GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
|
||||||
|
|
||||||
|
END IF;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
```
|
||||||
|
|
||||||
|
Remember to change the password values
|
||||||
|
|
||||||
|
### Run the Docker Containers
|
||||||
|
```sh
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
### edit the postgress config (optinal)
|
||||||
|
open the config file
|
||||||
|
```sh
|
||||||
|
nano postgres_data/postgresql.conf
|
||||||
|
```
|
||||||
|
Change the following values
|
||||||
|
```conf
|
||||||
|
listen_addresses = '*'
|
||||||
|
max_connections = 200
|
||||||
|
shared_buffers = 16GB
|
||||||
|
work_mem = 256MB
|
||||||
|
maintenance_work_mem = 2GB
|
||||||
|
dynamic_shared_memory_type = posix
|
||||||
|
max_wal_size = 1GB
|
||||||
|
min_wal_size = 80MB
|
||||||
|
```
|
||||||
|
set these to what makes most sense for you
|
||||||
|
|
||||||
|
These values should also be set bet not necessarily changed
|
||||||
|
```conf
|
||||||
|
log_timezone = 'Etc/UTC'
|
||||||
|
datestyle = 'iso, mdy'
|
||||||
|
timezone = 'Etc/UTC'
|
||||||
|
lc_messages = 'en_US.utf8'
|
||||||
|
lc_monetary = 'en_US.utf8'
|
||||||
|
lc_numeric = 'en_US.utf8'
|
||||||
|
lc_time = 'en_US.utf8'
|
||||||
|
default_text_search_config = 'pg_catalog.english'
|
||||||
```
|
```
|
||||||
Replace `<your_db_host>`, `<your_db_port>`, `<your_db_user>`, `<your_db_password>`, `<your_db_name>`, and `<your_db_sslmode>` with your actual database configuration values.
|
|
||||||
|
|
||||||
### Docker Hub Repository
|
### Docker Hub Repository
|
||||||
You can find the Docker image on Docker Hub at the following link:
|
You can find the Docker image on Docker Hub at the following link:
|
||||||
|
|||||||
@@ -7,5 +7,12 @@
|
|||||||
"dbname": "ti1",
|
"dbname": "ti1",
|
||||||
"sslmode": "disable"
|
"sslmode": "disable"
|
||||||
},
|
},
|
||||||
|
"valkey": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": "6379",
|
||||||
|
"max_conns": 100,
|
||||||
|
"timeout_ms": 2000,
|
||||||
|
"password": "the_valkey_password"
|
||||||
|
},
|
||||||
"temp": "value"
|
"temp": "value"
|
||||||
}
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@@ -15,6 +16,13 @@ type Config struct {
|
|||||||
DBName string `json:"dbname"`
|
DBName string `json:"dbname"`
|
||||||
SSLMode string `json:"sslmode"`
|
SSLMode string `json:"sslmode"`
|
||||||
} `json:"database"`
|
} `json:"database"`
|
||||||
|
Valkey struct {
|
||||||
|
Host string `json:"host"`
|
||||||
|
Port string `json:"port"`
|
||||||
|
MaxConns int `json:"max_conns"`
|
||||||
|
TimeoutMs int `json:"timeout_ms"`
|
||||||
|
Password string `json:"password"` // Add this line
|
||||||
|
} `json:"valkey"`
|
||||||
Temp string `json:"temp"`
|
Temp string `json:"temp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,7 +61,24 @@ func LoadConfig(file string) (Config, error) {
|
|||||||
if temp := os.Getenv("TEMP"); temp != "" {
|
if temp := os.Getenv("TEMP"); temp != "" {
|
||||||
config.Temp = temp
|
config.Temp = temp
|
||||||
}
|
}
|
||||||
//log.Println("Temp value:", config.Temp)
|
|
||||||
|
// Override Valkey settings with environment variables
|
||||||
|
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
|
||||||
|
config.Valkey.Host = valkeyHost
|
||||||
|
}
|
||||||
|
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
|
||||||
|
config.Valkey.Port = valkeyPort
|
||||||
|
}
|
||||||
|
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
|
||||||
|
if val, err := strconv.Atoi(maxConns); err == nil {
|
||||||
|
config.Valkey.MaxConns = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
|
||||||
|
if val, err := strconv.Atoi(timeoutMs); err == nil {
|
||||||
|
config.Valkey.TimeoutMs = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
||||||
|
|||||||
11
config/db.go
11
config/db.go
@@ -4,6 +4,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
@@ -17,7 +18,7 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
|
|||||||
|
|
||||||
fmt.Println("Configuration loaded successfully!")
|
fmt.Println("Configuration loaded successfully!")
|
||||||
|
|
||||||
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
|
connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
|
||||||
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
|
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
|
||||||
|
|
||||||
// Open connection to database
|
// Open connection to database
|
||||||
@@ -26,7 +27,13 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Connection to PostgreSQL opened successfully!")
|
// Set connection pool settings for high concurrency
|
||||||
|
db.SetMaxOpenConns(50) // Maximum number of open connections to the database
|
||||||
|
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
|
||||||
|
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
|
||||||
|
db.SetConnMaxIdleTime(5 * time.Minute) // Maximum amount of time a connection may be idle
|
||||||
|
|
||||||
|
fmt.Println("Connection to PostgreSQL opened successfully :D")
|
||||||
|
|
||||||
// Ping database to verify connection
|
// Ping database to verify connection
|
||||||
err = db.Ping()
|
err = db.Ping()
|
||||||
|
|||||||
100
config/valkey.go
Normal file
100
config/valkey.go
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/valkey-io/valkey-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ValkeyConfig struct {
|
||||||
|
Host string `json:"host"`
|
||||||
|
Port string `json:"port"`
|
||||||
|
MaxConns int `json:"max_conns"`
|
||||||
|
TimeoutMs int `json:"timeout_ms"`
|
||||||
|
Password string `json:"password"` // Add this line
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
|
||||||
|
var config ValkeyConfig
|
||||||
|
configFile, err := os.Open(file)
|
||||||
|
if err != nil {
|
||||||
|
return config, fmt.Errorf("failed to open config file: %w", err)
|
||||||
|
}
|
||||||
|
defer configFile.Close()
|
||||||
|
|
||||||
|
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
|
||||||
|
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Override with environment variables if set
|
||||||
|
if host := os.Getenv("VALKEY_HOST"); host != "" {
|
||||||
|
config.Host = host
|
||||||
|
}
|
||||||
|
if port := os.Getenv("VALKEY_PORT"); port != "" {
|
||||||
|
config.Port = port
|
||||||
|
}
|
||||||
|
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
|
||||||
|
if val, err := strconv.Atoi(maxConns); err == nil {
|
||||||
|
config.MaxConns = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
|
||||||
|
if val, err := strconv.Atoi(timeoutMs); err == nil {
|
||||||
|
config.TimeoutMs = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
|
||||||
|
config.Password = password
|
||||||
|
}
|
||||||
|
|
||||||
|
return config, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConnectToValkey(configPath string) (valkey.Client, error) {
|
||||||
|
fmt.Println("Loading configuration...")
|
||||||
|
config, err := LoadConfig(configPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to load config: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Println("Configuration loaded successfully!")
|
||||||
|
|
||||||
|
valkeyConfig := config.Valkey
|
||||||
|
|
||||||
|
// Setup Valkey client options
|
||||||
|
options := valkey.ClientOption{
|
||||||
|
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
|
||||||
|
Password: valkeyConfig.Password,
|
||||||
|
// Additional options can be added here if required
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
|
||||||
|
client, err := valkey.NewClient(options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Optionally, perform a ping to validate the connection
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
|
||||||
|
client.Close()
|
||||||
|
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Connected to Valkey successfully!")
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DisconnectFromValkey(client valkey.Client) error {
|
||||||
|
fmt.Println("Disconnecting from Valkey...")
|
||||||
|
client.Close()
|
||||||
|
log.Println("Disconnected from Valkey successfully!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
90
data/data.go
90
data/data.go
@@ -1,8 +1,12 @@
|
|||||||
package data
|
package data
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Data struct {
|
type Data struct {
|
||||||
@@ -125,21 +129,87 @@ type Data struct {
|
|||||||
} `xml:"ServiceDelivery"`
|
} `xml:"ServiceDelivery"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func FetchData() (*Data, error) {
|
func FetchData(timestamp string) (*Data, error) {
|
||||||
client := &http.Client{}
|
// Configure HTTP client with timeout and HTTP/1.1 to avoid HTTP/2 stream errors
|
||||||
|
transport := &http.Transport{
|
||||||
resp, err := client.Get("https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000")
|
TLSClientConfig: &tls.Config{
|
||||||
if err != nil {
|
MinVersion: tls.VersionTLS12,
|
||||||
return nil, err
|
},
|
||||||
|
MaxIdleConns: 10,
|
||||||
|
MaxIdleConnsPerHost: 10,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
DisableCompression: false,
|
||||||
|
ForceAttemptHTTP2: false, // Disable HTTP/2 to avoid stream errors
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 30 * time.Second,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
data := &Data{}
|
client := &http.Client{
|
||||||
|
Transport: transport,
|
||||||
|
Timeout: 180 * time.Second, // 3 minute timeout for large datasets
|
||||||
|
}
|
||||||
|
|
||||||
|
requestorId := "ti1-" + timestamp
|
||||||
|
url := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
|
||||||
|
|
||||||
|
// Retry logic for transient failures
|
||||||
|
var resp *http.Response
|
||||||
|
var err error
|
||||||
|
var data *Data
|
||||||
|
maxRetries := 3
|
||||||
|
|
||||||
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||||
|
log.Printf("Fetching data from URL (attempt %d/%d): %s", attempt, maxRetries, url)
|
||||||
|
|
||||||
|
resp, err = client.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Request failed: %v", err)
|
||||||
|
if attempt < maxRetries {
|
||||||
|
waitTime := time.Duration(attempt*2) * time.Second
|
||||||
|
log.Printf("Retrying in %v...", waitTime)
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check HTTP status code
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
resp.Body.Close()
|
||||||
|
err = fmt.Errorf("HTTP error: %s (status code: %d)", resp.Status, resp.StatusCode)
|
||||||
|
log.Printf("%v", err)
|
||||||
|
if attempt < maxRetries {
|
||||||
|
waitTime := time.Duration(attempt*2) * time.Second
|
||||||
|
log.Printf("Retrying in %v...", waitTime)
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to decode the response
|
||||||
|
data = &Data{}
|
||||||
decoder := xml.NewDecoder(resp.Body)
|
decoder := xml.NewDecoder(resp.Body)
|
||||||
err = decoder.Decode(data)
|
err = decoder.Decode(data)
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to decode XML: %v", err)
|
||||||
|
if attempt < maxRetries {
|
||||||
|
waitTime := time.Duration(attempt*2) * time.Second
|
||||||
|
log.Printf("Retrying in %v...", waitTime)
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Success!
|
||||||
|
log.Printf("Successfully fetched and decoded data")
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// All retries failed
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return nil, fmt.Errorf("Failed to fetch data after %d attempts", maxRetries)
|
||||||
return data, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,24 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"ti1/valki"
|
||||||
|
|
||||||
|
"github.com/valkey-io/valkey-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) {
|
type CallResult struct {
|
||||||
|
ID int
|
||||||
|
Action string
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
|
||||||
|
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||||
// Replace empty strings with nil for timestamp fields
|
// Replace empty strings with nil for timestamp fields
|
||||||
for i, v := range values {
|
for i, v := range values {
|
||||||
if str, ok := v.(string); ok && str == "" {
|
if str, ok := v.(string); ok && str == "" {
|
||||||
@@ -13,6 +26,28 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert values to a single string and hash it using MD5
|
||||||
|
var valuesString string
|
||||||
|
for _, v := range values {
|
||||||
|
if v != nil {
|
||||||
|
valuesString += fmt.Sprintf("%v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hash := md5.Sum([]byte(valuesString))
|
||||||
|
hashString := hex.EncodeToString(hash[:])
|
||||||
|
|
||||||
|
estimatedVehicleJourneyID := values[0]
|
||||||
|
orderID := values[1]
|
||||||
|
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||||
|
|
||||||
|
// Get the MD5 hash from Valkey
|
||||||
|
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the retrieved value matches the original MD5 hash
|
||||||
|
if retrievedHash != hashString {
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO calls (
|
INSERT INTO calls (
|
||||||
estimatedvehiclejourney, "order", stoppointref,
|
estimatedvehiclejourney, "order", stoppointref,
|
||||||
@@ -32,24 +67,60 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
|
|||||||
estimated_data = EXCLUDED.estimated_data
|
estimated_data = EXCLUDED.estimated_data
|
||||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||||
`
|
`
|
||||||
stmt, err := db.Prepare(query)
|
|
||||||
|
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
|
||||||
}
|
}
|
||||||
defer stmt.Close()
|
|
||||||
|
|
||||||
var action string
|
var action string
|
||||||
var id int
|
var id int
|
||||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if 1 == 0 {
|
return 0, "", fmt.Errorf("error executing statement: %w", err)
|
||||||
fmt.Println("Executing query:", query)
|
|
||||||
for i, v := range values {
|
|
||||||
fmt.Printf("Value %d: (%v)\n", i+1, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
|
||||||
}
|
}
|
||||||
return id, action, nil
|
return id, action, nil
|
||||||
}
|
}
|
||||||
|
return 0, "none", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchInsertEstimatedCalls processes multiple estimated calls concurrently
|
||||||
|
func BatchInsertEstimatedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make([]CallResult, len(batch))
|
||||||
|
jobs := make(chan int, len(batch))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for idx := range jobs {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
id, action, err := InsertOrUpdateEstimatedCall(ctx, db, batch[idx], valkeyClient)
|
||||||
|
results[idx] = CallResult{
|
||||||
|
ID: id,
|
||||||
|
Action: action,
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send jobs
|
||||||
|
for i := range batch {
|
||||||
|
jobs <- i
|
||||||
|
}
|
||||||
|
close(jobs)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,10 +1,68 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type EVJResult struct {
|
||||||
|
ID int
|
||||||
|
Action string
|
||||||
|
Error error
|
||||||
|
Index int // To maintain order
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreparedStatements holds reusable prepared statements
|
||||||
|
type PreparedStatements struct {
|
||||||
|
evjStmt *sql.Stmt
|
||||||
|
ecStmt *sql.Stmt
|
||||||
|
rcStmt *sql.Stmt
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPreparedStatements(db *sql.DB) (*PreparedStatements, error) {
|
||||||
|
evjQuery := `
|
||||||
|
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $1)
|
||||||
|
ON CONFLICT (lineref, directionref, datasource, datedvehiclejourneyref)
|
||||||
|
DO UPDATE SET
|
||||||
|
servicedelivery = EXCLUDED.servicedelivery,
|
||||||
|
recordedattime = EXCLUDED.recordedattime,
|
||||||
|
vehiclemode = COALESCE(EXCLUDED.vehiclemode, estimatedvehiclejourney.vehiclemode),
|
||||||
|
dataframeref = COALESCE(EXCLUDED.dataframeref, estimatedvehiclejourney.dataframeref),
|
||||||
|
originref = COALESCE(EXCLUDED.originref, estimatedvehiclejourney.originref),
|
||||||
|
destinationref = COALESCE(EXCLUDED.destinationref, estimatedvehiclejourney.destinationref),
|
||||||
|
operatorref = COALESCE(EXCLUDED.operatorref, estimatedvehiclejourney.operatorref),
|
||||||
|
vehicleref = COALESCE(EXCLUDED.vehicleref, estimatedvehiclejourney.vehicleref),
|
||||||
|
cancellation = COALESCE(EXCLUDED.cancellation, estimatedvehiclejourney.cancellation),
|
||||||
|
other = COALESCE(EXCLUDED.other, estimatedvehiclejourney.other)
|
||||||
|
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||||
|
`
|
||||||
|
|
||||||
|
evjStmt, err := db.Prepare(evjQuery)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to prepare EVJ statement: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PreparedStatements{
|
||||||
|
evjStmt: evjStmt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PreparedStatements) Close() {
|
||||||
|
if ps.evjStmt != nil {
|
||||||
|
ps.evjStmt.Close()
|
||||||
|
}
|
||||||
|
if ps.ecStmt != nil {
|
||||||
|
ps.ecStmt.Close()
|
||||||
|
}
|
||||||
|
if ps.rcStmt != nil {
|
||||||
|
ps.rcStmt.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
|
func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (int, string, error) {
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
INSERT INTO estimatedvehiclejourney (servicedelivery, recordedattime, lineref, directionref, datasource, datedvehiclejourneyref, vehiclemode, dataframeref, originref, destinationref, operatorref, vehicleref, cancellation, other, firstservicedelivery)
|
||||||
@@ -24,18 +82,54 @@ func InsertOrUpdateEstimatedVehicleJourney(db *sql.DB, values []interface{}) (in
|
|||||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||||
`
|
`
|
||||||
|
|
||||||
stmt, err := db.Prepare(query)
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
|
||||||
}
|
|
||||||
defer stmt.Close()
|
|
||||||
|
|
||||||
var action string
|
var action string
|
||||||
var id int
|
var id int
|
||||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
err := db.QueryRow(query, values...).Scan(&action, &id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
return 0, "", fmt.Errorf("error executing EVJ statement: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return id, action, nil
|
return id, action, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchInsertEVJ processes multiple EVJ inserts concurrently
|
||||||
|
func BatchInsertEVJ(ctx context.Context, db *sql.DB, batch [][]interface{}, workerCount int) ([]EVJResult, error) {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make([]EVJResult, len(batch))
|
||||||
|
jobs := make(chan int, len(batch))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for idx := range jobs {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
id, action, err := InsertOrUpdateEstimatedVehicleJourney(db, batch[idx])
|
||||||
|
results[idx] = EVJResult{
|
||||||
|
ID: id,
|
||||||
|
Action: action,
|
||||||
|
Error: err,
|
||||||
|
Index: idx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send jobs
|
||||||
|
for i := range batch {
|
||||||
|
jobs <- i
|
||||||
|
}
|
||||||
|
close(jobs)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,11 +1,18 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"ti1/valki"
|
||||||
|
|
||||||
|
"github.com/valkey-io/valkey-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) {
|
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||||
// Replace empty strings with nil for timestamp fields
|
// Replace empty strings with nil for timestamp fields
|
||||||
for i, v := range values {
|
for i, v := range values {
|
||||||
if str, ok := v.(string); ok && str == "" {
|
if str, ok := v.(string); ok && str == "" {
|
||||||
@@ -13,6 +20,28 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert values to a single string and hash it using MD5
|
||||||
|
var valuesString string
|
||||||
|
for _, v := range values {
|
||||||
|
if v != nil {
|
||||||
|
valuesString += fmt.Sprintf("%v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hash := md5.Sum([]byte(valuesString))
|
||||||
|
hashString := hex.EncodeToString(hash[:])
|
||||||
|
|
||||||
|
estimatedVehicleJourneyID := values[0]
|
||||||
|
orderID := values[1]
|
||||||
|
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||||
|
|
||||||
|
// Get the MD5 hash from Valkey
|
||||||
|
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", fmt.Errorf("failed to get value from Valkey: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the retrieved value matches the original MD5 hash
|
||||||
|
if retrievedHash != hashString {
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO calls (
|
INSERT INTO calls (
|
||||||
estimatedvehiclejourney, "order", stoppointref,
|
estimatedvehiclejourney, "order", stoppointref,
|
||||||
@@ -35,24 +64,60 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
|
|||||||
recorded_data = EXCLUDED.recorded_data
|
recorded_data = EXCLUDED.recorded_data
|
||||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||||
`
|
`
|
||||||
stmt, err := db.Prepare(query)
|
|
||||||
|
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
return 0, "", fmt.Errorf("failed to set value in Valkey: %w", err)
|
||||||
}
|
}
|
||||||
defer stmt.Close()
|
|
||||||
|
|
||||||
var action string
|
var action string
|
||||||
var id int
|
var id int
|
||||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
err = db.QueryRowContext(ctx, query, values...).Scan(&action, &id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if 1 == 0 {
|
return 0, "", fmt.Errorf("error executing statement: %w", err)
|
||||||
fmt.Println("Executing query:", query)
|
|
||||||
for i, v := range values {
|
|
||||||
fmt.Printf("Value %d: (%v)\n", i+1, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
|
||||||
}
|
}
|
||||||
return id, action, nil
|
return id, action, nil
|
||||||
}
|
}
|
||||||
|
return 0, "none", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchInsertRecordedCalls processes multiple recorded calls concurrently
|
||||||
|
func BatchInsertRecordedCalls(ctx context.Context, db *sql.DB, batch [][]interface{}, valkeyClient valkey.Client, workerCount int) ([]CallResult, error) {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make([]CallResult, len(batch))
|
||||||
|
jobs := make(chan int, len(batch))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for idx := range jobs {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
id, action, err := InsertOrUpdateRecordedCall(ctx, db, batch[idx], valkeyClient)
|
||||||
|
results[idx] = CallResult{
|
||||||
|
ID: id,
|
||||||
|
Action: action,
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send jobs
|
||||||
|
for i := range batch {
|
||||||
|
jobs <- i
|
||||||
|
}
|
||||||
|
close(jobs)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,25 +6,18 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
|
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
|
||||||
fmt.Println("Inserting ServiceDelivery...")
|
|
||||||
var id int
|
var id int
|
||||||
|
|
||||||
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
|
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return 0, fmt.Errorf("failed to insert service delivery: %w", err)
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
//fmt.Println("ServiceDelivery inserted successfully! (", id, ")")
|
|
||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
|
func UpdateServiceDeliveryData(db *sql.DB, id int, data string) error {
|
||||||
fmt.Println("Updating ServiceDelivery data...")
|
|
||||||
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
|
_, err := db.Exec("UPDATE public.ServiceDelivery SET Data = $1 WHERE ID = $2", data, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("failed to update service delivery data: %w", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
fmt.Println("Finished with this ServiceDelivery!")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,26 @@
|
|||||||
package export
|
package export
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"ti1/config"
|
"ti1/config"
|
||||||
"ti1/data"
|
"ti1/data"
|
||||||
"ti1/database"
|
"ti1/database"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DBData is the main entry point for data processing
|
||||||
func DBData(data *data.Data) {
|
func DBData(data *data.Data) {
|
||||||
|
DBDataOptimized(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DBDataOptimized processes data with concurrent workers for better performance
|
||||||
|
func DBDataOptimized(data *data.Data) {
|
||||||
fmt.Println(data.ServiceDelivery.ResponseTimestamp)
|
fmt.Println(data.ServiceDelivery.ResponseTimestamp)
|
||||||
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
fmt.Println(data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
||||||
|
|
||||||
@@ -20,6 +30,15 @@ func DBData(data *data.Data) {
|
|||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
|
// Connect to Valkey
|
||||||
|
valkeyClient, err := config.ConnectToValkey("config/conf.json")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to Valkey: %v", err)
|
||||||
|
}
|
||||||
|
defer config.DisconnectFromValkey(valkeyClient)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
// Get service id aka sid
|
// Get service id aka sid
|
||||||
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -27,17 +46,96 @@ func DBData(data *data.Data) {
|
|||||||
}
|
}
|
||||||
fmt.Println("SID:", sid)
|
fmt.Println("SID:", sid)
|
||||||
|
|
||||||
// counters
|
// Record start time
|
||||||
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
|
startTime := time.Now()
|
||||||
|
|
||||||
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
|
// Atomic counters for thread-safe counting
|
||||||
|
var insertCount, updateCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int64
|
||||||
|
|
||||||
|
journeys := data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney
|
||||||
|
totalJourneys := len(journeys)
|
||||||
|
fmt.Printf("Processing %d journeys...\n", totalJourneys)
|
||||||
|
|
||||||
|
// Job structures
|
||||||
|
type evjJob struct {
|
||||||
|
index int
|
||||||
|
}
|
||||||
|
|
||||||
|
type callJob struct {
|
||||||
|
evjID int
|
||||||
|
values []interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Channels
|
||||||
|
workerCount := 20 // Adjust based on your database and CPU
|
||||||
|
evjJobs := make(chan evjJob, workerCount*2)
|
||||||
|
estimatedCallJobs := make(chan callJob, workerCount*10)
|
||||||
|
recordedCallJobs := make(chan callJob, workerCount*10)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var callWg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start Estimated Call workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
callWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer callWg.Done()
|
||||||
|
for job := range estimatedCallJobs {
|
||||||
|
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, job.values, valkeyClient)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error inserting/updating estimated call: %v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if action == "insert" {
|
||||||
|
atomic.AddInt64(&estimatedCallInsertCount, 1)
|
||||||
|
} else if action == "update" {
|
||||||
|
atomic.AddInt64(&estimatedCallUpdateCount, 1)
|
||||||
|
} else if action == "none" {
|
||||||
|
atomic.AddInt64(&estimatedCallNoneCount, 1)
|
||||||
|
}
|
||||||
|
_ = id
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start Recorded Call workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
callWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer callWg.Done()
|
||||||
|
for job := range recordedCallJobs {
|
||||||
|
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, job.values, valkeyClient)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error inserting/updating recorded call: %v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if action == "insert" {
|
||||||
|
atomic.AddInt64(&recordedCallInsertCount, 1)
|
||||||
|
} else if action == "update" {
|
||||||
|
atomic.AddInt64(&recordedCallUpdateCount, 1)
|
||||||
|
} else if action == "none" {
|
||||||
|
atomic.AddInt64(&recordedCallNoneCount, 1)
|
||||||
|
}
|
||||||
|
_ = id
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start EVJ workers
|
||||||
|
for w := 0; w < workerCount; w++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for job := range evjJobs {
|
||||||
|
journey := &journeys[job.index]
|
||||||
|
|
||||||
|
// Prepare values
|
||||||
var values []interface{}
|
var values []interface{}
|
||||||
var datedVehicleJourneyRef, otherJson string
|
var datedVehicleJourneyRef, otherJson string
|
||||||
|
|
||||||
values = append(values, sid)
|
values = append(values, sid)
|
||||||
values = append(values, journey.RecordedAtTime)
|
values = append(values, journey.RecordedAtTime)
|
||||||
values = append(values, journey.LineRef)
|
values = append(values, journey.LineRef)
|
||||||
//had to add to lowercase cus some values vary in case and it was causing duplicates
|
|
||||||
values = append(values, strings.ToLower(journey.DirectionRef))
|
values = append(values, strings.ToLower(journey.DirectionRef))
|
||||||
values = append(values, journey.DataSource)
|
values = append(values, journey.DataSource)
|
||||||
|
|
||||||
@@ -58,10 +156,8 @@ func DBData(data *data.Data) {
|
|||||||
values = append(values, journey.VehicleRef)
|
values = append(values, journey.VehicleRef)
|
||||||
values = append(values, journey.Cancellation)
|
values = append(values, journey.Cancellation)
|
||||||
|
|
||||||
// Create a map to hold the JSON object for the current journey
|
// Create JSON object
|
||||||
jsonObject := make(map[string]interface{})
|
jsonObject := make(map[string]interface{})
|
||||||
|
|
||||||
// Add relevant fields to the JSON object
|
|
||||||
if journey.OriginName != "" {
|
if journey.OriginName != "" {
|
||||||
jsonObject["OriginName"] = journey.OriginName
|
jsonObject["OriginName"] = journey.OriginName
|
||||||
}
|
}
|
||||||
@@ -132,69 +228,60 @@ func DBData(data *data.Data) {
|
|||||||
jsonObject["Via"] = journey.Via.PlaceName
|
jsonObject["Via"] = journey.Via.PlaceName
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the JSON object to a JSON string
|
|
||||||
jsonString, err := json.Marshal(jsonObject)
|
jsonString, err := json.Marshal(jsonObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Printf("Error marshaling JSON: %v\n", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
otherJson = string(jsonString)
|
otherJson = string(jsonString)
|
||||||
values = append(values, otherJson)
|
values = append(values, otherJson)
|
||||||
|
|
||||||
// Insert or update the record
|
// Insert or update EVJ
|
||||||
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
id, action, err := database.InsertOrUpdateEstimatedVehicleJourney(db, values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
log.Printf("Error inserting/updating estimated vehicle journey: %v\n", err)
|
||||||
} else {
|
continue
|
||||||
if 1 == 0 {
|
|
||||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if action == "insert" {
|
if action == "insert" {
|
||||||
insertCount++
|
atomic.AddInt64(&insertCount, 1)
|
||||||
} else if action == "update" {
|
} else if action == "update" {
|
||||||
updateCount++
|
atomic.AddInt64(&updateCount, 1)
|
||||||
}
|
}
|
||||||
totalCount = insertCount + updateCount
|
|
||||||
|
|
||||||
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
|
// Progress reporting
|
||||||
if totalCount%1000 == 0 {
|
total := atomic.AddInt64(&insertCount, 0) + atomic.AddInt64(&updateCount, 0)
|
||||||
|
if total%1000 == 0 {
|
||||||
fmt.Printf(
|
fmt.Printf(
|
||||||
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
"EVJ - I: %d, U: %d, Total: %d; EstCalls - I: %d U: %d N: %d; RecCalls - I: %d U: %d N: %d\n",
|
||||||
insertCount,
|
atomic.LoadInt64(&insertCount),
|
||||||
updateCount,
|
atomic.LoadInt64(&updateCount),
|
||||||
totalCount,
|
total,
|
||||||
estimatedCallInsertCount,
|
atomic.LoadInt64(&estimatedCallInsertCount),
|
||||||
estimatedCallUpdateCount,
|
atomic.LoadInt64(&estimatedCallUpdateCount),
|
||||||
recordedCallInsertCount,
|
atomic.LoadInt64(&estimatedCallNoneCount),
|
||||||
recordedCallUpdateCount,
|
atomic.LoadInt64(&recordedCallInsertCount),
|
||||||
|
atomic.LoadInt64(&recordedCallUpdateCount),
|
||||||
|
atomic.LoadInt64(&recordedCallNoneCount),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
// Process Estimated Calls
|
||||||
for _, estimatedCall := range journey.EstimatedCalls {
|
for _, estimatedCall := range journey.EstimatedCalls {
|
||||||
for _, call := range estimatedCall.EstimatedCall {
|
for _, call := range estimatedCall.EstimatedCall {
|
||||||
var estimatedValues []interface{}
|
var estimatedValues []interface{}
|
||||||
|
|
||||||
//1 estimatedvehiclejourney
|
|
||||||
estimatedValues = append(estimatedValues, id)
|
estimatedValues = append(estimatedValues, id)
|
||||||
//2 order
|
|
||||||
estimatedValues = append(estimatedValues, call.Order)
|
estimatedValues = append(estimatedValues, call.Order)
|
||||||
//3 stoppointref
|
|
||||||
estimatedValues = append(estimatedValues, call.StopPointRef)
|
estimatedValues = append(estimatedValues, call.StopPointRef)
|
||||||
//4 aimeddeparturetime
|
|
||||||
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
|
estimatedValues = append(estimatedValues, call.AimedDepartureTime)
|
||||||
//5 expecteddeparturetime
|
|
||||||
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
|
estimatedValues = append(estimatedValues, call.ExpectedDepartureTime)
|
||||||
//6 aimedarrivaltime
|
|
||||||
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
|
estimatedValues = append(estimatedValues, call.AimedArrivalTime)
|
||||||
//7 expectedarrivaltime
|
|
||||||
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
|
estimatedValues = append(estimatedValues, call.ExpectedArrivalTime)
|
||||||
//8 cancellation
|
|
||||||
estimatedValues = append(estimatedValues, call.Cancellation)
|
estimatedValues = append(estimatedValues, call.Cancellation)
|
||||||
|
|
||||||
//9 estimated_data (JSON)
|
// estimated_data JSON
|
||||||
estimatedJsonObject := make(map[string]interface{})
|
estimatedJsonObject := make(map[string]interface{})
|
||||||
// data allrady loged
|
|
||||||
if call.ExpectedDepartureTime != "" {
|
if call.ExpectedDepartureTime != "" {
|
||||||
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
estimatedJsonObject["ExpectedDepartureTime"] = call.ExpectedDepartureTime
|
||||||
}
|
}
|
||||||
@@ -204,7 +291,6 @@ func DBData(data *data.Data) {
|
|||||||
if call.Cancellation != "" {
|
if call.Cancellation != "" {
|
||||||
estimatedJsonObject["Cancellation"] = call.Cancellation
|
estimatedJsonObject["Cancellation"] = call.Cancellation
|
||||||
}
|
}
|
||||||
// The rest
|
|
||||||
if call.StopPointName != "" {
|
if call.StopPointName != "" {
|
||||||
estimatedJsonObject["StopPointName"] = call.StopPointName
|
estimatedJsonObject["StopPointName"] = call.StopPointName
|
||||||
}
|
}
|
||||||
@@ -275,64 +361,41 @@ func DBData(data *data.Data) {
|
|||||||
estimatedJsonObject["Occupancy"] = call.Occupancy
|
estimatedJsonObject["Occupancy"] = call.Occupancy
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the JSON object to a JSON string
|
|
||||||
jsonString, err := json.Marshal(estimatedJsonObject)
|
jsonString, err := json.Marshal(estimatedJsonObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Printf("Error marshaling estimated call JSON: %v\n", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
estimatedValues = append(estimatedValues, string(jsonString))
|
estimatedValues = append(estimatedValues, string(jsonString))
|
||||||
|
|
||||||
// Insert or update the record
|
// Convert to string values
|
||||||
stringValues := make([]string, len(estimatedValues))
|
interfaceValues := make([]interface{}, len(estimatedValues))
|
||||||
for i, v := range estimatedValues {
|
for i, v := range estimatedValues {
|
||||||
stringValues[i] = fmt.Sprintf("%v", v)
|
interfaceValues[i] = fmt.Sprintf("%v", v)
|
||||||
}
|
|
||||||
interfaceValues := make([]interface{}, len(stringValues))
|
|
||||||
for i, v := range stringValues {
|
|
||||||
interfaceValues[i] = v
|
|
||||||
}
|
|
||||||
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error inserting/updating estimated call: %v\n", err)
|
|
||||||
} else {
|
|
||||||
if 1 == 0 {
|
|
||||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if action == "insert" {
|
// Send to worker pool
|
||||||
estimatedCallInsertCount++
|
estimatedCallJobs <- callJob{evjID: id, values: interfaceValues}
|
||||||
} else if action == "update" {
|
|
||||||
estimatedCallUpdateCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process Recorded Calls
|
||||||
for _, recordedCall := range journey.RecordedCalls {
|
for _, recordedCall := range journey.RecordedCalls {
|
||||||
for _, call := range recordedCall.RecordedCall {
|
for _, call := range recordedCall.RecordedCall {
|
||||||
var recordedValues []interface{}
|
var recordedValues []interface{}
|
||||||
|
|
||||||
//1 estimatedvehiclejourney
|
|
||||||
recordedValues = append(recordedValues, id)
|
recordedValues = append(recordedValues, id)
|
||||||
//2 order
|
|
||||||
recordedValues = append(recordedValues, call.Order)
|
recordedValues = append(recordedValues, call.Order)
|
||||||
//3 stoppointref
|
|
||||||
recordedValues = append(recordedValues, call.StopPointRef)
|
recordedValues = append(recordedValues, call.StopPointRef)
|
||||||
//4 aimeddeparturetime
|
|
||||||
recordedValues = append(recordedValues, call.AimedDepartureTime)
|
recordedValues = append(recordedValues, call.AimedDepartureTime)
|
||||||
//5 expecteddeparturetime
|
|
||||||
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
|
recordedValues = append(recordedValues, call.ExpectedDepartureTime)
|
||||||
//6 aimedarrivaltime
|
|
||||||
recordedValues = append(recordedValues, call.AimedArrivalTime)
|
recordedValues = append(recordedValues, call.AimedArrivalTime)
|
||||||
//7 expectedarrivaltime
|
|
||||||
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
|
recordedValues = append(recordedValues, call.ExpectedArrivalTime)
|
||||||
//8 cancellation
|
|
||||||
recordedValues = append(recordedValues, call.Cancellation)
|
recordedValues = append(recordedValues, call.Cancellation)
|
||||||
//9 actualdeparturetime
|
|
||||||
recordedValues = append(recordedValues, call.ActualDepartureTime)
|
recordedValues = append(recordedValues, call.ActualDepartureTime)
|
||||||
//10 actualarrivaltime
|
|
||||||
recordedValues = append(recordedValues, call.ActualArrivalTime)
|
recordedValues = append(recordedValues, call.ActualArrivalTime)
|
||||||
|
|
||||||
//11 recorded_data (JSON)
|
// recorded_data JSON
|
||||||
recordedJsonObject := make(map[string]interface{})
|
recordedJsonObject := make(map[string]interface{})
|
||||||
if call.StopPointName != "" {
|
if call.StopPointName != "" {
|
||||||
recordedJsonObject["StopPointName"] = call.StopPointName
|
recordedJsonObject["StopPointName"] = call.StopPointName
|
||||||
@@ -350,62 +413,73 @@ func DBData(data *data.Data) {
|
|||||||
recordedJsonObject["Occupancy"] = call.Occupancy
|
recordedJsonObject["Occupancy"] = call.Occupancy
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the JSON object to a JSON string
|
|
||||||
jsonString, err := json.Marshal(recordedJsonObject)
|
jsonString, err := json.Marshal(recordedJsonObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Printf("Error marshaling recorded call JSON: %v\n", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
recordedValues = append(recordedValues, string(jsonString))
|
recordedValues = append(recordedValues, string(jsonString))
|
||||||
|
|
||||||
// Insert or update the record
|
// Convert to string values
|
||||||
stringValues := make([]string, len(recordedValues))
|
interfaceValues := make([]interface{}, len(recordedValues))
|
||||||
for i, v := range recordedValues {
|
for i, v := range recordedValues {
|
||||||
stringValues[i] = fmt.Sprintf("%v", v)
|
interfaceValues[i] = fmt.Sprintf("%v", v)
|
||||||
}
|
|
||||||
interfaceValues := make([]interface{}, len(stringValues))
|
|
||||||
for i, v := range stringValues {
|
|
||||||
interfaceValues[i] = v
|
|
||||||
}
|
}
|
||||||
|
|
||||||
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues)
|
// Send to worker pool
|
||||||
if err != nil {
|
recordedCallJobs <- callJob{evjID: id, values: interfaceValues}
|
||||||
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
|
}
|
||||||
} else {
|
}
|
||||||
if 1 == 0 {
|
}
|
||||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if action == "insert" {
|
// Send all EVJ jobs
|
||||||
recordedCallInsertCount++
|
for i := range journeys {
|
||||||
//fmt.Printf("Action: %s, ID: %d\n", action, id)
|
evjJobs <- evjJob{index: i}
|
||||||
} else if action == "update" {
|
|
||||||
recordedCallUpdateCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
close(evjJobs)
|
||||||
|
|
||||||
}
|
// Wait for EVJ processing to complete
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Close call job channels and wait for call processing to complete
|
||||||
|
close(estimatedCallJobs)
|
||||||
|
close(recordedCallJobs)
|
||||||
|
callWg.Wait()
|
||||||
|
|
||||||
|
// Record end time
|
||||||
|
endTime := time.Now()
|
||||||
|
|
||||||
|
// Print final stats
|
||||||
fmt.Printf(
|
fmt.Printf(
|
||||||
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
"\nDONE: EVJ - Inserts: %d, Updates: %d, Total: %d\n"+
|
||||||
insertCount,
|
" EstimatedCalls - I: %d U: %d N: %d\n"+
|
||||||
updateCount,
|
" RecordedCalls - I: %d U: %d N: %d\n",
|
||||||
totalCount,
|
atomic.LoadInt64(&insertCount),
|
||||||
estimatedCallInsertCount,
|
atomic.LoadInt64(&updateCount),
|
||||||
estimatedCallUpdateCount,
|
atomic.LoadInt64(&insertCount)+atomic.LoadInt64(&updateCount),
|
||||||
recordedCallInsertCount,
|
atomic.LoadInt64(&estimatedCallInsertCount),
|
||||||
recordedCallUpdateCount,
|
atomic.LoadInt64(&estimatedCallUpdateCount),
|
||||||
|
atomic.LoadInt64(&estimatedCallNoneCount),
|
||||||
|
atomic.LoadInt64(&recordedCallInsertCount),
|
||||||
|
atomic.LoadInt64(&recordedCallUpdateCount),
|
||||||
|
atomic.LoadInt64(&recordedCallNoneCount),
|
||||||
)
|
)
|
||||||
|
|
||||||
// Create map to hold JSON
|
// Create map to hold JSON
|
||||||
serviceDeliveryJsonObject := make(map[string]interface{})
|
serviceDeliveryJsonObject := make(map[string]interface{})
|
||||||
|
serviceDeliveryJsonObject["Inserts"] = atomic.LoadInt64(&insertCount)
|
||||||
// Add fields to JSON
|
serviceDeliveryJsonObject["Updates"] = atomic.LoadInt64(&updateCount)
|
||||||
serviceDeliveryJsonObject["Inserts"] = insertCount
|
serviceDeliveryJsonObject["EstimatedCallInserts"] = atomic.LoadInt64(&estimatedCallInsertCount)
|
||||||
serviceDeliveryJsonObject["Updates"] = updateCount
|
serviceDeliveryJsonObject["EstimatedCallUpdates"] = atomic.LoadInt64(&estimatedCallUpdateCount)
|
||||||
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
|
serviceDeliveryJsonObject["EstimatedCallNone"] = atomic.LoadInt64(&estimatedCallNoneCount)
|
||||||
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
|
serviceDeliveryJsonObject["RecordedCallInserts"] = atomic.LoadInt64(&recordedCallInsertCount)
|
||||||
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
|
serviceDeliveryJsonObject["RecordedCallUpdates"] = atomic.LoadInt64(&recordedCallUpdateCount)
|
||||||
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
|
serviceDeliveryJsonObject["RecordedCallNone"] = atomic.LoadInt64(&recordedCallNoneCount)
|
||||||
|
serviceDeliveryJsonObject["StartTime"] = startTime.Format(time.RFC3339)
|
||||||
|
serviceDeliveryJsonObject["EndTime"] = endTime.Format(time.RFC3339)
|
||||||
|
serviceDeliveryJsonObject["Duration"] = endTime.Sub(startTime).String()
|
||||||
|
|
||||||
// Convert JSON object to JSON string
|
// Convert JSON object to JSON string
|
||||||
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
||||||
@@ -418,4 +492,6 @@ func DBData(data *data.Data) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fmt.Println("Finished with this ServiceDelivery!")
|
||||||
}
|
}
|
||||||
|
|||||||
9
go.mod
9
go.mod
@@ -1,5 +1,10 @@
|
|||||||
module ti1
|
module ti1
|
||||||
|
|
||||||
go 1.22.1
|
go 1.26.0
|
||||||
|
|
||||||
require github.com/lib/pq v1.10.9
|
require (
|
||||||
|
github.com/lib/pq v1.11.2
|
||||||
|
github.com/valkey-io/valkey-go v1.0.71
|
||||||
|
)
|
||||||
|
|
||||||
|
require golang.org/x/sys v0.41.0 // indirect
|
||||||
|
|||||||
10
go.sum
10
go.sum
@@ -1,2 +1,12 @@
|
|||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
|
||||||
|
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
|
||||||
|
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
|
||||||
|
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
|
||||||
|
github.com/valkey-io/valkey-go v1.0.71 h1:tuKjGVLd7/I8CyUwqAq5EaD7isxQdlvJzXo3jS8pZW0=
|
||||||
|
github.com/valkey-io/valkey-go v1.0.71/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
|
||||||
|
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
|
||||||
|
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
|
||||||
|
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||||
|
|||||||
13
main.go
13
main.go
@@ -9,6 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
log.Println("ti1 v1.1.0")
|
||||||
log.Println("Starting...")
|
log.Println("Starting...")
|
||||||
|
|
||||||
// Setup the database
|
// Setup the database
|
||||||
@@ -17,10 +18,14 @@ func main() {
|
|||||||
log.Fatalf("Database setup failed: %v", err)
|
log.Fatalf("Database setup failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the current timestamp
|
||||||
|
starttimestamp := time.Now().Format("20060102T150405")
|
||||||
|
log.Printf("Starting timestamp: %s", starttimestamp)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
data, err := data.FetchData()
|
data, err := data.FetchData(starttimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -29,9 +34,9 @@ func main() {
|
|||||||
|
|
||||||
log.Println("finished in", time.Since(start))
|
log.Println("finished in", time.Since(start))
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
if elapsed < 5*time.Minute {
|
if elapsed < 20*time.Second {
|
||||||
log.Printf("starting again in %v", 5*time.Minute-elapsed)
|
log.Printf("starting again in %v", 20*time.Second-elapsed)
|
||||||
time.Sleep(1*time.Minute - elapsed)
|
time.Sleep(20*time.Second - elapsed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
26
valki/commands.go
Normal file
26
valki/commands.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package valki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/valkey-io/valkey-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
|
||||||
|
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
|
||||||
|
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
|
||||||
|
if err != nil {
|
||||||
|
return "hehe", nil
|
||||||
|
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user