diff --git a/internal/autoimport.go b/internal/autoimport.go new file mode 100644 index 0000000..c7d7f1a --- /dev/null +++ b/internal/autoimport.go @@ -0,0 +1,290 @@ +package internal + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" +) + +// AutoImportService manages automatic file imports for all users +type AutoImportService struct { + dataDir string + checkInterval time.Duration + cancelFunc context.CancelFunc + ctx context.Context +} + +// NewAutoImportService creates a new auto-import service +func NewAutoImportService(dataDir string) *AutoImportService { + ctx, cancel := context.WithCancel(context.Background()) + return &AutoImportService{ + dataDir: dataDir, + checkInterval: 1 * time.Minute, + cancelFunc: cancel, + ctx: ctx, + } +} + +// Start begins the auto-import background job +func (s *AutoImportService) Start() { + slog.Info("Starting auto-import service", "checkInterval", s.checkInterval) + + go func() { + // Run immediately on start + s.scanAllUsers() + + // Then run on interval + ticker := time.NewTicker(s.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.scanAllUsers() + case <-s.ctx.Done(): + slog.Info("Auto-import service stopped") + return + } + } + }() +} + +// Stop gracefully stops the auto-import service +func (s *AutoImportService) Stop() { + slog.Info("Stopping auto-import service") + s.cancelFunc() +} + +// scanAllUsers scans all user directories for files to import +func (s *AutoImportService) scanAllUsers() { + entries, err := os.ReadDir(s.dataDir) + if err != nil { + slog.Error("Failed to read data directory", "error", err) + return + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + userID := entry.Name() + s.scanUserDirectory(userID) + } +} + +// scanUserDirectory scans a single user's ingest directory +func (s *AutoImportService) scanUserDirectory(userID string) { + ingestDir := filepath.Join(s.dataDir, userID, "ingest") + + // Check if ingest directory exists + if _, err := os.Stat(ingestDir); os.IsNotExist(err) { + // Create ingest directory if it doesn't exist + if err := os.MkdirAll(ingestDir, 0755); err != nil { + slog.Error("Failed to create ingest directory", "userID", userID, "error", err) + } + return + } + + entries, err := os.ReadDir(ingestDir) + if err != nil { + slog.Error("Failed to read ingest directory", "userID", userID, "error", err) + return + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + filename := entry.Name() + + // Skip hidden files (starting with .) + if strings.HasPrefix(filename, ".") { + continue + } + + // Skip log files + if strings.HasSuffix(filename, ".log") { + continue + } + + filePath := filepath.Join(ingestDir, filename) + s.processFile(userID, filePath, filename) + } +} + +// processFile processes a single file for import +func (s *AutoImportService) processFile(userID, filePath, filename string) { + // Check if file is stable (not being written to) + if !s.isFileStable(filePath) { + slog.Debug("File not stable yet, skipping", "userID", userID, "file", filename) + return + } + + slog.Info("Processing file for import", "userID", userID, "file", filename) + + // Create log file for this import + logPath := filePath + ".log" + logFile, err := os.Create(logPath) + if err != nil { + slog.Error("Failed to create log file", "userID", userID, "file", filename, "error", err) + return + } + defer logFile.Close() + + logWriter := &importLogger{ + file: logFile, + userID: userID, + filename: filename, + } + + logWriter.log("Starting import of %s", filename) + + // Get username from auth database + username, err := GetUsernameByID(userID) + if err != nil { + logWriter.log("ERROR: Failed to get username: %v", err) + slog.Error("Failed to get username", "userID", userID, "error", err) + return + } + + // Get user database + userDB, err := GetUserDB(userID, username) + if err != nil { + logWriter.log("ERROR: Failed to get user database: %v", err) + slog.Error("Failed to get user database", "userID", userID, "error", err) + return + } + + // Determine file type and parse + var parseErr error + if strings.HasSuffix(strings.ToLower(filename), ".xml") { + logWriter.log("Detected XML backup file") + parseErr = s.parseXMLBackup(userDB, filePath, logWriter) + } else { + logWriter.log("ERROR: Unsupported file type") + slog.Warn("Unsupported file type", "userID", userID, "file", filename) + return + } + + // Move file to complete directory + completeDir := filepath.Join(s.dataDir, userID, "complete") + if err := os.MkdirAll(completeDir, 0755); err != nil { + logWriter.log("ERROR: Failed to create complete directory: %v", err) + slog.Error("Failed to create complete directory", "userID", userID, "error", err) + return + } + + // Generate unique filename if file already exists in complete dir + completePath := filepath.Join(completeDir, filename) + if _, err := os.Stat(completePath); err == nil { + // File exists, add timestamp + timestamp := time.Now().Format("20060102_150405") + ext := filepath.Ext(filename) + base := strings.TrimSuffix(filename, ext) + filename = fmt.Sprintf("%s_%s%s", base, timestamp, ext) + completePath = filepath.Join(completeDir, filename) + } + + if parseErr != nil { + logWriter.log("ERROR: Import failed: %v", parseErr) + logWriter.log("File will remain in ingest directory for manual review") + slog.Error("Import failed", "userID", userID, "file", filename, "error", parseErr) + } else { + // Move file to complete directory + if err := os.Rename(filePath, completePath); err != nil { + logWriter.log("ERROR: Failed to move file to complete directory: %v", err) + slog.Error("Failed to move file", "userID", userID, "error", err) + return + } + + // Move log file too + logDestPath := completePath + ".log" + logFile.Close() // Close before moving + if err := os.Rename(logPath, logDestPath); err != nil { + slog.Warn("Failed to move log file", "userID", userID, "error", err) + } + + logWriter.log("Import completed successfully") + logWriter.log("File moved to: %s", completePath) + slog.Info("Import completed", "userID", userID, "file", filename) + } +} + +// isFileStable checks if a file has finished being written +// Returns true if file size hasn't changed in the last 5 seconds +func (s *AutoImportService) isFileStable(filePath string) bool { + info1, err := os.Stat(filePath) + if err != nil { + return false + } + + size1 := info1.Size() + mod1 := info1.ModTime() + + // Wait 5 seconds + time.Sleep(5 * time.Second) + + info2, err := os.Stat(filePath) + if err != nil { + return false + } + + size2 := info2.Size() + mod2 := info2.ModTime() + + // File is stable if size and modification time haven't changed + return size1 == size2 && mod1.Equal(mod2) +} + +// parseXMLBackup parses an XML backup file +func (s *AutoImportService) parseXMLBackup(userDB *sql.DB, filePath string, logger *importLogger) error { + logger.log("Parsing XML backup file") + + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get file info for progress tracking + fileInfo, _ := file.Stat() + fileSize := fileInfo.Size() + logger.log("File size: %d bytes", fileSize) + + // Parse the XML backup using streaming parser + totalProcessed, totalSkipped, err := ParseSMSBackupStreaming(userDB, file, 100) + if err != nil { + return fmt.Errorf("failed to parse backup: %w", err) + } + + logger.log("Import statistics:") + logger.log(" Total processed: %d", totalProcessed) + logger.log(" Total skipped (duplicates): %d", totalSkipped) + + return nil +} + +// importLogger writes log messages to a file +type importLogger struct { + file *os.File + userID string + filename string +} + +func (l *importLogger) log(format string, args ...interface{}) { + timestamp := time.Now().Format("2006-01-02 15:04:05") + message := fmt.Sprintf(format, args...) + logLine := fmt.Sprintf("[%s] %s\n", timestamp, message) + + l.file.WriteString(logLine) + l.file.Sync() // Ensure it's written to disk + + slog.Info("Auto-import", "userID", l.userID, "file", l.filename, "message", message) +} diff --git a/main.go b/main.go index 9ef656c..8c7dd28 100644 --- a/main.go +++ b/main.go @@ -123,6 +123,12 @@ func main() { logger.Info("Serving static files from ./frontend/dist with SPA routing support") } + // Start auto-import service + dataDir := dbPathPrefix + "/data" + autoImportService := internal.NewAutoImportService(dataDir) + autoImportService.Start() + defer autoImportService.Stop() + // Start pprof server in a separate goroutine for profiling go func() { port := os.Getenv("PORT")