Coverage for jsonurl_py.py: 99%
377 statements
« prev ^ index » next coverage.py v6.5.0, created at 2022-11-20 21:12 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2022-11-20 21:12 +0000
1#! /usr/bin/env python3
3"""
4Python implementation of jsonurl, an alternative format for JSON data model
6See https://jsonurl.org/ and https://github.com/jsonurl/specification/
7"""
9__version__ = "0.4.0"
11import re
12import sys
13from dataclasses import dataclass
14from typing import Any, Dict, List, Optional, Tuple, overload
15from urllib.parse import quote_plus
17if sys.hexversion >= 0x030A0000: # pragma: no cover
19 def _dataclass_kwonly(*a, **kw):
20 return dataclass(*a, **kw, kw_only=True) # type: ignore
22else:
23 _dataclass_kwonly = dataclass
@_dataclass_kwonly
class CommonOpts:
    """
    Options shared by `dumps` and `loads`
    """

    implied_list: bool = False
    """
    Implied-Array mode: omit the outer parentheses and assume the data is a list

    See `spec section 2.9.1 <https://github.com/jsonurl/specification/#291-implied-arrays>`_
    """

    implied_dict: bool = False
    """
    Implied-Object mode: omit the outer parentheses and assume the data is a dict

    See `spec section 2.9.2 <https://github.com/jsonurl/specification/#292-implied-objects>`_
    """

    distinguish_empty_list_dict: bool = False
    """Tell an empty list apart from an empty dict

    Use ``(:)`` for empty dict and ``()`` for empty list as per `spec section 2.9.5
    <https://github.com/jsonurl/specification/#295-empty-objects-and-arrays>`_
    """

    aqf: bool = False
    """Address bar Query string Friendly

    Use ``!`` quoting instead of ``'`` as per `spec section 2.9.6
    <https://github.com/jsonurl/specification/#296-address-bar-query-string-friendly>`_
    """
@_dataclass_kwonly
class DumpOpts(CommonOpts):
    """
    Options accepted by `jsonurl_py.dumps`
    """

    safe: str = ""
    """
    Extra characters to treat as safe, i.e. to leave without percent-encoding.

    This plays the same role as the safe argument of `urllib.parse.quote_plus`.

    By default only the characters ``[a-zA-Z0-9_.-~]`` are safe. Additional
    characters that can be marked safe are: ``!$*/;?@``. In AQF mode the
    character ``'`` can also be marked as safe.

    In AQF mode the characters `!(),:` are considered safe by default so that
    output is readable but on input they are also recognized in encoded form.
    """
def _dump_list_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize every item of *arg*, comma separated, without the enclosing parentheses."""
    encoded_items = [_dump_any(item, opts) for item in arg]
    return ",".join(encoded_items)
def _dump_dict_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize the ``key:value`` pairs of *arg*, comma separated, without the enclosing parentheses."""
    pairs = []
    for key, value in arg.items():
        # Keys go through the same encoder as values.
        encoded_key = _dump_any(key, opts)
        encoded_value = _dump_any(value, opts)
        pairs.append(encoded_key + ":" + encoded_value)
    return ",".join(pairs)
def _dump_str(arg: str, opts: DumpOpts) -> str:
    """Serialize a python string as a jsonurl string atom.

    Strings that would otherwise read back as a literal (``true``, ``false``,
    ``null``), as a number, or as nothing at all (the empty string) are
    quoted: with a leading ``!`` in AQF mode, with surrounding ``'`` otherwise.
    Every other string is percent-encoded with `quote_plus`, honouring
    ``opts.safe``; in AQF mode the structural characters ``!(),:`` are then
    escaped with a ``!`` prefix.

    Number-like strings are percent-encoded before being quoted: the loader
    turns a bare ``+`` into a space, so the sign of an exponent such as in
    ``1e+5`` has to travel as ``%2B`` to survive a round trip.
    """
    if opts.aqf:
        if arg in ("true", "false", "null"):
            return "!" + arg
        if arg == "":
            return "!e"
        if RE_NUMBER.match(arg):
            # quote_plus leaves digits, "-", "." and letters alone; only the
            # exponent "+" (which would decode to a space) is rewritten.
            return "!" + quote_plus(arg)
        return quote_plus(arg, safe=opts.safe + "(),:!").translate(
            {
                ord("!"): "!!",
                ord("("): "!(",
                ord(")"): "!)",
                ord(","): "!,",
                ord(":"): "!:",
            }
        )
    if arg in ("true", "false", "null"):
        return "'" + arg + "'"
    if arg == "":
        return "''"
    if RE_NUMBER.match(arg):
        # Same reasoning as in AQF mode: "+" inside quotes decodes to a space.
        return "'" + quote_plus(arg) + "'"
    return quote_plus(arg, safe=opts.safe)
