]>
Commit | Line | Data |
---|---|---|
1 | import usb.core | |
2 | import usb.util | |
3 | import array | |
4 | import struct | |
5 | import binascii | |
6 | import time | |
7 | ||
class ProtocolError(Exception):
    """Base class for errors raised by the board command protocol."""


class SPIProtocolError(ProtocolError):
    """Raised when a board command answers with a non-zero status byte.

    The message has the form ``Command <cmd> failed with error: <text>``,
    where ``<text>`` is a description for the known status codes and
    ``error code <n>`` for any other value.

    Attributes:
        cmd: The command name that was passed in.
        rescode: The raw status code returned for that command.
    """

    # Descriptions for the status codes that have been identified so far.
    _STATUS_TEXT = {
        3: "Resource in use",
        4: "Resource not opened",
        12: "Invalid enum",
    }

    def __init__(self, cmd, rescode):
        self.cmd = cmd
        self.rescode = rescode

        if rescode in self._STATUS_TEXT:
            err = self._STATUS_TEXT[rescode]
        else:
            err = "error code %d" % rescode

        # Deriving from ProtocolError (rather than only borrowing its
        # __init__) lets callers catch the whole family with one except.
        super(SPIProtocolError, self).__init__(
            "Command %s failed with error: %s" % (cmd, err))
25 | ||
class M25P10(object):
    """Command-level helper for an SPI flash device.

    All traffic goes through the ``iofn`` callable given to the
    constructor. It is called as ``iofn(write_bytes)`` or
    ``iofn(write_bytes, read_byte_count)`` and must return an indexable
    sequence holding the bytes clocked back during the transfer (one
    position per byte clocked, so position 0 lines up with the opcode).

    NOTE(review): the class name suggests the ST/Micron M25P10 part; the
    opcodes below are taken as-is from this file -- confirm against the
    datasheet before relying on them for another device.
    """

    # Status register bits, as used by waitDone().
    STAT_BUSY = 0x1
    STAT_WEL = 0x2

    # Command opcodes.
    CMD_GET_STATUS = 0x05
    CMD_WRITE_ENABLE = 0x6
    CMD_READ_ID = 0x9F
    CMD_WAKEUP = 0xAB
    CMD_CHIP_ERASE = 0xC7
    CMD_PAGE_PROGRAM = 0x02
    CMD_FAST_READ = 0xB

    # Largest payload accepted by pageProgram(); start addresses must be
    # a multiple of this value.
    PAGE_SIZE = 256

    def __init__(self, iofn):
        """Store the transfer callable used for every command."""
        self.io = iofn

    @staticmethod
    def _addr_bytes(addr):
        """Return the low 24 bits of ``addr`` as three bytes, MSB first."""
        return [(addr >> 16) & 0xFF, (addr >> 8) & 0xFF, addr & 0xFF]

    def wakeup(self):
        """Send the wake-up command."""
        self.io([self.CMD_WAKEUP])

    def setWritable(self):
        """Send the write-enable command."""
        self.io([self.CMD_WRITE_ENABLE])

    def chipErase(self):
        """Write-enable the device, erase it, and block until not busy."""
        self.setWritable()
        self.io([self.CMD_CHIP_ERASE])
        self.waitDone()

    def read(self, addr, size):
        """Return ``size`` bytes read starting at ``addr``.

        The transfer sends the fast-read opcode, three address bytes and
        one extra zero byte; the five positions read back during that
        header are dropped from the result.
        """
        header = [self.CMD_FAST_READ] + self._addr_bytes(addr) + [0x00]
        return self.io(header, size + 5)[5:]

    def pageProgram(self, addr, buf):
        """Program ``buf`` at the page-aligned address ``addr``.

        Args:
            addr: Start address; its low 8 bits must be zero.
            buf: Iterable of at most ``PAGE_SIZE`` byte values.

        Raises:
            ValueError: If ``buf`` is too long or ``addr`` is unaligned.
                Nothing is sent to the device in that case.
        """
        data = list(buf)

        # Validate before touching the device so a rejected call does not
        # leave the write-enable command already issued, and use real
        # exceptions so the checks survive ``python -O``.
        if len(data) > self.PAGE_SIZE:
            raise ValueError(
                "page program payload is %d bytes; maximum is %d"
                % (len(data), self.PAGE_SIZE))
        if addr & 0xFF:
            raise ValueError(
                "page program address 0x%06x is not page aligned" % addr)

        self.setWritable()
        self.io([self.CMD_PAGE_PROGRAM] + self._addr_bytes(addr) + data)
        self.waitDone()

    def waitDone(self):
        """Poll the status register until the busy bit clears."""
        while self.getStatus() & self.STAT_BUSY:
            pass

    def getStatus(self):
        """Return the status byte (position 1 of a two-byte transfer)."""
        return self.io([self.CMD_GET_STATUS], 2)[1]

    def getID(self):
        """Return the three bytes that follow the read-ID opcode."""
        return self.io([self.CMD_READ_ID], 4)[1:]
