banner
raye~

Raye's Journey

且趁闲身未老,尽放我、些子疏狂。
medium
tg_channel
twitter
github
email
nintendo switch
playstation
steam_profiles

pipedream免費自動化工作流使用教程

biaoti12

日常的 n8n 其實經常要崩,看通知我都看煩了(

Untitled

IFTTT 雖然好用,但還是稍顯簡陋,免費版最多支持兩個,於是搜索了下還有一個免費的平台叫 pipedream(貌似可以同時運行 5 個工作流,只是限制了每天調用 100 次,不過這也夠了)

整體體驗下來,pipedream 還是缺失了還能多功能的,比如 n8n 經常會用到的:

  • 導出工作流為 json 保存
  • 批量處理多個 item(n8n 是可以支持針對一個數組裡的每個 item 都運行一遍工作流節點的)
  • 搭建一個工作流略微繁瑣,但是很 geek,對程序員還比較友好

創建一個 rss 工作流#

這裡就純粹靠自己摸索出來的經驗了,不一定是最優解:

  1. rss 的節點,只需要提供一個 url,自動解析其中的數據

Untitled 1

  1. 對數據去重,這裡需要寫點代碼,比較繁瑣,放到後面專門講

Untitled 2

  1. 條件判斷,針對去重後的數據判斷是否有值,如果沒有直接返回

Untitled 3

  1. 發送 tg 消息

Untitled 4

數據存儲及去重#

pipedream 提供了一個專門用於存儲數據的頁面,可以可視化地查看存儲的數據,這一點比 n8n 要好

Untitled 5

Untitled 6

推薦每個工作流都有一個數據存儲位置,你可以將其理解為一個簡單的 kv 緩存

接下來解釋下我這裡去重的代碼,本質上和 n8n 裡的很相似

  • 我們要編寫的代碼位於 run 函數中,其他的不用動
  • steps.merge_rss_feeds.$return_value 是自動解析 rss 節點返回的 json 數據
  • let lastItemId = await this.myDataStore.get("guid"); 去 kv 中取出最近一次的記錄,是個異步操作,需要加 await
  • getId 函數就是獲得每個節點的 guid
  • newItems 存儲要發送的最新 item,是通過循環比對獲得的
  • await this.myDataStore.set("guid",getId(firstItem)) 覆蓋最近一次的記錄

不過要注意的是,pipedream 似乎不支持批量發送,即如果 newItems 有多個,也只能發送

export default defineComponent({
  props: {
    myDataStore: { type: "data_store" },
  },
  async run({ steps, $ }) {
    let items = steps.merge_rss_feeds.$return_value
    let lastItemId = await this.myDataStore.get("guid");
    let firstItem = items[0]
    let newItems = [];
    function getId(item) {
      return item.guid;
    }

    if (lastItemId) {
      for (const item of items) {
        if (getId(item) === lastItemId) {
          break;
        }
        newItems.push(item)
      }
    } else {
      newItems = [firstItem]
    }

    await this.myDataStore.set("guid",getId(firstItem))
    return newItems.reverse() // 先發送舊的消息,再發送最新的
  },
})

解決 pipedream 無法批量發送消息#

原本只是為了解決無法批量發送消息的問題,我想到直接用自定義代碼的方式來循環發送就好了,於是寫出了如下代碼:

import { axios } from "@pipedream/platform"
export default defineComponent({
  props: {
    telegram_bot_api: {
      type: "app",
      app: "telegram_bot_api",
    },
    chat_id: {
      type: "string",
      label: "Chatid",
      default: "@RayeJourney",
    },
  },
  async run({steps, $}) {
    const items = steps.data_stores.$return_value 
    const results = []; // 用於存儲每個消息發送操作的結果
    for (const item of items) {
      // 構造要發送的消息內容
      const message = `📝 ${item.title} #blog\n\n${item.url}`;

      // 發送消息
      const result = await axios($, {
        url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        data: {
          chat_id: `${this.chatid}`, 
          text: message,
          parse_mode: 'HTML', // 如果消息內容包含 HTML,可以設置這個參數
        },
      });

      // 將每次發送操作的結果添加到結果數組中
      results.push(result.data);
    }

    // 返回所有發送操作的結果
    return results;
  },
})

這也是低代碼平台常見的解決方案了,通過在代碼中自定義的方式來增加新的 field,比如這裡就可以給當前節點增加選擇 tg 機器人,以及一個填寫 chat_id 的 field

telegram_bot_api: {
  type: "app",
  app: "telegram_bot_api",
},
chat_id: {
  type: "string",
  label: "Chatid",
  default: "@RayeJourney",
},

寫到這裡,我突然意識到可以直接把所有步驟合併到一起了…,索性幹脆合併到一起寫得了,簡單易懂還不需要折騰那麼多,最終代碼如下:

每次只需要改動幾個點:

  • 幾個必選項是需要填的
  • message_template 需要修改下,我這裡是把 item 當做 feed,然後 replace 替換的,所以代碼中也需要做下修改
import { axios } from "@pipedream/platform"
export default defineComponent({
  props: {
    my_data_store: { 
      type: "data_store",
      label: "MyDataStore"
    },
    my_data_key: {
      type: "string",
      label: "MyDataKey",
      default: "xlog"
    },
    telegram_bot_api: {
      type: "app",
      app: "telegram_bot_api",
    },
    chat_id: {
      type: "string",
      label: "Chatid",
      default: "@RayeJourney",
    },
    message_template: { // 由于 message 需要動態填充,改為 message_template
      type: "string",
      label: "Message",
      default: "📝 ${item.title} #blog\n\n${item.link}"
    }
  },
  async run({steps, $}) {
    const items = steps.merge_rss_feeds.$return_value // 直接獲取rss拉到的信息,最新的在最前面
    // 獲取db
    let lastItem = await this.my_data_store.get(`${this.my_data_key}`) // 確保使用正確的數據存儲訪問方式
    
    const results = []; // 移動到 run 方法的頂部
    let newItem = []; // 存儲要發送的消息
    const getId = (item) => {
      return item.link;
    };

    const sendMsg = async (item) => { // 確保 sendMsg 是異步的
      // 動態構造消息內容
      const message = this.message_template.replace('${item.title}', item.title).replace('${item.link}', item.link);
      try {
        const result = await axios($, {
          url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          data: {
            chat_id: this.chat_id, // 直接使用 this.chat_id
            text: message,
            parse_mode: 'HTML',
          },
        });
        results.push(result.data); // 添加結果到 results 數組
      } catch (error) {
        console.error("Error sending message:", error);
        // 處理錯誤,例如添加錯誤信息到 results
        results.push({error: "Failed to send message", details: error.toString()});
      }
    };

    // 確保遍歷的邏輯正確處理最新和最舊的項
    for(const item of items){ // 不能優先處理最老的,一定要從最新的開始,因為會把舊的也加進來..
      if(lastItem && getId(item) === getId(lastItem)){
        break; // 如果找到了上次處理的最後一個項,則停止處理
      }
      newItem.push(item)
    }

    for(const item of newItem.reverse()){
      await sendMsg(item); // 等待發送消息
    } 
    // 更新數據存儲中的 lastItem 為本次處理的最新項
    if (items.length > 0) {
      await this.my_data_store.set(this.my_data_key, items[0]); // 假設 items[0] 是最新的項
    }

    // 返回所有發送操作的結果
    return results;
  },
})

最終的部署舉例#

如果你恰巧是通過 rsshub 獲取的鏈接,然後想推送到 tg,那麼就很簡單了(話說都已經用 rsshub,為啥不直接用 rss to tg 呢🤣)

以 xlog 的博客推送舉例,現在只需要三個部分:

  1. 觸發時間,兩個小時檢查一次足矣
  2. 拉取 rss 信息,比如我的 https://rayepeng.net/feed
  3. 然後添加一個 code 節點,將上文的代碼粘貼過去

點一下刷新 field

Untitled 7

然後上方就出現了這些框框,自行編輯
Untitled 8

  1. 第一次啟動工作流

第一次啟動工作裡的時候,由於此時還沒有任何數據,如果你直接運行的話會導致大量歷史消息全部發送出去

於是,建議這樣

Untitled 9

在 db 中,手動粘貼下這個值(因為批量發送的時候會對比,如果和 db 裡的一樣就不會再發送了)

Untitled 10

好了,現在就大功告成了,不得不說運行速度還是比 n8n 要快多了(

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。