Add diagrams and pictures
[clump.git] / libiceblink.py
CommitLineData
a051754e
MG
1import usb.core
2import usb.util
3import array
4import struct
5import binascii
6import time
7
8class ProtocolError(Exception):
9 pass
10
11class SPIProtocolError(Exception):
12 def __init__(self, cmd, rescode):
13 lut = {
14 3: "Resource in use",
15 4: "Resource not opened",
16 12: "Invalid enum"
17 }
18 if rescode in lut:
19 err = lut[rescode]
20 else:
21 err = "error code %d" % rescode
22
23 ProtocolError.__init__(self, "Command %s failed with error: %s" %( cmd,
24 err))
25
26class M25P10(object):
27 STAT_BUSY = 0x1
28 STAT_WEL = 0x2
29
30 CMD_GET_STATUS = 0x05
31 CMD_WRITE_ENABLE = 0x6
32 CMD_READ_ID = 0x9F
33 CMD_WAKEUP = 0xAB
34 CMD_CHIP_ERASE = 0xC7
35 CMD_PAGE_PROGRAM = 0x02
36 CMD_FAST_READ = 0xB
37
38 def __init__(self, iofn):
39 self.io = iofn
40
41 def wakeup(self):
42 self.io([self.CMD_WAKEUP])
43
44 def setWritable(self):
45 self.io([self.CMD_WRITE_ENABLE])
46
47 def chipErase(self):
48 self.setWritable()
49 self.io([self.CMD_CHIP_ERASE])
50 self.waitDone()
51
52 def read(self, addr, size):
53 return self.io([self.CMD_FAST_READ, (addr>>16) & 0xFF, (addr>>8)&0xFF, addr &
54 0xFF, 0x00], size+5)[5:]
55 def pageProgram(self, addr, buf):
56 self.setWritable()
57 assert len(buf) <= 256
58 assert addr & 0xFF == 0
59
60 self.io([self.CMD_PAGE_PROGRAM, (addr>>16) & 0xFF, (addr>>8)&0xFF, addr &
61 0xFF] + list(buf))
62 self.waitDone()
63
64 def waitDone(self):
65 while self.getStatus() & self.STAT_BUSY:
66 pass
67
68 def getStatus(self):
69 return self.io([self.CMD_GET_STATUS],2)[1]
70
71 def getID(self):
72 return self.io([self.CMD_READ_ID],4)[1:]
73
74
75class ICE40Board(object):
76
77 CMD_GET_BOARD_TYPE = 0xE2
78 CMD_GET_BOARD_SERIAL = 0xE4
79
80 class __ICE40BoardComm(object):
81 def __init__(self, dev):
82 self.__is_open = False
83 self.dev = dev
84
85 def __enter__(self):
86 self.open()
87 return self
88
89 def __exit__(self, type, err, traceback):
90 self.__cleanup()
91
92 def __del__(self):
93 self.__cleanup()
94
95 def __cleanup(self):
96 if self.__is_open:
97 self.close()
98
99 def open(self):
100 assert not self.__is_open
101 self.dev.checked_cmd(0x04, 0x00, "bcommopen", [0x00], noret=True)
102 self.__is_open = True
103
104 def close(self):
105 assert self.__is_open
106 self.dev.checked_cmd(0x04, 0x01, "bcommclose", [0x00], noret=True,
107 )
108 self.__is_open = False
109
110 def __check_counts(self, status, resb, wr, rd):
111 if status & 0x80:
112 wb = struct.unpack('<L', resb[:4])[0]
113 resb = resb[4:]
114 #print (wb, write_byte_count)
115 assert wb == wr
116
117 if status & 0x40:
118 rb = struct.unpack('<L', resb[:4])[0]
119 resb = resb[4:]
120 assert rb == rd
121
122 def readReg(self, regno):
123 self.dev.checked_cmd(0x04, 0x05, "bcommsetval", [0x00, regno,
124 0x01, 0x00, 0x00, 0x0])
125 res = self.dev.ep_datain.read(1)
126 status, pl = self.dev.cmd(0x04, 0x85, [0x00])
127 self.__check_counts(status, pl, 0, 1)
128 return res[0]
129
130 def writeReg(self, regno, value):
131 self.dev.checked_cmd(0x04, 0x04, "bcommsetval", [0x00, regno,
132 0x01, 0x00, 0x00, 0x0])
133 self.dev.ep_dataout.write([value])
134 status, pl = self.dev.cmd(0x04, 0x85, [0x00])
135 self.__check_counts(status, pl, 1, 0)
136
137 def readMulti(self, addrs):
138 pass
139
140 def writeMulti(self, addrvals):
141 pass
142
143 class __ICE40GPIO(object):
144 def __init__(self, dev):
145 self.__is_open = False
146 self.dev = dev
147
148 def __enter__(self):
149 self.open()
150 return self
151
152 def __exit__(self, type, err, traceback):
153 self.__cleanup()
154
155 def __del__(self):
156 self.__cleanup()
157
158 def open(self):
159 assert not self.__is_open
160 # Some kind of open
161 self.dev.checked_cmd(0x03, 0x00, "gpioopen", [0x00], noret=True)
162 self.__is_open = True
163
164 def close(self):
165 assert self.__is_open
166 self.dev.checked_cmd(0x03, 0x01, "gpioclose", 0x00)
167 self.__is_open = False
168
169 def __cleanup(self):
170 if self.__is_open:
171 self.close()
172
173 def __set_dir(self, direction):
174 self.dev.checked_cmd(0x03, 0x04, "0304", [0x00, direction, 0x00,
175 0x00, 0x00])
176
177 def __set_value(self, value):
178 self.dev.checked_cmd(0x03, 0x06, "0306", [0x00, value, 0x00, 0x00,
179 0x00],noret=True)
180
181 def ice40SetReset(self, assert_reset):
182 if assert_reset:
183 self.__set_dir(1)
184 self.__set_value(0)
185 else:
186 self.__set_dir(0)
187
188
189 class __ICE40SPIPort(object):
190 def __init__(self, dev, portno):
191 self.dev = dev
192 self.__portno = portno
193 assert portno == 0x00
194 self.__is_open = False
195
196 def __enter__(self):
197 self.open()
198 return self
199
200 def __exit__(self, type, exc, traceback):
201 self.__cleanup()
202
203 def __del__(self):
204 self.__cleanup()
205
206 def __cleanup(self):
207 if self.__is_open:
208 self.close()
209
210 def io(self, write_bytes, read_byte_count=0):
211 assert self.__is_open
212 write_bytes = list(write_bytes)
213
214 # Pad write bytes to include 00's for readback
215 if len(write_bytes) < read_byte_count:
216 write_bytes.extend([0] * (read_byte_count - len(write_bytes)))
217
218 write_byte_count = len(write_bytes)
219 read_bytes = []
220
221 # probably assert nCS
222 self.dev.checked_cmd(0x06, 0x06, "SPIStart", [0x00, 0x00])
223
224
225 # Start IO txn
226 self.dev.checked_cmd(0x06, 0x07, "SPIIOStart",
227
228 # the meaning of the first 3 bytes is unknown
229 struct.pack("<BBBBL", 0x00, 0x00, 0x00,
230 0x01 if read_byte_count else 0x00,
231 write_byte_count),noret=True)
232
233 # Do the IO
234 while write_bytes or len(read_bytes) < read_byte_count:
235 if write_bytes:
236 self.dev.ep_dataout.write(write_bytes[:64])
237 write_bytes = write_bytes[64:]
238
239 if read_byte_count:
240 to_read = min(64, read_byte_count)
241 read_bytes.extend(self.dev.ep_datain.read(to_read))
242
243
244
245 # End IO txn
246 status, resb =self.dev.cmd(0x06, 0x87,[0x00])
247
248 # status & 0x80 indicates presence of write size
249 # status & 0x40 indicates presence of read size
250 # validate values
251 if status & 0x80:
252 wb = struct.unpack('<L', resb[:4])[0]
253 resb = resb[4:]
254 #print (wb, write_byte_count)
255 assert wb == write_byte_count
256
257 if status & 0x40:
258 rb = struct.unpack('<L', resb[:4])[0]
259 resb = resb[4:]
260 assert rb == read_byte_count
261
262 # Clear CS
263 self.dev.checked_cmd(0x06, 0x06, "0606", [0x00, 0x01])
264
265 return bytes(read_bytes)
266
267 def open(self):
268 assert not self.__is_open
269 pl = self.dev.checked_cmd(0x06, 0x00, "SPIOpen", [self.__portno])
270 assert len(pl) == 0
271 self.__is_open = True
272
273 def close(self):
274 pl = self.dev.checked_cmd(0x06, 0x01, "SPIClose", [self.__portno])
275 assert len(pl) == 0
276 self.__is_open = False
277
278 def setMode(self):
279 """May be mode-setting. [0,2] causes bits to be returned shifted right
280 one"""
281 assert self.__is_open
282 pl = self.dev.checked_cmd(0x06, 0x05, "SpiMode", [0,0])
283 assert len(pl) == 0
284
285 def setSpeed(self, speed):
286 """ sets the desired speed for the SPI interface. Returns actual speed
287 set"""
288 pl = self.dev.checked_cmd(0x06, 0x03, "SPISpeed", b'\x00' +
289 struct.pack("<L", speed))
290 assert self.__is_open
291 return (struct.unpack("<L",pl),)
292
293
294 def __init__(self):
295 # find our self.device
296 self.dev = usb.core.find(idVendor=0x1443, idProduct=0x0007)
297 if self.dev is None:
298 raise ValueError('Device not found')
299
300 self.dev.set_configuration()
301
302 # get an endpoint instance
303 cfg = self.dev.get_active_configuration()
304 intf = usb.util.find_descriptor(cfg)
305
306 # Allocate and verify all the endpoints used for comms
307 self.ep_cmdout = usb.util.find_descriptor(intf, bEndpointAddress = 1)
308 self.ep_cmdin = usb.util.find_descriptor(intf, bEndpointAddress = 0x82)
309 self.ep_dataout = usb.util.find_descriptor(intf, bEndpointAddress = 3)
310 self.ep_datain = usb.util.find_descriptor(intf, bEndpointAddress = 0x84)
311
312 assert self.ep_cmdout is not None
313 assert self.ep_cmdin is not None
314 assert self.ep_dataout is not None
315 assert self.ep_datain is not None
316
317 # Make sure we're talking to what we expect
318 btype = self.get_board_type()
319 assert btype == 'iCE40'
320
321 def get_spi_port(self, pn):
322 return self.__ICE40SPIPort(self, pn)
323
324 def get_gpio(self):
325 return self.__ICE40GPIO(self)
326
327 def get_board_comm(self):
328 return self.__ICE40BoardComm(self)
329
330 def ctrl(self, selector, size_or_data, show=False):
331 val = self.dev.ctrl_transfer(0xC0 if isinstance(size_or_data, int) else 0x04,
332 selector, 0x00, 0x00, size_or_data)
333 if show and isinstance(val, array.array):
334 print("%2x < %s" % (selector, "".join("%02x" % i for i in val)))
335 return bytes(val)
336
337 def checked_cmd(self, cmd, subcmd, name, payload=[], ressize=16, show=False,
338 noret=False):
339 status, respl = self.cmd(cmd, subcmd, payload, ressize, show)
340 if status != 0:
341 raise SPIProtocolError(name, status)
342 if noret:
343 assert len(respl) == 0
344 return respl
345
346 def cmd(self, cmd, subcmd, payload, ressize=16, show=False):
347 res = self.cmd_i(
348 struct.pack("<BB", cmd, subcmd) + bytes(payload),
349 ressize)
350
351 status = res[0]
352 if show:
353 print ("%02x:%02x (%s) < %02x:(%s)" % (cmd, subcmd,
354 binascii.hexlify(bytes(payload)).decode('ascii'),
355 status,
356 binascii.hexlify(res[1:]).decode('ascii')))
357 return status, res[1:]
358
359 def cmd_i(self, cmd_bytes, result_size, show=False):
360 payload = struct.pack('<B', len(cmd_bytes)) + bytes(cmd_bytes)
361 self.ep_cmdout.write(payload)
362 res = bytes(self.ep_cmdin.read(result_size))
363
364 if show:
365 print("%s:%s" % (binascii.hexlify(payload).decode('ascii'),
366 binascii.hexlify(res).decode('ascii')))
367 assert res[0] == len(res[1:])
368 return res[1:]
369
370 def get_board_type(self):
371 btype = self.ctrl(0xE2, 16)
372 return btype[:btype.index(b'\x00')].decode('ascii')
373
374 def get_serial(self):
375 return self.ctrl(0xE4, 16).decode('ascii')
376
This page took 0.028879 seconds and 4 git commands to generate.