Module pymake.util.utils

Source code
import sys, os
from datetime import datetime
from collections import defaultdict
import logging
import hashlib
import json
from string import Template

import numpy as np
import scipy as sp

from builtins import input
basestring = (str, bytes)


try:
    from terminal import colorize

    def colored(*x):
        """Return ``x[0]`` rendered with the color ``x[1]`` as a plain ``str``."""
        text, color = x[0], x[1]
        return str(colorize(text, color))
except ImportError:
    lgg = logging.getLogger('root')
    lgg.debug("needs `terminal' module for colors printing")

    def colored(*x):
        """Fallback used when `terminal` is missing: return ``x[0]`` unchanged."""
        return x[0]


#from itertools import cycle
class Cycle(object):
    """Endlessly cycle over the items of a sequence.

    The sequence is wrapped in a ``numpy.nditer``.  When the iterator is
    exhausted it is rewound and iteration restarts from the first item.
    Items are returned as Python scalars (via ``.item()``).

    Parameters
    ----------
    seq : array_like
        Sequence to cycle over.  An empty sequence is not supported: the
        original author notes that ``nditer`` raises on empty input.
    """

    def __init__(self, seq):
        self.seq = seq
        self.it = np.nditer([seq])

    def __iter__(self):
        # Makes the object usable in `for` loops, `zip`, `itertools`, ...
        return self

    def next(self):
        """Python 2 style alias of ``__next__``."""
        return self.__next__()

    def __next__(self):
        try:
            return next(self.it).item()
        except StopIteration:
            # End of the sequence: rewind and hand out the first item again.
            # A single retry (instead of recursing) cannot loop forever.
            self.it.reset()
            return next(self.it).item()

    def reset(self):
        """Rewind to the first item of the sequence."""
        return self.it.reset()

    def copy(self):
        """Return a new cycle over the same sequence, starting from its first item."""
        return self.__class__(self.seq)

def get_dest_opt_filled(parser):
    ''' Return the {dest} name of the options filled in the command line

        The raw ``sys.argv`` tokens are matched against the option strings
        registered on `parser`.  Tokens written as ``--opt=value`` are
        matched on the part before ``=``.  Dash-prefixed tokens that are not
        registered options (negative numbers, abbreviations, ...) are
        ignored instead of raising ``KeyError``.

        Parameters
        ----------
        parser : ArgParser

        Returns
        -------
        set of string
    '''

    # Verbosity shortcuts are explicitly excluded, as in the original code.
    ignored = ('-vv', '-vvv')
    opt2dest_dict = dict((opt, act.dest)
                         for act in parser._get_optional_actions()
                         for opt in act.option_strings)

    dests_in = set()
    for token in sys.argv:
        if not token.startswith('-') or token in ignored:
            continue
        # Support the `--opt=value` spelling.
        name = token.split('=', 1)[0]
        if name in opt2dest_dict:
            dests_in.add(opt2dest_dict[name])
    return dests_in

# Assign new values to an array according to a map list
def set_v_to(a, map):
    """Return a copy of `a` where values are replaced according to `map`.

    Parameters
    ----------
    a : numpy.ndarray
        Input array (left untouched).
    map : dict or iterable of (old, new) pairs
        Anything accepted by ``dict()``.

    Returns
    -------
    numpy.ndarray
        Copy of `a` in which every element equal to ``old`` is set to ``new``.
        Masks are computed on the original `a`, so replacements do not chain.
    """
    new_a = a.copy()
    # `.items()` works on both Python 2 and 3 (`.iteritems()` is Python 2 only).
    for old, new in dict(map).items():
        new_a[a == old] = new

    return new_a

