日常的 n8n 其实经常要崩,看通知我都看烦了(
IFTTT 虽然好用,但还是稍显简陋,免费版最多支持两个,于是搜索了下还有一个免费的平台叫 pipedream(貌似可以同时运行 5 个工作流,只是限制了每天调用 100 次数,不过这也够了)
整体体验下来,pipedream 还是缺失了还能多功能的,比如 n8n 经常会用到的:
- ワークフローを json 形式でエクスポートして保存
- 複数のアイテムを一括処理(n8n は配列内の各アイテムに対してワークフローノードを実行することができる)
- ワークフローの構築はやや煩雑だが、非常に geek で、プログラマーには比較的優しい
RSS ワークフローの作成#
ここでは純粋に自分の経験をもとにしたもので、最適解とは限らない:
- RSS ノードには、URL を提供するだけで、自動的にデータを解析する
- データの重複排除、ここでは少しコードを書く必要があり、やや煩雑なので後で詳しく説明する
- 条件判断、重複排除後のデータに値があるかどうかを判断し、なければ直接返す
- tg メッセージを送信する
データの保存と重複排除#
pipedream はデータを保存するための専用ページを提供しており、保存されたデータを視覚的に確認できる。この点は n8n よりも優れている。
各ワークフローにはデータ保存場所があることをお勧めします。これは単純な kv キャッシュとして理解できます。
次に、ここでの重複排除のコードを説明します。本質的には n8n のものと非常に似ています。
- 私たちが記述するコードは run 関数内にあり、他は触れない
steps.merge_rss_feeds.$return_value
は自動的に解析された rss ノードから返された json データですlet lastItemId = await this.myDataStore.get("guid");
kv から最近の記録を取得します。これは非同期操作であり、await を追加する必要がありますgetId
関数は各ノードの guid を取得しますnewItems
は送信する最新のアイテムを格納します。これはループ比較によって得られますawait this.myDataStore.set("guid",getId(firstItem))
最近の記録を上書きします
ただし、pipedream はバッチ送信をサポートしていないようです。つまり、
newItems
に複数のアイテムがある場合でも、送信できるのは 1 つだけです。
export default defineComponent({
props: {
myDataStore: { type: "data_store" },
},
async run({ steps, $ }) {
let items = steps.merge_rss_feeds.$return_value
let lastItemId = await this.myDataStore.get("guid");
let firstItem = items[0]
let newItems = [];
function getId(item) {
return item.guid;
}
if (lastItemId) {
for (const item of items) {
if (getId(item) === lastItemId) {
break;
}
newItems.push(item)
}
} else {
newItems = [firstItem]
}
await this.myDataStore.set("guid",getId(firstItem))
return newItems.reverse() // 先に古いメッセージを送信し、その後最新のメッセージを送信
},
})
pipedream でのバッチメッセージ送信の解決#
元々はバッチメッセージ送信の問題を解決するためだけに、カスタムコードの方法でループ送信を考えました。そのため、以下のコードを書きました:
import { axios } from "@pipedream/platform"
export default defineComponent({
props: {
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
},
async run({steps, $}) {
const items = steps.data_stores.$return_value
const results = []; // 各メッセージ送信操作の結果を格納するための配列
for (const item of items) {
// 送信するメッセージ内容を構築
const message = `📝 ${item.title} #blog\n\n${item.url}`;
// メッセージを送信
const result = await axios($, {
url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
data: {
chat_id: `${this.chatid}`,
text: message,
parse_mode: 'HTML', // メッセージ内容にHTMLが含まれている場合はこのパラメータを設定
},
});
// 各送信操作の結果を結果配列に追加
results.push(result.data);
}
// すべての送信操作の結果を返す
return results;
},
})
これも低コードプラットフォームでよく見られる解決策で、コード内でカスタムの方法で新しいフィールドを追加することができます。例えば、ここでは現在のノードに tg ボットを選択するフィールドと chat_id を入力するフィールドを追加できます。
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
ここまで書いて、すべてのステップを一つにまとめることができることに気づきました…、思い切って一緒に書いてしまうことにしました。シンプルでわかりやすく、そんなに手間をかける必要もありません。最終的なコードは以下の通りです:
毎回いくつかのポイントを変更するだけで済みます:
- 必須項目はいくつか入力する必要があります
- message_template を変更する必要があります。ここでは item をフィードとして扱い、replace で置き換えていますので、コード内でも変更が必要です。
import { axios } from "@pipedream/platform"
export default defineComponent({
props: {
my_data_store: {
type: "data_store",
label: "MyDataStore"
},
my_data_key: {
type: "string",
label: "MyDataKey",
default: "xlog"
},
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
message_template: { // メッセージは動的に埋め込む必要があるため、message_templateに変更
type: "string",
label: "Message",
default: "📝 ${item.title} #blog\n\n${item.link}"
}
},
async run({steps, $}) {
const items = steps.merge_rss_feeds.$return_value // RSSから取得した情報を直接取得、最新のものが最初に来る
// DBを取得
let lastItem = await this.my_data_store.get(`${this.my_data_key}`) // 正しいデータストレージアクセス方法を使用していることを確認
const results = []; // runメソッドの先頭に移動
let newItem = []; // 送信するメッセージを格納
const getId = (item) => {
return item.link;
};
const sendMsg = async (item) => { // sendMsgは非同期であることを確認
// 動的にメッセージ内容を構築
const message = this.message_template.replace('${item.title}', item.title).replace('${item.link}', item.link);
try {
const result = await axios($, {
url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
data: {
chat_id: this.chat_id, // this.chat_idを直接使用
text: message,
parse_mode: 'HTML',
},
});
results.push(result.data); // 結果をresults配列に追加
} catch (error) {
console.error("メッセージ送信エラー:", error);
// エラーを処理し、結果にエラー情報を追加
results.push({error: "メッセージ送信に失敗しました", details: error.toString()});
}
};
// 最新と最古の項目を正しく処理するためのロジックを確認
for(const item of items){ // 最古のものを優先して処理するのではなく、最新のものから始める必要があります。古いものも追加されてしまうためです。
if(lastItem && getId(item) === getId(lastItem)){
break; // 最後に処理した項目を見つけた場合、処理を停止
}
newItem.push(item)
}
for(const item of newItem.reverse()){
await sendMsg(item); // メッセージ送信を待機
}
// データストレージ内のlastItemを今回処理した最新の項目に更新
if (items.length > 0) {
await this.my_data_store.set(this.my_data_key, items[0]); // items[0]が最新の項目であると仮定
}
// すべての送信操作の結果を返す
return results;
},
})
最終的なデプロイの例#
もしあなたがちょうど rsshub からリンクを取得して、それを tg にプッシュしたい場合、非常に簡単です(そもそも rsshub を使っているのに、なぜ直接 rss to tg を使わないのか🤣)
xlog のブログをプッシュする例として、今は 3 つの部分だけで済みます:
- トリガー時間、2 時間ごとにチェックすれば十分です
- RSS 情報を取得する、例えば私の
https://rayepeng.net/feed
- そして、コードノードを追加し、上記のコードを貼り付けます
フィールドを更新するだけで
その後、上部にこれらのボックスが表示され、自分で編集します
- ワークフローを初めて起動する
ワークフローを初めて起動する際、まだデータがないため、直接実行すると大量の過去のメッセージがすべて送信されてしまいます。
そのため、次のようにすることをお勧めします。
DB にこの値を手動で貼り付けます(バッチ送信時に比較が行われ、DB 内のものと同じであれば再送信されません)。
さて、これで大功告成です。n8n よりも実行速度がかなり速いことは間違いありません(