pooling的规律与Python完毕,pooling原理python

Python搜索引擎实现原理和方法,python搜索引擎原理

如何在庞大的数据中高效的检索自己需要的东西?本篇内容介绍了Python做出一个大数据搜索引擎的原理和方法,以及中间进行数据分析的原理也给大家做了详细介绍。

布隆过滤器 (Bloom Filter)
第一步我们先要实现一个布隆过滤器。

布隆过滤器是大数据领域的一个常见算法,它的目的是过滤掉那些不是目标的元素。也就是说如果一个要搜索的词并不存在与我的数据中,那么它可以以很快的速度返回目标不存在。

让我们看看以下布隆过滤器的代码:

class Bloomfilter(object):
  """
  A Bloom filter is a probabilistic data-structure that trades space for accuracy
  when determining if a value is in a set. It can tell you if a value was possibly
  added, or if it was definitely not added, but it can't tell you for certain that
  it was added.
  """
  def __init__(self, size):
    """Setup the BF with the appropriate size"""
    self.values = [False] * size
    self.size = size

  def hash_value(self, value):
    """Hash the value provided and scale it to fit the BF size"""
    return hash(value) % self.size

  def add_value(self, value):
    """Add a value to the BF"""
    h = self.hash_value(value)
    self.values[h] = True

  def might_contain(self, value):
    """Check if the value might be in the BF"""
    h = self.hash_value(value)
    return self.values[h]

  def print_contents(self):
    """Dump the contents of the BF for debugging purposes"""
    print self.values

基本的数据结构是个数组(实际上是个位图,用1/0来记录数据是否存在),初始化是没有任何内容,所以全部置False。实际的使用当中,该数组的长度是非常大的,以保证效率。

利用哈希算法来决定数据应该存在哪一位,也就是数组的索引

当一个数据被加入到布隆过滤器的时候,计算它的哈希值然后把相应的位置为True

当检查一个数据是否已经存在或者说被索引过的时候,只要检查对应的哈希值所在的位的True/Fasle

看到这里,大家应该可以看出,如果布隆过滤器返回False,那么数据一定是没有索引过的,然而如果返回True,那也不能说数据一定就已经被索引过。在搜索过程中使用布隆过滤器可以使得很多没有命中的搜索提前返回来提高效率。

我们看看这段 code是如何运行的:

bf = Bloomfilter(10)
bf.add_value('dog')
bf.add_value('fish')
bf.add_value('cat')
bf.print_contents()
bf.add_value('bird')
bf.print_contents()
# Note: contents are unchanged after adding bird - it collides
for term in ['dog', 'fish', 'cat', 'bird', 'duck', 'emu']:
print '{}: {} {}'.format(term, bf.hash_value(term), bf.might_contain(term))

结果:

[False, False, False, False, True, True, False, False, False, True]
[False, False, False, False, True, True, False, False, False, True]
dog: 5 True
fish: 4 True
cat: 9 True
bird: 9 True
duck: 5 True
emu: 8 False

首先创建了一个容量为10的的布隆过滤器

图片 1

然后分别加入 ‘dog’,‘fish’,‘cat’三个对象,这时的布隆过滤器的内容如下:

图片 2

然后加入‘bird’对象,布隆过滤器的内容并没有改变,因为‘bird’和‘fish’恰好拥有相同的哈希。

图片 3

最后我们检查一堆对象(’dog’, ‘fish’, ‘cat’, ‘bird’, ‘duck’,
’emu’)是不是已经被索引了。结果发现‘duck’返回True,2而‘emu’返回False。因为‘duck’的哈希恰好和‘dog’是一样的。

图片 4

分词

下面一步我们要实现分词。
分词的目的是要把我们的文本数据分割成可搜索的最小单元,也就是词。这里我们主要针对英语,因为中文的分词涉及到自然语言处理,比较复杂,而英文基本只要用标点符号就好了。厦门叉车

下面我们看看分词的代码:

def major_segments(s):
  """
  Perform major segmenting on a string. Split the string by all of the major
  breaks, and return the set of everything found. The breaks in this implementation
  are single characters, but in Splunk proper they can be multiple characters.
  A set is used because ordering doesn't matter, and duplicates are bad.
  """
  major_breaks = ' '
  last = -1
  results = set()

  # enumerate() will give us (0, s[0]), (1, s[1]), ...
  for idx, ch in enumerate(s):
    if ch in major_breaks:
      segment = s[last+1:idx]
      results.add(segment)

      last = idx

  # The last character may not be a break so always capture
  # the last segment (which may end up being "", but yolo)  
  segment = s[last+1:]
  results.add(segment)

  return results

主要分割

主要分割使用空格来分词,实际的分词逻辑中,还会有其它的分隔符。例如Splunk的缺省分割符包括以下这些,用户也可以定义自己的分割符。

] < >( ) { } | ! ; , ‘ ” * \n \r \s \t & ? + %21 %26 %2526
%3B %7C %20 %2B %3D — %2520 %5D %5B %3A %0A %2C %28 %29

def minor_segments(s):
  """
  Perform minor segmenting on a string. This is like major
  segmenting, except it also captures from the start of the
  input to each break.
  """
  minor_breaks = '_.'
  last = -1
  results = set()

  for idx, ch in enumerate(s):
    if ch in minor_breaks:
      segment = s[last+1:idx]
      results.add(segment)

      segment = s[:idx]
      results.add(segment)

      last = idx

  segment = s[last+1:]
  results.add(segment)
  results.add(s)

  return results

