"""
|
Tests multithreading behaviour for reading and
|
parsing files for each parser defined in parsers.py
|
"""
|
from contextlib import ExitStack
|
from io import BytesIO
|
from multiprocessing.pool import ThreadPool
|
|
import numpy as np
|
import pytest
|
|
import pandas as pd
|
from pandas import DataFrame
|
import pandas._testing as tm
|
from pandas.util.version import Version
|
|
xfail_pyarrow = pytest.mark.usefixtures("pyarrow_xfail")
|
|
# We'll probably always skip these for pyarrow
|
# Maybe we'll add our own tests for pyarrow too
|
pytestmark = [
|
pytest.mark.single_cpu,
|
pytest.mark.slow,
|
]
|
|
|
@pytest.mark.filterwarnings("ignore:Passing a BlockManager:DeprecationWarning")
|
def test_multi_thread_string_io_read_csv(all_parsers, request):
|
# see gh-11786
|
parser = all_parsers
|
if parser.engine == "pyarrow":
|
pa = pytest.importorskip("pyarrow")
|
if Version(pa.__version__) < Version("16.0"):
|
request.applymarker(
|
pytest.mark.xfail(reason="# ValueError: Found non-unique column index")
|
)
|
max_row_range = 100
|
num_files = 10
|
|
bytes_to_df = (
|
"\n".join([f"{i:d},{i:d},{i:d}" for i in range(max_row_range)]).encode()
|
for _ in range(num_files)
|
)
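    # All num_files payloads are identical, so every thread should
    # return an equal DataFrame.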

    # Read all files in many threads.
    with ExitStack() as stack:
        files = [stack.enter_context(BytesIO(b)) for b in bytes_to_df]

        pool = stack.enter_context(ThreadPool(8))

        results = pool.map(parser.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result)


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame using multiple threads.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The number of rows to read per task.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """

    def reader(arg):
        """
        Create a reader for part of the CSV.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The row at which to start parsing the CSV.
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg

        if not start:
            return parser.read_csv(
                path, index_col=0, header=0, nrows=nrows, parse_dates=["date"]
            )
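
        # Later chunks skip the header row plus the rows already consumed,
        # so they are read with header=None and the "date" column is only
        # addressable by its position in the file (column 9).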
        return parser.read_csv(
            path,
            index_col=0,
            header=None,
            skiprows=int(start) + 1,
            nrows=nrows,
            parse_dates=[9],
        )

    tasks = [
        (num_rows * i // num_tasks, num_rows // num_tasks) for i in range(num_tasks)
    ]
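    # Each task is a (start_row, nrows) pair; with num_rows divisible by
    # num_tasks, the tasks tile the file in equal, non-overlapping chunks.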

    with ThreadPool(processes=num_tasks) as pool:
        results = pool.map(reader, tasks)

    header = results[0].columns
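
    # Chunks after the first were read with header=None and so carry
    # integer column labels; give them the header from the first chunk
    # before concatenating.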
    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe


@xfail_pyarrow  # ValueError: The 'nrows' option is not supported
def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 48

    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = DataFrame(
        {
            "a": np.random.default_rng(2).random(num_rows),
            "b": np.random.default_rng(2).random(num_rows),
            "c": np.random.default_rng(2).random(num_rows),
            "d": np.random.default_rng(2).random(num_rows),
            "e": np.random.default_rng(2).random(num_rows),
            "foo": ["foo"] * num_rows,
            "bar": ["bar"] * num_rows,
            "baz": ["baz"] * num_rows,
            "date": pd.date_range("20000101 09:00:00", periods=num_rows, freq="s"),
            "int": np.arange(num_rows, dtype="int64"),
        }
    )
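    # Every numeric column reuses the same seeded generator, so the frame
    # is fully deterministic across runs.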

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)
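        # to_csv writes the default RangeIndex as the first column, which
        # the reader tasks consume again via index_col=0.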

        final_dataframe = _generate_multi_thread_dataframe(
            parser, path, num_rows, num_tasks
        )
        tm.assert_frame_equal(df, final_dataframe)