# Re-order the rows (topics/clusters) of a square confusion matrix so that each
# topic is mapped to its best class (column) according to purity.
# One class per topic!
# When `map` is None the matrix is copied first; when a `map` is supplied,
# both `confu` and `map` are modified in place.
# Return: sorted list of tuples that map topic -> class
import sys
# Module-level side effect kept as-is; the function below no longer recurses,
# but other code may depend on the raised limit.
sys.setrecursionlimit(10000)
def map_class2cluster_from_confusion(confu, map=None, cpt=0, minmax='max'):
    """Greedily assign one distinct class (column) to every topic (row).

    Rows are processed from `cpt` onwards.  For the current row the best
    still-unassigned class is selected; if another remaining row scores
    strictly better on that class, the two rows are swapped and the step is
    retried, otherwise the assignment is recorded.  The last row receives the
    only class left.

    Parameters
    ----------
    confu : numpy.ndarray
        Square confusion matrix.
    map : list of (topic, class) tuples, optional
        Partial assignment, used together with `cpt` to resume a run.  When
        None, `confu` is copied and an identity mapping is created.
    cpt : int
        Index of the first row that is not assigned yet.
    minmax : str
        'max' selects the largest entries; any other value selects the
        smallest ones.

    Returns
    -------
    list of tuple
        ``(topic, class)`` pairs sorted by topic.
    """
    assert(confu.shape[0] == confu.shape[1])

    maximize = (minmax == 'max')
    obj_f = np.argmax if maximize else np.argmin

    if map is None:
        confu = confu.copy()
        map = [(i, i) for i in range(len(confu))]

    last = len(confu) - 1
    while cpt < last:
        previous_assigned = sorted(i[1] for i in map[:cpt])
        c_l = int(obj_f(np.delete(confu[cpt], previous_assigned)))
        # Translate the index found in the reduced row back to a real column id.
        for j in previous_assigned:
            if c_l >= j:
                c_l += 1
            else:
                break
        m_l = confu[cpt, c_l]
        # Best remaining topic (row) for that class.
        c_c = int(obj_f(confu[cpt:, c_l])) + cpt
        m_c = confu[c_c, c_l]
        better = (m_c > m_l) if maximize else (m_c < m_l)
        if better:
            # Bring the better row to the current position and retry.  The
            # best value of the current row strictly improves at each swap,
            # so this terminates.
            confu[[cpt, c_c], :] = confu[[c_c, cpt], :]
            map[cpt], map[c_c] = map[c_c], map[cpt]
        else:
            map[cpt] = (map[cpt][0], c_l)
            cpt += 1

    if cpt == last:
        # Only one class is left for the last topic.
        taken = set(i[1] for i in map[:cpt])
        free = [c for c in range(len(confu)) if c not in taken]
        if free:
            map[cpt] = (map[cpt][0], free[0])

    return sorted(map)


def drop_zeros(a_list):
    """Return a list with the elements of `a_list` that are different from 0.

    A real list is returned on every Python version (a bare ``filter`` is a
    one-shot lazy iterator on Python 3).
    """
    return [x for x in a_list if x != 0]

def nxG(y):
    """Return a networkx graph built from `y`.

    Parameters
    ----------
    y : numpy.ndarray or any other object
        An adjacency matrix is converted to an undirected ``nx.Graph`` when
        it equals its transpose, otherwise to a ``nx.DiGraph``.  Any other
        object is returned unchanged.

    Returns
    -------
    The graph built from the matrix, or `y` itself.
    """
    if not isinstance(y, np.ndarray):
        return y

    # Imported lazily so the pass-through case does not require networkx.
    import networkx as nx

    if (y == y.T).all():
        # Undirected Graph
        typeG = nx.Graph()
    else:
        # Directed Graph
        typeG = nx.DiGraph()

    # `from_numpy_matrix` was removed in networkx 3.0; `from_numpy_array`
    # is its replacement.  Fall back to the old name on old installations.
    build = getattr(nx, 'from_numpy_array', None)
    if build is None:
        build = nx.from_numpy_matrix
    return build(y, create_using=typeG)

#
#
# Common/Utils
#
#