次要分割

次要分割和主要分割的逻辑类似,只是还会把从开始部分到当前分割的结果加入。例如“1.2.3.4”的次要分割会有1,2,3,4,1.2,1.2.3

def segments(event):
  """Simple wrapper around major_segments / minor_segments"""
  results = set()
  for major in major_segments(event):
    for minor in minor_segments(major):
      results.add(minor)
  return results

分词的逻辑就是对文本先进行主要分割,对每一个主要分割在进行次要分割。然后把所有分出来的词返回。

我们看看这段 code是如何运行的:

for term in segments('src_ip = 1.2.3.4'):
print term

src
1.2
1.2.3.4
src_ip
3
1
1.2.3
ip
2
=
4

搜索
好了,有个分词和布隆过滤器这两个利器的支撑后,我们就可以来实现搜索的功能了。

上代码:

class Splunk(object):
  def __init__(self):
    self.bf = Bloomfilter(64)
    self.terms = {} # Dictionary of term to set of events
    self.events = []

  def add_event(self, event):
    """Adds an event to this object"""

    # Generate a unique ID for the event, and save it
    event_id = len(self.events)
    self.events.append(event)

    # Add each term to the bloomfilter, and track the event by each term
    for term in segments(event):
      self.bf.add_value(term)

      if term not in self.terms:
        self.terms[term] = set()
      self.terms[term].add(event_id)

  def search(self, term):
    """Search for a single term, and yield all the events that contain it"""

    # In Splunk this runs in O(1), and is likely to be in filesystem cache (memory)
    if not self.bf.might_contain(term):
      return

    # In Splunk this probably runs in O(log N) where N is the number of terms in the tsidx
    if term not in self.terms:
      return

    for event_id in sorted(self.terms[term]):
      yield self.events[event_id]

Splunk代表一个拥有搜索功能的索引集合

每一个集合中包含一个布隆过滤器,一个倒排词表(字典),和一个存储所有事件的数组

当一个事件被加入到索引的时候,会做以下的逻辑

为每一个事件生成一个unqie id,这里就是序号

对事件进行分词,把每一个词加入到倒排词表,也就是每一个词对应的事件的id的映射结构,注意,一个词可能对应多个事件,所以倒排表的的值是一个Set。倒排表是绝大部分搜索引擎的核心功能。

当一个词被搜索的时候,会做以下的逻辑

检查布隆过滤器,如果为假,直接返回

检查词表,如果被搜索单词不在词表中,直接返回

在倒排表中找到所有对应的事件id,然后返回事件的内容

我们运行下看看把:

s = Splunk()
s.add_event('src_ip = 1.2.3.4')
s.add_event('src_ip = 5.6.7.8')
s.add_event('dst_ip = 1.2.3.4')

for event in s.search('1.2.3.4'):
  print event
print '-'
for event in s.search('src_ip'):
  print event
print '-'
for event in s.search('ip'):
  print event

src_ip = 1.2.3.4
dst_ip = 1.2.3.4
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
dst_ip = 1.2.3.4

是不是很赞!

更复杂的搜索

更进一步,在搜索过程中,我们想用And和Or来实现更复杂的搜索逻辑。

上代码:

class SplunkM(object):
  def __init__(self):
    self.bf = Bloomfilter(64)
    self.terms = {} # Dictionary of term to set of events
    self.events = []

  def add_event(self, event):
    """Adds an event to this object"""

    # Generate a unique ID for the event, and save it
    event_id = len(self.events)
    self.events.append(event)

    # Add each term to the bloomfilter, and track the event by each term
    for term in segments(event):
      self.bf.add_value(term)
      if term not in self.terms:
        self.terms[term] = set()

      self.terms[term].add(event_id)

  def search_all(self, terms):
    """Search for an AND of all terms"""

    # Start with the universe of all events...
    results = set(range(len(self.events)))

    for term in terms:
      # If a term isn't present at all then we can stop looking
      if not self.bf.might_contain(term):
        return
      if term not in self.terms:
        return

      # Drop events that don't match from our results
      results = results.intersection(self.terms[term])

    for event_id in sorted(results):
      yield self.events[event_id]


  def search_any(self, terms):
    """Search for an OR of all terms"""
    results = set()

    for term in terms:
      # If a term isn't present, we skip it, but don't stop
      if not self.bf.might_contain(term):
        continue
      if term not in self.terms:
        continue

      # Add these events to our results
      results = results.union(self.terms[term])

    for event_id in sorted(results):
      yield self.events[event_id]

利用Python集合的intersection和union操作,可以很方便的支持And(求交集)和Or(求合集)的操作。

运行结果如下:

s = SplunkM()
s.add_event('src_ip = 1.2.3.4')
s.add_event('src_ip = 5.6.7.8')
s.add_event('dst_ip = 1.2.3.4')

for event in s.search_all(['src_ip', '5.6']):
  print event
print '-'
for event in s.search_any(['src_ip', 'dst_ip']):
  print event

src_ip = 5.6.7.8
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
dst_ip = 1.2.3.4

如何在庞大的数据中高效的检索自己需要的东西?本篇内容介绍了Python做出一个大数据搜…

Python 探针的实现原理,python探针原理

探针的实现主要涉及以下几个知识点:

sys.meta_path
sitecustomize.py
sys.meta_path

sys.meta_path 这个简单的来说就是可以实现 import hook 的功能,
当执行 import 相关的操作时,会触发 sys.meta_path 列表中定义的对象。
关于 sys.meta_path 更详细的资料请查阅 python 文档中 sys.meta_path
相关内容以及
PEP 0302 。

