Coverage for jsonurl_py.py: 99%

377 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2022-11-20 21:12 +0000

1#! /usr/bin/env python3 

2 

3""" 

4Python implementation of jsonurl, an alternative format for JSON data model 

5 

6See https://jsonurl.org/ and https://github.com/jsonurl/specification/ 

7""" 

8 

9__version__ = "0.4.0" 

10 

11import re 

12import sys 

13from dataclasses import dataclass 

14from typing import Any, Dict, List, Optional, Tuple, overload 

15from urllib.parse import quote_plus 

16 

17if sys.hexversion >= 0x030A0000: # pragma: no cover 

18 

19 def _dataclass_kwonly(*a, **kw): 

20 return dataclass(*a, **kw, kw_only=True) # type: ignore 

21 

22else: 

23 _dataclass_kwonly = dataclass 

24 

25 

@_dataclass_kwonly
class CommonOpts:
    """
    Options shared by `dumps` and `loads`
    """

    implied_list: bool = False
    """
    Implied-Array mode: omit the outer parentheses and treat the data as a list

    See `spec section 2.9.1 <https://github.com/jsonurl/specification/#291-implied-arrays>`_
    """

    implied_dict: bool = False
    """
    Implied-Object mode: omit the outer parentheses and treat the data as a dict

    See `spec section 2.9.2 <https://github.com/jsonurl/specification/#292-implied-objects>`_
    """

    distinguish_empty_list_dict: bool = False
    """Tell an empty list apart from an empty dict

    Use ``(:)`` for empty dict and ``()`` for empty list as per `spec section 2.9.5
    <https://github.com/jsonurl/specification/#295-empty-objects-and-arrays>`_
    """

    aqf: bool = False
    """Address bar Query string Friendly

    Use ``!`` quoting instead of ``'`` as per `spec section 2.9.6
    <https://github.com/jsonurl/specification/#296-address-bar-query-string-friendly>`_
    """

59 

60 

@_dataclass_kwonly
class DumpOpts(CommonOpts):
    """
    Options accepted by `jsonurl_py.dumps`
    """

    safe: str = ""
    """
    Extra characters that are left as-is instead of being percent-encoded.

    This plays the same role as the safe argument of `urllib.parse.quote_plus`.

    Without this option only letters, digits and ``_ . - ~`` are left
    unencoded. The characters that may additionally be marked safe are:
    ``!$*/;?@``. In AQF mode the apostrophe ``'`` may be marked safe as well.

    In AQF mode the characters `!(),:` are always treated as safe so that the
    output stays readable; on input they are recognized in percent-encoded
    form too.
    """

80 

81 

def _dump_list_data(arg: Any, opts: DumpOpts) -> str:
    """Encode every item of ``arg`` and join them with commas (no parentheses)."""
    encoded_items = [_dump_any(item, opts) for item in arg]
    return ",".join(encoded_items)

84 

85 

def _dump_dict_data(arg: Any, opts: DumpOpts) -> str:
    """Encode ``key:value`` pairs of ``arg`` and join them with commas (no parentheses)."""
    pairs = []
    for key, value in arg.items():
        # Key is encoded before its value, one pair at a time.
        encoded_key = _dump_any(key, opts)
        encoded_value = _dump_any(value, opts)
        pairs.append(encoded_key + ":" + encoded_value)
    return ",".join(pairs)

90 

91 

