Sharding delimited files: from simple to more sophisticated

A while back I ran into the problem of having a large delimited file whose lines I wanted to process in parallel with Python. Doing so all at once either took up too much RAM or took too long.

So I made a simple Parallel Parser (ParPar) for sharding such delimited files and automatically mapping a function across either each line of the file or each file of the shard.

It has served me sufficiently well, but I am quite sure it could be improved.

I am posting the relevant code here; a few snippets may be left out, which can be found in the repo.

Issues:

  • The main performance issue I have faced: if whatever I am doing takes most or all cores and uses all the RAM, it then chews through swap, leaving everything in a laggy, frozen state. It would be nice to know how to set a memory limit for the spawned processes so that, once the limit is hit, processes are paused until some of the ongoing ones finish and release memory (see the sketch after this list).

  • Sharding creates many sub-directories, each with a single file at its base. It might be improved by supporting multi-sharding (sharding a shard?) for the cases where the first issue is encountered.
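One possible way to tackle the first issue, sketched here under the assumption that psutil is acceptable as an extra dependency: wrap the worker function so that each call blocks until enough RAM is free. This only pauses work rather than enforcing a hard limit, and the names below are illustrative, not part of ParPar:

import time

import psutil


class MemoryGuarded:
    '''Callable wrapper: each call blocks until enough RAM is available.'''

    def __init__(self, function, min_free_bytes=2 * 1024**3, poll_seconds=0.5):
        self.function = function
        self.min_free_bytes = min_free_bytes
        self.poll_seconds = poll_seconds

    def __call__(self, *args, **kwargs):
        # wait until the system reports enough free memory before working
        while psutil.virtual_memory().available < self.min_free_bytes:
            time.sleep(self.poll_seconds)
        return self.function(*args, **kwargs)


# usage idea: pass MemoryGuarded(foo) instead of foo to shard_file_apply or
# shard_line_apply; the wrapper pickles fine as long as foo is a module-level
# function.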

Design Goals

I wanted to make a sharding library that was user-friendly for the following:

  • taking a delimited file and splitting it, either by lines or by columns (when the chosen columns are categorical), putting the sub-files into a directory structure accordingly
  • restoring a file sharded by the library (although not necessarily with the lines in the same order)
  • easily mapping a function across all of the sharded files, or across each line of the sharded files
  • providing a progress bar for the status of the parallelized function

Resources:

  • repository
  • pypi

Example Usage

import os

from parpar import ParPar

ppf = ParPar()

ppf.shard_by_lines(
    input_file="./short.csv",
    output_dir="./short-lines",
    number_of_lines=3,   # 3 lines per sharded file
)

ppf.shard(
    input_file="./short.csv",
    output_dir="./short-cols",
    delim=",",
    columns=[0, 5],      # assumes column 0 and column 5 are categorical
)


def foo(file, a, b, **kwargs):
    print(file, kwargs["dest"])


ppf.shard_file_apply(
    './short-lines',     # sharded directory
    foo,
    args=[1, 2],
    kwargs={
        "output_shard_name": 'short-lines-alt',
        "output_shard_loc": os.path.expanduser('~/Desktop')
        # creates a new shard (with no files) with the corresponding name at
        # the specified location, in this case ~/Desktop/short-lines-alt/<lines>/.
        # Using kwargs["dest"] and kwargs["lock"] the user can safely create a
        # new shard, processing each sharded file at the new location.
    }
)

Code

Notes:

  • The following functions all live inside a class called ParPar, which takes no arguments for initialization.

  • This class houses two key variables:

    • _shared_current = Value('i', 0): shared memory counter
    • _shared_lock = Lock(): for race conditions, etc
  • This depends on a package called sil (the Sil class below) for the progress bar.

Subnote: _shared_current could be written as Value('i', 0, lock=True), which would let me drop the explicit Lock, but since the Lock is not just for the counter, I opted against that.
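For reference, a minimal sketch of the initialization implied by the notes above (assumed, not copied from the repo):

from multiprocessing import Lock, Value


class ParPar:
    def __init__(self):
        # shared-memory counter used to drive the progress bar
        self._shared_current = Value('i', 0)
        # a single lock guarding the counter and any shared filesystem work
        self._shared_lock = Lock()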

Sharding Files

def shard(self,
    input_file: str,
    output_dir: str,
    columns: list,
    delim: str = '\t',
    newline: str = '\n',
    sil_opts: dict = default_sil_options
) -> list:
    '''
    Given the `input_file` and the column indices, reads each line of the
    `input_file` and dumps the content into a file in the directory:

        output_dir/<columns[0]>/.../<columns[N]>/basename(<input_file>)

    where <columns[i]> is the value found in the current line at position i
    after being split by the specified `delim`.

    WARNINGS:
        everything in specified `output_dir` will be PERMANENTLY REMOVED.

    Arguments:
        input_file (str): full path to the file to be sharded.

        output_dir (str): full path to a directory in which to dump. WARNING:
            everything in the specified directory will be PERMANENTLY REMOVED.

        columns (list): the columns (indices) across which to shard. The
            values found in these columns will be used as directory
            names (nested).

        delim (str): How to split each field in a line of the file.
            Defaults to '\t'.

        newline (str): How to split each line of the file. Defaults to '\n'.

        sil_opts (dict): Defaults to {'length': 40, 'every': 1}. See the
            Sil package.

    Returns:
        sharded_files (list): list of all the sharded files
    '''
    lno = filelines(input_file)  # number of lines in the input_file
    sts = Sil(lno, **sil_opts)   # status indicator
    basename = os.path.basename(input_file)
    files_made = set({})
    file_objs = {}

    # delete current `output_dir` if it already exists
    if os.path.isdir(output_dir):
        shutil.rmtree(output_dir)

    with open(input_file, 'r') as f:
        for line in f:
            fields = linesplit(line, delim, newline)
            dest = dir_from_cols(fields, columns)
            files_made.add(dest)
            dir_path = os.path.join(output_dir, dest)
            if not os.path.isdir(dir_path):
                os.makedirs(dir_path)
                file_objs[dir_path] = open(os.path.join(dir_path, basename), 'a')

            o = file_objs[dir_path]
            o.write(linemend(fields, delim, newline))
            suffix = '\t{} files made'.format(len(files_made))
            sts.tick(suffix=suffix)

    # close all made file objects
    for fo in file_objs.values():
        fo.close()

    return self.shard_files(output_dir)


def shard_by_lines(self,
    input_file: str,
    output_dir: str,
    number_of_lines: int,
    sil_opts: dict = default_sil_options
) -> list:
    '''
    Given the `input_file` and `number_of_lines`, reads the lines of the
    `input_file` into output files in subdirectories labeled by the line
    numbers `'<start>_<stop>'` based on the value `number_of_lines`:

        output_dir/<n>_<n+number_of_lines>/basename(<input_file>)

    WARNINGS:
        everything in specified `output_dir` will be PERMANENTLY REMOVED.

    Arguments:
        input_file (str): full path to the file to be sharded.

        output_dir (str): full path to a directory in which to dump. WARNING:
            everything in the specified directory will be PERMANENTLY REMOVED.

        number_of_lines (int): the number of lines which should be at most in
            each sharded file.

        sil_opts (dict): Defaults to `{'length': 40, 'every': 1}`. See the
            Sil package.

    Returns:
        sharded_files (list): list of all the sharded files
    '''
    lno = filelines(input_file)
    sts = Sil(lno, **sil_opts)
    basename = os.path.basename(input_file)
    files_made = set({})
    file_objs = {}

    if os.path.isdir(output_dir):
        shutil.rmtree(output_dir)

    with open(input_file, 'r') as f:
        tally = 0
        while tally < lno:
            if tally % number_of_lines == 0:
                dest = '{}_{}'.format(tally, tally + number_of_lines)
                files_made.add(dest)
                dir_path = os.path.join(output_dir, dest)
                if not os.path.isdir(dir_path):
                    os.makedirs(dir_path)
                    file_objs[dir_path] = open(os.path.join(dir_path, basename), 'a')

                o = file_objs[dir_path]
                for i in range(number_of_lines):
                    line = f.readline()
                    if not line:
                        break
                    o.write(line)
                    sts.tick()
                tally += number_of_lines

    for fo in file_objs.values():
        fo.close()

    return self.shard_files(output_dir)
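To make the resulting layout concrete, here is a small sanity check reusing the files from the Example Usage section (the category values shown in the comments are made up):

from parpar import ParPar

ppf = ParPar()
leaves = ppf.shard(
    input_file="./short.csv",
    output_dir="./short-cols",
    delim=",",
    columns=[0, 5],
)
# `shard` returns the leaf files, one per observed (column 0, column 5) pair,
# e.g. something like:
#   ./short-cols/us/spam/short.csv
#   ./short-cols/ca/ham/short.csv
for path in leaves:
    print(path)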

retrieving the sharded files:

def shard_files(self, directory: str) -> list:
    '''
    Arguments:
        directory (str): The top-most directory of a sharded file.

    Returns:
        (list): The list of all files under directory (regardless of depth).
    '''
    file_paths = []
    for path, subdirs, files in os.walk(directory):
        if not files:
            continue
        file_paths += [
            os.path.join(path, f) for f in files
            if 'DS_Store' not in f
        ]
    return file_paths

restoring a shard:

def assemble_shard(self, directory: str, delim: str = '\t', newline: str = '\n') -> list:
    '''
    Arguments:
        directory (str): The top-most directory of a sharded file.

    Keyword Arguments:
        delim (str): Defaults to '\t'
        newline (str): Defaults to '\n'

    Returns:
        (list): The list of lists, where each sub-list is a record found
            in one of the sharded leaf files after being split by delim.
            (i.e. all records are returned together)
    '''
    results = []
    files = self.shard_files(directory)

    with Pool(processes=os.cpu_count()) as pool:
        sarg = [(f, delim, newline) for f in files]
        lines = pool.starmap(readlines_split, sarg)

    return list(itertools.chain.from_iterable(lines))
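A round-trip sketch using the column shard from the Example Usage section (as noted in the design goals, the record order is not preserved):

from parpar import ParPar

records = ParPar().assemble_shard("./short-cols", delim=",")
# each record is a list of fields from one line of the original short.csv
print(len(records))
print(records[0])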

Shard across lines

def shard_line_apply(self,
    directory: str,
    function,
    args: list = [],
    kwargs: dict = {},
    processes: int = None,
    sil_opts: dict = default_sil_options
):
    '''
    Parallelizes `function` across each _line_ of the sharded files found as
    the leaves of `directory`.

    Notes:
        - if `processes` is None, **all** of them will be used i.e.
            `os.cpu_count()`

        - Several keywords for `kwargs` are reserved:
            1. lock:          a lock, if needed, to prevent race conditions.
            2. full_path:     the full path to the file which was opened.
            3. relative_path: the path under (directory) to the file which
                              was opened.
            4. output_shard_name: if provided will rename the shard
            5. output_shard_loc:  if provided will move the shard

    Arguments:
        directory (str): The top-most directory of a sharded file.

        function (func): The function which will be parallelized. This
            function **MUST** be defined so that it can be called as:
                `function(line, *args, **kwargs)`

    Keyword Arguments:
        args (list): arguments to be passed to `function` on each process.

        kwargs (dict): key-word arguments to be passed to `function` on each
            process.

        processes (int): The number of processes to spawn. Defaults to
            **ALL** available cpu cores on the calling computer.

        sil_opts (dict): Defaults to `{'length': 40, 'every': 1}`.
            See the Sil package.

    Returns:
        None
    '''
    if processes is None:
        processes = os.cpu_count()
    sfiles = self.shard_files(directory)
    records = self.sharded_records(sfiles)
    sts = Sil(records, **sil_opts)

    with Pool(processes=processes) as pool:
        self._shared_current.value = -1

        sargs = [
            (directory, file, sts, function, args, kwargs) for file in sfiles
        ]
        results = pool.starmap(self._shard_line_apply, sargs)

        pool.close()
        pool.join()
        pool.terminate()
    return results


def _shard_line_apply(self,
    directory: str,
    file: str,
    status,
    function,
    args: list,
    kwargs: dict
):
    # multiprocessing.Lock
    kwargs['lock']            = self._shared_lock
    # multiprocessing.Value (shared memory counter for progress bar)
    kwargs['shared_current']  = self._shared_current
    # an instance of Sil
    kwargs['status']          = status

    # full path to the current sharded file being processed
    kwargs['full_path']       = file
    kwargs['shard_name']      = shardname(file, directory)
    kwargs['shard_dir']       = sharddir(file, directory)
    kwargs['shard_loc']       = shardloc(file, directory)
    kwargs['relative_path']   = os.path.join(*superdirs(file, directory))
    kwargs['current_process'] = current_process().name

    cp = kwargs['current_process']

    force_overwrite = kwargs['force_overwrite'] if 'force_overwrite' in kwargs else True

    os_name = kwargs['output_shard_name'] if 'output_shard_name' in kwargs else None
    os_loc  = kwargs['output_shard_loc']  if 'output_shard_loc'  in kwargs else kwargs['shard_loc']

    if os_name is not None:
        dest = os.path.join(os_loc, os_name, os.path.dirname(kwargs['relative_path']))
        kwargs['dest'] = dest
        self._shared_lock.acquire()
        try:
            if os.path.isdir(dest):
                if force_overwrite:
                    shutil.rmtree(dest)
            else:
                os.makedirs(dest)
        finally:
            self._shared_lock.release()

    with open(file, 'r') as f:
        for line in f:
            function(line, *args, **kwargs)
            self._shared_lock.acquire()
            try:
                self._shared_current.value += 1
                suffix = '\tprocess: {}'.format(cp)
                status.update(current=self._shared_current.value, suffix=suffix)
            finally:
                self._shared_lock.release()
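As an example of a per-line worker (a sketch; the function name, output file name, and field counting are made up for illustration), the reserved kwargs such as dest and lock injected above can be used like this:

import os

from parpar import ParPar


def count_fields(line, *args, **kwargs):
    # count the fields in this line and append the count to a file in the
    # newly created output shard, serializing writes with the shared lock
    n = len(line.rstrip("\n").split(","))
    out = os.path.join(kwargs["dest"], "field_counts.txt")
    with kwargs["lock"]:
        with open(out, "a") as fh:
            fh.write("{}\n".format(n))


ParPar().shard_line_apply(
    "./short-cols",
    count_fields,
    kwargs={"output_shard_name": "short-cols-counts"},
)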

Shard across files

def shard_file_apply(self,
    directory: str,
    function,
    args: list = [],
    kwargs: dict = {},
    processes: int = None,
    sil_opts: dict = default_sil_options
):
    '''
    Parallelizes `function` across each of the sharded files found as
    the leaves of `directory`.

    Notes:
        - if `processes` is None, **all** of them will be used i.e.
            `os.cpu_count()`

        - Several keywords for `kwargs` are reserved:
            1. lock:          a lock, if needed, to prevent race conditions.
            2. full_path:     the full path to the file which was opened.
            3. relative_path: the path under (directory) to the file which
                              was opened.
            4. output_shard_name: if provided will rename the shard
            5. output_shard_loc:  if provided will move the shard

    Arguments:
        directory (str): The top-most directory of a sharded file.

        function (func): The function which will be parallelized. This
            function **MUST** be defined so that it can be called as:
                `function(file, *args, **kwargs)`

    Keyword Arguments:
        args (list): arguments to be passed to `function` on each process.

        kwargs (dict): key-word arguments to be passed to `function` on each
            process.

        processes (int): The number of processes to spawn. Defaults to
            **ALL** available cpu cores on the calling computer.

        sil_opts (dict): Defaults to `{'length': 40, 'every': 1}`.
            See the Sil package.

    Returns:
        None
    '''
    if processes is None:
        processes = os.cpu_count()
    sfiles = self.shard_files(directory)
    records = self.sharded_records(sfiles)
    sts = Sil(records, **sil_opts)

    with Pool(processes=processes) as pool:
        self._shared_current.value = -1

        sargs = [
            (directory, file, sts, function, args, kwargs) for file in sfiles
        ]
        pool.starmap(self._shard_file_apply, sargs)

        pool.close()
        pool.join()
        pool.terminate()


def _shard_file_apply(self,
    directory: str,
    file: str,
    status,
    function,
    args: list,
    kwargs: dict
):
    # multiprocessing.Lock
    kwargs['lock']            = self._shared_lock
    # multiprocessing.Value (shared memory counter for progress bar)
    kwargs['shared_current']  = self._shared_current
    kwargs['status']          = status
    kwargs['full_path']       = file
    kwargs['shard_name']      = shardname(file, directory)
    kwargs['shard_dir']       = sharddir(file, directory)
    kwargs['shard_loc']       = shardloc(file, directory)
    kwargs['relative_path']   = os.path.join(*superdirs(file, directory))
    kwargs['current_process'] = current_process().name

    force_overwrite = kwargs['force_overwrite'] if 'force_overwrite' in kwargs else True

    os_name = kwargs['output_shard_name'] if 'output_shard_name' in kwargs else None
    os_loc  = kwargs['output_shard_loc']  if 'output_shard_loc'  in kwargs else kwargs['shard_loc']

    if os_name is not None:
        dest = os.path.join(os_loc, os_name, os.path.dirname(kwargs['relative_path']))
        kwargs['dest'] = dest
        self._shared_lock.acquire()
        try:
            if os.path.isdir(dest):
                if force_overwrite:
                    shutil.rmtree(dest)
            else:
                os.makedirs(dest)
        finally:
            self._shared_lock.release()

    function(file, *args, **kwargs)
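Similarly, a per-file worker sketch (names assumed) that copies each sharded file into the output shard created from the reserved kwargs:

import os
import shutil

from parpar import ParPar


def copy_to_new_shard(file, *args, **kwargs):
    # copy the current sharded file into the freshly created output shard
    target = os.path.join(kwargs["dest"], os.path.basename(file))
    shutil.copy(file, target)


ParPar().shard_file_apply(
    "./short-lines",
    copy_to_new_shard,
    kwargs={"output_shard_name": "short-lines-copy"},
)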

Autosuggest at scale – trie sharding

While reading about the design of autosuggest for large-scale systems (like Google's), I can understand the use of a trie and how the top n terms are stored at each node to retrieve the list quickly. However, I cannot get my head around an efficient way of sharding the trie in a distributed system. Sharding on the first letter or first two letters is obviously not a neat solution, and I have read elsewhere about using a hash of the term, but that requires an aggregation server that pulls results from all the servers and merges them, which does not sound efficient at web scale.
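For concreteness, a minimal sketch of the structure in question, a trie whose nodes each cache their own top-n completions (the names and fields are illustrative):

from dataclasses import dataclass, field


@dataclass
class TrieNode:
    children: dict = field(default_factory=dict)   # char -> TrieNode
    top_terms: list = field(default_factory=list)  # [(count, term)] of length <= n

# The question is how to split this structure across machines once it no
# longer fits on one, and how to keep top_terms fresh for each time window.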

Would the ideal approach be something like measuring the actual query density and breaking up the trie accordingly (a sort of application-managed sharding/partitioning)? I suspect that would incur a lot of maintenance and re-balancing.

Can someone advise or point me to a reference?

A related question: what if I wanted to store the top n results for different time windows, like the top 10 of the last day, the top 10 of the last month, and the top 10 of all time? What is the best solution: storing a pointer list at each trie node for every time window? And what if the set of windows is not finite?

Thanks

Note: I posted the same question on Stack Overflow earlier, but after reading some guidelines I thought it more appropriate to ask it here instead.