sys.meta_path 中的对象需要实现一个 find_module 方法,
这个 find_module 方法返回 None 或一个实现了 load_module 方法的对象
(代码可以从 github 上下载 part1) :

import sys

class MetaPathFinder:

  def find_module(self, fullname, path=None):
    print('find_module {}'.format(fullname))
    return MetaPathLoader()

class MetaPathLoader:

  def load_module(self, fullname):
    print('load_module {}'.format(fullname))
    sys.modules[fullname] = sys
    return sys

sys.meta_path.insert(0, MetaPathFinder())

if __name__ == '__main__':
  import http
  print(http)
  print(http.version_info)

load_module 方法返回一个 module 对象,这个对象就是 import 的 module
对象了。
比如我上面那样就把 http 替换为 sys 这个 module 了。

$ python meta_path1.py
find_module http
load_module http
 
sys.version_info(major=3, minor=5, micro=1, releaselevel=’final’,
serial=0)
通过 sys.meta_path 我们就可以实现 import hook 的功能:
当 import 预定的 module 时,对这个 module 里的对象来个狸猫换太子,
从而实现获取函数或方法的执行时间等探测信息。

上面说到了狸猫换太子,那么怎么对一个对象进行狸猫换太子的操作呢?
对于函数对象,我们可以使用装饰器的方式来替换函数对象(代码可以从 github
上下载 part2) :

import functools
import time

def func_wrapper(func):
  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    print('start func')
    start = time.time()
    result = func(*args, **kwargs)
    end = time.time()
    print('spent {}s'.format(end - start))
    return result
  return wrapper

def sleep(n):
  time.sleep(n)
  return n

if __name__ == '__main__':
  func = func_wrapper(sleep)
  print(func(3))

执行结果:

$ python func_wrapper.py
start func
spent 3.004966974258423s
3

下面我们来实现一个计算指定模块的指定函数的执行时间的功能(代码可以从
github 上下载 part3) 。

假设我们的模块文件是 hello.py:

import time

def sleep(n):
  time.sleep(n)
  return n

我们的 import hook 是 hook.py:

import functools
import importlib
import sys
import time

_hook_modules = {'hello'}

class MetaPathFinder:

  def find_module(self, fullname, path=None):
    print('find_module {}'.format(fullname))
    if fullname in _hook_modules:
      return MetaPathLoader()

class MetaPathLoader:

  def load_module(self, fullname):
    print('load_module {}'.format(fullname))
    # ``sys.modules`` 中保存的是已经导入过的 module
    if fullname in sys.modules:
      return sys.modules[fullname]

    # 先从 sys.meta_path 中删除自定义的 finder
    # 防止下面执行 import_module 的时候再次触发此 finder
    # 从而出现递归调用的问题
    finder = sys.meta_path.pop(0)
    # 导入 module
    module = importlib.import_module(fullname)

    module_hook(fullname, module)

    sys.meta_path.insert(0, finder)
    return module

sys.meta_path.insert(0, MetaPathFinder())

def module_hook(fullname, module):
  if fullname == 'hello':
    module.sleep = func_wrapper(module.sleep)

def func_wrapper(func):
  @functools.wraps(func)
  def wrapper(*args, **kwargs):
    print('start func')
    start = time.time()
    result = func(*args, **kwargs)
    end = time.time()
    print('spent {}s'.format(end - start))
    return result
  return wrapper

测试代码:

>>> import hook
>>> import hello
find_module hello
load_module hello
>>>
>>> hello.sleep(3)
start func
spent 3.0029919147491455s
3
>>>

其实上面的代码已经实现了探针的基本功能。不过有一个问题就是上面的代码需要显示的
执行 import hook 操作才会注册上我们定义的 hook。

那么有没有办法在启动 python 解释器的时候自动执行 import hook
的操作呢?
答案就是可以通过定义 sitecustomize.py 的方式来实现这个功能。

sitecustomize.py
简单的说就是,python 解释器初始化的时候会自动 import PYTHONPATH 下存在的
sitecustomize 和 usercustomize 模块:

实验项目的目录结构如下(代码可以从 github 上下载 part4)

$ tree
.
├── sitecustomize.py
└── usercustomize.py
sitecustomize.py:

$ cat sitecustomize.py
print(‘this is sitecustomize’)
usercustomize.py:

$ cat usercustomize.py
print(‘this is usercustomize’)
把当前目录加到 PYTHONPATH 中,然后看看效果:

