Redis®*: 使用 Python
如何將 Redis 與 Python 連接
👋 歡迎來到 Stackhero 文件!
Stackhero 提供一個即用型的 Redis cloud 解決方案,帶來多項好處,包括:
- 包含
Redis Commander網頁介面。- 無限制的訊息大小和傳輸。
- 只需一鍵即可輕鬆進行 更新。
- 由 專用私有 VM 提供的最佳 效能 和強大 安全性。
節省時間,簡化生活:只需 5 分鐘 即可嘗試 Stackhero 的 Redis cloud hosting 解決方案!
選擇合適的 Python 庫來連接 Redis
要將您的 Python 應用程式無縫連接到 Redis 實例,redis 庫是一個受歡迎且高效的選擇。您可以按照以下建議開始安裝:
pip install redis
pip freeze > requirements.txt
將 Python 連接到 Redis
讓我們探索一個使用默認值的簡單範例,應該能滿足大多數需求:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
為了增強安全性,考慮使用環境變數來存儲您的憑證。以下是如何操作:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
確保您的環境變數包含如下定義:STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
使用 Redis 和 Python 的 Pub/Sub 功能
Redis 的發布/訂閱功能可以輕鬆地與 Python 一起使用。以下是一個簡單的範例來指導您:
import redis
# 連接到 Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# 創建一個 PubSub 實例
p = r.pubsub()
# 訂閱 "test" 頻道
p.subscribe('test')
# 發布一條消息到 "test" 頻道
r.publish('test', '這是一條測試消息')
# 從 "test" 頻道獲取第一條可用消息
p.get_message()
# 取消訂閱 "test" 頻道
p.unsubscribe('test')
使用 Python 的高級 Redis Pub/Sub 範例
您可能想探索更高級的 Pub/Sub 技術,以下是一些範例:
# 創建一個 PubSub 實例並忽略訂閱消息
p = r.pubsub(ignore_subscribe_messages=True)
# 訂閱多個頻道
p.subscribe('test-1', 'test-2', ...)
# 取消訂閱多個頻道
p.unsubscribe('test-1', 'test-2', ...)
# 您也可以使用 "unsubscribe" 不帶參數來斷開所有已訂閱頻道的連接
p.unsubscribe()
# 使用模式訂閱頻道
p.psubscribe('my-*')
# 使用模式取消訂閱頻道
p.punsubscribe('my-*')
避免 Redis 和 Python 中的 "Connection Closed by Server" 錯誤
您可能會遇到 redis.exceptions.ConnectionError: Connection closed by server 錯誤,這可能是由於您的 Python 應用程式不活動,導致連接自動關閉。當您的應用程式嘗試重新連接時,可能會失敗,從而導致此錯誤。
為了減輕這種情況,考慮在您的 Redis 連接中設置 health_check_interval 參數,如下所示:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
在使用 Redis 的 Pub/Sub 功能時,確保您的應用程式比指定的 health_check_interval(在此範例中為每 10 秒)更頻繁地調用 get_message() 或 listen()。您可以參考 redis-py 官方文檔 以獲取更多詳細信息。
如果這些調用未在間隔內完成,您可能仍會遇到 "Connection closed by server" 錯誤。一個實用的解決方案是定期使用 check_health() 函數。
以下是如何實現它:
import redis
import threading
# 連接到 Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# 創建一個 PubSub 實例
p = r.pubsub()
# 訂閱 "test" 頻道
p.subscribe('test')
# 創建一個函數,每 5 秒調用一次 `check_health`
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# 調用 redis_auto_check 函數
redis_auto_check(p)