用Python统计词频

Python012

用Python统计词频,第1张

def statistics(astr):

# astr.replace("\n", "")

slist = list(astr.split("\t"))

alist = []

[alist.append(i) for i in slist if i not in alist]

alist[-1] = alist[-1].replace("\n", "")

return alist

if __name__ == "__main__":

code_doc = {}

with open("test_data.txt", "r", encoding='utf-8') as fs:

for ln in fs.readlines():

l = statistics(ln)

for t in l:

if t not in code_doc:

code_doc.setdefault(t, 1)

else:

code_doc[t] += 1

for keys in code_doc.keys():

print(keys + ' ' + str(code_doc[keys]))

简单版:

#!/usr/bin/env python3

import re

import jieba

from collections import Counter

fname = 'counttest.txt'

with open(fname) as f:

    s = f.read()

pattern = re.compile(r'[a-zA-Z]+\-?[a-zA-Z]*')

english_words = Counter(pattern.findall(s))

other_words = Counter(jieba.cut(pattern.sub('', s)))

print('\n英文单词统计结果:\n'+'-'*17)

print('\n'.join(['{}: {}'.format(i, j) for i, j in english_words.most_common()]))

print('\n中文及符号统计结果:\n'+'-'*19)

print('\n'.join(['{}: {}'.format(i, j) for i, j in other_words.most_common()]))

复杂版:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

from __future__ import print_function, division, unicode_literals

import sys, re, time, os, jieba

from collections import Counter

from datetime import datetime

class WordCounter(object):

    def __init__(self, from_file, to_file=None, coding=None, jieba_cut=None):

        '''根据设定的进程数,把文件from_file分割成大小基本相同,数量等同与进程数的文件段,

        来读取并统计词频,然后把结果写入to_file中,当其为None时直接打印在终端或命令行上。

        Args:

        @from_file 要读取的文件

        @to_file 结果要写入的文件

        @coding 文件的编码方式,默认为采用chardet模块读取前1万个字符来自动判断

        @jieba_cut 是否启用结巴分词,默认为None

        

        How to use:

        w = WordCounter('a.txt', 'b.txt')

        w.run()        

        '''

        if not os.path.isfile(from_file):

            raise Exception('No such file: 文件不存在')

        self.f1 = from_file

        self.filesize = os.path.getsize(from_file)

        self.f2 = to_file

        if coding is None:

            try:

                import chardet

            except ImportError:

                os.system('pip install chardet')

                print('-'*70)

                import chardet

            with open(from_file, 'rb') as f:    

                coding = chardet.detect(f.read(10000))['encoding']            

        self.coding = coding

        self._c = [Counter(), Counter()]

        self.jieba = False

        if jieba_cut is not None:                  

            self.jieba = True

            

    def run(self):

        start = time.time()

        if 1:

            self.count_direct(self.f1)          

        if self.f2 not in ['None', 'Null', 'none', 'null', None]:

            with open(self.f2, 'wb') as f:

                f.write(self.result.encode(self.coding))

        else:

            print('\nEnglish words:\n' + '-'*15)

            print(self.result)

        cost = '{:.1f}'.format(time.time()-start)

        size = humansize(self.filesize)

        tip = '\nFile size: {}. Cost time: {} seconds'     

#        print(tip.format(size, cost))

        self.cost = cost + 's'

    def count_direct(self, from_file):

        '''直接把文件内容全部读进内存并统计词频'''

        start = time.time()

        with open(from_file, 'rb') as f:

            line = f.read()

        for i in range(len(self._c)):

            self._c[i].update(self.parse(line)[i])  

                 

                    

    def parse(self, line):  #解析读取的文件流

        text = line.decode(self.coding)

        text = re.sub(r'\-\n', '', text) #考虑同一个单词被分割成两段的情况,删除行末的-号

        pattern = re.compile(r'[a-zA-Z]+\-?[a-zA-Z]*') #判断是否为英文单词

        english_words = pattern.findall(text)

        rest = pattern.sub('', text)        

        ex = Counter(jieba.cut(rest)) if self.jieba else Counter(text)

        return Counter(english_words), ex

        

    def flush(self):  #清空统计结果

        self._c = [Counter(), Counter()]

    @property

    def counter(self):  #返回统计结果的Counter类       

        return self._c

                    

    @property

    def result(self):  #返回统计结果的字符串型式,等同于要写入结果文件的内容

        ss = []

        for c in self._c:

            ss.append(['{}: {}'.format(i, j) for i, j in c.most_common()])

        

        tip = '\n\n中文及符号统计结果:\n'+'-'*15+'\n'

        return tip.join(['\n'.join(s) for s in ss])

def humansize(size):

    """将文件的大小转成带单位的形式

    >>> humansize(1024) == '1 KB'

    True

    >>> humansize(1000) == '1000 B'

    True

    >>> humansize(1024*1024) == '1 M'

    True

    >>> humansize(1024*1024*1024*2) == '2 G'

    True

    """

    units = ['B', 'KB', 'M', 'G', 'T']    

    for unit in units:

        if size < 1024:

            break

        size = size // 1024

    return '{} {}'.format(size, unit)

        

def main():

    if len(sys.argv) < 2:

        print('Usage: python wordcounter.py from_file to_file')

        exit(1)

    from_file, to_file = sys.argv[1:3]

    args = {'coding' : None, 'jieba_cut': 1}

    for i in sys.argv:

        for k in args:

            if re.search(r'{}=(.+)'.format(k), i):

                args[k] = re.findall(r'{}=(.+)'.format(k), i)[0]

    w = WordCounter(from_file, to_file, **args)

    w.run()

    

if __name__ == '__main__':

    import doctest

    doctest.testmod()

    main()

更复杂的:如果是比较大的文件,建议采用多进程,详情百度:多进程读取大文件并统计词频 jaket5219999

 #! python3

# -*- coding: utf-8 -*-

import os, codecs

import jieba

from collections import Counter

 

def get_words(txt):

    seg_list = jieba.cut(txt)

    c = Counter()

    for x in seg_list:

        if len(x)>1 and x != '\r\n':

            c[x] += 1

    print('常用词频度统计结果')

    for (k,v) in c.most_common(100):

        print('%s%s %s  %d' % ('  '*(5-len(k)), k, '*'*int(v/3), v))

 

if __name__ == '__main__':

    with codecs.open('19d.txt', 'r', 'utf8') as f:

        txt = f.read()

    get_words(txt)