$ export PYTHONPATH=.
$ python
this is sitecustomize    <----
this is usercustomize    <----
Python 3.5.1 (default, Dec 24 2015, 17:20:27)
[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>>

可以看到确实自动导入了。所以我们可以把之前的探测程序改为支持自动执行
import hook (代码可以从 github 上下载part5) 。

目录结构:

$ tree
.
├── hello.py
├── hook.py
├── sitecustomize.py
sitecustomize.py:

$ cat sitecustomize.py
import hook

结果:

$ export PYTHONPATH=.
$ python
find_module usercustomize
Python 3.5.1 (default, Dec 24 2015, 17:20:27)
[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
find_module readline
find_module atexit
find_module rlcompleter
>>>
>>> import hello
find_module hello
load_module hello
>>>
>>> hello.sleep(3)
start func
spent 3.005002021789551s
3

不过上面的探测程序其实还有一个问题,那就是需要手动修改 PYTHONPATH 。
用过探针程序的朋友应该会记得, 使用 newrelic
之类的探针只需要执行一条命令就 可以了: newrelic-admin run-program
python hello.py 实际上修改PYTHONPATH 的操作是在 newrelic-admin
这个程序里完成的。

下面我们也要来实现一个类似的命令行程序,就叫 agent.py 吧。

agent
还是在上一个程序的基础上修改。先调整一个目录结构,把 hook
操作放到一个单独的目录下, 方便设置
PYTHONPATH后不会有其他的干扰(代码可以从 github 上下载 part6 )。

$ mkdir bootstrap
$ mv hook.py bootstrap/_hook.py
$ touch bootstrap/__init__.py
$ touch agent.py
$ tree
.
├── bootstrap
│  ├── __init__.py
│  ├── _hook.py
│  └── sitecustomize.py
├── hello.py
├── test.py
├── agent.py

bootstrap/sitecustomize.py 的内容修改为:

$ cat bootstrap/sitecustomize.py
import _hook
agent.py 的内容如下:

import os
import sys

current_dir = os.path.dirname(os.path.realpath(__file__))
boot_dir = os.path.join(current_dir, 'bootstrap')

def main():
  args = sys.argv[1:]
  os.environ['PYTHONPATH'] = boot_dir
  # 执行后面的 python 程序命令
  # sys.executable 是 python 解释器程序的绝对路径 ``which python``
  # >>> sys.executable
  # '/usr/local/var/pyenv/versions/3.5.1/bin/python3.5'
  os.execl(sys.executable, sys.executable, *args)

if __name__ == '__main__':
  main()

test.py 的内容为:

$ cat test.py
import sys
import hello

print(sys.argv)
print(hello.sleep(3))

使用方法:

$ python agent.py test.py arg1 arg2
find_module usercustomize
find_module hello
load_module hello
['test.py', 'arg1', 'arg2']
start func
spent 3.005035161972046s
3

至此,我们就实现了一个简单的 python
探针程序。当然,跟实际使用的探针程序相比肯定是有
很大的差距的,这篇文章主要是讲解一下探针背后的实现原理。

如果大家对商用探针程序的具体实现感兴趣的话,可以看一下国外的 New Relic
或国内的 OneAPM, TingYun 等这些 APM 厂商的商用 python
探针的源代码,相信你会发现一些很有趣的事情。

探针的实现原理,python探针原理
探针的实现主要涉及以下几个知识点: sys.meta_path sitecustomize.py
sys.meta_path sys.meta_path 这个简单的来说就…

pooling的原理与Python实现,pooling原理python

本文首先阐述pooling所对应的操作,然后分析pooling背后蕴含的一些道理,最后给出pooling的Python实现。

一、pooling所对应的操作

首先从整体上对pooling有一个直观的概念(也就是对pooling的输入、输出以及具体功能进行描述,但是忽略具体的实现细节):pooling的输入是一个矩阵,输出是一个矩阵;完成的功能是,对输入矩阵的一个局部区域进行运作,使得该区域对应的输出能够最佳的代表该区域的特性。如图1所示,左图黄色矩阵代表输入矩阵,右图蓝色矩阵代表输出矩阵;动态的橙色矩阵代表选定输入矩阵的一个局部区域,然后寻找出该区域的一个最佳代表出来,最后将所有选出的代表按照与原始输入矩阵对应的空间位置关系在输出矩阵中进行排序。

这一过程可以用选举过程来类比。假如要选北京市长,一种可行的做法是,北京的每一个区各选一个最符合该区权益的代表,然后由选出的代表们决定如何选取北京市长。当然了,我们希望每一个区选出的代表最能符合该区的权益。与pooling做一个简单类比,北京〈-〉输入矩阵;朝阳区、海淀区等〈-〉局部区域;各区代表〈-〉输出矩阵(如果他们开会的时候按照地理位置就坐,这就和pooling的特性很像了)。

图1 pooling对特征的操作示意图

二、pooling背后蕴含的道理

在局部区域选取代表的过程中,我们一般的做法是:选取该区域最有声望的人作为代表(对应max
pooling)或者选取最能代表该区域所有人一般特性的人作为代表(对应mean
pooling),于此对应的是,pooling中也存在两种常用的做法:局部区域值最大的胜出作为该区域的代表或者将该区域所有的值取平均作为该区域的代表。

 

选取该区域最有声望的人作为代表 vs 选取最能代表该区域所有人一般特性的人作为代表 好处是:

1)
局部区域最有声望的人在选市长时不宜出现偏差,但他有可能倚老卖老,不能代表该区域一般民众的看法(局部的最大值,容易忽略该区域的一般特性)

2)
最能代表该区域所有人一般特性的人虽然能够代表该区域所有居民的最大权益,但是由于他的认知能力有限(局部均值较小,所以说他认知能力有限),在选市长时容易出现偏差。

3)
如果该区域内的人存在一定程度的自由活动的话(对应的是平移、旋转不变性),对上述两种选代表的方式基本是没有影响的。

 

pooling的正规解释

根据相关理论:(1)邻域大小受限造成的估计值方差增大;(2)误差造成估计均值的偏移。一般来说,mean-pooling能减小第一种误差,更多的保留图像的背景信息,max-pooling能减小第二种误差,更多的保留纹理信息。

 

