Redis®*: Pythonでの使用

PythonでRedisに接続する方法

👋 Stackheroのドキュメントへようこそ!

Stackheroは、数多くの利点を提供する、すぐに使えるRedisクラウドソリューションを提供しています。

  • Redis Commander Web UIを含む。
  • メッセージサイズと転送が無制限
  • ワンクリックで簡単にアップデート
  • プライベートで専用のVMによる最適なパフォーマンスと強力なセキュリティ

時間を節約し、生活を簡素化:StackheroのRedisクラウドホスティングソリューションを試すのに5分しかかかりません!

PythonアプリケーションをRedisインスタンスにシームレスに接続するには、redisライブラリが人気で効率的な選択です。以下の提案でインストールを開始できます。

pip install redis
pip freeze > requirements.txt

ほとんどのニーズに対応するデフォルト値を使用した簡単な例を見てみましょう。

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

セキュリティを強化するために、資格情報に環境変数を使用することを検討してください。以下はその方法です。

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_REDIS_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

環境変数に次のような定義が含まれていることを確認してください: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>

RedisのPublish/Subscribe機能はPythonで簡単に利用できます。以下はそのためのシンプルな例です。

import redis

# Redisに接続
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# PubSubインスタンスを作成
p = r.pubsub()

# チャンネル"test"にサブスクライブ
p.subscribe('test')

# チャンネル"test"にメッセージを公開
r.publish('test', 'This is a test message')

# チャンネル"test"から最初の利用可能なメッセージを取得
p.get_message()

# チャンネル"test"からサブスクライブ解除
p.unsubscribe('test')

以下の例を使って、より高度なPub/Sub技術を探求することができます。

# PubSubインスタンスを作成し、サブスクライブメッセージを無視
p = r.pubsub(ignore_subscribe_messages=True)

# 複数のチャンネルにサブスクライブ
p.subscribe('test-1', 'test-2', ...)

# 複数のチャンネルからサブスクライブ解除
p.unsubscribe('test-1', 'test-2', ...)

# 引数なしで"unsubscribe"を使用して、すべてのサブスクライブされたチャンネルから切断することもできます
p.unsubscribe()

# パターンを使用してチャンネルにサブスクライブ
p.psubscribe('my-*')

# パターンを使用してチャンネルからサブスクライブ解除
p.punsubscribe('my-*')

redis.exceptions.ConnectionError: Connection closed by serverエラーが発生することがあります。これは、Pythonアプリケーションの非アクティブ状態が原因で接続が自動的に閉じられるためです。アプリが再接続を試みると失敗し、このエラーが発生する可能性があります。

これを軽減するために、Redis接続でhealth_check_intervalパラメータを次のように設定することを検討してください。

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

RedisのPub/Sub機能を使用する際は、アプリケーションが指定されたhealth_check_interval(この例では10秒ごと)よりも頻繁にget_message()またはlisten()を呼び出すことを確認してください。詳細については、redis-py公式ドキュメントを参照できます。

この間隔内にこれらの呼び出しが行われない場合、「Connection closed by server」エラーが発生する可能性があります。実用的な解決策は、定期的にcheck_health()関数を使用することです。

以下はその実装方法です。

import redis
import threading

# Redisに接続
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# PubSubインスタンスを作成
p = r.pubsub()

# チャンネル"test"にサブスクライブ
p.subscribe('test')

# 5秒ごとに`check_health`を呼び出す関数を作成
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# redis_auto_check関数を呼び出す
redis_auto_check(p)