Coverage for jsonurl_py.py: 99%

376 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-06 11:15 +0000

1#! /usr/bin/env python3 

2 

3""" 

4Python implementation of jsonurl, an alternative format for JSON data model 

5 

6See https://jsonurl.org/ and https://github.com/jsonurl/specification/ 

7""" 

8 

9__version__ = "0.4.0" 

10 

11import re 

12import sys 

13from dataclasses import dataclass 

14from typing import Any, Dict, List, Optional, Tuple, overload 

15from urllib.parse import quote_plus 

16 

# Decorator used for the option classes: a dataclass whose fields are
# keyword-only wherever the running interpreter supports ``kw_only``.
if sys.version_info >= (3, 10):  # pragma: no cover

    def _dataclass_kwonly(*args, **kwargs):
        """Apply ``dataclass`` with ``kw_only=True`` forced on."""
        return dataclass(*args, kw_only=True, **kwargs)  # type: ignore

else:
    # No ``kw_only`` available: fall back to the plain decorator.
    _dataclass_kwonly = dataclass

24 

25 

@_dataclass_kwonly
class CommonOpts:
    """
    Options shared by `dumps` and `loads`
    """

    implied_list: bool = False
    """
    Implied-Array mode: omit the outer parentheses and treat the data as a list

    See `spec section 2.9.1 <https://github.com/jsonurl/specification/#291-implied-arrays>`_
    """

    implied_dict: bool = False
    """
    Implied-Object mode: omit the outer parentheses and treat the data as a dict

    See `spec section 2.9.2 <https://github.com/jsonurl/specification/#292-implied-objects>`_
    """

    distinguish_empty_list_dict: bool = False
    """Tell the empty list apart from the empty dict

    Write ``(:)`` for an empty dict and ``()`` for an empty list, following `spec
    section 2.9.5
    <https://github.com/jsonurl/specification/#295-empty-objects-and-arrays>`_
    """

    aqf: bool = False
    """Address bar Query string Friendly

    Quote with ``!`` rather than ``'``, following `spec section 2.9.6
    <https://github.com/jsonurl/specification/#296-address-bar-query-string-friendly>`_
    """

59 

60 

@_dataclass_kwonly
class DumpOpts(CommonOpts):
    """
    Options accepted by `jsonurl_py.dumps`
    """

    safe: str = ""
    """
    Extra characters to leave as-is instead of percent-encoding them.

    Works like the ``safe`` argument of `urllib.parse.quote_plus`.

    Only the characters ``[a-zA-Z0-9_.-~]`` are safe by default. The characters
    that may additionally be marked safe are ``!$*/;?@``, plus ``'`` in AQF
    mode.

    In AQF mode the characters ``!(),:`` are treated as safe by default so that
    the output stays readable; on input they are also recognized in
    percent-encoded form.
    """

80 

81 

def _dump_list_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize the items of ``arg``, comma-separated and without parentheses."""
    encoded_items = [_dump_any(item, opts) for item in arg]
    return ",".join(encoded_items)

84 

85 

def _dump_dict_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize ``arg`` as comma-separated ``key:value`` pairs, without parentheses."""
    pairs = []
    for key, value in arg.items():
        encoded_key = _dump_any(key, opts)
        encoded_value = _dump_any(value, opts)
        pairs.append(encoded_key + ":" + encoded_value)
    return ",".join(pairs)

90 

91 

