123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- #!/usr/bin/env python
- # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
- import codecs
- import os
- import sys
- from base64 import standard_b64encode
- from collections import defaultdict, deque
- from contextlib import suppress
- from enum import IntEnum
- from itertools import count
- from typing import Any, Callable, ClassVar, DefaultDict, Deque, Dict, Generic, Iterator, List, Optional, Sequence, Tuple, Type, TypeVar, Union, cast
- from kitty.conf.utils import positive_float, positive_int
- from kitty.fast_data_types import create_canvas
- from kitty.typing import GRT_C, CompletedProcess, GRT_a, GRT_d, GRT_f, GRT_m, GRT_o, GRT_t, HandlerType
- from kitty.utils import ScreenSize, fit_image, which
- from .operations import cursor
- try:
- fsenc = sys.getfilesystemencoding() or 'utf-8'
- codecs.lookup(fsenc)
- except Exception:
- fsenc = 'utf-8'
- class Dispose(IntEnum):
- undefined = 0
- none = 1
- background = 2
- previous = 3
- class Frame:
- gap: int # milliseconds
- canvas_width: int
- canvas_height: int
- width: int
- height: int
- index: int
- xdpi: float
- ydpi: float
- canvas_x: int
- canvas_y: int
- mode: str
- needs_blend: bool
- dimensions_swapped: bool
- dispose: Dispose
- path: str = ''
- def __init__(self, identify_data: Union['Frame', Dict[str, str]]):
- if isinstance(identify_data, Frame):
- for k in Frame.__annotations__:
- setattr(self, k, getattr(identify_data, k))
- else:
- self.gap = max(0, int(identify_data['gap']) * 10)
- sz, pos = identify_data['canvas'].split('+', 1)
- self.canvas_width, self.canvas_height = map(positive_int, sz.split('x', 1))
- self.canvas_x, self.canvas_y = map(int, pos.split('+', 1))
- self.width, self.height = map(positive_int, identify_data['size'].split('x', 1))
- self.xdpi, self.ydpi = map(positive_float, identify_data['dpi'].split('x', 1))
- self.index = positive_int(identify_data['index'])
- q = identify_data['transparency'].lower()
- self.mode = 'rgba' if q in ('blend', 'true') else 'rgb'
- self.needs_blend = q == 'blend'
- self.dispose = getattr(Dispose, identify_data['dispose'].lower())
- self.dimensions_swapped = identify_data.get('orientation') in ('5', '6', '7', '8')
- if self.dimensions_swapped:
- self.canvas_width, self.canvas_height = self.canvas_height, self.canvas_width
- self.width, self.height = self.height, self.width
- def __repr__(self) -> str:
- canvas = f'{self.canvas_width}x{self.canvas_height}:{self.canvas_x}+{self.canvas_y}'
- geom = f'{self.width}x{self.height}'
- return f'Frame(index={self.index}, gap={self.gap}, geom={geom}, canvas={canvas}, dispose={self.dispose.name})'
- class ImageData:
- def __init__(self, fmt: str, width: int, height: int, mode: str, frames: List[Frame]):
- self.width, self.height, self.fmt, self.mode = width, height, fmt, mode
- self.transmit_fmt: GRT_f = (24 if self.mode == 'rgb' else 32)
- self.frames = frames
- def __len__(self) -> int:
- return len(self.frames)
- def __iter__(self) -> Iterator[Frame]:
- yield from self.frames
- def __repr__(self) -> str:
- frames = '\n '.join(map(repr, self.frames))
- return f'Image(fmt={self.fmt}, mode={self.mode},\n {frames}\n)'
- class OpenFailed(ValueError):
- def __init__(self, path: str, message: str):
- ValueError.__init__(
- self, f'Failed to open image: {path} with error: {message}'
- )
- self.path = path
- class ConvertFailed(ValueError):
- def __init__(self, path: str, message: str):
- ValueError.__init__(
- self, f'Failed to convert image: {path} with error: {message}'
- )
- self.path = path
- class NoImageMagick(Exception):
- pass
- class OutdatedImageMagick(ValueError):
- def __init__(self, detailed_error: str):
- super().__init__('ImageMagick on this system is too old ImageMagick 7+ required which was first released in 2016')
- self.detailed_error = detailed_error
- last_imagemagick_cmd: Sequence[str] = ()
- def run_imagemagick(path: str, cmd: Sequence[str], keep_stdout: bool = True) -> 'CompletedProcess[bytes]':
- global last_imagemagick_cmd
- import subprocess
- last_imagemagick_cmd = cmd
- try:
- p = subprocess.run(cmd, stdout=subprocess.PIPE if keep_stdout else subprocess.DEVNULL, stderr=subprocess.PIPE)
- except FileNotFoundError:
- raise NoImageMagick('ImageMagick is required to process images')
- if p.returncode != 0:
- raise OpenFailed(path, p.stderr.decode('utf-8'))
- return p
- def identify(path: str) -> ImageData:
- import json
- q = (
- '{"fmt":"%m","canvas":"%g","transparency":"%A","gap":"%T","index":"%p","size":"%wx%h",'
- '"dpi":"%xx%y","dispose":"%D","orientation":"%[EXIF:Orientation]"},'
- )
- exe = which('magick')
- if exe:
- cmd = [exe, 'identify']
- else:
- cmd = ['identify']
- p = run_imagemagick(path, cmd + ['-format', q, '--', path])
- raw = p.stdout.rstrip(b',')
- data = json.loads(b'[' + raw + b']')
- first = data[0]
- frames = list(map(Frame, data))
- image_fmt = first['fmt'].lower()
- if image_fmt == 'gif' and not any(f.gap > 0 for f in frames):
- # Some broken GIF images have all zero gaps, browsers with their usual
- # idiot ideas render these with a default 100ms gap https://bugzilla.mozilla.org/show_bug.cgi?id=125137
- # Browsers actually force a 100ms gap at any zero gap frame, but that
- # just means it is impossible to deliberately use zero gap frames for
- # sophisticated blending, so we dont do that.
- for f in frames:
- f.gap = 100
- mode = 'rgb'
- for f in frames:
- if f.mode == 'rgba':
- mode = 'rgba'
- break
- return ImageData(image_fmt, frames[0].canvas_width, frames[0].canvas_height, mode, frames)
- class RenderedImage(ImageData):
- def __init__(self, fmt: str, width: int, height: int, mode: str):
- super().__init__(fmt, width, height, mode, [])
- def render_image(
- path: str, output_prefix: str,
- m: ImageData,
- available_width: int, available_height: int,
- scale_up: bool,
- only_first_frame: bool = False,
- remove_alpha: str = '',
- flip: bool = False, flop: bool = False,
- ) -> RenderedImage:
- import tempfile
- has_multiple_frames = len(m) > 1
- get_multiple_frames = has_multiple_frames and not only_first_frame
- exe = which('magick')
- if exe:
- cmd = [exe, 'convert']
- else:
- exe = which('convert')
- if exe is None:
- raise OSError('Failed to find the ImageMagick convert executable, make sure it is present in PATH')
- cmd = [exe]
- if remove_alpha:
- cmd += ['-background', remove_alpha, '-alpha', 'remove']
- else:
- cmd += ['-background', 'none']
- if flip:
- cmd.append('-flip')
- if flop:
- cmd.append('-flop')
- cmd += ['--', path]
- if only_first_frame and has_multiple_frames:
- cmd[-1] += '[0]'
- cmd.append('-auto-orient')
- scaled = False
- width, height = m.width, m.height
- if scale_up:
- if width < available_width:
- r = available_width / width
- width, height = available_width, int(height * r)
- scaled = True
- if scaled or width > available_width or height > available_height:
- width, height = fit_image(width, height, available_width, available_height)
- resize_cmd = ['-resize', f'{width}x{height}!']
- if get_multiple_frames:
- # we have to coalesce, resize and de-coalesce all frames
- resize_cmd = ['-coalesce'] + resize_cmd + ['-deconstruct']
- cmd += resize_cmd
- cmd += ['-depth', '8', '-set', 'filename:f', '%w-%h-%g-%p']
- ans = RenderedImage(m.fmt, width, height, m.mode)
- if only_first_frame:
- ans.frames = [Frame(m.frames[0])]
- else:
- ans.frames = list(map(Frame, m.frames))
- bytes_per_pixel = 3 if m.mode == 'rgb' else 4
- def check_resize(frame: Frame) -> None:
- # ImageMagick sometimes generates RGBA images smaller than the specified
- # size. See https://github.com/kovidgoyal/kitty/issues/276 for examples
- sz = os.path.getsize(frame.path)
- expected_size = bytes_per_pixel * frame.width * frame.height
- if sz < expected_size:
- missing = expected_size - sz
- if missing % (bytes_per_pixel * width) != 0:
- raise ConvertFailed(
- path, 'ImageMagick failed to convert {} correctly,'
- ' it generated {} < {} of data (w={}, h={}, bpp={})'.format(
- path, sz, expected_size, frame.width, frame.height, bytes_per_pixel))
- frame.height -= missing // (bytes_per_pixel * frame.width)
- if frame.index == 0:
- ans.height = frame.height
- ans.width = frame.width
- with tempfile.TemporaryDirectory(dir=os.path.dirname(output_prefix)) as tdir:
- output_template = os.path.join(tdir, f'im-%[filename:f].{m.mode}')
- if get_multiple_frames:
- cmd.append('+adjoin')
- run_imagemagick(path, cmd + [output_template])
- unseen = {x.index for x in m}
- for x in os.listdir(tdir):
- try:
- parts = x.split('.', 1)[0].split('-')
- index = int(parts[-1])
- unseen.discard(index)
- f = ans.frames[index]
- f.width, f.height = map(positive_int, parts[1:3])
- sz, pos = parts[3].split('+', 1)
- f.canvas_width, f.canvas_height = map(positive_int, sz.split('x', 1))
- f.canvas_x, f.canvas_y = map(int, pos.split('+', 1))
- except Exception:
- raise OutdatedImageMagick(f'Unexpected output filename: {x!r} produced by ImageMagick command: {last_imagemagick_cmd}')
- f.path = output_prefix + f'-{index}.{m.mode}'
- os.rename(os.path.join(tdir, x), f.path)
- check_resize(f)
- f = ans.frames[0]
- if f.width != ans.width or f.height != ans.height:
- with open(f.path, 'r+b') as ff:
- data = ff.read()
- ff.seek(0)
- ff.truncate()
- cd = create_canvas(data, f.width, f.canvas_x, f.canvas_y, ans.width, ans.height, 3 if ans.mode == 'rgb' else 4)
- ff.write(cd)
- if get_multiple_frames:
- if unseen:
- raise ConvertFailed(path, f'Failed to render {len(unseen)} out of {len(m)} frames of animation')
- elif not ans.frames[0].path:
- raise ConvertFailed(path, 'Failed to render image')
- return ans
- def render_as_single_image(
- path: str, m: ImageData,
- available_width: int, available_height: int,
- scale_up: bool,
- tdir: Optional[str] = None,
- remove_alpha: str = '', flip: bool = False, flop: bool = False,
- ) -> Tuple[str, int, int]:
- import tempfile
- fd, output = tempfile.mkstemp(prefix='tty-graphics-protocol-', suffix=f'.{m.mode}', dir=tdir)
- os.close(fd)
- result = render_image(
- path, output, m, available_width, available_height, scale_up,
- only_first_frame=True, remove_alpha=remove_alpha, flip=flip, flop=flop)
- os.rename(result.frames[0].path, output)
- return output, result.width, result.height
- def can_display_images() -> bool:
- ans: Optional[bool] = getattr(can_display_images, 'ans', None)
- if ans is None:
- ans = which('convert') is not None
- setattr(can_display_images, 'ans', ans)
- return ans
- ImageKey = Tuple[str, int, int]
- SentImageKey = Tuple[int, int, int]
- T = TypeVar('T')
- class Alias(Generic[T]):
- currently_processing: ClassVar[str] = ''
- def __init__(self, defval: T) -> None:
- self.name = ''
- self.defval = defval
- def __get__(self, instance: Optional['GraphicsCommand'], cls: Optional[Type['GraphicsCommand']] = None) -> T:
- if instance is None:
- return self.defval
- return cast(T, instance._actual_values.get(self.name, self.defval))
- def __set__(self, instance: 'GraphicsCommand', val: T) -> None:
- if val == self.defval:
- instance._actual_values.pop(self.name, None)
- else:
- instance._actual_values[self.name] = val
- def __set_name__(self, owner: Type['GraphicsCommand'], name: str) -> None:
- if len(name) == 1:
- Alias.currently_processing = name
- self.name = Alias.currently_processing
- class GraphicsCommand:
- a = action = Alias(cast(GRT_a, 't'))
- q = quiet = Alias(0)
- f = format = Alias(32)
- t = transmission_type = Alias(cast(GRT_t, 'd'))
- s = data_width = animation_state = Alias(0)
- v = data_height = loop_count = Alias(0)
- S = data_size = Alias(0)
- O = data_offset = Alias(0) # noqa
- i = image_id = Alias(0)
- I = image_number = Alias(0) # noqa
- p = placement_id = Alias(0)
- o = compression = Alias(cast(Optional[GRT_o], None))
- m = more = Alias(cast(GRT_m, 0))
- x = left_edge = Alias(0)
- y = top_edge = Alias(0)
- w = width = Alias(0)
- h = height = Alias(0)
- X = cell_x_offset = blend_mode = Alias(0)
- Y = cell_y_offset = bgcolor = Alias(0)
- c = columns = other_frame_number = dest_frame = Alias(0)
- r = rows = frame_number = source_frame = Alias(0)
- z = z_index = gap = Alias(0)
- C = cursor_movement = compose_mode = Alias(cast(GRT_C, 0))
- d = delete_action = Alias(cast(GRT_d, 'a'))
- def __init__(self) -> None:
- self._actual_values: Dict[str, Any] = {}
- def __repr__(self) -> str:
- return self.serialize().decode('ascii').replace('\033', '^]')
- def clone(self) -> 'GraphicsCommand':
- ans = GraphicsCommand()
- ans._actual_values = self._actual_values.copy()
- return ans
- def serialize(self, payload: Union[bytes, str] = b'') -> bytes:
- items = []
- for k, val in self._actual_values.items():
- items.append(f'{k}={val}')
- ans: List[bytes] = []
- w = ans.append
- w(b'\033_G')
- w(','.join(items).encode('ascii'))
- if payload:
- w(b';')
- if isinstance(payload, str):
- payload = standard_b64encode(payload.encode('utf-8'))
- w(payload)
- w(b'\033\\')
- return b''.join(ans)
- def clear(self) -> None:
- self._actual_values = {}
- def iter_transmission_chunks(self, data: Optional[bytes] = None, level: int = -1, compression_threshold: int = 1024) -> Iterator[bytes]:
- if data is None:
- yield self.serialize()
- return
- gc = self.clone()
- gc.S = len(data)
- if level and len(data) >= compression_threshold:
- import zlib
- compressed = zlib.compress(data, level)
- if len(compressed) < len(data):
- gc.o = 'z'
- data = compressed
- gc.S = len(data)
- data = standard_b64encode(data)
- while data:
- chunk, data = data[:4096], data[4096:]
- gc.m = 1 if data else 0
- yield gc.serialize(chunk)
- gc.clear()
- class Placement:
- cmd: GraphicsCommand
- x: int = 0
- y: int = 0
- def __init__(self, cmd: GraphicsCommand, x: int = 0, y: int = 0):
- self.cmd = cmd
- self.x = x
- self.y = y
- class ImageManager:
- def __init__(self, handler: HandlerType):
- self.image_id_counter = count()
- self.handler = handler
- self.filesystem_ok: Optional[bool] = None
- self.image_data: Dict[str, ImageData] = {}
- self.failed_images: Dict[str, Exception] = {}
- self.converted_images: Dict[ImageKey, ImageKey] = {}
- self.sent_images: Dict[ImageKey, int] = {}
- self.image_id_to_image_data: Dict[int, ImageData] = {}
- self.image_id_to_converted_data: Dict[int, ImageKey] = {}
- self.transmission_status: Dict[int, Union[str, int]] = {}
- self.placements_in_flight: DefaultDict[int, Deque[Placement]] = defaultdict(deque)
- self.update_image_placement_for_resend: Optional[Callable[[int, Placement], bool]]
- @property
- def next_image_id(self) -> int:
- return next(self.image_id_counter) + 2
- @property
- def screen_size(self) -> ScreenSize:
- return self.handler.screen_size
- def __enter__(self) -> None:
- import tempfile
- self.tdir = tempfile.mkdtemp(prefix='kitten-images-')
- with tempfile.NamedTemporaryFile(dir=self.tdir, delete=False) as f:
- f.write(b'abcd')
- gc = GraphicsCommand()
- gc.a = 'q'
- gc.s = gc.v = gc.i = 1
- gc.t = 'f'
- self.handler.cmd.gr_command(gc, standard_b64encode(f.name.encode(fsenc)))
- def __exit__(self, *a: Any) -> None:
- import shutil
- shutil.rmtree(self.tdir, ignore_errors=True)
- self.handler.cmd.clear_images_on_screen(delete_data=True)
- self.delete_all_sent_images()
- del self.handler
- def delete_all_sent_images(self) -> None:
- gc = GraphicsCommand()
- gc.a = 'd'
- for img_id in self.transmission_status:
- gc.i = img_id
- self.handler.cmd.gr_command(gc)
- self.transmission_status.clear()
- def handle_response(self, apc: str) -> None:
- cdata, payload = apc[1:].partition(';')[::2]
- control = {}
- for x in cdata.split(','):
- k, v = x.partition('=')[::2]
- control[k] = v
- try:
- image_id = int(control.get('i', '0'))
- except Exception:
- image_id = 0
- if image_id == 1:
- self.filesystem_ok = payload == 'OK'
- return
- if not image_id:
- return
- if not self.transmission_status.get(image_id):
- self.transmission_status[image_id] = payload
- else:
- in_flight = self.placements_in_flight[image_id]
- if in_flight:
- pl = in_flight.popleft()
- if payload.startswith('ENOENT:'):
- with suppress(Exception):
- self.resend_image(image_id, pl)
- if not in_flight:
- self.placements_in_flight.pop(image_id, None)
- def resend_image(self, image_id: int, pl: Placement) -> None:
- if self.update_image_placement_for_resend is not None and not self.update_image_placement_for_resend(image_id, pl):
- return
- image_data = self.image_id_to_image_data[image_id]
- skey = self.image_id_to_converted_data[image_id]
- self.transmit_image(image_data, image_id, *skey)
- with cursor(self.handler.write):
- self.handler.cmd.set_cursor_position(pl.x, pl.y)
- self.handler.cmd.gr_command(pl.cmd)
- def send_image(self, path: str, max_cols: Optional[int] = None, max_rows: Optional[int] = None, scale_up: bool = False) -> SentImageKey:
- path = os.path.abspath(path)
- if path in self.failed_images:
- raise self.failed_images[path]
- if path not in self.image_data:
- try:
- self.image_data[path] = identify(path)
- except Exception as e:
- self.failed_images[path] = e
- raise
- m = self.image_data[path]
- ss = self.screen_size
- if max_cols is None:
- max_cols = ss.cols
- if max_rows is None:
- max_rows = ss.rows
- available_width = max_cols * ss.cell_width
- available_height = max_rows * ss.cell_height
- key = path, available_width, available_height
- skey = self.converted_images.get(key)
- if skey is None:
- try:
- self.converted_images[key] = skey = self.convert_image(path, available_width, available_height, m, scale_up)
- except Exception as e:
- self.failed_images[path] = e
- raise
- final_width, final_height = skey[1:]
- if final_width == 0:
- return 0, 0, 0
- image_id = self.sent_images.get(skey)
- if image_id is None:
- image_id = self.next_image_id
- self.transmit_image(m, image_id, *skey)
- self.sent_images[skey] = image_id
- self.image_id_to_converted_data[image_id] = skey
- self.image_id_to_image_data[image_id] = m
- return image_id, skey[1], skey[2]
- def hide_image(self, image_id: int) -> None:
- gc = GraphicsCommand()
- gc.a = 'd'
- gc.i = image_id
- self.handler.cmd.gr_command(gc)
- def show_image(self, image_id: int, x: int, y: int, src_rect: Optional[Tuple[int, int, int, int]] = None) -> None:
- gc = GraphicsCommand()
- gc.a = 'p'
- gc.i = image_id
- if src_rect is not None:
- gc.x, gc.y, gc.w, gc.h = map(int, src_rect)
- self.placements_in_flight[image_id].append(Placement(gc, x, y))
- with cursor(self.handler.write):
- self.handler.cmd.set_cursor_position(x, y)
- self.handler.cmd.gr_command(gc)
- def convert_image(self, path: str, available_width: int, available_height: int, image_data: ImageData, scale_up: bool = False) -> ImageKey:
- rgba_path, width, height = render_as_single_image(path, image_data, available_width, available_height, scale_up, tdir=self.tdir)
- return rgba_path, width, height
- def transmit_image(self, image_data: ImageData, image_id: int, rgba_path: str, width: int, height: int) -> int:
- self.transmission_status[image_id] = 0
- gc = GraphicsCommand()
- gc.a = 't'
- gc.f = image_data.transmit_fmt
- gc.s = width
- gc.v = height
- gc.i = image_id
- if self.filesystem_ok:
- gc.t = 'f'
- self.handler.cmd.gr_command(
- gc, standard_b64encode(rgba_path.encode(fsenc)))
- else:
- import zlib
- with open(rgba_path, 'rb') as f:
- data = f.read()
- gc.S = len(data)
- data = zlib.compress(data)
- gc.o = 'z'
- data = standard_b64encode(data)
- while data:
- chunk, data = data[:4096], data[4096:]
- gc.m = 1 if data else 0
- self.handler.cmd.gr_command(gc, chunk)
- gc.clear()
- return image_id
|