127def _dump_any(arg: Any, opts: DumpOpts) -> str:
128 if arg is True:
129 return "true"
130 if arg is False:
131 return "false"
132 if arg is None:
133 return "null"
134 if isinstance(arg, str):
135 return _dump_str(arg, opts)
136 if isinstance(arg, int):
137 return str(arg)
138 if isinstance(arg, float):
139 return str(arg)
140 if isinstance(arg, list):
141 return "(" + _dump_list_data(arg, opts) + ")"
142 if isinstance(arg, dict):
143 if len(arg) == 0 and opts.distinguish_empty_list_dict:
144 return "(:)"
145 else:
146 return "(" + _dump_dict_data(arg, opts) + ")"
147 raise TypeError(f"Bad value {arg!r} of type {type(arg)}")
@overload
def dumps(arg: Any, opts: Optional[DumpOpts] = None) -> str:
    ...


@overload
def dumps(
    arg: Any,
    *,
    implied_list: bool = False,
    implied_dict: bool = False,
    aqf: bool = False,
    safe: str = "",
    distinguish_empty_list_dict: bool = False,
) -> str:
    ...


def dumps(arg: Any, opts=None, **kw) -> str:
    """
    Serialize a json-compatible python value as a jsonurl string

    Options are given either as one `DumpOpts` object or as individual keyword
    arguments; supplying both raises ValueError.
    """
    if opts is not None and kw:
        raise ValueError("Either opts or kw, not both")
    if opts is None:
        opts = DumpOpts(**kw)
    # Reject unsupported "safe" characters before producing any output.
    check_can_mark_safe(opts.safe, opts.aqf)

    # Implied modes drop the outermost parentheses; dict takes precedence.
    if opts.implied_dict:
        serializer = _dump_dict_data
    elif opts.implied_list:
        serializer = _dump_list_data
    else:
        serializer = _dump_any
    return serializer(arg, opts)
def check_can_mark_safe(safe: str, aqf=False):
    """Validate the characters a caller asks to leave unencoded.

    Each character of *safe* must be one of ``!$*/;?@``; the apostrophe is
    additionally accepted when *aqf* is true. The first character that does
    not qualify raises ValueError.
    """
    for char in safe:
        permitted = char in "!$*/;?@" or (aqf and char == "'")
        if not permitted:
            raise ValueError(f"Can't mark character {char!r} as safe")
@_dataclass_kwonly
class LoadOpts(CommonOpts):
    """
    Options accepted by `loads`; adds nothing to the fields of `CommonOpts`
    """
# Patterns deciding whether a piece of text is number-like.
# They use explicit ASCII digit classes (``\d`` would also accept other
# Unicode digits) and ``\Z`` (``$`` would also match just before a trailing
# newline), so only text made purely of the expected characters qualifies.
RE_NUMBER = re.compile(r"^-?[0-9]+(?:\.[0-9]+)?(?:[eE][-+]?[0-9]+)?\Z")
RE_INT_NUMBER = re.compile(r"^-?[0-9]+\Z")
class ParseError(Exception):
    """Raised by the loading functions when the input is not well-formed jsonurl."""
212def _load_hexdigit(arg: str, pos: int) -> int:
213 char = arg[pos]
214 if char >= "0" and char <= "9":
215 return ord(char) - ord("0")
216 elif char >= "a" and char <= "f":
217 return ord(char) - ord("a") + 10
218 elif char >= "A" and char <= "F":
219 return ord(char) - ord("A") + 10
220 else:
221 raise ParseError(f"Invalid hex digit {char!r} at pos {pos}")
def _load_percent(arg: str, pos: int) -> Tuple[str, int]:
    """Decode the run of consecutive ``%XX`` escapes that starts at *pos*.

    The whole run is collected first and decoded as UTF-8 in one go, so that a
    multi-byte character spread over several escapes is handled. Returns the
    decoded text together with the position just past the run. Raises
    ParseError when an escape is cut short or holds a non-hex digit.
    """
    octets = []
    while pos < len(arg) and arg[pos] == "%":
        if pos + 2 >= len(arg):
            raise ParseError(f"Unterminated percent at pos {pos}")
        high = _load_hexdigit(arg, pos + 1)
        low = _load_hexdigit(arg, pos + 2)
        octets.append(high * 16 + low)
        pos += 3
    decoded = bytes(octets).decode("utf-8")
    return decoded, pos
