Redis®*: 使用 Python

如何將 Redis 與 Python 連接

👋 歡迎來到 Stackhero 文件!

Stackhero 提供一個即用型的 Redis cloud 解決方案,帶來多項好處,包括:

  • 包含 Redis Commander 網頁介面
  • 無限制的訊息大小和傳輸。
  • 只需一鍵即可輕鬆進行 更新
  • 專用私有 VM 提供的最佳 效能 和強大 安全性

節省時間簡化生活:只需 5 分鐘 即可嘗試 Stackhero 的 Redis cloud hosting 解決方案!

要將您的 Python 應用程式無縫連接到 Redis 實例,redis 庫是一個受歡迎且高效的選擇。您可以按照以下建議開始安裝:

pip install redis
pip freeze > requirements.txt

讓我們探索一個使用默認值的簡單範例,應該能滿足大多數需求:

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

為了增強安全性,考慮使用環境變數來存儲您的憑證。以下是如何操作:

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_REDIS_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

確保您的環境變數包含如下定義:STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>

Redis 的發布/訂閱功能可以輕鬆地與 Python 一起使用。以下是一個簡單的範例來指導您:

import redis

# 連接到 Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# 創建一個 PubSub 實例
p = r.pubsub()

# 訂閱 "test" 頻道
p.subscribe('test')

# 發布一條消息到 "test" 頻道
r.publish('test', '這是一條測試消息')

# 從 "test" 頻道獲取第一條可用消息
p.get_message()

# 取消訂閱 "test" 頻道
p.unsubscribe('test')

您可能想探索更高級的 Pub/Sub 技術,以下是一些範例:

# 創建一個 PubSub 實例並忽略訂閱消息
p = r.pubsub(ignore_subscribe_messages=True)

# 訂閱多個頻道
p.subscribe('test-1', 'test-2', ...)

# 取消訂閱多個頻道
p.unsubscribe('test-1', 'test-2', ...)

# 您也可以使用 "unsubscribe" 不帶參數來斷開所有已訂閱頻道的連接
p.unsubscribe()

# 使用模式訂閱頻道
p.psubscribe('my-*')

# 使用模式取消訂閱頻道
p.punsubscribe('my-*')

您可能會遇到 redis.exceptions.ConnectionError: Connection closed by server 錯誤,這可能是由於您的 Python 應用程式不活動,導致連接自動關閉。當您的應用程式嘗試重新連接時,可能會失敗,從而導致此錯誤。

為了減輕這種情況,考慮在您的 Redis 連接中設置 health_check_interval 參數,如下所示:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

在使用 Redis 的 Pub/Sub 功能時,確保您的應用程式比指定的 health_check_interval(在此範例中為每 10 秒)更頻繁地調用 get_message()listen()。您可以參考 redis-py 官方文檔 以獲取更多詳細信息。

如果這些調用未在間隔內完成,您可能仍會遇到 "Connection closed by server" 錯誤。一個實用的解決方案是定期使用 check_health() 函數。

以下是如何實現它:

import redis
import threading

# 連接到 Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# 創建一個 PubSub 實例
p = r.pubsub()

# 訂閱 "test" 頻道
p.subscribe('test')

# 創建一個函數,每 5 秒調用一次 `check_health`
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# 調用 redis_auto_check 函數
redis_auto_check(p)