《Python 3 程序开发指南》 学习笔记
import datetime
class IncidentError(Exception):
    """Raised by the importers when incident data is incomplete."""
class Incident:
    """Data type holding the details of one aviation incident.

    The report ID is fixed at construction time.  Every other attribute is
    a property whose setter validates the new value with ``assert``, so an
    invalid value raises AssertionError (note that these checks are skipped
    when Python runs with ``-O``).

    >>> kwargs = dict(report_id="2007061289X")
    >>> kwargs["date"] = datetime.date(2007, 6, 12)
    >>> kwargs["airport"] = "Los Angeles"
    >>> kwargs["aircraft_id"] = "8184XK"
    >>> kwargs["aircraft_type"] = "CVS91"
    >>> kwargs["pilot_percent_hours_on_type"] = 17.5
    >>> kwargs["pilot_total_hours"] = 1258
    >>> kwargs["midair"] = False
    >>> incident = Incident(**kwargs)
    >>> incident.report_id, incident.date, incident.airport
    ('2007061289X', datetime.date(2007, 6, 12), 'Los Angeles')
    >>> incident.aircraft_id, incident.aircraft_type
    ('8184XK', 'CVS91')
    >>> incident.pilot_percent_hours_on_type
    17.5
    >>> incident.pilot_total_hours, incident.midair
    (1258, False)
    >>> incident.midair = 1
    Traceback (most recent call last):
        ...
    AssertionError: invalid midair
    >>> incident.pilot_percent_hours_on_type = -1
    Traceback (most recent call last):
        ...
    AssertionError: invalid pilot_percent_hours_on_type
    >>> incident.pilot_total_hours = 0
    Traceback (most recent call last):
        ...
    AssertionError: invalid pilot_total_hours
    """

    def __init__(self, report_id, date, airport, aircraft_id,
                 aircraft_type, pilot_percent_hours_on_type,
                 pilot_total_hours, midair, narrative=""):
        """
        :param report_id: str Minimum length 8 and no whitespace
        :param date: datetime.date
        :param airport: str Nonempty and no newlines
        :param aircraft_id: str Nonempty and no newlines
        :param aircraft_type: str Nonempty and no newlines
        :param pilot_percent_hours_on_type: float Range 0.0 to 100.0
        :param pilot_total_hours: int Positive and nonzero
        :param midair: bool
        :param narrative: str Multiline
        :raises AssertionError: if any argument fails its validation
        """
        assert len(report_id) >= 8 and len(report_id.split()) == 1, \
            "invalid report ID"
        self.__report_id = report_id
        # The remaining assignments go through the property setters below,
        # so construction and later reassignment share the same validation.
        self.date = date
        self.airport = airport
        self.aircraft_id = aircraft_id
        self.aircraft_type = aircraft_type
        self.pilot_percent_hours_on_type = pilot_percent_hours_on_type
        self.pilot_total_hours = pilot_total_hours
        self.midair = midair
        self.narrative = narrative

    @property
    def report_id(self):
        """The report's identifier (read-only)."""
        return self.__report_id

    @property
    def date(self):
        """The incident date, a datetime.date."""
        return self.__date

    @date.setter
    def date(self, date):
        assert isinstance(date, datetime.date), "invalid date"
        self.__date = date

    @property
    def airport(self):
        """The airport name: nonempty, without newlines."""
        return self.__airport

    @airport.setter
    def airport(self, airport):
        assert airport and "\n" not in airport, "invalid airport"
        self.__airport = airport

    @property
    def aircraft_id(self):
        """The aircraft identifier: nonempty, without newlines."""
        return self.__aircraft_id

    @aircraft_id.setter
    def aircraft_id(self, aircraft_id):
        assert aircraft_id and "\n" not in aircraft_id, \
            "invalid aircraft_id"
        self.__aircraft_id = aircraft_id

    @property
    def aircraft_type(self):
        """The aircraft type: nonempty, without newlines."""
        return self.__aircraft_type

    @aircraft_type.setter
    def aircraft_type(self, aircraft_type):
        assert aircraft_type and "\n" not in aircraft_type, \
            "invalid aircraft_type"
        self.__aircraft_type = aircraft_type

    @property
    def pilot_percent_hours_on_type(self):
        """Percentage in the range 0.0 to 100.0 inclusive."""
        return self.__pilot_percent_hours_on_type

    @pilot_percent_hours_on_type.setter
    def pilot_percent_hours_on_type(self, pilot_percent_hours_on_type):
        assert 0.0 <= pilot_percent_hours_on_type <= 100.0, \
            "invalid pilot_percent_hours_on_type"
        self.__pilot_percent_hours_on_type = pilot_percent_hours_on_type

    @property
    def pilot_total_hours(self):
        """The pilot's total hours: a positive, nonzero number."""
        return self.__pilot_total_hours

    @pilot_total_hours.setter
    def pilot_total_hours(self, pilot_total_hours):
        # Only positivity is enforced (not the exact numeric type), in line
        # with the documented "Positive and nonzero" constraint.
        assert pilot_total_hours > 0, "invalid pilot_total_hours"
        self.__pilot_total_hours = pilot_total_hours

    @property
    def midair(self):
        """Whether the incident involved another aircraft"""
        return self.__midair

    @midair.setter
    def midair(self, midair):
        assert isinstance(midair, bool), "invalid midair"
        self.__midair = midair

    @property
    def narrative(self):
        """The incident's narrative"""
        return self.__narrative

    @narrative.setter
    def narrative(self, narrative):
        # The narrative is free text and is stored without validation.
        self.__narrative = narrative
class IncidentCollection(dict):
    """Dictionary of incidents that is always traversed in sorted key order.

    Inherits from dict and overrides the iteration methods so that keys,
    values and items are all produced in the sorted order of the keys.
    """

    def __iter__(self):
        # super() is essential here: keys is an alias of this very method,
        # so calling self.keys() would recurse without end.
        yield from sorted(super().keys())

    keys = __iter__

    def values(self):
        yield from (self[key] for key in self.keys())

    def items(self):
        yield from ((key, self[key]) for key in self.keys())