234_UNENCODED_CHAR_LIST = (
235 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~!$*/;?@"
236)
239def _is_unencoded(char: str) -> bool:
240 """If one of the sharacters listed as "unencoded" in jsonurl spec"""
241 return char in _UNENCODED_CHAR_LIST
244_AQF_PARTIAL_DECODE_SET = set([ord("("), ord(")"), ord(","), ord(":"), ord("!")])
247def _partial_decode_aqf(arg: str) -> str:
248 """Perform partial percent decoding for AQF
250 Affects (),:!
252 All the characters involved are ascii so they can't be part of a multi-byte
253 character. Overlong encodings don't need handling either, they will reach
254 _load_percent and be rejected by the python utf-8 decoder.
256 This is done so that the rest of the parser can check for structural
257 characters without worrying about percent enconding.
258 """
259 ret = ""
260 spos = 0
261 while True:
262 epos = arg.find("%", spos)
263 if epos == -1:
264 return ret + arg[spos:]
265 if epos + 2 >= len(arg):
266 raise ParseError(f"Unterminated percent at pos {epos}")
267 val = _load_hexdigit(arg, epos + 1) * 16 + _load_hexdigit(arg, epos + 2)
268 if val in _AQF_PARTIAL_DECODE_SET:
269 ret += arg[spos:epos] + chr(val)
270 spos = epos + 3
271 else:
272 ret += arg[spos : epos + 3]
273 spos = epos + 3
276def _unquote_aqf(arg: str) -> str:
277 ret = ""
278 spos = 0
279 while True:
280 epos = arg.find("!", spos)
281 if epos == -1:
282 return ret + arg[spos:]
283 if epos == len(arg) - 1:
284 raise ParseError(f"Invalid trailing ! in atom {arg!r}")
285 eval = arg[epos + 1]
286 if eval in "():,0123456789+-!fnt":
287 ret += arg[spos:epos] + eval
288 spos = epos + 2
289 else:
290 raise ParseError(f"Invalid !-escaped char {hex(ord(eval))}")
293def _convert_unquoted_atom(arg: Optional[str], decstr: str, opts: LoadOpts) -> Any:
294 if arg is not None:
295 if arg == "null":
296 return None
297 if arg == "true":
298 return True
299 if arg == "false":
300 return False
301 if re.match(RE_NUMBER, arg):
302 if re.match(RE_INT_NUMBER, arg):
303 return int(arg)
304 else:
305 return float(arg)
306 if opts.aqf:
307 if decstr == "!e":
308 return ""
309 else:
310 return _unquote_aqf(decstr)
311 else:
312 return decstr
def _load_qstr(arg: str, pos: int) -> Tuple[str, int]:
    """Read a quoted string; *pos* is just past the opening ' and parsing runs up to the closing '

    Returns the decoded text and the position after the closing quote. Raises
    ParseError when the input ends first or holds a character that is not
    allowed inside quotes.
    """
    text = ""
    end = len(arg)
    while pos != end:
        current = arg[pos]
        if current == "'":
            return text, pos + 1
        if current == "%":
            decoded, pos = _load_percent(arg, pos)
            text += decoded
            continue
        if current == "+":
            # A plus sign stands for a space.
            text += " "
        elif _is_unencoded(current) or current in "(,:)":
            # Structural characters carry no meaning inside quotes.
            text += current
        else:
            raise ParseError(
                f"Unexpected char {current!r} in quoted string at pos {pos}"
            )
        pos += 1
    raise ParseError("Unterminated quoted string")
