Coverage for jsonurl_py.py: 99%
376 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-06 11:15 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-06 11:15 +0000
1#! /usr/bin/env python3
3"""
4Python implementation of jsonurl, an alternative format for JSON data model
6See https://jsonurl.org/ and https://github.com/jsonurl/specification/
7"""
9__version__ = "0.4.0"
11import re
12import sys
13from dataclasses import dataclass
14from typing import Any, Dict, List, Optional, Tuple, overload
15from urllib.parse import quote_plus
17if sys.hexversion >= 0x030A0000: # pragma: no cover
19 def _dataclass_kwonly(*a, **kw):
20 return dataclass(*a, **kw, kw_only=True) # type: ignore
22else:
23 _dataclass_kwonly = dataclass
@_dataclass_kwonly
class CommonOpts:
    """
    Options shared by `dumps` and `loads`.

    Every field is a boolean flag that defaults to ``False``.
    """

    implied_list: bool = False
    """
    Implied-Array mode: omit the outer parentheses and assume the data is a list.

    See `spec section 2.9.1 <https://github.com/jsonurl/specification/#291-implied-arrays>`_
    """

    implied_dict: bool = False
    """
    Implied-Object mode: omit the outer parentheses and assume the data is a dict.

    See `spec section 2.9.2 <https://github.com/jsonurl/specification/#292-implied-objects>`_
    """

    distinguish_empty_list_dict: bool = False
    """Tell an empty list apart from an empty dict.

    Use ``(:)`` for empty dict and ``()`` for empty list as per `spec section 2.9.5
    <https://github.com/jsonurl/specification/#295-empty-objects-and-arrays>`_
    """

    aqf: bool = False
    """Address bar Query string Friendly mode.

    Use ``!`` quoting instead of ``'`` as per `spec section 2.9.6
    <https://github.com/jsonurl/specification/#296-address-bar-query-string-friendly>`_
    """
@_dataclass_kwonly
class DumpOpts(CommonOpts):
    """
    Options accepted by `jsonurl_py.dumps`, in addition to the common ones.
    """

    safe: str = ""
    """
    Extra characters that are treated as safe, i.e. emitted without
    percent-encoding.

    This plays the same role as the ``safe`` argument of
    `urllib.parse.quote_plus`.

    By default only the characters ``[a-zA-Z0-9_.-~]`` are safe. The
    characters that may additionally be marked safe are: ``!$*/;?@``. In AQF
    mode the character "'" may be marked safe as well.

    In AQF mode the characters `!(),:` are considered safe by default so that
    output is readable but on input they are also recognized in encoded form.
    """
