Compare commits
54 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1bcbbf6f44 | ||
|
|
8f973853bd | ||
|
|
6632c38c0c | ||
|
|
8c0bd734c6 | ||
|
|
087b0ec637 | ||
|
|
e90e35cfbc | ||
|
|
c45723a5b0 | ||
|
|
779530f036 | ||
|
|
fd43b3e4bb | ||
|
|
bed666bd81 | ||
|
|
c09ea5784a | ||
|
|
8af34a9ab3 | ||
|
|
5bdc2acd81 | ||
|
|
3058d98a67 | ||
|
|
21b3bba164 | ||
|
|
c08bcb8432 | ||
|
|
14af72959e | ||
|
|
afd6b12acf | ||
|
|
d996411adf | ||
|
|
083a267b2a | ||
|
|
fe72a507e1 | ||
|
|
5203208fe7 | ||
|
|
2141c5f168 | ||
|
|
9919c159f2 | ||
|
|
07838da0ad | ||
|
|
b8a2a5837f | ||
|
|
df0b5135bd | ||
|
|
a2c1766dd1 | ||
|
|
97a6506a65 | ||
|
|
ba558803ff | ||
|
|
c6fc0070cf | ||
|
|
c456bdecdb | ||
|
|
1020dacf79 | ||
|
|
6c20858280 | ||
|
|
119898acc5 | ||
|
|
51fb986710 | ||
|
|
3c1b84197a | ||
|
|
75e007603f | ||
|
|
a50ef5b899 | ||
|
|
def3a9c38c | ||
|
|
c1992f7616 | ||
|
|
a274810818 | ||
|
|
42b75360c4 | ||
|
|
c99f22b131 | ||
|
|
7b96214476 | ||
|
|
c12ec02270 | ||
|
|
6db5b12d7b | ||
|
|
8f372b4213 | ||
|
|
ccbf9c8d72 | ||
|
|
fb4862c078 | ||
|
|
f3dbd4505f | ||
|
|
2462c6057e | ||
|
|
cda95aa4f2 | ||
|
|
b7e448df8d |
39
.github/workflows/docker-image.yml
vendored
39
.github/workflows/docker-image.yml
vendored
@@ -4,6 +4,7 @@ on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- dev
|
||||
|
||||
jobs:
|
||||
build:
|
||||
@@ -26,26 +27,32 @@ jobs:
|
||||
id: timestamp
|
||||
run: echo "TIMESTAMP=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
|
||||
|
||||
- name: Get the commit tag
|
||||
id: get-tag
|
||||
run: echo "GIT_TAG=$(git describe --tags --exact-match 2>/dev/null || echo 'no-tag')" >> $GITHUB_ENV
|
||||
- name: Get commit version
|
||||
id: commit-version
|
||||
run: |
|
||||
COMMIT_MSG=$(git log -1 --pretty=%B)
|
||||
echo "Commit message: $COMMIT_MSG" # Debugging output
|
||||
# Updated regex to handle both vX.Y, vX.Y.Z, and vX.Y-pre-release formats
|
||||
if [[ "$COMMIT_MSG" =~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-[a-zA-Z0-9._-]+)?$ ]]; then
|
||||
echo "Version match: $COMMIT_MSG"
|
||||
echo "VERSION=${COMMIT_MSG}" >> $GITHUB_ENV
|
||||
else
|
||||
echo "No version match, defaulting to 'dev'"
|
||||
echo "VERSION=dev" >> $GITHUB_ENV
|
||||
fi
|
||||
|
||||
- name: Build Docker image
|
||||
run: |
|
||||
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
|
||||
docker build -t ti1:dev-${{ env.TIMESTAMP }} .
|
||||
else
|
||||
docker build -t ti1:latest -t ti1:${{ env.GIT_TAG }} .
|
||||
fi
|
||||
docker build -t ti1:${{ env.VERSION }} .
|
||||
|
||||
- name: Push Docker image
|
||||
run: |
|
||||
if [ "${{ env.GIT_TAG }}" == "no-tag" ]; then
|
||||
docker tag ti1:dev-${{ env.TIMESTAMP }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev-${{ env.TIMESTAMP }}
|
||||
else
|
||||
docker tag ti1:latest ${{ secrets.DOCKER_USERNAME }}/ti1:latest
|
||||
docker tag ti1:${{ env.GIT_TAG }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:latest
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.GIT_TAG }}
|
||||
# Always push to 'dev' tag
|
||||
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:dev
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:dev
|
||||
|
||||
# If the version is valid, also push that specific version tag
|
||||
if [[ "${{ env.VERSION }}" != "dev" ]]; then
|
||||
docker tag ti1:${{ env.VERSION }} ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||
docker push ${{ secrets.DOCKER_USERNAME }}/ti1:${{ env.VERSION }}
|
||||
fi
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Use the official Golang image as the base image
|
||||
FROM golang:1.22.1
|
||||
FROM golang:1.23.4
|
||||
|
||||
# Set the Current Working Directory inside the container
|
||||
WORKDIR /app
|
||||
|
||||
160
README.md
160
README.md
@@ -1,22 +1,164 @@
|
||||
# TI1
|
||||
|
||||
The best thing to happen since yesterday at 3 pm
|
||||
The best thing to happen since yesterday at 2:56 pm
|
||||
|
||||
## Usage
|
||||
|
||||
To use this project, you can pull the Docker image from Docker Hub and run it using the following commands:
|
||||
Start with getting Docker then do the following:
|
||||
|
||||
### Pull the Docker Image
|
||||
### Create the setup files
|
||||
Create a `docker-compose.yml`
|
||||
```yaml
|
||||
services:
|
||||
db:
|
||||
image: postgres:17.2
|
||||
container_name: postgres-db
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: RootPassword
|
||||
POSTGRES_DB: ti1
|
||||
ports:
|
||||
- "5432:5432"
|
||||
volumes:
|
||||
- ./postgres_data:/var/lib/postgresql/data # Store data in the current directory
|
||||
- ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
|
||||
networks:
|
||||
- app-network
|
||||
healthcheck:
|
||||
test: ["CMD", "pg_isready", "-U", "postgres", "-d", "ti1", "-h", "db"]
|
||||
interval: 10s
|
||||
retries: 5
|
||||
restart: always # Ensure the container always restarts
|
||||
|
||||
```sh
|
||||
docker pull pigwin1/ti1:latest
|
||||
valkey:
|
||||
image: valkey/valkey:latest
|
||||
container_name: valkey
|
||||
environment:
|
||||
VALKEY_PASSWORD: the_valkey_password
|
||||
ports:
|
||||
- "6379:6379"
|
||||
volumes:
|
||||
- ./valkey_data:/data
|
||||
networks:
|
||||
- app-network
|
||||
restart: always # Ensure the container always restarts
|
||||
|
||||
ti1-container:
|
||||
image: pigwin1/ti1:dev
|
||||
container_name: ti1-container
|
||||
environment:
|
||||
DB_HOST: db
|
||||
DB_PORT: 5432
|
||||
DB_USER: ti1
|
||||
DB_PASSWORD: ti1password
|
||||
DB_NAME: ti1
|
||||
DB_SSLMODE: disable
|
||||
VALKEY_HOST: valkey
|
||||
VALKEY_PORT: 6379
|
||||
VALKEY_PASSWORD: the_valkey_password
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy # Wait until the db service is healthy
|
||||
valkey:
|
||||
condition: service_started # Wait until the valkey service is started
|
||||
networks:
|
||||
- app-network
|
||||
restart: always # Ensure the container always restarts
|
||||
|
||||
networks:
|
||||
app-network:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
driver: local
|
||||
valkey_data:
|
||||
driver: local
|
||||
```
|
||||
|
||||
### Run the Docker Container
|
||||
```sh
|
||||
docker run -d --name ti1-container -e DB_HOST=<your_db_host> -e DB_PORT=<your_db_port> -e DB_USER=<your_db_user> -e DB_PASSWORD=<your_db_password> -e DB_NAME=<your_db_name> -e DB_SSLMODE=<your_db_sslmode> pigwin1/ti1:latest
|
||||
Create `init.sql`
|
||||
```sql
|
||||
-- Check if 'post' user exists; create if not
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'post') THEN
|
||||
CREATE ROLE post WITH LOGIN PASSWORD 'postpassword';
|
||||
GRANT ALL PRIVILEGES ON DATABASE ti1 TO post;
|
||||
ALTER ROLE post WITH SUPERUSER;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
-- Check if 'ti1' user exists; create if not
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'ti1') THEN
|
||||
CREATE ROLE ti1 WITH LOGIN PASSWORD 'ti1password';
|
||||
GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
|
||||
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ti1;
|
||||
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ti1;
|
||||
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA public TO ti1;
|
||||
-- Grant the ti1 user the necessary permissions on the public schema
|
||||
GRANT USAGE, CREATE ON SCHEMA public TO ti1;
|
||||
|
||||
-- Grant all permissions (SELECT, INSERT, UPDATE, DELETE, etc.) on all existing tables in the public schema
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA public TO ti1;
|
||||
|
||||
-- Grant all permissions on all existing sequences in the public schema
|
||||
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO ti1;
|
||||
|
||||
-- Grant all permissions on all functions in the public schema
|
||||
GRANT ALL ON ALL FUNCTIONS IN SCHEMA public TO ti1;
|
||||
|
||||
-- Ensure that the ti1 user will have access to new tables, sequences, and functions created in the public schema
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ti1;
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ti1;
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON FUNCTIONS TO ti1;
|
||||
|
||||
-- Optionally, grant full permissions on the entire database to ti1 (if needed)
|
||||
-- GRANT ALL PRIVILEGES ON DATABASE ti1 TO ti1;
|
||||
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
```
|
||||
|
||||
Remember to change the password values
|
||||
|
||||
### Run the Docker Containers
|
||||
```sh
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
### edit the postgress config (optinal)
|
||||
open the config file
|
||||
```sh
|
||||
nano postgres_data/postgresql.conf
|
||||
```
|
||||
Change the following values
|
||||
```conf
|
||||
listen_addresses = '*'
|
||||
max_connections = 100
|
||||
shared_buffers = 16GB
|
||||
work_mem = 256MB
|
||||
maintenance_work_mem = 2GB
|
||||
dynamic_shared_memory_type = posix
|
||||
max_wal_size = 1GB
|
||||
min_wal_size = 80MB
|
||||
```
|
||||
set these to what makes most sense for you
|
||||
|
||||
These values should also be set bet not necessarily changed
|
||||
```conf
|
||||
log_timezone = 'Etc/UTC'
|
||||
datestyle = 'iso, mdy'
|
||||
timezone = 'Etc/UTC'
|
||||
lc_messages = 'en_US.utf8'
|
||||
lc_monetary = 'en_US.utf8'
|
||||
lc_numeric = 'en_US.utf8'
|
||||
lc_time = 'en_US.utf8'
|
||||
default_text_search_config = 'pg_catalog.english'
|
||||
```
|
||||
Replace `<your_db_host>`, `<your_db_port>`, `<your_db_user>`, `<your_db_password>`, `<your_db_name>`, and `<your_db_sslmode>` with your actual database configuration values.
|
||||
|
||||
### Docker Hub Repository
|
||||
You can find the Docker image on Docker Hub at the following link:
|
||||
|
||||
@@ -7,5 +7,14 @@
|
||||
"dbname": "ti1",
|
||||
"sslmode": "disable"
|
||||
},
|
||||
"temp": "value"
|
||||
"valkey": {
|
||||
"host": "127.0.0.1",
|
||||
"port": "6379",
|
||||
"max_conns": 50,
|
||||
"timeout_ms": 5000,
|
||||
"password": "the_valkey_password"
|
||||
},
|
||||
"temp": "value",
|
||||
"dataset_id": "",
|
||||
"excluded_dataset_ids": ""
|
||||
}
|
||||
@@ -4,8 +4,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const configFilePath = "config/conf.json"
|
||||
|
||||
type Config struct {
|
||||
Database struct {
|
||||
Host string `json:"host"`
|
||||
@@ -15,12 +18,21 @@ type Config struct {
|
||||
DBName string `json:"dbname"`
|
||||
SSLMode string `json:"sslmode"`
|
||||
} `json:"database"`
|
||||
Temp string `json:"temp"`
|
||||
Valkey struct {
|
||||
Host string `json:"host"`
|
||||
Port string `json:"port"`
|
||||
MaxConns int `json:"max_conns"`
|
||||
TimeoutMs int `json:"timeout_ms"`
|
||||
Password string `json:"password"`
|
||||
} `json:"valkey"`
|
||||
Temp string `json:"temp"`
|
||||
DatasetId string `json:"dataset_id"`
|
||||
ExcludedDatasetIds string `json:"excluded_dataset_ids"`
|
||||
}
|
||||
|
||||
func LoadConfig(file string) (Config, error) {
|
||||
func LoadConfig() (Config, error) {
|
||||
var config Config
|
||||
configFile, err := os.Open(file)
|
||||
configFile, err := os.Open(configFilePath)
|
||||
if err != nil {
|
||||
return config, fmt.Errorf("failed to open config file: %w", err)
|
||||
}
|
||||
@@ -53,7 +65,32 @@ func LoadConfig(file string) (Config, error) {
|
||||
if temp := os.Getenv("TEMP"); temp != "" {
|
||||
config.Temp = temp
|
||||
}
|
||||
//log.Println("Temp value:", config.Temp)
|
||||
|
||||
// Override Valkey settings with environment variables
|
||||
if valkeyHost := os.Getenv("VALKEY_HOST"); valkeyHost != "" {
|
||||
config.Valkey.Host = valkeyHost
|
||||
}
|
||||
if valkeyPort := os.Getenv("VALKEY_PORT"); valkeyPort != "" {
|
||||
config.Valkey.Port = valkeyPort
|
||||
}
|
||||
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
|
||||
if val, err := strconv.Atoi(maxConns); err == nil {
|
||||
config.Valkey.MaxConns = val
|
||||
}
|
||||
}
|
||||
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
|
||||
if val, err := strconv.Atoi(timeoutMs); err == nil {
|
||||
config.Valkey.TimeoutMs = val
|
||||
}
|
||||
}
|
||||
|
||||
// Override datasetId and excludedDatasetIds with environment variables
|
||||
if datasetId := os.Getenv("DATASET_ID"); datasetId != "" {
|
||||
config.DatasetId = datasetId
|
||||
}
|
||||
if excludedDatasetIds := os.Getenv("EXCLUDED_DATASET_IDS"); excludedDatasetIds != "" {
|
||||
config.ExcludedDatasetIds = excludedDatasetIds
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
14
config/db.go
14
config/db.go
@@ -4,20 +4,21 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func ConnectToPostgreSQL() (*sql.DB, error) {
|
||||
fmt.Println("Connecting to PostgreSQL...")
|
||||
config, err := LoadConfig("config/conf.json")
|
||||
config, err := LoadConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Println("Configuration loaded successfully!")
|
||||
|
||||
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
|
||||
connStr := fmt.Sprintf("host=%s port=%s user='%s' password='%s' dbname='%s' sslmode=%s",
|
||||
config.Database.Host, config.Database.Port, config.Database.User, config.Database.Password, config.Database.DBName, config.Database.SSLMode)
|
||||
|
||||
// Open connection to database
|
||||
@@ -26,7 +27,12 @@ func ConnectToPostgreSQL() (*sql.DB, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Println("Connection to PostgreSQL opened successfully!")
|
||||
// Set connection pool settings
|
||||
db.SetMaxOpenConns(25) // Maximum number of open connections to the database
|
||||
db.SetMaxIdleConns(25) // Maximum number of connections in the idle connection pool
|
||||
db.SetConnMaxLifetime(1 * time.Hour) // Maximum amount of time a connection may be reused
|
||||
|
||||
fmt.Println("Connection to PostgreSQL opened successfully :D")
|
||||
|
||||
// Ping database to verify connection
|
||||
err = db.Ping()
|
||||
@@ -51,7 +57,7 @@ func DisconnectFromPostgreSQL(db *sql.DB) error {
|
||||
}
|
||||
|
||||
func PrintDBConfig() {
|
||||
config, err := LoadConfig("config/conf.json")
|
||||
config, err := LoadConfig()
|
||||
if err != nil {
|
||||
fmt.Println("Error loading config:", err)
|
||||
return
|
||||
|
||||
100
config/valkey.go
Normal file
100
config/valkey.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
type ValkeyConfig struct {
|
||||
Host string `json:"host"`
|
||||
Port string `json:"port"`
|
||||
MaxConns int `json:"max_conns"`
|
||||
TimeoutMs int `json:"timeout_ms"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
func LoadValkeyConfig(file string) (ValkeyConfig, error) {
|
||||
var config ValkeyConfig
|
||||
configFile, err := os.Open(file)
|
||||
if err != nil {
|
||||
return config, fmt.Errorf("failed to open config file: %w", err)
|
||||
}
|
||||
defer configFile.Close()
|
||||
|
||||
if err := json.NewDecoder(configFile).Decode(&config); err != nil {
|
||||
return config, fmt.Errorf("failed to parse Valkey config: %w", err)
|
||||
}
|
||||
|
||||
// Override with environment variables if set
|
||||
if host := os.Getenv("VALKEY_HOST"); host != "" {
|
||||
config.Host = host
|
||||
}
|
||||
if port := os.Getenv("VALKEY_PORT"); port != "" {
|
||||
config.Port = port
|
||||
}
|
||||
if maxConns := os.Getenv("VALKEY_MAX_CONNS"); maxConns != "" {
|
||||
if val, err := strconv.Atoi(maxConns); err == nil {
|
||||
config.MaxConns = val
|
||||
}
|
||||
}
|
||||
if timeoutMs := os.Getenv("VALKEY_TIMEOUT_MS"); timeoutMs != "" {
|
||||
if val, err := strconv.Atoi(timeoutMs); err == nil {
|
||||
config.TimeoutMs = val
|
||||
}
|
||||
}
|
||||
if password := os.Getenv("VALKEY_PASSWORD"); password != "" {
|
||||
config.Password = password
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func ConnectToValkey() (valkey.Client, error) {
|
||||
fmt.Println("Loading configuration...")
|
||||
config, err := LoadConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load config: %v", err)
|
||||
}
|
||||
fmt.Println("Configuration loaded successfully!")
|
||||
|
||||
valkeyConfig := config.Valkey
|
||||
|
||||
// Setup Valkey client options
|
||||
options := valkey.ClientOption{
|
||||
InitAddress: []string{fmt.Sprintf("%s:%s", valkeyConfig.Host, valkeyConfig.Port)},
|
||||
Password: valkeyConfig.Password,
|
||||
// Additional options can be added here if required
|
||||
}
|
||||
|
||||
fmt.Printf("Connecting to Valkey at %s:%s...\n", valkeyConfig.Host, valkeyConfig.Port)
|
||||
client, err := valkey.NewClient(options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to Valkey: %v", err)
|
||||
}
|
||||
|
||||
// Optionally, perform a ping to validate the connection
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(valkeyConfig.TimeoutMs))
|
||||
defer cancel()
|
||||
|
||||
if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
|
||||
client.Close()
|
||||
return nil, fmt.Errorf("failed to ping Valkey: %v", err)
|
||||
}
|
||||
|
||||
log.Println("Connected to Valkey successfully!")
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func DisconnectFromValkey(client valkey.Client) error {
|
||||
fmt.Println("Disconnecting from Valkey...")
|
||||
client.Close()
|
||||
log.Println("Disconnected from Valkey successfully!")
|
||||
return nil
|
||||
}
|
||||
16
data/data.go
16
data/data.go
@@ -2,7 +2,9 @@ package data
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Data struct {
|
||||
@@ -125,10 +127,20 @@ type Data struct {
|
||||
} `xml:"ServiceDelivery"`
|
||||
}
|
||||
|
||||
func FetchData() (*Data, error) {
|
||||
func FetchData(timestamp, datasetId, excludedDatasetIds string) (*Data, error) {
|
||||
client := &http.Client{}
|
||||
requestorId := "ti1-" + timestamp
|
||||
|
||||
resp, err := client.Get("https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000")
|
||||
baseURL := "https://api.entur.io/realtime/v1/rest/et?useOriginalId=true&maxSize=100000&requestorId=" + requestorId
|
||||
|
||||
if datasetId != "" {
|
||||
baseURL += "&datasetId=" + datasetId
|
||||
} else if excludedDatasetIds != "" {
|
||||
baseURL += "&excludedDatasetIds=" + strings.ReplaceAll(excludedDatasetIds, ",", "&excludedDatasetIds=")
|
||||
}
|
||||
|
||||
log.Println("Fetching data from URL:", baseURL)
|
||||
resp, err := client.Get(baseURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,11 +1,17 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string, error) {
|
||||
func InsertOrUpdateEstimatedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||
// Replace empty strings with nil for timestamp fields
|
||||
for i, v := range values {
|
||||
if str, ok := v.(string); ok && str == "" {
|
||||
@@ -13,43 +19,71 @@ func InsertOrUpdateEstimatedCall(db *sql.DB, values []interface{}) (int, string,
|
||||
}
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, estimated_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
estimated_data = EXCLUDED.estimated_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
if 1 == 0 {
|
||||
fmt.Println("Executing query:", query)
|
||||
for i, v := range values {
|
||||
fmt.Printf("Value %d: (%v)\n", i+1, v)
|
||||
}
|
||||
|
||||
// Convert values to a single string and hash it using MD5
|
||||
var valuesString string
|
||||
for _, v := range values {
|
||||
if v != nil {
|
||||
valuesString += fmt.Sprintf("%v", v)
|
||||
}
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
hash := md5.Sum([]byte(valuesString))
|
||||
hashString := hex.EncodeToString(hash[:])
|
||||
//fmt.Println("HashString:", hashString)
|
||||
|
||||
estimatedVehicleJourneyID := values[0]
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
//fmt.Printf("Estimated Vehicle Journey ID: %v, Order ID: %v\n", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
if retrievedHash != hashString {
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, estimated_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
estimated_data = EXCLUDED.estimated_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
//fmt.Printf("MATCH!!! Original Hash: %s, Retrieved Hash: %s\n", hashString, retrievedHash)
|
||||
return 0, "none", nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,17 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"ti1/valki"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string, error) {
|
||||
func InsertOrUpdateRecordedCall(ctx context.Context, db *sql.DB, values []interface{}, valkeyClient valkey.Client) (int, string, error) {
|
||||
// Replace empty strings with nil for timestamp fields
|
||||
for i, v := range values {
|
||||
if str, ok := v.(string); ok && str == "" {
|
||||
@@ -13,46 +19,71 @@ func InsertOrUpdateRecordedCall(db *sql.DB, values []interface{}) (int, string,
|
||||
}
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, actualdeparturetime, actualarrivaltime,
|
||||
recorded_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
actualdeparturetime = EXCLUDED.actualdeparturetime,
|
||||
actualarrivaltime = EXCLUDED.actualarrivaltime,
|
||||
recorded_data = EXCLUDED.recorded_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
if 1 == 0 {
|
||||
fmt.Println("Executing query:", query)
|
||||
for i, v := range values {
|
||||
fmt.Printf("Value %d: (%v)\n", i+1, v)
|
||||
}
|
||||
|
||||
// Convert values to a single string and hash it using MD5
|
||||
var valuesString string
|
||||
for _, v := range values {
|
||||
if v != nil {
|
||||
valuesString += fmt.Sprintf("%v", v)
|
||||
}
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
hash := md5.Sum([]byte(valuesString))
|
||||
hashString := hex.EncodeToString(hash[:])
|
||||
|
||||
estimatedVehicleJourneyID := values[0]
|
||||
orderID := values[1]
|
||||
key := fmt.Sprintf("%v.%v", estimatedVehicleJourneyID, orderID)
|
||||
|
||||
var err error
|
||||
|
||||
// Get the MD5 hash from Valkey
|
||||
retrievedHash, err := valki.GetValkeyValue(ctx, valkeyClient, key)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
}
|
||||
|
||||
// Check if the retrieved value matches the original MD5 hash
|
||||
if retrievedHash != hashString {
|
||||
query := `
|
||||
INSERT INTO calls (
|
||||
estimatedvehiclejourney, "order", stoppointref,
|
||||
aimeddeparturetime, expecteddeparturetime,
|
||||
aimedarrivaltime, expectedarrivaltime,
|
||||
cancellation, actualdeparturetime, actualarrivaltime,
|
||||
recorded_data
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
|
||||
ON CONFLICT (estimatedvehiclejourney, "order")
|
||||
DO UPDATE SET
|
||||
stoppointref = EXCLUDED.stoppointref,
|
||||
aimeddeparturetime = EXCLUDED.aimeddeparturetime,
|
||||
expecteddeparturetime = EXCLUDED.expecteddeparturetime,
|
||||
aimedarrivaltime = EXCLUDED.aimedarrivaltime,
|
||||
expectedarrivaltime = EXCLUDED.expectedarrivaltime,
|
||||
cancellation = EXCLUDED.cancellation,
|
||||
actualdeparturetime = EXCLUDED.actualdeparturetime,
|
||||
actualarrivaltime = EXCLUDED.actualarrivaltime,
|
||||
recorded_data = EXCLUDED.recorded_data
|
||||
RETURNING CASE WHEN xmax = 0 THEN 'insert' ELSE 'update' END, id;
|
||||
`
|
||||
stmt, err := db.Prepare(query)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error preparing statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = valki.SetValkeyValue(ctx, valkeyClient, key, hashString)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
}
|
||||
|
||||
var action string
|
||||
var id int
|
||||
err = stmt.QueryRow(values...).Scan(&action, &id)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("error executing statement: %v", err)
|
||||
}
|
||||
return id, action, nil
|
||||
} else {
|
||||
return 0, "none", nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,37 @@ package database
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"ti1/config"
|
||||
)
|
||||
|
||||
func GetDatasetVariable(config config.Config) string {
|
||||
if config.DatasetId != "" {
|
||||
fmt.Println(config.DatasetId)
|
||||
return config.DatasetId
|
||||
} else if config.ExcludedDatasetIds != "" {
|
||||
result := "EX." + config.ExcludedDatasetIds
|
||||
fmt.Println(result)
|
||||
return result
|
||||
}
|
||||
fmt.Println("")
|
||||
return ""
|
||||
}
|
||||
|
||||
func InsertServiceDelivery(db *sql.DB, responseTimestamp string, recordedAtTime string) (int, error) {
|
||||
fmt.Println("Inserting ServiceDelivery...")
|
||||
var id int
|
||||
|
||||
err := db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime) VALUES ($1, $2) RETURNING ID", responseTimestamp, recordedAtTime).Scan(&id)
|
||||
// Load configuration
|
||||
config, err := config.LoadConfig()
|
||||
if err != nil {
|
||||
fmt.Println("Error loading config:", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Get dataset variable
|
||||
datasetVariable := GetDatasetVariable(config)
|
||||
|
||||
err = db.QueryRow("INSERT INTO public.ServiceDelivery (ResponseTimestamp, RecordedAtTime, Source) VALUES ($1, $2, $3) RETURNING ID", responseTimestamp, recordedAtTime, datasetVariable).Scan(&id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return 0, err
|
||||
|
||||
@@ -68,6 +68,7 @@ func SetupDB() error {
|
||||
id INTEGER PRIMARY KEY DEFAULT nextval('public.servicedelivery_id_seq'),
|
||||
responsetimestamp TIMESTAMPTZ,
|
||||
recordedattime TIMESTAMPTZ,
|
||||
source VARCHAR,
|
||||
data JSON
|
||||
);`,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package export
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -20,6 +21,15 @@ func DBData(data *data.Data) {
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Connect to Valkey
|
||||
valkeyClient, err := config.ConnectToValkey()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to Valkey: %v", err)
|
||||
}
|
||||
defer config.DisconnectFromValkey(valkeyClient)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Get service id aka sid
|
||||
sid, err := database.InsertServiceDelivery(db, data.ServiceDelivery.ResponseTimestamp, data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.RecordedAtTime)
|
||||
if err != nil {
|
||||
@@ -28,7 +38,7 @@ func DBData(data *data.Data) {
|
||||
fmt.Println("SID:", sid)
|
||||
|
||||
// counters
|
||||
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, recordedCallInsertCount, recordedCallUpdateCount int
|
||||
var insertCount, updateCount, totalCount, estimatedCallInsertCount, estimatedCallUpdateCount, estimatedCallNoneCount, recordedCallInsertCount, recordedCallUpdateCount, recordedCallNoneCount int
|
||||
|
||||
for _, journey := range data.ServiceDelivery.EstimatedTimetableDelivery[0].EstimatedJourneyVersionFrame.EstimatedVehicleJourney {
|
||||
var values []interface{}
|
||||
@@ -159,14 +169,16 @@ func DBData(data *data.Data) {
|
||||
//fmt.Printf("Inserts: %d, Updates: %d, Total: %d\n", insertCount, updateCount, totalCount)
|
||||
if totalCount%1000 == 0 {
|
||||
fmt.Printf(
|
||||
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
||||
"Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
|
||||
insertCount,
|
||||
updateCount,
|
||||
totalCount,
|
||||
estimatedCallInsertCount,
|
||||
estimatedCallUpdateCount,
|
||||
estimatedCallNoneCount,
|
||||
recordedCallInsertCount,
|
||||
recordedCallUpdateCount,
|
||||
recordedCallNoneCount,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -291,9 +303,9 @@ func DBData(data *data.Data) {
|
||||
for i, v := range stringValues {
|
||||
interfaceValues[i] = v
|
||||
}
|
||||
id, action, err := database.InsertOrUpdateEstimatedCall(db, interfaceValues)
|
||||
id, action, err := database.InsertOrUpdateEstimatedCall(ctx, db, interfaceValues, valkeyClient)
|
||||
if err != nil {
|
||||
fmt.Printf("Error inserting/updating estimated call: %v\n", err)
|
||||
log.Fatalf("Failed to insert or update estimated call: %v", err)
|
||||
} else {
|
||||
if 1 == 0 {
|
||||
fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
@@ -303,6 +315,8 @@ func DBData(data *data.Data) {
|
||||
estimatedCallInsertCount++
|
||||
} else if action == "update" {
|
||||
estimatedCallUpdateCount++
|
||||
} else if action == "none" {
|
||||
estimatedCallNoneCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -367,7 +381,7 @@ func DBData(data *data.Data) {
|
||||
interfaceValues[i] = v
|
||||
}
|
||||
|
||||
id, action, err := database.InsertOrUpdateRecordedCall(db, interfaceValues)
|
||||
id, action, err := database.InsertOrUpdateRecordedCall(ctx, db, interfaceValues, valkeyClient)
|
||||
if err != nil {
|
||||
fmt.Printf("Error inserting/updating recorded call: %v\n", err)
|
||||
} else {
|
||||
@@ -380,6 +394,8 @@ func DBData(data *data.Data) {
|
||||
//fmt.Printf("Action: %s, ID: %d\n", action, id)
|
||||
} else if action == "update" {
|
||||
recordedCallUpdateCount++
|
||||
} else if action == "none" {
|
||||
recordedCallNoneCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -387,14 +403,16 @@ func DBData(data *data.Data) {
|
||||
|
||||
}
|
||||
fmt.Printf(
|
||||
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d; recordedCalls = I: %d U: %d\n",
|
||||
"DONE: Inserts: %d, Updates: %d, Total: %d; estimatedCalls = I: %d U: %d N: %d; recordedCalls = I: %d U: %d N: %d\n",
|
||||
insertCount,
|
||||
updateCount,
|
||||
totalCount,
|
||||
estimatedCallInsertCount,
|
||||
estimatedCallUpdateCount,
|
||||
estimatedCallNoneCount,
|
||||
recordedCallInsertCount,
|
||||
recordedCallUpdateCount,
|
||||
recordedCallNoneCount,
|
||||
)
|
||||
// Create map to hold JSON
|
||||
serviceDeliveryJsonObject := make(map[string]interface{})
|
||||
@@ -404,8 +422,10 @@ func DBData(data *data.Data) {
|
||||
serviceDeliveryJsonObject["Updates"] = updateCount
|
||||
serviceDeliveryJsonObject["EstimatedCallInserts"] = estimatedCallInsertCount
|
||||
serviceDeliveryJsonObject["EstimatedCallUpdates"] = estimatedCallUpdateCount
|
||||
serviceDeliveryJsonObject["EstimatedCallNone"] = estimatedCallNoneCount
|
||||
serviceDeliveryJsonObject["RecordedCallInserts"] = recordedCallInsertCount
|
||||
serviceDeliveryJsonObject["RecordedCallUpdates"] = recordedCallUpdateCount
|
||||
serviceDeliveryJsonObject["RecordedCallNone"] = recordedCallNoneCount
|
||||
|
||||
// Convert JSON object to JSON string
|
||||
serviceDeliveryJsonString, err := json.Marshal(serviceDeliveryJsonObject)
|
||||
|
||||
7
go.mod
7
go.mod
@@ -1,5 +1,10 @@
|
||||
module ti1
|
||||
|
||||
go 1.22.1
|
||||
go 1.23.4
|
||||
|
||||
require github.com/lib/pq v1.10.9
|
||||
|
||||
require (
|
||||
github.com/valkey-io/valkey-go v1.0.52 // indirect
|
||||
golang.org/x/sys v0.24.0 // indirect
|
||||
)
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1,2 +1,6 @@
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg=
|
||||
github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
|
||||
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
|
||||
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
|
||||
18
main.go
18
main.go
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"ti1/config"
|
||||
"ti1/data"
|
||||
"ti1/database"
|
||||
"ti1/export"
|
||||
@@ -9,18 +10,29 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Println("ti1 v0.2.1")
|
||||
log.Println("Starting...")
|
||||
|
||||
// Load configuration
|
||||
cfg, err := config.LoadConfig()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to load config: %v", err)
|
||||
}
|
||||
|
||||
// Setup the database
|
||||
err := database.SetupDB()
|
||||
err = database.SetupDB()
|
||||
if err != nil {
|
||||
log.Fatalf("Database setup failed: %v", err)
|
||||
}
|
||||
|
||||
// Get the current timestamp
|
||||
starttimestamp := time.Now().Format("20060102T150405")
|
||||
log.Printf("Starting timestamp: %s", starttimestamp)
|
||||
|
||||
for {
|
||||
start := time.Now()
|
||||
|
||||
data, err := data.FetchData()
|
||||
data, err := data.FetchData(starttimestamp, cfg.DatasetId, cfg.ExcludedDatasetIds)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -31,7 +43,7 @@ func main() {
|
||||
elapsed := time.Since(start)
|
||||
if elapsed < 5*time.Minute {
|
||||
log.Printf("starting again in %v", 5*time.Minute-elapsed)
|
||||
time.Sleep(1*time.Minute - elapsed)
|
||||
time.Sleep(5*time.Minute - elapsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
26
valki/commands.go
Normal file
26
valki/commands.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package valki
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
func SetValkeyValue(ctx context.Context, client valkey.Client, key, value string) error {
|
||||
err := client.Do(ctx, client.B().Set().Key(key).Value(value).Ex(time.Hour).Build()).Error()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set value in Valkey: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetValkeyValue(ctx context.Context, client valkey.Client, key string) (string, error) {
|
||||
value, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
|
||||
if err != nil {
|
||||
return "hehe", nil
|
||||
//return "", fmt.Errorf("failed to get value from Valkey: %v", err)
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
Reference in New Issue
Block a user