def _dump_str(arg: str, opts: DumpOpts) -> str:
    """
    Encode a python string as a jsonurl string atom.

    Strings that would otherwise be read back as ``true``/``false``/``null``,
    as a number, or as nothing at all (the empty string) are marked as strings:
    with a leading ``!`` in AQF mode, with surrounding apostrophes otherwise.
    Any other string is percent-encoded with `urllib.parse.quote_plus`; in AQF
    mode the structural characters ``!(),:`` are kept out of percent-encoding
    and ``!``-escaped instead.

    :param arg: the string to encode
    :param opts: dump options; ``aqf`` and ``safe`` are read here
    :return: the encoded atom
    """
    if arg in ("true", "false", "null"):
        return "!" + arg if opts.aqf else "'" + arg + "'"
    if arg == "":
        return "!e" if opts.aqf else "''"
    if RE_NUMBER.match(arg):
        # A number-like string can still hold characters that do not survive
        # a round trip when written literally: the "+" of an exponent is read
        # back as a space, and "$" in the pattern tolerates a trailing
        # newline. Percent-encode the text before marking it as a string.
        quoted = quote_plus(arg, safe=opts.safe)
        return "!" + quoted if opts.aqf else "'" + quoted + "'"
    if not opts.aqf:
        return quote_plus(arg, safe=opts.safe)
    # AQF: leave the structural characters readable and escape them with "!"
    return quote_plus(arg, safe=opts.safe + "(),:!").translate(
        {
            ord("!"): "!!",
            ord("("): "!(",
            ord(")"): "!)",
            ord(","): "!,",
            ord(":"): "!:",
        }
    )

125 

126 

def _dump_any(arg: Any, opts: DumpOpts) -> str:
    """
    Serialize any supported value: bool, None, str, int, float, list or dict.

    Raises `TypeError` for every other type.
    """
    # Identity checks, so that 1 and 0 are not mistaken for booleans
    for singleton, literal in ((True, "true"), (False, "false"), (None, "null")):
        if arg is singleton:
            return literal
    if isinstance(arg, str):
        return _dump_str(arg, opts)
    if isinstance(arg, (int, float)):
        return str(arg)
    if isinstance(arg, list):
        return "(" + _dump_list_data(arg, opts) + ")"
    if isinstance(arg, dict):
        if len(arg) == 0 and opts.distinguish_empty_list_dict:
            return "(:)"
        return "(" + _dump_dict_data(arg, opts) + ")"
    raise TypeError(f"Bad value {arg!r} of type {type(arg)}")

148 

149 

@overload
def dumps(arg: Any, opts: Optional[DumpOpts] = None) -> str: ...


@overload
def dumps(
    arg: Any,
    *,
    implied_list: bool = False,
    implied_dict: bool = False,
    aqf: bool = False,
    safe: str = "",
    distinguish_empty_list_dict: bool = False,
) -> str: ...


def dumps(arg: Any, opts=None, **kw) -> str:
    """
    Serialize a json-like python object to a jsonurl string

    Options are given either as one `DumpOpts` object or as individual keyword
    arguments; supplying both raises `ValueError`.
    """
    if opts is not None:
        if kw:
            raise ValueError("Either opts or kw, not both")
    else:
        opts = DumpOpts(**kw)
    check_can_mark_safe(opts.safe, opts.aqf)

    # implied_dict takes precedence when both implied modes are requested
    if opts.implied_dict:
        serializer = _dump_dict_data
    elif opts.implied_list:
        serializer = _dump_list_data
    else:
        serializer = _dump_any
    return serializer(arg, opts)

183 

184 

def check_can_mark_safe(safe: str, aqf=False):
    """
    Validate the characters requested as "safe" for jsonurl output

    Raises `ValueError` on the first character that may not be marked safe.
    The apostrophe is only accepted when ``aqf`` is true.
    """
    always_allowed = "!$*/;?@"
    for char in safe:
        if char in always_allowed or (aqf and char == "'"):
            continue
        raise ValueError(f"Can't mark character {char!r} as safe")

193 

194 

@_dataclass_kwonly
class LoadOpts(CommonOpts):
    """
    Options accepted by `loads`; adds no fields beyond those of `CommonOpts`
    """

200 

201 

# Number literal: optional minus sign, digits, optional fraction, optional exponent
RE_NUMBER = re.compile(r"^-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?$")
# Integer literal: the subset of RE_NUMBER without fraction or exponent
RE_INT_NUMBER = re.compile(r"^-?\d+$")

204 

205 

class ParseError(Exception):
    """Raised by the parsing functions of this module on malformed input."""

208 

209 