一般情况下pooling的输入维度高、输出维度低,这在一定程度上可以理解为降维,根据上述对pooling原理的阐述,我们可以推断,这种降维过程极大的保留了输入的一些最重要的信息。在实际应用pooling的过程中,我们需要根据实际问题的特点,具体分析了。其实,知道了pooling的操作及其原理,如果她与具体问题结合的较好,则不失为一个很好的创新点哦,哈哈。

 

三、pooing的Python实现

笔者在写代码时的一些思考如下,核心就是将一个复杂问题拆分为一个可以直接用代码实现的问题:

1)
输入矩阵可以为mxn,也可以为mxnxp,如果直接考虑这两种形式写代码的时候无从下手(要考虑的情况有点多,并且多维的矩阵我自己容易搞晕)。仔细分析发现如果我将
     
 mxn矩阵的pooling实现,那么mxnxp矩阵就可以运用mxn矩阵的实现轻而易举实现了。

2)
针对mxn矩阵输入,有可能图1橙色方框不能恰好覆盖输入矩阵,因此需要对输入矩阵进行扩展。扩展也很简单,只要最后一个poolStride对应的poolSize能够覆盖输入矩阵,
     其他的肯定可以覆盖了。

3) 最后就是for循环进行类似操作过程处理了。

def pooling(inputMap,poolSize=3,poolStride=2,mode='max'):
    """INPUTS:
              inputMap - input array of the pooling layer
              poolSize - X-size(equivalent to Y-size) of receptive field
              poolStride - the stride size between successive pooling squares

       OUTPUTS:
               outputMap - output array of the pooling layer

       Padding mode - 'edge'
    """
    # inputMap sizes
    in_row,in_col = np.shape(inputMap)

    # outputMap sizes
    out_row,out_col = int(np.floor(in_row/poolStride)),int(np.floor(in_col/poolStride))
    row_remainder,col_remainder = np.mod(in_row,poolStride),np.mod(in_col,poolStride)
    if row_remainder != 0:
        out_row +=1
    if col_remainder != 0:
        out_col +=1
    outputMap = np.zeros((out_row,out_col))

    # padding
    temp_map = np.lib.pad(inputMap, ((0,poolSize-row_remainder),(0,poolSize-col_remainder)), 'edge')

    # max pooling
    for r_idx in range(0,out_row):
        for c_idx in range(0,out_col):
            startX = c_idx * poolStride
            startY = r_idx * poolStride
            poolField = temp_map[startY:startY + poolSize, startX:startX + poolSize]
            poolOut = np.max(poolField)
            outputMap[r_idx,c_idx] = poolOut

    # retrun outputMap
    return  outputMap

# 测试实例

test = np.array([[1,2,3,4],[5,6,7,8],[9,10,11,12]])
test_result = pooling(test, 2, 2, ‘max’)
print(test_result)

 测试结果:

图片 5

总结: 先理解一项技术的输入、输出以及其完成的功能;然后在生活中寻找类似的例子;最后,将该项技术分解为可以实现的步骤。

 

本文首先阐述pooling所对应的操作,然后分析pooling背后蕴含的一些道理,最后给出pooling的Python实现…

浅谈用Python实现一个大数据搜索引擎,浅谈python

搜索是大数据领域里常见的需求。Splunk和ELK分别是该领域在非开源和开源领域里的领导者。本文利用很少的Python代码实现了一个基本的数据搜索功能,试图让大家理解大数据搜索的基本原理。

布隆过滤器 (Bloom Filter)

第一步我们先要实现一个布隆过滤器。

布隆过滤器是大数据领域的一个常见算法,它的目的是过滤掉那些不是目标的元素。也就是说如果一个要搜索的词并不存在与我的数据中,那么它可以以很快的速度返回目标不存在。

让我们看看以下布隆过滤器的代码:

class Bloomfilter(object):
  """
  A Bloom filter is a probabilistic data-structure that trades space for accuracy
  when determining if a value is in a set. It can tell you if a value was possibly
  added, or if it was definitely not added, but it can't tell you for certain that
  it was added.
  """
  def __init__(self, size):
    """Setup the BF with the appropriate size"""
    self.values = [False] * size
    self.size = size

  def hash_value(self, value):
    """Hash the value provided and scale it to fit the BF size"""
    return hash(value) % self.size

  def add_value(self, value):
    """Add a value to the BF"""
    h = self.hash_value(value)
    self.values[h] = True

  def might_contain(self, value):
    """Check if the value might be in the BF"""
    h = self.hash_value(value)
    return self.values[h]

  def print_contents(self):
    """Dump the contents of the BF for debugging purposes"""
    print self.values
  1. 基本的数据结构是个数组(实际上是个位图,用1/0来记录数据是否存在),初始化是没有任何内容,所以全部置False。实际的使用当中,该数组的长度是非常大的,以保证效率。
  2. 利用哈希算法来决定数据应该存在哪一位,也就是数组的索引
  3. 当一个数据被加入到布隆过滤器的时候,计算它的哈希值然后把相应的位置为True
  4. 当检查一个数据是否已经存在或者说被索引过的时候,只要检查对应的哈希值所在的位的True/Fasle

看到这里,大家应该可以看出,如果布隆过滤器返回False,那么数据一定是没有索引过的,然而如果返回True,那也不能说数据一定就已经被索引过。在搜索过程中使用布隆过滤器可以使得很多没有命中的搜索提前返回来提高效率。

我们看看这段 code是如何运行的:

bf = Bloomfilter(10)
bf.add_value('dog')
bf.add_value('fish')
bf.add_value('cat')
bf.print_contents()
bf.add_value('bird')
bf.print_contents()
# Note: contents are unchanged after adding bird - it collides
for term in ['dog', 'fish', 'cat', 'bird', 'duck', 'emu']:
  print '{}: {} {}'.format(term, bf.hash_value(term), bf.might_contain(term))

结果:

[False, False, False, False, True, True, False, False, False, True]
[False, False, False, False, True, True, False, False, False, True]
dog: 5 True
fish: 4 True
cat: 9 True
bird: 9 True
duck: 5 True
emu: 8 False

首先创建了一个容量为10的的布隆过滤器

图片 6

然后分别加入 ‘dog’,‘fish’,‘cat’三个对象,这时的布隆过滤器的内容如下:

图片 7

然后加入‘bird’对象,布隆过滤器的内容并没有改变,因为‘bird’和‘fish’恰好拥有相同的哈希。

图片 8

最后我们检查一堆对象(’dog’, ‘fish’, ‘cat’, ‘bird’, ‘duck’,
’emu’)是不是已经被索引了。结果发现‘duck’返回True,2而‘emu’返回False。因为‘duck’的哈希恰好和‘dog’是一样的。

图片 9

分词

下面一步我们要实现分词。
分词的目的是要把我们的文本数据分割成可搜索的最小单元,也就是词。这里我们主要针对英语,因为中文的分词涉及到自然语言处理,比较复杂,而英文基本只要用标点符号就好了。

下面我们看看分词的代码:

def major_segments(s):
  """
  Perform major segmenting on a string. Split the string by all of the major
  breaks, and return the set of everything found. The breaks in this implementation
  are single characters, but in Splunk proper they can be multiple characters.
  A set is used because ordering doesn't matter, and duplicates are bad.
  """
  major_breaks = ' '
  last = -1
  results = set()

  # enumerate() will give us (0, s[0]), (1, s[1]), ...
  for idx, ch in enumerate(s):
    if ch in major_breaks:
      segment = s[last+1:idx]
      results.add(segment)

      last = idx

  # The last character may not be a break so always capture
  # the last segment (which may end up being "", but yolo)  
  segment = s[last+1:]
  results.add(segment)

  return results

主要分割

主要分割使用空格来分词,实际的分词逻辑中,还会有其它的分隔符。例如Splunk的缺省分割符包括以下这些,用户也可以定义自己的分割符。

] < > ( ) { } | ! ; , ‘ ” * \n \r \s \t & ? + %21 %26 %2526
%3B %7C %20 %2B %3D — %2520 %5D %5B %3A %0A %2C %28 %29

def minor_segments(s):
  """
  Perform minor segmenting on a string. This is like major
  segmenting, except it also captures from the start of the
  input to each break.
  """
  minor_breaks = '_.'
  last = -1
  results = set()

  for idx, ch in enumerate(s):
    if ch in minor_breaks:
      segment = s[last+1:idx]
      results.add(segment)

      segment = s[:idx]
      results.add(segment)

      last = idx

  segment = s[last+1:]
  results.add(segment)
  results.add(s)

  return results

次要分割

次要分割和主要分割的逻辑类似,只是还会把从开始部分到当前分割的结果加入。例如“1.2.3.4”的次要分割会有1,2,3,4,1.2,1.2.3

def segments(event):
  """Simple wrapper around major_segments / minor_segments"""
  results = set()
  for major in major_segments(event):
    for minor in minor_segments(major):
      results.add(minor)
  return results

分词的逻辑就是对文本先进行主要分割,对每一个主要分割在进行次要分割。然后把所有分出来的词返回。

我们看看这段 code是如何运行的:

for term in segments('src_ip = 1.2.3.4'):
    print term

src
1.2
1.2.3.4
src_ip
3
1
1.2.3
ip

Python无损音乐搜索引擎实现代码,python无损

图片 10 

研究了一段时间酷狗音乐的接口,完美破解了其vip音乐下载方式,想着能更好的追求开源,故写下此篇文章,本文仅供学习参考。虽然没什么技术含量,但都是自己一点一点码出来,一点一点抓出来的。

一、综述:

根据酷狗的搜索接口以及无损音乐下载接口,做出爬虫系统。采用flask框架,前端提取搜索关键字,后端调用爬虫系统采集数据,并将数据前端呈现;

运行环境:windows/linux  python2.7

二、爬虫开发:

通过抓包的方式对酷狗客户端进行抓包,抓到两个接口:

1、搜索接口:

这个接口通过传递关键字,其返回的是一段json数据,数据包含音乐名称、歌手、专辑、总数据量等信息,当然最重要的是数据包含音乐各个品质的hash。

图片 11 

默认接口返回的数据只包含30首音乐,为了能拿到所有的数据,只需要把pagesize更改就可以,所以我提取了总数据数量,然后再次发动一次数据请求,拿到全部的数据。当然,这个总数据量也就是json中的total也是作为搜索结果的依据,如果total
== 0 则判断无法搜索到数据。

搜索到数据后,我就要提取无损音乐的hash,这个hash是音乐下载的关键,无损音乐hash键名:SQFileHash,提取到无损hash(如果是32个0就表示None),我把他的名称、歌手、hash以字典形式传递给下一个模块。

代码实现:

a.请求模块(复用率高):