def _dump_str(arg: str, opts: DumpOpts) -> str:
    """Encode a python string as a jsonurl string atom.

    Strings that would otherwise be read back as ``true``, ``false``, ``null``
    or a number are quoted: with ``'...'`` normally, with a leading ``!`` in
    AQF mode. The empty string is written as ``''`` (``!e`` in AQF mode).
    Everything else is percent-encoded with `urllib.parse.quote_plus`, and in
    AQF mode the structural characters ``!(),:`` are additionally ``!``-escaped.

    :param arg: the string to encode
    :param opts: dump options; ``aqf`` and ``safe`` are consulted
    :return: the encoded atom
    """
    if opts.aqf:
        if arg == "":
            return "!e"
        if arg in ("true", "false", "null"):
            return "!" + arg
        if RE_NUMBER.match(arg):
            # Number-like strings can contain "+" (exponent sign), which the
            # loader would turn into a space, so they must be percent-encoded
            # as well and not copied verbatim.
            return "!" + quote_plus(arg, safe=opts.safe)
        return quote_plus(arg, safe=opts.safe + "(),:!").translate(
            {
                ord("!"): "!!",
                ord("("): "!(",
                ord(")"): "!)",
                ord(","): "!,",
                ord(":"): "!:",
            }
        )

    if arg == "":
        return "''"
    if arg in ("true", "false", "null"):
        return "'" + arg + "'"
    if RE_NUMBER.match(arg):
        # See the AQF branch: encode "+" so that "1e+5" round-trips.
        return "'" + quote_plus(arg, safe=opts.safe) + "'"
    return quote_plus(arg, safe=opts.safe)

125 

126 

def _dump_any(arg: Any, opts: DumpOpts) -> str:
    """Encode any supported python value (None, bool, str, int, float, list, dict).

    Raises TypeError for any other type.
    """
    # The singletons are matched by identity first, so that True/False are
    # not caught by the int check further down.
    if arg is None:
        return "null"
    if arg is True:
        return "true"
    if arg is False:
        return "false"

    if isinstance(arg, str):
        return _dump_str(arg, opts)
    if isinstance(arg, (int, float)):
        return str(arg)
    if isinstance(arg, list):
        return f"({_dump_list_data(arg, opts)})"
    if isinstance(arg, dict):
        if opts.distinguish_empty_list_dict and len(arg) == 0:
            return "(:)"
        return f"({_dump_dict_data(arg, opts)})"

    raise TypeError(f"Bad value {arg!r} of type {type(arg)}")

148 

149 

@overload
def dumps(arg: Any, opts: Optional[DumpOpts] = None) -> str:
    ...


@overload
def dumps(
    arg: Any,
    *,
    implied_list: bool = False,
    implied_dict: bool = False,
    aqf: bool = False,
    safe: str = "",
    distinguish_empty_list_dict: bool = False,
) -> str:
    ...


def dumps(arg: Any, opts=None, **kw) -> str:
    """
    Serialize a json-compatible python value as a jsonurl string

    Options are given either as one `DumpOpts` object or as individual
    keyword arguments; supplying both raises ValueError.
    """
    if opts is not None:
        if kw:
            raise ValueError("Either opts or kw, not both")
    else:
        opts = DumpOpts(**kw)

    # Reject unsupported "safe" characters before producing any output.
    check_can_mark_safe(opts.safe, opts.aqf)

    if opts.implied_dict:
        encoder = _dump_dict_data
    elif opts.implied_list:
        encoder = _dump_list_data
    else:
        encoder = _dump_any
    return encoder(arg, opts)

185 

186 

def check_can_mark_safe(safe: str, aqf=False):
    """Validate that every character of ``safe`` may be marked safe for jsonurl.

    Raises ValueError for the first character that is not allowed. The
    apostrophe is only accepted when ``aqf`` is set.
    """
    for char in safe:
        permitted = char in "!$*/;?@" or (aqf and char == "'")
        if not permitted:
            raise ValueError(f"Can't mark character {char!r} as safe")

195 

196 

@_dataclass_kwonly
class LoadOpts(CommonOpts):
    """
    Options for the `loads` function

    Adds no fields of its own: every option comes from `CommonOpts`.
    """

202 

203 

# Full-string patterns for number atoms. They are anchored with ``\Z`` rather
# than ``$`` because ``$`` also matches just before a trailing newline, which
# made strings such as "1\n" look like numbers.
RE_NUMBER = re.compile(r"^-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?\Z")
RE_INT_NUMBER = re.compile(r"^-?\d+\Z")

206 

207 

class ParseError(Exception):
    """Raised by the jsonurl parsing functions when the input text is malformed."""

    pass

210 

211 

212def _load_hexdigit(arg: str, pos: int) -> int: 

213 char = arg[pos] 

214 if char >= "0" and char <= "9": 

215 return ord(char) - ord("0") 

216 elif char >= "a" and char <= "f": 

217 return ord(char) - ord("a") + 10 

218 elif char >= "A" and char <= "F": 

