(2023.07.01 Sat @SZ 颖隆大厦)
由JSON生成FIX message
JSON序列作为用户/开发者提供的信息,成为被转化为FIX message的输入。该过程中建议使用Python pydantic model实现数据校验。
基本步骤如下
- 提供基础信息,生成JSON序列
- 通过pydantic model对JSON序列校验
- 将校验过的JSON序列复制给
quickfix
中对应的字段
首先由用户提供基础信息,生成JSON序列,该案例中 仅列出部分信息
userinfo =
{
"body": {
"deliver_to_location_id": "XX",
"sending_time": "20220122-08:18:07.578",
"cl_ord_id": "TXXX001",
"side": "B",
"symbol": "HKD",
"order_qty": 9981,
"currency": "EUR",
"ord_type": "1",
"transact_time": "20230315-13:36:42.113",
"trade_date": "20220122",
"security_id_source": "A",
"settl_date": "20220826",
"num_days_interest": 31,
"security_type": "CHEQUE",
"product": 4,
"maturity_date": "20220926",
"action_type": "N",
"coupon_day_count": 7,
"text": "this is a test text",
}
}
定义pydantic model,包含FIX中的必要字段。
import datetime
from typing import Union, Optional
from pydantic import BaseModel
class SomeModel(BaseModel):
cl_ord_id: str # 11
currency: str # 15
msg_seq_num: int = 1 # 34
order_qty: Union[float, int] # 38
ord_type: str # 40
sender_comp_id: str = "Placeholder" # by default, set to Placeholder# 49
sending_time: str = datetime.datetime.now().isoformat().replace("-", "").replace("T", "-")[:21] # set to now string by default, 52
side: str # 54
symbol: str # 55
transact_time: str # 60
trade_date: str # 75
deliver_to_location_id: str = "FX" # 145
security_id_source: str # 22
num_days_interest: int # 157, int
security_type: str # 167
product: int # 460
maturity_date: str # 541
action_type: str # 6812
coupon_day_count: int # 1950
text: str = "" # 58
将输入的JSON序列转换为SomeModel
对象。
user_info = SomeModel(**userinfo["body"])
此时查看 user_info
对象,返回如下
>> user_info
SomeModel(cl_ord_id='TXXX001', currency='EUR', msg_seq_num=1, order_qty=9981.0, ord_type='1', sender_comp_id='Placeholder', sending_time='20220122-08:18:07.578', side='B', symbol='HKD', transact_time='20230315-13:36:42.113', trade_date='20220122', deliver_to_location_id='XX', security_id_source='A', num_days_interest=31, security_type='CHEQUE', product=4, maturity_date='20220926', action_type='N', coupon_day_count=7, text='this is a test text')
考虑到quickfix
中的字段命名采用Camel case,仲需要将JSON中 的snake case字段名做变换。
name_mapping = {key: "".join(t.capitalize() for t in key.split("_"))
for key in SomeModel.__fields__.keys()}
接下来将用户输入的信息传递给FIX对象,采用Python中的quickfix
工具包生成FIX message。赋值方式:给quickfix
中的对应字段的对象赋值,并将该对象传递给quickfix.Message
方法。
import quickfix as fix
message = fix.Message()
# header initialisation
header = message.getHeader()
header.setField(fix.BeginString(fix.BeginString_FIX44))
header.setField(fix.MsgType(fix.MsgType_NewOrderSingle)) # 39 = D
for u in userinfo.dict().keys():
if u == "side" or "time" in u.lower():
continue
print(u)
try:
tmpcmd = f"fix.{name_mapping[u]}(userinfo.{u})"
print(tmpcmd)
tmp = eval(tmpcmd)
message.setField(tmp)
except:
print(f"key = {u}")
str(message).replace("\x01", "|")
输出结果如下:
cl_ord_id
fix.ClOrdID(userinfo.cl_ord_id)
currency
fix.Currency(userinfo.currency)
msg_seq_num
fix.MsgSeqNum(userinfo.msg_seq_num)
order_qty
fix.OrderQty(userinfo.order_qty)
ord_type
fix.OrdType(userinfo.ord_type)
sender_comp_id
fix.SenderCompID(userinfo.sender_comp_id)
sending_time
fix.SendingTime(userinfo.sending_time)
-------ERROR------
symbol
fix.Symbol(userinfo.symbol)
-------ERROR------
transact_time
fix.TransactTime(userinfo.transact_time)
-------ERROR------
trade_date
fix.TradeDate(userinfo.trade_date)
deliver_to_location_id
fix.DeliverToLocationID(userinfo.deliver_to_location_id)
security_id_source
fix.SecurityIDSource(userinfo.security_id_source)
num_days_interest
fix.NumDaysInterest(userinfo.num_days_interest)
security_type
fix.SecurityType(userinfo.security_type)
product
fix.Product(userinfo.product)
maturity_date
fix.MaturityDate(userinfo.maturity_date)
action_type
fix.ActionType(userinfo.action_type)
-------ERROR------
coupon_day_count
fix.CouponDayCount(userinfo.coupon_day_count)
-------ERROR------
text
fix.Text(userinfo.text)
'8=FIX.4.4|9=140|35=D|11=TXXX001|15=EUR|22=A|34=1|38=9981|40=1|
49=Placeholder|58=this is a test text|75=20220122|145=XX|
157=31|167=CHEQUE|460=4|541=20220926|10=005|'
(2023.07.02 Sun @SZ 颖隆大厦)
上面的案例只是给出了最简形式的实现 。从运行结果的log中发现有若干error,包括所有和timestamp
有关的对象无法赋值,quickfix
不包含的方法/对象无法赋值和quickfix.Symbol
无法赋值。
下面的代码是对上面代码的细化,针对不同情况使用不同的处理方式。
首先定义不包含在quickfix
中的方法和对象,ActionType
和CouponDayCount
,是Bloomberg FX系统中使用的对象。
class ActionType(fix.StringField):
number = 6812
required = False
messages = []
groups = []
def __init__(self, value=None):
if value is None:
fix.StringField.__init__(self, self.number)
else:
fix.StringField.__init__(self, self.number, value)
class CouponDayCount(fix.IntField):
number = 1950
required = False
messages = []
groups = []
def __init__(self, value=None):
if value is None:
fix.IntField.__init__(self, self.number)
else:
fix.IntField.__init__(self, self.number, value)
for u in userinfo.dict().keys():
# for different objects in FIX, diff methods are used to process data
if name_mapping[u] not in dir(fix):
# user-defined object
user_defined_key = name_mapping[u]
print(f"user defined key: {user_defined_key}")
class_type = eval(f"{user_defined_key}.__base__.__name__")
if class_type.lower().startswith("string"):
tmp = eval(f"{user_defined_key}('{userinfo.dict()[u]}')")
elif class_type.lower().startswith("int"):
tmp = eval(f"{user_defined_key}({userinfo.dict()[u]})")
message.setField(tmp)
continue
tmp_key = eval(f"fix.{name_mapping[u]}")
print(f"type(tmp_key) = {type(tmp_key)}")
if isinstance(tmp_key, str):
# string-type object
print(1)
continue
tmp_type = tmp_key.__base__.__name__
if tmp_type in ("UtcTimeStampField"):
# timestamp object
tmp_key = eval(f"fix.{name_mapping[u]}()")
tmp_value = getattr(userinfo, f"{u}") # eval(f"userinfo.{u}")
tmp_key.setString(tmp_value)
header.setField(tmp_key)
elif tmp_type in ("StringField", "IntField"):
# string/int object
tmpcmd = f"fix.{name_mapping[u]}(userinfo.{u})"
print(tmpcmd)
tmp = eval(tmpcmd)
message.setField(tmp)
elif tmp_type in ("CharField"):
# char, side
if u == "side":
side_mapping = {"B": fix.Side_BUY, "S": fix.Side_SELL}
tmp = eval(f"fix.Side('{side_mapping[userinfo.dict()[u]]}')")
message.setField(tmp)
运行结果
>> str(message).replace("\x01", "|")
'8=FIX.4.4|9=209|35=D|52=20220122-08:18:07.578|60=20230315-13:36:42.113|
11=TXXX001|15=EUR|22=A|34=1|38=9981|40=1|49=Placeholder|54=1|
58=this is a test text|75=20220122|145=XX|157=31|167=CHEQUE|460=4|
541=20220926|1950=7|6812=N|10=042|'
对于quickfix中没有的对象,根据对象类型由quickfix.StringField
等类继承从而实现定义,对于timestamp类型对象,将该对象赋值给header,而对于side
对象,则加入判断并加入引号实现赋值。
在FIX message中加入repeated group
(2023.07.08, 09 Sat, Sun @SZ 汇泰大厦)
生成FIX message过程中不时需要加入group,比如party/counterparty group, note group(Bloomberg-specific)等。
FIX中group信息中可能不只包含一条信息,也就是相同的tag可能会出现多次。在传输group信息到FIX中之前需要先从quickfix.Group
继承创建新的group类。
下面案例创建一个计算party个数的group。tag 453用于计算累计有多少个party,而tag 448, 452和447是每个party中对应的信息。
import quickfix as fix
class NoPartyIds(fix.Group):
number = 453
required = False
messages = ['PARTY']
def __init__(self):
fields = fix.IntArray(3)
fields[0] = 448
fields[1] = 452
fields[2] = 447
fields[3] = 0
fix.Group.__init__(self, self.number, 453, fields)
另需要从pydantic.BaseModel
中创建一个PartyModel,其中的tag 447是optional,即用户传递值的时候可不对447赋值。
class Parties(BaseModel):
"""there may be more than one parties, which share the same key
"""
party_id: str # 448
party_id_source: Optional[str] = None # 447
party_role: int # 452
同时要在SomeModel
中加入Parties
,否则数据验证时无法将用户初始化的Party
相关数据传入FIX信息。对前面已经定义的SomeModel
做继承,加入新的parties
变量,默认状态为None
。
class SomeModel(SomeModel):
parties: Optional[list[Parties]] = None
用户输入的数据也加入相应字段。
user_msg['body'].update({'parties': [{'party_id': 'p1',
'party_role': 0}, {'party_id': 'p2', 'party_id_source': 'blp',
'party_role': 1}]})
并重新进行数据验证
user_info = SomeModel(**userinfo["body"])
也需要重新对name_mapping做定义
name_mapping = {key: "".join(t.capitalize() for t in key.split("_"))
for key in SomeModel.__fields__.keys()}
接下来在FIX信息的赋值过程中,加入处理group的部分。(代码需debug)
for u in userinfo.dict().keys():
# for different objects in FIX, diff methods are used to process data
if name_mapping[u] not in dir(fix):
# user-defined object
user_defined_key = name_mapping[u]
print(f"user defined key: {user_defined_key}")
class_type = eval(f"{user_defined_key}.__base__.__name__")
if class_type.lower().startswith("string"):
tmp = eval(f"{user_defined_key}('{userinfo.dict()[u]}')")
elif class_type.lower().startswith("int"):
tmp = eval(f"{user_defined_key}({userinfo.dict()[u]})")
elif class_type.lower().startswith("basemodel"):
for i, t in enumerate(userinfo.dict()[u]):
if u == "parties":
grp = NoPartyIds()
print(f"i = {i}")
print(f"t = {t}")
for k in t.keys():
if t[k] is None:
print(f"{k} has an empty value, simply jump")
continue
# tmp = f"fix.{name_mapping[k]}('{userinfo.dict()[u][i][k]}')"
# print(f"tmp = {tmp}")
# eval_command = eval(tmp)
# print(f"eval_command: {eval_command}")
#class_
print(f"k = {k}")
class_type1 = eval(f"fix.{name_mapping[k]}.__base__.__name__")
print(f"class type 1: {class_type1}")
tmp_eval = eval(f"userinfo.dict()['{u}'][{i}]['{k}']")
print(f"tmp_eval: {tmp_eval}")
if class_type1.lower() in ("stringfield"):
t1 = f"fix.{name_mapping[k]}('{tmp_eval}')"
print(f"t1 = {t1}")
tmp = eval(t1)
elif class_type1.lower() in ("intfield"):
tmp = eval(f"fix.{name_mapping[k]}({tmp_eval})")
grp.setField(tmp)
message.addGroup(grp)
print(f"message after adding group: {str(message)}")
message.setField(tmp)
continue
tmp_key = eval(f"fix.{name_mapping[u]}")
print(f"type(tmp_key) = {type(tmp_key)}")
if isinstance(tmp_key, str):
# string-type object
print(1)
continue
tmp_type = tmp_key.__base__.__name__
if tmp_type in ("UtcTimeStampField"):
# timestamp object
tmp_key = eval(f"fix.{name_mapping[u]}()")
tmp_value = getattr(userinfo, f"{u}") # eval(f"userinfo.{u}")
tmp_key.setString(tmp_value)
header.setField(tmp_key)
elif tmp_type in ("StringField", "IntField"):
# string/int object
tmpcmd = f"fix.{name_mapping[u]}(userinfo.{u})"
print(tmpcmd)
tmp = eval(tmpcmd)
message.setField(tmp)
elif tmp_type in ("CharField"):
# char, side
if u == "side":
side_mapping = {"B": fix.Side_BUY, "S": fix.Side_SELL}
tmp = eval(f"fix.Side('{side_mapping[userinfo.dict()[u]]}')")
message.setField(tmp)
str(message).replace("\x01", "|")