73 | ||
74 | ||
class ICE40Board(object):
    """Host-side USB interface to an iCE40 programming board.

    The board is driven through four endpoints located in ``__init__``:
    a command-out / command-in pair carrying length-prefixed command
    packets, and a data-out / data-in pair carrying bulk payload for
    SPI and register transfers.

    Sub-interfaces are obtained from ``get_spi_port``, ``get_gpio`` and
    ``get_board_comm``. Each returned object is a context manager that
    opens the resource on entry and closes it on exit.
    """

    # Vendor control-request selectors.
    CMD_GET_BOARD_TYPE = 0xE2
    CMD_GET_BOARD_SERIAL = 0xE4

    class __ICE40BoardComm(object):
        """Register access channel (command group 0x04)."""

        def __init__(self, dev):
            self.__is_open = False
            self.dev = dev

        def __enter__(self):
            self.open()
            return self

        def __exit__(self, type, err, traceback):
            self.__cleanup()

        def __del__(self):
            self.__cleanup()

        def __cleanup(self):
            """Close the channel if it is still open."""
            if self.__is_open:
                self.close()

        def open(self):
            """Open the channel; it must not already be open."""
            assert not self.__is_open
            self.dev.checked_cmd(0x04, 0x00, "bcommopen", [0x00], noret=True)
            self.__is_open = True

        def close(self):
            """Close the channel; it must currently be open."""
            assert self.__is_open
            self.dev.checked_cmd(0x04, 0x01, "bcommclose", [0x00],
                                 noret=True)
            self.__is_open = False

        def __check_counts(self, status, resb, wr, rd):
            """Validate the byte counts reported at the end of a transfer.

            Bit 0x80 of ``status`` marks a little-endian 32-bit written
            count at the front of ``resb``; bit 0x40 marks a read count
            following it. Each count that is present must match the
            expected ``wr`` / ``rd`` value.
            """
            if status & 0x80:
                wb = struct.unpack('<L', resb[:4])[0]
                resb = resb[4:]
                assert wb == wr

            if status & 0x40:
                rb = struct.unpack('<L', resb[:4])[0]
                resb = resb[4:]
                assert rb == rd

        def readReg(self, regno):
            """Read one byte from register ``regno`` and return it."""
            self.dev.checked_cmd(0x04, 0x05, "bcommsetval",
                                 [0x00, regno, 0x01, 0x00, 0x00, 0x0])
            res = self.dev.ep_datain.read(1)
            status, pl = self.dev.cmd(0x04, 0x85, [0x00])
            self.__check_counts(status, pl, 0, 1)
            return res[0]

        def writeReg(self, regno, value):
            """Write the single byte ``value`` to register ``regno``."""
            self.dev.checked_cmd(0x04, 0x04, "bcommsetval",
                                 [0x00, regno, 0x01, 0x00, 0x00, 0x0])
            self.dev.ep_dataout.write([value])
            status, pl = self.dev.cmd(0x04, 0x85, [0x00])
            self.__check_counts(status, pl, 1, 0)

        def readMulti(self, addrs):
            """Not implemented; placeholder that does nothing."""
            pass

        def writeMulti(self, addrvals):
            """Not implemented; placeholder that does nothing."""
            pass

    class __ICE40GPIO(object):
        """GPIO channel (command group 0x03), used to drive FPGA reset."""

        def __init__(self, dev):
            self.__is_open = False
            self.dev = dev

        def __enter__(self):
            self.open()
            return self

        def __exit__(self, type, err, traceback):
            self.__cleanup()

        def __del__(self):
            self.__cleanup()

        def open(self):
            """Open the GPIO channel; it must not already be open."""
            assert not self.__is_open
            # Some kind of open
            self.dev.checked_cmd(0x03, 0x00, "gpioopen", [0x00], noret=True)
            self.__is_open = True

        def close(self):
            """Close the GPIO channel; it must currently be open."""
            assert self.__is_open
            # The payload must be a byte sequence: bytes(0x00) is b'', so
            # passing the bare int would silently drop the handle byte
            # that open() and every other close command send.
            self.dev.checked_cmd(0x03, 0x01, "gpioclose", [0x00])
            self.__is_open = False

        def __cleanup(self):
            """Close the channel if it is still open."""
            if self.__is_open:
                self.close()

        def __set_dir(self, direction):
            """Send command 03:04 with ``direction`` as its value byte."""
            self.dev.checked_cmd(0x03, 0x04, "0304",
                                 [0x00, direction, 0x00, 0x00, 0x00])

        def __set_value(self, value):
            """Send command 03:06 with ``value`` as its value byte."""
            self.dev.checked_cmd(0x03, 0x06, "0306",
                                 [0x00, value, 0x00, 0x00, 0x00],
                                 noret=True)

        def ice40SetReset(self, assert_reset):
            """Assert or release the reset line.

            Asserting sets direction 1 and value 0; releasing only sets
            direction 0. NOTE(review): presumably direction 1 means
            "output" and the line is pulled up externally -- confirm.
            """
            if assert_reset:
                self.__set_dir(1)
                self.__set_value(0)
            else:
                self.__set_dir(0)

    class __ICE40SPIPort(object):
        """SPI port (command group 0x06). Only port 0 is accepted."""

        def __init__(self, dev, portno):
            # Set the open flag first so __del__ still works if the
            # port-number check below fails.
            self.__is_open = False
            self.dev = dev
            self.__portno = portno
            assert portno == 0x00

        def __enter__(self):
            self.open()
            return self

        def __exit__(self, type, exc, traceback):
            self.__cleanup()

        def __del__(self):
            self.__cleanup()

        def __cleanup(self):
            """Close the port if it is still open."""
            if self.__is_open:
                self.close()

        def io(self, write_bytes, read_byte_count=0):
            """Run one SPI transaction and return the bytes read back.

            Args:
                write_bytes: Iterable of byte values to send. It is
                    zero-padded up to ``read_byte_count`` so that enough
                    bytes are sent to cover the requested readback.
                read_byte_count: Number of bytes to read back; 0 for a
                    write-only transaction.

            Returns:
                ``bytes`` holding the data read from the data-in endpoint.
            """
            assert self.__is_open
            write_bytes = list(write_bytes)

            # Pad write bytes to include 00's for readback
            if len(write_bytes) < read_byte_count:
                write_bytes.extend([0] * (read_byte_count - len(write_bytes)))

            write_byte_count = len(write_bytes)
            read_bytes = []

            # probably assert nCS
            self.dev.checked_cmd(0x06, 0x06, "SPIStart", [0x00, 0x00])

            # Start IO txn
            # the meaning of the first 3 bytes is unknown
            self.dev.checked_cmd(
                0x06, 0x07, "SPIIOStart",
                struct.pack("<BBBBL", 0x00, 0x00, 0x00,
                            0x01 if read_byte_count else 0x00,
                            write_byte_count),
                noret=True)

            # Do the IO in chunks of at most 64 bytes per direction.
            while write_bytes or len(read_bytes) < read_byte_count:
                if write_bytes:
                    self.dev.ep_dataout.write(write_bytes[:64])
                    write_bytes = write_bytes[64:]

                # Ask only for what is still outstanding instead of
                # over-requesting and relying on a short packet to end
                # the final read.
                outstanding = read_byte_count - len(read_bytes)
                if outstanding > 0:
                    read_bytes.extend(
                        self.dev.ep_datain.read(min(64, outstanding)))

            # End IO txn
            status, resb = self.dev.cmd(0x06, 0x87, [0x00])

            # status & 0x80 indicates presence of write size
            # status & 0x40 indicates presence of read size
            # validate values
            if status & 0x80:
                wb = struct.unpack('<L', resb[:4])[0]
                resb = resb[4:]
                assert wb == write_byte_count

            if status & 0x40:
                rb = struct.unpack('<L', resb[:4])[0]
                resb = resb[4:]
                assert rb == read_byte_count

            # Clear CS
            self.dev.checked_cmd(0x06, 0x06, "0606", [0x00, 0x01])

            return bytes(read_bytes)

        def open(self):
            """Open the SPI port; it must not already be open."""
            assert not self.__is_open
            pl = self.dev.checked_cmd(0x06, 0x00, "SPIOpen", [self.__portno])
            assert len(pl) == 0
            self.__is_open = True

        def close(self):
            """Close the SPI port."""
            pl = self.dev.checked_cmd(0x06, 0x01, "SPIClose", [self.__portno])
            assert len(pl) == 0
            self.__is_open = False

        def setMode(self):
            """May be mode-setting. [0,2] causes bits to be returned shifted
            right one"""
            assert self.__is_open
            pl = self.dev.checked_cmd(0x06, 0x05, "SpiMode", [0, 0])
            assert len(pl) == 0

        def setSpeed(self, speed):
            """Set the desired speed for the SPI interface.

            Returns the actual speed reported by the board, wrapped as
            ``((actual,),)`` -- the unpacked struct tuple inside a
            one-element tuple. The nesting is kept for compatibility
            with existing callers.
            """
            # Check the port state before talking to the device, not after.
            assert self.__is_open
            pl = self.dev.checked_cmd(0x06, 0x03, "SPISpeed",
                                      b'\x00' + struct.pack("<L", speed))
            return (struct.unpack("<L", pl),)

    def __init__(self):
        """Find the board on USB, claim its endpoints and verify its type.

        Raises:
            ValueError: If no device with the expected vendor/product ID
                is found.
        """
        # find our self.device
        self.dev = usb.core.find(idVendor=0x1443, idProduct=0x0007)
        if self.dev is None:
            raise ValueError('Device not found')

        self.dev.set_configuration()

        # get an endpoint instance
        cfg = self.dev.get_active_configuration()
        intf = usb.util.find_descriptor(cfg)

        # Allocate and verify all the endpoints used for comms
        self.ep_cmdout = usb.util.find_descriptor(intf, bEndpointAddress=1)
        self.ep_cmdin = usb.util.find_descriptor(intf, bEndpointAddress=0x82)
        self.ep_dataout = usb.util.find_descriptor(intf, bEndpointAddress=3)
        self.ep_datain = usb.util.find_descriptor(intf, bEndpointAddress=0x84)

        assert self.ep_cmdout is not None
        assert self.ep_cmdin is not None
        assert self.ep_dataout is not None
        assert self.ep_datain is not None

        # Make sure we're talking to what we expect
        btype = self.get_board_type()
        assert btype == 'iCE40'

    def get_spi_port(self, pn):
        """Return an (unopened) SPI port object for port number ``pn``."""
        return self.__ICE40SPIPort(self, pn)

    def get_gpio(self):
        """Return an (unopened) GPIO channel object."""
        return self.__ICE40GPIO(self)

    def get_board_comm(self):
        """Return an (unopened) register access channel object."""
        return self.__ICE40BoardComm(self)

    def ctrl(self, selector, size_or_data, show=False):
        """Issue a control transfer and return the result as ``bytes``.

        When ``size_or_data`` is an int it is a read of that many bytes
        (request type 0xC0); otherwise it is sent as data.
        """
        # NOTE(review): 0x04 is an unusual bmRequestType for the write
        # direction; 0x40 (vendor, host-to-device) may have been intended.
        # Left unchanged because no write path is exercised in this file.
        val = self.dev.ctrl_transfer(
            0xC0 if isinstance(size_or_data, int) else 0x04,
            selector, 0x00, 0x00, size_or_data)
        if show and isinstance(val, array.array):
            print("%2x < %s" % (selector, "".join("%02x" % i for i in val)))
        return bytes(val)

    def checked_cmd(self, cmd, subcmd, name, payload=(), ressize=16,
                    show=False, noret=False):
        """Send a command and require a zero status byte.

        Args:
            cmd, subcmd: The two command bytes.
            name: Label used in the error message on failure.
            payload: Byte values (or ``bytes``) appended to the command.
            ressize: Maximum response size to read.
            show: Print a hex trace of the exchange.
            noret: Additionally assert that the response payload is empty.

        Returns:
            The response payload following the status byte.

        Raises:
            SPIProtocolError: If the status byte is non-zero.
        """
        status, respl = self.cmd(cmd, subcmd, payload, ressize, show)
        if status != 0:
            raise SPIProtocolError(name, status)
        if noret:
            assert len(respl) == 0
        return respl

    def cmd(self, cmd, subcmd, payload, ressize=16, show=False):
        """Send a command and return ``(status, response_payload)``."""
        res = self.cmd_i(
            struct.pack("<BB", cmd, subcmd) + bytes(payload),
            ressize)

        status = res[0]
        if show:
            print("%02x:%02x (%s) < %02x:(%s)" % (
                cmd, subcmd,
                binascii.hexlify(bytes(payload)).decode('ascii'),
                status,
                binascii.hexlify(res[1:]).decode('ascii')))
        return status, res[1:]

    def cmd_i(self, cmd_bytes, result_size, show=False):
        """Exchange one length-prefixed packet on the command endpoints.

        The outgoing packet is a one-byte length followed by
        ``cmd_bytes``. The reply must carry the same framing; its length
        byte is checked and stripped before the rest is returned.
        """
        payload = struct.pack('<B', len(cmd_bytes)) + bytes(cmd_bytes)
        self.ep_cmdout.write(payload)
        res = bytes(self.ep_cmdin.read(result_size))

        if show:
            print("%s:%s" % (binascii.hexlify(payload).decode('ascii'),
                             binascii.hexlify(res).decode('ascii')))
        assert res[0] == len(res[1:])
        return res[1:]

    def get_board_type(self):
        """Return the board type string reported by the device.

        The reply is cut at the first NUL byte; if there is none the
        whole reply is used instead of raising.
        """
        btype = self.ctrl(self.CMD_GET_BOARD_TYPE, 16)
        return btype.split(b'\x00', 1)[0].decode('ascii')

    def get_serial(self):
        """Return the 16-byte serial reply decoded as ASCII."""
        return self.ctrl(self.CMD_GET_BOARD_SERIAL, 16).decode('ascii')
376 |