219 return ord(char) - ord("A") + 10 

220 else: 

221 raise ParseError(f"Invalid hex digit {char!r} at pos {pos}") 

222 

223 

def _load_percent(arg: str, pos: int) -> Tuple[str, int]:
    """Decode a run of consecutive ``%XX`` escapes starting at ``pos``.

    The whole run is collected as bytes and decoded as UTF-8 in one go, so
    multi-byte characters are handled. Returns ``(decoded_text, next_pos)``.

    Raises ParseError for a truncated escape or a bad hex digit; invalid UTF-8
    propagates the decoder's UnicodeDecodeError.
    """
    octets = []
    end = len(arg)
    while pos < end and arg[pos] == "%":
        if pos + 2 >= end:
            raise ParseError(f"Unterminated percent at pos {pos}")
        high = _load_hexdigit(arg, pos + 1)
        low = _load_hexdigit(arg, pos + 2)
        octets.append(high * 16 + low)
        pos += 3
    return bytes(octets).decode("utf-8"), pos

232 

233 

234_UNENCODED_CHAR_LIST = ( 

235 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~!$*/;?@" 

236) 

237 

238 

239def _is_unencoded(char: str) -> bool: 

240 """If one of the sharacters listed as "unencoded" in jsonurl spec""" 

241 return char in _UNENCODED_CHAR_LIST 

242 

243 

244_AQF_PARTIAL_DECODE_SET = set([ord("("), ord(")"), ord(","), ord(":"), ord("!")]) 

245 

246 

def _partial_decode_aqf(arg: str) -> str:
    """Percent-decode only the AQF structural characters ``(),:!``

    Every other ``%XX`` escape is copied through untouched and is decoded
    later by ``_load_percent``. All affected characters are ASCII, so they
    can never be part of a multi-byte sequence; overlong encodings need no
    special care either, since they reach ``_load_percent`` and are rejected
    by python's utf-8 decoder.

    Doing this up front lets the rest of the parser look for structural
    characters without worrying about percent encoding.

    Raises ParseError on a truncated escape or a bad hex digit.
    """
    pieces = []
    cursor = 0
    percent = arg.find("%")
    while percent != -1:
        if percent + 2 >= len(arg):
            raise ParseError(f"Unterminated percent at pos {percent}")
        code = _load_hexdigit(arg, percent + 1) * 16 + _load_hexdigit(
            arg, percent + 2
        )
        resume = percent + 3
        if code in _AQF_PARTIAL_DECODE_SET:
            # Replace the escape by the character it stands for.
            pieces.append(arg[cursor:percent])
            pieces.append(chr(code))
        else:
            # Keep the escape verbatim.
            pieces.append(arg[cursor:resume])
        cursor = resume
        percent = arg.find("%", cursor)
    pieces.append(arg[cursor:])
    return "".join(pieces)

274 

275 

276def _unquote_aqf(arg: str) -> str: 

277 ret = "" 

278 spos = 0 

279 while True: 

280 epos = arg.find("!", spos) 

281 if epos == -1: 

282 return ret + arg[spos:] 

283 if epos == len(arg) - 1: 

284 raise ParseError(f"Invalid trailing ! in atom {arg!r}") 

285 eval = arg[epos + 1] 

286 if eval in "():,0123456789+-!fnt": 

287 ret += arg[spos:epos] + eval 

288 spos = epos + 2 

289 else: 

290 raise ParseError(f"Invalid !-escaped char {hex(ord(eval))}") 

291 

292 

def _convert_unquoted_atom(arg: Optional[str], decstr: str, opts: LoadOpts) -> Any:
    """Turn the text of an unquoted atom into a python value.

    ``arg`` is the raw (undecoded) text, or None when the atom contained a
    percent escape and therefore cannot be a literal or a number. ``decstr``
    is the decoded text. Literals and numbers are recognized from ``arg``;
    anything else is returned as a string, after ``!`` unquoting in AQF mode.
    """
    if arg is not None:
        literals = {"null": None, "true": True, "false": False}
        if arg in literals:
            return literals[arg]
        if RE_NUMBER.match(arg):
            return int(arg) if RE_INT_NUMBER.match(arg) else float(arg)

    if not opts.aqf:
        return decstr
    # In AQF mode "!e" stands for the empty string.
    return "" if decstr == "!e" else _unquote_aqf(decstr)