def _dump_list_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize each item of *arg* and join them with commas (no parentheses)."""
    encoded_items = [_dump_any(item, opts) for item in arg]
    return ",".join(encoded_items)
def _dump_dict_data(arg: Any, opts: DumpOpts) -> str:
    """Serialize the items of *arg* as comma-separated ``key:value`` pairs (no parentheses)."""
    pairs = []
    for key, value in arg.items():
        # Key is serialized before its value, one pair at a time.
        pairs.append(_dump_any(key, opts) + ":" + _dump_any(value, opts))
    return ",".join(pairs)
def _dump_str(arg: str, opts: DumpOpts) -> str:
    """Serialize a python string as a jsonurl string atom.

    Strings that would otherwise read back as ``true``/``false``/``null``, as
    a number or as nothing at all (the empty string) are marked explicitly:
    wrapped in ``'`` quotes in the default syntax, prefixed with ``!`` in AQF
    mode (``!e`` for the empty string).  Everything else is percent-encoded
    with `urllib.parse.quote_plus`, honoring ``opts.safe``; in AQF mode the
    structural characters ``!(),:`` are additionally escaped with ``!``.

    The payload of a number-like string is percent-encoded as well: the
    exponent sign ``+`` would otherwise be decoded as a space by the loader,
    and a string such as ``"1\\n"`` (accepted by a ``$``-anchored number
    pattern) would otherwise be emitted with a raw newline.
    """
    if opts.aqf:
        if arg == "":
            return "!e"
        if arg in ("true", "false", "null"):
            return "!" + arg
        if RE_NUMBER.match(arg):
            # Number-like text has no structural characters, so plain
            # percent-encoding (for "+" and stray bytes) is enough here.
            return "!" + quote_plus(arg, safe=opts.safe)
        # Keep the structural characters readable, then escape each with "!".
        return quote_plus(arg, safe=opts.safe + "(),:!").translate(
            {
                ord("!"): "!!",
                ord("("): "!(",
                ord(")"): "!)",
                ord(","): "!,",
                ord(":"): "!:",
            }
        )

    if arg == "":
        return "''"
    if arg in ("true", "false", "null"):
        return "'" + arg + "'"
    if RE_NUMBER.match(arg):
        # Inside quotes "+" still means space, so it must become %2B.
        return "'" + quote_plus(arg, safe=opts.safe) + "'"
    return quote_plus(arg, safe=opts.safe)
def _dump_any(arg: Any, opts: DumpOpts) -> str:
    """Serialize any supported python value to its jsonurl text.

    Supported values are ``True``/``False``/``None``, ``str``, ``int``,
    ``float``, ``list`` and ``dict``; anything else raises `TypeError`.
    """
    # Identity checks come first: bool is a subclass of int.
    for singleton, text in ((True, "true"), (False, "false"), (None, "null")):
        if arg is singleton:
            return text
    if isinstance(arg, str):
        return _dump_str(arg, opts)
    if isinstance(arg, (int, float)):
        return str(arg)
    if isinstance(arg, list):
        return "(" + _dump_list_data(arg, opts) + ")"
    if isinstance(arg, dict):
        is_empty = len(arg) == 0
        if is_empty and opts.distinguish_empty_list_dict:
            return "(:)"
        return "(" + _dump_dict_data(arg, opts) + ")"
    raise TypeError(f"Bad value {arg!r} of type {type(arg)}")
@overload
def dumps(arg: Any, opts: Optional[DumpOpts] = None) -> str: ...


@overload
def dumps(
    arg: Any,
    *,
    implied_list: bool = False,
    implied_dict: bool = False,
    aqf: bool = False,
    safe: str = "",
    distinguish_empty_list_dict: bool = False,
) -> str: ...


def dumps(arg: Any, opts=None, **kw) -> str:
    """
    Serialize a json-compatible python value into a jsonurl string.

    Options are given either as a single `DumpOpts` object or as individual
    keyword arguments (used to build a `DumpOpts`); passing both raises
    `ValueError`. The ``safe`` option is validated with `check_can_mark_safe`
    before anything is serialized.
    """
    if opts is not None:
        if kw:
            raise ValueError("Either opts or kw, not both")
    else:
        opts = DumpOpts(**kw)
    check_can_mark_safe(opts.safe, opts.aqf)

    # implied_dict takes precedence over implied_list when both are set.
    if opts.implied_dict:
        serializer = _dump_dict_data
    elif opts.implied_list:
        serializer = _dump_list_data
    else:
        serializer = _dump_any
    return serializer(arg, opts)
def check_can_mark_safe(safe: str, aqf=False):
    """Validate that every character of *safe* may be marked safe for jsonurl.

    The characters ``!$*/;?@`` are always accepted; ``'`` is accepted only
    when *aqf* is true. Raises `ValueError` on the first other character.
    """
    for char in safe:
        permitted = char in "!$*/;?@" or (aqf and char == "'")
        if not permitted:
            raise ValueError(f"Can't mark character {char!r} as safe")
@_dataclass_kwonly
class LoadOpts(CommonOpts):
    """
    Options accepted by `loads`; adds no fields beyond those of `CommonOpts`.
    """
# Text of a JSON-style number: optional minus, digits, optional fraction and
# optional exponent. ``[0-9]`` (not ``\d``) keeps non-ASCII digits out, and
# ``\Z`` (not ``$``) refuses a trailing newline, so only exact number text
# matches.
RE_NUMBER = re.compile(r"^-?[0-9]+(?:\.[0-9]+)?(?:[eE][-+]?[0-9]+)?\Z")
# Subset of RE_NUMBER with neither fraction nor exponent, i.e. integers.
RE_INT_NUMBER = re.compile(r"^-?[0-9]+\Z")
class ParseError(Exception):
    """Raised by the jsonurl parsing helpers when the input text is malformed."""
210def _load_hexdigit(arg: str, pos: int) -> int:
211 char = arg[pos]
212 if char >= "0" and char <= "9":
213 return ord(char) - ord("0")
214 elif char >= "a" and char <= "f":
215 return ord(char) - ord("a") + 10
216 elif char >= "A" and char <= "F":
217 return ord(char) - ord("A") + 10
218 else:
219 raise ParseError(f"Invalid hex digit {char!r} at pos {pos}")
def _load_percent(arg: str, pos: int) -> Tuple[str, int]:
    """Decode a run of consecutive ``%XX`` escapes starting at *pos*.

    The whole run is collected as bytes and decoded as UTF-8 in one go, so a
    multi-byte character spread over several escapes is handled. Returns the
    decoded text and the position just after the run. Raises `ParseError`
    when an escape is cut short by the end of input.
    """
    end = len(arg)
    octets = []
    while pos < end and arg[pos] == "%":
        if pos + 2 >= end:
            raise ParseError(f"Unterminated percent at pos {pos}")
        high = _load_hexdigit(arg, pos + 1)
        low = _load_hexdigit(arg, pos + 2)
        octets.append(high * 16 + low)
        pos += 3
    return bytes(octets).decode("utf-8"), pos
232_UNENCODED_CHAR_LIST = (
233 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~!$*/;?@"
234)
237def _is_unencoded(char: str) -> bool:
238 """If one of the sharacters listed as "unencoded" in jsonurl spec"""
239 return char in _UNENCODED_CHAR_LIST
242_AQF_PARTIAL_DECODE_SET = set([ord("("), ord(")"), ord(","), ord(":"), ord("!")])
245def _partial_decode_aqf(arg: str) -> str:
246 """Perform partial percent decoding for AQF
248 Affects (),:!
250 All the characters involved are ascii so they can't be part of a multi-byte
251 character. Overlong encodings don't need handling either, they will reach
252 _load_percent and be rejected by the python utf-8 decoder.
254 This is done so that the rest of the parser can check for structural
255 characters without worrying about percent enconding.
256 """
257 ret = ""
258 spos = 0
259 while True:
260 epos = arg.find("%", spos)
261 if epos == -1:
262 return ret + arg[spos:]
263 if epos + 2 >= len(arg):
264 raise ParseError(f"Unterminated percent at pos {epos}")
265 val = _load_hexdigit(arg, epos + 1) * 16 + _load_hexdigit(arg, epos + 2)
266 if val in _AQF_PARTIAL_DECODE_SET:
267 ret += arg[spos:epos] + chr(val)
268 spos = epos + 3
269 else:
270 ret += arg[spos : epos + 3]
271 spos = epos + 3
274def _unquote_aqf(arg: str) -> str:
275 ret = ""
276 spos = 0
277 while True:
278 epos = arg.find("!", spos)
279 if epos == -1:
280 return ret + arg[spos:]
281 if epos == len(arg) - 1:
282 raise ParseError(f"Invalid trailing ! in atom {arg!r}")
283 eval = arg[epos + 1]
284 if eval in "():,0123456789+-!fnt":
285 ret += arg[spos:epos] + eval
286 spos = epos + 2
287 else:
288 raise ParseError(f"Invalid !-escaped char {hex(ord(eval))}")
def _convert_unquoted_atom(arg: Optional[str], decstr: str, opts: LoadOpts) -> Any:
    """Turn the text of an unquoted atom into a python value.

    *arg* is the raw (undecoded) text, or ``None`` when the atom cannot be a
    literal; *decstr* is the decoded text. ``null``/``true``/``false`` and
    number text in *arg* become ``None``/bool/int/float. Anything else is a
    string: *decstr* itself, or in AQF mode *decstr* with its ``!`` escapes
    removed (``!e`` being the empty string).
    """
    if arg is not None:
        keywords = {"null": None, "true": True, "false": False}
        if arg in keywords:
            return keywords[arg]
        if re.match(RE_NUMBER, arg):
            return int(arg) if re.match(RE_INT_NUMBER, arg) else float(arg)
    if not opts.aqf:
        return decstr
    return "" if decstr == "!e" else _unquote_aqf(decstr)
313def _load_qstr(arg: str, pos: int) -> Tuple[str, int]:
314 """Parse a quoted string until the closing '"""
315 ret = ""
316 while True:
317 if pos == len(arg):
318 raise ParseError(f"Unterminated quoted string")
319 char = arg[pos]
320 if char == "%":
321 enc, pos = _load_percent(arg, pos)
322 ret += enc
323 elif char == "+":
324 ret += " "
325 pos += 1
326 elif char == "'":
327 return ret, pos + 1
328 elif _is_unencoded(char) or char in "(,:)":
329 ret += char
330 pos += 1
331 else:
332 raise ParseError(f"Unexpected char {char!r} in quoted string at pos {pos}")
def _load_atom(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse an atom: string, number, bool or null.

    Returns the parsed value and the position of the first character that is
    not part of the atom. Outside AQF mode an atom starting with ``'`` is a
    quoted string and is handed to `_load_qstr`. Raises `ParseError` when
    there is no atom at *pos*.
    """
    end = len(arg)
    if pos == end:
        raise ParseError(f"Unexpected empty value at pos {pos}")
    if arg[pos] == "'" and not opts.aqf:
        return _load_qstr(arg, pos + 1)

    # Text decoded on the fly.
    decoded = ""
    # Undecoded text, kept to recognize literals (true, null, numbers...).
    # Becomes None as soon as the atom can no longer be a literal.
    raw: Optional[str] = ""
    while pos < end:
        char = arg[pos]
        if char == "%":
            chunk, pos = _load_percent(arg, pos)
            decoded += chunk
            # no unquoted literal contains a percent
            raw = None
        elif char == "+":
            decoded += " "
            if raw is not None:
                raw += "+"
            pos += 1
        elif opts.aqf and char == "!":
            # Keep "!" and, when it escapes a structural character, that
            # character too; the escapes are resolved once the atom is complete.
            taken = char
            following = arg[pos + 1 : pos + 2]
            if following and following in "(),:!":
                taken += following
            decoded += taken
            if raw is not None:
                raw += taken
            pos += len(taken)
        elif _is_unencoded(char) or char == "'":
            decoded += char
            if raw is not None:
                raw += char
            pos += 1
        else:
            # First character that does not belong to the atom.
            if len(decoded) == 0:
                raise ParseError(f"Unexpected empty value at pos {pos}")
            break
    else:
        # End of input: the atom cannot be empty because emptiness was
        # ruled out before the loop and every iteration consumed something.
        assert len(decoded)
    return _convert_unquoted_atom(raw, decoded, opts), pos
def _load_list_data(arg: str, pos: int, opts: LoadOpts) -> list:
    """Parse comma-separated values from *pos* up to the end of input.

    There are no enclosing parentheses; input that is already exhausted at
    *pos* yields an empty list. Raises `ParseError` when a value is followed
    by anything other than ``,`` or the end of input.
    """
    items: List[Any] = []
    end = len(arg)
    if pos == end:
        return items
    while True:
        value, pos = _load_any(arg, pos, opts)
        items.append(value)
        if pos == end:
            return items
        separator = arg[pos]
        if separator != ",":
            raise ParseError(f"Unexpected char {separator!r} at pos {pos} in list")
        pos += 1
def _load_list(
    arg: str, pos: int, first_element: Any, opts: LoadOpts
) -> Tuple[list, int]:
    """Parse the rest of a parenthesized list whose first item is already parsed.

    *pos* points just after the first item. Returns the full list and the
    position just after the closing ``)``. Raises `ParseError` when the input
    ends before ``)`` or an item is followed by anything but ``,`` or ``)``.
    """
    items = [first_element]
    end = len(arg)
    while pos != end:
        char = arg[pos]
        if char == ")":
            return items, pos + 1
        if char != ",":
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in list")
        value, pos = _load_any(arg, pos + 1, opts)
        items.append(value)
    raise ParseError("Unterminated list")
def _load_dict(arg: str, pos: int, first_key: Any, opts: LoadOpts) -> Tuple[dict, int]:
    """Parse the rest of a parenthesized dict whose first key is already parsed.

    *pos* points just after the ``:`` that follows *first_key*. Returns the
    dict and the position just after the closing ``)``.

    Raises `ParseError` when the input ends before ``)``, when a key is not
    followed by ``:``, or when a value is followed by anything other than
    ``,`` or ``)``. The last check matters after a value that ends with its
    own delimiter (a nested composite or a quoted string): without it, input
    such as ``(a:(b)c:d)`` would not be rejected.
    """
    first_val, pos = _load_any(arg, pos, opts)
    ret = {first_key: first_val}
    while True:
        if pos == len(arg):
            raise ParseError("Unterminated dict")
        char = arg[pos]
        if char == ")":
            return ret, pos + 1
        if char != ",":
            # Same strictness as _load_list: pairs must be comma-separated.
            raise ParseError(f"Unexpected char {char!r} at pos {pos} in dict")
        pos += 1
        key, pos = _load_atom(arg, pos, opts)
        if pos == len(arg):
            raise ParseError("Unterminated dict, missing value")
        char = arg[pos]
        if char != ":":
            raise ParseError(f"Unexpected char {char!r} at pos {pos}, expected :")
        pos += 1
        val, pos = _load_any(arg, pos, opts)
        ret[key] = val
def _load_comp(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse a composite (list or dict) whose first element is an atom.

    *pos* points just after the opening ``(``. The character following the
    first atom decides the kind: ``:`` starts a dict, ``,`` or ``)`` means a
    list. Raises `ParseError` on end of input or any other character.
    """
    first, pos = _load_atom(arg, pos, opts)
    if pos == len(arg):
        raise ParseError("Unterminated composite")
    char = arg[pos]
    if char == ":":
        return _load_dict(arg, pos + 1, first, opts)
    if char == "," or char == ")":
        # _load_list consumes the separator or closing parenthesis itself.
        return _load_list(arg, pos, first, opts)
    raise ParseError(f"Unexpected char {char} at pos {pos}, expected , or :")
