如何使用Python通过Bloomberg新数据API(COM v3)获取异步数据?

发布于 2021-01-29 15:07:41

有谁知道如何使用Python通过Bloomberg的新数据API(COM v3)获取异步数据?我在wilmott.com上找到了下面这段代码,它可以正常工作,但只适用于旧版本的API。

有人知道新版本的相应代码吗?

from win32com.client import DispatchWithEvents
from pythoncom import PumpWaitingMessages, Empty, Missing
from time import time

class BBCommEvent:
    """Event sink whose ``On*`` callbacks print a line each time they fire.

    The class is handed to ``DispatchWithEvents`` so that the COM object can
    call these methods; none of them returns a value.
    """

    def OnData(self, Security, cookie, Fields, Data, Status):
        """Print the repr of the ``Data`` payload delivered with the event.

        Only ``Data`` is used; the other arguments are accepted because the
        callback signature requires them.
        """
        # repr() replaces the Python 2-only backtick syntax, and a
        # single-argument print() call behaves the same on Python 2 and 3.
        print('OnData: ' + repr(Data))

    def OnStatus(self, Status, SubStatus, StatusDescription):
        """Print a fixed marker line; the status arguments are ignored."""
        print('OnStatus')

class TestAsync:
    """Demo that subscribes to one ticker and pumps COM events for five seconds.

    All of the work happens in the constructor; the instance keeps no state.
    """

    def __init__(self):
        """Connect, subscribe to ``EUR Curncy`` / ``LAST_PRICE`` and pump messages.

        Events are delivered to ``BBCommEvent`` while the loop below runs.
        The loop ends, after printing ``timed out``, once five seconds have
        passed.
        """
        # CLSID used to create the COM object. The original author also noted
        # the ProgID 'Bloomberg.Data.1' next to it (it was never used).
        clsid = '{F2303261-4969-11D1-B305-00805F815CBF}'

        # Single-argument print() calls work on both Python 2 and Python 3.
        print('connecting to BBComm')
        blp = DispatchWithEvents(clsid, BBCommEvent)
        blp.AutoRelease = False
        blp.Subscribe('EUR Curncy', 1, 'LAST_PRICE', Results=Empty)
        blp.Flush()

        end_time = time() + 5

        # Event callbacks only fire while waiting COM messages are pumped.
        while True:
            PumpWaitingMessages()
            if end_time < time():
                print('timed out')
                break

if __name__ == "__main__":
    # Constructing TestAsync runs the whole demo: its __init__ connects,
    # subscribes and pumps messages until the timeout.
    ta = TestAsync()
