test_health_endpoint.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import json
  2. from unittest.mock import MagicMock, Mock
  3. import pytest
  4. import sqlalchemy
  5. from redis.exceptions import RedisError
  6. from reflex.app import health
  7. from reflex.model import get_db_status
  8. from reflex.utils.prerequisites import get_redis_status
  9. @pytest.mark.asyncio
  10. @pytest.mark.parametrize(
  11. "mock_redis_client, expected_status",
  12. [
  13. # Case 1: Redis client is available and responds to ping
  14. (Mock(ping=lambda: None), True),
  15. # Case 2: Redis client raises RedisError
  16. (Mock(ping=lambda: (_ for _ in ()).throw(RedisError)), False),
  17. # Case 3: Redis client is not used
  18. (None, None),
  19. ],
  20. )
  21. async def test_get_redis_status(mock_redis_client, expected_status, mocker):
  22. # Mock the `get_redis_sync` function to return the mock Redis client
  23. mock_get_redis_sync = mocker.patch(
  24. "reflex.utils.prerequisites.get_redis_sync", return_value=mock_redis_client
  25. )
  26. # Call the function
  27. status = await get_redis_status()
  28. # Verify the result
  29. assert status == expected_status
  30. mock_get_redis_sync.assert_called_once()
  31. @pytest.mark.asyncio
  32. @pytest.mark.parametrize(
  33. "mock_engine, execute_side_effect, expected_status",
  34. [
  35. # Case 1: Database is accessible
  36. (MagicMock(), None, True),
  37. # Case 2: Database connection error (OperationalError)
  38. (
  39. MagicMock(),
  40. sqlalchemy.exc.OperationalError("error", "error", "error"),
  41. False,
  42. ),
  43. ],
  44. )
  45. async def test_get_db_status(mock_engine, execute_side_effect, expected_status, mocker):
  46. # Mock get_engine to return the mock_engine
  47. mock_get_engine = mocker.patch("reflex.model.get_engine", return_value=mock_engine)
  48. # Mock the connection and its execute method
  49. if mock_engine:
  50. mock_connection = mock_engine.connect.return_value.__enter__.return_value
  51. if execute_side_effect:
  52. # Simulate execute method raising an exception
  53. mock_connection.execute.side_effect = execute_side_effect
  54. else:
  55. # Simulate successful execute call
  56. mock_connection.execute.return_value = None
  57. # Call the function
  58. status = await get_db_status()
  59. # Verify the result
  60. assert status == expected_status
  61. mock_get_engine.assert_called_once()
  62. @pytest.mark.asyncio
  63. @pytest.mark.parametrize(
  64. "db_status, redis_status, expected_status, expected_code",
  65. [
  66. # Case 1: Both services are connected
  67. (True, True, {"status": True, "db": True, "redis": True}, 200),
  68. # Case 2: Database not connected, Redis connected
  69. (False, True, {"status": False, "db": False, "redis": True}, 503),
  70. # Case 3: Database connected, Redis not connected
  71. (True, False, {"status": False, "db": True, "redis": False}, 503),
  72. # Case 4: Both services not connected
  73. (False, False, {"status": False, "db": False, "redis": False}, 503),
  74. # Case 5: Database Connected, Redis not used
  75. (True, None, {"status": True, "db": True, "redis": False}, 200),
  76. ],
  77. )
  78. async def test_health(db_status, redis_status, expected_status, expected_code, mocker):
  79. # Mock get_db_status and get_redis_status
  80. mocker.patch("reflex.app.get_db_status", return_value=db_status)
  81. mocker.patch(
  82. "reflex.utils.prerequisites.get_redis_status", return_value=redis_status
  83. )
  84. # Call the async health function
  85. response = await health()
  86. print(json.loads(response.body))
  87. print(expected_status)
  88. # Verify the response content and status code
  89. assert response.status_code == expected_code
  90. assert json.loads(response.body) == expected_status