def _load_hexdigit(arg: str, pos: int) -> int:
    """
    Return the value (0-15) of the hexadecimal digit at ``arg[pos]``

    Only the ascii digits and the letters a-f / A-F are accepted; anything
    else raises `ParseError`.
    """
    char = arg[pos]
    if "0" <= char <= "9":
        return ord(char) - ord("0")
    if "a" <= char <= "f":
        return ord(char) - ord("a") + 10
    if "A" <= char <= "F":
        return ord(char) - ord("A") + 10
    raise ParseError(f"Invalid hex digit {char!r} at pos {pos}")

220 

221 

def _load_percent(arg: str, pos: int) -> Tuple[str, int]:
    """
    Decode the run of consecutive ``%XX`` escapes that starts at ``pos``

    The whole run is collected first and decoded as utf-8 in one go, so that
    multi-byte characters spread over several escapes are handled.

    Returns the decoded text and the position just past the run. Raises
    `ParseError` for a truncated escape or a bad hex digit; a byte sequence
    that is not valid utf-8 surfaces as the decoder's `UnicodeDecodeError`.
    """
    size = len(arg)
    octets = []
    while pos < size and arg[pos] == "%":
        if pos + 2 >= size:
            raise ParseError(f"Unterminated percent at pos {pos}")
        high = _load_hexdigit(arg, pos + 1)
        low = _load_hexdigit(arg, pos + 2)
        octets.append(high * 16 + low)
        pos += 3
    return bytes(octets).decode("utf-8"), pos

230 

231 

# Characters the parser accepts literally inside an atom
_UNENCODED_CHAR_LIST = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
    "-._~!$*/;?@"
)


def _is_unencoded(char: str) -> bool:
    """Tell whether ``char`` is one of the characters listed as "unencoded" in the jsonurl spec"""
    return _UNENCODED_CHAR_LIST.find(char) != -1

240 

241 

# Code points of the characters that AQF input may carry percent-encoded: (),:!
_AQF_PARTIAL_DECODE_SET = {ord(char) for char in "(),:!"}


def _partial_decode_aqf(arg: str) -> str:
    """Percent-decode only the AQF structural characters ``(),:!``

    Every other escape is copied through untouched, to be decoded later by
    the regular parser.

    The affected characters are all ascii, so none of them can be a piece of
    a multi-byte character. Overlong encodings need no special treatment
    either: they are left alone here, reach _load_percent and get rejected by
    the python utf-8 decoder.

    Doing this up front lets the rest of the parser look for structural
    characters without having to care about percent encoding.
    """
    pieces = []
    cursor = 0
    while True:
        pct = arg.find("%", cursor)
        if pct == -1:
            pieces.append(arg[cursor:])
            return "".join(pieces)
        if pct + 2 >= len(arg):
            raise ParseError(f"Unterminated percent at pos {pct}")
        code = _load_hexdigit(arg, pct + 1) * 16 + _load_hexdigit(arg, pct + 2)
        pieces.append(arg[cursor:pct])
        if code in _AQF_PARTIAL_DECODE_SET:
            pieces.append(chr(code))
        else:
            pieces.append(arg[pct : pct + 3])
        cursor = pct + 3

272 

273 

def _unquote_aqf(arg: str) -> str:
    """
    Remove the AQF ``!`` escapes from an atom

    Each ``!`` must be followed by one of the characters that may be escaped;
    the pair is replaced by that character. Raises `ParseError` for a trailing
    ``!`` or for a ``!`` followed by any other character.
    """
    chunks = []
    start = 0
    bang = arg.find("!", start)
    while bang != -1:
        if bang == len(arg) - 1:
            raise ParseError(f"Invalid trailing ! in atom {arg!r}")
        escaped = arg[bang + 1]
        if escaped not in "():,0123456789+-!fnt":
            raise ParseError(f"Invalid !-escaped char {hex(ord(escaped))}")
        chunks.append(arg[start:bang])
        chunks.append(escaped)
        start = bang + 2
        bang = arg.find("!", start)
    chunks.append(arg[start:])
    return "".join(chunks)

289 

290 

