一、获取股票K线行情数据
以查询苹果股票为例,我们可以用下面的代码查询苹果一分钟的K线:
import time import requests import json # Extra headers test_headers = { 'Content-Type':'application/json' } ''' github:https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api 申请免费token:https://alltick.co/register 官网:https://alltick.co 将如下JSON进行url的encode,复制到http的查询字符串的query字段里 {"trace":"python_http_test1","data":{"code":"AAPL.US","kline_type":1,"kline_timestamp_end":0,"query_kline_num":2,"adjust_type":0}} ''' test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/kline?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test1%22%2C%22data%22%3A%7B%22code%22%3A%22AAPL.US%22%2C%22kline_type%22%3A1%2C%22kline_timestamp_end%22%3A0%2C%22query_kline_num%22%3A2%2C%22adjust_type%22%3A0%7D%7D' resp1 = requests.get(url=test_url1, headers=test_headers) # Decoded text returned by the request text1 = resp1.text print(text1)
注意,如果想查询其它类型的K线数据则kline_type传入以下值:1-分钟K,2-为5分钟K,3-为15分钟K,4-为30分钟K,5-为小时K,6-为2小时K,7-为4小时K,8-为日K,9-为周K,10-为月K。
二、获取最新成交报价数据
import time import requests import json # Extra headers test_headers = { 'Content-Type':'application/json' } ''' github:https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api 申请免费token:https://alltick.co/register 官网:https://alltick.co 将如下JSON进行url的encode,复制到http的查询字符串的query字段里 {"trace":"python_http_test2","data":{"symbol_list":[{"code": "700.HK"},{"code": "UNH.US"},{"code": "600416.SH"}]}} ''' test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/trade-tick?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test2%22%2C%22data%22%3A%7B%22symbol_list%22%3A%5B%7B%22code%22%3A%20%22700.HK%22%7D%2C%7B%22code%22%3A%20%22UNH.US%22%7D%2C%7B%22code%22%3A%20%22600416.SH%22%7D%5D%7D%7D' resp1 = requests.get(url=test_url1, headers=test_headers) # Decoded text returned by the request text1 = resp1.text print(text1)
三、通过websocket订阅获取实时股票行情数据
import json import websocket # pip install websocket-client ''' github:https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api 申请免费token:https://alltick.co/register 官网:https://alltick.co ''' class Feed(object): def __init__(self): self.url = 'wss://quote.tradeswitcher.com/quote-stock-b-ws-api?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806' # 这里输入websocket的url self.ws = None def on_open(self, ws): """ Callback object which is called at opening websocket. 1 argument: @ ws: the WebSocketApp object """ print('A new WebSocketApp is opened!') # 开始订阅(举个例子) sub_param = { "cmd_id": 22002, "seq_id": 123, "trace":"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806", "data":{ "symbol_list":[ { "code": "700.HK", "depth_level": 5, }, { "code": "UNH.US", "depth_level": 5, }, { "code": "600416.SH", "depth_level": 5, } ] } } #如果希望长时间运行,除了需要发送订阅之外,还需要修改代码,定时发送心跳,避免连接断开,具体查看接口文档 sub_str = json.dumps(sub_param) ws.send(sub_str) print("depth quote are subscribed!") def on_data(self, ws, string, type, continue_flag): """ 4 argument. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. The 4th argument is continue flag. If 0, the data continue """ def on_message(self, ws, message): """ Callback object which is called when received data. 2 arguments: @ ws: the WebSocketApp object @ message: utf-8 data received from the server """ # 对收到的message进行解析 result = eval(message) print(result) def on_error(self, ws, error): """ Callback object which is called when got an error. 2 arguments: @ ws: the WebSocketApp object @ error: exception object """ print(error) def on_close(self, ws, close_status_code, close_msg): """ Callback object which is called when the connection is closed. 2 arguments: @ ws: the WebSocketApp object @ close_status_code @ close_msg """ print('The connection is closed!') def start(self): self.ws = websocket.WebSocketApp( self.url, on_open=self.on_open, on_message=self.on_message, on_data=self.on_data, on_error=self.on_error, on_close=self.on_close, ) self.ws.run_forever() if __name__ == "__main__": feed = Feed() feed.start()
四、获取最新盘口报价数据
import time import requests import json # Extra headers test_headers = { 'Content-Type':'application/json' } ''' github:https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api 申请免费token:https://alltick.co/register 官网:https://alltick.co 将如下JSON进行url的encode,复制到http的查询字符串的query字段里 {"trace":"python_http_test2","data":{"symbol_list":[{"code": "700.HK"},{"code": "UNH.US"},{"code": "600416.SH"}]}} ''' test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/depth-tick?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test2%22%2C%22data%22%3A%7B%22symbol_list%22%3A%5B%7B%22code%22%3A%20%22700.HK%22%7D%2C%7B%22code%22%3A%20%22UNH.US%22%7D%2C%7B%22code%22%3A%20%22600416.SH%22%7D%5D%7D%7D' resp1 = requests.get(url=test_url1, headers=test_headers) # Decoded text returned by the request text1 = resp1.text print(text1)
AllTick金融行情API免费试用
AllTick提供稳定可靠的金融行情数据,产品覆盖面广,包括港股CFD、美股CFD、外汇、商品、加密货币等。目前提供免费试用,有需要可以注册账户,并使用上面演示的代码做测试。有任何问题可以与我们的客服联系。