| | """ |
| | Test script to verify Redis job store functionality |
| | """ |
| | import sys |
| | import os |
| |
|
| | |
| | sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| |
|
| | from backend.utils.redis_job_store import RedisJobStore |
| |
|
| | def test_redis_job_store(): |
| | """Test the Redis job store functionality.""" |
| | print("Testing Redis Job Store...") |
| |
|
| | |
| | try: |
| | redis_job_store = RedisJobStore('redis://localhost:6379/0') |
| | print("[OK] Redis job store initialized successfully") |
| | except Exception as e: |
| | print(f"[ERROR] Failed to initialize Redis job store: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True}) |
| | print(f"[OK] Job created successfully with ID: {job_id}") |
| | except Exception as e: |
| | print(f"[ERROR] Failed to create job: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | job = redis_job_store.get_job(job_id) |
| | if job and job['status'] == 'pending': |
| | print(f"[OK] Job retrieved successfully: {job}") |
| | else: |
| | print(f"[ERROR] Job not found or incorrect status: {job}") |
| | return False |
| | except Exception as e: |
| | print(f"[ERROR] Failed to get job: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'}) |
| | if success: |
| | print("[OK] Job updated successfully") |
| | else: |
| | print("[ERROR] Job update failed") |
| | return False |
| | except Exception as e: |
| | print(f"[ERROR] Failed to update job: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | job = redis_job_store.get_job(job_id) |
| | if job and job['status'] == 'completed' and job['result']['data'] == 'test result': |
| | print(f"[OK] Updated job retrieved successfully: {job}") |
| | else: |
| | print(f"[ERROR] Job not updated correctly: {job}") |
| | return False |
| | except Exception as e: |
| | print(f"[ERROR] Failed to get updated job: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | success = redis_job_store.delete_job(job_id) |
| | if success: |
| | print("[OK] Job deleted successfully") |
| | else: |
| | print("[ERROR] Job deletion failed") |
| | return False |
| | except Exception as e: |
| | print(f"[ERROR] Failed to delete job: {e}") |
| | return False |
| |
|
| | |
| | try: |
| | |
| | try: |
| | redis_job_store.create_job(initial_status='invalid_status') |
| | print("[ERROR] Should have failed with invalid status") |
| | return False |
| | except ValueError: |
| | print("[OK] Correctly rejected invalid status") |
| |
|
| | |
| | try: |
| | redis_job_store.get_job("invalid job id with spaces") |
| | print("[ERROR] Should have failed with invalid job ID format") |
| | return False |
| | except ValueError: |
| | print("[OK] Correctly rejected invalid job ID format") |
| | except Exception as e: |
| | print(f"[ERROR] Validation tests failed: {e}") |
| | return False |
| |
|
| | print("[OK] All Redis job store tests passed!") |
| | return True |
| |
|
| | if __name__ == "__main__": |
| | success = test_redis_job_store() |
| | if success: |
| | print("\nRedis job store is working correctly!") |
| | else: |
| | print("\nRedis job store tests failed!") |
| | sys.exit(1) |