313 

314 

def _load_qstr(arg: str, pos: int) -> Tuple[str, int]:
    """Parse the body of a quoted string, starting just after the opening '

    Returns ``(text, next_pos)`` where ``next_pos`` is right after the closing
    apostrophe. Raises ParseError if the string is never closed or contains a
    character that must be percent-encoded.
    """
    pieces = []
    while pos != len(arg):
        char = arg[pos]
        if char == "'":
            return "".join(pieces), pos + 1
        if char == "%":
            # _load_percent consumes the whole run of escapes by itself.
            decoded, pos = _load_percent(arg, pos)
            pieces.append(decoded)
            continue
        if char == "+":
            pieces.append(" ")
        elif _is_unencoded(char) or char in "(,:)":
            pieces.append(char)
        else:
            raise ParseError(f"Unexpected char {char!r} in quoted string at pos {pos}")
        pos += 1
    raise ParseError(f"Unterminated quoted string")

335 

336 

def _load_atom(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse an atom: string, int, bool, null

    Parsing starts at ``arg[pos]`` and stops at the end of input or at the
    first character that cannot be part of an atom. Returns
    ``(value, next_pos)``. Raises ParseError when no character could be
    consumed (empty value).
    """
    # on-the-fly decoding into ret
    ret = ""
    # raw contains the string without decoding to check for unquoted atoms.
    # It is set to None as soon as a percent escape is seen.
    raw: Optional[str] = ""
    if pos == len(arg):
        raise ParseError(f"Unexpected empty value at pos {pos}")
    char = arg[pos]
    # Outside AQF mode a leading apostrophe starts a quoted string.
    if char == "'" and not opts.aqf:
        return _load_qstr(arg, pos + 1)
    while True:
        if pos == len(arg):
            # We know string is not empty because we checked it before apostrophe
            assert len(ret)
            return _convert_unquoted_atom(raw, ret, opts), pos
        char = arg[pos]
        if char == "%":
            # _load_percent consumes the whole run of consecutive escapes.
            enc, pos = _load_percent(arg, pos)
            ret += enc
            # no unquoted atom contains a percent
            raw = None
            continue
        elif char == "+":
            # "+" decodes to a space, but is kept verbatim in raw so that
            # exponents such as 1e+5 are still recognized as numbers.
            ret += " "
            if raw is not None:
                raw += "+"
            pos += 1
            continue
        elif opts.aqf and char == "!":
            # Keep the "!" in both buffers; the escape itself is resolved
            # later by _convert_unquoted_atom.
            ret += char
            if raw is not None:
                raw += char
            pos += 1
            if pos < len(arg):
                char = arg[pos]
                # An escaped structural character is consumed here so that
                # it does not terminate the atom.
                if char in "(),:!":
                    ret += char
                    if raw is not None:
                        raw += char
                    pos += 1
        elif _is_unencoded(char) or char == "'":
            ret += char
            if raw is not None:
                raw += char
            pos += 1
        else:
            # Any other character ends the atom; it is left for the caller.
            if len(ret) == 0:
                raise ParseError(f"Unexpected empty value at pos {pos}")
            return _convert_unquoted_atom(raw, ret, opts), pos

387 

388 

def _load_list_data(arg: str, pos: int, opts: LoadOpts) -> list:
    """Parse implied-list data: comma-separated values running to the end of input.

    Empty input yields an empty list. Raises ParseError when a value is
    followed by anything other than a comma or the end of input.
    """
    items: List[Any] = []
    end = len(arg)
    if pos == end:
        return items
    while True:
        value, pos = _load_any(arg, pos, opts)
        items.append(value)
        if pos == end:
            return items
        separator = arg[pos]
        if separator != ",":
            raise ParseError(f"Unexpected char {separator!r} at pos {pos} in list")
        pos += 1

404 

405 

def _load_list(
    arg: str, pos: int, first_element: Any, opts: LoadOpts
) -> Tuple[list, int]:
    """Parse the remainder of a parenthesized list.

    ``first_element`` has already been parsed and ``pos`` points right after
    it. Returns ``(items, next_pos)`` with ``next_pos`` after the closing
    parenthesis. Raises ParseError if the list is not closed or an item is
    followed by something other than ``,`` or ``)``.
    """
    items = [first_element]
    while pos != len(arg):
        char = arg[pos]
        if char == ")":
            return items, pos + 1
        if char != ",":
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
        value, pos = _load_any(arg, pos + 1, opts)
        items.append(value)
    raise ParseError(f"Unterminated list")

423 

424 

def _load_dict(arg: str, pos: int, first_key: Any, opts: LoadOpts) -> Tuple[dict, int]:
    """Parse the remainder of a parenthesized dict.

    ``first_key`` and the ``:`` after it have already been consumed; ``pos``
    points at the first value. Returns ``(mapping, next_pos)`` with
    ``next_pos`` right after the closing parenthesis.

    Raises ParseError when the dict is not closed, when a key is not followed
    by ``:``, or when a value is followed by anything other than ``,`` or
    ``)``.
    """
    first_val, pos = _load_any(arg, pos, opts)
    ret = {first_key: first_val}
    while True:
        if pos == len(arg):
            raise ParseError("Unterminated dict")
        char = arg[pos]
        if char == ")":
            return ret, pos + 1
        if char != ",":
            # Members must be separated by a comma; without this check an
            # input such as "(a:(b)c:d)" was not rejected.
            raise ParseError(
                f"Unexpected char {char!r} at pos {pos} in dict, expected , or )"
            )
        pos += 1
        key, pos = _load_atom(arg, pos, opts)
        if pos == len(arg):
            raise ParseError("Unterminated dict, missing value")
        char = arg[pos]
        if char != ":":
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
        pos += 1
        val, pos = _load_any(arg, pos, opts)
        ret[key] = val

445 

446 

def _load_comp(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse a composite (list or dict) whose first member starts with an atom.

    The character after the first atom decides the kind: ``:`` means dict,
    ``,`` or ``)`` means list. Raises ParseError otherwise.
    """
    first, pos = _load_atom(arg, pos, opts)
    if pos == len(arg):
        raise ParseError("Unterminated composite")
    char = arg[pos]
    if char == ":":
        return _load_dict(arg, pos + 1, first, opts)
    if char in (",", ")"):
        # _load_list handles both the separator and the closing parenthesis.
        return _load_list(arg, pos, first, opts)
    raise ParseError(f"Unexpected char {char} at pos {pos}, expected , or :")

