使用 Python 构建 RabbitMQ 应用程序

工程 | Greg L. Turnquist | 2010 年 8 月 19 日 | ...

RabbitMQ 是一个强大的消息代理,基于高级消息队列协议 (AMQP)。由于 AMQP 规范的中立性,可以轻松地从包括 Python 在内的许多平台连接到它。在这篇博客文章中,我们将

  • 创建一个简单的股票行情 Python 应用程序
  • 创建一个决定何时买卖的券商 Python 应用程序。
  • 比较 RabbitMQ 团队创建的 AMQP 库 pikapy-amqplib
本博客的所有源代码都可以在 http://github.com/gregturn/amqp-demo 找到。这假设您已经按照适合您平台的说明安装了 RabbitMQ 并启动了它。就我个人而言,我让它在我的 Mac OS X 机器(雪豹)上运行。

顺便说一下

本博客文章中编写的代码仅用于演示目的。请勿依赖这些算法进行金融建议。
说完这些,让我们写一些代码吧!

构建股票行情

消息解决方案的一个好例子是股票行情系统。证券交易所向代理发布消息,指明股票名称、价格和时间。
import pickle
import random
import time

class Ticker(object):
    def __init__(self, publisher, qname):
        self.publisher = publisher

        # This quickly creates four random stock symbols
        chars = range(ord("A"), ord("Z")+1)
        def random_letter(): return chr(random.choice(chars))
        self.stock_symbols = [random_letter()+random_letter()+random_letter() for i in range(4)]

        self.last_quote = {}
        self.counter = 0
        self.time_format = "%a, %d %b %Y %H:%M:%S +0000"
        self.qname = qname

    def get_quote(self):
        symbol = random.choice(self.stock_symbols)
        if symbol in self.last_quote:
            previous_quote = self.last_quote[symbol]
            new_quote = random.uniform(0.9*previous_quote, 1.1*previous_quote)
            if abs(new_quote) - 0 < 1.0:
                new_quote = 1.0
            self.last_quote[symbol] = new_quote
        else:
            new_quote = random.uniform(10.0, 250.0)
            self.last_quote[symbol] = new_quote
        self.counter += 1
        return (symbol, self.last_quote[symbol], time.gmtime(), self.counter)

    def monitor(self):
        while True:
            quote = self.get_quote()
            print("New quote is %s" % str(quote))
            self.publisher.publish(pickle.dumps((quote[0], quote[1], time.strftime(self.time_format, quote[2]), quote[3])), routing_key="")
            secs = random.uniform(0.1, 0.5)
            #print("Sleeping %s seconds..." % secs)
            time.sleep(secs)

此应用程序随机创建四个股票代码,然后开始创建报价。它最初选择 10.0 到 250.0 之间的随机值,然后继续将价格随机调整到前一个价格的 90% 到 110% 之间。然后它随机等待 0.1 到 0.5 秒,然后再产生下一个报价。此代码设计的一个重要部分是将发布到 AMQP 代理与股票行情解耦。相反,它期望在构造时注入一个发布者服务。

值得注意的是,我们使用 pickle 来序列化我们的股票报价数据元组。在 AMQP 中,消息的主体只是一系列字节。存储什么以及如何序列化不属于规范的一部分,而是必须在发送方和接收方之间达成一致。在我们的情况下,发布者和订阅者都同意它包含一个 pickled 元组。

创建 AMQP 服务

下一步是创建我们的 AMQP 客户端服务。其目的是让我们轻松地正确隔离与 AMQP 服务器的通信,无论是通过发布还是通过消费事件。
from amqplib import client_0_8 as amqp

class PyAmqpLibPublisher(object):
    def __init__(self, exchange_name):
        self.exchange_name = exchange_name
        self.queue_exists = False

    def publish(self, message, routing_key):
        conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest", virtual_host="/", insist=False)

        ch = conn.channel()

        ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

        msg = amqp.Message(message)
        msg.properties["content_type"] = "text/plain"
        msg.properties["delivery_mode"] = 2
        ch.basic_publish(exchange=self.exchange_name,
                         routing_key=routing_key,
                         msg=msg)
        ch.close()
        conn.close()

这里需要注意的一点是声明的交换器类型为“fanout”。这意味着绑定到它的每个队列都将收到消息的副本,而不会在代理端进行昂贵的处理。

您可能想知道为什么主体内容类型是“text/plain”,考虑到它是一个序列化消息。这是因为 Python 的 pickle 库以 ASCII 铠甲格式编码数据,可以使用任何工具查看而不会导致奇怪的行为。

低买高卖

一个简单而明智的建议是在价格低时买入,在价格高时卖出。这里我们将看一个简单的客户端,它订阅股票报价,收集价格的历史趋势以判断下一个价格是处于低端还是高端,然后决定买入或卖出。
import pickle
import random
import uuid

