python、大麦、抢票、周杰伦

学习 Python,尝试用 Selenium 来抢票。第一次抢票时,页面被挤爆后程序就直接退出了;下面是优化后的版本,页面被挤爆后会返回并继续抢。
看代码吧:

import platform
import random
import time
from threading import Thread

import requests
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import ActionChains
from selenium.webdriver.android.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait

# ticket_page = 'https://detail.damai.cn/item.htm?spm=a2oeg.home.card_0.ditem_6.591b23e186kDhT&id=606050232403'
# ticket_page = 'https://detail.damai.cn/item.htm?id=607385815561'
# which_station = '昆明站'
# which_scene = '周六'
# which_price = 1580
# which_count = 3
# wait_display_count = 20
# wait_interval = 0.3
from 抢票 import usernames

default_chrome_path = 'D:\crx\chromedriver.exe'


def chrome_path():
    """Return the chromedriver executable path for the current OS.

    Returns:
        The hard-coded macOS path when ``platform.system()`` is ``'Darwin'``,
        the hard-coded Windows path when it is ``'Windows'``, and the
        module-level ``default_chrome_path`` for any other system.
    """
    system = platform.system()
    if system == 'Darwin':
        return '/Users/yibin.cheng/Downloads/chromedriver'
    if system == 'Windows':
        # Raw string: '\c' is not a valid escape sequence, so a plain literal
        # emits an invalid-escape warning; the value itself is unchanged.
        return r'D:\crx\chromedriver.exe'
    return default_chrome_path


def options(use_proxy=False, mobile_device=None):
    """Build the ChromeOptions used for every browser instance.

    The options always exclude the ``enable-automation`` switch and disable
    image loading.

    Previously this function always requested a proxy address over HTTP
    (with no timeout) and slept 2.1 seconds, although the result was never
    applied to the browser. That work is now performed only on request, so
    the default call yields the same Chrome options without the network
    round-trip and delay.

    Args:
        use_proxy: When true, fetch one proxy address from the proxy provider
            URL and pass it to Chrome via ``--proxy-server``.
        mobile_device: Optional Chrome device name (for example
            ``"iPhone 6/7/8 Plus"``) to enable mobile emulation.

    Returns:
        The configured ``webdriver.ChromeOptions`` instance.

    Raises:
        requests.RequestException: If ``use_proxy`` is true and the proxy
            address cannot be fetched.
    """
    chrome_option = webdriver.ChromeOptions()
    chrome_option.add_experimental_option('excludeSwitches', ['enable-automation'])

    if use_proxy:
        proxy_ip = requests.get(
            "http://http.tiqu.alicdns.com/getip3?num=1&type=1&pro=0&city=0&yys=0&port=1&pack=71565&ts=0&ys=0&cs=0&lb=1&sb=0&pb=4&mr=1&regions=&gm=4",
            timeout=10).text
        # Pause kept from the original code; presumably it respects the
        # provider's request-rate limit -- TODO confirm.
        time.sleep(2.1)
        # NOTE(review): assumes the response body is a bare "ip:port" line.
        proxy_ip = proxy_ip.replace('\r', '').replace('\n', '')
        chrome_option.add_argument("--proxy-server=http://{}".format(proxy_ip))

    if mobile_device:
        chrome_option.add_experimental_option('mobileEmulation', {'deviceName': mobile_device})

    # Do not load images.
    No_Image_loading = {"profile.managed_default_content_settings.images": 2}
    chrome_option.add_experimental_option("prefs", No_Image_loading)
    return chrome_option


