Redis®*: 使用 Python
如何使用 Python 连接 Redis
👋 欢迎来到 Stackhero 文档!
Stackhero 提供现成的 Redis 云 解决方案,具有众多优势,包括:
- 包含
Redis Commander网页界面。- 无限消息大小和传输。
- 只需点击即可轻松更新。
- 由专用私有 VM提供的最佳性能和强大安全性。
节省时间,简化生活:只需 5 分钟即可试用 Stackhero 的 Redis 云托管 解决方案!
选择合适的 Python 库连接 Redis
要将您的 Python 应用程序无缝连接到 Redis 实例,redis 库是一个流行且高效的选择。您可以通过以下建议开始安装:
pip install redis
pip freeze > requirements.txt
将 Python 连接到 Redis
让我们探索一个使用默认值的简单示例,这应该能满足大多数需求:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
为了增强安全性,请考虑使用环境变量来存储您的凭据。以下是实现方法:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
确保您的环境变量包含如下定义:STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
使用 Redis 和 Python 的 Pub/Sub
Redis 的发布/订阅功能可以很容易地与 Python 一起使用。以下是一个简单的示例来指导您:
import redis
# 连接到 Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# 创建一个 PubSub 实例
p = r.pubsub()
# 订阅频道 "test"
p.subscribe('test')
# 向频道 "test" 发布消息
r.publish('test', '这是一条测试消息')
# 从频道 "test" 获取第一个可用消息
p.get_message()
# 取消订阅频道 "test"
p.unsubscribe('test')
使用 Python 的高级 Redis Pub/Sub 示例
您可能希望通过以下示例探索更高级的 Pub/Sub 技术:
# 创建一个 PubSub 实例并忽略订阅消息
p = r.pubsub(ignore_subscribe_messages=True)
# 订阅多个频道
p.subscribe('test-1', 'test-2', ...)
# 取消订阅多个频道
p.unsubscribe('test-1', 'test-2', ...)
# 您还可以使用 "unsubscribe" 不带参数来断开所有已订阅频道的连接
p.unsubscribe()
# 使用模式订阅频道
p.psubscribe('my-*')
# 使用模式取消订阅频道
p.punsubscribe('my-*')
避免 Redis 和 Python 中的 "Connection Closed by Server" 错误
您可能会遇到 redis.exceptions.ConnectionError: Connection closed by server 错误,这可能是由于您的 Python 应用程序不活动导致连接自动关闭。当您的应用尝试重新连接时,可能会失败,从而导致此错误。
为了解决这个问题,请考虑在您的 Redis 连接中设置 health_check_interval 参数,如下所示:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
在使用 Redis 的 Pub/Sub 功能时,确保您的应用程序调用 get_message() 或 listen() 的频率高于指定的 health_check_interval(在此示例中为每 10 秒)。您可以参考 redis-py 官方文档 以获取更多详细信息。
如果这些调用未在间隔内进行,您可能仍会遇到 "Connection closed by server" 错误。一个实用的解决方案是定期使用 check_health() 函数。
以下是实现方法:
import redis
import threading
# 连接到 Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# 创建一个 PubSub 实例
p = r.pubsub()
# 订阅频道 "test"
p.subscribe('test')
# 创建一个函数,每 5 秒调用一次 `check_health`
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# 调用 redis_auto_check 函数
redis_auto_check(p)