# -*- coding: utf-8 -*-
# 时间工具
import time
import numpy as nu
import util
class Date(object):
"""
日期对象
"""
__seconds = 0
def __init__(self, seconds=None):
"""
初始化时间对象
:param seconds: 时间戳秒
"""
self.setTime(seconds)
def setTime(self, seconds=None):
"""
设置时间戳
:param seconds:
:return:
"""
if seconds is None:
seconds = self.now()
self.__seconds = seconds
def timeFormat(self, format='%Y-%m-%d %H:%M:%S', seconds=None):
"""
时间格式化
:param format: 格式化方案
:param seconds: 秒 如果不填则默认为当前对象秒
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime(format, time.localtime(seconds))
def getSecond(self, seconds=None):
"""
获取秒钟
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%S', time.localtime(seconds))
def getSeparate(self, seconds=None):
"""
获取分钟
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%M', time.localtime(seconds))
def getHour(self, seconds=None):
"""
获取小时
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%H', time.localtime(seconds))
def getDay(self, seconds=None):
"""
获取月中的日
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%d', time.localtime(seconds))
def getMonth(self, seconds=None):
"""
获取月
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%m', time.localtime(seconds))
def getYear(self, seconds=None):
"""
获取年
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%Y', time.localtime(seconds))
def __check_seconds(self, seconds=None):
"""
校验时间戳-秒 参数
:param seconds:
:return:
"""
if seconds is None:
if self.__seconds is None:
seconds = self.now()
else:
seconds = self.__seconds
return seconds
@staticmethod
def now(mode='s'):
"""
获取时间戳方案
:param mode: s 秒 ms 毫秒 ns 纳秒
:return:
"""
time_mode = nu.array(util.TimeMode)
if not (time_mode == mode).any():
mode = util.TimeMode[0]
ns = str(time.time_ns())
if mode == util.TimeMode[0]:
return int(ns[:len(ns) - 9])
if mode == util.TimeMode[1]:
return int(ns[:len(ns) - 6])
if mode == util.TimeMode[2]:
return int(ns)
# -*- coding: utf-8 -*-
# 异常对象组
class AuthException(Exception):
"""
自定义异常对象 - 带消息参数方案
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
# -*- coding: utf-8 -*-
# 随机工具
from exceptions import AuthException
from util.time_util import Date
class OrderNumber(object):
"""
订单号生成对象
"""
# 开始时间戳
__twepoch = 1420041600000
# 机器id所占的位数
__worker_id_bits = 5
# 数据标识id所占的位数
__datacenter_id_bits = 5
# 支持的最大机器id
__max_worker_id = -1 ^ (-1 << __worker_id_bits)
# 支持的最大标识id
__max_datacenter_id = -1 ^ (-1 << __datacenter_id_bits)
# 序列在id中占的位数
__sequence_bits = 12
# 机器id向左偏移位
__worker_id_shift = __sequence_bits
# 数据标识id向左偏移数
__datacenter_id_shift = __sequence_bits + __worker_id_shift
# 时间戳向左偏移数
__timestamp_left_shift = __sequence_bits + __worker_id_bits + __datacenter_id_bits
# 生成序列的掩码
__sequence_mask = -1 ^ (-1 << __sequence_bits)
# 当前工作的机器id
__worker_id = 0
# 毫秒内序列
__datacenter_id = 0
# 毫秒内序列
__sequence = 0
# 上一次生成id时的时间戳
__last_timestamp = -1
def __init__(self, worker_id=0, datacenter_id=0):
"""
构造函数
:param worker_id: 工作id
:param datacenter_id: 数据中心id
"""
if worker_id > self.__max_worker_id or worker_id < 0:
raise AuthException("worker Id can't be greater than {} or less than 0".format(self.__max_worker_id))
if datacenter_id > self.__max_datacenter_id or datacenter_id < 0:
raise AuthException(
"datacenter Id can't be greater than %d or less than 0".format(self.__max_datacenter_id))
self.__worker_id = worker_id
self.__datacenter_id = datacenter_id
@staticmethod
def __time_gen():
"""
获取当前时间(毫秒级)
:return:
"""
return Date().now('ms')
def __time_next_millis(self, last_timestamp: int):
"""
阻塞到下一毫秒级,直到获取到新的时间戳
:param last_timestamp: 上一次生成id的时间戳
:return:
"""
timestamp = self.__time_gen()
while timestamp <= last_timestamp:
timestamp = self.__time_gen()
return timestamp
def next_id(self):
"""
获取下一个id
:return:
"""
timestamp = self.__time_gen()
if timestamp < self.__last_timestamp:
raise AuthException("Clock moved backwards. Refusing to generate id for {} milliseconds".format(
self.__last_timestamp - timestamp))
if self.__last_timestamp == timestamp:
self.__sequence = (self.__sequence + 1) & self.__sequence_mask
if self.__sequence == 0:
timestamp = self.__time_next_millis(self.__last_timestamp)
else:
self.__sequence = 0
self.__last_timestamp = timestamp
return ((timestamp - self.__twepoch) << self.__timestamp_left_shift) | (
self.__datacenter_id << self.__datacenter_id_shift) | (
self.__worker_id << self.__worker_id_shift) | self.__sequence
# demo
# import util.random_util as ur
# import threading
#
#
# orders = []
# threads = []
#
#
# def for_run(worker_id=0, datacenter_id=0, thread_num=1):
# order = ur.OrderNumber(worker_id, datacenter_id)
# for i in range(0, 2000):
# order_number = str(order.next_id())
# orders.append(order_number)
# print('进程{}: 生成的订单号为:'.format(str(thread_num)) + order_number)
#
#
# if __name__ == '__main__':
# for j in range(0, 30):
# t = threading.Thread(target=for_run, args=(j, 0, j))
# threads.append(t)
# for t in threads:
# t.setDaemon(True)
# t.start()
# for t in threads:
# t.join()
# print(len(orders))
# orders = list(set(orders))
# print(len(orders))