class PicTicket(object):
    """Drive one browser session through the ticket purchase flow.

    The flow is split into page-level steps that the caller runs in order:
    ``pick_ticket`` (open the ticket page and click login), ``login_page``
    (fill in credentials), ``grab_ticket_page`` (choose city / scene / price
    and press buy) and ``order_page`` (choose viewers and submit the order).
    """

    def __init__(self, browser: "WebDriver", username, password, ticket_url, which_station, which_scene, which_price,
                 which_count,
                 wait_interval, wait_display_count):
        """Store the browser and the purchase preferences.

        Args:
            browser: Selenium WebDriver controlling the browser window.
            username: Account name typed into the login form.
            password: Password typed into the login form.
            ticket_url: URL of the ticket detail page.
            which_station: Exact text of the city/station entry to select.
            which_scene: Text that the wanted scene (show date) must contain.
            which_price: Preferred price, as a number or text such as
                ``'1380元'``; when no available tier has that price the most
                expensive available tier is chosen instead.
            which_count: Number of tickets (and viewers) to select.
            wait_interval: Seconds to sleep between polling attempts.
            wait_display_count: Maximum polling attempts in ``wait_display``.
        """
        self.browser = browser
        self.username = username
        self.password = password
        self.ticket_url = ticket_url
        self.which_station = which_station
        self.which_scene = which_scene
        self.which_price = which_price
        self.which_count = which_count
        self.wait_interval = wait_interval
        self.wait_display_count = wait_display_count

    def pick_ticket(self):
        """Open the ticket page and click the login entry."""
        self.browser.maximize_window()
        self.browser.get(self.ticket_url)
        login_span = self._wait_required('//div[contains(@class, "login-user")]/span[@data-spm]')
        login_span.click()

    def login_page(self):
        """Fill in the login form inside the login iframe and submit it."""
        self.browser.switch_to.frame("alibaba-login-box")
        login_input = self._wait_required('//input[@id="fm-login-id"]')
        login_password = self.browser.find_element(By.XPATH, '//input[@id="fm-login-password"]')
        login_input.send_keys(self.username)
        login_password.send_keys(self.password)
        time.sleep(0.1)
        self.slide_move()

        submit_btn = self.browser.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
        submit_btn.click()

    def slide_move(self):
        """Drag the slider captcha, retrying while a login error is shown.

        At most three attempts are made. An attempt is repeated only when a
        ``login-error-msg`` element is present afterwards. Returns early when
        the slider exists but has zero width.
        """
        for _ in range(3):
            slides = self.browser.find_elements(By.XPATH, '//span[@id="nc_1_n1z"]')
            if slides:
                slide = slides[0]
                slide_width = slide.size['width']
                if slide_width == 0:
                    print('slide width is 0')
                    return
                tip = self.browser.find_element(By.ID, 'nc_1_n1t')
                tip_width = tip.size['width']
                slide_height = slide.size['height']
                add_height = slide_height / 3
                add_width = (tip_width - slide_width) / 3
                # Drag in three steps instead of one jump to look less robotic.
                # See https://www.cnblogs.com/tashanzhishi/p/10761394.html
                ActionChains(self.browser).click_and_hold(slide).perform()
                ActionChains(self.browser).move_by_offset(add_width, add_height).perform()
                time.sleep(0.1)
                ActionChains(self.browser).move_by_offset(add_width, add_height).perform()
                time.sleep(0.1)
                ActionChains(self.browser).move_by_offset(add_width, add_height).perform()
                ActionChains(self.browser).release().perform()
            # find_elements returns an empty list when nothing matches, so no
            # exception handling is needed to detect "no error shown".
            error_tips = self.browser.find_elements(By.XPATH, '//div[@class="login-error-msg"]')
            if not error_tips:
                break

    def grab_ticket_page(self, mode=1):
        """Select city, scene and price on the ticket page, then press buy.

        Args:
            mode: ``2`` enables scan mode, which returns ``'refresh'`` when no
                price tier is currently available so the caller can reload
                and catch returned tickets. Any other value buys directly.

        Returns:
            ``'refresh'`` in scan mode when nothing is available, else ``None``
            after the buy button has been clicked.
        """
        self.wait_display('//div[@class="citylist"]')
        for city in self.browser.find_elements(By.XPATH, '//div[@class="citylist"]/div'):
            if city.text == self.which_station:
                city.click()
                break

        # Scene (show date) selection.
        scene_items = self.browser.find_elements(
            By.XPATH,
            '//div[@class="select_left" and text()="场次"]'
            '/following-sibling::div'
            '//div[contains(@class,"select_right_list_item")]')
        if len(scene_items) == 1:
            scene_items[0].click()
        else:
            for scene in scene_items:
                if self.which_scene in scene.text:
                    scene.click()
                    break

        # wait_display returns None on timeout; treat that as "no tiers".
        price_list = self.wait_display('//div[contains(@class, "select_right_list_item sku_item")]'
                                       '/div[@class="skuname" and not(@data-spm-anchor-id)]', True) or []
        # A sibling <span> marks a tier that cannot be bought, so skip those.
        available = [item for item in price_list if not item.find_elements(By.XPATH, '../span')]

        # Scan mode: when someone returns a ticket it shows up after a reload.
        if mode == 2 and not available:
            time.sleep(5)
            print('no ticket, refresh')
            return 'refresh'

        chosen = self._choose_price_item(available)
        if chosen is not None:
            self.scroll(chosen.text)
            chosen.click()

        # Wait out the sale countdown when one is displayed.
        left_items = self.browser.find_elements(
            By.XPATH,
            '//div[@class="items card"]/div[@class="item"]/span[@class="digit"]')
        if len(left_items) >= 4:
            # The last four digit groups are days, hours, minutes, seconds.
            days, hours, minutes, seconds = (int(item.text) for item in left_items[-4:])
            remaining = ((days * 24 + hours) * 60 + minutes) * 60 + seconds
            # Wake up two seconds early; never pass a negative value to sleep.
            time.sleep(max(0, remaining - 2))

        # Poll until the buy button is no longer disabled.
        buy_xpath = '//div[contains(@class,"buybtn") and @data-spm="dbuy"]'
        buy_btn = self.browser.find_element(By.XPATH, buy_xpath)
        while 'disabled' in (buy_btn.get_attribute('class') or ''):
            time.sleep(self.wait_interval)
            buy_btn = self.browser.find_element(By.XPATH, buy_xpath)

        count = self.which_count if self.which_count else 1
        # One ticket is selected by default, so press "+" count - 1 times.
        for _ in range(count - 1):
            plus = self._wait_required('//a[contains(@class, "cafe-c-input-number-handler-up")]')
            plus.click()
        buy_btn.click()
        return None

    def order_page(self):
        """Select viewers and submit the order.

        Returns:
            ``'back'`` when the page shows the "挤爆" (overloaded) notice so
            the caller can go back and retry, otherwise ``None``.
        """
        # find_elements yields an empty list instead of raising when absent.
        if self.browser.find_elements(By.XPATH, '//*[contains(text(),"挤爆")]'):
            return 'back'

        viewers = self.wait_display('//div[contains(@class, "buyer-list-item")]', True) or []
        wanted = self.which_count if self.which_count else 1
        # Tick one viewer per ticket rather than every listed viewer.
        for viewer in viewers[:max(wanted, 1)]:
            viewer.click()

        order_btn = self._wait_required('//div[@class="submit-wrapper"]/button')
        # Click the submit button three times in quick succession.
        order_btn.click()
        time.sleep(0.1)
        order_btn.click()
        time.sleep(0.1)
        order_btn.click()
        time.sleep(10)
        return None

    def scroll(self, text):
        """Scroll the first div containing ``text`` into view and click it."""
        ele = self.browser.find_element(By.XPATH, f'//div[contains(text(),"{text}")]')
        self.browser.execute_script("arguments[0].scrollIntoView();", ele)
        ele.click()

    def wait_display(self, xpath, multiple=False):
        """Poll for an element (or elements) matching ``xpath``.

        Args:
            xpath: XPath expression to look up.
            multiple: When true, return the non-empty list of all matches
                instead of a single element.

        Returns:
            The element, or the list of elements when ``multiple`` is true;
            ``None`` if nothing matched within ``wait_display_count`` attempts
            spaced ``wait_interval`` seconds apart.
        """
        for _ in range(self.wait_display_count):
            if multiple:
                found = self.browser.find_elements(By.XPATH, xpath)
                if found:
                    return found
            else:
                try:
                    return self.browser.find_element(By.XPATH, xpath)
                except NoSuchElementException:
                    pass
            time.sleep(self.wait_interval)
        return None

    def _wait_required(self, xpath):
        """Like ``wait_display`` for one element, but raise when it never appears."""
        element = self.wait_display(xpath)
        if element is None:
            raise NoSuchElementException(f'element not found after waiting: {xpath}')
        return element

    @staticmethod
    def _parse_price(value):
        """Return the first run of digits in ``value`` as an int, or ``None``."""
        digits = ''
        for ch in str(value):
            if ch.isdigit():
                digits += ch
            elif digits:
                break
        return int(digits) if digits else None

    def _choose_price_item(self, items):
        """Pick the tier priced ``which_price``, else the most expensive one."""
        wanted = self._parse_price(self.which_price)
        best_item = None
        best_price = 0
        for item in items:
            price = self._parse_price(item.text)
            if price is None:
                continue
            if wanted is not None and price == wanted:
                return item
            if price > best_price:
                best_item = item
                best_price = price
        return best_item