# coding=utf-8
import requests
import json
headers = {
  'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
  'Accept-Encoding': 'gzip, deflate',
  'Accept-Language': 'zh-CN,zh;q=0.9',
  'Cache-Control': 'max-age=0',
  'Connection': 'keep-alive',
  'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
         'Chrome/63.0.3239.132 Safari/537.36',
}
def parse(url):
  ret = json.loads(requests.get(url, headers=headers, timeout=5).text)
  # 返回的是已经转换过后的字典数据
  return ret
if __name__ == '__main__':
  parse()

b.搜索模块

# coding=utf-8
import copy
import MusicParse
def search(keyword):
  search_url = 'http://songsearch.kugou.com/song_search_v2?keyword={}page=1'.format(keyword)
  # 这里需要判断一下,ip与搜索字段可能会限制搜索,total进行判断
  total = MusicParse.parse(search_url)['data']['total']
  if total != 0:
    search_total_url = search_url + '&pagesize=%d' % total
    music_list = MusicParse.parse(search_total_url)['data']['lists']
    item, items = {}, []
    for music in music_list:
      if music['SQFileHash'] != '0'*32:
        item['Song'] = music['SongName'] # 歌名
        item['Singer'] = music['SingerName'] # 歌手
        item['Hash'] = music['SQFileHash'] # 歌曲无损hash
        items.append(copy.deepcopy(item))
    return items
  else:
    return None
if __name__ == '__main__':
  search()

到这步,音乐搜索接口以及利用完了,下面就是无损音乐搜索了。

2、音乐下载接口:

# V2版系统,pc版
Music_api_1 = 'http://trackercdnbj.kugou.com/i/v2/?cmd=23&pid=1&behavior=download'
# V2版系统,手机版(备用)
Music_api_2 = 'http://trackercdn.kugou.com/i/v2/?appid=1005&pid=2&cmd=25&behavior=play'
# 老版系统(备用)
Music_api_3 = 'http://trackercdn.kugou.com/i/?cmd=4&pid=1&forceDown=0&vip=1'

我这里准备三个接口,根据酷狗系统版本不同,采用不同加密方式,酷狗音乐下载的关键就是音乐接口处提交的key的加密方式,key是由hash加密生成的,不同的酷狗版本,加密方式不同:

酷狗v2版key的生成:md5(hash +”kgcloudv2″)

酷狗老版key的生成:md5(hash +”kgcloud”)

只要将音乐的hash+key添加到api_url
,get提交过去,就能返回一段json数据,数据中就包括了音乐的下载链接,然后在提取其download_url。下面我将采用酷狗的第一个接口完成代码的实现,当然,酷狗存在地区的限制,接口有效性也带检测,如果第一个不行就换一个,你懂得!!!然后我把数据做成字典进行传递。

代码实现:

# coding=utf-8
import copy
import hashlib
import MusicParse
import MusicSearch
# V2版系统,pc版,加密方式为md5(hash +"kgcloudv2")
Music_api_1 = 'http://trackercdnbj.kugou.com/i/v2/?cmd=23&pid=1&behavior=download'
# V2版系统,手机版,加密方式为md5(hash +"kgcloudv2") (备用)
Music_api_2 = 'http://trackercdn.kugou.com/i/v2/?appid=1005&pid=2&cmd=25&behavior=play'
# 老版系统,加密方式为md5(hash +"kgcloud")(备用)
Music_api_3 = 'http://trackercdn.kugou.com/i/?cmd=4&pid=1&forceDown=0&vip=1'
def V2Md5(Hash): # 用于生成key,适用于V2版酷狗系统
  return hashlib.md5((Hash + 'kgcloudv2').encode('utf-8')).hexdigest()
def Md5(Hash): # 用于老版酷狗系统
  return hashlib.md5((Hash + 'kgcloud').encode('utf-8')).hexdigest()
def HighSearch(keyword):
  music_list = MusicSearch.search(keyword)
  if music_list is not None:
    item, items = {}, []
    for music in music_list:
      Hash = str.lower(music['Hash'].encode('utf-8'))
      key_new = V2Md5(Hash) # 生成v2系统key
      try:
        DownUrl = MusicParse.parse(Music_api_1 + '&hash=%s&key=%s' % (Hash, key_new))['url']
        item['Song'] = music['Song'].encode('utf-8') # 歌名
        item['Singer'] = music['Singer'].encode('utf-8') # 歌手
        item['url'] = DownUrl
        items.append(copy.deepcopy(item))
      except KeyError:
        pass
    return items
if __name__ == '__main__':
  HighSearch()

酷狗的爬虫系统就设计完毕了,下面开始使用flask框架搭建前后端了。

三、引擎搭建

这个搜索引擎是基于flask框架的,设计思路比较简单,就是前端传递post数据(keyword)传递到后端,后端拿着这个keyword传递给爬虫,爬虫把数据返回给系统,系统在前端渲染出来。

代码实现:

# coding=utf-8
import sys
from flask import Flask
from flask import request, render_template
from KgSpider import HighMusicSearch
reload(sys)
sys.setdefaultencoding('utf-8')
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def search():
  if request.method == 'GET':
    return render_template('index.html')
  elif request.method == 'POST':
    keyword = request.form.get('keyword')
    items = HighMusicSearch.HighSearch(keyword)
    if items != None:
      return render_template('list.html', list=items)
    else:
      return '找不到!!!不支持英文'
  else:
    return render_template('404.html')
if __name__ == '__main__':
  app.run(debug=True)

四、调试

整改引擎系统,也就设计完毕,然我们试试效果:

1.启动脚本:python run.py

图片 12 

2.输入关键字进行搜索

图片 13 

五、总结