class Buyer(object):
    def __init__(self, client, qname, trend=5):
        self.holdings = {}
        self.cash = 100000.0
        self.history = {}
        self.qname = qname
        self.client = client
        self.trend = trend
        self.qname = uuid.uuid4().hex

    def decide_whether_to_buy_or_sell(self, quote):
        symbol, price, date, counter = quote
        #print "Thinking about whether to buy or sell %s at %s" % (symbol, price)

        if symbol not in self.history:
            self.history[symbol] = [price]
        else:
            self.history[symbol].append(price)

        if len(self.history[symbol]) >= self.trend:
            price_low = min(self.history[symbol][-self.trend:])
            price_max = max(self.history[symbol][-self.trend:])
            price_avg = sum(self.history[symbol][-self.trend:])/self.trend
            #print "Recent history of %s is %s" % (symbol, self.history[symbol][-self.trend:])
        else:
            price_low, price_max, price_avg = (-1, -1, -1)
            print "%s quotes until we start deciding whether to buy or sell %s" % (self.trend - len(self.history[symbol]), symbol)
            #print "Recent history of %s is %s" % (symbol, self.history[symbol])

        if price_low == -1: return

        #print "Trending minimum/avg/max of %s is %s-%s-%s" % (symbol, price_low, price_avg, price_max)
        #for symbol in self.holdings.keys():
        #    print "self.history[symbol][-1] = %s" % self.history[symbol][-1]
        #    print "self.holdings[symbol][0] = %s" % self.holdings[symbol][0]
        #    print "Value of %s is %s" % (symbol, float(self.holdings[symbol][0])*self.history[symbol][-1])
        value = sum([self.holdings[symbol][0]*self.history[symbol][-1] for symbol in self.holdings.keys()])
        print "Net worth is %s + %s = %s" % (self.cash, value, self.cash + value)

        if symbol not in self.holdings:
            if price < 1.01*price_low:
                shares_to_buy = random.choice([10, 15, 20, 25, 30])
                print "I don't own any %s yet, and the price is below the trending minimum of %s so I'm buying %s shares." % (symbol, price_low, shares_to_buy)
                cost = shares_to_buy * price
                print "Cost is %s, cash is %s" % (cost, self.cash)
                if cost < self.cash:
                    self.holdings[symbol] = (shares_to_buy, price, cost)
                    self.cash -= cost
                    print "Cash is now %s" % self.cash
                else:
                    print "Unfortunately, I don't have enough cash at this time."
        else:
            if price > self.holdings[symbol][1] and price > 0.99*price_max:
                print "+++++++ Price of %s is higher than my holdings, so I'm going to sell!" % symbol
                sale_value = self.holdings[symbol][0] * price
                print "Sale value is %s" % sale_value
                print "Holdings value is %s" % self.holdings[symbol][2]
                print "Total net is %s" % (sale_value - self.holdings[symbol][2])
                self.cash += sale_value
                print "Cash is now %s" % self.cash
                del self.holdings[symbol]

    def handle_pyamqplib_delivery(self, msg):
        self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

    def handle(self, ch, delivery_tag, body):
        quote = pickle.loads(body)
        #print "New price for %s => %s at %s" % quote
        ch.basic_ack(delivery_tag = delivery_tag)
        print "Received message %s" % quote[3]
        self.decide_whether_to_buy_or_sell(quote)

    def monitor(self):
        self.client.monitor(self.qname, self.handle_pyamqplib_delivery)

此客户端的买卖股票策略与接收 RabbitMQ 消息的机制很好地隔离开来。

  1. monitor 是启动监听新股票报价的主要钩子。它将 handle_pyamqplib_delivery 注册为每当新报价到达时调用的回调方法。
  2. handle_pyamqplib_delivery 提取消息的重要部分并将它们交给 handle。插入此额外方法调用的原因是为了支持将 py-amqplib 替换为 pika,我们稍后将介绍。
  3. handle 反序列化消息的不透明主体,在通道上向代理确认消息的接收,然后触发其决定买卖的算法。
  4. decide_whether_to_buy_or_sell 分解股票报价的元组,然后将其价格添加到其股票代码历史记录中。它旨在收集最少数量的报价后才做出决定。您会这样吗?然后它计算趋势的最小值和最大值,如果价格相对接近最小值,则买入。但是,如果它已经持有股票,则等待价格上涨到高于其最初支付的价格。发生这种情况时,它会卖出。
遗漏的部分是 self.client.monitor 函数。self.client 是我们之前编写的 AMQP 服务的钩子,我们需要一种方法将我们的队列绑定到交换器以接收消息。以下函数需要添加到 PyAmqpLibPublisher 中。
    def monitor(self, qname, callback):
        conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest")

        ch = conn.channel()

        if not self.queue_exists:
            ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
            ch.queue_bind(queue=qname, exchange=self.exchange_name)
            print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
            #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
            self.queue_exists = True

        ch.basic_consume(callback=callback, queue=qname)

        while True:
            ch.wait()
        print 'Close reason:', conn.connection_close

这展示了连接到我们的 RabbitMQ 代理、声明队列、将其绑定到 fanout 交换器,然后注册回调的基本模式。