def retrieve_git_info():
    """Return the current git branch name and commit hash.

    Runs ``git rev-parse`` in the current working directory.

    Returns
    -------
    dict
        ``{'git_branch': <branch name>, 'git_hash': <commit hash>}``

    Raises
    ------
    subprocess.CalledProcessError
        If ``git`` exits with a non-zero status.
    """
    # `subprocess` is not among the module-level imports visible in this file.
    import subprocess

    git_branch = subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).strip().decode()
    git_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).strip().decode()

    return {'git_branch': git_branch, 'git_hash': git_hash}

def hash_objects(obj, algo='md5'):
    """Return the hexadecimal digest of `obj` computed with ``hashlib.<algo>``.

    Arrays, lists and tuples are hashed through the raw bytes of
    ``np.asarray(obj)``, strings through their UTF-8 encoding and dicts
    through their JSON dump with sorted keys.  Any other type raises
    ``TypeError``.
    """
    digest_factory = getattr(hashlib, algo)

    if isinstance(obj, (np.ndarray, list, tuple)):
        payload = np.asarray(obj).tobytes()
    elif isinstance(obj, str):
        payload = obj.encode("utf-8")
    elif isinstance(obj, dict):
        payload = json.dumps(obj, sort_keys=True).encode('utf8')
    else:
        raise TypeError('Type of object unashable: %s' % (type(obj)))

    return digest_factory(payload).hexdigest()

def ask_sure_exit(question):
    """Ask a yes/no `question` on stdin until a valid answer is given.

    Returns None when the answer is 'yes'/'y'; exits the process with
    status 2 when it is 'no'/'n'.  Answers are case-insensitive and
    surrounding whitespace is ignored.
    """
    while True:
        answer = input(question + ' ').strip().lower()
        if answer in ('yes', 'y'):
            return
        if answer in ('no', 'n'):
            # `sys.exit` rather than the `site`-provided `exit` helper, which
            # is meant for interactive sessions and may be absent.
            sys.exit(2)
        print("Enter either [y|n]")

def make_path(f):
    """Create the parent directory of file path `f` if needed and return it.

    Returns the directory part of `f` (an empty string when `f` has none,
    in which case nothing is created).
    """
    parent = os.path.dirname(f)
    missing = not os.path.exists(parent)
    if missing and parent:
        os.makedirs(parent)
    return parent



def Now():
    """Return the value of ``datetime.now()``."""
    current = datetime.now()
    return current
def nowDiff(last):
    """Return ``datetime.now() - last``."""
    current = datetime.now()
    return current - last
def ellapsed_time(text, since):
    """Print `text` followed by the time elapsed since `since`.

    Returns the ``datetime`` taken for the measurement, so it can be passed
    as `since` to the next call.
    """
    current = datetime.now()
    message = text + ' : %s' % (current - since,)
    print(message)
    return current

def tail(filename, n_lines):
    """Return up to `n_lines` trailing lines of `filename`, in file order.

    Lines are obtained from ``reverse_readline`` and put back in their
    original order before being returned.
    """
    collected = []
    backwards = reverse_readline(filename)
    for count, row in enumerate(backwards):
        if count == n_lines:
            break
        collected.append(row)
    collected.reverse()
    return collected
#import mmap
#def tail(filename, nlines):
#    """Returns last n lines from the filename. No exception handling"""
#    size = os.path.getsize(filename)
#    with open(filename, "rb") as f:
#        # for Windows the mmap parameters are different
#        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
#        try:
#            for i in range(size - 1, -1, -1):
#                if fm[i] == '\n':
#                    nlines -= 1
#                    if nlines == -1:
#                        break
#            return fm[i + 1 if i else 0:].splitlines()
#        finally:
#            pass
#

