Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
diff --git a/src/datatrove/pipeline/dedup/minhash.py b/src/datatrove/pipeline/dedup/minhash.py
index f644389..3f1b50b 100644
--- a/src/datatrove/pipeline/dedup/minhash.py
+++ b/src/datatrove/pipeline/dedup/minhash.py
@@ -256,6 +254,8 @@ class MinhashDedupSignature(PipelineStep):
doc_idx,
)
)
+ else:
+ logger.warning(f"No singles {doc.text=} on {rank=}")
for file in buckets:
file.close()

diff --git a/src/datatrove/utils/text.py b/src/datatrove/utils/text.py
index 7ab7d3d..d0c9cb6 100644
index 7ab7d3d..233940b 100644
--- a/src/datatrove/utils/text.py
+++ b/src/datatrove/utils/text.py
@@ -259,6 +257,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str:
@@ -259,6 +259,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str:

# from https://tedboy.github.io/nlps/_modules/nltk/util.html#ngrams
def ngrams(sequence: Iterable, n: int):
Expand Down
14 changes: 11 additions & 3 deletions corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/bin/bash

work_dir=/model/experiments/0118_dedup_corpusv4_ja
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

: "${work_dir:=/model/experiments/0118_dedup_corpusv4_ja}"
env_dir=${work_dir}/environment
venv_dir=${env_dir}/.venv
src_dir=${env_dir}/src
script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup
script_root=${SCRIPT_DIR}/..

export UV_PROJECT_ENVIRONMENT=$venv_dir

Expand All @@ -24,7 +26,13 @@ uv add --no-cache -r ${script_root}/installer/requirements.txt
# install customized datatrove
mkdir -p $src_dir
cd $src_dir || exit
git clone https://github.com/huggingface/datatrove.git -b v0.4.0
git clone https://github.com/huggingface/datatrove.git -b v0.4.0 --depth=1
cd datatrove || exit
patch -p1 <${script_root}/installer/datatrove_diff.patch
uv pip install --no-cache-dir ".[io,processing,cli]"

cd $src_dir || exit
git clone https://github.com/WorksApplications/SudachiPy.git -b v0.5.4 --depth=1
cd SudachiPy || exit
patch -p1 <${script_root}/installer/sudachipy_diff.patch
uv pip install --no-cache-dir "."
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
datatrove[io,processing,cli]==0.4.0 # will be re-installed, but install to resolve dependency
orjson
paramiko # required to run processes on local multi node
spacy[ja] # required when running minhash (datatrove)
sudachipy==0.5.4 # required when running minhash (datatrove)
paramiko # required to run processes on local multi node
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
diff --git a/sudachipy/__init__.py b/sudachipy/__init__.py
index 0b3e8a2..3bafbc0 100644
--- a/sudachipy/__init__.py
+++ b/sudachipy/__init__.py
@@ -16,9 +16,8 @@ from . import utf8inputtextbuilder
from . import tokenizer
from . import config

-from pkg_resources import get_distribution, DistributionNotFound
+import importlib.metadata
try:
- __version__ = get_distribution(__name__).version
-except DistributionNotFound:
- # package is not installed
+ __version__ = importlib.metadata.version(__name__)
+except importlib.metadata.PackageNotFoundError:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# 類似重複除去スクリプト