def _convert_unquoted_atom(arg: Optional[str], decstr: str, opts: LoadOpts) -> Any:
    """
    Turn an unquoted atom into its python value

    ``arg`` is the atom exactly as written, or None when it contained a
    percent escape (in which case it cannot be a literal or a number).
    ``decstr`` is the decoded text, which is what a string atom evaluates to;
    in AQF mode its ``!`` escapes are resolved first and ``!e`` stands for the
    empty string.
    """
    if arg is not None:
        if arg == "null":
            return None
        if arg == "true":
            return True
        if arg == "false":
            return False
        if RE_NUMBER.match(arg):
            return int(arg) if RE_INT_NUMBER.match(arg) else float(arg)
    if not opts.aqf:
        return decstr
    return "" if decstr == "!e" else _unquote_aqf(decstr)

311 

312 

def _load_qstr(arg: str, pos: int) -> Tuple[str, int]:
    """
    Parse the body of a quoted string; ``pos`` points just past the opening '

    Returns the decoded text and the position just past the closing
    apostrophe. Raises `ParseError` when the input ends first or when a
    character is not allowed inside a quoted string.
    """
    parts = []
    end = len(arg)
    while pos != end:
        char = arg[pos]
        if char == "'":
            return "".join(parts), pos + 1
        if char == "%":
            # _load_percent consumes the whole run of escapes and moves pos
            decoded, pos = _load_percent(arg, pos)
            parts.append(decoded)
            continue
        if char == "+":
            parts.append(" ")
        elif _is_unencoded(char) or char in "(,:)":
            parts.append(char)
        else:
            raise ParseError(f"Unexpected char {char!r} in quoted string at pos {pos}")
        pos += 1
    raise ParseError("Unterminated quoted string")

333 

334 