def reverse_readline(filename, buf_size=8192, encoding=None):
    """a generator that returns the lines of a file in reverse order

    The file is read backwards in chunks of `buf_size` bytes.  It is opened
    in binary mode so that offsets and read sizes are real byte counts;
    lines are decoded only once they are complete, which keeps multi-byte
    characters intact across chunk boundaries.

    Parameters
    ----------
    filename : str
        Path of the file to read.
    buf_size : int
        Size in bytes of each chunk.
    encoding : str, optional
        Text encoding of the file.  Defaults to the locale's preferred
        encoding.  Must be ASCII-compatible, because lines are split on the
        ``\\n`` byte.

    Yields
    ------
    str
        Lines without their line terminator, last line first.  Empty lines
        inside a chunk are skipped.
    """
    if encoding is None:
        import locale
        encoding = locale.getpreferredencoding(False)

    def _decode(raw):
        # Drop the '\r' of a Windows line ending left over by binary reading.
        text = raw.decode(encoding)
        return text[:-1] if text.endswith('\r') else text

    with open(filename, 'rb') as fh:
        segment = None
        offset = 0
        fh.seek(0, os.SEEK_END)
        file_size = remaining_size = fh.tell()
        while remaining_size > 0:
            offset = min(file_size, offset + buf_size)
            fh.seek(file_size - offset)
            buffer = fh.read(min(remaining_size, buf_size))
            remaining_size -= buf_size
            lines = buffer.split(b'\n')
            # the first line of the buffer is probably not a complete line so
            # we'll save it and append it to the last line of the next buffer
            # we read
            if segment is not None:
                # if the previous chunk starts right at the beginning of a
                # line, do not concatenate the segment to the last line of the
                # new chunk; yield the segment first instead
                if buffer[-1:] != b'\n':
                    lines[-1] += segment
                else:
                    yield _decode(segment)
            segment = lines[0]
            for index in range(len(lines) - 1, 0, -1):
                line = _decode(lines[index])
                if line:
                    yield line
        # Don't yield None if the file was empty
        if segment is not None:
            yield _decode(segment)


class defaultdict2(defaultdict):
    """A ``defaultdict`` whose factory receives the missing key as argument."""

    def __missing__(self, key):
        factory = self.default_factory
        if factory is not None:
            # Build the value from the key, store it, then hand it back.
            value = factory(key)
            self[key] = value
            return value
        raise KeyError(key)

Functions

def Now()
Source code
def Now():
    """Return the value of ``datetime.now()``."""
    current = datetime.now()
    return current
def ask_sure_exit(question)
Source code
def ask_sure_exit(question):
    """Ask a yes/no `question` on stdin until a valid answer is given.

    Returns None when the answer is 'yes'/'y'; exits the process with
    status 2 when it is 'no'/'n'.  Answers are case-insensitive and
    surrounding whitespace is ignored.
    """
    while True:
        answer = input(question + ' ').strip().lower()
        if answer in ('yes', 'y'):
            return
        if answer in ('no', 'n'):
            # `sys.exit` rather than the `site`-provided `exit` helper, which
            # is meant for interactive sessions and may be absent.
            sys.exit(2)
        print("Enter either [y|n]")
def colored(*x)
Source code
def colored(*x):
    """Return ``x[0]`` rendered with the color ``x[1]`` as a plain ``str``."""
    text, color = x[0], x[1]
    return str(colorize(text, color))
def drop_zeros(a_list)
Source code
def drop_zeros(a_list):
    """Return a list with the elements of `a_list` that are different from 0.

    A real list is returned on every Python version (a bare ``filter`` is a
    one-shot lazy iterator on Python 3).
    """
    return [x for x in a_list if x != 0]
def ellapsed_time(text, since)
Source code
def ellapsed_time(text, since):
    """Print `text` followed by the time elapsed since `since`.

    Returns the ``datetime`` taken for the measurement, so it can be passed
    as `since` to the next call.
    """
    current = datetime.now()
    message = text + ' : %s' % (current - since,)
    print(message)
    return current
def get_dest_opt_filled(parser)

Return the {dest} name of the options filled in the command line

Parameters

parser : ArgParser
 

Returns

set of string
 
