日常的 n8n 其實經常要崩,看通知我都看煩了(
IFTTT 雖然好用,但還是稍顯簡陋,免費版最多支持兩個,於是搜索了下還有一個免費的平台叫 pipedream(貌似可以同時運行 5 個工作流,只是限制了每天調用 100 次,不過這也夠了)
整體體驗下來,pipedream 還是缺失了還能多功能的,比如 n8n 經常會用到的:
- 導出工作流為 json 保存
- 批量處理多個 item(n8n 是可以支持針對一個數組裡的每個 item 都運行一遍工作流節點的)
- 搭建一個工作流略微繁瑣,但是很 geek,對程序員還比較友好
創建一個 rss 工作流#
這裡就純粹靠自己摸索出來的經驗了,不一定是最優解:
- rss 的節點,只需要提供一個 url,自動解析其中的數據
- 對數據去重,這裡需要寫點代碼,比較繁瑣,放到後面專門講
- 條件判斷,針對去重後的數據判斷是否有值,如果沒有直接返回
- 發送 tg 消息
數據存儲及去重#
pipedream 提供了一個專門用於存儲數據的頁面,可以可視化地查看存儲的數據,這一點比 n8n 要好
推薦每個工作流都有一個數據存儲位置,你可以將其理解為一個簡單的 kv 緩存
接下來解釋下我這裡去重的代碼,本質上和 n8n 裡的很相似
- 我們要編寫的代碼位於 run 函數中,其他的不用動
steps.merge_rss_feeds.$return_value
是自動解析 rss 節點返回的 json 數據let lastItemId = await this.myDataStore.get("guid");
去 kv 中取出最近一次的記錄,是個異步操作,需要加 awaitgetId
函數就是獲得每個節點的 guidnewItems
存儲要發送的最新 item,是通過循環比對獲得的await this.myDataStore.set("guid",getId(firstItem))
覆蓋最近一次的記錄
不過要注意的是,pipedream 似乎不支持批量發送,即如果
newItems
有多個,也只能發送
export default defineComponent({
props: {
myDataStore: { type: "data_store" },
},
async run({ steps, $ }) {
let items = steps.merge_rss_feeds.$return_value
let lastItemId = await this.myDataStore.get("guid");
let firstItem = items[0]
let newItems = [];
function getId(item) {
return item.guid;
}
if (lastItemId) {
for (const item of items) {
if (getId(item) === lastItemId) {
break;
}
newItems.push(item)
}
} else {
newItems = [firstItem]
}
await this.myDataStore.set("guid",getId(firstItem))
return newItems.reverse() // 先發送舊的消息,再發送最新的
},
})
解決 pipedream 無法批量發送消息#
原本只是為了解決無法批量發送消息的問題,我想到直接用自定義代碼的方式來循環發送就好了,於是寫出了如下代碼:
import { axios } from "@pipedream/platform"
export default defineComponent({
props: {
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
},
async run({steps, $}) {
const items = steps.data_stores.$return_value
const results = []; // 用於存儲每個消息發送操作的結果
for (const item of items) {
// 構造要發送的消息內容
const message = `📝 ${item.title} #blog\n\n${item.url}`;
// 發送消息
const result = await axios($, {
url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
data: {
chat_id: `${this.chatid}`,
text: message,
parse_mode: 'HTML', // 如果消息內容包含 HTML,可以設置這個參數
},
});
// 將每次發送操作的結果添加到結果數組中
results.push(result.data);
}
// 返回所有發送操作的結果
return results;
},
})
這也是低代碼平台常見的解決方案了,通過在代碼中自定義的方式來增加新的 field,比如這裡就可以給當前節點增加選擇 tg 機器人,以及一個填寫 chat_id 的 field
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
寫到這裡,我突然意識到可以直接把所有步驟合併到一起了…,索性幹脆合併到一起寫得了,簡單易懂還不需要折騰那麼多,最終代碼如下:
每次只需要改動幾個點:
- 幾個必選項是需要填的
- message_template 需要修改下,我這裡是把 item 當做 feed,然後 replace 替換的,所以代碼中也需要做下修改
import { axios } from "@pipedream/platform"
export default defineComponent({
props: {
my_data_store: {
type: "data_store",
label: "MyDataStore"
},
my_data_key: {
type: "string",
label: "MyDataKey",
default: "xlog"
},
telegram_bot_api: {
type: "app",
app: "telegram_bot_api",
},
chat_id: {
type: "string",
label: "Chatid",
default: "@RayeJourney",
},
message_template: { // 由于 message 需要動態填充,改為 message_template
type: "string",
label: "Message",
default: "📝 ${item.title} #blog\n\n${item.link}"
}
},
async run({steps, $}) {
const items = steps.merge_rss_feeds.$return_value // 直接獲取rss拉到的信息,最新的在最前面
// 獲取db
let lastItem = await this.my_data_store.get(`${this.my_data_key}`) // 確保使用正確的數據存儲訪問方式
const results = []; // 移動到 run 方法的頂部
let newItem = []; // 存儲要發送的消息
const getId = (item) => {
return item.link;
};
const sendMsg = async (item) => { // 確保 sendMsg 是異步的
// 動態構造消息內容
const message = this.message_template.replace('${item.title}', item.title).replace('${item.link}', item.link);
try {
const result = await axios($, {
url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
data: {
chat_id: this.chat_id, // 直接使用 this.chat_id
text: message,
parse_mode: 'HTML',
},
});
results.push(result.data); // 添加結果到 results 數組
} catch (error) {
console.error("Error sending message:", error);
// 處理錯誤,例如添加錯誤信息到 results
results.push({error: "Failed to send message", details: error.toString()});
}
};
// 確保遍歷的邏輯正確處理最新和最舊的項
for(const item of items){ // 不能優先處理最老的,一定要從最新的開始,因為會把舊的也加進來..
if(lastItem && getId(item) === getId(lastItem)){
break; // 如果找到了上次處理的最後一個項,則停止處理
}
newItem.push(item)
}
for(const item of newItem.reverse()){
await sendMsg(item); // 等待發送消息
}
// 更新數據存儲中的 lastItem 為本次處理的最新項
if (items.length > 0) {
await this.my_data_store.set(this.my_data_key, items[0]); // 假設 items[0] 是最新的項
}
// 返回所有發送操作的結果
return results;
},
})
最終的部署舉例#
如果你恰巧是通過 rsshub 獲取的鏈接,然後想推送到 tg,那麼就很簡單了(話說都已經用 rsshub,為啥不直接用 rss to tg 呢🤣)
以 xlog 的博客推送舉例,現在只需要三個部分:
- 觸發時間,兩個小時檢查一次足矣
- 拉取 rss 信息,比如我的
https://rayepeng.net/feed
- 然後添加一個 code 節點,將上文的代碼粘貼過去
點一下刷新 field
然後上方就出現了這些框框,自行編輯
- 第一次啟動工作流
第一次啟動工作裡的時候,由於此時還沒有任何數據,如果你直接運行的話會導致大量歷史消息全部發送出去
於是,建議這樣
在 db 中,手動粘貼下這個值(因為批量發送的時候會對比,如果和 db 裡的一樣就不會再發送了)
好了,現在就大功告成了,不得不說運行速度還是比 n8n 要快多了(