def _load_atom(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse one atom (string, number, bool or null) starting at *pos*

    Returns the python value and the position of the first character that is
    not part of the atom. Raises ParseError when there is no atom at *pos*.
    """
    if pos == len(arg):
        raise ParseError(f"Unexpected empty value at pos {pos}")
    # Outside AQF mode a leading apostrophe opens a quoted string.
    if arg[pos] == "'" and not opts.aqf:
        return _load_qstr(arg, pos + 1)

    # decoded: the atom with percent escapes and "+" already resolved.
    decoded = ""
    # literal: the atom exactly as written, used to spot literals and numbers;
    # dropped (None) as soon as a percent escape is seen.
    literal: Optional[str] = ""
    end = len(arg)
    while pos != end:
        current = arg[pos]
        if current == "%":
            chunk, pos = _load_percent(arg, pos)
            decoded += chunk
            # no literal or number contains a percent
            literal = None
        elif current == "+":
            decoded += " "
            if literal is not None:
                literal += "+"
            pos += 1
        elif opts.aqf and current == "!":
            # Keep the "!" for later unquoting. A structural character right
            # after it is swallowed too, so it cannot end the atom.
            width = 2 if pos + 1 < end and arg[pos + 1] in "(),:!" else 1
            chunk = arg[pos : pos + width]
            decoded += chunk
            if literal is not None:
                literal += chunk
            pos += width
        elif _is_unencoded(current) or current == "'":
            decoded += current
            if literal is not None:
                literal += current
            pos += 1
        else:
            # First character that cannot belong to an atom: stop here.
            if len(decoded) == 0:
                raise ParseError(f"Unexpected empty value at pos {pos}")
            return _convert_unquoted_atom(literal, decoded, opts), pos
    # End of input. The atom cannot be empty: emptiness was ruled out before
    # the loop and every iteration that did not return has added text.
    assert len(decoded)
    return _convert_unquoted_atom(literal, decoded, opts), pos
389def _load_list_data(arg: str, pos: int, opts: LoadOpts) -> list:
390 """Parse a list. pos points after the first item"""
391 ret: List[Any] = []
392 if pos == len(arg):
393 return ret
394 while True:
395 item, pos = _load_any(arg, pos, opts)
396 ret.append(item)
397 if pos == len(arg):
398 return ret
399 char = arg[pos]
400 if char == ",":
401 pos += 1
402 continue
403 raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
406def _load_list(
407 arg: str, pos: int, first_element: Any, opts: LoadOpts
408) -> Tuple[list, int]:
409 """Parse a list. pos points after the first item"""
410 ret = [first_element]
411 while True:
412 if pos == len(arg):
413 raise ParseError(f"Unterminated list")
414 char = arg[pos]
415 if char == ")":
416 return ret, pos + 1
417 if char == ",":
418 pos += 1
419 item, pos = _load_any(arg, pos, opts)
420 ret.append(item)
421 continue
422 raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
def _load_dict(arg: str, pos: int, first_key: Any, opts: LoadOpts) -> Tuple[dict, int]:
    """Parse the remainder of a parenthesized dict

    *first_key* and the ``:`` after it have already been consumed; *pos*
    points at the first value. Returns the complete dict and the position
    after the closing parenthesis.

    Raises ParseError when the input ends early, when a key is not followed
    by ``:``, or when a value is followed by anything other than ``,`` or
    ``)``.
    """
    first_val, pos = _load_any(arg, pos, opts)
    ret = {first_key: first_val}
    while True:
        if pos == len(arg):
            raise ParseError("Unterminated dict")
        char = arg[pos]
        if char == ")":
            return ret, pos + 1
        if char != ",":
            # Without this check a missing comma, as in (a:'b'c:d), would be
            # accepted silently; the list parsers reject the same mistake.
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected , or )")
        pos += 1
        key, pos = _load_atom(arg, pos, opts)
        if pos == len(arg):
            raise ParseError("Unterminated dict, missing value")
        char = arg[pos]
        if char != ":":
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
        pos += 1
        val, pos = _load_any(arg, pos, opts)
        ret[key] = val
def _load_comp(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse a composite (list or dict) whose first member is an atom

    *pos* points just past the opening parenthesis. The character after the
    first atom decides the kind: ``:`` means dict, ``,`` or ``)`` means list.
    """
    first, pos = _load_atom(arg, pos, opts)
    if pos == len(arg):
        raise ParseError("Unterminated composite")
    delimiter = arg[pos]
    if delimiter == ":":
        return _load_dict(arg, pos + 1, first, opts)
    if delimiter in (",", ")"):
        # _load_list consumes the delimiter itself.
        return _load_list(arg, pos, first, opts)
    raise ParseError(f"Unexpected char {delimiter} at pos {pos}, expected , or :")