Source code
def get_dest_opt_filled(parser):
    ''' Return the {dest} name of the options filled in the command line

        The raw ``sys.argv`` tokens are matched against the option strings
        registered on `parser`.  Tokens written as ``--opt=value`` are
        matched on the part before ``=``.  Dash-prefixed tokens that are not
        registered options (negative numbers, abbreviations, ...) are
        ignored instead of raising ``KeyError``.

        Parameters
        ----------
        parser : ArgParser

        Returns
        -------
        set of string
    '''

    # Verbosity shortcuts are explicitly excluded, as in the original code.
    ignored = ('-vv', '-vvv')
    opt2dest_dict = dict((opt, act.dest)
                         for act in parser._get_optional_actions()
                         for opt in act.option_strings)

    dests_in = set()
    for token in sys.argv:
        if not token.startswith('-') or token in ignored:
            continue
        # Support the `--opt=value` spelling.
        name = token.split('=', 1)[0]
        if name in opt2dest_dict:
            dests_in.add(opt2dest_dict[name])
    return dests_in
def hash_objects(obj, algo='md5')

Return a hash of the input

Source code
def hash_objects(obj, algo='md5'):
    """Return the hexadecimal digest of `obj` computed with ``hashlib.<algo>``.

    Arrays, lists and tuples are hashed through the raw bytes of
    ``np.asarray(obj)``, strings through their UTF-8 encoding and dicts
    through their JSON dump with sorted keys.  Any other type raises
    ``TypeError``.
    """
    digest_factory = getattr(hashlib, algo)

    if isinstance(obj, (np.ndarray, list, tuple)):
        payload = np.asarray(obj).tobytes()
    elif isinstance(obj, str):
        payload = obj.encode("utf-8")
    elif isinstance(obj, dict):
        payload = json.dumps(obj, sort_keys=True).encode('utf8')
    else:
        raise TypeError('Type of object unashable: %s' % (type(obj)))

    return digest_factory(payload).hexdigest()
def make_path(f)
Source code
def make_path(f):
    """Create the parent directory of file path `f` if needed and return it.

    Returns the directory part of `f` (an empty string when `f` has none,
    in which case nothing is created).
    """
    parent = os.path.dirname(f)
    missing = not os.path.exists(parent)
    if missing and parent:
        os.makedirs(parent)
    return parent
def map_class2cluster_from_confusion(confu, map=None, cpt=0, minmax='max')
Source code
def map_class2cluster_from_confusion(confu, map=None, cpt=0, minmax='max'):
    """Greedily assign one distinct class (column) to every topic (row).

    Rows are processed from `cpt` onwards.  For the current row the best
    still-unassigned class is selected; if another remaining row scores
    strictly better on that class, the two rows are swapped and the step is
    retried, otherwise the assignment is recorded.  The last row receives the
    only class left.

    Parameters
    ----------
    confu : numpy.ndarray
        Square confusion matrix.  It is copied when `map` is None and
        modified in place otherwise.
    map : list of (topic, class) tuples, optional
        Partial assignment, used together with `cpt` to resume a run.  When
        None an identity mapping is created.  Modified in place when given.
    cpt : int
        Index of the first row that is not assigned yet.
    minmax : str
        'max' selects the largest entries; any other value selects the
        smallest ones.

    Returns
    -------
    list of tuple
        ``(topic, class)`` pairs sorted by topic.
    """
    assert(confu.shape[0] == confu.shape[1])

    maximize = (minmax == 'max')
    obj_f = np.argmax if maximize else np.argmin

    if map is None:
        confu = confu.copy()
        map = [(i, i) for i in range(len(confu))]

    last = len(confu) - 1
    while cpt < last:
        previous_assigned = sorted(i[1] for i in map[:cpt])
        c_l = int(obj_f(np.delete(confu[cpt], previous_assigned)))
        # Translate the index found in the reduced row back to a real column id.
        for j in previous_assigned:
            if c_l >= j:
                c_l += 1
            else:
                break
        m_l = confu[cpt, c_l]
        # Best remaining topic (row) for that class.
        c_c = int(obj_f(confu[cpt:, c_l])) + cpt
        m_c = confu[c_c, c_l]
        better = (m_c > m_l) if maximize else (m_c < m_l)
        if better:
            # Bring the better row to the current position and retry.  The
            # best value of the current row strictly improves at each swap,
            # so this terminates.
            confu[[cpt, c_c], :] = confu[[c_c, cpt], :]
            map[cpt], map[c_c] = map[c_c], map[cpt]
        else:
            map[cpt] = (map[cpt][0], c_l)
            cpt += 1

    if cpt == last:
        # Only one class is left for the last topic.
        taken = set(i[1] for i in map[:cpt])
        free = [c for c in range(len(confu)) if c not in taken]
        if free:
            map[cpt] = (map[cpt][0], free[0])

    return sorted(map)