def _load_any(arg: str, pos: int, opts: LoadOpts) -> Tuple[Any, int]:
    """Parse any value (atom, list or dict) starting at *pos*.

    Returns the value and the position just after it. An empty composite
    ``()`` is an empty list when ``opts.distinguish_empty_list_dict`` is set
    and an empty dict otherwise; with that option ``(:)`` is the empty dict.
    Raises `ParseError` on malformed or truncated input.
    """
    end = len(arg)
    if pos == end:
        raise ParseError("Unexpected end of input")
    if arg[pos] != "(":
        return _load_atom(arg, pos, opts)

    pos += 1
    if pos == end:
        raise ParseError("Unterminated composite, expected value")
    char = arg[pos]
    if char == "(":
        # A composite whose first element is itself a composite must be a list.
        first_val, pos = _load_any(arg, pos, opts)
        return _load_list(arg, pos, first_val, opts)

    distinguish = opts.distinguish_empty_list_dict
    if distinguish and char == ":":
        closing = pos + 1
        if closing == end or arg[closing] != ")":
            raise ParseError("Unterminated empty composite, expected )")
        return {}, closing + 1
    if char == ")":
        empty: Any = [] if distinguish else {}
        return empty, pos + 1
    return _load_comp(arg, pos, opts)
def _load_top(arg: str, pos: int, opts: LoadOpts) -> Any:
    """Parse one value at *pos* and require it to extend to the end of input.

    Raises `ParseError` when anything is left over after the value.
    """
    value, stop = _load_any(arg, pos, opts)
    if stop == len(arg):
        return value
    raise ParseError(f"Expected end of input at {stop}, got {arg[stop]!r}")