引擎搭建完毕,也能正常的运行了,但是这只是一个模型,完全没有考虑,多用户访问带来的压力,很容易崩溃,当然经过我的测试,发现只能搜索中文,英文完全无效,why?别问我,我也不知道!!!当然在这里我也想说一下,请尊重版权!!!虽然我是口是心非!!!!!

项目地址: 码云项目地址

总结

以上所述是小编给大家介绍的Python无损音乐搜索引擎实现代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对帮客之家网站的支持!

研究了一段时间酷狗音乐的接口,完美破解了其vip音乐下载方式,想着能更好的追求开源,故写…

2

4

搜索

好了,有个分词和布隆过滤器这两个利器的支撑后,我们就可以来实现搜索的功能了。

上代码:

class Splunk(object):
  def __init__(self):
    self.bf = Bloomfilter(64)
    self.terms = {} # Dictionary of term to set of events
    self.events = []

  def add_event(self, event):
    """Adds an event to this object"""

    # Generate a unique ID for the event, and save it
    event_id = len(self.events)
    self.events.append(event)

    # Add each term to the bloomfilter, and track the event by each term
    for term in segments(event):
      self.bf.add_value(term)

      if term not in self.terms:
        self.terms[term] = set()
      self.terms[term].add(event_id)

  def search(self, term):
    """Search for a single term, and yield all the events that contain it"""

    # In Splunk this runs in O(1), and is likely to be in filesystem cache (memory)
    if not self.bf.might_contain(term):
      return

    # In Splunk this probably runs in O(log N) where N is the number of terms in the tsidx
    if term not in self.terms:
      return

    for event_id in sorted(self.terms[term]):
      yield self.events[event_id]

Splunk代表一个拥有搜索功能的索引集合

每一个集合中包含一个布隆过滤器,一个倒排词表(字典),和一个存储所有事件的数组

当一个事件被加入到索引的时候,会做以下的逻辑

  1. 为每一个事件生成一个unqie id,这里就是序号
  2. 对事件进行分词,把每一个词加入到倒排词表,也就是每一个词对应的事件的id的映射结构,注意,一个词可能对应多个事件,所以倒排表的的值是一个Set。倒排表是绝大部分搜索引擎的核心功能。

当一个词被搜索的时候,会做以下的逻辑

  1. 检查布隆过滤器,如果为假,直接返回
  2. 检查词表,如果被搜索单词不在词表中,直接返回
  3. 在倒排表中找到所有对应的事件id,然后返回事件的内容

我们运行下看看把:

s = Splunk()
s.add_event('src_ip = 1.2.3.4')
s.add_event('src_ip = 5.6.7.8')
s.add_event('dst_ip = 1.2.3.4')

for event in s.search('1.2.3.4'):
  print event
print '-'
for event in s.search('src_ip'):
  print event
print '-'
for event in s.search('ip'):
  print event

src_ip = 1.2.3.4
dst_ip = 1.2.3.4
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
dst_ip = 1.2.3.4

是不是很赞!

更复杂的搜索

更进一步,在搜索过程中,我们想用And和Or来实现更复杂的搜索逻辑。

上代码:

class SplunkM(object):
  def __init__(self):
    self.bf = Bloomfilter(64)
    self.terms = {} # Dictionary of term to set of events
    self.events = []

  def add_event(self, event):
    """Adds an event to this object"""

    # Generate a unique ID for the event, and save it
    event_id = len(self.events)
    self.events.append(event)

    # Add each term to the bloomfilter, and track the event by each term
    for term in segments(event):
      self.bf.add_value(term)
      if term not in self.terms:
        self.terms[term] = set()

      self.terms[term].add(event_id)

  def search_all(self, terms):
    """Search for an AND of all terms"""

    # Start with the universe of all events...
    results = set(range(len(self.events)))

    for term in terms:
      # If a term isn't present at all then we can stop looking
      if not self.bf.might_contain(term):
        return
      if term not in self.terms:
        return

      # Drop events that don't match from our results
      results = results.intersection(self.terms[term])

    for event_id in sorted(results):
      yield self.events[event_id]


  def search_any(self, terms):
    """Search for an OR of all terms"""
    results = set()

    for term in terms:
      # If a term isn't present, we skip it, but don't stop
      if not self.bf.might_contain(term):
        continue
      if term not in self.terms:
        continue

      # Add these events to our results
      results = results.union(self.terms[term])

    for event_id in sorted(results):
      yield self.events[event_id]

利用Python集合的intersection和union操作,可以很方便的支持And(求交集)和Or(求合集)的操作。

运行结果如下:

s = SplunkM()
s.add_event('src_ip = 1.2.3.4')
s.add_event('src_ip = 5.6.7.8')
s.add_event('dst_ip = 1.2.3.4')

for event in s.search_all(['src_ip', '5.6']):
  print event
print '-'
for event in s.search_any(['src_ip', 'dst_ip']):
  print event

src_ip = 5.6.7.8
-
src_ip = 1.2.3.4
src_ip = 5.6.7.8
dst_ip = 1.2.3.4

总结

以上的代码只是为了说明大数据搜索的基本原理,包括布隆过滤器,分词和倒排表。如果大家真的想要利用这代码来实现真正的搜索功能,还差的太远。所有的内容来自于Splunk
Conf2017。希望对大家的学习有所帮助,也希望大家多多支持帮客之家。

搜索是大数据领域里常见的需求。Splunk和ELK分别是该领域在非开源和开源领域里的领导者。…

相关文章