关注者
0
被浏览
58
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    我终于弄明白了。我用combrowse.py做了一些侦查工作,并与BBG API下载包中的Java、C、C++和.NET示例进行了比较。有趣的是,在这类问题上,彭博服务台的人员几乎一无所知,又或许只是我问错了人。

    这是我的代码。

    automationHandler.py:

    import win32com.client
    from pythoncom import PumpWaitingMessages
    from time import time, strftime
    import constants
    
    class EventHandler:
        """Event sink for a ``blpapicom.Session`` that prints each message it receives.

        ``OnProcessEvent`` is the callback; it routes the event to ``getData``,
        ``getStatus`` or ``getMisc`` depending on ``event.EventType``.  Every
        printed line starts with a ``%m/%d/%y %H:%M:%S`` timestamp followed by
        the message type.
        """

        def _linePrefix(self, message):
            """Return the timestamp and message-type prefix shared by all output lines."""
            return strftime('%m/%d/%y %H:%M:%S') + ', MessageType: ' + message.MessageTypeAsString

        def OnProcessEvent(self, result):
            """Dispatch an incoming event to the printer that matches its type.

            ``result`` is wrapped with ``gencache.EnsureDispatch`` before its
            properties are read.
            """
            event = win32com.client.gencache.EnsureDispatch(result)
            eventType = event.EventType
            if eventType == constants.SUBSCRIPTION_DATA:
                self.getData(event)
            elif eventType == constants.SUBSCRIPTION_STATUS:
                self.getStatus(event)
            else:
                self.getMisc(event)

        def getData(self, event):
            """Print one line per message listing each configured field it contains.

            Fields from ``constants.fields`` that are absent from the message are
            skipped; a null element is printed with its name but no value.
            """
            iterator = event.CreateMessageIterator()
            while iterator.Next():
                message = iterator.Message
                fieldParts = []
                for field in constants.fields:
                    if not message.AsElement.HasElement(field):
                        continue
                    element = message.GetElement(field)
                    if element.IsNull:
                        valueText = ''
                    else:
                        valueText = ', Value: ' + str(element.Value)
                    fieldParts.append(', (Type: ' + element.Name + valueText + ')')
                # Single-argument print() behaves the same on Python 2 and 3.
                print(self._linePrefix(message) + ', CorrelationId: ' + str(message.CorrelationId) + ''.join(fieldParts))

        def getMisc(self, event):
            """Print the timestamp and message type of every message in ``event``."""
            iterator = event.CreateMessageIterator()
            while iterator.Next():
                print(self._linePrefix(iterator.Message))

        def getStatus(self, event):
            """Print the ``reason`` and/or ``exceptions`` details of each status message.

            A message that has both elements produces two lines; one that has
            neither produces no output.
            """
            iterator = event.CreateMessageIterator()
            while iterator.Next():
                message = iterator.Message
                if message.AsElement.HasElement('reason'):
                    element = message.AsElement.GetElement('reason')
                    print(self._linePrefix(message) + ', CorrelationId: ' + str(message.CorrelationId) + ', Category: ' + element.GetElement('category').Value + ', Description: ' + element.GetElement('description').Value)
                if message.AsElement.HasElement('exceptions'):
                    element = message.AsElement.GetElement('exceptions')
                    exceptionParts = []
                    for n in range(element.NumValues):
                        exceptionInfo = element.GetValue(n)
                        fieldId = exceptionInfo.GetElement('fieldId')
                        reason = exceptionInfo.GetElement('reason')
                        # The trailing space after ')' is part of the original output format.
                        exceptionParts.append(', (Field: ' + fieldId.Value + ', Category: ' + reason.GetElement('category').Value + ', Description: ' + reason.GetElement('description').Value + ') ')
                    print(self._linePrefix(message) + ', CorrelationId: ' + str(message.CorrelationId) + ''.join(exceptionParts))
    
    class bloombergSource:
        """Subscribe to the configured tickers on ``//blp/mktdata`` and pump events briefly.

        Everything happens in the constructor; the instance keeps no state.
        """

        def __init__(self):
            """Start a session, subscribe every ticker and pump COM messages for two seconds.

            Tickers and fields come from the ``constants`` module.  Each ticker's
            correlation id is created from its position in ``constants.tickers``.
            Events are delivered to ``EventHandler`` while messages are pumped.
            """
            session = win32com.client.DispatchWithEvents('blpapicom.Session' , EventHandler)
            session.Start()
            session.OpenService('//blp/mktdata')

            subscriptionList = session.CreateSubscriptionList()
            for position, symbol in enumerate(constants.tickers):
                if len(constants.interval) == 0:
                    # No options string configured: plain subscription.
                    subscriptionList.Add(symbol, constants.fields, session.CreateCorrelationId(position))
                    continue
                subscriptionList.AddEx(symbol, constants.fields, constants.interval, session.CreateCorrelationId(position))
            session.Subscribe(subscriptionList)

            # Pump at least once, then keep pumping until the deadline has passed.
            deadline = time() + 2
            PumpWaitingMessages()
            while not deadline < time():
                PumpWaitingMessages()
    
    if __name__ == "__main__":
        # Constructing bloombergSource runs the whole demo: its __init__
        # subscribes and pumps messages for two seconds.
        aBloombergSource = bloombergSource()
    

    constants.py:

    # Event type codes, listed in numeric order.  automationHandler.py compares
    # event.EventType against SUBSCRIPTION_DATA and SUBSCRIPTION_STATUS.
    # NOTE(review): the values presumably mirror the blpapicom event-type
    # enumeration -- confirm against the COM type library.
    UNKNOWN = -1
    ADMIN = 1
    SESSION_STATUS = 2
    SUBSCRIPTION_STATUS = 3
    REQUEST_STATUS = 4
    RESPONSE = 5
    PARTIAL_RESPONSE = 6
    SUBSCRIPTION_DATA = 8
    BLPSERVICE_STATUS = 9
    TIMEOUT = 10
    AUTHORIZATION_STATUS = 11
    RESOLUTION_STATUS = 12
    PUBLISHING_DATA = 13
    TOPIC_STATUS = 14
    TOKEN_STATUS = 15

    # Subscription settings read by automationHandler.py.
    tickers = ['AUD Curncy']
    fields = ['BID']
    # An empty string makes the handler call Add(); a non-empty value such as
    # 'interval=5.0' is passed to AddEx() instead.
    interval = ''
    

    对于历史数据,我使用了以下简单脚本:

    import win32com.client
    
    # Fetch daily PX_LAST history for one security and print it as a list of
    # rows, each row being the list of values found in one fieldData entry.
    session = win32com.client.Dispatch('blpapicom.Session')
    # Events are pulled with NextEvent() below instead of arriving as callbacks.
    session.QueueEvents = True
    session.Start()
    session.OpenService('//blp/refdata')
    dataService = session.GetService('//blp/refdata')

    request = dataService.CreateRequest('HistoricalDataRequest')
    request.GetElement('securities').AppendValue('5 HK Equity')
    request.GetElement('fields').AppendValue('PX_LAST')
    request.Set('periodicitySelection', 'DAILY')
    request.Set('startDate', '20090119')
    request.Set('endDate', '20090130')
    session.SendRequest(request)

    # Event type codes used below (the full list lives in constants.py).
    RESPONSE = 5
    PARTIAL_RESPONSE = 6

    returnList = []
    while True:
        event = session.NextEvent()
        eventType = event.EventType
        if eventType != PARTIAL_RESPONSE and eventType != RESPONSE:
            continue
        # Read every message of the event, not just the first one.
        iterator = event.CreateMessageIterator()
        while iterator.Next():
            message = iterator.Message
            securityData = message.GetElement('securityData')
            fieldData = securityData.GetElement('fieldData')
            for row in range(fieldData.NumValues):
                rowField = fieldData.GetValue(row)
                # NOTE(review): per the Bloomberg API Element documentation,
                # NumElements is the number of sub-elements of a sequence; the
                # previous NumValues + 1 only matched when exactly one field was
                # requested.  Confirm the property name against the type library.
                returnList.append([rowField.GetElement(col).Value
                                   for col in range(rowField.NumElements)])
        # PARTIAL_RESPONSE means more data follows; only RESPONSE is final.
        if eventType == RESPONSE:
            break

    # Drop the COM references before printing, as the original script did.
    iterator = None
    message = None
    event = None
    session = None
    # Single-argument print() behaves the same on Python 2 and 3.
    print(returnList)
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看