
Raye's Journey


Pipedream Free Automation Workflow Tutorial


In daily use, n8n crashes often, and I'm tired of checking the notifications.

IFTTT is useful, but it still feels a bit rudimentary, and the free version supports a maximum of two applets. So I looked for another free platform and found Pipedream (which seems to allow five workflows to run at the same time but limits daily calls to 100, which is still enough).
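
As a rough check that this quota is enough, the arithmetic can be sketched as follows. The limits of 100 calls and five workflows are the figures quoted above, which I have not verified against Pipedream's current pricing. The helper name and the two-hour interval are only illustrations.

```javascript
// Hypothetical helper: how many times a schedule-triggered workflow runs per day.
function runsPerDay(intervalHours) {
  return Math.floor(24 / intervalHours);
}

const DAILY_LIMIT = 100; // figure quoted in this post, treat it as an assumption
const WORKFLOWS = 5;     // also quoted in this post

// Five workflows, each checking every two hours
const totalRuns = WORKFLOWS * runsPerDay(2);
console.log(totalRuns, totalRuns <= DAILY_LIMIT);
```

Even with every workflow slot in use, a two-hour schedule stays well under the quoted limit.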

Overall, Pipedream lacks some features I often use in n8n, such as:

  • Exporting workflows as JSON for saving
  • Batch processing multiple items (n8n supports running a workflow node for each item in an array)

Setting up a workflow is also somewhat cumbersome, but it's very geeky and relatively friendly to programmers.

Creating an RSS Workflow

Here is what I worked out on my own; it may not be the optimal solution:

  1. The RSS node only needs a URL; it parses the feed automatically.
  2. Deduplicate the data. This requires writing some code, which is a bit cumbersome and is discussed later.
  3. Conditional check: see whether the deduplicated data contains any items; if not, return immediately.
  4. Send a TG message.
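
The conditional check in the list above can also be done in a small code step. A minimal sketch, assuming the deduplicated array comes from the previous step and that `flow` behaves like the `$.flow` object Pipedream passes to `run` (it has an `exit()` method). `exitIfEmpty` is a name I made up.

```javascript
// Hypothetical helper: end the workflow early when there is nothing to send.
// `flow` is expected to behave like Pipedream's `$.flow`.
function exitIfEmpty(items, flow) {
  if (!Array.isArray(items) || items.length === 0) {
    flow.exit("No new RSS items");
    return [];
  }
  return items;
}

// Inside a Pipedream code step this would be called roughly as:
//   return exitIfEmpty(steps.data_stores.$return_value, $.flow);

// Local check with a stand-in for $.flow:
const fakeFlow = { reason: null, exit(reason) { this.reason = reason; } };
console.log(exitIfEmpty([], fakeFlow), fakeFlow.reason);
```

Returning right after `exit()` matters, because the rest of the step would otherwise keep running.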

Data Storage and Deduplication

Pipedream provides a dedicated page for stored data where you can browse it visually, which is better than n8n.

I recommend giving each workflow its own data store, which you can think of as a simple KV cache.
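
For trying out the logic outside Pipedream, the KV idea can be mimicked with a tiny in-memory class. This is only a stand-in I wrote for illustration. It copies the async `get`/`set` shape that the workflow code relies on and is not Pipedream's implementation.

```javascript
// Illustrative stand-in for a data store: async get/set backed by a Map.
class InMemoryDataStore {
  constructor() {
    this.map = new Map();
  }
  async get(key) {
    return this.map.get(key); // undefined when the key was never set
  }
  async set(key, value) {
    this.map.set(key, value);
  }
}

async function demo() {
  const store = new InMemoryDataStore();
  const before = await store.get("guid"); // nothing stored yet
  await store.set("guid", "post-42");     // made-up value
  const after = await store.get("guid");
  return { before, after };
}

demo().then((result) => console.log(result));
```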

Next is the deduplication code, which is essentially the same as what I used in n8n.

  • The code we need to write goes in the run function; the rest does not need to change.
  • steps.merge_rss_feeds.$return_value is the JSON data returned by the RSS node, already parsed.
  • let lastItemId = await this.myDataStore.get("guid"); reads the most recent record from the KV store. This is an asynchronous operation, so it must be awaited.
  • The getId function returns the GUID of an item.
  • newItems collects the latest items to be sent, found by looping over the feed and comparing each GUID with the stored one.
  • await this.myDataStore.set("guid",getId(firstItem)) overwrites the stored record with the newest item's GUID.

Note, however, that Pipedream does not seem to support batch sending: if newItems contains several items, it can only send one at a time.

export default defineComponent({
  props: {
    myDataStore: { type: "data_store" },
  },
  async run({ steps, $ }) {
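    let items = steps.merge_rss_feeds.$return_value
    // Nothing to do if the feed came back empty (avoids reading guid from undefined)
    if (!items || items.length === 0) return []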
    let lastItemId = await this.myDataStore.get("guid");
    let firstItem = items[0]
    let newItems = [];
    function getId(item) {
      return item.guid;
    }

    if (lastItemId) {
      for (const item of items) {
        if (getId(item) === lastItemId) {
          break;
        }
        newItems.push(item)
      }
    } else {
      newItems = [firstItem]
    }

    await this.myDataStore.set("guid",getId(firstItem))
    return newItems.reverse() // Send older messages first, then the latest
  },
})
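
To see what the loop above does on concrete data, here is the same comparison as a standalone function. It is a sketch for local experimentation. The name `pickNewItems` and the three-item feed are made up.

```javascript
// Same comparison as the step above, as a pure function.
// `items` is ordered newest first; `lastId` is the GUID stored last time.
function pickNewItems(items, lastId) {
  if (!items || items.length === 0) return [];
  if (!lastId) return [items[0]]; // first run: only send the newest item
  const fresh = [];
  for (const item of items) {
    if (item.guid === lastId) break; // everything after this was already sent
    fresh.push(item);
  }
  return fresh.reverse(); // oldest first
}

const feed = [{ guid: "c" }, { guid: "b" }, { guid: "a" }]; // made-up feed
console.log(pickNewItems(feed, "a"));       // [ { guid: 'b' }, { guid: 'c' } ]
console.log(pickNewItems(feed, "c"));       // []
console.log(pickNewItems(feed, undefined)); // [ { guid: 'c' } ]
```

One consequence of this approach: if the stored GUID has already dropped out of the feed, the loop never breaks and every item in the feed is treated as new.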

Solving Pipedream's Inability to Send Messages in Batches

At first I only wanted to work around the lack of batch sending, so I wrote custom code that loops over the items and sends each message:

import { axios } from "@pipedream/platform"
export default defineComponent({
  props: {
    telegram_bot_api: {
      type: "app",
      app: "telegram_bot_api",
    },
    chat_id: {
      type: "string",
      label: "Chatid",
      default: "@RayeJourney",
    },
  },
  async run({steps, $}) {
    const items = steps.data_stores.$return_value 
    const results = []; // Used to store the result of each message sending operation
    for (const item of items) {
      // Construct the message content to be sent
      const message = `📝 ${item.title} #blog\n\n${item.link}`; // parsed RSS items expose the URL as "link"

      // Send the message
      const result = await axios($, {
        url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        data: {
          chat_id: this.chat_id, // the prop is declared as chat_id, not chatid
          text: message,
          parse_mode: 'HTML', // Set this parameter if the message content contains HTML
        },
      });

      // Add the result of each sending operation to the results array
      results.push(result); // @pipedream/platform's axios returns the response body itself
    }

    // Return the results of all sending operations
    return results;
  },
})

This is also a common pattern on low-code platforms: custom code can declare new input fields. Here, the props add a selector for the TG bot and a field for entering the chat_id.

telegram_bot_api: {
  type: "app",
  app: "telegram_bot_api",
},
chat_id: {
  type: "string",
  label: "Chatid",
  default: "@RayeJourney",
},
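
One thing to watch with `parse_mode: 'HTML'` in the sending code above: Telegram expects `<`, `>` and `&` in plain text to be written as HTML entities, so a post title containing those characters can make `sendMessage` fail. A minimal sketch of escaping the fields before building the message follows. `escapeHtml` and `buildMessage` are illustrative names, not part of any library.

```javascript
// Escape the three characters Telegram's HTML parse mode treats specially.
function escapeHtml(text) {
  return String(text)
    .replace(/&/g, "&amp;") // must run first so the entities added below are not re-escaped
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;");
}

// Build the same message shape as above, with escaped fields.
function buildMessage(item) {
  return `📝 ${escapeHtml(item.title)} #blog\n\n${escapeHtml(item.link)}`;
}

console.log(buildMessage({ title: "a < b & c", link: "https://example.com/post" }));
```

If the message never needs formatting tags, dropping `parse_mode` altogether is the simpler option.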

As I wrote this, I suddenly realized that I could merge all the steps together, so I simply combined them into one code step, which is simple and easy to understand without fussing over so many nodes.

You only need to change a few things each time:

  • Fill in the few required options.
  • Modify the message_template. The code replaces the ${item.title} and ${item.link} placeholders with fields of each feed item, so if the template uses other fields, the code needs to be adjusted too.

The final code is as follows:

import { axios } from "@pipedream/platform"
export default defineComponent({
  props: {
    my_data_store: { 
      type: "data_store",
      label: "MyDataStore"
    },
    my_data_key: {
      type: "string",
      label: "MyDataKey",
      default: "xlog"
    },
    telegram_bot_api: {
      type: "app",
      app: "telegram_bot_api",
    },
    chat_id: {
      type: "string",
      label: "Chatid",
      default: "@RayeJourney",
    },
    message_template: { // Since message needs to be dynamically filled, changed to message_template
      type: "string",
      label: "Message",
      default: "📝 ${item.title} #blog\n\n${item.link}"
    }
  },
  async run({steps, $}) {
    const items = steps.merge_rss_feeds.$return_value // Items pulled from RSS, newest first
    // Read the last item sent from the data store (empty on the very first run)
    let lastItem = await this.my_data_store.get(this.my_data_key)
    
    const results = []; // Result of each send operation
    let newItem = []; // Items that still need to be sent
    const getId = (item) => {
      return item.link;
    };

    const sendMsg = async (item) => {
      // Fill the template placeholders with this item's fields
      const message = this.message_template.replace('${item.title}', item.title).replace('${item.link}', item.link);
      try {
        const result = await axios($, {
          url: `https://api.telegram.org/bot${this.telegram_bot_api.$auth.token}/sendMessage`,
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          data: {
            chat_id: this.chat_id, // Directly use this.chat_id
            text: message,
            parse_mode: 'HTML',
          },
        });
        results.push(result.data); // Add result to results array
      } catch (error) {
        console.error("Error sending message:", error);
        // Handle errors, e.g., add error information to results
        results.push({error: "Failed to send message", details: error.toString()});
      }
    };

    // Walk from newest to oldest and stop at the last item already sent.
    // Starting from the oldest would add the old items again.
    for(const item of items){
      if(lastItem && getId(item) === getId(lastItem)){
        break; // Stop processing if the last processed item is found
      }
      newItem.push(item)
    }

    for(const item of newItem.reverse()){
      await sendMsg(item); // Wait to send the message
    } 
    // Update the lastItem in the data store to the latest item processed this time
    if (items.length > 0) {
      await this.my_data_store.set(this.my_data_key, items[0]); // Assume items[0] is the latest item
    }

    // Return the results of all sending operations
    return results;
  },
})
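
The two chained `replace` calls above only handle `${item.title}` and `${item.link}`, and each replaces just the first occurrence. If the template needs other fields, one option is a generic substitution. This is a sketch, assuming placeholders always have the form `${item.<field>}`. `renderTemplate` is a hypothetical helper, not something Pipedream provides.

```javascript
// Replace every ${item.<field>} placeholder with the matching field of `item`.
function renderTemplate(template, item) {
  return template.replace(/\$\{item\.(\w+)\}/g, (match, field) =>
    // Leave unknown placeholders untouched so mistakes stay visible in the output
    Object.prototype.hasOwnProperty.call(item, field) ? String(item[field]) : match
  );
}

// Double quotes, so ${...} stays literal text here rather than a JS template literal
const template = "📝 ${item.title} #blog\n\n${item.link}";
console.log(renderTemplate(template, { title: "Hello", link: "https://example.com/hello" }));
```

Using a function as the replacement also means a title containing `$&` or similar sequences is inserted literally.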

Final Deployment Example

If you happen to get your links through RSSHub and want to push them to TG, it's very simple (by the way, since you're already using RSSHub, why not just use RSS to TG? 🤣)

Taking pushing an xLog blog as an example, you only need three parts now:

  1. A schedule trigger; checking every two hours is sufficient.
  2. Pull the RSS information, for example from my https://rayepeng.net/feed.
  3. Then add a code node and paste the code from above.

After pasting the code, click to refresh the fields. The input boxes for the props then appear above the code, and you can fill them in yourself.

A note on the first run: the data store is still empty at that point, so running the workflow directly sends out a large number of historical messages.

So I recommend manually pasting the latest feed item into the data store first, under the key set in MyDataKey. During batch sending the code compares each item with the stored one; if they match, it is not sent again.
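
Instead of pasting by hand, the same effect can be had by running a one-off seeding step before enabling the sender. This is a sketch under assumptions: `store` stands for any object with async `get`/`set` methods (in Pipedream that would be `this.my_data_store`), and `seedLastItem` is a name I invented.

```javascript
// Store the newest item without sending anything, unless a value already exists.
async function seedLastItem(store, key, items) {
  const existing = await store.get(key);
  if (existing || !items || items.length === 0) {
    return false; // already seeded, or nothing to seed with
  }
  await store.set(key, items[0]); // items[0] is assumed to be the newest item
  return true;
}

// Local stand-in for a data store, only for trying the function out.
const fakeStore = {
  data: {},
  async get(key) { return this.data[key]; },
  async set(key, value) { this.data[key] = value; },
};

seedLastItem(fakeStore, "xlog", [{ link: "https://example.com/newest" }])
  .then((seeded) => console.log(seeded, fakeStore.data));
```

Because it refuses to overwrite an existing value, running the seeding step twice by accident does no harm.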

Alright, now it's all done. I must say it runs much faster than n8n.