def _load_atom(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """
    Parse an atom (string, number, bool or null) starting at ``pos``

    Returns the value and the position of the first character after the atom.
    Raises `ParseError` when no atom is found at ``pos``.
    """
    end = len(arg)
    if pos == end:
        raise ParseError(f"Unexpected empty value at pos {pos}")
    if arg[pos] == "'" and not opts.aqf:
        return _load_qstr(arg, pos + 1)

    # decoded: the text with "+" and percent escapes resolved on the fly.
    # literal: the text as written, used to recognize literals and numbers;
    # it stops being meaningful once a percent escape is seen, because no
    # literal or number contains one.
    decoded = []
    literal = []
    literal_valid = True
    while pos < end:
        char = arg[pos]
        if char == "%":
            text, pos = _load_percent(arg, pos)
            decoded.append(text)
            literal_valid = False
        elif char == "+":
            decoded.append(" ")
            literal.append("+")
            pos += 1
        elif opts.aqf and char == "!":
            decoded.append(char)
            literal.append(char)
            pos += 1
            # An escaped structural character belongs to the atom; the
            # escape itself is resolved later, when the atom is converted.
            if pos < end and arg[pos] in "(),:!":
                decoded.append(arg[pos])
                literal.append(arg[pos])
                pos += 1
        elif _is_unencoded(char) or char == "'":
            decoded.append(char)
            literal.append(char)
            pos += 1
        else:
            # Any other character ends the atom
            break

    ret = "".join(decoded)
    if pos == end:
        # The input was not empty at entry, so something has been consumed
        assert len(ret)
    elif len(ret) == 0:
        raise ParseError(f"Unexpected empty value at pos {pos}")
    raw = "".join(literal) if literal_valid else None
    return _convert_unquoted_atom(raw, ret, opts), pos

385 

386 

def _load_list_data(arg: str, pos: int, opts: LoadOpts) -> list:
    """
    Parse an implied list: comma-separated values from ``pos`` to the end of input

    Input that is already exhausted at ``pos`` gives an empty list.
    """
    items: List[Any] = []
    end = len(arg)
    if pos == end:
        return items
    while True:
        item, pos = _load_any(arg, pos, opts)
        items.append(item)
        if pos == end:
            return items
        char = arg[pos]
        if char != ",":
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
        pos += 1

402 

403 

def _load_list(
    arg: str, pos: int, first_element: Any, opts: LoadOpts
) -> Tuple[list, int]:
    """
    Parse the rest of a parenthesized list; ``pos`` points just past the first item

    Returns the list, starting with ``first_element``, and the position just
    past the closing parenthesis.
    """
    items = [first_element]
    end = len(arg)
    while pos != end:
        char = arg[pos]
        if char == ")":
            return items, pos + 1
        if char != ",":
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
        item, pos = _load_any(arg, pos + 1, opts)
        items.append(item)
    raise ParseError("Unterminated list")

421 

422 

def _load_dict(arg: str, pos: int, first_key: Any, opts: LoadOpts) -> Tuple[dict, int]:
    """
    Parse the rest of a parenthesized dict

    ``pos`` points just past the ``:`` that follows the first key, i.e. at the
    first value.

    :param arg: the complete input
    :param pos: position of the first value
    :param first_key: the key that was parsed before the first ``:``
    :param opts: load options, handed on to the nested parsers
    :return: the dict and the position just past the closing parenthesis
    :raises ParseError: when the dict is unterminated, a ``:`` is missing
        after a key, or entries are not separated by ``,``
    """
    first_val, pos = _load_any(arg, pos, opts)
    ret = {first_key: first_val}
    while True:
        if pos == len(arg):
            raise ParseError("Unterminated dict")
        char = arg[pos]
        if char == ")":
            return ret, pos + 1
        if char != ",":
            # Without this check a value followed directly by another key,
            # as in (a:'b'c:d), was accepted as if a comma had been there.
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in dict")
        pos += 1
        key, pos = _load_atom(arg, pos, opts)
        if pos == len(arg):
            raise ParseError("Unterminated dict, missing value")
        char = arg[pos]
        if char != ":":
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
        pos += 1
        val, pos = _load_any(arg, pos, opts)
        ret[key] = val

443 

444 

def _load_comp(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """
    Parse a composite (list or dict) whose first element is an atom

    ``pos`` points at that atom, just past the opening parenthesis. The
    character following the atom decides between dict (``:``) and list
    (``,`` or ``)``).
    """
    first, pos = _load_atom(arg, pos, opts)
    if pos == len(arg):
        raise ParseError("Unterminated composite")
    char = arg[pos]
    if char == ":":
        return _load_dict(arg, pos + 1, first, opts)
    if char in (",", ")"):
        return _load_list(arg, pos, first, opts)
    raise ParseError(f"Unexpected char {char} at pos {pos}, expected , or :")

459 

460 

def _load_any(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """
    Parse any value, atom or composite, starting at ``pos``

    Returns the value and the position just past it.
    """
    end = len(arg)
    if pos == end:
        raise ParseError("Unexpected end of input")
    if arg[pos] != "(":
        return _load_atom(arg, pos, opts)

    pos += 1
    if pos == end:
        raise ParseError("Unterminated composite, expected value")
    char = arg[pos]
    if char == "(":
        # A composite that starts with a nested composite can only be a list
        first_val, pos = _load_any(arg, pos, opts)
        return _load_list(arg, pos, first_val, opts)
    if opts.distinguish_empty_list_dict and char == ":":
        # "(:" is only valid as the start of the empty dict "(:)"
        if pos + 1 < end and arg[pos + 1] == ")":
            return {}, pos + 2
        raise ParseError("Unterminated empty composite, expected )")
    if char == ")":
        # "()" is the empty list only when the two are told apart
        empty = [] if opts.distinguish_empty_list_dict else {}
        return empty, pos + 1
    return _load_comp(arg, pos, opts)

489 

490 

def _load_top(arg: str, pos: int, opts: LoadOpts) -> Any:
    """Parse a single value that must extend to the end of the input"""
    value, pos = _load_any(arg, pos, opts)
    if pos == len(arg):
        return value
    char = arg[pos]
    raise ParseError(f"Expected end of input at {pos}, got {char!r}")

497 

498 

def _load_dict_data(arg: str, pos: int, opts: LoadOpts) -> dict:
    """
    Parse an implied dict: comma-separated ``key:value`` pairs from ``pos`` to the end of input

    Input that is already exhausted at ``pos`` gives an empty dict.
    """
    result: Dict[str, Any] = {}
    end = len(arg)
    if pos == end:
        return result
    while True:
        key, pos = _load_atom(arg, pos, opts)
        if pos == end:
            raise ParseError("Unterminated dict, missing value")
        char = arg[pos]
        if char != ":":
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
        val, pos = _load_any(arg, pos + 1, opts)
        result[key] = val
        if pos == end:
            return result
        char = arg[pos]
        if char != ",":
            raise ParseError(
                f"Unexpected char {char!r} at pos {pos}, expected , or end of input"
            )
        pos += 1

521 

522 

@overload
def loads(arg: str, opts: Optional[LoadOpts] = None) -> Any: ...


@overload
def loads(
    arg: str,
    *,
    implied_dict: bool = False,
    implied_list: bool = False,
    aqf: bool = False,
    distinguish_empty_list_dict: bool = False,
) -> Any: ...


def loads(arg: str, opts=None, **kw) -> Any:
    """
    Parse a jsonurl string into the corresponding python object

    Options can be passed as a `LoadOpts` object or as individual keyword arguments.

    :param arg: the jsonurl text
    :param opts: a `LoadOpts` object; mutually exclusive with keyword options
    :return: the parsed value: None, bool, int, float, str, list or dict
    :raises ValueError: when both ``opts`` and keyword options are given
    :raises ParseError: when the text is not valid jsonurl
    """
    if opts is None:
        opts = LoadOpts(**kw)
    elif kw:
        raise ValueError("Either opts or kw, not both")

    if opts.aqf:
        # Resolve percent-encoded structural characters up front so that the
        # parser only has to look for their literal form.
        arg = _partial_decode_aqf(arg)
    # implied_dict takes precedence when both implied modes are requested
    if opts.implied_dict:
        return _load_dict_data(arg, 0, opts)
    if opts.implied_list:
        return _load_list_data(arg, 0, opts)
    return _load_top(arg, 0, opts)

556 

557 

def _add_common_args(parser):
    """Register the mode switches that every subcommand accepts on ``parser``"""
    switches = (
        (("-l", "--implied-list"), {"help": "Implied Array mode"}),
        (("-d", "--implied-dict"), {"help": "Implied Object mode"}),
        (
            ("-a", "--address-query-friendly"),
            {"dest": "aqf", "help": "Address Bar Query String Friendly mode"},
        ),
    )
    for names, extra in switches:
        parser.add_argument(*names, action="store_true", **extra)

578 

579 

def create_parser():
    """Build the command line parser with its ``load`` and ``dump`` subcommands"""
    from argparse import ArgumentParser

    parser = ArgumentParser(description=__doc__, prog="jsonurl-py")
    subcommands = parser.add_subparsers(
        dest="subcmd", metavar="SUBCMD", required=True
    )

    load_cmd = subcommands.add_parser(
        "load", help="Parse JSONURL input and output JSON"
    )
    _add_common_args(load_cmd)
    load_cmd.add_argument(
        "--indent", type=int, help="Output indent spaces per level"
    )

    dump_cmd = subcommands.add_parser(
        "dump", help="Parse JSON input and output JSONURL"
    )
    _add_common_args(dump_cmd)

    return parser

594 

595 

def main(argv=None):
    """
    Command line entry point: convert stdin to stdout in the chosen direction

    ``load`` reads jsonurl and writes JSON, ``dump`` reads JSON and writes
    jsonurl. ``argv`` defaults to the process arguments.
    """
    import json

    args = create_parser().parse_args(argv)
    mode_flags = {
        key: getattr(args, key) for key in ("implied_list", "implied_dict", "aqf")
    }
    if args.subcmd == "load":
        load_opts = LoadOpts(**mode_flags)
        # Trailing newlines are stripped from the input before parsing
        text = sys.stdin.read().rstrip("\n")
        data = loads(text, load_opts)
        sys.stdout.write(json.dumps(data, indent=args.indent) + "\n")
    elif args.subcmd == "dump":
        dump_opts = DumpOpts(**mode_flags)
        text = sys.stdin.read()
        data = json.loads(text)
        sys.stdout.write(dumps(data, dump_opts) + "\n")
    else:  # pragma: no cover
        raise ValueError(f"Unhandled subcmd {args.subcmd}")


if __name__ == "__main__":
    main()