def grab_ticket(pt: PicTicket):
    """Run the whole purchase flow for one browser session, then close it.

    The ticket page step is repeated (after a browser refresh) while it
    returns ``'refresh'``. When the order page returns ``'back'``, the browser
    navigates back and the ticket page and order page steps are both run
    again, so the order is actually retried.

    Any exception is printed rather than propagated, and the browser is
    always quit at the end.

    Args:
        pt: The configured ``PicTicket`` session to drive.
    """
    try:
        pt.pick_ticket()
        pt.login_page()
        while True:
            # Pass the mode explicitly: 1 is the direct-buy mode.
            while pt.grab_ticket_page(1) == 'refresh':
                pt.browser.refresh()
            if pt.order_page() != 'back':
                break
            # Go back to the ticket page and try again from there.
            pt.browser.back()
    except Exception as e:
        print(e)
    finally:
        if pt is not None:
            pt.browser.quit()

if __name__ == '__main__':
    # Number of browser sessions launched per account, and polling settings
    # handed to every PicTicket instance.
    system_account_threads = 5
    system_wait_interval = 0.01
    system_wait_count = 200

    # What to buy.
    ticket_url = 'https://detail.damai.cn/item.htm?id=607385815561'
    want_station = '中国香港站'
    want_day_key = '周五'
    want_price = '1380元'
    want_total = 2

    accounts = usernames.damai

    # Launch one browser and one worker thread per (account, slot) pair.
    threads = []
    for account in accounts:
        for _ in range(system_account_threads):
            driver = webdriver.Chrome(chrome_path(), options=options())
            driver.maximize_window()
            pt = PicTicket(
                browser=driver,
                username=account['username'],
                password=account['password'],
                ticket_url=ticket_url,
                which_station=want_station,
                which_scene=want_day_key,
                which_price=want_price,
                which_count=want_total,
                wait_interval=system_wait_interval,
                wait_display_count=system_wait_count,
            )
            worker = Thread(target=grab_ticket, args=(pt,))
            worker.start()
            threads.append(worker)

    # Block until every session has finished.
    for worker in threads:
        worker.join()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。