if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in the docstrings.
    import doctest

    doctest.testmod()
7.1 二进制数据的读与写
7.1.1 带可选压缩的Pickle
pickle没有安全机制,因此,加载来自不可信源的pickle可能是危险的。
pickle可以导入任意模块并调用任意函数,因此来自不可信源的Pickle中的数据可能会被恶意操纵。
#self是一个dict,字典的值是Incident对象(pickle可以自动处理自定义类的对象)
def export_pickle(self, filename, compress=False):
    """Save the incident data to *filename* as a pickle.

    :param filename: path of the file to write
    :param compress: when true the file is written through gzip
    :return: True on success, False if writing or pickling failed
    """
    opener = gzip.open if compress else open
    stream = None
    try:
        stream = opener(filename, "wb")
        pickle.dump(self, stream, pickle.HIGHEST_PROTOCOL)
    except (EnvironmentError, pickle.PicklingError) as err:
        program = os.path.basename(sys.argv[0])
        print("{0}: export error:{1}".format(program, err))
        return False
    else:
        return True
    finally:
        if stream is not None:
            stream.close()
GZIP_MAGIC = b"\x1F\x8B"  # magic number at the start of a gzip file


def import_pickle(self, filename):
    """Replace the contents of this mapping with a pickle read from disk.

    The first bytes of the file are compared with GZIP_MAGIC to decide
    whether it must be read through gzip.  The existing contents are only
    discarded once the pickle has been loaded successfully.

    SECURITY: unpickling can import modules and call arbitrary functions,
    so this must only be used on files from a trusted source.

    :param filename: path of the (optionally gzip-compressed) pickle
    :return: True on success, False if the file could not be read/unpickled
    """
    fh = None
    try:
        fh = open(filename, "rb")
        magic = fh.read(len(GZIP_MAGIC))
        if magic == GZIP_MAGIC:
            fh.close()
            fh = gzip.open(filename, "rb")
        else:
            fh.seek(0)  # rewind so the pickle is read from its first byte
        loaded = pickle.load(fh)
        # Swap the data in only after a successful load, so that a failed
        # import does not wipe what was already held.
        self.clear()
        self.update(loaded)
        return True
    except (EnvironmentError, pickle.UnpicklingError, EOFError) as err:
        # EOFError is what pickle raises for an empty or truncated file.
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    finally:
        if fh is not None:
            fh.close()