def nowDiff(last)
Source code
def nowDiff(last):
    """Return ``datetime.now() - last``."""
    current = datetime.now()
    return current - last
def nxG(y)
Source code
def nxG(y):
    """Return a networkx graph built from `y`.

    Parameters
    ----------
    y : numpy.ndarray or any other object
        An adjacency matrix is converted to an undirected ``nx.Graph`` when
        it equals its transpose, otherwise to a ``nx.DiGraph``.  Any other
        object is returned unchanged.

    Returns
    -------
    The graph built from the matrix, or `y` itself.
    """
    if not isinstance(y, np.ndarray):
        return y

    # Imported lazily so the pass-through case does not require networkx.
    import networkx as nx

    if (y == y.T).all():
        # Undirected Graph
        typeG = nx.Graph()
    else:
        # Directed Graph
        typeG = nx.DiGraph()

    # `from_numpy_matrix` was removed in networkx 3.0; `from_numpy_array`
    # is its replacement.  Fall back to the old name on old installations.
    build = getattr(nx, 'from_numpy_array', None)
    if build is None:
        build = nx.from_numpy_matrix
    return build(y, create_using=typeG)
def retrieve_git_info()
Source code
def retrieve_git_info():
    """Return the current git branch name and commit hash.

    Runs ``git rev-parse`` in the current working directory.

    Returns
    -------
    dict
        ``{'git_branch': <branch name>, 'git_hash': <commit hash>}``

    Raises
    ------
    subprocess.CalledProcessError
        If ``git`` exits with a non-zero status.
    """
    # `subprocess` is not among the module-level imports visible in this file.
    import subprocess

    git_branch = subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).strip().decode()
    git_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).strip().decode()

    return {'git_branch': git_branch, 'git_hash': git_hash}
def reverse_readline(filename, buf_size=8192)

a generator that returns the lines of a file in reverse order

Source code
def reverse_readline(filename, buf_size=8192, encoding=None):
    """a generator that returns the lines of a file in reverse order

    The file is read backwards in chunks of `buf_size` bytes.  It is opened
    in binary mode so that offsets and read sizes are real byte counts;
    lines are decoded only once they are complete, which keeps multi-byte
    characters intact across chunk boundaries.

    Parameters
    ----------
    filename : str
        Path of the file to read.
    buf_size : int
        Size in bytes of each chunk.
    encoding : str, optional
        Text encoding of the file.  Defaults to the locale's preferred
        encoding.  Must be ASCII-compatible, because lines are split on the
        ``\\n`` byte.

    Yields
    ------
    str
        Lines without their line terminator, last line first.  Empty lines
        inside a chunk are skipped.
    """
    if encoding is None:
        import locale
        encoding = locale.getpreferredencoding(False)

    def _decode(raw):
        # Drop the '\r' of a Windows line ending left over by binary reading.
        text = raw.decode(encoding)
        return text[:-1] if text.endswith('\r') else text

    with open(filename, 'rb') as fh:
        segment = None
        offset = 0
        fh.seek(0, os.SEEK_END)
        file_size = remaining_size = fh.tell()
        while remaining_size > 0:
            offset = min(file_size, offset + buf_size)
            fh.seek(file_size - offset)
            buffer = fh.read(min(remaining_size, buf_size))
            remaining_size -= buf_size
            lines = buffer.split(b'\n')
            # the first line of the buffer is probably not a complete line so
            # we'll save it and append it to the last line of the next buffer
            # we read
            if segment is not None:
                # if the previous chunk starts right at the beginning of a
                # line, do not concatenate the segment to the last line of the
                # new chunk; yield the segment first instead
                if buffer[-1:] != b'\n':
                    lines[-1] += segment
                else:
                    yield _decode(segment)
            segment = lines[0]
            for index in range(len(lines) - 1, 0, -1):
                line = _decode(lines[index])
                if line:
                    yield line
        # Don't yield None if the file was empty
        if segment is not None:
            yield _decode(segment)
