Source code for hypua2jamo.encoder

# -*- coding: utf-8 -*-
# hypua2jamo: Convert Hanyang-PUA code to unicode Hangul Jamo
# Copyright (C) 2012,2018-2019  mete0r
#
# This file is part of hypua2jamo.
#
# hypua2jamo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# hypua2jamo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with hypua2jamo.  If not, see <http://www.gnu.org/licenses/>.
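'''Convert Hanyang PUA text to Unicode Hangul Jamo.

Incremental encoders are provided in pure python, CFFI and Cython variants;
a native implementation (Cython, then CFFI) is preferred over the pure
python one when available.
'''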
from __future__ import absolute_import
from __future__ import print_function
from array import array
from codecs import IncrementalEncoder
from struct import Struct
import io
import os.path

try:
    from cffi import FFI
    from hypua2jamo._cffi import lib as _cffi
except ImportError:
    cffi_available = False
else:
    cffi_available = True

try:
    from . import _cython
except ImportError:
    cython_available = False
else:
    cython_available = True


_UNICODE_SIZE = array('u').itemsize
try:
    unichr
except NameError:
    # for Python 3
    unichr = chr

pua_groups_length_struct = Struct('<H')
group_entry_struct = Struct('<H')
mapping_struct = Struct('<HH')
jamo_length_struct = Struct('<H')
jamo_struct = Struct('<H')

ushort = Struct('<H')
ushort_pair = Struct('<2H')


def read_struct(fp, struct):
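    '''Read and unpack exactly one ``struct`` record from ``fp``.'''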
    data = fp.read(struct.size)
    if len(data) != struct.size:
        raise Exception('unexpected end of data in mapping pack')
    return struct.unpack(data)


def load_pack_fp(fp):
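    '''Parse one packed mapping table from the binary file object ``fp``.

    The layout, as read below, is little-endian throughout: a uint16 group
    count, then one (first code point, length) uint16 pair per group, then
    one uint16 Jamo-sequence length per mapped code point, and finally all
    Jamo sequences as consecutive uint16 code units.

    Returns a tuple of ``(pua_start, pua_end, jamo_seq_list)`` entries,
    where ``jamo_seq_list[i]`` is the Jamo string for ``pua_start + i``.

    Illustrative doctest; the single-entry table below is made up and is not
    one of the real packs shipped with the package:

    >>> import io, struct
    >>> data = (
    ...     struct.pack('<H', 1)                  # one group
    ...     + struct.pack('<2H', 0xE000, 1)       # starts at U+E000, 1 entry
    ...     + struct.pack('<H', 2)                # its Jamo sequence: 2 units
    ...     + struct.pack('<2H', 0x1100, 0x1161)  # the code units themselves
    ... )
    >>> expected = ((0xE000, 0xE000, (unichr(0x1100) + unichr(0x1161),)),)
    >>> load_pack_fp(io.BytesIO(data)) == expected
    True
    '''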
    n_groups = read_struct(fp, ushort)[0]

    groupheaders = [
        read_struct(fp, ushort_pair)
        for i in range(0, n_groups)
    ]

    groups = []
    targetidx = 0
    for source_start, grouplength in groupheaders:
        source_end = source_start + grouplength - 1
        group = []
        for i in range(0, grouplength):
            source = source_start + i
            targetlen = read_struct(fp, ushort)[0]
            group.append((source, targetidx, targetlen))
            targetidx += targetlen
        groups.append((source_start, source_end, group))

    p2j_mapping = []
    for pua_start, pua_end, group in groups:
        jamo_seq_list = []
        for mapping in group:
            source, targetidx, targetlen = mapping
            target_struct = Struct('<{}H'.format(targetlen))
            target = read_struct(fp, target_struct)
            target = ''.join(
                unichr(jamo_code) for jamo_code in target
            )
            jamo_seq_list.append(target)
        p2j_mapping.append(
            (pua_start, pua_end, tuple(jamo_seq_list))
        )
    return tuple(p2j_mapping)


def load_pack(filename):
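    '''Load a packed mapping table stored alongside this module.'''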
    filename = os.path.join(os.path.dirname(__file__), filename)
    with io.open(filename, 'rb') as fp:
        return load_pack_fp(fp)


c2d_mapping = load_pack('c2d.bin')
p2jc_mapping = load_pack('p2jc.bin')
p2jd_mapping = load_pack('p2jd.bin')


def lookup(mapping, pua_code):
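    '''Translate a single code point through ``mapping``.

    Returns the mapped Jamo string when ``pua_code`` falls inside one of the
    mapping's ranges; any other code point is passed through unchanged:

    >>> lookup(p2jc_mapping, ord(u'A')) == u'A'
    True
    '''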
    for pua_start, pua_end, jamo_seq_list in mapping:
        if pua_start <= pua_code <= pua_end:
            return jamo_seq_list[pua_code - pua_start]
    return unichr(pua_code)


class BaseEncoderImplementation(
    IncrementalEncoder
):
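    '''
    Base of the pure python encoders.

    Subclasses set a ``mapping`` class attribute; ``encode`` translates
    each character of the input through ``lookup``.
    '''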

    def encode(self, pua_string, final=False):
        mapping = self.mapping
        return ''.join(
            lookup(mapping, ord(pua_chr))
            for pua_chr in pua_string
        )

    def reset(self):
        pass

    def getstate(self):
        return 0

    def setstate(self, state):
        pass


class PUAComposedEncoder(
    BaseEncoderImplementation
):
    '''
    PUA-to-Jamo(composed) encoder

    Pure python implementation.
    '''

    mapping = p2jc_mapping


PUAComposedEncoderImplementationOnPurePython = PUAComposedEncoder


class PUADecomposedEncoder(
    BaseEncoderImplementation
):
    '''
    PUA-to-Jamo(decomposed) encoder

    Pure python implementation.
    '''

    mapping = p2jd_mapping


PUADecomposedEncoderImplementationOnPurePython = PUADecomposedEncoder


class JamoDecomposingEncoder(
    BaseEncoderImplementation
):
    '''
    Jamo(composed)-to-Jamo(decomposed) encoder

    Pure python implementation.
    '''

    mapping = c2d_mapping


JamoDecomposingEncoderImplementationOnPurePython = JamoDecomposingEncoder


def encode_to_composed(pua_string):
    return ''.join(
        lookup(p2jc_mapping, ord(pua_chr))
        for pua_chr in pua_string
    )


def encode_to_decomposed(pua_string):
    return ''.join(
        lookup(p2jd_mapping, ord(pua_chr))
        for pua_chr in pua_string
    )


class BaseEncoderImplementationOnCFFI(
    IncrementalEncoder
):

    def reset(self):
        pass

    def getstate(self):
        return 0

    def setstate(self, state):
        pass

    def encode(self, pua_string, final=False):
        ffi = self._ffi
        pua_array = array('u', pua_string)
        pua_ptr, pua_len = pua_array.buffer_info()
        pua_ptr = ffi.cast('void *', pua_ptr)
        jamo_size = self._calcsize(pua_ptr, pua_len)
        jamo_array = array('u', u' ' * jamo_size)
        jamo_ptr = jamo_array.buffer_info()[0]
        jamo_ptr = ffi.cast('void *', jamo_ptr)
        jamo_len = self._encode(pua_ptr, pua_len, jamo_ptr)
        if jamo_size != jamo_len:
            raise Exception(
                'p2jcx translation failed', jamo_size, jamo_len
            )
        return jamo_array.tounicode()


class PUAComposedEncoderImplementationOnCFFI(
    BaseEncoderImplementationOnCFFI
):
    '''
    PUA-to-Jamo(composed) encoder

    CFFI implementation.
    '''

    def __init__(self, errors='strict'):
        IncrementalEncoder.__init__(self, errors)
        if not cffi_available:
            raise NotImplementedError(
                'hypua2jamo._cffi is not available'
            )
        self._ffi = FFI()
        if _UNICODE_SIZE == 4:
            self._calcsize = _cffi.hypua_p2jc_ucs4_calcsize
            self._encode = _cffi.hypua_p2jc_ucs4_encode
        elif _UNICODE_SIZE == 2:
            self._calcsize = _cffi.hypua_p2jc_ucs2_calcsize
            self._encode = _cffi.hypua_p2jc_ucs2_encode
        else:
            raise AssertionError(_UNICODE_SIZE)


class PUADecomposedEncoderImplementationOnCFFI(
    BaseEncoderImplementationOnCFFI
):
    '''
    PUA-to-Jamo(decomposed) encoder

    CFFI implementation.
    '''

    def __init__(self, errors='strict'):
        IncrementalEncoder.__init__(self, errors)
        if not cffi_available:
            raise NotImplementedError(
                'hypua2jamo._cffi is not available'
            )
        self._ffi = FFI()
        if _UNICODE_SIZE == 4:
            self._calcsize = _cffi.hypua_p2jd_ucs4_calcsize
            self._encode = _cffi.hypua_p2jd_ucs4_encode
        elif _UNICODE_SIZE == 2:
            self._calcsize = _cffi.hypua_p2jd_ucs2_calcsize
            self._encode = _cffi.hypua_p2jd_ucs2_encode
        else:
            raise AssertionError(_UNICODE_SIZE)


class JamoDecomposingEncoderImplementationOnCFFI(
    BaseEncoderImplementationOnCFFI
):
    '''
    Jamo(composed)-to-Jamo(decomposed) encoder

    CFFI implementation.
    '''

    def __init__(self, errors='strict'):
        IncrementalEncoder.__init__(self, errors)
        if not cffi_available:
            raise NotImplementedError(
                'hypua2jamo._cffi is not available'
            )
        self._ffi = FFI()
        if _UNICODE_SIZE == 4:
            self._calcsize = _cffi.hypua_c2d_ucs4_calcsize
            self._encode = _cffi.hypua_c2d_ucs4_encode
        elif _UNICODE_SIZE == 2:
            self._calcsize = _cffi.hypua_c2d_ucs2_calcsize
            self._encode = _cffi.hypua_c2d_ucs2_encode
        else:
            raise AssertionError(_UNICODE_SIZE)


if cython_available:
    PUAComposedEncoder = _cython.PUAComposedEncoderImplementationOnCython
    PUADecomposedEncoder = _cython.PUADecomposedEncoderImplementationOnCython
    JamoDecomposingEncoder = _cython.JamoDecomposingEncoderImplementationOnCython  # noqa
elif cffi_available:
    PUAComposedEncoder = PUAComposedEncoderImplementationOnCFFI
    PUADecomposedEncoder = PUADecomposedEncoderImplementationOnCFFI
    JamoDecomposingEncoder = JamoDecomposingEncoderImplementationOnCFFI
else:
    PUAComposedEncoder = PUAComposedEncoderImplementationOnPurePython
    PUADecomposedEncoder = PUADecomposedEncoderImplementationOnPurePython
    JamoDecomposingEncoder = JamoDecomposingEncoderImplementationOnPurePython
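

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only).  It assumes the selected
    # implementation follows the ``codecs.IncrementalEncoder`` constructor
    # signature, as the implementations defined above do, and that the
    # native implementations also pass non-PUA characters through unchanged.
    encoder = PUAComposedEncoder()
    print('selected implementation:', PUAComposedEncoder.__name__)
    print(encoder.encode(u'no Hanyang PUA characters here'))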