diff --git a/.env b/.env index 7f90f066..013d118e 100644 --- a/.env +++ b/.env @@ -2,3 +2,5 @@ DATABASE_URL=postgresql://stationapi:stationapi@localhost/stationapi DISABLE_BUS_FEATURE=false DISABLE_GRPC_WEB=false +ODPT_API_KEY= +ENABLE_RAIL_GTFS=true diff --git a/data/create_table.sql b/data/create_table.sql index 4a763710..b3c1fc85 100644 --- a/data/create_table.sql +++ b/data/create_table.sql @@ -643,7 +643,7 @@ ALTER TABLE public.gtfs_agencies OWNER TO stationapi; -- -- Name: gtfs_routes; Type: TABLE; Schema: public --- GTFS route information (bus lines) +-- GTFS route information (bus lines and rail lines from GTFS sources) -- CREATE UNLOGGED TABLE public.gtfs_routes ( @@ -656,18 +656,23 @@ CREATE UNLOGGED TABLE public.gtfs_routes ( route_long_name_zh TEXT, route_long_name_ko TEXT, route_desc TEXT, - route_type INTEGER NOT NULL DEFAULT 3, -- 3 = Bus + route_type INTEGER NOT NULL DEFAULT 3, -- 3 = Bus, 1 = Subway, 2 = Rail route_url TEXT, route_color VARCHAR(6), route_text_color VARCHAR(6), route_sort_order INTEGER, - line_cd INTEGER REFERENCES public.lines(line_cd) + line_cd INTEGER REFERENCES public.lines(line_cd), + transport_type INTEGER DEFAULT 1, -- 1: Bus, 2: Rail (GTFS) + company_cd INTEGER REFERENCES public.companies(company_cd), + source_id VARCHAR(50) -- GTFS source identifier (e.g., "toei_bus", "tokyo_metro") ); ALTER TABLE public.gtfs_routes OWNER TO stationapi; CREATE INDEX idx_gtfs_routes_agency_id ON public.gtfs_routes USING btree (agency_id); CREATE INDEX idx_gtfs_routes_line_cd ON public.gtfs_routes USING btree (line_cd); +CREATE INDEX idx_gtfs_routes_transport_type ON public.gtfs_routes USING btree (transport_type); +CREATE INDEX idx_gtfs_routes_source_id ON public.gtfs_routes USING btree (source_id); -- -- Name: gtfs_stops; Type: TABLE; Schema: public @@ -856,3 +861,62 @@ ALTER TABLE public.gtfs_feed_info OWNER TO stationapi; -- ============================================================ -- End of GTFS Bus Integration Schema -- ============================================================ + +-- ============================================================ +-- Stop Pattern Detection Schema +-- For detecting train type stop pattern changes from ODPT API +-- ============================================================ + +-- +-- Name: stop_pattern_snapshots; Type: TABLE; Schema: public +-- Snapshots of stop patterns extracted from ODPT TrainTimetable +-- + +CREATE TABLE public.stop_pattern_snapshots ( + id SERIAL PRIMARY KEY, + operator_id VARCHAR(100) NOT NULL, -- odpt.Operator:TokyoMetro + railway_id VARCHAR(100) NOT NULL, -- odpt.Railway:TokyoMetro.Marunouchi + train_type_id VARCHAR(100) NOT NULL, -- odpt.TrainType:TokyoMetro.Local + train_type_name VARCHAR(100), -- 各停 + station_ids TEXT[] NOT NULL, -- Array of station IDs + station_names TEXT[], -- Array of station names (for reference) + captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + captured_date DATE DEFAULT CURRENT_DATE, + UNIQUE(railway_id, train_type_id, captured_date) +); + +ALTER TABLE public.stop_pattern_snapshots OWNER TO stationapi; + +CREATE INDEX idx_stop_pattern_snapshots_railway ON public.stop_pattern_snapshots USING btree (railway_id, train_type_id); +CREATE INDEX idx_stop_pattern_snapshots_operator ON public.stop_pattern_snapshots USING btree (operator_id); +CREATE INDEX idx_stop_pattern_snapshots_captured ON public.stop_pattern_snapshots USING btree (captured_at DESC); + +-- +-- Name: stop_pattern_changes; Type: TABLE; Schema: public +-- Log of detected stop pattern changes +-- + +CREATE TABLE public.stop_pattern_changes ( + id SERIAL PRIMARY KEY, + operator_id VARCHAR(100) NOT NULL, + railway_id VARCHAR(100) NOT NULL, + railway_name VARCHAR(100), -- 丸ノ内線 + train_type_id VARCHAR(100) NOT NULL, + train_type_name VARCHAR(100), -- 各停 + change_type VARCHAR(20) NOT NULL, -- 'added' or 'removed' + station_id VARCHAR(100) NOT NULL, + station_name VARCHAR(100), + detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + acknowledged BOOLEAN DEFAULT FALSE, + acknowledged_at TIMESTAMP +); + +ALTER TABLE public.stop_pattern_changes OWNER TO stationapi; + +CREATE INDEX idx_stop_pattern_changes_detected ON public.stop_pattern_changes USING btree (detected_at DESC); +CREATE INDEX idx_stop_pattern_changes_unack ON public.stop_pattern_changes (acknowledged) WHERE acknowledged = FALSE; +CREATE INDEX idx_stop_pattern_changes_railway ON public.stop_pattern_changes USING btree (railway_id, train_type_id); + +-- ============================================================ +-- End of Stop Pattern Detection Schema +-- ============================================================ diff --git a/docs/railway-gtfs-integration-design.md b/docs/railway-gtfs-integration-design.md new file mode 100644 index 00000000..e74f183c --- /dev/null +++ b/docs/railway-gtfs-integration-design.md @@ -0,0 +1,324 @@ +# 鉄道データ自動更新設計書 + +## 概要 + +ダイヤ改正への迅速な対応を実現するため、ODPT(公共交通オープンデータセンター)のAPIを活用した**停車パターン差分検知システム**を構築する。 + +## 課題の整理 + +### 本当の課題 +- **駅マスタデータ**: ほぼ変わらない(新駅追加、駅名変更は稀) +- **種別停車パターン**: 頻繁に変わる(ダイヤ改正のたびに変更される) + +### GTFSの限界 +- GTFSは「時刻表データ」であり、「列車種別」という概念がない +- `odpt:TrainTimetable`(時刻表API)で個別列車の停車駅は取得可能 +- しかし「快速はこの駅に停車する」というマスタデータは存在しない + +### 解決アプローチ +時刻表データから停車パターンを**集計・推論**し、前回との**差分を検知**する。 + +## システム設計 + +### アーキテクチャ + +``` +┌─────────────────────────────────────────────────────────┐ +│ ODPT API (TrainTimetable) │ +│ 定期的に時刻表データを取得 │ +└─────────────────┬───────────────────────────────────────┘ + ▼ +┌─────────────────────────────────────────────────────────┐ +│ 停車パターン抽出エンジン │ +│ 種別×路線ごとに「どの駅に停車する便があるか」を集計 │ +│ 例: 中央線快速 → {東京, 神田, 御茶ノ水, 四ツ谷, ...} │ +└─────────────────┬───────────────────────────────────────┘ + ▼ +┌─────────────────────────────────────────────────────────┐ +│ 差分検知エンジン │ +│ 前回スナップショットと比較 │ +│ → 新規停車駅、停車取りやめ駅を検出 │ +└─────────────────┬───────────────────────────────────────┘ + ▼ +┌─────────────────────────────────────────────────────────┐ +│ 通知/ログ出力 │ +│ 変更があれば管理者に通知 │ +│ 将来的にはSlack/Discord連携も可能 │ +└─────────────────┬───────────────────────────────────────┘ + ▼ +┌─────────────────────────────────────────────────────────┐ +│ 人間が確認 │ +│ station_station_types.csv を手動更新 │ +└─────────────────────────────────────────────────────────┘ +``` + +### データベーススキーマ + +```sql +-- 停車パターンのスナップショット +CREATE TABLE stop_pattern_snapshots ( + id SERIAL PRIMARY KEY, + railway_id VARCHAR(100) NOT NULL, -- odpt.Railway:JR-East.ChuoRapid + train_type_id VARCHAR(100) NOT NULL, -- odpt.TrainType:JR-East.Rapid + train_type_name VARCHAR(100), -- 快速 + station_ids TEXT[] NOT NULL, -- 停車駅IDの配列 + station_names TEXT[], -- 停車駅名の配列(参照用) + captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(railway_id, train_type_id, captured_at::date) +); + +-- 停車パターン変更ログ +CREATE TABLE stop_pattern_changes ( + id SERIAL PRIMARY KEY, + railway_id VARCHAR(100) NOT NULL, + train_type_id VARCHAR(100) NOT NULL, + train_type_name VARCHAR(100), + change_type VARCHAR(20) NOT NULL, -- 'added' or 'removed' + station_id VARCHAR(100) NOT NULL, + station_name VARCHAR(100), + detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + acknowledged BOOLEAN DEFAULT FALSE, -- 確認済みフラグ + acknowledged_at TIMESTAMP +); + +-- インデックス +CREATE INDEX idx_stop_pattern_snapshots_railway ON stop_pattern_snapshots(railway_id, train_type_id); +CREATE INDEX idx_stop_pattern_changes_detected ON stop_pattern_changes(detected_at DESC); +CREATE INDEX idx_stop_pattern_changes_unack ON stop_pattern_changes(acknowledged) WHERE acknowledged = FALSE; +``` + +### ODPT API エンドポイント + +``` +# 列車時刻表(メイン) +GET https://api.odpt.org/api/v4/odpt:TrainTimetable + ?odpt:operator=odpt.Operator:TokyoMetro + &acl:consumerKey={API_KEY} + +# 路線マスタ +GET https://api.odpt.org/api/v4/odpt:Railway + ?odpt:operator=odpt.Operator:TokyoMetro + &acl:consumerKey={API_KEY} + +# 列車種別マスタ +GET https://api.odpt.org/api/v4/odpt:TrainType + ?odpt:operator=odpt.Operator:TokyoMetro + &acl:consumerKey={API_KEY} + +# 駅マスタ +GET https://api.odpt.org/api/v4/odpt:Station + ?odpt:operator=odpt.Operator:TokyoMetro + &acl:consumerKey={API_KEY} +``` + +### TrainTimetable レスポンス例 + +```json +{ + "@id": "urn:ucode:_00001C000000000000010000030E9A5F", + "@type": "odpt:TrainTimetable", + "odpt:railway": "odpt.Railway:TokyoMetro.Marunouchi", + "odpt:trainNumber": "A0601", + "odpt:trainType": "odpt.TrainType:TokyoMetro.Local", + "odpt:trainTimetableObject": [ + { + "odpt:departureTime": "05:00", + "odpt:departureStation": "odpt.Station:TokyoMetro.Marunouchi.Ogikubo" + }, + { + "odpt:departureTime": "05:02", + "odpt:departureStation": "odpt.Station:TokyoMetro.Marunouchi.Minami-asagaya" + }, + ... + ] +} +``` + +### 停車パターン抽出ロジック + +```rust +/// 時刻表データから停車パターンを抽出 +async fn extract_stop_patterns( + timetables: &[TrainTimetable], +) -> HashMap<(RailwayId, TrainTypeId), HashSet> { + let mut patterns: HashMap<(RailwayId, TrainTypeId), HashSet> = HashMap::new(); + + for timetable in timetables { + let key = (timetable.railway.clone(), timetable.train_type.clone()); + let stations = patterns.entry(key).or_insert_with(HashSet::new); + + for stop in &timetable.train_timetable_object { + // 出発駅または到着駅を停車駅として記録 + if let Some(station) = &stop.departure_station { + stations.insert(station.clone()); + } + if let Some(station) = &stop.arrival_station { + stations.insert(station.clone()); + } + } + } + + patterns +} +``` + +### 差分検知ロジック + +```rust +/// 前回のパターンと比較して差分を検出 +fn detect_changes( + previous: &HashMap<(RailwayId, TrainTypeId), HashSet>, + current: &HashMap<(RailwayId, TrainTypeId), HashSet>, +) -> Vec { + let mut changes = Vec::new(); + + for (key, current_stations) in current { + if let Some(prev_stations) = previous.get(key) { + // 新規停車駅 + for station in current_stations.difference(prev_stations) { + changes.push(StopPatternChange { + railway_id: key.0.clone(), + train_type_id: key.1.clone(), + change_type: ChangeType::Added, + station_id: station.clone(), + }); + } + // 停車取りやめ駅 + for station in prev_stations.difference(current_stations) { + changes.push(StopPatternChange { + railway_id: key.0.clone(), + train_type_id: key.1.clone(), + change_type: ChangeType::Removed, + station_id: station.clone(), + }); + } + } else { + // 新規種別 + for station in current_stations { + changes.push(StopPatternChange { + railway_id: key.0.clone(), + train_type_id: key.1.clone(), + change_type: ChangeType::Added, + station_id: station.clone(), + }); + } + } + } + + changes +} +``` + +## 対応事業者 + +ODPT TrainTimetable API で取得可能な事業者: + +| 事業者 | Operator ID | 優先度 | 備考 | +|--------|-------------|--------|------| +| 東京メトロ | TokyoMetro | 高 | 全線対応 | +| 都営地下鉄 | Toei | 高 | 全線対応 | +| JR東日本 | JR-East | 高 | 関東在来線(新幹線除く) | +| 東武鉄道 | Tobu | 中 | 全線対応 | +| 西武鉄道 | Seibu | 中 | 全線対応 | +| 京王電鉄 | Keio | 中 | 全線対応 | +| 小田急電鉄 | Odakyu | 中 | 全線対応 | +| 東急電鉄 | Tokyu | 中 | 全線対応 | +| 京急電鉄 | Keikyu | 中 | 全線対応 | +| 京成電鉄 | Keisei | 低 | 全線対応 | +| 相鉄 | Sotetsu | 低 | 全線対応 | + +## 環境変数 + +```bash +# ODPT APIキー(必須) +ODPT_API_KEY=your_api_key_here + +# 差分検知の有効化 +ENABLE_STOP_PATTERN_DETECTION=true + +# 検知間隔(時間) +STOP_PATTERN_CHECK_INTERVAL_HOURS=24 + +# 通知設定(将来) +# SLACK_WEBHOOK_URL=https://hooks.slack.com/... +# DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/... +``` + +## 実装計画 + +### Phase 1: 基本機能(今回) +- [x] 複数GTFSソース対応(バス用) +- [ ] ODPT TrainTimetable API クライアント実装 +- [ ] 停車パターン抽出ロジック +- [ ] 差分検知ロジック +- [ ] DBスキーマ追加 +- [ ] CLIコマンド追加(手動実行用) + +### Phase 2: 自動化 +- [ ] 定期実行スケジューラ +- [ ] 通知機能(ログ → Slack/Discord) +- [ ] Webダッシュボード(変更一覧表示) + +### Phase 3: 自動適用 +- [ ] station_station_types との自動マッピング +- [ ] 変更の自動適用(要承認フロー) + +## 検出例 + +``` +[2025-03-15] 停車パターン変更を検出しました: + +路線: JR東日本 中央線快速 +種別: 通勤快速 + +新規停車: + + 中野駅 (odpt.Station:JR-East.Nakano) + +停車取りやめ: + - なし + +--- +路線: 東京メトロ 副都心線 +種別: 急行 + +新規停車: + + 雑司が谷駅 (odpt.Station:TokyoMetro.Zoshigaya) + +停車取りやめ: + - なし +``` + +## 制限事項 + +1. **完全な自動化は困難** + - ODPT の種別ID と StationAPI の type_cd のマッピングが必要 + - 同じ「快速」でも事業者によってIDが異なる + +2. **データの鮮度** + - ODPT データは毎日更新されるわけではない + - ダイヤ改正直後は反映に時間がかかる可能性 + +3. **新幹線非対応** + - JR東日本の新幹線はODPT対象外 + - 引き続き手動管理が必要 + +## 既存GTFS機能との関係 + +### 現在のGTFS機能(バス用) +- 都営バスのGTFSデータをインポート +- gtfs_routes, gtfs_trips, gtfs_stops 等のテーブルに保存 +- transport_type = 1 として stations/lines に統合 + +### 停車パターン検知(新規) +- GTFS とは別系統 +- TrainTimetable API を直接利用 +- 差分検知・通知が目的(データ統合ではない) + +両者は独立して動作し、干渉しない。 + +## 参考URL + +- ODPT 公式: https://www.odpt.org/ +- 開発者登録: https://developer.odpt.org/ +- データカタログ: https://ckan.odpt.org/ +- API仕様: https://developer.odpt.org/documents diff --git a/stationapi/.github/workflows/detect-stop-patterns.yml b/stationapi/.github/workflows/detect-stop-patterns.yml new file mode 100644 index 00000000..6b0f5452 --- /dev/null +++ b/stationapi/.github/workflows/detect-stop-patterns.yml @@ -0,0 +1,251 @@ +name: Detect Stop Pattern Changes + +on: + # schedule: + # # 毎日 09:00 JST (00:00 UTC) に実行 + # - cron: '0 0 * * *' + workflow_dispatch: + # 手動実行も可能 + inputs: + operators: + description: 'Operators to check (comma-separated, or "all")' + required: false + default: 'TokyoMetro,Toei' + +jobs: + detect-changes: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ vars.AWS_REGION || 'ap-northeast-1' }} + + - name: Run ECS Task + id: run-task + env: + OPERATORS: ${{ github.event.inputs.operators || 'TokyoMetro,Toei' }} + run: | + # jqを使って安全にJSONを構築(JSONインジェクション対策) + OVERRIDES=$(jq -n \ + --arg operators "$OPERATORS" \ + '{ + containerOverrides: [{ + name: "detect-stop-patterns", + command: ["./detect_stop_patterns"], + environment: [ + {name: "OPERATORS", value: $operators} + ] + }] + }') + + # ECSタスクを実行 + set +e + TASK_ARN=$(aws ecs run-task \ + --cluster ${{ vars.ECS_CLUSTER }} \ + --task-definition ${{ vars.ECS_TASK_DEFINITION }} \ + --launch-type FARGATE \ + --network-configuration "awsvpcConfiguration={subnets=[${{ vars.ECS_SUBNETS }}],securityGroups=[${{ vars.ECS_SECURITY_GROUPS }}],assignPublicIp=ENABLED}" \ + --overrides "$OVERRIDES" \ + --query 'tasks[0].taskArn' \ + --output text 2>&1) + AWS_EXIT_CODE=$? + set -e + + # エラーチェック + if [ $AWS_EXIT_CODE -ne 0 ]; then + echo "Error: aws ecs run-task failed with exit code $AWS_EXIT_CODE" + echo "AWS CLI output: $TASK_ARN" + exit 1 + fi + + if [ -z "$TASK_ARN" ] || [ "$TASK_ARN" = "None" ] || [ "$TASK_ARN" = "null" ]; then + echo "Error: aws ecs run-task returned empty or invalid task ARN" + echo "Received: '$TASK_ARN'" + exit 1 + fi + + echo "task_arn=$TASK_ARN" >> $GITHUB_OUTPUT + echo "Started ECS task: $TASK_ARN" + + - name: Wait for ECS Task + id: wait-task + run: | + echo "Waiting for task to complete..." + aws ecs wait tasks-stopped \ + --cluster ${{ vars.ECS_CLUSTER }} \ + --tasks ${{ steps.run-task.outputs.task_arn }} + + # タスクの終了コードを確認 + # 0 = 変更なし, 10 = 変更あり, 1 = エラー + EXIT_CODE=$(aws ecs describe-tasks \ + --cluster ${{ vars.ECS_CLUSTER }} \ + --tasks ${{ steps.run-task.outputs.task_arn }} \ + --query 'tasks[0].containers[0].exitCode' \ + --output text) + + echo "Task completed with exit code: $EXIT_CODE" + echo "exit_code=$EXIT_CODE" >> $GITHUB_OUTPUT + + # 終了コードで変更検出を判定 + if [ "$EXIT_CODE" = "10" ]; then + echo "Changes detected (exit code 10)" + echo "has_changes=true" >> $GITHUB_OUTPUT + elif [ "$EXIT_CODE" = "0" ]; then + echo "No changes detected (exit code 0)" + echo "has_changes=false" >> $GITHUB_OUTPUT + else + echo "Error: ECS task failed with exit code $EXIT_CODE" + echo "has_changes=false" >> $GITHUB_OUTPUT + exit 1 + fi + + - name: Get task logs + id: logs + run: | + # CloudWatch Logsからログを取得 + TASK_ID=$(echo "${{ steps.run-task.outputs.task_arn }}" | rev | cut -d'/' -f1 | rev) + + # ログストリーム名を構築(タスク定義の設定に依存) + LOG_STREAM="detect-stop-patterns/$TASK_ID" + + # リトライループでログを取得(CloudWatch Logsへの配信遅延対策) + MAX_RETRIES=5 + RETRY_DELAY=10 + OUTPUT="" + + for i in $(seq 1 $MAX_RETRIES); do + echo "Attempting to fetch logs (attempt $i/$MAX_RETRIES)..." + + if RESULT=$(aws logs get-log-events \ + --log-group-name ${{ vars.ECS_LOG_GROUP }} \ + --log-stream-name "$LOG_STREAM" \ + --query 'events[*].message' \ + --output text 2>&1); then + if [ -n "$RESULT" ] && [ "$RESULT" != "None" ]; then + OUTPUT="$RESULT" + echo "Successfully fetched logs" + break + fi + else + echo "AWS CLI error: $RESULT" + fi + + if [ $i -lt $MAX_RETRIES ]; then + echo "Waiting ${RETRY_DELAY}s before retry..." + sleep $RETRY_DELAY + fi + done + + if [ -z "$OUTPUT" ]; then + echo "Warning: Could not fetch logs after $MAX_RETRIES attempts" + OUTPUT="(ログの取得に失敗しました)" + fi + + # シークレットパターンをマスク(UUIDやODPT IDは保持) + SANITIZED=$(echo "$OUTPUT" | \ + sed -E 's/ODPT_API_KEY=[^ ]*/ODPT_API_KEY=***REDACTED***/g' | \ + sed -E 's/DATABASE_URL=[^ ]*/DATABASE_URL=***REDACTED***/g' | \ + sed -E 's/postgres:\/\/[^@]*@/postgres:\/\/***:***@/g' | \ + sed -E 's/GITHUB_TOKEN=[^ ]*/GITHUB_TOKEN=***REDACTED***/g' | \ + sed -E 's/Bearer [A-Za-z0-9_-]+/Bearer ***REDACTED***/g' | \ + sed -E 's/AKIA[0-9A-Z]{16}/***AWS_ACCESS_KEY***/g' | \ + sed -E 's/[A-Za-z0-9+\/]{40}/***AWS_SECRET_KEY***/g' | \ + sed -E 's/ghp_[A-Za-z0-9]{36}/***GITHUB_PAT***/g' | \ + sed -E 's/gho_[A-Za-z0-9]{36}/***GITHUB_OAUTH***/g' | \ + sed -E 's/sk-[A-Za-z0-9]{48}/***OPENAI_KEY***/g' | \ + sed -E 's/xox[baprs]-[A-Za-z0-9-]+/***SLACK_TOKEN***/g') + + echo "$SANITIZED" + + # 結果を保存 + echo "result<> $GITHUB_OUTPUT + echo "$SANITIZED" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + - name: Create issue if changes detected + if: steps.wait-task.outputs.has_changes == 'true' + uses: actions/github-script@v8 + env: + LOG_RESULT: ${{ steps.logs.outputs.result }} + with: + script: | + const result = process.env.LOG_RESULT || ''; + // JSTで日付を計算(UTC+9時間) + const now = new Date(); + const jstDate = new Date(now.getTime() + 9 * 60 * 60 * 1000); + const today = jstDate.toISOString().split('T')[0]; + + const body = [ + '## 停車パターン変更が検出されました', + '', + '```', + result, + '```', + '', + '---', + 'このIssueは GitHub Actions により自動作成されました。', + '変更を確認したら、DBで `acknowledged = TRUE` に更新してください。' + ].join('\n'); + + const requiredLabels = ['stop-pattern-change', 'automated']; + + // ラベルが存在しない場合に作成する関数 + async function ensureLabelsExist(labels) { + for (const labelName of labels) { + try { + await github.rest.issues.getLabel({ + owner: context.repo.owner, + repo: context.repo.repo, + name: labelName + }); + } catch (error) { + if (error.status === 404) { + console.log(`Creating missing label: ${labelName}`); + await github.rest.issues.createLabel({ + owner: context.repo.owner, + repo: context.repo.repo, + name: labelName, + color: labelName === 'stop-pattern-change' ? 'fbca04' : '0e8a16', + description: labelName === 'stop-pattern-change' + ? '停車パターンの変更検出' + : '自動生成されたIssue' + }); + } else { + throw error; + } + } + } + } + + // Issueを作成する関数(422エラー時にラベル作成してリトライ) + async function createIssueWithRetry() { + try { + return await github.rest.issues.create({ + owner: context.repo.owner, + repo: context.repo.repo, + title: `[自動検出] 停車パターン変更 ${today}`, + body: body, + labels: requiredLabels + }); + } catch (error) { + if (error.status === 422) { + console.log('Got 422 error, ensuring labels exist and retrying...'); + await ensureLabelsExist(requiredLabels); + return await github.rest.issues.create({ + owner: context.repo.owner, + repo: context.repo.repo, + title: `[自動検出] 停車パターン変更 ${today}`, + body: body, + labels: requiredLabels + }); + } + throw error; + } + } + + await createIssueWithRetry(); diff --git a/stationapi/Cargo.toml b/stationapi/Cargo.toml index 29a1b36a..602bbc88 100644 --- a/stationapi/Cargo.toml +++ b/stationapi/Cargo.toml @@ -28,7 +28,7 @@ tonic-health = "0.12.3" tonic-reflection = "0.12.3" csv = "1.3.1" chrono = ">=0.4.20" -reqwest = { version = "0.12.12", default-features = false, features = ["blocking", "rustls-tls"] } +reqwest = { version = "0.12.12", default-features = false, features = ["blocking", "rustls-tls", "json"] } zip = ">=2.3.0" [build-dependencies] diff --git a/stationapi/docs/stop-pattern-detection-setup.md b/stationapi/docs/stop-pattern-detection-setup.md new file mode 100644 index 00000000..024d76ec --- /dev/null +++ b/stationapi/docs/stop-pattern-detection-setup.md @@ -0,0 +1,265 @@ +# 停車パターン差分検知システム セットアップガイド + +このドキュメントでは、停車パターン差分検知システムをGitHub Actions + ECSで定期実行するための設定手順を説明します。 + +## 概要 + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ GitHub Actions │────▶│ ECS Task │────▶│ PostgreSQL │ +│ (cron 09:00) │ │ (detect_stop_ │ │ (VPC内) │ +│ │ │ patterns) │ │ │ +└────────┬────────┘ └────────┬────────┘ └─────────────────┘ + │ │ + │◀──────────────────────┘ + │ CloudWatch Logs経由で結果取得 + ▼ +┌─────────────────┐ +│ GitHub Issue │ +│ 自動作成 │ +└─────────────────┘ +``` + +- **GitHub Actions**: 毎日定時にECSタスクを起動し、結果に応じてIssueを作成 +- **ECS Task**: VPC内のDBにアクセスして差分を検出 +- **GitHub Token不要**: ECS側にGitHub Tokenを配置する必要なし + +## 1. AWS側の設定 + +### 1.1 CloudWatch Logsロググループ作成 + +```bash +aws logs create-log-group --log-group-name /ecs/detect-stop-patterns +``` + +### 1.2 ECSタスク定義 + +以下の内容でタスク定義を作成します。 + +```json +{ + "family": "detect-stop-patterns", + "containerDefinitions": [ + { + "name": "detect-stop-patterns", + "image": ".dkr.ecr.ap-northeast-1.amazonaws.com/stationapi:latest", + "command": ["./detect_stop_patterns", "-o", "all"], + "environment": [ + { + "name": "ODPT_API_KEY", + "value": "" + }, + { + "name": "DATABASE_URL", + "value": "postgres://user:pass@host:5432/dbname" + } + ], + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/detect-stop-patterns", + "awslogs-region": "ap-northeast-1", + "awslogs-stream-prefix": "detect-stop-patterns" + } + }, + "essential": true + } + ], + "networkMode": "awsvpc", + "requiresCompatibilities": ["FARGATE"], + "cpu": "256", + "memory": "512", + "executionRoleArn": "", + "taskRoleArn": "" +} +``` + +> **Note**: 機密情報(ODPT_API_KEY, DATABASE_URL)はAWS Secrets Managerを使用することを推奨します。 + +### 1.3 IAMポリシー(GitHub Actions用) + +GitHub ActionsからECSタスクを実行するためのIAMポリシーを作成します。 + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "ECSRunTask", + "Effect": "Allow", + "Action": [ + "ecs:RunTask", + "ecs:DescribeTasks" + ], + "Resource": "*" + }, + { + "Sid": "PassRole", + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": [ + "", + "" + ] + }, + { + "Sid": "CloudWatchLogs", + "Effect": "Allow", + "Action": [ + "logs:GetLogEvents", + "logs:DescribeLogStreams" + ], + "Resource": "arn:aws:logs:ap-northeast-1:*:log-group:/ecs/detect-stop-patterns:*" + } + ] +} +``` + +このポリシーをアタッチしたIAMユーザーを作成し、アクセスキーを発行します。 + +## 2. GitHub側の設定 + +### 2.1 Secrets + +リポジトリの **Settings → Secrets and variables → Actions → Secrets** で以下を設定: + +| 名前 | 説明 | +|------|------| +| `AWS_ACCESS_KEY_ID` | AWSアクセスキーID | +| `AWS_SECRET_ACCESS_KEY` | AWSシークレットアクセスキー | + +### 2.2 Variables + +同じ画面の **Variables** タブで以下を設定: + +| 名前 | 例 | 説明 | +|------|-----|------| +| `AWS_REGION` | `ap-northeast-1` | AWSリージョン | +| `ECS_CLUSTER` | `stationapi-cluster` | ECSクラスター名 | +| `ECS_TASK_DEFINITION` | `detect-stop-patterns` | タスク定義名 | +| `ECS_SUBNETS` | `subnet-xxx,subnet-yyy` | サブネットID(カンマ区切り) | +| `ECS_SECURITY_GROUPS` | `sg-xxx` | セキュリティグループID | +| `ECS_LOG_GROUP` | `/ecs/detect-stop-patterns` | CloudWatch Logsロググループ名 | + +### 2.3 Issueラベル作成 + +**Issues → Labels** で以下のラベルを作成: + +- `stop-pattern-change` - 停車パターン変更用 +- `automated` - 自動作成されたIssue用 + +## 3. データベース設定 + +以下のテーブルが必要です(`create_table.sql` に含まれています): + +```sql +-- スナップショット保存用 +CREATE TABLE stop_pattern_snapshots ( + id SERIAL PRIMARY KEY, + operator_id VARCHAR(100) NOT NULL, + railway_id VARCHAR(100) NOT NULL, + train_type_id VARCHAR(100) NOT NULL, + train_type_name VARCHAR(100), + station_ids TEXT[] NOT NULL, + station_names TEXT[], + captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + captured_date DATE NOT NULL, + UNIQUE(railway_id, train_type_id, captured_date) +); + +-- 変更ログ用 +CREATE TABLE stop_pattern_changes ( + id SERIAL PRIMARY KEY, + operator_id VARCHAR(100) NOT NULL, + railway_id VARCHAR(100) NOT NULL, + railway_name VARCHAR(100), + train_type_id VARCHAR(100) NOT NULL, + train_type_name VARCHAR(100), + change_type VARCHAR(20) NOT NULL, + station_id VARCHAR(100) NOT NULL, + station_name VARCHAR(100), + detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + acknowledged BOOLEAN DEFAULT FALSE, + acknowledged_at TIMESTAMP +); +``` + +## 4. 動作確認 + +### 手動実行 + +```bash +# GitHub CLIを使用 +gh workflow run detect-stop-patterns.yml + +# または特定のオペレーターを指定 +gh workflow run detect-stop-patterns.yml -f operators=TokyoMetro,JR-East +``` + +GitHub Actionsタブから「Run workflow」ボタンでも実行可能です。 + +### ログ確認 + +```bash +# 最新のワークフロー実行を確認 +gh run list --workflow=detect-stop-patterns.yml + +# 特定の実行のログを表示 +gh run view --log +``` + +## 5. 運用 + +### 定期実行スケジュール + +デフォルトでは毎日 09:00 JST(00:00 UTC)に実行されます。 + +変更する場合は `.github/workflows/detect-stop-patterns.yml` の cron 設定を編集: + +```yaml +on: + schedule: + - cron: '0 0 * * *' # UTC時間で指定 +``` + +### 変更の確認 + +Issueが作成されたら内容を確認し、対応後にDBを更新: + +```sql +-- 確認済みにする +UPDATE stop_pattern_changes +SET acknowledged = TRUE, acknowledged_at = NOW() +WHERE acknowledged = FALSE; +``` + +### ログローテーション + +自動的に以下のローテーションが実行されます: + +- **変更ログ**: acknowledged=TRUE かつ 90日経過したものを削除 +- **スナップショット**: 30日経過したものを削除 + +設定変更はCLIオプションで可能: + +```bash +./detect_stop_patterns --changes-retention 180 --snapshots-retention 60 +``` + +## トラブルシューティング + +### ECSタスクが起動しない + +- サブネットがインターネットアクセス可能か確認(NAT Gateway or Public IP) +- セキュリティグループでアウトバウンド443が許可されているか確認 +- タスク定義のCPU/メモリ設定を確認 + +### CloudWatch Logsにログが出ない + +- ログストリーム名のプレフィックス設定を確認 +- ECS実行ロールにCloudWatch Logs書き込み権限があるか確認 + +### Issueが作成されない + +- ワークフローの実行ログを確認 +- リポジトリにIssue作成権限があるか確認(GitHub Actionsのデフォルトトークンで可能) diff --git a/stationapi/src/bin/detect_stop_patterns.rs b/stationapi/src/bin/detect_stop_patterns.rs new file mode 100644 index 00000000..a1972695 --- /dev/null +++ b/stationapi/src/bin/detect_stop_patterns.rs @@ -0,0 +1,336 @@ +//! CLI tool for detecting train stop pattern changes +//! +//! Usage: +//! cargo run --bin detect_stop_patterns +//! +//! Environment variables: +//! ODPT_API_KEY: Required. API key for ODPT API. +//! DATABASE_URL: Required. PostgreSQL connection string. + +use sqlx::postgres::PgPoolOptions; +use stationapi::stop_pattern::{ + odpt_client::OdptOperator, GitHubIssueCreator, RotationConfig, StopPatternDetector, +}; +use std::env; +use tracing::{error, info}; + +/// Exit codes: +/// - 0: No changes detected +/// - 1: Error occurred +/// - 10: Changes detected +const EXIT_NO_CHANGES: i32 = 0; +const EXIT_ERROR: i32 = 1; +const EXIT_CHANGES_DETECTED: i32 = 10; + +#[tokio::main] +async fn main() { + match run().await { + Ok(has_changes) => { + if has_changes { + std::process::exit(EXIT_CHANGES_DETECTED); + } else { + std::process::exit(EXIT_NO_CHANGES); + } + } + Err(e) => { + error!("Error: {}", e); + std::process::exit(EXIT_ERROR); + } + } +} + +async fn run() -> Result> { + tracing_subscriber::fmt::init(); + + // Load .env.local if available + if dotenv::from_filename(".env.local").is_err() { + tracing::warn!("Could not load .env.local"); + } + + // Get API key + let api_key = match env::var("ODPT_API_KEY") { + Ok(key) => key, + Err(_) => { + eprintln!("Error: ODPT_API_KEY environment variable is required"); + eprintln!(); + eprintln!("Usage:"); + eprintln!(" ODPT_API_KEY=your_key cargo run --bin detect_stop_patterns"); + eprintln!(); + eprintln!("To get an API key, register at: https://developer.odpt.org/"); + return Err("ODPT_API_KEY environment variable is required".into()); + } + }; + + // Connect to database + let db_url = match env::var("DATABASE_URL") { + Ok(url) => url, + Err(_) => { + eprintln!("Error: DATABASE_URL environment variable is required"); + eprintln!(); + eprintln!("Usage:"); + eprintln!( + " DATABASE_URL=postgres://user:pass@host/db cargo run --bin detect_stop_patterns" + ); + eprintln!(); + eprintln!("The DATABASE_URL should be a valid PostgreSQL connection string."); + return Err("DATABASE_URL environment variable is required".into()); + } + }; + + let pool = PgPoolOptions::new() + .max_connections(5) + .connect(&db_url) + .await?; + + info!("Connected to database"); + + // Parse operators and rotation config from command line + let operators = parse_operators(); + let rotation_config = parse_rotation_config(); + + info!("Detecting stop patterns for {} operators", operators.len()); + for op in &operators { + info!(" - {} ({})", op.name(), op.id()); + } + + info!( + "Rotation: changes_retention={}d, snapshots_retention={}d, auto={}", + rotation_config.changes_retention_days, + rotation_config.snapshots_retention_days, + rotation_config.auto_rotate + ); + + // Create detector and run + let detector = + StopPatternDetector::new(api_key, pool).with_rotation_config(rotation_config.clone()); + let changes = detector.detect_changes(&operators).await?; + + // Output results + let output = StopPatternDetector::format_changes(&changes); + println!("{}", output); + + let has_changes = !changes.is_empty(); + + if has_changes { + info!("Changes have been saved to the database"); + info!("Review with: SELECT * FROM stop_pattern_changes WHERE acknowledged = FALSE;"); + + // Create GitHub issue if configured + if let Some((token, repo)) = get_github_config() { + info!("Creating GitHub issue for detected changes..."); + let creator = GitHubIssueCreator::new(token, repo); + match creator.create_issue(&changes).await { + Ok(url) => info!("GitHub issue created: {}", url), + Err(e) => error!("Failed to create GitHub issue: {}", e), + } + } + } + + Ok(has_changes) +} + +fn get_github_config() -> Option<(String, String)> { + let args: Vec = env::args().collect(); + + // Check for --github-issue flag + if !args.iter().any(|a| a == "--github-issue") { + return None; + } + + // Get token and repo from environment variables, tracking which are missing + let token_result = env::var("GITHUB_TOKEN"); + let repo_result = env::var("GITHUB_REPO"); + + let token_missing = token_result.is_err(); + let repo_missing = repo_result.is_err(); + + if token_missing || repo_missing { + let mut missing_vars = Vec::new(); + if token_missing { + missing_vars.push("GITHUB_TOKEN"); + } + if repo_missing { + missing_vars.push("GITHUB_REPO"); + } + eprintln!( + "Warning: --github-issue was specified but {} {} not set. GitHub issue creation will be skipped.", + missing_vars.join(" and "), + if missing_vars.len() > 1 { "are" } else { "is" } + ); + return None; + } + + Some((token_result.unwrap(), repo_result.unwrap())) +} + +fn parse_operators() -> Vec { + let args: Vec = env::args().collect(); + + // Check for --help + if args.iter().any(|a| a == "--help" || a == "-h") { + print_help(); + std::process::exit(0); + } + + // Check for --operators argument + for i in 0..args.len() { + if (args[i] == "--operators" || args[i] == "-o") && i + 1 < args.len() { + return parse_operator_list(&args[i + 1]); + } + } + + // Check for OPERATORS environment variable + if let Ok(operators_env) = env::var("OPERATORS") { + if !operators_env.is_empty() { + info!("Using OPERATORS from environment variable"); + return parse_operator_list(&operators_env); + } + } + + // Default: Tokyo Metro and Toei only (for faster initial testing) + vec![OdptOperator::TokyoMetro, OdptOperator::Toei] +} + +fn parse_operator_list(list: &str) -> Vec { + if list == "all" { + return OdptOperator::all(); + } + + let result: Vec = list + .split(',') + .filter_map(|s| match s.trim().to_lowercase().as_str() { + "tokyometro" | "tokyo-metro" | "metro" => Some(OdptOperator::TokyoMetro), + "toei" => Some(OdptOperator::Toei), + "jr-east" | "jreast" | "jr" => Some(OdptOperator::JREast), + "tobu" => Some(OdptOperator::Tobu), + "seibu" => Some(OdptOperator::Seibu), + "keio" => Some(OdptOperator::Keio), + "odakyu" => Some(OdptOperator::Odakyu), + "tokyu" => Some(OdptOperator::Tokyu), + "keikyu" => Some(OdptOperator::Keikyu), + "keisei" => Some(OdptOperator::Keisei), + "sotetsu" => Some(OdptOperator::Sotetsu), + _ => { + eprintln!("Warning: Unknown operator '{}', skipping", s); + None + } + }) + .collect(); + + if result.is_empty() { + eprintln!( + "Warning: No valid operators found in '{}'. Using all operators as default.", + list + ); + return OdptOperator::all(); + } + + result +} + +fn parse_rotation_config() -> RotationConfig { + use tracing::warn; + + let args: Vec = env::args().collect(); + let mut config = RotationConfig::default(); + + for i in 0..args.len() { + if args[i] == "--changes-retention" && i + 1 < args.len() { + match args[i + 1].parse::() { + Ok(days) if days > 0 => { + config.changes_retention_days = days; + } + Ok(days) => { + warn!( + "Invalid --changes-retention value '{}': must be positive, using default {}", + days, config.changes_retention_days + ); + } + Err(_) => { + warn!( + "Failed to parse --changes-retention value '{}': not a valid integer, using default {}", + args[i + 1], config.changes_retention_days + ); + } + } + } + if args[i] == "--snapshots-retention" && i + 1 < args.len() { + match args[i + 1].parse::() { + Ok(days) if days > 0 => { + config.snapshots_retention_days = days; + } + Ok(days) => { + warn!( + "Invalid --snapshots-retention value '{}': must be positive, using default {}", + days, config.snapshots_retention_days + ); + } + Err(_) => { + warn!( + "Failed to parse --snapshots-retention value '{}': not a valid integer, using default {}", + args[i + 1], config.snapshots_retention_days + ); + } + } + } + if args[i] == "--no-rotate" { + config.auto_rotate = false; + } + } + + config +} + +fn print_help() { + println!("detect_stop_patterns - Detect train stop pattern changes from ODPT API"); + println!(); + println!("USAGE:"); + println!(" detect_stop_patterns [OPTIONS]"); + println!(); + println!("OPTIONS:"); + println!(" -o, --operators Comma-separated list of operators, or 'all'"); + println!(" Default: TokyoMetro,Toei"); + println!(" --changes-retention "); + println!(" Days to keep acknowledged changes (default: 90)"); + println!(" --snapshots-retention "); + println!(" Days to keep snapshots (default: 30)"); + println!(" --no-rotate Disable automatic log rotation"); + println!(" --github-issue Create GitHub issue when changes detected"); + println!(" Requires GITHUB_TOKEN and GITHUB_REPO env vars"); + println!(" -h, --help Print this help message"); + println!(); + println!("OPERATORS:"); + println!(" TokyoMetro - Tokyo Metro (東京メトロ)"); + println!(" Toei - Toei Subway (都営地下鉄)"); + println!(" JR-East - JR East (JR東日本)"); + println!(" Tobu - Tobu Railway (東武鉄道)"); + println!(" Seibu - Seibu Railway (西武鉄道)"); + println!(" Keio - Keio Corporation (京王電鉄)"); + println!(" Odakyu - Odakyu Electric Railway (小田急電鉄)"); + println!(" Tokyu - Tokyu Corporation (東急電鉄)"); + println!(" Keikyu - Keikyu Corporation (京急電鉄)"); + println!(" Keisei - Keisei Electric Railway (京成電鉄)"); + println!(" Sotetsu - Sagami Railway (相鉄)"); + println!(); + println!("EXAMPLES:"); + println!(" # Detect changes for Tokyo Metro and Toei (default)"); + println!(" detect_stop_patterns"); + println!(); + println!(" # Detect changes for specific operators"); + println!(" detect_stop_patterns -o TokyoMetro,JR-East,Tokyu"); + println!(); + println!(" # Detect changes for all operators"); + println!(" detect_stop_patterns -o all"); + println!(); + println!("ENVIRONMENT VARIABLES:"); + println!(" ODPT_API_KEY Required. API key for ODPT API."); + println!(" DATABASE_URL Required. PostgreSQL connection string."); + println!(" OPERATORS Optional. Comma-separated list of operators or 'all'."); + println!(" Used if -o/--operators is not provided."); + println!( + " GITHUB_TOKEN Required for --github-issue. Personal access token with repo scope." + ); + println!(" GITHUB_REPO Required for --github-issue. Repository in 'owner/repo' format."); + println!(); + println!("To get an ODPT API key, register at: https://developer.odpt.org/"); +} diff --git a/stationapi/src/config.rs b/stationapi/src/config.rs index 62957a38..8d476d23 100644 --- a/stationapi/src/config.rs +++ b/stationapi/src/config.rs @@ -9,3 +9,28 @@ pub fn fetch_database_url() -> String { Err(env::VarError::NotUnicode(_)) => panic!("$DATABASE_URL should be written in Unicode."), } } + +/// Fetch the ODPT_API_KEY environment variable. +/// Returns None if the variable is not set. +pub fn fetch_odpt_api_key() -> Option { + env::var("ODPT_API_KEY").ok() +} + +/// Check if rail GTFS feature is enabled via ENABLE_RAIL_GTFS environment variable. +/// Defaults to true if ODPT_API_KEY is set, false otherwise. +pub fn is_rail_gtfs_enabled() -> bool { + match env::var("ENABLE_RAIL_GTFS") { + Ok(s) => s.to_lowercase() == "true" || s == "1", + Err(_) => fetch_odpt_api_key().is_some(), + } +} + +/// Check if a specific GTFS source is enabled via environment variable. +/// Example: GTFS_ENABLE_TOKYO_METRO=true +pub fn is_gtfs_source_enabled(source_id: &str) -> bool { + let env_key = format!("GTFS_ENABLE_{}", source_id.to_uppercase().replace('-', "_")); + match env::var(&env_key) { + Ok(s) => s.to_lowercase() == "true" || s == "1", + Err(_) => true, // Default to enabled + } +} diff --git a/stationapi/src/import.rs b/stationapi/src/import.rs index 1b991878..f91af13c 100644 --- a/stationapi/src/import.rs +++ b/stationapi/src/import.rs @@ -2,7 +2,9 @@ use csv::{ReaderBuilder, StringRecord}; use sqlx::{Connection, PgConnection}; -use stationapi::config::fetch_database_url; +use stationapi::config::{ + fetch_database_url, fetch_odpt_api_key, is_gtfs_source_enabled, is_rail_gtfs_enabled, +}; use std::collections::HashMap; use std::io::{Cursor, Read as _}; use std::path::Path; @@ -10,6 +12,84 @@ use std::{env, fs}; use tracing::{info, warn}; use zip::ZipArchive; +/// GTFSデータソースの設定 +#[derive(Debug, Clone)] +pub struct GtfsSource { + /// ソース識別子(例: "toei_bus", "tokyo_metro") + pub id: String, + /// 表示名 + pub name: String, + /// ダウンロードURL + pub url: String, + /// TransportType (1: バス, 2: 鉄道GTFS) + pub transport_type: i32, + /// 会社コード (companies テーブルの company_cd) + pub company_cd: i32, + /// 有効/無効 + pub enabled: bool, +} + +/// 利用可能なGTFSソースの一覧を取得 +pub fn get_gtfs_sources() -> Vec { + let api_key = fetch_odpt_api_key(); + let mut sources = Vec::new(); + + // 都営バス(公開API - APIキー不要) + if !is_bus_feature_disabled() { + sources.push(GtfsSource { + id: "toei_bus".into(), + name: "都営バス".into(), + url: "https://api-public.odpt.org/api/v4/files/Toei/data/ToeiBus-GTFS.zip".into(), + transport_type: 1, + company_cd: 119, + enabled: is_gtfs_source_enabled("toei_bus"), + }); + } + + // APIキーがある場合は鉄道GTFSを追加 + if let Some(key) = api_key { + if is_rail_gtfs_enabled() { + // 東京メトロ + if is_gtfs_source_enabled("tokyo_metro") { + sources.push(GtfsSource { + id: "tokyo_metro".into(), + name: "東京メトロ".into(), + url: format!( + "https://api.odpt.org/api/v4/files/TokyoMetro/data/TokyoMetro-GTFS.zip?acl:consumerKey={}", + key + ), + transport_type: 2, + company_cd: 28, + enabled: true, + }); + } + + // 都営地下鉄 + if is_gtfs_source_enabled("toei_subway") { + sources.push(GtfsSource { + id: "toei_subway".into(), + name: "都営地下鉄".into(), + url: format!( + "https://api.odpt.org/api/v4/files/Toei/data/Toei-Train-GTFS.zip?acl:consumerKey={}", + key + ), + transport_type: 2, + company_cd: 119, + enabled: true, + }); + } + + // TODO: 将来追加予定 + // - JR東日本(関東エリア) + // - 東武鉄道 + // - 相模鉄道 + // - 東京臨海高速鉄道 + } + } + + sources.into_iter().filter(|s| s.enabled).collect() +} + /// Type alias for GTFS trips batch row type TripBatchRow = ( String, @@ -168,34 +248,44 @@ struct Translation { ko: Option, // Korean } -/// GTFS download URL for Toei Bus -const TOEI_BUS_GTFS_URL: &str = - "https://api-public.odpt.org/api/v4/files/Toei/data/ToeiBus-GTFS.zip"; - -/// Download and extract GTFS data from ODPT API -fn download_gtfs() -> Result<(), Box> { - let gtfs_path = Path::new("data/ToeiBus-GTFS"); +/// Download and extract GTFS data for a specific source +fn download_gtfs_source( + source: &GtfsSource, +) -> Result<(), Box> { + let gtfs_path = Path::new("data").join(format!("gtfs-{}", source.id)); // Skip if directory already exists if gtfs_path.exists() { - info!("GTFS directory already exists, skipping download."); + info!( + "[{}] GTFS directory already exists, skipping download.", + source.name + ); return Ok(()); } - info!("Downloading GTFS data from ODPT API..."); + info!("[{}] Downloading GTFS data from ODPT API...", source.name); // Download the ZIP file - let response = reqwest::blocking::get(TOEI_BUS_GTFS_URL)?; + let response = reqwest::blocking::get(&source.url)?; if !response.status().is_success() { - return Err(format!("Failed to download GTFS: HTTP {}", response.status()).into()); + return Err(format!( + "[{}] Failed to download GTFS: HTTP {}", + source.name, + response.status() + ) + .into()); } let bytes = response.bytes()?; - info!("Downloaded {} bytes, extracting...", bytes.len()); + info!( + "[{}] Downloaded {} bytes, extracting...", + source.name, + bytes.len() + ); // Create the target directory - fs::create_dir_all(gtfs_path)?; + fs::create_dir_all(>fs_path)?; // Extract the ZIP file let cursor = Cursor::new(bytes); @@ -224,46 +314,48 @@ fn download_gtfs() -> Result<(), Box> { let mut contents = Vec::new(); file.read_to_end(&mut contents)?; fs::write(&output_path, &contents)?; - - info!("Extracted: {}", output_name); } - info!("GTFS extraction completed."); + info!("[{}] GTFS extraction completed.", source.name); + Ok(()) +} + +/// Download all GTFS sources (blocking, for use with spawn_blocking) +fn download_all_gtfs_sources( + sources: Vec, +) -> Result<(), Box> { + for source in &sources { + if let Err(e) = download_gtfs_source(source) { + warn!("[{}] Failed to download GTFS: {}", source.name, e); + // Continue with other sources even if one fails + } + } Ok(()) } -/// Import GTFS data from ToeiBus-GTFS directory +/// Import all GTFS data from multiple sources /// All imports are wrapped in a transaction - if any step fails, all changes are rolled back pub async fn import_gtfs() -> Result<(), Box> { - // Check if bus feature is disabled - if is_bus_feature_disabled() { - info!("Bus feature is disabled, skipping GTFS import."); + let sources = get_gtfs_sources(); + + if sources.is_empty() { + info!("No GTFS sources enabled, skipping GTFS import."); return Ok(()); } - // Download GTFS data if not present (use spawn_blocking to avoid blocking async runtime) - tokio::task::spawn_blocking(download_gtfs) + info!("Found {} GTFS sources to import", sources.len()); + + // Download all GTFS data (use spawn_blocking to avoid blocking async runtime) + let sources_for_download = sources.clone(); + tokio::task::spawn_blocking(move || download_all_gtfs_sources(sources_for_download)) .await .map_err(|e| format!("Failed to spawn blocking task: {}", e))? .map_err(|e| -> Box { e })?; - let gtfs_path = Path::new("data/ToeiBus-GTFS"); - - if !gtfs_path.exists() { - info!("GTFS directory not found, skipping GTFS import."); - return Ok(()); - } - - // Load translations for multi-language support (before transaction to avoid holding lock) - let translations = load_gtfs_translations(gtfs_path)?; - let db_url = fetch_database_url(); let mut conn = PgConnection::connect(&db_url).await?; - info!( - "Starting GTFS import from {:?} (using transaction)...", - gtfs_path - ); + info!("Starting GTFS import (using transaction)..."); // Begin transaction - all changes will be rolled back if any step fails let mut tx = conn.begin().await?; @@ -297,32 +389,49 @@ pub async fn import_gtfs() -> Result<(), Box> { .execute(&mut *tx) .await?; - // Import agencies - import_gtfs_agencies(&mut tx, gtfs_path).await?; + // Import each source + for source in &sources { + let gtfs_path = Path::new("data").join(format!("gtfs-{}", source.id)); + + if !gtfs_path.exists() { + warn!("[{}] GTFS directory not found, skipping.", source.name); + continue; + } + + info!("[{}] Starting import from {:?}...", source.name, gtfs_path); + + // Load translations for multi-language support + let translations = load_gtfs_translations(>fs_path)?; + + // Import agencies + import_gtfs_agencies(&mut tx, >fs_path).await?; - // Import routes - import_gtfs_routes(&mut tx, gtfs_path).await?; + // Import routes with source info + import_gtfs_routes_with_source(&mut tx, >fs_path, source).await?; - // Import stops with translations - import_gtfs_stops(&mut tx, gtfs_path, &translations).await?; + // Import stops with translations + import_gtfs_stops(&mut tx, >fs_path, &translations).await?; - // Import calendar - import_gtfs_calendar(&mut tx, gtfs_path).await?; + // Import calendar + import_gtfs_calendar(&mut tx, >fs_path).await?; - // Import calendar_dates - import_gtfs_calendar_dates(&mut tx, gtfs_path).await?; + // Import calendar_dates + import_gtfs_calendar_dates(&mut tx, >fs_path).await?; - // Import shapes - import_gtfs_shapes(&mut tx, gtfs_path).await?; + // Import shapes + import_gtfs_shapes(&mut tx, >fs_path).await?; - // Import trips - import_gtfs_trips(&mut tx, gtfs_path).await?; + // Import trips + import_gtfs_trips(&mut tx, >fs_path, source).await?; - // Import stop_times (largest file, needs batch processing) - import_gtfs_stop_times(&mut tx, gtfs_path).await?; + // Import stop_times (largest file, needs batch processing) + import_gtfs_stop_times(&mut tx, >fs_path).await?; - // Import feed_info - import_gtfs_feed_info(&mut tx, gtfs_path).await?; + // Import feed_info + import_gtfs_feed_info(&mut tx, >fs_path).await?; + + info!("[{}] Import completed.", source.name); + } // Commit transaction - all changes are now permanent tx.commit().await?; @@ -456,23 +565,31 @@ async fn import_gtfs_agencies( Ok(()) } -/// Import routes from routes.txt -async fn import_gtfs_routes( +/// Import routes from routes.txt with source information +/// Routes are prefixed with source_id to allow multiple sources +async fn import_gtfs_routes_with_source( conn: &mut PgConnection, gtfs_path: &Path, + source: &GtfsSource, ) -> Result<(), Box> { let routes_path = gtfs_path.join("routes.txt"); if !routes_path.exists() { - warn!("routes.txt not found, skipping routes import."); + warn!( + "[{}] routes.txt not found, skipping routes import.", + source.name + ); return Ok(()); } let mut rdr = ReaderBuilder::new().from_path(&routes_path)?; + let mut count = 0; for result in rdr.records() { let record = result?; - // route_id,agency_id,route_short_name,route_long_name,route_desc,route_type,route_url,route_color,route_text_color,jp_parent_route_id - let route_id = record.get(0).unwrap_or(""); + // route_id,agency_id,route_short_name,route_long_name,route_desc,route_type,route_url,route_color,route_text_color + let original_route_id = record.get(0).unwrap_or(""); + // Prefix route_id with source_id to make it unique across sources + let route_id = format!("{}:{}", source.id, original_route_id); let agency_id = record.get(1).filter(|s| !s.is_empty()); let route_short_name = record.get(2).filter(|s| !s.is_empty()); let route_long_name = record.get(3).filter(|s| !s.is_empty()); @@ -484,11 +601,22 @@ async fn import_gtfs_routes( sqlx::query( r#"INSERT INTO gtfs_routes - (route_id, agency_id, route_short_name, route_long_name, route_desc, route_type, route_url, route_color, route_text_color) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (route_id) DO NOTHING"#, + (route_id, agency_id, route_short_name, route_long_name, route_desc, route_type, route_url, route_color, route_text_color, transport_type, company_cd, source_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (route_id) DO UPDATE SET + agency_id = EXCLUDED.agency_id, + route_short_name = EXCLUDED.route_short_name, + route_long_name = EXCLUDED.route_long_name, + route_desc = EXCLUDED.route_desc, + route_type = EXCLUDED.route_type, + route_url = EXCLUDED.route_url, + route_color = EXCLUDED.route_color, + route_text_color = EXCLUDED.route_text_color, + transport_type = EXCLUDED.transport_type, + company_cd = EXCLUDED.company_cd, + source_id = EXCLUDED.source_id"#, ) - .bind(route_id) + .bind(&route_id) .bind(agency_id) .bind(route_short_name) .bind(route_long_name) @@ -497,11 +625,16 @@ async fn import_gtfs_routes( .bind(route_url) .bind(route_color) .bind(route_text_color) + .bind(source.transport_type) + .bind(source.company_cd) + .bind(&source.id) .execute(&mut *conn) .await?; + + count += 1; } - info!("Imported routes."); + info!("[{}] Imported {} routes.", source.name, count); Ok(()) } @@ -914,13 +1047,18 @@ async fn insert_shapes_batch( } /// Import trips from trips.txt +/// Routes are prefixed with source_id to match gtfs_routes async fn import_gtfs_trips( conn: &mut PgConnection, gtfs_path: &Path, + source: &GtfsSource, ) -> Result<(), Box> { let trips_path = gtfs_path.join("trips.txt"); if !trips_path.exists() { - warn!("trips.txt not found, skipping trips import."); + warn!( + "[{}] trips.txt not found, skipping trips import.", + source.name + ); return Ok(()); } @@ -931,7 +1069,9 @@ async fn import_gtfs_trips( for result in rdr.records() { let record = result?; - let route_id = record.get(0).unwrap_or("").to_string(); + let original_route_id = record.get(0).unwrap_or(""); + // Prefix route_id with source_id to match gtfs_routes + let route_id = format!("{}:{}", source.id, original_route_id); let service_id = record.get(1).unwrap_or("").to_string(); let trip_id = record.get(2).unwrap_or("").to_string(); let trip_headsign = record @@ -982,7 +1122,7 @@ async fn import_gtfs_trips( batch.clear(); if count % 50000 == 0 { - info!("Imported {} trips...", count); + info!("[{}] Imported {} trips...", source.name, count); } } } @@ -993,7 +1133,7 @@ async fn import_gtfs_trips( count += batch.len(); } - info!("Imported {} trips.", count); + info!("[{}] Imported {} trips.", source.name, count); Ok(()) } @@ -1411,6 +1551,10 @@ struct GtfsRouteRow { route_text_color: Option, #[allow(dead_code)] route_sort_order: Option, + transport_type: Option, + company_cd: Option, + #[allow(dead_code)] + source_id: Option, } /// Row type for reading gtfs_stops @@ -1444,16 +1588,11 @@ struct GtfsStopRow { platform_code: Option, } -/// Integrate GTFS bus data into stations/lines tables +/// Integrate GTFS data (bus and rail) into stations/lines tables /// /// This function wraps all integration operations in a single database transaction. /// If any step fails, all changes are rolled back to maintain database consistency. pub async fn integrate_gtfs_to_stations() -> Result<(), Box> { - if is_bus_feature_disabled() { - info!("Bus feature is disabled, skipping GTFS integration."); - return Ok(()); - } - let db_url = fetch_database_url(); let mut conn = PgConnection::connect(&db_url).await?; @@ -1472,14 +1611,14 @@ pub async fn integrate_gtfs_to_stations() -> Result<(), Box Result<(), Box Result<(), Box> { @@ -1509,9 +1649,14 @@ async fn integrate_gtfs_routes_to_lines( .fetch_all(&mut *conn) .await?; - let company_cd = 119; // Tokyo Metropolitan Bureau of Transportation (東京都交通局) + let mut bus_count = 0; + let mut rail_count = 0; for route in &routes { + // Use transport_type and company_cd from gtfs_routes if available, otherwise default to bus (1) and 119 + let transport_type = route.transport_type.unwrap_or(1); + let company_cd = route.company_cd.unwrap_or(119); + let line_cd = generate_bus_line_cd(&route.route_id); let line_name = route .route_short_name @@ -1527,12 +1672,21 @@ async fn integrate_gtfs_routes_to_lines( let line_name_r = route.route_long_name.clone().unwrap_or_default(); + // Determine line_type based on GTFS route_type + // GTFS route_type: 0=Tram, 1=Subway, 2=Rail, 3=Bus + let line_type = match route.route_type { + 0 => 4, // Tram -> line_type 4 (路面電車) + 1 => 3, // Subway -> line_type 3 (地下鉄) + 2 => 2, // Rail -> line_type 2 (JR在来線相当) + _ => 99, // Bus/Others -> line_type 99 (その他) + }; + sqlx::query( r#"INSERT INTO lines ( line_cd, company_cd, line_name, line_name_k, line_name_h, line_name_r, line_color_c, line_type, e_status, e_sort, transport_type ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, 0, $1, 1 + $1, $2, $3, $4, $5, $6, $7, $8, 0, $1, $9 ) ON CONFLICT (line_cd) DO NOTHING"#, ) @@ -1543,7 +1697,8 @@ async fn integrate_gtfs_routes_to_lines( .bind(&line_name) // line_name_h .bind(&line_name_r) // line_name_r .bind(&line_color) - .bind(route.route_type) + .bind(line_type) + .bind(transport_type) .execute(&mut *conn) .await?; @@ -1553,9 +1708,20 @@ async fn integrate_gtfs_routes_to_lines( .bind(&route.route_id) .execute(&mut *conn) .await?; + + if transport_type == 1 { + bus_count += 1; + } else { + rail_count += 1; + } } - info!("Integrated {} routes as lines.", routes.len()); + info!( + "Integrated {} routes as lines (bus: {}, rail: {}).", + routes.len(), + bus_count, + rail_count + ); Ok(()) } diff --git a/stationapi/src/lib.rs b/stationapi/src/lib.rs index 22e1e70e..f737f0fd 100644 --- a/stationapi/src/lib.rs +++ b/stationapi/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod domain; pub mod infrastructure; pub mod presentation; +pub mod stop_pattern; pub mod use_case; pub mod proto { diff --git a/stationapi/src/stop_pattern/detector.rs b/stationapi/src/stop_pattern/detector.rs new file mode 100644 index 00000000..8ab0501e --- /dev/null +++ b/stationapi/src/stop_pattern/detector.rs @@ -0,0 +1,465 @@ +//! Stop Pattern Detector +//! +//! Detects changes in train stop patterns by comparing current data with stored snapshots. + +use super::odpt_client::{OdptClient, OdptOperator, StopPattern}; +use sqlx::PgPool; +use std::collections::HashSet; +use tracing::info; + +/// Configuration for log rotation +#[derive(Debug, Clone)] +pub struct RotationConfig { + /// Days to retain acknowledged changes (default: 90) + pub changes_retention_days: i32, + /// Number of snapshot generations to keep per railway/train_type (default: 30) + pub snapshots_retention_days: i32, + /// Whether to run rotation automatically after detection (default: true) + pub auto_rotate: bool, +} + +impl Default for RotationConfig { + fn default() -> Self { + Self { + changes_retention_days: 90, + snapshots_retention_days: 30, + auto_rotate: true, + } + } +} + +/// Result of log rotation +#[derive(Debug, Clone, Default)] +pub struct RotationResult { + /// Number of deleted change records + pub deleted_changes: i64, + /// Number of deleted snapshot records + pub deleted_snapshots: i64, +} + +/// Represents a detected change in stop pattern +#[derive(Debug, Clone)] +pub struct StopPatternChange { + pub operator_id: String, + pub railway_id: String, + pub railway_name: String, + pub train_type_id: String, + pub train_type_name: String, + pub change_type: ChangeType, + pub station_id: String, + pub station_name: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChangeType { + Added, + Removed, +} + +impl ChangeType { + pub fn as_str(&self) -> &'static str { + match self { + ChangeType::Added => "added", + ChangeType::Removed => "removed", + } + } +} + +/// Previous snapshot from database +#[derive(Debug, Clone)] +struct StoredSnapshot { + pub railway_id: String, + pub train_type_id: String, + pub station_ids: Vec, +} + +/// Stop Pattern Detector +pub struct StopPatternDetector { + client: OdptClient, + pool: PgPool, + rotation_config: RotationConfig, +} + +impl StopPatternDetector { + pub fn new(api_key: String, pool: PgPool) -> Self { + Self { + client: OdptClient::new(api_key), + pool, + rotation_config: RotationConfig::default(), + } + } + + pub fn with_rotation_config(mut self, config: RotationConfig) -> Self { + self.rotation_config = config; + self + } + + /// Run detection for specified operators + pub async fn detect_changes( + &self, + operators: &[OdptOperator], + ) -> Result, Box> { + info!( + "Starting stop pattern detection for {} operators", + operators.len() + ); + + // Fetch current patterns from ODPT API + let current_patterns = self.client.extract_all_stop_patterns(operators).await?; + + info!("Fetched {} current patterns", current_patterns.len()); + + // Get previous snapshots from database + let previous_snapshots = self.get_latest_snapshots().await?; + info!("Loaded {} previous snapshots", previous_snapshots.len()); + + // Detect changes + let changes = self.compare_patterns(¤t_patterns, &previous_snapshots); + + if !changes.is_empty() { + info!("Detected {} changes", changes.len()); + + // Save changes to database + self.save_changes(&changes).await?; + } else { + info!("No changes detected"); + } + + // Save current patterns as new snapshots + self.save_snapshots(¤t_patterns).await?; + + // Run automatic rotation if enabled + if self.rotation_config.auto_rotate { + self.rotate_old_records().await?; + } + + Ok(changes) + } + + /// Rotate old records to prevent unbounded growth + /// + /// Deletes: + /// - Acknowledged changes older than changes_retention_days + /// - Snapshots older than snapshots_retention_days + pub async fn rotate_old_records( + &self, + ) -> Result> { + let mut result = RotationResult::default(); + + // Delete old acknowledged changes + let deleted_changes = sqlx::query_scalar::<_, i64>( + r#" + WITH deleted AS ( + DELETE FROM stop_pattern_changes + WHERE acknowledged = TRUE + AND detected_at < CURRENT_TIMESTAMP - ($1 || ' days')::INTERVAL + RETURNING 1 + ) + SELECT COUNT(*) FROM deleted + "#, + ) + .bind(self.rotation_config.changes_retention_days) + .fetch_one(&self.pool) + .await?; + + result.deleted_changes = deleted_changes; + + // Delete old snapshots (keep only recent ones) + let deleted_snapshots = sqlx::query_scalar::<_, i64>( + r#" + WITH deleted AS ( + DELETE FROM stop_pattern_snapshots + WHERE captured_at < CURRENT_TIMESTAMP - ($1 || ' days')::INTERVAL + RETURNING 1 + ) + SELECT COUNT(*) FROM deleted + "#, + ) + .bind(self.rotation_config.snapshots_retention_days) + .fetch_one(&self.pool) + .await?; + + result.deleted_snapshots = deleted_snapshots; + + if result.deleted_changes > 0 || result.deleted_snapshots > 0 { + info!( + "Rotation: deleted {} old changes, {} old snapshots", + result.deleted_changes, result.deleted_snapshots + ); + } + + Ok(result) + } + + /// Get the latest snapshot for each railway/train_type combination + async fn get_latest_snapshots( + &self, + ) -> Result, Box> { + let rows = sqlx::query_as::<_, (String, String, Vec)>( + r#" + SELECT DISTINCT ON (railway_id, train_type_id) + railway_id, train_type_id, station_ids + FROM stop_pattern_snapshots + ORDER BY railway_id, train_type_id, captured_at DESC + "#, + ) + .fetch_all(&self.pool) + .await?; + + Ok(rows + .into_iter() + .map(|(railway_id, train_type_id, station_ids)| StoredSnapshot { + railway_id, + train_type_id, + station_ids, + }) + .collect()) + } + + /// Compare current patterns with previous snapshots + fn compare_patterns( + &self, + current: &[StopPattern], + previous: &[StoredSnapshot], + ) -> Vec { + let mut changes = Vec::new(); + + // Build lookup map for previous snapshots + let prev_map: std::collections::HashMap<(&str, &str), &StoredSnapshot> = previous + .iter() + .map(|s| ((s.railway_id.as_str(), s.train_type_id.as_str()), s)) + .collect(); + + for pattern in current { + let key = (pattern.railway_id.as_str(), pattern.train_type_id.as_str()); + + let current_stations: HashSet<&str> = + pattern.station_ids.iter().map(|s| s.as_str()).collect(); + + if let Some(prev) = prev_map.get(&key) { + let prev_stations: HashSet<&str> = + prev.station_ids.iter().map(|s| s.as_str()).collect(); + + // Find added stations + for station_id in current_stations.difference(&prev_stations) { + let station_name = pattern + .station_ids + .iter() + .zip(pattern.station_names.iter()) + .find(|(id, _)| id.as_str() == *station_id) + .map(|(_, name)| name.clone()) + .unwrap_or_else(|| station_id.to_string()); + + changes.push(StopPatternChange { + operator_id: pattern.operator_id.clone(), + railway_id: pattern.railway_id.clone(), + railway_name: pattern.railway_name.clone(), + train_type_id: pattern.train_type_id.clone(), + train_type_name: pattern.train_type_name.clone(), + change_type: ChangeType::Added, + station_id: station_id.to_string(), + station_name, + }); + } + + // Find removed stations + for station_id in prev_stations.difference(¤t_stations) { + changes.push(StopPatternChange { + operator_id: pattern.operator_id.clone(), + railway_id: pattern.railway_id.clone(), + railway_name: pattern.railway_name.clone(), + train_type_id: pattern.train_type_id.clone(), + train_type_name: pattern.train_type_name.clone(), + change_type: ChangeType::Removed, + station_id: station_id.to_string(), + station_name: station_id.to_string(), // Name not available for removed + }); + } + } + // Note: We don't report "new" railway/train_type combinations as changes + // since that would generate too much noise on first run + } + + changes + } + + /// Save detected changes to database + async fn save_changes( + &self, + changes: &[StopPatternChange], + ) -> Result<(), Box> { + for change in changes { + sqlx::query( + r#" + INSERT INTO stop_pattern_changes + (operator_id, railway_id, railway_name, train_type_id, train_type_name, + change_type, station_id, station_name) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + "#, + ) + .bind(&change.operator_id) + .bind(&change.railway_id) + .bind(&change.railway_name) + .bind(&change.train_type_id) + .bind(&change.train_type_name) + .bind(change.change_type.as_str()) + .bind(&change.station_id) + .bind(&change.station_name) + .execute(&self.pool) + .await?; + } + + Ok(()) + } + + /// Save current patterns as snapshots + async fn save_snapshots( + &self, + patterns: &[StopPattern], + ) -> Result<(), Box> { + for pattern in patterns { + sqlx::query( + r#" + INSERT INTO stop_pattern_snapshots + (operator_id, railway_id, train_type_id, train_type_name, station_ids, station_names, captured_date) + VALUES ($1, $2, $3, $4, $5, $6, CURRENT_DATE) + ON CONFLICT (railway_id, train_type_id, captured_date) + DO UPDATE SET + station_ids = EXCLUDED.station_ids, + station_names = EXCLUDED.station_names, + train_type_name = EXCLUDED.train_type_name + "#, + ) + .bind(&pattern.operator_id) + .bind(&pattern.railway_id) + .bind(&pattern.train_type_id) + .bind(&pattern.train_type_name) + .bind(&pattern.station_ids) + .bind(&pattern.station_names) + .execute(&self.pool) + .await?; + } + + info!("Saved {} snapshots", patterns.len()); + Ok(()) + } + + /// Get unacknowledged changes + pub async fn get_unacknowledged_changes( + &self, + ) -> Result, Box> { + let rows = sqlx::query_as::< + _, + ( + String, + String, + String, + String, + String, + String, + String, + String, + ), + >( + r#" + SELECT operator_id, railway_id, railway_name, train_type_id, train_type_name, + change_type, station_id, station_name + FROM stop_pattern_changes + WHERE acknowledged = FALSE + ORDER BY detected_at DESC + "#, + ) + .fetch_all(&self.pool) + .await?; + + Ok(rows + .into_iter() + .map( + |( + operator_id, + railway_id, + railway_name, + train_type_id, + train_type_name, + change_type, + station_id, + station_name, + )| { + StopPatternChange { + operator_id, + railway_id, + railway_name, + train_type_id, + train_type_name, + change_type: if change_type == "added" { + ChangeType::Added + } else { + ChangeType::Removed + }, + station_id, + station_name, + } + }, + ) + .collect()) + } + + /// Format changes for display + pub fn format_changes(changes: &[StopPatternChange]) -> String { + if changes.is_empty() { + return "変更は検出されませんでした。".to_string(); + } + + let mut output = String::new(); + output.push_str(&format!("検出された変更: {} 件\n\n", changes.len())); + + // Group by railway and train type + let mut grouped: std::collections::HashMap<(&str, &str), Vec<&StopPatternChange>> = + std::collections::HashMap::new(); + + for change in changes { + let key = (change.railway_id.as_str(), change.train_type_id.as_str()); + grouped.entry(key).or_default().push(change); + } + + for ((railway_id, _train_type_id), changes) in grouped { + let first = changes.first().unwrap(); + output.push_str(&format!("路線: {} ({})\n", first.railway_name, railway_id)); + output.push_str(&format!("種別: {}\n", first.train_type_name)); + output.push('\n'); + + let added: Vec<_> = changes + .iter() + .filter(|c| c.change_type == ChangeType::Added) + .collect(); + let removed: Vec<_> = changes + .iter() + .filter(|c| c.change_type == ChangeType::Removed) + .collect(); + + if !added.is_empty() { + output.push_str("新規停車:\n"); + for change in added { + output.push_str(&format!( + " + {} ({})\n", + change.station_name, change.station_id + )); + } + } + + if !removed.is_empty() { + output.push_str("停車取りやめ:\n"); + for change in removed { + output.push_str(&format!( + " - {} ({})\n", + change.station_name, change.station_id + )); + } + } + + output.push_str("---\n"); + } + + output + } +} diff --git a/stationapi/src/stop_pattern/github_issue.rs b/stationapi/src/stop_pattern/github_issue.rs new file mode 100644 index 00000000..75310ffd --- /dev/null +++ b/stationapi/src/stop_pattern/github_issue.rs @@ -0,0 +1,147 @@ +//! GitHub Issue Creator +//! +//! Creates GitHub issues when stop pattern changes are detected. + +use super::detector::StopPatternChange; +use serde::Serialize; +use tracing::{error, info}; + +/// GitHub Issue creator for stop pattern changes +pub struct GitHubIssueCreator { + client: reqwest::Client, + token: String, + repo: String, // format: "owner/repo" +} + +#[derive(Serialize)] +struct CreateIssueRequest { + title: String, + body: String, + labels: Vec, +} + +impl GitHubIssueCreator { + /// Create a new GitHubIssueCreator + /// + /// # Arguments + /// * `token` - GitHub personal access token with `repo` scope + /// * `repo` - Repository in "owner/repo" format + pub fn new(token: String, repo: String) -> Self { + Self { + client: reqwest::Client::new(), + token, + repo, + } + } + + /// Create an issue for detected changes + pub async fn create_issue( + &self, + changes: &[StopPatternChange], + ) -> Result> { + if changes.is_empty() { + return Ok("No changes to report".to_string()); + } + + let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); + let title = format!("[自動検出] 停車パターン変更 {}", today); + + let body = self.format_issue_body(changes); + + let request = CreateIssueRequest { + title, + body, + labels: vec!["stop-pattern-change".to_string(), "automated".to_string()], + }; + + let url = format!("https://api.github.com/repos/{}/issues", self.repo); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", self.token)) + .header("User-Agent", "stationapi-stop-pattern-detector") + .header("Accept", "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .json(&request) + .send() + .await?; + + if response.status().is_success() { + let issue: serde_json::Value = response.json().await?; + let issue_url = issue["html_url"].as_str().unwrap_or("unknown"); + info!("Created GitHub issue: {}", issue_url); + Ok(issue_url.to_string()) + } else { + let status = response.status(); + let error_body = response.text().await.unwrap_or_default(); + error!("Failed to create issue: {} - {}", status, error_body); + Err(format!("GitHub API error: {} - {}", status, error_body).into()) + } + } + + fn format_issue_body(&self, changes: &[StopPatternChange]) -> String { + let mut body = String::new(); + + body.push_str("## 停車パターン変更が検出されました\n\n"); + body.push_str(&format!("検出された変更: **{}件**\n\n", changes.len())); + + // Group by railway + let mut grouped: std::collections::HashMap<(&str, &str), Vec<&StopPatternChange>> = + std::collections::HashMap::new(); + + for change in changes { + let key = (change.railway_id.as_str(), change.train_type_id.as_str()); + grouped.entry(key).or_default().push(change); + } + + for ((railway_id, _), changes) in grouped { + let first = changes.first().unwrap(); + body.push_str(&format!("### {} ({})\n", first.railway_name, railway_id)); + body.push_str(&format!("種別: {}\n\n", first.train_type_name)); + + let added: Vec<_> = changes + .iter() + .filter(|c| c.change_type == super::detector::ChangeType::Added) + .collect(); + let removed: Vec<_> = changes + .iter() + .filter(|c| c.change_type == super::detector::ChangeType::Removed) + .collect(); + + if !added.is_empty() { + body.push_str("**新規停車:**\n"); + for change in added { + body.push_str(&format!( + "- {} (`{}`)\n", + change.station_name, change.station_id + )); + } + body.push('\n'); + } + + if !removed.is_empty() { + body.push_str("**停車取りやめ:**\n"); + for change in removed { + body.push_str(&format!( + "- {} (`{}`)\n", + change.station_name, change.station_id + )); + } + body.push('\n'); + } + } + + body.push_str("---\n"); + body.push_str("このIssueは自動作成されました。\n"); + body.push_str("変更を確認したら、DBで `acknowledged = TRUE` に更新してください。\n\n"); + body.push_str("```sql\n"); + body.push_str( + "UPDATE stop_pattern_changes SET acknowledged = TRUE, acknowledged_at = NOW()\n", + ); + body.push_str("WHERE acknowledged = FALSE;\n"); + body.push_str("```\n"); + + body + } +} diff --git a/stationapi/src/stop_pattern/mod.rs b/stationapi/src/stop_pattern/mod.rs new file mode 100644 index 00000000..43bf7804 --- /dev/null +++ b/stationapi/src/stop_pattern/mod.rs @@ -0,0 +1,13 @@ +//! Stop Pattern Detection Module +//! +//! This module provides functionality to detect changes in train stop patterns +//! by fetching data from ODPT (Open Data for Public Transportation) API +//! and comparing it with previously stored snapshots. + +pub mod detector; +pub mod github_issue; +pub mod odpt_client; + +pub use detector::{RotationConfig, RotationResult, StopPatternDetector}; +pub use github_issue::GitHubIssueCreator; +pub use odpt_client::OdptClient; diff --git a/stationapi/src/stop_pattern/odpt_client.rs b/stationapi/src/stop_pattern/odpt_client.rs new file mode 100644 index 00000000..4b572361 --- /dev/null +++ b/stationapi/src/stop_pattern/odpt_client.rs @@ -0,0 +1,466 @@ +//! ODPT API Client +//! +//! Client for fetching train timetable data from the ODPT (Open Data for Public Transportation) API. + +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use tracing::{info, warn}; + +const ODPT_API_BASE_URL: &str = "https://api.odpt.org/api/v4"; + +/// ODPT API operators supported for stop pattern detection +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OdptOperator { + TokyoMetro, + Toei, + JREast, + Tobu, + Seibu, + Keio, + Odakyu, + Tokyu, + Keikyu, + Keisei, + Sotetsu, +} + +impl OdptOperator { + pub fn id(&self) -> &'static str { + match self { + OdptOperator::TokyoMetro => "TokyoMetro", + OdptOperator::Toei => "Toei", + OdptOperator::JREast => "JR-East", + OdptOperator::Tobu => "Tobu", + OdptOperator::Seibu => "Seibu", + OdptOperator::Keio => "Keio", + OdptOperator::Odakyu => "Odakyu", + OdptOperator::Tokyu => "Tokyu", + OdptOperator::Keikyu => "Keikyu", + OdptOperator::Keisei => "Keisei", + OdptOperator::Sotetsu => "Sotetsu", + } + } + + pub fn name(&self) -> &'static str { + match self { + OdptOperator::TokyoMetro => "東京メトロ", + OdptOperator::Toei => "都営地下鉄", + OdptOperator::JREast => "JR東日本", + OdptOperator::Tobu => "東武鉄道", + OdptOperator::Seibu => "西武鉄道", + OdptOperator::Keio => "京王電鉄", + OdptOperator::Odakyu => "小田急電鉄", + OdptOperator::Tokyu => "東急電鉄", + OdptOperator::Keikyu => "京急電鉄", + OdptOperator::Keisei => "京成電鉄", + OdptOperator::Sotetsu => "相鉄", + } + } + + pub fn all() -> Vec { + vec![ + OdptOperator::TokyoMetro, + OdptOperator::Toei, + OdptOperator::JREast, + OdptOperator::Tobu, + OdptOperator::Seibu, + OdptOperator::Keio, + OdptOperator::Odakyu, + OdptOperator::Tokyu, + OdptOperator::Keikyu, + OdptOperator::Keisei, + OdptOperator::Sotetsu, + ] + } +} + +/// Train timetable object (stop in a trip) +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TrainTimetableObject { + #[serde(rename = "odpt:departureTime")] + pub departure_time: Option, + #[serde(rename = "odpt:departureStation")] + pub departure_station: Option, + #[serde(rename = "odpt:arrivalTime")] + pub arrival_time: Option, + #[serde(rename = "odpt:arrivalStation")] + pub arrival_station: Option, +} + +/// Train timetable from ODPT API +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TrainTimetable { + #[serde(rename = "@id")] + pub id: String, + #[serde(rename = "odpt:operator")] + pub operator: String, + #[serde(rename = "odpt:railway")] + pub railway: String, + #[serde(rename = "odpt:trainNumber")] + pub train_number: Option, + #[serde(rename = "odpt:trainType")] + pub train_type: Option, + #[serde(rename = "odpt:trainTimetableObject")] + pub train_timetable_object: Vec, +} + +/// Railway information from ODPT API +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Railway { + #[serde(rename = "@id")] + pub id: String, + #[serde(rename = "odpt:operator")] + pub operator: String, + #[serde(rename = "dc:title")] + pub title: Option, + #[serde(rename = "odpt:railwayTitle")] + pub railway_title: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum RailwayTitle { + Simple(String), + Multilang(HashMap), +} + +impl Railway { + pub fn get_name(&self) -> String { + match &self.railway_title { + Some(RailwayTitle::Simple(s)) => s.clone(), + Some(RailwayTitle::Multilang(map)) => { + map.get("ja").or(map.get("en")).cloned().unwrap_or_default() + } + None => self.title.clone().unwrap_or_default(), + } + } +} + +/// Train type information from ODPT API +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TrainType { + #[serde(rename = "@id")] + pub id: String, + #[serde(rename = "odpt:operator")] + pub operator: String, + #[serde(rename = "dc:title")] + pub title: Option, + #[serde(rename = "odpt:trainTypeTitle")] + pub train_type_title: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum TrainTypeTitle { + Simple(String), + Multilang(HashMap), +} + +impl TrainType { + pub fn get_name(&self) -> String { + match &self.train_type_title { + Some(TrainTypeTitle::Simple(s)) => s.clone(), + Some(TrainTypeTitle::Multilang(map)) => { + map.get("ja").or(map.get("en")).cloned().unwrap_or_default() + } + None => self.title.clone().unwrap_or_default(), + } + } +} + +/// Station information from ODPT API +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Station { + #[serde(rename = "@id")] + pub id: String, + #[serde(rename = "odpt:operator")] + pub operator: String, + #[serde(rename = "odpt:railway")] + pub railway: Option, + #[serde(rename = "dc:title")] + pub title: Option, + #[serde(rename = "odpt:stationTitle")] + pub station_title: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum StationTitle { + Simple(String), + Multilang(HashMap), +} + +impl Station { + pub fn get_name(&self) -> String { + match &self.station_title { + Some(StationTitle::Simple(s)) => s.clone(), + Some(StationTitle::Multilang(map)) => { + map.get("ja").or(map.get("en")).cloned().unwrap_or_default() + } + None => self.title.clone().unwrap_or_default(), + } + } +} + +/// Extracted stop pattern for a railway/train type combination +#[derive(Debug, Clone)] +pub struct StopPattern { + pub operator_id: String, + pub railway_id: String, + pub railway_name: String, + pub train_type_id: String, + pub train_type_name: String, + pub station_ids: Vec, + pub station_names: Vec, +} + +/// ODPT API Client +pub struct OdptClient { + pub(crate) api_key: String, + client: reqwest::Client, +} + +impl OdptClient { + pub fn new(api_key: String) -> Self { + Self { + api_key, + client: reqwest::Client::new(), + } + } + + /// Fetch train timetables for an operator + pub async fn fetch_train_timetables( + &self, + operator: OdptOperator, + ) -> Result, Box> { + let url = format!( + "{}/odpt:TrainTimetable?odpt:operator=odpt.Operator:{}&acl:consumerKey={}", + ODPT_API_BASE_URL, + operator.id(), + self.api_key + ); + + info!( + "Fetching train timetables for {} ({})...", + operator.name(), + operator.id() + ); + + let response = self.client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch timetables for {}: HTTP {}", + operator.id(), + response.status() + ) + .into()); + } + + let timetables: Vec = response.json().await?; + info!( + "Fetched {} timetables for {}", + timetables.len(), + operator.name() + ); + + Ok(timetables) + } + + /// Fetch railways for an operator + pub async fn fetch_railways( + &self, + operator: OdptOperator, + ) -> Result, Box> { + let url = format!( + "{}/odpt:Railway?odpt:operator=odpt.Operator:{}&acl:consumerKey={}", + ODPT_API_BASE_URL, + operator.id(), + self.api_key + ); + + let response = self.client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch railways for {}: HTTP {}", + operator.id(), + response.status() + ) + .into()); + } + + let railways: Vec = response.json().await?; + Ok(railways) + } + + /// Fetch train types for an operator + pub async fn fetch_train_types( + &self, + operator: OdptOperator, + ) -> Result, Box> { + let url = format!( + "{}/odpt:TrainType?odpt:operator=odpt.Operator:{}&acl:consumerKey={}", + ODPT_API_BASE_URL, + operator.id(), + self.api_key + ); + + let response = self.client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch train types for {}: HTTP {}", + operator.id(), + response.status() + ) + .into()); + } + + let train_types: Vec = response.json().await?; + Ok(train_types) + } + + /// Fetch stations for an operator + pub async fn fetch_stations( + &self, + operator: OdptOperator, + ) -> Result, Box> { + let url = format!( + "{}/odpt:Station?odpt:operator=odpt.Operator:{}&acl:consumerKey={}", + ODPT_API_BASE_URL, + operator.id(), + self.api_key + ); + + let response = self.client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch stations for {}: HTTP {}", + operator.id(), + response.status() + ) + .into()); + } + + let stations: Vec = response.json().await?; + Ok(stations) + } + + /// Extract stop patterns from train timetables + pub async fn extract_stop_patterns( + &self, + operator: OdptOperator, + ) -> Result, Box> { + // Fetch all required data + let timetables = self.fetch_train_timetables(operator).await?; + let railways = self.fetch_railways(operator).await?; + let train_types = self.fetch_train_types(operator).await?; + let stations = self.fetch_stations(operator).await?; + + // Build lookup maps + let railway_names: HashMap = railways + .iter() + .map(|r| (r.id.clone(), r.get_name())) + .collect(); + + let train_type_names: HashMap = train_types + .iter() + .map(|t| (t.id.clone(), t.get_name())) + .collect(); + + let station_names: HashMap = stations + .iter() + .map(|s| (s.id.clone(), s.get_name())) + .collect(); + + // Group timetables by railway and train type + let mut patterns: HashMap<(String, String), HashSet> = HashMap::new(); + + for timetable in &timetables { + let train_type_id = match &timetable.train_type { + Some(t) => t.clone(), + None => continue, // Skip if no train type + }; + + let key = (timetable.railway.clone(), train_type_id); + + let station_set = patterns.entry(key).or_default(); + + for obj in &timetable.train_timetable_object { + if let Some(station) = &obj.departure_station { + station_set.insert(station.clone()); + } + if let Some(station) = &obj.arrival_station { + station_set.insert(station.clone()); + } + } + } + + // Convert to StopPattern structs + let operator_id = format!("odpt.Operator:{}", operator.id()); + let mut result: Vec = Vec::new(); + + for ((railway_id, train_type_id), station_ids) in patterns { + let railway_name = railway_names + .get(&railway_id) + .cloned() + .unwrap_or_else(|| railway_id.clone()); + + let train_type_name = train_type_names + .get(&train_type_id) + .cloned() + .unwrap_or_else(|| train_type_id.clone()); + + // Sort station IDs for consistent ordering + let mut station_ids: Vec = station_ids.into_iter().collect(); + station_ids.sort(); + + let station_names_list: Vec = station_ids + .iter() + .map(|id| station_names.get(id).cloned().unwrap_or_else(|| id.clone())) + .collect(); + + result.push(StopPattern { + operator_id: operator_id.clone(), + railway_id, + railway_name, + train_type_id, + train_type_name, + station_ids, + station_names: station_names_list, + }); + } + + info!( + "Extracted {} stop patterns for {}", + result.len(), + operator.name() + ); + + Ok(result) + } + + /// Extract stop patterns for multiple operators + pub async fn extract_all_stop_patterns( + &self, + operators: &[OdptOperator], + ) -> Result, Box> { + let mut all_patterns: Vec = Vec::new(); + + for operator in operators { + match self.extract_stop_patterns(*operator).await { + Ok(patterns) => { + all_patterns.extend(patterns); + } + Err(e) => { + warn!("Failed to extract patterns for {}: {}", operator.name(), e); + // Continue with other operators + } + } + } + + Ok(all_patterns) + } +}