1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use chrono::{DateTime, Local, TimeZone};
use sqlx::MySqlPool;

use crate::{
    model::{
        api,
        db::{self, MessageRecord},
    },
    naive_to_local,
};

pub async fn get_messages(pool: &MySqlPool) -> anyhow::Result<Vec<MessageRecord>> {
    let messages = db::get_messages(pool).await?;
    Ok(messages)
}

pub async fn get_latest_message(pool: &MySqlPool) -> anyhow::Result<Option<MessageRecord>> {
    let message = db::get_latest_message(pool).await?;
    Ok(message)
}

/// after から before の期間のメッセージを API から取得し、DB に保存する
///
/// (ただし、traQ の検索の仕様上、件数は 10000 件を超えると 10000 件と表示されるため、10000 件までしか取得しない)
///
/// # Arguments
/// * `pool` - DB のコネクションプール
/// * `before` - 取得するメッセージの期間の終わり
/// * `after` - 取得するメッセージの期間の始まり
/// * `interval_ms` - メッセージを取得する間隔 (ミリ秒)
async fn fetch_messages_as_match_as_possible_at_once<TzB, TzA>(
    pool: &MySqlPool,
    before: Option<&DateTime<TzB>>,
    after: Option<&DateTime<TzA>>,
    interval_ms: u64,
) -> anyhow::Result<Vec<MessageRecord>>
where
    TzB: TimeZone,
    TzB::Offset: std::fmt::Display,
    TzA: TimeZone,
    TzA::Offset: std::fmt::Display,
{
    let mut messages = Vec::new();
    let (limit, res_messages) = api::get_messages_with_time_section(0, before, after).await?;

    db::insert_messages(
        pool,
        &res_messages
            .iter()
            .map(MessageRecord::from)
            .collect::<Vec<MessageRecord>>(),
    )
    .await?;

    messages.extend(res_messages);

    let mut now = messages.len();

    while now < limit {
        let (_, res_messages) = api::get_messages_with_time_section(now, before, after).await?;

        let interval = tokio::spawn(async move {
            std::thread::sleep(std::time::Duration::from_micros(interval_ms));
        });

        db::insert_messages(
            pool,
            &res_messages
                .iter()
                .map(MessageRecord::from)
                .collect::<Vec<MessageRecord>>(),
        )
        .await?;

        messages.extend(res_messages);

        now = messages.len();

        interval.await.unwrap();
    }

    if messages.len() > limit {
        messages.truncate(limit);
    }

    Ok(messages.iter().map(MessageRecord::from).collect::<Vec<_>>())
}

/// ある時点より新しいメッセージすべてを最大 limit 件取得し、DB に保存する
/// (ただし、DB に保存される件数は limit 件を上回る可能性がある)
pub async fn fetch_messages<Tz>(
    pool: &MySqlPool,
    limit: Option<usize>,
    after: Option<DateTime<Tz>>,
) -> anyhow::Result<Vec<MessageRecord>>
where
    Tz: TimeZone,
    Tz::Offset: std::fmt::Display,
{
    let mut messages = fetch_messages_as_match_as_possible_at_once(
        pool,
        None::<&DateTime<Local>>,
        after.as_ref(),
        300,
    )
    .await?;
    if messages.is_empty() {
        return Ok(messages);
    }
    loop {
        if let Some(limit) = limit {
            if messages.len() >= limit {
                break;
            }
        }
        let oldest_message = messages.last().unwrap();
        let oldest_message_id = &oldest_message.id;
        let oldest_message_created_at = oldest_message.created_at;
        let oldest_message_created_at_local = naive_to_local(oldest_message_created_at);
        let mut older_messages = fetch_messages_as_match_as_possible_at_once(
            pool,
            Some(&oldest_message_created_at_local),
            after.as_ref(),
            300,
        )
        .await?;

        // 現状保持しているメッセージの中で最も古いメッセージより新しいメッセージのみに絞る
        older_messages
            .retain(|m| m.id != *oldest_message_id && m.created_at >= oldest_message_created_at);

        if older_messages.is_empty() {
            break;
        }
        messages.extend(older_messages);
    }

    if let Some(limit) = limit {
        messages.truncate(limit);
    }

    Ok(messages)
}