このディレクトリには、コーパスからの類似重複を1台のサーバ上で複数プロセス並列による処理で除去するためのスクリプトが含まれています。
重複除去は、[datatrove](https://github.com/huggingface/datatrove) に実装された Minhash-LSH をベースとしています。

## システム要件

動作確認を行ったシステムは以下の通りです。
- OS: Ubuntu 24.04.3 LTS
- Python: Python 3.12.3, pip 24.0 (apt によりインストール)

GNU Parallel を利用する場合は apt でインストールしてください。

```
$ sudo apt install parallel
```

## スクリプト実行順

以下の手順で実行します。

0. 事前準備

作業スペースを作成するディレクトリを環境変数 `work_dir` にセットします。
ここでは "/home/foo/work" とします。

$ cd corpus/llm-jp-corpus-v4/common/dedup/
$ export work_dir=/home/foo/work
$ bash installer/install.sh

"/home/foo/work" の下に "environment" ディレクトリが作成されます。
次のコマンドを実行して Python 仮想環境を有効にします。

$ . ${work_dir}/environment/.venv/bin/activate

以降の作業は仮想環境を有効にした状態で行います。

1. ファイルサイズを均一化して処理時間のバランスを取るためのリシャーディング

処理対象とするテキストコーパスをだいたい1GBずつになるように分割します。
テキストコーパスは各レコードが改行で区切られた JSON ファイル (jsonl, ndjson) で
レコードを識別する "id" とテキスト "text" フィールドを含む必要があります。

説明のため、コーパスファイルは bzip2 で圧縮された状態、ファイル名は `*.jsonl.bz2` で、環境変数 `CORPUS_DIR` に指定されたディレクトリ内に配置されているものとします。また、分割したファイルは bzip2 で圧縮し、環境変数 `INPUT_DIR` (重複除去処理の入力になるので) で指定されたディレクトリに出力するものとします。

次のコマンドを実行すると、 "${INPUT_DIR}" の下に "part_0000.jsonl.bz2" から "part_NNNN.jsonl.bz2" が出力されます。

$ rm -rf ${INPUT_DIR}
$ mkdir ${INPUT_DIR}
$ for f in ${CORPUS_DIR}/*.jsonl.bz2; do bzip2 -dc $f; done | split - -d --suffix-length=4 --line-bytes=1G --additional-suffix='.jsonl' --filter='bzip2 -c > $FILE.bz2' ${INPUT_DIR}/part_

bzip2 以外のフォーマットの場合は上のコマンドの `bzip2` を `gzip` などに置き換えてください。また分割するサイズは `--line-bytes=1G` の部分で指定しているので、コーパスが小さいなどの理由でより細かく分割したい場合は、たとえば `--line-bytes=100M` のように変更してください (細かく分割した方がより均等になります) 。

2. 重複レコード検出

分割したテキストコーパス内のレコードに対し、各レコードのテキストから signature を計算し、十分に近いものをクラスタにまとめて各クラスタの最初の1件のみを抽出します。

重複除去処理の結果を出力するディレクトリを環境変数 `OUTPUT_DIR` に指定します。このディレクトリはどこを指定しても構いませんが、以下の例では後でまとめて削除できるよう `${work_dir}/environment/` の下に置いています。

$ export OUTPUT_DIR=${work_dir}/environment/dedup_result
$ python -m minhash.local_single_node.minhash_dedup ${INPUT_DIR} ${OUTPUT_DIR}

Stage 2 で "too many open files" というエラーが出る場合、先にオープン可能なファイル数の上限を増やすと解決する可能性があります。

$ ulimit -n 65535

(補足1) minhash_dedup.py は以下のオプションパラメータを指定できます。

- `--ngram`: シグネチャの計算に利用する N-Gram の長さ、指定しない場合 5
- `--buckets`: バケット数、指定しない場合 20 で、値を大きくすると重複文書検出の recall が向上しますが precision が低下します
- `--hashes_per_bucket`: バケットあたりのハッシュ数、指定しない場合 10 で、値を大きくすると precision が向上しますが recall が低下します
- `--max_worker`: 並列実行するワーカーの最大値、指定しない場合 16
- `--stage`: どのステージまで実行するかを指定、指定しない場合 4

(補足2) 入力コーパスの JSON の "id" や "text" フィールドの名称が違う場合や、複数の項目を組み合わせて作成する必要がある場合、 `json_format.py` の中で adapter 関数を定義することで動的に変換できます。実装例 `custom_adapter` を参考にしてください。

3. 重複レコードの ID リスト作成

`${OUTPUT_DIR}/minhash-5gram-20buckets-10hashes/results/removed/*.jsonl.gz` に重複レコードと判定されて除去されたレコードが出力されているので、ここから除去するべきレコードの ID リストを作成します。環境変数 `RESULTS_DIR` に `results` ディレクトリのパスを指定してください。 `create_removed_idlist.py` はこの環境変数を参照するので、必ず指定する必要があります。

もし minhash_dedup.py を実行する際にオプションパラメータをデフォルト値以外に指定した場合、`${OUTPUT_DIR}` の下のディレクトリ名がパラメータに合わせて変化するので、正しいディレクトリ名に置き換えてください。

$ export RESULTS_DIR=${OUTPUT_DIR}/minhash-5gram-20buckets-10hashes/results
$ python -m minhash.local_single_node.create_removed_idlist

実行すると `${RESULTS_DIR}/removed_id_list.txt` に ID リストが出力されます。

4. 元のテキストコーパスから重複レコード除去

分割前のテキストコーパスから重複レコードの ID リストに含まれるレコードを除去します。元のコーパスのファイル名、レコードの内容および順序を保持したファイルを環境変数 `DEDUPED_DIR` で指定したディレクトリの下に出力します。`filter_removed_ids.py` も環境変数 `RESULTS_DIR` を参照するので、前の手順を実行した後にシェルを再起動した場合などは再度設定してください。

$ export DEDUPED_DIR=${CORPUS_DIR}/deduped # 任意のディレクトリで良い
$ mkdir -p ${DEDUPED_DIR}
(GNU parallel を利用する場合)
$ ls ${CORPUS_DIR}/*.jsonl.bz2 | parallel -j 10 "bzip2 -dc {} | python -m minhash.local_single_node.filter_removed_ids | bzip2 -c > ${DEDUPED_DIR}/{/}"
(GNU parallel を利用しない場合)
$ for f in ${CORPUS_DIR}/*.jsonl.bz2; do bzip2 -dc $f | python -m minhash.local_single_node.filter_removed_ids | bzip2 -c > ${DEDUPED_DIR}/${f##*/}; done