463def _load_any(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
464 if pos == len(arg):
465 raise ParseError(f"Unexpected end of input")
466 if arg[pos] == "(":
467 pos += 1
468 if pos == len(arg):
469 raise ParseError("Unterminated composite, expected value")
470 char = arg[pos]
471 if char == "(":
472 first_val, pos = _load_any(arg, pos, opts)
473 return _load_list(arg, pos, first_val, opts)
474 if opts.distinguish_empty_list_dict and char == ":":
475 pos += 1
476 if pos == len(arg):
477 raise ParseError("Unterminated empty composite, expected )")
478 char = arg[pos]
479 if char == ")":
480 return {}, pos + 1
481 else:
482 raise ParseError("Unterminated empty composite, expected )")
483 if char == ")":
484 if opts.distinguish_empty_list_dict:
485 return [], pos + 1
486 else:
487 return {}, pos + 1
488 return _load_comp(arg, pos, opts)
489 else:
490 return _load_atom(arg, pos, opts)
def _load_top(arg: str, pos: int, opts: LoadOpts) -> Any:
    """Parse exactly one value starting at *pos* and insist that it reaches the end of input"""
    value, pos = _load_any(arg, pos, opts)
    if pos == len(arg):
        return value
    leftover = arg[pos]
    raise ParseError(f"Expected end of input at {pos}, got {leftover!r}")
501def _load_dict_data(arg: str, pos: int, opts: LoadOpts) -> dict:
502 ret: Dict[str, Any] = {}
503 if pos == len(arg):
504 return ret
505 while True:
506 key, pos = _load_atom(arg, pos, opts)
507 if pos == len(arg):
508 raise ParseError(f"Unterminated dict, missing value")
509 char = arg[pos]
510 if char != ":":
511 raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
512 pos += 1
513 val, pos = _load_any(arg, pos, opts)
514 ret[key] = val
515 if pos == len(arg):
516 return ret
517 char = arg[pos]
518 if char != ",":
519 raise ParseError(
520 f"Unexpected char {char!r} at pos {pos}, expected , or end of input"
521 )
522 pos += 1
@overload
def loads(arg: str, opts: Optional[LoadOpts] = None) -> Any:
    ...


@overload
def loads(
    arg: str,
    *,
    implied_dict: bool = False,
    implied_list: bool = False,
    aqf: bool = False,
    distinguish_empty_list_dict: bool = False,
) -> Any:
    ...


def loads(arg: str, opts=None, **kw) -> Any:
    """
    Convert a jsonurl string into a json object

    Options can be passed as a `LoadOpts` object or as individual keyword
    arguments; passing both raises ValueError. Malformed input raises
    `ParseError`.
    """
    if opts is None:
        opts = LoadOpts(**kw)
    elif kw:
        raise ValueError("Either opts or kw, not both")

    if opts.aqf:
        # Resolve percent-encoded structural characters up front so the
        # parser only ever sees them in plain form.
        arg = _partial_decode_aqf(arg)
    # Implied modes have no outer parentheses; dict takes precedence.
    if opts.implied_dict:
        return _load_dict_data(arg, 0, opts)
    if opts.implied_list:
        return _load_list_data(arg, 0, opts)
    return _load_top(arg, 0, opts)
562def _add_common_args(parser):
563 parser.add_argument(
564 "-l",
565 "--implied-list",
566 action="store_true",
567 help="Implied Array mode",
568 )
569 parser.add_argument(
570 "-d",
571 "--implied-dict",
572 action="store_true",
573 help="Implied Object mode",
574 )
575 parser.add_argument(
576 "-a",
577 "--address-query-friendly",
578 dest="aqf",
579 action="store_true",
580 help="Address Bar Query String Friendly mode",
581 )
def create_parser():
    """Build the command line parser with its ``load`` and ``dump`` sub-commands."""
    from argparse import ArgumentParser

    parser = ArgumentParser(prog="jsonurl-py", description=__doc__)
    commands = parser.add_subparsers(dest="subcmd", metavar="SUBCMD", required=True)

    load_cmd = commands.add_parser("load", help="Parse JSONURL input and output JSON")
    _add_common_args(load_cmd)
    load_cmd.add_argument("--indent", type=int, help="Output indent spaces per level")

    dump_cmd = commands.add_parser("dump", help="Parse JSON input and output JSONURL")
    _add_common_args(dump_cmd)

    return parser
def main(argv=None):
    """Command line entry point: convert standard input to standard output

    ``load`` reads jsonurl and writes JSON; ``dump`` reads JSON and writes
    jsonurl. *argv* is handed to the argument parser as is.
    """
    import json

    common_keys = ["implied_list", "implied_dict", "aqf"]
    opts = create_parser().parse_args(argv)
    if opts.subcmd == "load":
        load_opts = LoadOpts(**{key: getattr(opts, key) for key in common_keys})
        # Trailing newlines are stripped before parsing.
        text = sys.stdin.read().rstrip("\n")
        data = loads(text, load_opts)
        sys.stdout.write(json.dumps(data, indent=opts.indent) + "\n")
    elif opts.subcmd == "dump":
        dump_opts = DumpOpts(**{key: getattr(opts, key) for key in common_keys})
        text = sys.stdin.read()
        data = json.loads(text)
        sys.stdout.write(dumps(data, dump_opts) + "\n")
    else:  # pragma: no cover
        raise ValueError(f"Unhandled subcmd {opts.subcmd}")


if __name__ == "__main__":
    main()