Rabbit MQ Python Version

product

# encoding=utf-8
import pika
import json

system_exchange = "exchange"

# Create a new instance of PlainCredentials
credentials = pika.PlainCredentials('', '')

# Create a new ConnectionParameters instance
parameters = pika.ConnectionParameters('', , '', credentials)


class MessageQueue(object):
    """
    define queue
    """

    def __init__(self, channel, queue_name):
        self.channel = channel
        self.queue_name = queue_name
        self.durable = False

    def get_message(self):
        # Get a single message from the AMQP broker. Returns a sequence with
        # the method frame, message properties, and body.
        return self.channel.basic_get(self.queue_name)

    def message_count(self):
        return self.channel.queue_declare(self.queue_name, durable=self.durable).method.message_count

    def set_consumer(self, callback, auto_ack=True):
        # Sends the AMQP command Basic.Consume to the broker and binds messages
        # for the consumer_tag to the consumer callback. If you do not pass in
        # a consumer_tag, one will be automatically generated for you. Returns
        # the consumer tag.
        self.channel.basic_consume(on_message_callback=callback, queue=self.queue_name, auto_ack=auto_ack)


class MessageChannel(object):

    def __init__(self):
        # Create a new instance of the Connection object
        self.connection = pika.BlockingConnection(parameters)
        # Create a new channel with the next available channel number or pass
        # in a channel number to use. Must be non-zero if you would like to
        # specify but it is recommended that you let Pika manage the channel
        # numbers
        self.channel = self.connection.channel()
        # This method creates an exchange if it does not already exist, and if
        # the exchange exists, verifies that it is of the correct and expected
        # class.
        self.channel.exchange_declare(exchange=system_exchange, exchange_type='topic')
        # Specify quality of service. This method requests a specific quality
        # of service. The QoS can be specified for the current channel or for all
        # channels on the connection. The client can request that messages be sent
        # in advance so that when the client finishes processing a message, the
        # following message is already held locally, rather than needing to be
        # sent down the channel. Prefetching gives a performance improvement
        self.channel.basic_qos(prefetch_count=1)

    def define_queue(self, queue_name, routing_key=None, exclusive=False):
        """
        define queue and routing_key
        :param queue_name:
        :param routing_key:
        :param exclusive:
        :return:
        """
        # Declare queue, create if needed. This method creates or checks a
        # queue. When creating a new queue the client can specify various
        # properties that control the durability of the queue and its contents,
        # and the level of sharing for the queue
        self.channel.queue_declare(queue=queue_name, durable=False, exclusive=exclusive)
        if routing_key:
            # Bind the queue to the specified exchange
            self.channel.queue_bind(queue=queue_name, exchange=system_exchange, routing_key=routing_key)
        return MessageQueue(self.channel, queue_name)

    def publish(self, routing_key, msg):
        # Publish to the channel with the given exchange, routing key, and body.
        self.channel.basic_publish(body=msg, exchange=system_exchange, routing_key=routing_key)
        print(" [x] Sent %s" % msg)

    def start_consuming(self):
        # Processes I/O events and dispatches timers and `basic_consume`
        # callbacks until all consumers are cancelled.
        self.channel.start_consuming()

    def close(self):
        self.channel.close()
        self.connection.close()


if __name__ == "__main__":
    channel = MessageChannel()
    send = 0
    while send <= 30:
        mq = channel.define_queue("kaola")
        message_count = mq.message_count()
        if message_count < 10:
            channel.publish("", json.dumps({'url': ''}))
            send += 1
    channel.close()



customer&product

# encoding=utf-8
import pika
import asyncio
import time

from pyppeteer import launch

# credentials = pika.PlainCredentials('', '')
# parameters = pika.ConnectionParameters('', 5672, '', credentials)
# connection = pika.BlockingConnection(parameters)
# channel = connection.channel()

# channel.queue_declare(queue='hello')

from my_daniel import MessageChannel, MessageQueue

print(' [*] Waiting for messages. To exit press CTRL+C')


def open_url(url):
    async def get_data(url):
        browser = await launch(headless=False, userDataDir='./userdata', args=['--disable-infobars'])
        await browser.pages()
        page = await browser.newPage()
        await page.goto(url)
        await page.setViewport({
            'width': 1350,
            'height': 850
        })
        frame = page
        await frame.evaluate(
            '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
        time.sleep(6)
        page_source = await page.content()
        await page.close()
        await browser.close()
        return page_source

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(get_data(url))


def callback(ch, method, properties, body):
    import json
    body = json.loads(body)
    print(body['url'])
    print(" [x] Received %r" % (body,))
    page_source = open_url(body["url"])
    channel.publish("harvested", json.dumps({"page_source": page_source}))
    # channel.basic_publish(exchange="exchange",routing_key="harvested", body=json.dumps({"harvested": {"title":"nihao","price":1259,"page_source":str(page_source)}}))


if __name__ == '__main__':
    channel = MessageChannel()
    mq = channel.define_queue("kaola")
    mq.set_consumer(callback)
    channel.start_consuming()
    channel.close()

customer

# encoding:utf-8
from my_daniel import MessageChannel
receive_channel = MessageChannel()
# receive_channel.define_queue('harvested')
mq = receive_channel.define_queue("harvested")


def callback(ch, method, properties, body):
    print(body)


# mq.set_consumer(callback)
print(mq.message_count())
receive_channel.start_consuming()
receive_channel.close()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335