def _load_dict_data(arg: str, pos: int, opts: LoadOpts) -> dict:
    """Parse comma-separated ``key:value`` pairs from *pos* up to the end of input.

    There are no enclosing parentheses; input that is already exhausted at
    *pos* yields an empty dict. Raises `ParseError` when a key is not
    followed by ``:`` and a value, or a value is followed by anything other
    than ``,`` or the end of input.
    """
    result: Dict[str, Any] = {}
    end = len(arg)
    if pos == end:
        return result
    while True:
        key, pos = _load_atom(arg, pos, opts)
        if pos == end:
            raise ParseError("Unterminated dict, missing value")
        if arg[pos] != ":":
            raise ParseError(f"Unexpected char {arg[pos]!r} at pos {pos}, expected :")
        value, pos = _load_any(arg, pos + 1, opts)
        result[key] = value
        if pos == end:
            return result
        if arg[pos] != ",":
            raise ParseError(
                f"Unexpected char {arg[pos]!r} at pos {pos}, expected , or end of input"
            )
        pos += 1
@overload
def loads(arg: str, opts: Optional[LoadOpts] = None) -> Any: ...


@overload
def loads(
    arg: str,
    *,
    implied_dict: bool = False,
    implied_list: bool = False,
    aqf: bool = False,
    distinguish_empty_list_dict: bool = False,
) -> Any: ...