461 

462 

def _load_any(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse any value (atom, list or dict) starting at ``pos``.

    Returns ``(value, next_pos)``. Raises ParseError on malformed input.
    """
    if pos == len(arg):
        raise ParseError(f"Unexpected end of input")
    if arg[pos] != "(":
        return _load_atom(arg, pos, opts)

    # Composite: look at the first character after the opening parenthesis.
    pos += 1
    if pos == len(arg):
        raise ParseError("Unterminated composite, expected value")
    char = arg[pos]

    if char == "(":
        # A nested composite can only be the first element of a list.
        first_val, pos = _load_any(arg, pos, opts)
        return _load_list(arg, pos, first_val, opts)

    if char == ")":
        # "()" is an empty list only when the two are distinguished.
        empty: Any = [] if opts.distinguish_empty_list_dict else {}
        return empty, pos + 1

    if char == ":" and opts.distinguish_empty_list_dict:
        # "(:)" is the explicit empty dict.
        if pos + 1 < len(arg) and arg[pos + 1] == ")":
            return {}, pos + 2
        raise ParseError("Unterminated empty composite, expected )")

    return _load_comp(arg, pos, opts)

491 

492 

def _load_top(arg: str, pos: int, opts: LoadOpts) -> Any:
    """Parse a single value and require that it spans the rest of the input."""
    value, end = _load_any(arg, pos, opts)
    if end == len(arg):
        return value
    leftover = arg[end]
    raise ParseError(f"Expected end of input at {end}, got {leftover!r}")

499 

500 

def _load_dict_data(arg: str, pos: int, opts: LoadOpts) -> dict:
    """Parse implied-dict data: ``key:value`` pairs separated by commas up to end of input.

    Empty input yields an empty dict. Raises ParseError on a missing ``:``,
    a missing value, or an unexpected character after a value.
    """
    result: Dict[str, Any] = {}
    end = len(arg)
    if pos == end:
        return result
    while True:
        key, pos = _load_atom(arg, pos, opts)
        if pos == end:
            raise ParseError(f"Unterminated dict, missing value")
        if arg[pos] != ":":
            raise ParseError(
                f"Unexpected char {arg[pos]!r} at pos {pos}, expected :"
            )
        value, pos = _load_any(arg, pos + 1, opts)
        result[key] = value
        if pos == end:
            return result
        if arg[pos] != ",":
            raise ParseError(
                f"Unexpected char {arg[pos]!r} at pos {pos}, expected , or end of input"
            )
        pos += 1

523 

524 

@overload
def loads(arg: str, opts: Optional[LoadOpts] = None) -> Any:
    ...


@overload
def loads(
    arg: str,
    *,
    implied_dict: bool = False,
    implied_list: bool = False,
    aqf: bool = False,
    distinguish_empty_list_dict: bool = False,
) -> Any:
    ...


def loads(arg: str, opts=None, **kw) -> Any:
    """
    Parse a jsonurl string into the corresponding python value

    Options are given either as one `LoadOpts` object or as individual
    keyword arguments; supplying both raises ValueError.
    """
    if opts is not None:
        if kw:
            raise ValueError("Either opts or kw, not both")
    else:
        opts = LoadOpts(**kw)

    if opts.aqf:
        # Resolve percent-encoded structural characters before parsing.
        arg = _partial_decode_aqf(arg)

    if opts.implied_dict:
        parser = _load_dict_data
    elif opts.implied_list:
        parser = _load_list_data
    else:
        parser = _load_top
    return parser(arg, 0, opts)

560 

561 

562def _add_common_args(parser): 

563 parser.add_argument( 

564 "-l", 

565 "--implied-list", 

566 action="store_true", 

567 help="Implied Array mode", 

568 ) 

569 parser.add_argument( 

570 "-d", 

571 "--implied-dict", 

572 action="store_true", 

573 help="Implied Object mode", 

574 ) 

575 parser.add_argument( 

576 "-a", 

577 "--address-query-friendly", 

578 dest="aqf", 

579 action="store_true", 

580 help="Address Bar Query String Friendly mode", 

581 ) 

582 

583 

def create_parser():
    """Build the command line parser with its ``load`` and ``dump`` subcommands."""
    from argparse import ArgumentParser

    parser = ArgumentParser(description=__doc__, prog="jsonurl-py")
    subcommands = parser.add_subparsers(
        dest="subcmd", metavar="SUBCMD", required=True
    )

    load_cmd = subcommands.add_parser(
        "load", help="Parse JSONURL input and output JSON"
    )
    _add_common_args(load_cmd)
    load_cmd.add_argument(
        "--indent", type=int, help="Output indent spaces per level"
    )

    dump_cmd = subcommands.add_parser(
        "dump", help="Parse JSON input and output JSONURL"
    )
    _add_common_args(dump_cmd)

    return parser

598 

599 

def main(argv=None):
    """Command line entry point: convert stdin between JSON and JSONURL, write to stdout."""
    import json

    args = create_parser().parse_args(argv)
    # Option values shared by both subcommands, taken from the parsed flags.
    shared = {
        name: getattr(args, name)
        for name in ["implied_list", "implied_dict", "aqf"]
    }

    if args.subcmd == "load":
        load_opts = LoadOpts(**shared)
        # Trailing newlines are not part of the jsonurl text.
        text = sys.stdin.read().rstrip("\n")
        value = loads(text, load_opts)
        sys.stdout.write(json.dumps(value, indent=args.indent) + "\n")
    elif args.subcmd == "dump":
        dump_opts = DumpOpts(**shared)
        document = json.loads(sys.stdin.read())
        sys.stdout.write(dumps(document, dump_opts) + "\n")
    else:  # pragma: no cover
        raise ValueError(f"Unhandled subcmd {args.subcmd}")

617 

618 

# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()