但我们不要纠结于如何使这个算法更好地挑选赢家和输家。相反,让我们认识到,这使得任何金融公司都可以通过创建一个唯一的队列,绑定到股票系统的 fanout 交换器,然后编写自己的算法来做出金融决策,从而非常容易地订阅股票报价。

用 pika 替换 py-amqplib

AMQP 是一个编写得很好的规范。它包含一个 XML 格式,支持自动生成客户端库。这意味着符合规范的库很容易替换,并且可以根据其实现的优点进行选择。Python 社区中一个流行的库是 py-amqplib。正如其项目网站上所述,它的一个限制是它会阻塞,并且目前不提供并发性。pika 则提供两者。

重点是,从 py-amqplib 迁移到 pika 其实非常容易。基于 AMQP 的方法是相同的,并且基本概念也相同。让我们看看使用 pika 编写另一个 AMQP 服务。

import pika

class PikaPublisher(object):
    def __init__(self, exchange_name):
        self.exchange_name = exchange_name
        self.queue_exists = False

    def publish(self, message, routing_key):
        conn = pika.AsyncoreConnection(pika.ConnectionParameters(
                '127.0.0.1',
                credentials=pika.PlainCredentials('guest', 'guest')))

        ch = conn.channel()

        ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

        ch.basic_publish(exchange=self.exchange_name,
                         routing_key=routing_key,
                         body=message,
                         properties=pika.BasicProperties(
                                content_type = "text/plain",
                                delivery_mode = 2, # persistent
                                ),
                         block_on_flow_control = True)
        ch.close()
        conn.close()

    def monitor(self, qname, callback):
        conn = pika.AsyncoreConnection(pika.ConnectionParameters(
                '127.0.0.1',
                credentials=pika.PlainCredentials('guest', 'guest')))

        ch = conn.channel()

        if not self.queue_exists:
            ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
            ch.queue_bind(queue=qname, exchange=self.exchange_name)
            print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
            #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
            self.queue_exists = True

        ch.basic_consume(callback, queue=qname)

        pika.asyncore_loop()
        print 'Close reason:', conn.connection_close

这与前面展示的另一个服务非常相似。创建连接略有不同,但包含相同的数据,例如 broker 的主机以及 usernamepasswordbasic_publish 略有不同,消息及其属性一起放在方法调用内部。py-amqplib 在稍微不同的结构中声明了整个消息及其属性,然后将其作为单个参数传递给 basic_publish。规范的好处在于知道所有重要的部分都存在于这两个库中。

与 py-amqplib 相比,pika 支持不同的等待机制。py-amqplib 有阻塞式等待,而 pika 既提供阻塞机制,也提供使用Python 的 asyncore 工具进行异步操作的机制。我们可以在未来的关于 RabbitMQ 和 Python 的博客文章中探讨这一点。

这两个库之间的回调方法签名略有不同。我们需要更新我们的券商客户端以适当地处理它。

    def handle_pyamqplib_delivery(self, msg):
        self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

将其与 pika 的回调方法签名进行比较。

    def handle_pika_delivery(self, ch, method, header, body):
        self.handle(ch, delivery_tag, body)

它们非常接近。重要的部分都在那里。区别在于 pika 将消息的部分拆分开,而 py-amqplib 将所有部分组合在一个单一类中。这就是为什么回调方法和实际提取消息主体的方**法**之间存在解耦的原因。通过提取必要的部分,可以在这两个库之间切换,而无需重写我们的买/卖算法。

运行代码

有了所有这些代码,我们需要运行它们。编写一个运行脚本并启动它们很容易。
########################################
# To run this demo using py-amqplib,
# uncomment this block, and  comment out
# the next block.
########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")

########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################

from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")

########################################
# This part doesn't have to change
########################################

from ticker_system import *
ticker = Ticker(publisher, "")
ticker.monitor()

这个运行程序可以在运行 py-amqplib 版本或 pika 版本的股票行情系统之间切换。现在我们只需要一个券商服务的运行程序。

########################################
# To run this demo using py-amqplib,
# uncomment this block, and  comment out
# the next block.
########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")

########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################

from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")

########################################
# This part doesn't have to change
########################################

from buy_low_sell_high import *
buyer = Buyer(publisher, "", trend=25)
print "Buyer = %s" % id(buyer)
buyer.monitor()

在未来的博客文章中,我们可以考虑使用 Pythonic DI 容器运行相同的代码。

好的规范提供了极好的选择

AMQP 规范使得基于技术优点以外的因素选择库变得容易。通过将 AMQP 的机制与生成报价和解析报价的逻辑分离,可以很容易地替换 py-amqplib 和 pika。核心方法名称是相同的。几个参数也是相同的。但更重要的是:架构概念是相同的。现在,选择哪个库的决定不仅可以包括技术优点,还可以包括客户支持、规范合规性、同步与异步支持以及可用性等因素。

获取 Spring 时事通讯

订阅 Spring 时事通讯,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,一次简单订阅即可获得。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有