def loads(arg: str, opts=None, **kw) -> Any:
    """
    Parse a jsonurl string into the python value it represents.

    Options are given either as a single `LoadOpts` object or as individual
    keyword arguments (used to build a `LoadOpts`); passing both raises
    `ValueError`. Malformed input raises `ParseError`.
    """
    if opts is not None:
        if kw:
            raise ValueError("Either opts or kw, not both")
    else:
        opts = LoadOpts(**kw)

    if opts.aqf:
        # Decode percent-encoded structural characters before parsing.
        arg = _partial_decode_aqf(arg)
    # implied_dict takes precedence over implied_list when both are set.
    if opts.implied_dict:
        parser = _load_dict_data
    elif opts.implied_list:
        parser = _load_list_data
    else:
        parser = _load_top
    return parser(arg, 0, opts)
558def _add_common_args(parser):
559 parser.add_argument(
560 "-l",
561 "--implied-list",
562 action="store_true",
563 help="Implied Array mode",
564 )
565 parser.add_argument(
566 "-d",
567 "--implied-dict",
568 action="store_true",
569 help="Implied Object mode",
570 )
571 parser.add_argument(
572 "-a",
573 "--address-query-friendly",
574 dest="aqf",
575 action="store_true",
576 help="Address Bar Query String Friendly mode",
577 )
def create_parser():
    """Build the command line parser with its ``load`` and ``dump`` subcommands."""
    import argparse

    parser = argparse.ArgumentParser(description=__doc__, prog="jsonurl-py")
    subcommands = parser.add_subparsers(
        dest="subcmd", metavar="SUBCMD", required=True
    )

    load_cmd = subcommands.add_parser(
        "load", help="Parse JSONURL input and output JSON"
    )
    _add_common_args(load_cmd)
    load_cmd.add_argument(
        "--indent", type=int, help="Output indent spaces per level"
    )

    dump_cmd = subcommands.add_parser(
        "dump", help="Parse JSON input and output JSONURL"
    )
    _add_common_args(dump_cmd)

    return parser
def main(argv=None):
    """Command line entry point: convert stdin to stdout.

    ``load`` reads JSONURL text (trailing newlines removed) and writes JSON;
    ``dump`` reads JSON and writes JSONURL. *argv* is passed to the argument
    parser as is.
    """
    import json

    args = create_parser().parse_args(argv)
    # Flags shared by both subcommands, forwarded to the options object.
    common = {
        name: getattr(args, name) for name in ("implied_list", "implied_dict", "aqf")
    }
    if args.subcmd == "load":
        load_opts = LoadOpts(**common)
        text = sys.stdin.read().rstrip("\n")
        data = loads(text, load_opts)
        sys.stdout.write(json.dumps(data, indent=args.indent) + "\n")
    elif args.subcmd == "dump":
        dump_opts = DumpOpts(**common)
        text = sys.stdin.read()
        data = json.loads(text)
        sys.stdout.write(dumps(data, dump_opts) + "\n")
    else:  # pragma: no cover
        raise ValueError(f"Unhandled subcmd {args.subcmd}")


if __name__ == "__main__":
    main()