使用 pandas 的“大数据”工作流 [已关闭]

2022-09-05 01:05:31

几个月来,在学习熊猫的同时,我一直试图找出这个问题的答案。我使用 SAS 进行日常工作,它非常适合用于核心外支持。但是,由于许多其他原因,SAS作为一种软件是可怕的。

有一天,我希望用python和pandas取代我对SAS的使用,但我目前缺乏大型数据集的核心工作流程。我不是在谈论需要分布式网络的“大数据”,而是文件太大而无法容纳内存,但足够小以容纳硬盘。

我的第一个想法是用于在磁盘上保存大型数据集,并仅将我需要的部分拉入数据帧中进行分析。其他人提到MongoDB是一种更易于使用的替代品。我的问题是这样的:HDFStore

有哪些最佳实践工作流可用于完成以下任务:

  1. 将平面文件加载到永久的磁盘数据库结构中
  2. 查询该数据库以检索要馈送到 pandas 数据结构中的数据
  3. 在熊猫中操作片段后更新数据库

现实世界的例子将不胜感激,特别是来自任何在“大数据”上使用熊猫的人。

编辑 - 我希望它如何工作的一个例子:

  1. 以迭代方式导入大型平面文件,并将其存储在永久的磁盘数据库结构中。这些文件通常太大,无法放入内存。
  2. 为了使用Pandas,我想读取可以放入内存的数据子集(通常一次只有几列)。
  3. 我将通过对所选列执行各种操作来创建新列。
  4. 然后,我必须将这些新列追加到数据库结构中。

我正在尝试找到执行这些步骤的最佳实践方法。阅读有关熊猫和pytables的链接,似乎附加新列可能是一个问题。

编辑 - 具体回答杰夫的问题:

  1. 我正在建立消费者信用风险模型。数据种类包括电话、SSN和地址特征;属性值;贬损信息,如犯罪记录,破产等...我每天使用的数据集平均有近1,000到2,000个混合数据类型的字段:数字和字符数据的连续,名义和序数变量。我很少追加行,但我确实执行了许多创建新列的操作。
  2. 典型操作涉及使用条件逻辑将多个列合并到一个新的复合列中。例如。这些操作的结果是我的数据集中的每条记录都有一个新列。if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
  3. 最后,我想将这些新列追加到磁盘上数据结构中。我将重复步骤 2,使用交叉表和描述性统计来探索数据,试图找到有趣、直观的模型关系。
  4. 典型的项目文件通常约为 1GB。文件被组织成这样一种方式,其中一行由消费者数据的记录组成。每条记录的每行都有相同的列数。情况将始终如此。
  5. 在创建新列时,我很少会按行进行子集化。但是,在创建报告或生成描述性统计信息时,我对行进行子集化是很常见的。例如,我可能希望为特定业务线(例如零售信用卡)创建一个简单的频率。为此,除了要报告的任何列之外,我还将仅选择业务线 = 零售的那些记录。但是,在创建新列时,我会提取所有数据行,并且仅提取操作所需的列。
  6. 建模过程要求我分析每列,寻找与某些结果变量的有趣关系,并创建新的复合列来描述这些关系。我探索的列通常是在小集合中完成的。例如,我将重点介绍一组20列,仅处理房地产价值,并观察它们与贷款违约的关系。一旦探索了这些并创建了新的列,我就会转到另一组列,比如大学教育,并重复这个过程。我正在做的是创建候选变量来解释我的数据与某些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合柱中创建一个方程。

我很少会向数据集添加行。我几乎总是会创建新列(统计/机器学习术语中的变量或特征)。


答案 1

我经常以这种方式使用数十千兆字节的数据,例如,我在磁盘上有表,我通过查询读取,创建数据并追加回来。

值得一读的是文档此线程的后期,以获取有关如何存储数据的几项建议。

影响你如何存储数据的细节,比如:
尽可能多地提供细节;我可以帮助你开发一个结构。

  1. 数据大小,行数,列数,列类型;您是追加行,还是仅追加列?
  2. 典型操作将是什么样子。例如,对列进行查询以选择一堆行和特定列,然后执行操作(在内存中),创建新列,保存这些列。
    (举一个玩具的例子可以让我们提供更具体的建议。
  3. 处理完毕后,您该怎么办?步骤 2 是临时的还是可重复的?
  4. 输入平面文件:多少个,粗略的总大小(以 Gb 为单位)。例如,如何按记录组织这些?是每个都包含不同的字段,还是每个文件都有一些记录,每个文件中的所有字段都包括在内?
  5. 您是否曾经根据条件选择行(记录)的子集(例如,选择字段为 A > 5 的行)?然后做某事,或者你只是选择字段A,B,C和所有记录(然后做某事)?
  6. 您是否“处理”了所有列(分组),或者是否有一个良好的比例只能用于报告(例如,您希望保留数据,但在最终结果时间之前不需要拉入该列的显式)?

溶液

确保您至少安装了熊猫 0.10.1

逐块读取迭代文件多个表查询

由于 pytables 已针对行进行优化(这是您查询的内容),因此我们将为每组字段创建一个表。通过这种方式,可以轻松选择一小组字段(这将与大表一起使用,但这样做更有效率...我想我将来也许能够解决这个限制...无论如何,这更直观):
(以下是伪代码。

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读取文件并创建存储(实质上是执行以下操作):append_to_multiple

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在,您已经在文件中拥有了所有表(实际上,如果您愿意,可以将它们存储在单独的文件中,并且可能必须将文件名添加到group_map,但可能这不是必需的)。

以下是获取列并创建新列的方式:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当您准备好post_processing时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于data_columns,您实际上不需要定义任何data_columns;它们允许您根据列对行进行子选择。例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终的报告生成阶段,您可能最感兴趣的是它们(实质上,数据列与其他列隔离开来,如果您定义很多,这可能会对效率产生一些影响)。

您可能还希望:

  • 创建一个函数,该函数获取字段列表,在groups_map中查找组,然后选择这些组并连接结果,以便获得结果帧(这实质上是select_as_multiple所做的)。这样,结构对您来说将非常透明。
  • 某些数据列上的索引(使行子集化速度更快)。
  • 启用压缩。

当您有问题时,请告诉我!


答案 2

我认为上面的答案缺少一种我发现非常有用的简单方法。

当我的文件太大而无法在内存中加载时,我会将该文件分解为多个较小的文件(按行或列)

示例:如果30天内的交易数据约为30GB大小,我将其分解为每天约1GB大小的文件。我随后分别处理每个文件,并在最后聚合结果

最大的优点之一是它允许并行处理文件(多个线程或进程)

另一个优点是文件操作(如示例中的添加/删除日期)可以通过常规shell命令完成,这在更高级/复杂的文件格式中是不可能的。

这种方法并不涵盖所有方案,但在很多方案中都非常有用。