GNU parallel を利用すると並列に処理するので短時間で終わります。並列数は `-j` の後の数字で指定します。

5. 作業ディレクトリを削除

作業完了後、 Python 仮想環境を無効化し、 `${work_dir}/environment/` 内の作業ファイルを削除します。

$ deactivate
$ rm -rf ${work_dir}/environment/


## 関連リポジトリ

参考:[datatrove](https://github.com/huggingface/datatrove)
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Duplicate Removal Script

This directory contains scripts for removing similar duplicates from a corpus using parallel processing across multiple processes on a single server.
Duplicate removal is based on Minhash-LSH implemented in [datatrove](https://github.com/huggingface/datatrove).

## System Requirements

The following systems have been tested for compatibility:
- OS: Ubuntu 24.04.3 LTS
- Python: Python 3.12.3, pip 24.0 (installed via apt)

If you are going to use GNU Parallel, install it via apt.

```
$ sudo apt install parallel
```

## Script Execution Procedure

Execute in the following order.

0. Prerequisites

Set the directory for your workspace in the environment variable `work_dir`.
Here, we use "/home/foo/work".

$ cd corpus/llm-jp-corpus-v4/common/dedup/
$ export work_dir=/home/foo/work
$ bash installer/install.sh

An "environment" directory will be created under "/home/foo/work".
Run the following command to activate the Python virtual environment.

$ . ${work_dir}/environment/.venv/bin/activate

All subsequent operations should be performed in the activated virtual environment.

1. Resharding to balance processing time by making file sizes uniform

Split the text corpus to be processed into roughly 1GB chunks.
The text corpus consists of JSON files (jsonl, ndjson) where each record is separated by a line break.
Each record must contain an "id" field for identification and a "text" field for the text content.

For illustrative purposes, assume the corpus files are compressed with bzip2, named `*.jsonl.bz2`, and located in the directory specified by the environment variable `CORPUS_DIR`. Furthermore, assume the split files are compressed with bzip2 and output to the directory specified by the environment variable `INPUT_DIR` (as this serves as input for deduplication processing).

Executing the following command outputs files named "part_0000.jsonl.bz2" through "part_NNNN.jsonl.bz2" under "${INPUT_DIR}".

$ rm -rf ${INPUT_DIR}
$ mkdir ${INPUT_DIR}
$ for f in ${CORPUS_DIR}/*.jsonl.bz2; do bzip2 -dc $f; done | split - -d --suffix-length=4 --line-bytes=1G --additional-suffix='.jsonl' --filter='bzip2 -c > $FILE.bz2' ${INPUT_DIR}/part_

For formats other than bzip2, replace `bzip2` in the above command with `gzip` or so. The split size is specified by `--line-bytes=1G`. If you want finer splits (e.g., due to a small corpus), change it to something like `--line-bytes=100M` (finer splits result in more even distribution).

2. Duplicate Record Detection

For each record in the split text corpus, calculate a signature from its text. Group sufficiently similar records into clusters and extract only the first record from each cluster.

Specify the directory for outputting the deduplication results using the environment variable `OUTPUT_DIR`. This directory can be specified anywhere, but in the example below, it is placed under `${work_dir}/environment/` so it can be deleted later in bulk.

$ export OUTPUT_DIR=${work_dir}/environment/dedup_result
$ python -m minhash.local_single_node.minhash_dedup ${INPUT_DIR} ${OUTPUT_DIR}

If you encounter a "too many open files" error during Stage 2, increasing the maximum number of open files beforehand may resolve the issue.

$ ulimit -n 65535

(Note 1) minhash_dedup.py accepts the following option parameters.

- `--ngram`: Length of the N-Gram used for signature calculation. Defaults to 5 if not specified.
- `--buckets`: Number of buckets. Default is 20. Increasing this value improves recall for duplicate document detection but reduces precision.
- `--hashes_per_bucket`: Number of hashes per bucket. Default is 10. Increasing this value improves precision but reduces recall.
- `--max_worker`: Maximum number of parallel workers. Default is 16.
- `--stage`: Specifies the stage up to which to execute. Default is 4.

(Note 2) If the names of the "id" or "text" fields in the input corpus JSON differ, or if multiple items need to be combined to create the output, you can dynamically transform the data by defining an adapter function within `json_format.py`. Please refer to the implementation example `custom_adapter`.

3. Creating a List of IDs for Duplicate Records

Records identified as duplicates and removed are output to `${OUTPUT_DIR}/minhash-5gram-20buckets-10hashes/results/removed/*.jsonl.gz`. Create a list of IDs for records to be removed from this file. Specify the path to the `results` directory in the environment variable `RESULTS_DIR`. Since `create_removed_idlist.py` references this environment variable, it must be specified.

If you specified non-default option parameters when running `minhash_dedup.py`, the directory name under `${OUTPUT_DIR}` will change accordingly. Replace it with the correct directory name.

$ export RESULTS_DIR=${OUTPUT_DIR}/minhash-5gram-20buckets-10hashes/results
$ python -m minhash.local_single_node.create_removed_idlist

Running this will output the ID list to `${RESULTS_DIR}/removed_id_list.txt`.

4. Remove duplicate records from the original text corpus

Removes records contained in the duplicate ID list from the pre-split text corpus. Outputs files that preserve the original corpus filenames, record contents, and order to the directory specified by the environment variable `DEDUPED_DIR`. Since `filter_removed_ids.py` also references the environment variable `RESULTS_DIR`, you must reset it if you restart the shell after executing the previous step.

$ export DEDUPED_DIR=${CORPUS_DIR}/deduped # Any directory is acceptable
$ mkdir -p ${DEDUPED_DIR}
(When using GNU parallel)
$ ls ${CORPUS_DIR}/*.jsonl.bz2 | parallel -j 10 "bzip2 -dc {} | python -m minhash.local_single_node.filter_removed_ids | bzip2 -c > ${DEDUPED_DIR}/{/}"
(When not using GNU parallel)
$ for f in ${CORPUS_DIR}/*.jsonl.bz2; do bzip2 -dc $f | python -m minhash.local_single_node.filter_removed_ids | bzip2 -c > ${DEDUPED_DIR}/${f##*/}; done

Using GNU parallel enables parallel processing, completing the task quickly. The number of parallel processes is specified by the number after `-j`.

5. Delete the working directory

After completing the work, deactivate the Python virtual environment and delete the working files in `${work_dir}/environment/`.

$ deactivate
$ rm -rf ${work_dir}/environment/


## Related Repositories

Reference: [datatrove](https://github.com/huggingface/datatrove)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python
import gzip
import json
import os
from pathlib import Path

from .json_format import adapter


def get_id(record: dict) -> str:
adapted = adapter(None, record, "", 0)
return adapted["id"]


def iter_remove_records(removed_dir: Path):
for file in removed_dir.glob("*.jsonl.gz"):
with gzip.open(file, "rt") as fin:
for line in fin:
record = json.loads(line)
doc_id = get_id(record)
if doc_id == "":
raise RuntimeError(f"No id found in '{line}'")

yield doc_id


def main():
results_dir = Path(os.environ.get(
"RESULTS_DIR",
str(Path(__file__).parent / "test_results")))
removed_dir = results_dir / "removed/"

with open(results_dir / "removed_id_list.txt", "wt") as fout:
for doc_id in iter_remove_records(Path(removed_dir)):
print(doc_id, file=fout)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python
import bz2
import json
import os
from pathlib import Path
import sys
from typing import Set

from .json_format import adapter

removed_id_list: Set[str] = set()


def get_id(record: dict) -> str:
adapted = adapter(None, record, "", 0)
return adapted["id"]


def get_removed_id_list_path() -> Path:
results_dir = Path(os.environ.get(
"RESULTS_DIR",
str(Path(__file__).parent / "test_results")))

return results_dir / "removed_id_list.txt"


def read_removed_id_list():
global removed_id_list
with open(get_removed_id_list_path(), "rt") as fin:
for line in fin:
removed_id_list.add(line.strip())


def filter_removed_records():
global removed_id_list

for line in sys.stdin:
record = json.loads(line)
doc_id = get_id(record)
if doc_id in removed_id_list:
continue

print(line, end="")


if __name__ == "__main__":
read_removed_id_list()
filter_removed_records()
Loading