def set_v_to(a, map)
Source code
def set_v_to(a, map):
    """Return a copy of `a` where values are replaced according to `map`.

    Parameters
    ----------
    a : numpy.ndarray
        Input array (left untouched).
    map : dict or iterable of (old, new) pairs
        Anything accepted by ``dict()``.

    Returns
    -------
    numpy.ndarray
        Copy of `a` in which every element equal to ``old`` is set to ``new``.
        Masks are computed on the original `a`, so replacements do not chain.
    """
    new_a = a.copy()
    # `.items()` works on both Python 2 and 3 (`.iteritems()` is Python 2 only).
    for old, new in dict(map).items():
        new_a[a == old] = new

    return new_a
def tail(filename, n_lines)
Source code
def tail(filename, n_lines):
    """Return up to `n_lines` trailing lines of `filename`, in file order.

    Lines are obtained from ``reverse_readline`` and put back in their
    original order before being returned.
    """
    collected = []
    backwards = reverse_readline(filename)
    for count, row in enumerate(backwards):
        if count == n_lines:
            break
        collected.append(row)
    collected.reverse()
    return collected

Classes

class Cycle (seq)
Source code
class Cycle(object):
    """Endlessly cycle over the items of a sequence.

    The sequence is wrapped in a ``numpy.nditer``.  When the iterator is
    exhausted it is rewound and iteration restarts from the first item.
    Items are returned as Python scalars (via ``.item()``).

    Parameters
    ----------
    seq : array_like
        Sequence to cycle over.  An empty sequence is not supported: the
        original author notes that ``nditer`` raises on empty input.
    """

    def __init__(self, seq):
        self.seq = seq
        self.it = np.nditer([seq])

    def __iter__(self):
        # Makes the object usable in `for` loops, `zip`, `itertools`, ...
        return self

    def next(self):
        """Python 2 style alias of ``__next__``."""
        return self.__next__()

    def __next__(self):
        try:
            return next(self.it).item()
        except StopIteration:
            # End of the sequence: rewind and hand out the first item again.
            # A single retry (instead of recursing) cannot loop forever.
            self.it.reset()
            return next(self.it).item()

    def reset(self):
        """Rewind to the first item of the sequence."""
        return self.it.reset()

    def copy(self):
        """Return a new cycle over the same sequence, starting from its first item."""
        return self.__class__(self.seq)

Methods

def copy(self)
Source code
def copy(self):
    """Return a new instance of ``self``'s class built from ``self.seq``."""
    cls = self.__class__
    return cls(self.seq)
def next(self)
Source code
def next(self):
    """Python 2 style alias: delegate to ``self.__next__()``."""
    advance = self.__next__
    return advance()
def reset(self)
Source code
def reset(self):
    """Call ``self.it.reset()`` and return its result."""
    iterator = self.it
    return iterator.reset()
class defaultdict2 (*args, **kwargs)

defaultdict(default_factory[, …]) –> dict with default factory

The default factory is called without arguments to produce a new value when a key is not present, in getitem only. A defaultdict compares equal to a dict with the same items. All remaining arguments are treated the same as if they were passed to the dict constructor, including keyword arguments.

Source code
class defaultdict2(defaultdict):
    """A ``defaultdict`` whose factory receives the missing key as argument."""

    def __missing__(self, key):
        factory = self.default_factory
        if factory is not None:
            # Build the value from the key, store it, then hand it back.
            value = factory(key)
            self[key] = value
            return value
        raise KeyError(key)

Ancestors

  • collections.defaultdict
  • builtins.dict