可pickled 的类型
布尔型、数值型以及字符串都可以被pickle,类(包括自定义类)的实例也可以被pickle,前提是其私有的`__dict__`是可pickle的。此外,内置的组合类型(列表、元组、字典、集合等)只要其元素可pickle,也能被pickle。
bytes and bytearray
bytes是不可变类型,bytearray则是可变类型。
b1 = "小刚的蓝色水笔"
b2 = b1.encode()
b2
#b'\xe5\xb0\x8f\xe5\x88\x9a\xe7\x9a\x84\xe8\x93\x9d\xe8\x89\xb2\xe6\xb0\xb4\xe7\xac\x94'
b2.decode() # "小刚的蓝色水笔"
b1 = "小刚的蓝色水笔"
b2 = bytearray(b1.encode())
for i in b2:
print(type(i), i, hex(i))
b2
"""从这里可以看出:遍历bytearray得到的是0~255的整数,上面b'...'中反斜杠x后面的两位,就是每个字节的十六进制值"""
<class 'int'> 229 0xe5
<class 'int'> 176 0xb0
<class 'int'> 143 0x8f
<class 'int'> 229 0xe5
<class 'int'> 136 0x88
<class 'int'> 154 0x9a
<class 'int'> 231 0xe7
<class 'int'> 154 0x9a
<class 'int'> 132 0x84
<class 'int'> 232 0xe8
<class 'int'> 147 0x93
<class 'int'> 157 0x9d
<class 'int'> 232 0xe8
<class 'int'> 137 0x89
<class 'int'> 178 0xb2
<class 'int'> 230 0xe6
<class 'int'> 176 0xb0
<class 'int'> 180 0xb4
<class 'int'> 231 0xe7
<class 'int'> 172 0xac
<class 'int'> 148 0x94
bytearray(b'\xe5\xb0\x8f\xe5\x88\x9a\xe7\x9a\x84\xe8\x93\x9d\xe8\x89\xb2\xe6\xb0\xb4\xe7\xac\x94')
"""UTF-8编码下,这里的每个汉字都占3个字节(3×8位)"""
b1 = "小刚的蓝色水笔"
b2 = bytearray(b1.encode())
b2_1 = "小红".encode()
b2_2 = "粉色".encode()
b2[:6] = b2_1
b2[9:15] = b2_2
b2.decode() #“小红的粉色水笔”
b = "ABCDE".encode()
b1= "A".encode()
b[0], b[:1] # (65, b'A')
b[0] == b1 # False
b[0] == b1[0] # True
b[:1] == b1 # True
语法 | 描述 |
---|---|
ba.append(i) | 将整数i(0~255)附加到bytearray ba中 |
b.capitalize() | 返回bytes/bytearray b 的副本,并且第一个字符变为大写(如果是一个ASCII字符) |
b.center(width, byte) | 返回b的副本,b在长度为width的区域中间,并使用空格或给定的byte填充 |
b.count(x, start, end) | 返回bytes/bytearray x在bytes/bytearray b(或切片)中出现的次数 |
b.decode(encoding, error) | 返回一个str对象,代表使用UTF-8编码表示的(或指定encoding表示并根据可选的error参数进行错误处理)字节 |
b.endswith(x, start, end) | 如果b(或b的start:end分片)以bytes/bytearray x或元组x中任意bytes/bytearray结尾,就返回True,否则返回False |
b.expandtabs(size) | 返回bytes/bytearray b的副本,并且其中的制表符使用空格(个数为8的倍数,或指定的size)替代 |
ba.extend(seq) | 使用序列seq中的所有ints对bytearray ba进行扩展,所有ints必须在0到255之间 |
b.find(x, start, end) | 返回bytes/bytearray x在b(或b的start:end分片)中最左边的位置,如果没有找到,就返回-1.使用rfind()可以找到最右边的位置 |
b.fromhex(h) | 返回一个bytes对象,其字节对应的是str h中的十六进制整数 |
b.index(x, start, end) | 返回x在b(或b的start:end分片)中最左边的位置,如果没找到,就产生ValueError异常。使用rindex()方法可以找到最右边的位置 |
ba.insert(p, i) | 将整数i(取值范围0到255)插入到ba中的位置p处 |
b.isalnum() | 如果bytes/bytearray b 非空,并且b中的每个字符都是ASCII字母数字字符就返回True |
b.isalpha() | 如果bytes/bytearray b 非空,并且b中的每个字符都是ASCII字母字符,就返回True |
b.isdigit() | ...ASCII数字... |
b.islower() | 如果bytes/bytearray b包含至少一个可小写的ASCII字符,并且其所有可小写的字符都是小写的,就返回True |
b.isspace() | 如果bytes/bytearray b非空,并且b中的每个字符都是ASCII空格字符,就返回True |
b.istitle() | 如果b是非空并且首字母大写的,就返回True |
b.isupper() | 如果bytes/bytearray b包含至少一个可大写的ASCII字符,并且所有可大写的字符都是大写的,就返回True |
b.join(seq) | 返回序列seq中每个bytes/bytearray 进行连接后所得的结果,并在每俩个之间添加一个b(可以为空) |
b.ljust(width, byte) | 返回byte/bytearray b 的副本,并且要求左对齐,长度为width, 使用空格或给定的byte(可选的)进行填充。使用rjust()方法可以右对齐 |
b.lower() | 返回bytes/bytearray b的副本,其中ASCII字符都为小写 |
b.partition(sep) | 返回一个元组,其中包含3个bytes对象——包括b中最左边的bytes/bytearray sep之前的那部分、sep本身和b中sep之后的那部分;如果b中不包含sep,就返回b以及两个为空的bytes对象。使用rpartition()方法可以在sep的最右边出现处进行分割。 |
ba.pop(p) | 移除并返回ba中索引位置p处的整数 |
ba.remove(i) | 从bytearray ba 中移除整数i的首次出现 |
b.replace(x, y, n) | 返回b的一个副本,其中bytes/bytearray x的每个(或最多n个,如果给定)出现都用y进行替代 |
ba.reverse() | 反转bytearray ba的字节 |
b.split(x, n) | 返回一个字节列表,在x处进行分割(至多n次), 如果没有给定n,就在可能的地方都进行分割;如果没有给定x,就在空白字符处进行分割。使用rsplit()可以从右边开始分割 |
b.splitlines(f) | 返回对b进行分割(在行终结符处)后产生的行列表,如果f不为True,就剥离掉行终结符 |
b.startswith(x, start, end) | 如果bytes/bytearray b(或b的start:end分片)以bytes/bytearray x(或元组x中任意bytes/bytearray)引导,就返回True,否则返回False |
b.strip(x) | 返回b的副本,并剥离掉开始与结尾处的空白字符(或bytes/bytearray x中的字节), lstrip()只剥离起始处,rstrip()只剥离结尾处的 |
b.title() | 返回b的副本,其中每个字的第一个ASCII字符都是大写的,其他所有ASCII字符则都是小写的 |
b.translate(bt, d) | 返回b的一个副本,其中不包括来自d的字节,并且每个字节都被bytes bt的相应字节替换 |
b.upper() | 返回bytes/bytearray b的副本,其中ASCII字符都变为大写 |
b.zfill(w) | 返回b的副本,如果长度小于w,就使用引导字符(0x30)进行填充,使其长度为w |
7.1.2 带可选压缩的原始二进制数据
Little-endian, Big-endian | 小端,大端,低位,高位
7.2 文本文件的写入与分析
7.2.1 写入文本
def export_text(self, filename):
    """Write every incident to *filename* as UTF-8 text.

    Each incident becomes a ``[report_id]`` header, one ``key=value`` line
    per field, and the narrative (wrapped and indented with the textwrap
    module) between ``.NARRATIVE_START.`` and ``.NARRATIVE_END.`` markers.

    :param filename: path of the text file to write
    :return: True on success, False if the file could not be written
    """
    wrapper = textwrap.TextWrapper(initial_indent=" ",
                                   subsequent_indent=" ")
    try:
        with open(filename, "w", encoding="utf8") as fh:
            for incident in self.values():
                narrative = "\n".join(
                    wrapper.wrap(incident.narrative.strip()))
                fh.write("[{0.report_id}]\n"
                         "date={0.date!s}\n"
                         "aircraft_id={0.aircraft_id}\n"
                         "aircraft_type={0.aircraft_type}\n"
                         "airport={airport}\n"
                         "pilot_percent_hours_on_type="
                         "{0.pilot_percent_hours_on_type}\n"
                         "pilot_total_hours={0.pilot_total_hours}\n"
                         # :d renders the bool as 0 or 1
                         "midair={0.midair:d}\n"
                         ".NARRATIVE_START.\n{narrative}\n"
                         ".NARRATIVE_END.\n\n".format(
                             incident,
                             airport=incident.airport.strip(),
                             narrative=narrative))
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
7.2.2 分析文本
def import_text_manual(self, filename):
    """Replace the contents with incidents parsed line by line from text.

    The expected layout is a ``[report_id]`` header, ``key=value`` lines,
    and a narrative enclosed by ``.NARRATIVE_START.`` / ``.NARRATIVE_END.``.
    The existing contents are cleared as soon as the file has been opened,
    so after a failed import the mapping holds only the incidents parsed
    before the error.

    :param filename: path of the UTF-8 text file to read
    :return: True on success, False on a read or parsing error
    :raises AssertionError: propagated from Incident when a field value
        fails its validation
    """
    try:
        with open(filename, encoding="utf8") as fh:
            self.clear()
            data = {}
            # None while outside a narrative; inside one, the list of
            # narrative lines collected so far.
            narrative = None
            for lino, line in enumerate(fh, start=1):
                line = line.rstrip()
                if not line and narrative is None:
                    continue  # blank line between fields or incidents
                if narrative is not None:
                    if line == ".NARRATIVE_END.":
                        # The narrative is the last part of an incident,
                        # so the record can be built now.
                        data["narrative"] = textwrap.dedent(
                            "".join(narrative)).strip()
                        if len(data) != 9:
                            raise IncidentError(
                                "missing data on line {0}".format(lino))
                        incident = Incident(**data)
                        self[incident.report_id] = incident
                        data = {}
                        narrative = None
                    else:
                        narrative.append(line + "\n")
                elif not data and line[0] == "[" and line[-1] == "]":
                    # A header is only accepted at the start of an incident.
                    data["report_id"] = line[1:-1]
                elif "=" in line:
                    key, value = line.split("=", 1)
                    if key == "date":
                        data[key] = datetime.datetime.strptime(
                            value, "%Y-%m-%d").date()
                    elif key == "pilot_percent_hours_on_type":
                        data[key] = float(value)
                    elif key == "pilot_total_hours":
                        data[key] = int(value)
                    elif key == "midair":
                        data[key] = bool(int(value))
                    else:
                        data[key] = value
                elif line == ".NARRATIVE_START.":
                    narrative = []
                else:
                    # IncidentError rather than KeyError: str(KeyError)
                    # would wrap the printed message in quotes.
                    raise IncidentError(
                        "parsing error on line {0}".format(lino))
        return True
    except (EnvironmentError, ValueError, KeyError,
            IncidentError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
7.2.3 使用正则表达式分析文本
def import_text_regex(self, filename):
    """Replace the contents with incidents parsed from text using regexes.

    One regex captures each whole incident (ID, key/value section and
    narrative); a second one splits the key/value section into pairs.
    The existing contents are cleared as soon as the file has been opened.

    :param filename: path of the UTF-8 text file to read
    :return: True on success, False on a read or parsing error
    :raises AssertionError: propagated from Incident when a field value
        fails its validation
    """
    incident_re = re.compile(
        r"\[(?P<id>[^]]+)\](?P<keyvalues>.+?)"       # id, then key/values
        r"^\.NARRATIVE_START\.$(?P<narrative>.*?)"   # narrative body
        r"^\.NARRATIVE_END\.$",
        re.DOTALL | re.MULTILINE
    )
    key_value_re = re.compile(
        r"^\s*(?P<key>[^=]+)\s*=\s*"
        r"(?P<value>.+)\s*$",
        re.MULTILINE
    )
    try:
        with open(filename, encoding="utf8") as fh:
            self.clear()
            for incident_match in incident_re.finditer(fh.read()):
                data = {}
                data["report_id"] = incident_match.group("id")
                data["narrative"] = textwrap.dedent(
                    incident_match.group("narrative")).strip()
                keyvalues = incident_match.group("keyvalues")
                for match in key_value_re.finditer(keyvalues):
                    # The groups are greedy, so whitespace next to "=" or
                    # at the end of the line lands inside them; strip it.
                    key = match.group("key").strip()
                    data[key] = match.group("value").strip()
                # A missing key raises KeyError, which is handled below.
                data["date"] = datetime.datetime.strptime(
                    data["date"], "%Y-%m-%d").date()
                data["pilot_percent_hours_on_type"] = float(
                    data["pilot_percent_hours_on_type"])
                data["pilot_total_hours"] = int(data["pilot_total_hours"])
                data["midair"] = bool(int(data["midair"]))
                if len(data) != 9:  # anything other than 9 fields
                    raise IncidentError("missing data")
                # The fields must be passed as keyword arguments.
                incident = Incident(**data)
                self[incident.report_id] = incident
        return True
    except (EnvironmentError, KeyError, ValueError,
            IncidentError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
7.3 写入与分析XML文件
<?xml version="1.0" encoding="UTF-8"?>
<incidents>
<incident report_id="20070222008009G" date="2007-02-22"
aircraft_id="880342" aircraft_type="CE-172-M"
pilot_percent_hours_on_type="9.09090909091"
pilot_total_hours="448" midair="0">
<airport>BOWERMAN</airport>
<narrative>
On A GO-AROUND FROM A NIGHT CROSSWIND LANDING ATTEMPT THE AIRCRAFT HIT A
RUN WAY EDGE LIGHT DAMAGING ONE PROPELLER.
</narrative>
</incident>
<incident>
...
</incident>
<incident>
...
</incident>
<incident>
...
</incident>
</incidents>
7.3.1元素树
#import xml
def export_xml_etree(self, filename):
    """Write every incident to *filename* as XML using an element tree.

    Each incident becomes an ``<incident>`` element whose scalar fields are
    attributes and whose airport and narrative are child elements.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    root = xml.etree.ElementTree.Element("incidents")
    for incident in self.values():
        element = xml.etree.ElementTree.Element(
            "incident",
            report_id=incident.report_id,
            date=incident.date.isoformat(),
            aircraft_id=incident.aircraft_id,
            aircraft_type=incident.aircraft_type,
            # attribute values must be strings
            pilot_percent_hours_on_type=str(
                incident.pilot_percent_hours_on_type),
            pilot_total_hours=str(incident.pilot_total_hours),
            midair=str(int(incident.midair)))
        airport = xml.etree.ElementTree.SubElement(element, "airport")
        airport.text = incident.airport.strip()
        narrative = xml.etree.ElementTree.SubElement(element, "narrative")
        narrative.text = incident.narrative.strip()
        root.append(element)
    tree = xml.etree.ElementTree.ElementTree(root)
    try:
        tree.write(filename, "UTF-8")
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    return True
def import_xml_tree(self, filename):
    """Replace the contents with incidents read from XML via an element tree.

    The existing contents are left untouched if the file cannot be opened
    or parsed.  Once parsing has succeeded they are cleared, and a bad
    incident aborts the import at that point.

    :param filename: path of the XML file to read
    :return: True on success, False on a read, parse or data error
    :raises AssertionError: propagated from Incident when a field value
        fails its validation
    """
    try:
        tree = xml.etree.ElementTree.parse(filename)
    except (EnvironmentError, xml.parsers.expat.ExpatError,
            xml.etree.ElementTree.ParseError) as err:
        # ElementTree reports malformed XML as ParseError; ExpatError is
        # kept for the underlying parser.
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    self.clear()
    for element in tree.findall("incident"):
        try:
            data = {}
            for attribute in ("report_id", "date", "aircraft_id",
                              "aircraft_type",
                              "pilot_percent_hours_on_type",
                              "pilot_total_hours", "midair"):
                value = element.get(attribute)
                if value is None:
                    raise IncidentError(
                        "missing attribute {0}".format(attribute))
                data[attribute] = value
            data["date"] = datetime.datetime.strptime(
                data["date"], "%Y-%m-%d").date()
            data["pilot_percent_hours_on_type"] = float(
                data["pilot_percent_hours_on_type"])
            data["pilot_total_hours"] = int(data["pilot_total_hours"])
            data["midair"] = bool(int(data["midair"]))
            # findtext() returns None when the child element is absent.
            airport = element.findtext("airport")
            if airport is None:
                raise IncidentError("missing airport")
            data["airport"] = airport.strip()
            narrative = element.findtext("narrative")
            data["narrative"] = (narrative.strip()
                                 if narrative is not None else "")
            incident = Incident(**data)
            self[incident.report_id] = incident
        except (ValueError, LookupError, IncidentError) as err:
            print("{0}: import error: {1}".format(
                os.path.basename(sys.argv[0]), err))
            return False
    return True
7.3.2 DOM
def export_xml_dom(self, filename):
    """Write every incident to *filename* as XML using the DOM.

    Each incident becomes an ``<incident>`` element whose scalar fields are
    attributes and whose airport and narrative are child elements holding
    a text node.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    dom = xml.dom.minidom.getDOMImplementation()
    tree = dom.createDocument(None, "incidents", None)
    root = tree.documentElement  # the <incidents> root node
    for incident in self.values():
        element = tree.createElement("incident")
        for attribute, value in (
                ("report_id", incident.report_id),
                ("date", incident.date.isoformat()),
                ("aircraft_id", incident.aircraft_id),
                ("aircraft_type", incident.aircraft_type),
                ("pilot_percent_hours_on_type",
                 str(incident.pilot_percent_hours_on_type)),
                ("pilot_total_hours",
                 str(incident.pilot_total_hours)),
                ("midair", str(int(incident.midair)))):
            element.setAttribute(attribute, value)
        for name, text in (("airport", incident.airport),
                           ("narrative", incident.narrative)):
            text_element = tree.createTextNode(text)
            name_element = tree.createElement(name)
            name_element.appendChild(text_element)
            element.appendChild(name_element)
        root.appendChild(element)
    try:
        with open(filename, "w", encoding="utf8") as fh:
            # The keyword is "encoding"; it sets the XML declaration.
            tree.writexml(fh, encoding="UTF-8")
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
def import_xml_dom(self, filename):
    """Replace the contents with incidents read from XML via the DOM.

    The existing contents are left untouched if the file cannot be opened
    or parsed.  Once parsing has succeeded they are cleared, and a bad
    incident aborts the import at that point.

    :param filename: path of the XML file to read
    :return: True on success, False on a read, parse or data error
    :raises AssertionError: propagated from Incident when a field value
        fails its validation
    """
    def get_text(node_list):
        """Join the data of the text nodes in *node_list*, stripped."""
        return "".join(node.data for node in node_list
                       if node.nodeType == node.TEXT_NODE).strip()

    try:
        dom = xml.dom.minidom.parse(filename)
    except (EnvironmentError,
            xml.parsers.expat.ExpatError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    self.clear()
    for element in dom.getElementsByTagName("incident"):
        try:
            data = {}
            for attribute in ("report_id", "date", "aircraft_id",
                              "aircraft_type",
                              "pilot_percent_hours_on_type",
                              "pilot_total_hours", "midair"):
                # getAttribute() yields "" for a missing attribute; the
                # conversions below then fail with ValueError.
                data[attribute] = element.getAttribute(attribute)
            data["date"] = datetime.datetime.strptime(
                data["date"], "%Y-%m-%d").date()
            data["pilot_percent_hours_on_type"] = float(
                data["pilot_percent_hours_on_type"])
            # Whole hours: int, as in the other importers.
            data["pilot_total_hours"] = int(data["pilot_total_hours"])
            data["midair"] = bool(int(data["midair"]))
            # [0] raises IndexError (a LookupError) if the child is absent.
            airport = element.getElementsByTagName("airport")[0]
            data["airport"] = get_text(airport.childNodes)
            narrative = element.getElementsByTagName("narrative")[0]
            data["narrative"] = get_text(narrative.childNodes)
            incident = Incident(**data)
            self[incident.report_id] = incident
        except (ValueError, LookupError, IncidentError) as err:
            print("{0}: import error: {1}".format(
                os.path.basename(sys.argv[0]), err))
            return False
    return True
7.3.3 手动写入XML
def export_xml_manual(self, filename):
    """Write every incident to *filename* as hand-formatted XML.

    Attribute values taken from strings are quoted with
    xml.sax.saxutils.quoteattr and element text is escaped with
    xml.sax.saxutils.escape; the narrative is wrapped at 70 columns.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    quoteattr = xml.sax.saxutils.quoteattr
    escape = xml.sax.saxutils.escape
    try:
        with open(filename, "w", encoding="utf8") as fh:
            fh.write('<?xml version="1.0" encoding="UTF-8"?>\n')
            fh.write("<incidents>\n")
            for incident in self.values():
                # Each attribute fragment ends with a space: XML requires
                # whitespace between attributes.
                fh.write('<incident report_id={report_id} '
                         'date="{0.date!s}" '
                         'aircraft_id={aircraft_id} '
                         'aircraft_type={aircraft_type} '
                         'pilot_percent_hours_on_type='
                         '"{0.pilot_percent_hours_on_type}" '
                         'pilot_total_hours="{0.pilot_total_hours}" '
                         'midair="{0.midair:d}">\n'
                         '<airport>{airport}</airport>\n'
                         '<narrative>\n{narrative}\n</narrative>\n'
                         '</incident>\n'.format(
                             incident,
                             # quoteattr() adds the surrounding quotes
                             report_id=quoteattr(incident.report_id),
                             aircraft_id=quoteattr(incident.aircraft_id),
                             aircraft_type=quoteattr(
                                 incident.aircraft_type),
                             airport=escape(incident.airport),
                             narrative="\n".join(textwrap.wrap(
                                 escape(incident.narrative.strip()), 70))))
            fh.write("</incidents>\n")
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
7.3.4 使用SAX分析XML
与元素树和DOM在内存中表示整个XML文档不同的是,SAX分析器是逐步读入并处理的,从而可能更快,对内存的需求也不那么明显。然而,性能的优势不能仅靠假设,尤其是元素树与DOM都使用了快速的expat分析器。
class IncidentSaxHandler(xml.sax.handler.ContentHandler):
    """SAX content handler that fills a mapping with parsed incidents."""

    def __init__(self, incidents):
        """
        :param incidents: mapping that receives the parsed incidents,
            keyed by report ID; it is emptied immediately
        """
        super().__init__()
        self.__data = {}
        self.__text = ""
        # Only the reference is stored, so the caller's mapping itself is
        # the one that gets cleared and filled.
        self.__incidents = incidents
        self.__incidents.clear()

    def startElement(self, name, attributes):
        """Handle a start tag.

        For an ``<incident>`` tag the attributes are converted to their
        Python types and stored as the data of a new incident.  The
        collected text is reset for every start tag.

        :param name: the element's tag name
        :param attributes: the element's attributes (needs ``items()``)
        """
        if name == "incident":
            self.__data = {}
            for key, value in attributes.items():
                if key == "date":
                    self.__data[key] = datetime.datetime.strptime(
                        value, "%Y-%m-%d").date()
                elif key == "pilot_percent_hours_on_type":
                    self.__data[key] = float(value)
                elif key == "pilot_total_hours":
                    self.__data[key] = int(value)
                elif key == "midair":
                    self.__data[key] = bool(int(value))
                else:
                    self.__data[key] = value
        self.__text = ""

    def endElement(self, name):
        """Handle an end tag.

        ``</airport>`` and ``</narrative>`` store the text collected since
        the start tag; ``</incident>`` builds the Incident and adds it to
        the mapping.

        :param name: the element's tag name
        :raises IncidentError: if the finished incident does not have
            exactly nine fields
        """
        if name == "incident":
            if len(self.__data) != 9:
                raise IncidentError("missing data")
            incident = Incident(**self.__data)
            self.__incidents[incident.report_id] = incident
        elif name in frozenset({"airport", "narrative"}):
            self.__data[name] = self.__text.strip()
        # Reset the private accumulator (not a stray public attribute).
        self.__text = ""

    def characters(self, text):
        """Accumulate element text; it may arrive in several chunks.

        :param text: the next chunk of character data
        """
        self.__text += text
def import_xml_sax(self, filename):
    """Replace the contents with incidents read from XML via a SAX parser.

    The handler empties this mapping when it is created and adds each
    incident as its closing tag is reached.

    :param filename: path of the XML file to read
    :return: True on success, False on a read, parse or data error
    """
    try:
        handler = IncidentSaxHandler(self)
        parser = xml.sax.make_parser()
        parser.setContentHandler(handler)  # route parse events to handler
        parser.parse(filename)
        return True
    except (EnvironmentError, ValueError, IncidentError,
            xml.sax.SAXParseException) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
7.4 随机存取二进制文件
前面几节中,工作的基础是程序的所有数据都作为一个整体读入内存,进行适当处理,最后再作为整体写出。有些情况下,将数据放在磁盘上,只读入需要的部分,处理之后再将变化的部分写回磁盘,是一种更好的解决方案。
7.4.1 通用的BinaryRecordFile类
BinaryRecordFile.BinaryRecordFile类的API类似于列表,因为我们可以获取/设置/删除给定索引位置的记录。记录被删除后,只是简单地标记为“已删除”,这使得我们不必移动该记录后面的所有记录来保证连续性,也意味着删除操作之后,所有原始的索引位置仍然是有效的。另一个好处是,只要取消“已删除”标记,就可以反删除一条记录。当然,这种方法即便删除了记录,也仍然不能节省任何磁盘空间。为解决这一问题,我们将提供适当的方法来“压缩”文件,移除已删除的记录(并使得该索引位置无效)。
Contact = struct.Struct("<15si")
contacts = BinaryRecordFile.BinaryRecordFile(filename, Contact.size)
我们以"<15si"格式(小端字节顺序,一个15字节的字节字符串,一个4字节的有符号整数)创建了一个结构,用于表示每条记录。
之后创建了一个BinaryRecordFile.BinaryRecordFile实例,并使用一个文件名和一个记录大小做参数。如果文件不存在,会自动创建文件。
contacts[4] = Contact.pack("Abe Baker".encode("utf8"), 762)
contacts[5] = Contact.pack("Cindy Dove".encode("utf8"), 987)
上面的操作对文件相应地方进行了改写。如果,索引位置处前没有记录,就用0x00字节填充,"Abe Baker"少于15个字节,后面也用0x00填充
文件对象属性与方法(表)
语法 | 描述 |
---|---|
f.close() | 关闭文件对象f,并将属性f.closed设置为True |
f.closed | 文件已关闭,则返回True |
f.encoding | bytes与str之间进行转换时使用的编码 |
f.fileno() | 返回底层文件的文件描述符(只对那些有文件描述符的文件对象是有用的) |
f.flush() | 清空文件对象f,这个翻译有毒啊,应该是刷新当前缓冲区,让缓冲区的内容立马写入文件,而无需等待。所以,上面加flush()的作用,应该就是如果设置为True,每一次写入后,都立马再从缓冲区写入文件,而不是等待。 |
f.isatty() | 如果文件对象与控制台关联,就返回True(只有在文件对象应用了真正的文件时才是可用的) |
f.mode | 文件对象打开时使用的模式, 只读 |
f.name | 文件对象f的文件名(如果有) |
f.newlines | 文本文件f中的换行字符串类型 |
f.__next__() | 返回文件对象f的下一行 |
f.peek(n) | 返回n个字节,而不移动文件指针的位置 |
f.read(count) | 从文件对象f中读取至多count个字节,如果没有指定count,就读取从当前文件指针直至最后的每个字节。以二进制模式读时,返回bytes对象,以文本模式读时,返回str对象。如果没有要读的内容(以到文件结尾),就返回一个空的bytes或str对象 |
f.readable() | 如果f已经打开等待读取,就返回True |
f.readinto(ba) | 将至多len(ba)个字节读入到bytearray ba中,并返回读入的字节数——如果在文件结尾,就为0(只有在二进制模式下才可用) |
f.readline(count) | 读取下一行(如果指定count,并且在\n字符之前满足这一数值,那么至多读入count个字节),包括\n |
f.readlines(sizehint) | 读入到文件结尾之前的所有行,并以列表形式返回。如果给定sizehint,那么读入大概至多sizehint个字节(如果底层文件支持) |
f.seek(offset, whence) | 如果没有给定whence,或其为os.SEEK_SET,就按给定的offset(相对于文件起始点)移动文件指针(并作为下一次读、写的起点);如果whence为os.SEEK_CUR,就相对于当前文件指针位置将其移动offset(可以为负值)个字节(whence为os.SEEK_END,则是相对于文件结尾)。在追加模式"a"下,写入总是在结尾处进行的,而不管文件指针在何处。在文本模式下,只应该使用tell()方法的返回值作为offset |
f.seekable() | 如果f支持随机存取,就返回True |
f.tell() | 返回当前指针位置(相对于文件起始处) |
f.truncate(size) | 截取文件到当前文件指针所在位置,如果给定size,就到size大小处 |
f.writable() | 如果f是为写操作而打开的,就返回True |
f.write(s) | 将bytes/bytearray对象s写入文件(该文件以二进制模式打开),或将str对象s写入到文件(该文件以文本模式打开) |
f.writelines(seq) | 将对象序列(对文本文件而言是字符串,对二进制文件而言是字节字符串)写入到文件 |
import os
# Every record is preceded by one "state" byte, either _DELETED or _OKAY;
# a slot that was never written holds b"\x00".
_DELETED = b"\x01"
_OKAY = b"\x02"


class BinaryRecordFile:
    """Random-access file of fixed-size binary records, indexed like a list.

    Deleting a record only marks it as deleted, so the index positions of
    all other records stay valid and the record can be undeleted.  The
    space is reclaimed by inplace_compact() or compact(), both of which
    renumber the remaining records.
    """

    def __init__(self, filename, record_size, auto_flush=True):
        """
        :param filename: path of the record file; created if it is missing
        :param record_size: size in bytes of each record as seen by callers
        :param auto_flush: when true the file is flushed before each read
            and after each write
        """
        # One byte longer than the caller's records: the state byte.
        self.__record_size = record_size + 1
        # "w+b" creates a new file; "r+b" updates an existing one without
        # truncating it.
        mode = "w+b" if not os.path.exists(filename) else "r+b"
        self.__fh = open(filename, mode)
        self.auto_flush = auto_flush

    @property
    def record_size(self):
        """The record size as seen by callers (without the state byte)."""
        return self.__record_size - 1

    @property
    def name(self):
        """The name of the underlying file."""
        return self.__fh.name

    def flush(self):
        """Flush buffered writes to the underlying file."""
        self.__fh.flush()

    def close(self):
        """Close the underlying file."""
        self.__fh.close()

    def __setitem__(self, index, record):
        """Store *record* at *index* and mark the slot as okay.

        :raises AssertionError: if *record* is not bytes/bytearray or does
            not have exactly record_size bytes
        """
        assert isinstance(record, (bytes, bytearray)), \
            "binary data required"
        assert len(record) == self.record_size, (
            "record must be exactly {0} bytes".format(self.record_size))
        self.__fh.seek(index * self.__record_size)
        self.__fh.write(_OKAY)
        self.__fh.write(record)
        if self.auto_flush:
            self.__fh.flush()

    def __getitem__(self, index):
        """Return the record at *index*, or None if blank or deleted.

        :raises IndexError: if *index* is beyond the end of the file
        """
        self.__seek_to_index(index)
        state = self.__fh.read(1)
        if state != _OKAY:
            return None
        return self.__fh.read(self.record_size)

    def __seek_to_index(self, index):
        """Move the file pointer to the start of slot *index*.

        :raises IndexError: if *index* is beyond the end of the file
        """
        if self.auto_flush:
            self.__fh.flush()
        self.__fh.seek(0, os.SEEK_END)  # the end offset is the file size
        end = self.__fh.tell()
        offset = index * self.__record_size
        if offset >= end:
            raise IndexError("no record at index position {0}".format(
                index))
        self.__fh.seek(offset)

    def __delitem__(self, index):
        """Mark the record at *index* as deleted (no-op if not okay)."""
        self.__seek_to_index(index)
        state = self.__fh.read(1)
        if state != _OKAY:
            return
        self.__fh.seek(index * self.__record_size)
        self.__fh.write(_DELETED)
        if self.auto_flush:
            self.__fh.flush()

    def undelete(self, index):
        """Restore the record at *index* if it is marked as deleted.

        :return: True if the record was restored, False otherwise
        """
        self.__seek_to_index(index)
        state = self.__fh.read(1)
        if state == _DELETED:
            self.__fh.seek(index * self.__record_size)
            self.__fh.write(_OKAY)
            if self.auto_flush:
                self.__fh.flush()
            return True
        return False

    def __len__(self):
        """Return the number of slots, including blank and deleted ones."""
        if self.auto_flush:
            self.__fh.flush()
        self.__fh.seek(0, os.SEEK_END)
        end = self.__fh.tell()
        return end // self.__record_size

    def inplace_compact(self):
        """Remove blank and deleted records without using a second file.

        Live records are moved towards the front in their original order
        in a single pass, then the file is truncated after the last one.
        An empty file is left as it is.
        """
        write_index = 0
        for read_index in range(len(self)):
            record = self[read_index]
            if record is None:
                continue
            if read_index != write_index:
                self[write_index] = record
                # Retire the old copy straight away so a record never
                # stays live in two slots.
                del self[read_index]
            write_index += 1
        self.__fh.truncate(write_index * self.__record_size)
        self.__fh.flush()

    def compact(self, keep_backup=False):
        """Remove blank and deleted records by rewriting the file.

        The live records are copied to a temporary file that then replaces
        the original; the original is kept as "<name>.bak" only when
        *keep_backup* is true.

        :param keep_backup: keep the pre-compaction file as a backup
        """
        filename = self.__fh.name
        compactfile = filename + ".$$$"
        backupfile = filename + ".bak"
        self.__fh.flush()
        self.__fh.seek(0)
        with open(compactfile, "wb") as fh:
            while True:
                data = self.__fh.read(self.__record_size)
                if not data:
                    break
                # Slice rather than index: data[0] would be an int.
                if data[:1] == _OKAY:
                    fh.write(data)
        self.__fh.close()
        # os.replace() overwrites a stale backup on every platform.
        os.replace(filename, backupfile)
        os.replace(compactfile, filename)
        if not keep_backup:
            os.remove(backupfile)
        self.__fh = open(filename, "r+b")
if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in the docstrings.
    import doctest

    doctest.testmod()
7.4.2 实例: BikeStock模块的类
import struct
from practice import BinaryRecordFile
class Bike:
    """One line of bike stock: identity, name, quantity and unit price."""

    def __init__(self, identity, name, quantity, price):
        """
        :param identity: the bike's ID; must be longer than 3 characters
        :param name: the bike's name
        :param quantity: how many bikes are in stock
        :param price: the price of a single bike
        :raises AssertionError: if *identity* is too short
        """
        assert len(identity) > 3, (
            "invalid bike identity {0}".format(identity))
        self.__identity = identity
        self.name = name
        self.quantity = quantity
        self.price = price

    @property
    def identity(self):
        """The bike's ID (read-only)."""
        return self.__identity

    @property
    def value(self):
        """The stock value: quantity multiplied by unit price."""
        return self.quantity * self.price
# Record layout: little-endian, 8-byte identity, 30-byte name, int
# quantity, double price.
_BIKE_STRUCT = struct.Struct("<8s30sid")


def _bike_from_record(record):
    """Build a Bike from a binary record packed with _BIKE_STRUCT.

    :param record: bytes of exactly _BIKE_STRUCT.size
    :return: the decoded Bike
    """
    ID, NAME, QUANTITY, PRICE = range(4)
    parts = list(_BIKE_STRUCT.unpack(record))
    # struct pads short byte strings with NUL bytes; drop the padding.
    parts[ID] = parts[ID].decode("utf8").rstrip("\x00")
    parts[NAME] = parts[NAME].decode("utf8").rstrip("\x00")
    return Bike(*parts)
def _record_from_bike(bike):
    """Pack *bike* into a binary record using _BIKE_STRUCT.

    :param bike: the Bike to serialize
    :return: the packed bytes
    """
    identity = bike.identity.encode("utf8")
    name = bike.name.encode("utf8")
    return _BIKE_STRUCT.pack(identity, name, bike.quantity, bike.price)
class BikeStock:
    """Bike stock kept in a BinaryRecordFile and addressed by bike identity.

    An in-memory dictionary maps each bike identity to the index of its
    record in the file.
    """

    def __init__(self, filename):
        """Open *filename* and index the bikes it already holds.

        :param filename: path of the record file
        """
        self.__file = BinaryRecordFile.BinaryRecordFile(
            filename, _BIKE_STRUCT.size)
        self.__index_from_identity = {}
        for index in range(len(self.__file)):
            record = self.__file[index]
            if record is not None:  # skip blank and deleted slots
                bike = _bike_from_record(record)
                self.__index_from_identity[bike.identity] = index

    def append(self, bike):
        """Add *bike* at the end of the file and index it."""
        index = len(self.__file)
        self.__file[index] = _record_from_bike(bike)
        self.__index_from_identity[bike.identity] = index

    def __delitem__(self, identity):
        """Mark the record of the bike *identity* as deleted.

        :raises KeyError: if *identity* is unknown
        """
        del self.__file[self.__index_from_identity[identity]]

    def __getitem__(self, identity):
        """Return the Bike *identity*, or None if its record was deleted.

        :raises KeyError: if *identity* is unknown
        """
        record = self.__file[self.__index_from_identity[identity]]
        return None if record is None else _bike_from_record(record)

    def __modify(self, identity, update):
        """Read the bike *identity*, apply *update* to it, write it back.

        :param update: callable taking the Bike and changing it in place
        :return: True if the bike was rewritten, False if its record is
            blank or deleted
        :raises KeyError: if *identity* is unknown
        """
        index = self.__index_from_identity[identity]
        record = self.__file[index]
        if record is None:
            return False
        bike = _bike_from_record(record)
        update(bike)
        self.__file[index] = _record_from_bike(bike)
        return True

    def __change_stock(self, identity, amount):
        def update(bike):
            bike.quantity += amount
        return self.__modify(identity, update)

    def increase_stock(self, identity, amount):
        """Add *amount* to the quantity of the bike *identity*."""
        return self.__change_stock(identity, amount)

    def decrease_stock(self, identity, amount):
        """Subtract *amount* from the quantity of the bike *identity*."""
        return self.__change_stock(identity, -amount)

    def __change_name(self, identity, name):
        def update(bike):
            bike.name = name
        return self.__modify(identity, update)

    def __change_price(self, identity, price):
        def update(bike):
            bike.price = price
        return self.__modify(identity, update)

    # Public entry points: the double-underscore methods above are
    # name-mangled and cannot be called from outside the class.
    def change_name(self, identity, name):
        """Set the name of the bike *identity*."""
        return self.__change_name(identity, name)

    def change_price(self, identity, price):
        """Set the unit price of the bike *identity*."""
        return self.__change_price(identity, price)

    def __iter__(self):
        """Yield every bike whose record is neither blank nor deleted."""
        for index in range(len(self.__file)):
            record = self.__file[index]
            if record is not None:
                yield _bike_from_record(record)